Version 9 (modified by joe, 9 years ago)

--

Subscription and slony logs

Subscription

When a site 'subscribes' to a series, the site starts the process by sending a message to JSOC saying which series it is interested in. JSOC sends back the commands to build the table and populate it up to the current slony checkpoint. JSOC then adds the table name to its list of series that the site is subscribed to.

The only way to know if you're subscribed is to actually look in the lists at JSOC, which are named according to the convention:

/data/pgsql/slon_logs/live/etc/(site).lst

Slony logs

At JSOC, the logs are written to the directory :

/data/pgsql/slon_logs/live/site_logs/(site)

The slony logs have names according to the convention :

slony1_log_2_00000000000000690325.sql

Here the number 690325 is a counter that increments for each file. The term "log" is somewhat misleading for these files, as they are not warnings or errors printed while the database operates. Rather, they are lists of database operations that have taken place at JSOC. If other sites are to keep in sync with JSOC, they must apply the same operations to their local databases. A typical log file may have entries like :

insert into "hmi"."rdvpspec_fd05" ("recnum","sunum","slotnum","sessionid","sessionns","ln_source_isset","ln_source_carrrot","ln_source_cmlon_index","ln_source_lonhg_index","ln_source_lathg_index",
"ln_source_loncm_index","cparms_sg000","logp_bzero","logp_bscale","carrrot","cmlon","lonhg","lathg","loncm","crpix1","crpix2","cdelt1","cdelt2","cdelt3","delta_k","delta_nu","d_omega","module",
"source","input","created","bld_vers","log_base","datamin","datamax","apode_f","apod_min","apod_max","cmlon_index","loncm_index","lathg_index","lonhg_index","sg_000_file","history") values
('9919708','528962345','0','27939659','su_rsb','1','2146','32','48','41','-32','','-8.48432540893554688','0.000635004483736478385','2146','200','120','-12.5','80','64.5','64.5',
'0.101023711','0.101023711','0.000181805124','0.101023711','28.9351845','0.000181805124','pspec3 v 1.1','hmi.rdVtrack_fd05[2146][200][120.0][-12.5][-80.0]',
'hmi.rdVtrack_fd05[2146][200]','1171214459','V8R2','2.71828182845904509','-29.1219711','12.1533213','0.96875','0.9765625','1','32','-32','41','48','logP.fits','');

insert into "hmi"."rdvpspec_fd05" ("recnum","sunum","slotnum","sessionid","sessionns","ln_source_isset","ln_source_carrrot","ln_source_cmlon_index","ln_source_lonhg_index","ln_source_lathg_index",
"ln_source_loncm_index","cparms_sg000","logp_bzero","logp_bscale","carrrot","cmlon","lonhg","lathg","loncm","crpix1","crpix2","cdelt1","cdelt2","cdelt3","delta_k","delta_nu","d_omega","module",
"source","input","created","bld_vers","log_base","datamin","datamax","apode_f","apod_min","apod_max","cmlon_index","loncm_index","lathg_index","lonhg_index","sg_000_file","history") values
('9919709','528962345','1','27939659','su_rsb','1','2146','32','48','42','-32','','-9.26125526428222656','0.000644568281907301633','2146','200','120','-15','-80','64.5','64.5',
'0.101023711','0.101023711','0.000181805124','0.101023711','28.9351845','0.000181805124','pspec3 v 1.1','hmi.rdVtrack_fd05[2146][200][120.0][-15.0][-80.0]',
'hmi.rdVtrack_fd05[2146][200]','1171214461','V8R2','2.71828182845904509','-30.2097244','11.6872139','0.96875','0.9765625','1','32','-32','42','48','logP.fits','');
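Since the counter in the file name increments for each file, extracting it as an integer makes it easy to spot a missing log in a directory listing. The following is a minimal Python sketch, assuming every log follows the single example name shown above (including the fixed "slony1_log_2_" prefix). The function names are illustrative and not part of any JSOC tooling.

```python
import re

# Matches names such as slony1_log_2_00000000000000690325.sql.
# Assumption: the "slony1_log_2_" prefix is constant, as in the one
# example given; the digit width is not enforced.
LOG_NAME = re.compile(r"^slony1_log_2_(\d+)\.sql$")


def log_counter(filename):
    """Return the integer counter embedded in a slony log file name."""
    match = LOG_NAME.match(filename)
    if match is None:
        raise ValueError(f"not a slony log file name: {filename!r}")
    return int(match.group(1))


def missing_counters(filenames):
    """Return counters absent between the lowest and highest log present."""
    present = {log_counter(name) for name in filenames}
    if not present:
        return []
    return [n for n in range(min(present), max(present) + 1)
            if n not in present]
```

For example, log_counter applied to the file name above yields 690325, and missing_counters reports any counter skipped between the first and last file in a listing.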

After a while (daily?), these slony log files are archived into a bundle, named like so :

slony_logs_688467-689890.tar.gz

Then, after a longer period of time - two weeks? - these archives of slony log files are deleted. It is critical that the slony logs are applied at remote sites before they are deleted at JSOC.

Slony and the DRMS database

The small table _jsoc.sl_archive_tracking contains information about the most recent slony log ingested :

prompt> psql nso_drms

nso_drms=# select * from _jsoc.sl_archive_tracking;
 at_counter |         at_created         |        at_applied
------------+----------------------------+---------------------------
     886400 | 2014-06-27 09:36:17.452634 | 2014-06-27 16:37:03.71184
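One way to judge how far behind a site is would be to compare at_counter with the counter ranges in the archive names at JSOC. The Python sketch below assumes that at_counter uses the same numbering as the log file names and that the two numbers in an archive name are the first and last counters it contains (inclusive); neither is stated outright in this page. The helper names are hypothetical.

```python
import re

# Matches names such as slony_logs_688467-689890.tar.gz.
ARCHIVE_NAME = re.compile(r"^slony_logs_(\d+)-(\d+)\.tar\.gz$")


def archive_range(filename):
    """Return (first, last) log counters taken from an archive name."""
    match = ARCHIVE_NAME.match(filename)
    if match is None:
        raise ValueError(f"not a slony archive name: {filename!r}")
    return int(match.group(1)), int(match.group(2))


def archives_still_needed(at_counter, archive_names):
    """Return archives holding at least one log newer than at_counter.

    Assumption: at_counter is the counter of the last log applied
    locally, so an archive is needed when its last counter is greater.
    """
    needed = [name for name in archive_names
              if archive_range(name)[1] > at_counter]
    # Sort by counter range so the archives come back in apply order.
    return sorted(needed, key=archive_range)
```

With the at_counter of 886400 shown above, the archive slony_logs_688467-689890.tar.gz would not be needed, since all of its logs predate the last one applied.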

Subscription at your site

Your site will have a script named subscribe_series, located in the same directory as the get_slony_logs.pl script (at NSO it is /datarea/production/subscribe_series).

To subscribe to a series, edit the config file (etc/subscribe_list.cfg at NSO) and then run subscribe_series like so :

./subscribe_series ./etc/repclient.live.cfg ./etc/subscribe_list.cfg ~/.ssh-agent_rs.sh

The format of etc/subscribe_list.cfg is one series name per line, followed by the word subscribe :

hmi.rdvflows_fd30_frame subscribe
hmi.rdvflows_fd15_frame subscribe
hmi.fsvbinned_nrt subscribe
hmi.rdVfitsc_fd05 subscribe
hmi.rdVfitsc_fd15 subscribe
hmi.rdVfitsc_fd30 subscribe
hmi.rdVfitsf_fd05 subscribe
hmi.rdVfitsf_fd15 subscribe
hmi.rdVfitsf_fd30 subscribe
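When many series are involved, the lines can be generated rather than typed. This is a small Python sketch; only the subscribe action appears in the example above, so no other action keywords are assumed, and the helper name is illustrative.

```python
def subscribe_list_lines(series_names):
    """Build subscribe_list.cfg lines of the form '<series> subscribe'."""
    lines = []
    for raw in series_names:
        name = raw.strip()
        # A series name is a single token with a namespace prefix,
        # e.g. hmi.fsvbinned_nrt; reject anything else early.
        if len(name.split()) != 1 or "." not in name:
            raise ValueError(f"unexpected series name: {raw!r}")
        lines.append(f"{name} subscribe")
    return lines


if __name__ == "__main__":
    for line in subscribe_list_lines(["hmi.fsvbinned_nrt",
                                      "hmi.rdVfitsc_fd05"]):
        print(line)
```

The output can be redirected into the config file and then passed to subscribe_series as shown above.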

Manual cleanup before subscription

Sometimes it becomes necessary to clean up manually after a failed subscription attempt, since partially built tables can block subsequent attempts. To do this for the hmi.v_720s series, the following needs to be done in the database :

DROP TABLE hmi.v_720s;
DROP SEQUENCE hmi.v_720s_seq;
DELETE FROM hmi.drms_series where seriesname = 'hmi.v_720s';
DELETE FROM hmi.drms_segment where seriesname = 'hmi.v_720s';
DELETE FROM hmi.drms_keyword where seriesname = 'hmi.v_720s';

NOTE that you MUST then subscribe. Otherwise incoming slony updates may still pertain to this table, which could cause those updates to fail.
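The cleanup statements for another series follow the same pattern. The Python sketch below only builds the SQL text for review and executes nothing. It assumes, by analogy with the hmi.v_720s example, that the drms_series, drms_segment and drms_keyword tables live in the series' own namespace and that the sequence is named after the table with a _seq suffix. Check both against your database before running anything. The function name is hypothetical.

```python
import re

# Conservative identifier check so arbitrary text never ends up in SQL.
IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def cleanup_sql(series):
    """Return the manual cleanup statements for a series like hmi.v_720s."""
    namespace, sep, table = series.partition(".")
    if not sep or not IDENT.match(namespace) or not IDENT.match(table):
        raise ValueError(f"expected <namespace>.<table>, got {series!r}")
    statements = [
        f"DROP TABLE {series};",
        f"DROP SEQUENCE {series}_seq;",
    ]
    # Assumption: the DRMS bookkeeping tables sit in the same namespace.
    for meta in ("drms_series", "drms_segment", "drms_keyword"):
        statements.append(
            f"DELETE FROM {namespace}.{meta} where seriesname = '{series}';"
        )
    return statements


if __name__ == "__main__":
    print("\n".join(cleanup_sql("hmi.v_720s")))
```

Note that the seriesname comparison is case-sensitive, so the name must be given exactly as it is stored.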

If you miss more than about a month's worth of slony updates, it will be necessary to unsubscribe and resubscribe to regenerate your data tables. Obviously, this is to be avoided.

VSO Specific Triggers

There are two triggers that may be installed after the subscription process: one for automatic data mirroring and one for the VSO shadow tables.

Automatic data mirroring

Sites that are using the JMD to retrieve data will have a table in their database named sunum_queue :

sdac_drms=# \d public.sunum_queue;
                                     Table "public.sunum_queue"
    Column    |            Type             |                       Modifiers
--------------+-----------------------------+-------------------------------------------------------
 queue_id     | bigint                      | not null default nextval('sunum_queue_key'::regclass)
 sunum        | bigint                      | not null
 series_name  | text                        | not null
 timestamp    | timestamp without time zone | default now()
 recnum       | bigint                      |
 request_type | character varying(10)       | not null default 'MIRROR'::character varying
 priority     | integer                     | not null default 0
Indexes:
    "sunum_queue_pkey" PRIMARY KEY, btree (queue_id)

The JMD will check this table for entries of files that it should retrieve. We use a trigger on the table under slony control to write records into the sunum_queue table. There is a script in the CVS tree to build the appropriate triggers: vso/DataProviders/JSOC/db_triggers/sunum_queue_trigger_sampled.pl. It takes the following arguments:

        Usage : ./sunum_queue_trigger_sampled.pl <series> [<instrument> [<cadence> [<max_age>]]]

        series     : series name (with namespace)
        instrument : hmi or aia
        cadence    : integer, in seconds (should be a multiple of 72 for aia, 45 for hmi)
        max_age    : maximum age, in days, of records to queue (default 14 days)

If given with just a series name, it will emit the SQL commands to create a trigger that retrieves all updates for observations within the last 14 days. Specifying a cadence will allow you to retrieve observations at lower than the full data rate. Note that only SQL is emitted to STDOUT, so that you can redirect this cleanly to a file. Warnings and other messages are emitted to STDERR.
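The cadence constraint from the usage text can be checked before the script is run. This Python sketch relies only on the multiples quoted above (72 seconds for aia, 45 seconds for hmi); the function is hypothetical and not part of the VSO scripts.

```python
# Base multiples taken from the usage text of
# sunum_queue_trigger_sampled.pl as quoted above.
BASE_CADENCE = {"aia": 72, "hmi": 45}


def cadence_ok(instrument, cadence):
    """Return True if cadence (seconds) is a positive multiple of the base."""
    if instrument not in BASE_CADENCE:
        raise ValueError(f"instrument must be hmi or aia, got {instrument!r}")
    return cadence > 0 and cadence % BASE_CADENCE[instrument] == 0
```

For instance, a cadence of 720 seconds passes for both instruments, while 100 seconds passes for neither.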

WARNING : a DROP TRIGGER is issued right before the CREATE TRIGGER, as older versions of Postgres have no CREATE OR REPLACE TRIGGER (it was only added in PostgreSQL 14). The first time you run the SQL, you may get a message similar to ERROR: trigger "hmi_ic_720s_trg" for table "ic_720s" does not exist.

[oneiros@sdo3 db_triggers]$ ./sunum_queue_trigger_sampled.pl hmi.v_720s

-- You only need to issue this once.  In theory, it'd have already been
-- done as part of the setup for DRMS & slony:
--
-- CREATE LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION hmi_v_720s_fc() returns TRIGGER as $hmi_v_720s_trg$
BEGIN
  IF (TG_OP='INSERT' AND new.sunum > 0 and NEW.date__obs <> 'NaN') THEN
    IF ( EXTRACT('epoch' FROM NOW()) - NEW.date__obs < 222134367 ) THEN
      INSERT INTO sunum_queue (sunum,recnum,series_name) VALUES (new.sunum,NEW.recnum,'hmi.v_720s');
    END IF;

  END IF;
RETURN NEW;
END
$hmi_v_720s_trg$ LANGUAGE plpgsql;


-- the drop trigger will fail the first time, but there's no 'OR REPLACE' on
-- triggers, so this is so it'll work after the first time.
-- (or just don't mess with the trigger; replacing the function is enough,
-- as it's just linked by name)

DROP TRIGGER hmi_v_720s_trg ON hmi.v_720s;
CREATE TRIGGER hmi_v_720s_trg AFTER INSERT ON hmi.v_720s
    FOR EACH ROW EXECUTE PROCEDURE hmi_v_720s_fc();
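The constant 222134367 in the generated function is not explained in the script output. One plausible reading, offered as an assumption rather than documented behaviour, is that date__obs is stored as seconds since 1977-01-01 while NOW() is converted to seconds since 1970-01-01. The constant would then fold together the offset between the two epochs and the default max_age of 14 days, leaving a residual of 33 seconds (possibly a leap-second or TAI/UTC correction). The arithmetic can be checked in Python:

```python
from datetime import datetime, timezone

THRESHOLD = 222134367  # constant from the generated trigger function

unix_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
# Assumption: date__obs counts seconds from the start of 1977.
drms_epoch = datetime(1977, 1, 1, tzinfo=timezone.utc)

# Seconds between the two epochs (2557 days).
epoch_offset = int((drms_epoch - unix_epoch).total_seconds())

# Default max_age of 14 days, in seconds.
max_age = 14 * 86400

# Whatever is left over after accounting for the two terms above.
residual = epoch_offset + max_age - THRESHOLD

print(epoch_offset, max_age, residual)
```

If this reading is right, passing a different max_age to the script should change the constant by 86400 per day.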

VSO Shadow Tables

(Note that most sites do not need this trigger. Only SDAC and NSO currently incur this extra overhead.)

To speed searching for data, the VSO generates a materialized view that we refer to as the 'shadow tables'. These tables are kept up-to-date with the DRMS series table via a trigger. There are two scripts to build the SQL commands necessary, both in CVS: vso/DataProviders/JSOC/db_triggers/shadow_aia_template.pl and vso/DataProviders/JSOC/db_triggers/shadow_hmi_template.pl. They take a series name as an argument. Note that only SQL is emitted to STDOUT, so that you can redirect this cleanly to a file. Warnings and other messages are emitted to STDERR.

The UPDATE at the end will force the trigger to run for the entire table. This process takes a few minutes for HMI tables, and likely multiple hours for AIA.lev1.

WARNING : a DROP TRIGGER is issued right before the CREATE TRIGGER, as older versions of Postgres have no CREATE OR REPLACE TRIGGER (it was only added in PostgreSQL 14). The first time you run the SQL, you may get a message similar to ERROR: trigger "trig_update_vso_hmi__v_720s" for table "v_720s" does not exist.

[oneiros@sdo3 db_triggers]$ perl shadow_hmi_template.pl hmi.v_720s
SERIES : hmi.v_720s
SHADOW : hmi__v_720s

DROP TABLE vso.hmi__v_720s;

CREATE TABLE vso.hmi__v_720s (
    date_obs    timestamp       NOT NULL,
    date        timestamp       NOT NULL,
    t_rec_index bigint          UNIQUE NOT NULL,
    recnum      bigint          UNIQUE NOT NULL,
    sunum       bigint          NOT NULL,
    slotnum     integer         NOT NULL,
    clusterid   bigint          NOT NULL,
    fileid      varchar(255)    UNIQUE NOT NULL,

    PRIMARY KEY (t_rec_index)
);

CREATE INDEX vso_hmi__v_720s_cluster  ON vso.hmi__v_720s ( clusterid );
CREATE INDEX vso_hmi__v_720s_date_obs ON vso.hmi__v_720s ( date_obs );
CREATE INDEX vso_hmi__v_720s_date     ON vso.hmi__v_720s ( date );

GRANT SELECT ON vso.hmi__v_720s TO vso;


CREATE OR REPLACE FUNCTION proc_update_vso_hmi__v_720s ()
RETURNS TRIGGER AS $proc_update_vso_hmi__v_720s$
    DECLARE
        record_date_obs   timestamp;
        record_date       timestamp;
        record_rec_index  bigint;
        record_recnum     bigint;
        record_sunum      bigint;
        record_slotnum    integer;
        record_fileid     varchar(255);
        record_clusterid  bigint;
        deleted_record    RECORD;
    BEGIN
        -- this seems out of order, I know.
        -- we look at the new record, and try to figure out if it's invalid
        -- if it is, we still need to try to clean up the old record.
        record_rec_index = NEW.t_rec_index;
        record_sunum     = NEW.sunum;
        -- we dont do journaling -- but we *DO* want to figure out
        -- when SUs are replaced
        DELETE FROM vso.hmi__v_720s
            WHERE t_rec_index = record_rec_index
            RETURNING sunum
            INTO deleted_record;
        IF FOUND THEN
            IF deleted_record.sunum <> NEW.sunum THEN
            -- don't flag if it's still the same sunum (headers updated?)
                INSERT INTO public.sunum_replaced (
                    old_sunum, new_sunum
                ) VALUES (
                    deleted_record.sunum, NEW.sunum
                );
            END IF;
        END IF;
--
        -- occasionally, there's a missing record ... don't process those
        IF 'NaN' = NEW.date__obs  THEN
            RETURN NEW;
        END IF;
--
        record_date_obs  = dynamical_to_unix(NEW.date__obs);
        record_date      = dynamical_to_unix(NEW.date);
        record_recnum    = NEW.recnum;
        record_slotnum   = NEW.slotnum;

        record_fileid    = 'hmi__v_720s:'||record_rec_index||':'||record_rec_index;
        record_clusterid = ROUND( record_rec_index / 40 );
--
        -- I couldve done this as INSERT INTO ... SELECT NEW.*
        -- but then theres that nasty concat and such.
        INSERT into vso.hmi__v_720s (
            date_obs, date, t_rec_index, recnum, sunum, slotnum, clusterid, fileid
        ) values (
            record_date_obs, record_date, record_rec_index, record_recnum,
            record_sunum, record_slotnum, record_clusterid, record_fileid
        );
--
        RETURN NEW;
--
    EXCEPTION
      WHEN OTHERS THEN
          RAISE WARNING 'vso.hmi__v_720s : % : % : %', SQLSTATE, NEW.recnum, SQLERRM;
          RETURN NEW;
    END;
$proc_update_vso_hmi__v_720s$ LANGUAGE 'plpgsql';


DROP TRIGGER trig_update_vso_hmi__v_720s ON hmi.v_720s;

CREATE TRIGGER trig_update_vso_hmi__v_720s
    BEFORE INSERT OR UPDATE
    ON hmi.v_720s
    FOR EACH ROW
    EXECUTE PROCEDURE proc_update_vso_hmi__v_720s();


-- trigger it to run for the data already in the table
UPDATE hmi.v_720s SET recnum=recnum;
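The derived columns in the trigger function can be mirrored outside the database, for example when checking shadow rows by hand. This Python sketch follows the generated SQL above for hmi.v_720s: the shadow name replaces the dot with a double underscore, the fileid repeats t_rec_index twice, and clusterid groups 40 consecutive index values. The divisor 40 is taken from this one generated output and may differ for other series. The sketch assumes a non-negative t_rec_index, where PostgreSQL integer division and Python floor division agree. The function names are illustrative.

```python
def shadow_name(series):
    """Map a series name such as hmi.v_720s to its shadow table name."""
    return series.replace(".", "__")


def shadow_fields(series, t_rec_index, cluster_size=40):
    """Return the fileid and clusterid the trigger would derive.

    cluster_size defaults to the divisor seen in the generated SQL
    for hmi.v_720s; treat it as an assumption for other series.
    """
    if t_rec_index < 0:
        raise ValueError("t_rec_index is assumed to be non-negative")
    name = shadow_name(series)
    return {
        "fileid": f"{name}:{t_rec_index}:{t_rec_index}",
        # bigint / integer in PostgreSQL truncates, so the ROUND() in
        # the trigger leaves the value unchanged.
        "clusterid": t_rec_index // cluster_size,
    }
```

For example, a t_rec_index of 81 in hmi.v_720s gives the fileid hmi__v_720s:81:81 and the clusterid 2.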