diff --git a/expected/column_filter.out b/expected/column_filter.out index d1df3ce..a5f71e5 100644 --- a/expected/column_filter.out +++ b/expected/column_filter.out @@ -9,6 +9,13 @@ CREATE TABLE public.basic_dml ( data text, something interval ); +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | +(1 row) + INSERT INTO basic_dml(other, data, something) VALUES (5, 'foo', '1 minute'::interval), (4, 'bar', '12 weeks'::interval), @@ -17,7 +24,8 @@ VALUES (5, 'foo', '1 minute'::interval), (1, NULL, NULL); \c :subscriber_dsn -- create table on subscriber to receive replicated filtered data from provider --- there are some extra columns too. +-- there are some extra columns too, and we omit 'other' as a non-replicated +-- table on upstream only. CREATE TABLE public.basic_dml ( id serial primary key, data text, @@ -25,13 +33,40 @@ CREATE TABLE public.basic_dml ( subonly integer, subonly_def integer DEFAULT 99 ); -SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml', ARRAY['default']); +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml'::regclass, ARRAY['default']); nspname | relname | att_list | has_row_filter ---------+-----------+-----------------------------------------+---------------- public | basic_dml | {id,data,something,subonly,subonly_def} | f (1 row) +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | +(1 row) + \c :provider_dsn +-- Fails: the column filter list must include the key +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{data, something}'); +ERROR: REPLICA IDENTITY columns must be replicated +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | +(1 row) + +-- Fails: the column filter list may not include cols that are not in the table +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{data, something, nosuchcol}'); +ERROR: table public.basic_dml does not have column nosuchcol +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | +(1 row) + -- At provider, add table to replication set, with filtered columns SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{id, data, something}'); replication_set_add_table @@ -39,6 +74,19 @@ SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchr t (1 row) +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+-----------+---------------------+---------------- + public | basic_dml | {id,data,something} | f +(1 row) + SELECT id, data, something FROM basic_dml ORDER BY id; id | data | something ----+------+------------------ @@ -71,6 +119,13 @@ SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_tab public | basic_dml | {id,data,something,subonly,subonly_def} | f (1 row) +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | +(1 row) + -- data should get replicated to subscriber SELECT id, data, something FROM basic_dml ORDER BY id; id | data | something @@ -90,14 +145,49 @@ CREATE TABLE public.basic_oids_dml ( data text, something interval ) with oids ; +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------------+---------------- + public | basic_oids_dml | {id,other,data,something} | f +(1 row) + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_oids_dml'::regclass; + nspname | relname | set_name +---------+----------------+---------- + public | basic_oids_dml | +(1 row) + +-- Fails: cannot use system column 'oid' explicitly SELECT * FROM pglogical.replication_set_add_table('default', 'basic_oids_dml', columns := '{oid, id, data, something}'); ERROR: table public.basic_oids_dml does not have column oid +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_oids_dml'::regclass; + nspname | relname | set_name +---------+----------------+---------- + public | basic_oids_dml | +(1 row) + +-- WITH OIDS table OK SELECT * FROM pglogical.replication_set_add_table('default', 'basic_oids_dml', columns := '{id, data, something}'); replication_set_add_table --------------------------- t (1 row) +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------+---------------- + public | basic_oids_dml | {id,data,something} | f +(1 row) + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_oids_dml'::regclass; + nspname | relname | set_name +---------+----------------+---------- + public | basic_oids_dml | default +(1 row) + SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); wait_slot_confirm_lsn ----------------------- @@ -120,6 +210,12 @@ VALUES (5, 'foo', '1 minute'::interval), (3, 'baz', '2 years 1 hour'::interval), (2, 'qux', '8 months 2 days'::interval), (1, NULL, NULL); +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------+---------------- + public | basic_oids_dml | {id,data,something} | f +(1 row) + UPDATE basic_oids_dml SET other = '40', data = NULL, something = '3 days'::interval WHERE id = 4; SELECT * from basic_oids_dml ORDER BY id; id | other | data | something @@ -149,6 +245,129 @@ SELECT id, data, something FROM basic_oids_dml ORDER BY id; (5 rows) \c :provider_dsn +-- Adding a table that's already selectively replicated fails +\set VERBOSITY terse +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true); +ERROR: duplicate key value violates unique constraint "replication_set_table_pkey" +\set VERBOSITY default +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +-- So does trying to re-add to change the column set +\set VERBOSITY terse +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{id, data}'); +ERROR: duplicate key value violates unique constraint "replication_set_table_pkey" +\set VERBOSITY default +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +-- Shouldn't be able to drop a replicated col in a rel +-- but due to RM#5916 you can +BEGIN; +ALTER TABLE public.basic_dml DROP COLUMN data; +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------+---------------- + public | basic_oids_dml | {id,data,something} | f +(1 row) + +ROLLBACK; +-- Even when wrapped (RM#5916) +BEGIN; +SELECT pglogical.replicate_ddl_command($$ +ALTER TABLE public.basic_dml DROP COLUMN data; +$$); + replicate_ddl_command +----------------------- + t +(1 row) + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------+---------------- + public | basic_oids_dml | {id,data,something} | f +(1 row) + +ROLLBACK; +-- CASCADE should be allowed though +BEGIN; +ALTER TABLE public.basic_dml DROP COLUMN data CASCADE; +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------+---------------- + public | basic_oids_dml | {id,data,something} | f +(1 row) + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +ROLLBACK; +BEGIN; +SELECT pglogical.replicate_ddl_command($$ +ALTER TABLE public.basic_dml DROP COLUMN data CASCADE; +$$); + replicate_ddl_command +----------------------- + t +(1 row) + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------+---------------- + public | basic_oids_dml | {id,data,something} | f +(1 row) + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +ROLLBACK; +-- We can drop a non-replicated col. We must not replicate this DDL because in +-- this case the downstream doesn't have the 'other' column and apply will +-- fail. +ALTER TABLE public.basic_dml DROP COLUMN other; +SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + \set VERBOSITY terse SELECT pglogical.replicate_ddl_command($$ DROP TABLE public.basic_dml CASCADE; diff --git a/expected/replication_set.out b/expected/replication_set.out index 7fcec6e..a61e7a4 100644 --- a/expected/replication_set.out +++ b/expected/replication_set.out @@ -123,6 +123,11 @@ SELECT * FROM pglogical.alter_replication_set('repset_replicate_instrunc', repli ERROR: replication set repset_replicate_instrunc cannot be altered to replicate UPDATEs or DELETEs because it contains tables without PRIMARY KEY SELECT * FROM pglogical.alter_replication_set('repset_replicate_instrunc', replicate_delete := true); ERROR: replication set repset_replicate_instrunc cannot be altered to replicate UPDATEs or DELETEs because it contains tables without PRIMARY KEY +-- Adding already-added fails +\set VERBOSITY terse +SELECT * FROM pglogical.replication_set_add_table('repset_replicate_all', 'public.test_publicschema'); +ERROR: duplicate key value violates unique constraint "replication_set_table_pkey" +\set VERBOSITY default -- check the replication sets SELECT nspname, relname, set_name FROM pglogical.tables WHERE relname IN ('test_publicschema', 'test_normalschema', 'test_strangeschema', 'test_nopkey') ORDER BY 1,2,3; @@ -157,6 +162,48 @@ SELECT nspname, relname, set_name FROM pglogical.tables --too short SELECT pglogical.create_replication_set(''); ERROR: replication set name cannot be empty +-- Can't drop table while it's in a repset +DROP TABLE public.test_publicschema; +ERROR: cannot drop table test_publicschema because other objects depend on it +DETAIL: table test_publicschema membership in replication set default_insert_only depends on table test_publicschema +table test_publicschema membership in replication set repset_replicate_all depends on table test_publicschema +HINT: Use DROP ... CASCADE to drop the dependent objects too. +-- Can't drop table while it's in a repset +BEGIN; +SELECT pglogical.replicate_ddl_command($$ +DROP TABLE public.test_publicschema; +$$); +ERROR: cannot drop table public.test_publicschema because other objects depend on it +DETAIL: table public.test_publicschema membership in replication set default_insert_only depends on table public.test_publicschema +table public.test_publicschema membership in replication set repset_replicate_all depends on table public.test_publicschema +HINT: Use DROP ... CASCADE to drop the dependent objects too. +CONTEXT: during execution of queued SQL statement: +DROP TABLE public.test_publicschema; + +ROLLBACK; +-- Can CASCADE though, even outside ddlrep +BEGIN; +DROP TABLE public.test_publicschema CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table test_publicschema membership in replication set default_insert_only +drop cascades to table test_publicschema membership in replication set repset_replicate_all +ROLLBACK; +-- ... and can drop after repset removal +SELECT pglogical.replication_set_remove_table('repset_replicate_all', 'public.test_publicschema'); + replication_set_remove_table +------------------------------ + t +(1 row) + +SELECT pglogical.replication_set_remove_table('default_insert_only', 'public.test_publicschema'); + replication_set_remove_table +------------------------------ + t +(1 row) + +BEGIN; +DROP TABLE public.test_publicschema; +ROLLBACK; \set VERBOSITY terse SELECT pglogical.replicate_ddl_command($$ DROP TABLE public.test_publicschema CASCADE; @@ -165,7 +212,6 @@ SELECT pglogical.replicate_ddl_command($$ DROP TABLE public.test_nopkey CASCADE; DROP TABLE public.test_unlogged CASCADE; $$); -NOTICE: drop cascades to 2 other objects NOTICE: drop cascades to table normalschema.test_normalschema NOTICE: drop cascades to 2 other objects NOTICE: drop cascades to table "strange.schema-IS".test_strangeschema diff --git a/expected/row_filter.out b/expected/row_filter.out index 41a72a5..3216ab4 100644 --- a/expected/row_filter.out +++ b/expected/row_filter.out @@ -125,7 +125,7 @@ SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', true, -- fail, the membership in repset depends on data column \set VERBOSITY terse ALTER TABLE basic_dml DROP COLUMN data; -ERROR: cannot drop column data of table basic_dml because other objects depend on it +ERROR: cannot drop table basic_dml column data because other objects depend on it \set VERBOSITY default SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); wait_slot_confirm_lsn @@ -399,6 +399,42 @@ SELECT * FROM test_jsonb ORDER BY json_type; scalar | "a scalar" (2 rows) +\c :provider_dsn +-- Filter may refer to not-replicated columns +SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml'); + replication_set_remove_table +------------------------------ + t +(1 row) + +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false, columns := ARRAY['id', 'data'], row_filter := $rf$other = 2$rf$); + replication_set_add_table +--------------------------- + t +(1 row) + +INSERT INTO basic_dml(other, data, "SomeThing") VALUES (2, 'itstwo', '1 second'::interval); +SELECT other, data, "SomeThing" FROM basic_dml WHERE data = 'itstwo'; + other | data | SomeThing +-------+--------+----------- + 2 | itstwo | @ 1 sec +(1 row) + +SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- 'other' will be NULL as it wasn't in the repset +-- even though we filtered on it. So will SomeThing. +SELECT other, data, "SomeThing" FROM basic_dml WHERE data = 'itstwo'; + other | data | SomeThing +-------+--------+----------- + | itstwo | +(1 row) + \c :provider_dsn \set VERBOSITY terse DROP FUNCTION funcn_add(integer, integer); diff --git a/pglogical_apply.c b/pglogical_apply.c index aea2ef7..a94e9a3 100644 --- a/pglogical_apply.c +++ b/pglogical_apply.c @@ -1788,7 +1788,11 @@ process_syncing_tables(XLogRecPtr end_lsn) { PGLogicalWorker *worker = (PGLogicalWorker *) lfirst(wlc); - if (pglogical_worker_running(worker)) + /* Is any sync worker running for the given table */ + if (pglogical_worker_running(worker) + && strcmp(NameStr(worker->worker.sync.nspname), NameStr(sync->nspname)) == 0 + && strcmp(NameStr(worker->worker.sync.relname), NameStr(sync->relname)) == 0 + ) nworkers++; } LWLockRelease(PGLogicalCtx->lock); @@ -1800,6 +1804,55 @@ process_syncing_tables(XLogRecPtr end_lsn) } } + //Following codes are modified version of the code from handle_commit() function + if (MyPGLogicalWorker->worker_type == PGLOGICAL_WORKER_SYNC) + { + /* + * Stop replay if we're doing limited replay and we've replayed up to the + * last record we're supposed to process. + */ + if (MyApplyWorker->replay_stop_lsn != InvalidXLogRecPtr + && MyApplyWorker->replay_stop_lsn <= end_lsn) + { + ereport(LOG, + (errmsg("pglogical sync finished processing; replayed to %X/%X of required %X/%X", + (uint32)(end_lsn>>32), (uint32)end_lsn, + (uint32)(MyApplyWorker->replay_stop_lsn >>32), + (uint32)MyApplyWorker->replay_stop_lsn))); + + /* + * If this is sync worker, update syncing table state to done. + */ + StartTransactionCommand(); + set_table_sync_status(MyApplyWorker->subid, + NameStr(MyPGLogicalWorker->worker.sync.nspname), + NameStr(MyPGLogicalWorker->worker.sync.relname), + SYNC_STATUS_SYNCDONE, end_lsn); + CommitTransactionCommand(); + + /* + * Flush all writes so the latest position can be reported back to the + * sender. + */ + XLogFlush(GetXLogWriteRecPtr()); + + /* + * Disconnect. + * + * This needs to happen before the pglogical_sync_worker_finish() + * call otherwise slot drop will fail. + */ + PQfinish(applyconn); + + pglogical_sync_worker_finish(); + + /* Stop gracefully */ + proc_exit(0); + } + + } + //Copied code ends + Assert(CurrentMemoryContext == MessageContext); } diff --git a/pglogical_dependency.c b/pglogical_dependency.c index 2dbf616..cc3bd7f 100644 --- a/pglogical_dependency.c +++ b/pglogical_dependency.c @@ -2051,7 +2051,7 @@ doDeletion(const ObjectAddress *object) drop_replication_set(object->objectId); else if (object->classId == get_replication_set_table_rel_oid()) replication_set_remove_table(object->objectId, object->objectSubId, - true); + true); else if (object->classId == get_replication_set_seq_rel_oid()) replication_set_remove_seq(object->objectId, object->objectSubId, true); diff --git a/pglogical_functions.c b/pglogical_functions.c index c0f4c0f..afed6f9 100644 --- a/pglogical_functions.c +++ b/pglogical_functions.c @@ -1784,6 +1784,8 @@ pglogical_replicate_ddl_command(PG_FUNCTION_ARGS) } PG_END_TRY(); + in_pglogical_replicate_ddl_command = false; + /* * Restore the GUC variables we set above. */ @@ -1889,7 +1891,8 @@ pglogical_node_info(PG_FUNCTION_ARGS) * Get replication info about table. * * This is called by downstream sync worker on the upstream to obtain - * info needed to do initial synchronization correctly. + * info needed to do initial synchronization correctly. Be careful + * about changing it, as it must be upward- and downward-compatible. */ Datum pglogical_show_repset_table_info(PG_FUNCTION_ARGS) diff --git a/pglogical_sync.c b/pglogical_sync.c index eb42dd9..a33bf78 100644 --- a/pglogical_sync.c +++ b/pglogical_sync.c @@ -31,6 +31,7 @@ #include "commands/dbcommands.h" #include "commands/tablecmds.h" +#include "commands/copy.h" #include "lib/stringinfo.h" @@ -39,6 +40,9 @@ #include "nodes/makefuncs.h" #include "nodes/parsenodes.h" +#include "parser/parse_node.h" +#include "parser/parse_relation.h" + #include "pgstat.h" #include "replication/origin.h" @@ -84,6 +88,11 @@ void pglogical_sync_main(Datum main_arg); static PGLogicalSyncWorker *MySyncWorker = NULL; +static StringInfo copybuf = NULL; +static PGconn *source_copy_conn = NULL; + +static int copy_read_data(void *outbuf, int minread, int maxread); +static int libpqrcv_receive_pglogical(PGconn *conn, char **buffer,pgsocket *wait_fd); static void dump_structure(PGLogicalSubscription *sub, const char *destfile, @@ -393,13 +402,11 @@ make_copy_attnamelist(PGLogicalRelation *rel) * COPY single table over wire. */ static void -copy_table_data(PGconn *origin_conn, PGconn *target_conn, +copy_table_data(PGconn *origin_conn, PGLogicalRemoteRel *remoterel, List *replication_sets) { PGLogicalRelation *rel; PGresult *res; - int bytes; - char *copybuf; List *attnamelist; ListCell *lc; bool first; @@ -407,6 +414,9 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, StringInfoData attlist; MemoryContext curctx = CurrentMemoryContext, oldctx; + CopyState cstate; + ParseState *pstate; + StringInfoData stringinfodata = {0}; /* Build the relation map. */ StartTransactionCommand(); @@ -501,59 +511,39 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, PQerrorMessage(origin_conn)))); } - /* Build COPY FROM query. */ - resetStringInfo(&query); - appendStringInfo(&query, "COPY %s.%s ", - PQescapeIdentifier(origin_conn, remoterel->nspname, - strlen(remoterel->nspname)), - PQescapeIdentifier(origin_conn, remoterel->relname, - strlen(remoterel->relname))); - if (list_length(attnamelist)) - appendStringInfo(&query, "(%s) ", attlist.data); - appendStringInfoString(&query, "FROM stdin"); - - /* Execute COPY FROM. */ - res = PQexec(target_conn, query.data); - if (PQresultStatus(res) != PGRES_COPY_IN) - { - ereport(ERROR, - (errmsg("table copy failed"), - errdetail("Query '%s': %s", query.data, - PQerrorMessage(origin_conn)))); - } + PQclear(res); - while ((bytes = PQgetCopyData(origin_conn, ©buf, false)) > 0) - { - if (PQputCopyData(target_conn, copybuf, bytes) != 1) - { - ereport(ERROR, - (errmsg("writing to target table failed"), - errdetail("destination connection reported: %s", - PQerrorMessage(target_conn)))); - } - PQfreemem(copybuf); + /* + * Instead of creating another libpq connection in to the target database, we + * use the same sync worker to write to the target database. + * + * This connection is in replica mode, so foreign key constraints wont be + * checked, just like pg's logical replication sync worker does. This means + * that tables can be copied in any order without triggering FK violation. + */ - CHECK_FOR_INTERRUPTS(); - } + copybuf = &stringinfodata; + StartTransactionCommand(); + rel = pglogical_relation_open(remoterel->relid, RowExclusiveLock); + pstate = make_parsestate(NULL); + addRangeTableEntryForRelation(pstate, rel->rel, NULL, false, false); - if (bytes != -1) - { - ereport(ERROR, - (errmsg("reading from origin table failed"), - errdetail("source connection returned %d: %s", - bytes, PQerrorMessage(origin_conn)))); - } + source_copy_conn = origin_conn; - /* Send local finish */ - if (PQputCopyEnd(target_conn, NULL) != 1) - { - ereport(ERROR, - (errmsg("sending copy-completion to destination connection failed"), - errdetail("destination connection reported: %s", - PQerrorMessage(target_conn)))); - } + cstate = BeginCopyFrom(pstate, rel->rel, NULL, false, copy_read_data, attnamelist, NIL); - PQclear(res); + /* Do the copy */ + (void) CopyFrom(cstate); + + source_copy_conn = NULL; + + if(copybuf->data) + PQfreemem(copybuf->data); // malloc-ed by libpq + + copybuf = NULL; + + pglogical_relation_close(rel, RowExclusiveLock); + CommitTransactionCommand(); } /* @@ -568,16 +558,12 @@ copy_tables_data(char *sub_name, const char *origin_dsn, const char *origin_name) { PGconn *origin_conn; - PGconn *target_conn; ListCell *lc; /* Connect to origin node. */ origin_conn = pglogical_connect(origin_dsn, sub_name, "copy"); start_copy_origin_tx(origin_conn, origin_snapshot); - /* Connect to target node. */ - target_conn = pglogical_connect(target_dsn, sub_name, "copy"); - start_copy_target_tx(target_conn, origin_name); /* Copy every table. */ foreach (lc, tables) @@ -588,14 +574,13 @@ copy_tables_data(char *sub_name, const char *origin_dsn, remoterel = pg_logical_get_remote_repset_table(origin_conn, rv, replication_sets); - copy_table_data(origin_conn, target_conn, remoterel, replication_sets); + copy_table_data(origin_conn, remoterel, replication_sets); CHECK_FOR_INTERRUPTS(); } /* Finish the transactions and disconnect. */ finish_copy_origin_tx(origin_conn); - finish_copy_target_tx(target_conn); } /* @@ -616,7 +601,6 @@ copy_replication_sets_data(char *sub_name, const char *origin_dsn, PGconn *origin_conn; PGconn *target_conn; List *tables; - ListCell *lc; /* Connect to origin node. */ origin_conn = pglogical_connect(origin_dsn, sub_name, "copy"); @@ -630,15 +614,20 @@ copy_replication_sets_data(char *sub_name, const char *origin_dsn, target_conn = pglogical_connect(target_dsn, sub_name, "copy"); start_copy_target_tx(target_conn, origin_name); - /* Copy every table. */ - foreach (lc, tables) - { - PGLogicalRemoteRel *remoterel = lfirst(lc); - copy_table_data(origin_conn, target_conn, remoterel, replication_sets); + /* + * We don't copy the table data here. Instead a sync worker + * is spawned for each table and it does the initial copying. + */ + // /* Copy every table. */ + // foreach (lc, tables) + // { + // PGLogicalRemoteRel *remoterel = lfirst(lc); + + // copy_table_data(origin_conn, target_conn, remoterel, replication_sets); - CHECK_FOR_INTERRUPTS(); - } + // CHECK_FOR_INTERRUPTS(); + // } /* Finish the transactions and disconnect. */ finish_copy_origin_tx(origin_conn); @@ -647,6 +636,172 @@ copy_replication_sets_data(char *sub_name, const char *origin_dsn, return tables; } +/* + * Data source callback for the COPY FROM, which reads from the remote + * connection and passes the data back to our local COPY. + * Modified version of copy_read_data() from pg/src/backend/replication/logical/tablesync.c + */ +static int +copy_read_data(void *outbuf, int minread, int maxread) +{ + int bytesread = 0; + int avail; + + /* If there are some leftover data from previous read, use it. */ + avail = copybuf->len - copybuf->cursor; + if (avail) + { + if (avail > maxread) + avail = maxread; + memcpy(outbuf, ©buf->data[copybuf->cursor], avail); + copybuf->cursor += avail; + maxread -= avail; + bytesread += avail; + } + + while (maxread > 0 && bytesread < minread) + { + pgsocket fd = PGINVALID_SOCKET; + int rc; + int len; + char *buf = NULL; + + for (;;) + { + /* Try read the data. */ + len = libpqrcv_receive_pglogical(source_copy_conn, &buf, &fd); + + CHECK_FOR_INTERRUPTS(); + + if (len == 0) + break; + else if (len < 0) + return bytesread; + else + { + /* Process the data */ + if(copybuf->data) + PQfreemem(copybuf->data); // malloc-ed by libpq + copybuf->data = buf; + copybuf->len = len; + copybuf->cursor = 0; + + avail = copybuf->len - copybuf->cursor; + if (avail > maxread) + avail = maxread; + memcpy(outbuf, ©buf->data[copybuf->cursor], avail); + outbuf = (void *) ((char *) outbuf + avail); + copybuf->cursor += avail; + maxread -= avail; + bytesread += avail; + } + + if (maxread <= 0 || bytesread >= minread) + return bytesread; + } + + /* + * Wait for more data or latch. + */ + rc = WaitLatchOrSocket(MyLatch, + WL_SOCKET_READABLE | WL_LATCH_SET | + WL_TIMEOUT | WL_POSTMASTER_DEATH, + fd, 1000L/* , WAIT_EVENT_LOGICAL_SYNC_DATA */); + + /* Emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + ResetLatch(MyLatch); + } + + return bytesread; +} + +/* + * Modified version of libpqrcv_receive from + * pg/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c + */ +static int libpqrcv_receive_pglogical(PGconn *conn, char **buffer, + pgsocket *wait_fd) +{ + int rawlen; + char *recvBuf = NULL; + + *buffer = NULL; + + /* Try to receive a CopyData message */ + rawlen = PQgetCopyData(conn, &recvBuf, 1); + if (rawlen == 0) + { + /* Try consuming some data. */ + if (PQconsumeInput(conn) == 0) + ereport(ERROR, + (errmsg("could not receive data from WAL stream: %s", + pchomp(PQerrorMessage(conn))))); + + /* Now that we've consumed some input, try again */ + rawlen = PQgetCopyData(conn, &recvBuf, 1); + if (rawlen == 0) + { + /* Tell caller to try again when our socket is ready. */ + *wait_fd = PQsocket(conn); + return 0; + } + } + if (rawlen == -1) /* end-of-streaming or error */ + { + PGresult *res; + + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + + /* Verify that there are no more results. */ + res = PQgetResult(conn); + if (res != NULL) + { + PQclear(res); + + /* + * If the other side closed the connection orderly (otherwise + * we'd seen an error, or PGRES_COPY_IN) don't report an error + * here, but let callers deal with it. + */ + if (PQstatus(conn) == CONNECTION_BAD) + return -1; + + ereport(ERROR, + (errmsg("unexpected result after CommandComplete: %s", + PQerrorMessage(conn)))); + } + + return -1; + } + else if (PQresultStatus(res) == PGRES_COPY_IN) + { + PQclear(res); + return -1; + } + else + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive data from WAL stream: %s", + pchomp(PQerrorMessage(conn))))); + } + } + if (rawlen < -1) + ereport(ERROR, + (errmsg("could not receive data from WAL stream: %s", + pchomp(PQerrorMessage(conn))))); + + /* Return received messages to caller */ + *buffer = recvBuf; + return rawlen; +} + static void pglogical_sync_worker_cleanup(PGLogicalSubscription *sub) { @@ -851,7 +1006,7 @@ pglogical_sync_subscription(PGLogicalSubscription *sub) { set_table_sync_status(sub->id, remoterel->nspname, remoterel->relname, - SYNC_STATUS_READY, + SYNC_STATUS_INIT, lsn); } else @@ -862,7 +1017,7 @@ pglogical_sync_subscription(PGLogicalSubscription *sub) newsync.subid = sub->id; namestrcpy(&newsync.nspname, remoterel->nspname); namestrcpy(&newsync.relname, remoterel->relname); - newsync.status = SYNC_STATUS_READY; + newsync.status = SYNC_STATUS_INIT; newsync.statuslsn = lsn; create_local_sync_status(&newsync); } diff --git a/pglogical_worker.c b/pglogical_worker.c index bc40564..7649390 100644 --- a/pglogical_worker.c +++ b/pglogical_worker.c @@ -29,6 +29,8 @@ #include "utils/memutils.h" #include "utils/timestamp.h" +#include "replication/logicallauncher.h" + #include "pgstat.h" #include "pglogical_sync.h" @@ -115,11 +117,34 @@ pglogical_worker_register(PGLogicalWorker *worker) LWLockAcquire(PGLogicalCtx->lock, LW_EXCLUSIVE); + /* + * Limit sync workers per subscription upto the + * GUC max_sync_workers_per_subscprition + */ + if(worker->worker_type == PGLOGICAL_WORKER_SYNC) + { + int nsyncWorkers; + + nsyncWorkers = num_of_sync_workers(worker->dboid, worker->worker.sync.apply.subid); + + if(nsyncWorkers >= max_sync_workers_per_subscription) + { + LWLockRelease(PGLogicalCtx->lock); + return -1; + } + } + slot = find_empty_worker_slot(worker->dboid); if (slot == -1) { LWLockRelease(PGLogicalCtx->lock); - elog(ERROR, "could not register pglogical worker: all background worker slots are already used"); + + ereport(WARNING, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of worker slots for pglogical workers"), + errhint("You might need to increase max_worker_processes."))); + + return -1; } worker_shm = &PGLogicalCtx->workers[slot]; @@ -179,7 +204,7 @@ pglogical_worker_register(PGLogicalWorker *worker) if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) { worker_shm->crashed_at = GetCurrentTimestamp(); - ereport(ERROR, + ereport(WARNING, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("worker registration failed, you might want to increase max_worker_processes setting"))); } @@ -737,3 +762,30 @@ pglogical_worker_type_name(PGLogicalWorkerType type) default: Assert(false); return NULL; } } + + +/* + * Find number of sync workers for the given subscriptions. + */ +int +num_of_sync_workers(Oid dboid, Oid subid) +{ + int i; + int nsync = 0; + + Assert(LWLockHeldByMe(PGLogicalCtx->lock)); + + for (i = 0; i < PGLogicalCtx->total_workers; i++) + { + /* + * Find num of sync workers for the given subscription which are not crashed + */ + if(PGLogicalCtx->workers[i].worker_type == PGLOGICAL_WORKER_SYNC + && PGLogicalCtx->workers[i].dboid == dboid + && PGLogicalCtx->workers[i].worker.sync.apply.subid == subid + && PGLogicalCtx->workers[i].crashed_at == 0 ) + nsync++; + } + + return nsync; +} diff --git a/pglogical_worker.h b/pglogical_worker.h index 663d0ba..99dbe36 100644 --- a/pglogical_worker.h +++ b/pglogical_worker.h @@ -108,4 +108,7 @@ extern void pglogical_worker_kill(PGLogicalWorker *worker); extern const char * pglogical_worker_type_name(PGLogicalWorkerType type); +extern int num_of_sync_workers(Oid dboid, Oid subid); + + #endif /* PGLOGICAL_WORKER_H */ diff --git a/sql/column_filter.sql b/sql/column_filter.sql index 826ed05..9b138f6 100644 --- a/sql/column_filter.sql +++ b/sql/column_filter.sql @@ -10,6 +10,10 @@ CREATE TABLE public.basic_dml ( data text, something interval ); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + INSERT INTO basic_dml(other, data, something) VALUES (5, 'foo', '1 minute'::interval), (4, 'bar', '12 weeks'::interval), @@ -19,7 +23,8 @@ VALUES (5, 'foo', '1 minute'::interval), \c :subscriber_dsn -- create table on subscriber to receive replicated filtered data from provider --- there are some extra columns too. +-- there are some extra columns too, and we omit 'other' as a non-replicated +-- table on upstream only. CREATE TABLE public.basic_dml ( id serial primary key, data text, @@ -27,11 +32,34 @@ CREATE TABLE public.basic_dml ( subonly integer, subonly_def integer DEFAULT 99 ); -SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml', ARRAY['default']); + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml'::regclass, ARRAY['default']); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; \c :provider_dsn + +-- Fails: the column filter list must include the key +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{data, something}'); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + +-- Fails: the column filter list may not include cols that are not in the table +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{data, something, nosuchcol}'); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + -- At provider, add table to replication set, with filtered columns SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{id, data, something}'); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml'::regclass, ARRAY['default']); + SELECT id, data, something FROM basic_dml ORDER BY id; SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); @@ -44,6 +72,10 @@ SELECT pglogical.wait_for_table_sync_complete('test_subscription', 'basic_dml'); COMMIT; SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml'::regclass, ARRAY['default']); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + -- data should get replicated to subscriber SELECT id, data, something FROM basic_dml ORDER BY id; @@ -56,10 +88,25 @@ CREATE TABLE public.basic_oids_dml ( something interval ) with oids ; +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_oids_dml'::regclass; + +-- Fails: cannot use system column 'oid' explicitly SELECT * FROM pglogical.replication_set_add_table('default', 'basic_oids_dml', columns := '{oid, id, data, something}'); +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_oids_dml'::regclass; + +-- WITH OIDS table OK SELECT * FROM pglogical.replication_set_add_table('default', 'basic_oids_dml', columns := '{id, data, something}'); +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_oids_dml'::regclass; + SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); \c :subscriber_dsn @@ -82,6 +129,8 @@ VALUES (5, 'foo', '1 minute'::interval), (2, 'qux', '8 months 2 days'::interval), (1, NULL, NULL); +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + UPDATE basic_oids_dml SET other = '40', data = NULL, something = '3 days'::interval WHERE id = 4; SELECT * from basic_oids_dml ORDER BY id; @@ -92,6 +141,66 @@ SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); SELECT id, data, something FROM basic_oids_dml ORDER BY id; \c :provider_dsn + +-- Adding a table that's already selectively replicated fails +\set VERBOSITY terse +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true); +\set VERBOSITY default +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + +-- So does trying to re-add to change the column set +\set VERBOSITY terse +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{id, data}'); +\set VERBOSITY default +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + +-- Shouldn't be able to drop a replicated col in a rel +-- but due to RM#5916 you can +BEGIN; +ALTER TABLE public.basic_dml DROP COLUMN data; +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); +ROLLBACK; + +-- Even when wrapped (RM#5916) +BEGIN; +SELECT pglogical.replicate_ddl_command($$ +ALTER TABLE public.basic_dml DROP COLUMN data; +$$); +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); +ROLLBACK; + +-- CASCADE should be allowed though +BEGIN; +ALTER TABLE public.basic_dml DROP COLUMN data CASCADE; +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; +ROLLBACK; + +BEGIN; +SELECT pglogical.replicate_ddl_command($$ +ALTER TABLE public.basic_dml DROP COLUMN data CASCADE; +$$); +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; +ROLLBACK; + +-- We can drop a non-replicated col. We must not replicate this DDL because in +-- this case the downstream doesn't have the 'other' column and apply will +-- fail. +ALTER TABLE public.basic_dml DROP COLUMN other; + +SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); + \set VERBOSITY terse SELECT pglogical.replicate_ddl_command($$ DROP TABLE public.basic_dml CASCADE; diff --git a/sql/replication_set.sql b/sql/replication_set.sql index 7fab2a1..1a0fc4b 100644 --- a/sql/replication_set.sql +++ b/sql/replication_set.sql @@ -49,6 +49,11 @@ SELECT * FROM pglogical.replication_set_add_all_tables('default', '{public}'); SELECT * FROM pglogical.alter_replication_set('repset_replicate_instrunc', replicate_update := true); SELECT * FROM pglogical.alter_replication_set('repset_replicate_instrunc', replicate_delete := true); +-- Adding already-added fails +\set VERBOSITY terse +SELECT * FROM pglogical.replication_set_add_table('repset_replicate_all', 'public.test_publicschema'); +\set VERBOSITY default + -- check the replication sets SELECT nspname, relname, set_name FROM pglogical.tables WHERE relname IN ('test_publicschema', 'test_normalschema', 'test_strangeschema', 'test_nopkey') ORDER BY 1,2,3; @@ -61,6 +66,28 @@ SELECT nspname, relname, set_name FROM pglogical.tables --too short SELECT pglogical.create_replication_set(''); +-- Can't drop table while it's in a repset +DROP TABLE public.test_publicschema; + +-- Can't drop table while it's in a repset +BEGIN; +SELECT pglogical.replicate_ddl_command($$ +DROP TABLE public.test_publicschema; +$$); +ROLLBACK; + +-- Can CASCADE though, even outside ddlrep +BEGIN; +DROP TABLE public.test_publicschema CASCADE; +ROLLBACK; + +-- ... and can drop after repset removal +SELECT pglogical.replication_set_remove_table('repset_replicate_all', 'public.test_publicschema'); +SELECT pglogical.replication_set_remove_table('default_insert_only', 'public.test_publicschema'); +BEGIN; +DROP TABLE public.test_publicschema; +ROLLBACK; + \set VERBOSITY terse SELECT pglogical.replicate_ddl_command($$ DROP TABLE public.test_publicschema CASCADE; diff --git a/sql/row_filter.sql b/sql/row_filter.sql index 5574d19..ad847a0 100644 --- a/sql/row_filter.sql +++ b/sql/row_filter.sql @@ -196,6 +196,24 @@ SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); SELECT * FROM test_jsonb ORDER BY json_type; +\c :provider_dsn + +-- Filter may refer to not-replicated columns +SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml'); +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false, columns := ARRAY['id', 'data'], row_filter := $rf$other = 2$rf$); + +INSERT INTO basic_dml(other, data, "SomeThing") VALUES (2, 'itstwo', '1 second'::interval); + +SELECT other, data, "SomeThing" FROM basic_dml WHERE data = 'itstwo'; + +SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +-- 'other' will be NULL as it wasn't in the repset +-- even though we filtered on it. So will SomeThing. +SELECT other, data, "SomeThing" FROM basic_dml WHERE data = 'itstwo'; + \c :provider_dsn \set VERBOSITY terse DROP FUNCTION funcn_add(integer, integer);