diff --git a/compat10/pglogical_compat.h b/compat10/pglogical_compat.h index 6893861..f0970bc 100644 --- a/compat10/pglogical_compat.h +++ b/compat10/pglogical_compat.h @@ -8,6 +8,8 @@ #include "replication/origin.h" #include "utils/varlena.h" +#include "compat94/pglogical_libpq-be-fe-helpers.h" + #define PGLCreateTrigger CreateTrigger #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ diff --git a/compat11/pglogical_compat.h b/compat11/pglogical_compat.h index a555c12..4a1493f 100644 --- a/compat11/pglogical_compat.h +++ b/compat11/pglogical_compat.h @@ -1,9 +1,12 @@ #ifndef PG_LOGICAL_COMPAT_H #define PG_LOGICAL_COMPAT_H +#include "pgstat.h" #include "replication/origin.h" #include "utils/varlena.h" +#include "compat94/pglogical_libpq-be-fe-helpers.h" + #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ WaitLatchOrSocket(latch, wakeEvents, sock, timeout, PG_WAIT_EXTENSION) diff --git a/compat12/pglogical_compat.h b/compat12/pglogical_compat.h index af2ce82..e0f3e03 100644 --- a/compat12/pglogical_compat.h +++ b/compat12/pglogical_compat.h @@ -1,6 +1,7 @@ #ifndef PG_LOGICAL_COMPAT_H #define PG_LOGICAL_COMPAT_H +#include "pgstat.h" #include "access/amapi.h" #include "access/heapam.h" #include "access/table.h" @@ -8,6 +9,8 @@ #include "replication/origin.h" #include "utils/varlena.h" +#include "compat94/pglogical_libpq-be-fe-helpers.h" + #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ WaitLatchOrSocket(latch, wakeEvents, sock, timeout, PG_WAIT_EXTENSION) diff --git a/compat13/pglogical_compat.h b/compat13/pglogical_compat.h index bbb50ca..0d295c6 100644 --- a/compat13/pglogical_compat.h +++ b/compat13/pglogical_compat.h @@ -1,6 +1,7 @@ #ifndef PG_LOGICAL_COMPAT_H #define PG_LOGICAL_COMPAT_H +#include "pgstat.h" #include "access/amapi.h" #include "access/heapam.h" #include "access/table.h" @@ -8,6 +9,8 @@ #include "replication/origin.h" #include "utils/varlena.h" +#include "compat94/pglogical_libpq-be-fe-helpers.h" + #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ WaitLatchOrSocket(latch, wakeEvents, sock, timeout, PG_WAIT_EXTENSION) diff --git a/compat14/pglogical_compat.h b/compat14/pglogical_compat.h index 990eb75..6e43eee 100644 --- a/compat14/pglogical_compat.h +++ b/compat14/pglogical_compat.h @@ -6,6 +6,9 @@ #include "access/table.h" #include "access/tableam.h" #include "utils/varlena.h" +#include "utils/wait_event.h" + +#include "compat94/pglogical_libpq-be-fe-helpers.h" #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ WaitLatchOrSocket(latch, wakeEvents, sock, timeout, PG_WAIT_EXTENSION) diff --git a/compat15/pglogical_compat.h b/compat15/pglogical_compat.h index 3591d94..ed40c1b 100644 --- a/compat15/pglogical_compat.h +++ b/compat15/pglogical_compat.h @@ -6,6 +6,9 @@ #include "access/table.h" #include "access/tableam.h" #include "utils/varlena.h" +#include "utils/wait_event.h" + +#include "compat94/pglogical_libpq-be-fe-helpers.h" #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ WaitLatchOrSocket(latch, wakeEvents, sock, timeout, PG_WAIT_EXTENSION) diff --git a/compat16/pglogical_compat.h b/compat16/pglogical_compat.h index a607519..b781e78 100644 --- a/compat16/pglogical_compat.h +++ b/compat16/pglogical_compat.h @@ -6,6 +6,10 @@ #include "access/table.h" #include "access/tableam.h" #include "utils/varlena.h" +#include "utils/wait_event.h" + +#include "libpq/libpq-be-fe-helpers.h" +#include "compat94/pglogical_libpq-be-fe-helpers.h" #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ WaitLatchOrSocket(latch, wakeEvents, sock, timeout, PG_WAIT_EXTENSION) diff --git a/compat94/pglogical_compat.h b/compat94/pglogical_compat.h index e5973a9..b749017 100644 --- a/compat94/pglogical_compat.h +++ b/compat94/pglogical_compat.h @@ -12,6 +12,9 @@ #include "storage/lwlock.h" #include "utils/array.h" +#define PG_WAIT_EXTENSION 0 +#include "compat94/pglogical_libpq-be-fe-helpers.h" + /* 9.4 lacks PG_*_MAX */ #ifndef PG_UINT32_MAX #define PG_UINT32_MAX (0xFFFFFFFF) diff --git a/compat94/pglogical_libpq-be-fe-helpers.h b/compat94/pglogical_libpq-be-fe-helpers.h new file mode 100644 index 0000000..f6fb167 --- /dev/null +++ b/compat94/pglogical_libpq-be-fe-helpers.h @@ -0,0 +1,438 @@ +/*------------------------------------------------------------------------- + * + * libpq-be-fe-helpers.h + * Helper functions for using libpq in extensions + * + * Code built directly into the backend is not allowed to link to libpq + * directly. Extension code is allowed to use libpq however. However, libpq + * used in extensions has to be careful not to block inside libpq, otherwise + * interrupts will not be processed, leading to issues like unresolvable + * deadlocks. Backend code also needs to take care to acquire/release an + * external fd for the connection, otherwise fd.c's accounting of fd's is + * broken. + * + * This file provides helper functions to make it easier to comply with these + * rules. It is a header only library as it needs to be linked into each + * extension using libpq, and it seems too small to be worth adding a + * dedicated static library for. + * + * TODO: For historical reasons the connections established here are not put + * into non-blocking mode. That can lead to blocking even when only the async + * libpq functions are used. This should be fixed. + * + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/libpq/libpq-be-fe-helpers.h + * + *------------------------------------------------------------------------- + */ +#ifndef LIBPQ_BE_FE_HELPERS_H +#define LIBPQ_BE_FE_HELPERS_H + +/* + * Despite the name, BUILDING_DLL is set only when building code directly part + * of the backend. Which also is where libpq isn't allowed to be + * used. Obviously this doesn't protect against libpq-fe.h getting included + * otherwise, but perhaps still protects against a few mistakes... + */ +#ifdef BUILDING_DLL +#error "libpq may not be used code directly built into the backend" +#endif + +#include "libpq-fe.h" +#include "miscadmin.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/proc.h" + + +#if PG_VERSION_NUM < 100000 +#define WaitLatchOrSocket(latch, wakeEvents, sock, timeout, wait_event_info) \ + WaitLatchOrSocket(latch, wakeEvents, sock, timeout) +#else +#define WaitLatchOrSocket(latch, wakeEvents, sock, timeout, wait_event_info) \ + WaitLatchOrSocket(latch, wakeEvents, sock, timeout, wait_event_info) +#endif + +#if PG_VERSION_NUM < 130000 +#define AcquireExternalFD() (true) +#define ReleaseExternalFD() do {} while (0) +#endif + +#if PG_VERSION_NUM < 90500 +#define MyLatch (&MyProc->procLatch) +#endif + + +static inline void libpqsrv_connect_prepare(void); +static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info); + + +/* + * PQconnectdb() wrapper that reserves a file descriptor and processes + * interrupts during connection establishment. + * + * Throws an error if AcquireExternalFD() fails, but does not throw if + * connection establishment itself fails. Callers need to use PQstatus() to + * check if connection establishment succeeded. + */ +static inline PGconn * +libpqsrv_connect(const char *conninfo, uint32 wait_event_info) +{ + PGconn *conn = NULL; + + libpqsrv_connect_prepare(); + + conn = PQconnectStart(conninfo); + + libpqsrv_connect_internal(conn, wait_event_info); + + return conn; +} + +/* + * Like libpqsrv_connect(), except that this is a wrapper for + * PQconnectdbParams(). + */ +static inline PGconn * +libpqsrv_connect_params(const char *const *keywords, + const char *const *values, + int expand_dbname, + uint32 wait_event_info) +{ + PGconn *conn = NULL; + + libpqsrv_connect_prepare(); + + conn = PQconnectStartParams(keywords, values, expand_dbname); + + libpqsrv_connect_internal(conn, wait_event_info); + + return conn; +} + +/* + * PQfinish() wrapper that additionally releases the reserved file descriptor. + * + * It is allowed to call this with a NULL pgconn iff NULL was returned by + * libpqsrv_connect*. + */ +static inline void +libpqsrv_disconnect(PGconn *conn) +{ + /* + * If no connection was established, we haven't reserved an FD for it (or + * already released it). This rule makes it easier to write PG_CATCH() + * handlers for this facility's users. + * + * See also libpqsrv_connect_internal(). + */ + if (conn == NULL) + return; + + ReleaseExternalFD(); + PQfinish(conn); +} + + +/* internal helper functions follow */ + + +/* + * Helper function for all connection establishment functions. + */ +static inline void +libpqsrv_connect_prepare(void) +{ + /* + * We must obey fd.c's limit on non-virtual file descriptors. Assume that + * a PGconn represents one long-lived FD. (Doing this here also ensures + * that VFDs are closed if needed to make room.) + */ + if (!AcquireExternalFD()) + { +#ifndef WIN32 /* can't write #if within ereport() macro */ + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not establish connection"), + errdetail("There are too many open files on the local server."), + errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits."))); +#else + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not establish connection"), + errdetail("There are too many open files on the local server."), + errhint("Raise the server's max_files_per_process setting."))); +#endif + } +} + +/* + * Helper function for all connection establishment functions. + */ +static inline void +libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info) +{ + /* + * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do + * that here. + */ + if (conn == NULL) + { + ReleaseExternalFD(); + return; + } + + /* + * Can't wait without a socket. Note that we don't want to close the libpq + * connection yet, so callers can emit a useful error. + */ + if (PQstatus(conn) == CONNECTION_BAD) + return; + + /* + * WaitLatchOrSocket() can conceivably fail, handle that case here instead + * of requiring all callers to do so. + */ + PG_TRY(); + { + PostgresPollingStatusType status; + + /* + * Poll connection until we have OK or FAILED status. + * + * Per spec for PQconnectPoll, first wait till socket is write-ready. + */ + status = PGRES_POLLING_WRITING; + while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED) + { + int io_flag; + int rc; + + if (status == PGRES_POLLING_READING) + io_flag = WL_SOCKET_READABLE; +#ifdef WIN32 + + /* + * Windows needs a different test while waiting for + * connection-made + */ + else if (PQstatus(conn) == CONNECTION_STARTED) + io_flag = WL_SOCKET_CONNECTED; +#endif + else + io_flag = WL_SOCKET_WRITEABLE; + + rc = WaitLatchOrSocket(MyLatch, + WL_POSTMASTER_DEATH | WL_LATCH_SET | io_flag, + PQsocket(conn), + 0, + wait_event_info); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* If socket is ready, advance the libpq state machine */ + if (rc & io_flag) + status = PQconnectPoll(conn); + } + } + PG_CATCH(); + { + /* + * If an error is thrown here, the callers won't call + * libpqsrv_disconnect() with a conn, so release resources + * immediately. + */ + ReleaseExternalFD(); + PQfinish(conn); + + PG_RE_THROW(); + } + PG_END_TRY(); +} + +#undef WaitLatchOrSocket +#undef AcquireExternalFD +#undef ReleaseExternalFD +#undef MyLatch + +#endif /* LIBPQ_BE_FE_HELPERS_H */ + +/* + * Preceding half of file is v16+ header content, and following half is v17+ + * header content. By using separate include guards, v16 can include both + * this header and PostgreSQL's libpq/libpq-be-fe-helpers.h. + */ + +#ifndef PGLOGICAL_LIBPQ_BE_FE_HELPERS_H +#define PGLOGICAL_LIBPQ_BE_FE_HELPERS_H + +#include "libpq-fe.h" +#include "miscadmin.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/proc.h" + + +#if PG_VERSION_NUM >= 100000 +#define WaitLatchOrSocket(latch, wakeEvents, sock, timeout, wait_event_info) \ + WaitLatchOrSocket(latch, wakeEvents, sock, timeout, wait_event_info) +#else +#define WaitLatchOrSocket(latch, wakeEvents, sock, timeout, wait_event_info) \ + WaitLatchOrSocket(latch, wakeEvents, sock, timeout) +#endif + +#if PG_VERSION_NUM < 90500 +#define MyLatch (&MyProc->procLatch) +#endif + + +static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info); +static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info); + +/* + * PQexec() wrapper that processes interrupts. + * + * Unless PQsetnonblocking(conn, 1) is in effect, this can't process + * interrupts while pushing the query text to the server. Consider that + * setting if query strings can be long relative to TCP buffer size. + * + * This has the preconditions of PQsendQuery(), not those of PQexec(). Most + * notably, PQexec() would silently discard any prior query results. + */ +static inline PGresult * +libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info) +{ + if (!PQsendQuery(conn, query)) + return NULL; + return libpqsrv_get_result_last(conn, wait_event_info); +} + +/* + * PQexecParams() wrapper that processes interrupts. + * + * See notes at libpqsrv_exec(). + */ +static inline PGresult * +libpqsrv_exec_params(PGconn *conn, + const char *command, + int nParams, + const Oid *paramTypes, + const char *const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat, + uint32 wait_event_info) +{ + if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues, + paramLengths, paramFormats, resultFormat)) + return NULL; + return libpqsrv_get_result_last(conn, wait_event_info); +} + +/* + * Like PQexec(), loop over PQgetResult() until it returns NULL or another + * terminal state. Return the last non-NULL result or the terminal state. + */ +static inline PGresult * +libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info) +{ + PGresult *volatile lastResult = NULL; + + /* In what follows, do not leak any PGresults on an error. */ + PG_TRY(); + { + for (;;) + { + /* Wait for, and collect, the next PGresult. */ + PGresult *result; + + result = libpqsrv_get_result(conn, wait_event_info); + if (result == NULL) + break; /* query is complete, or failure */ + + /* + * Emulate PQexec()'s behavior of returning the last result when + * there are many. + */ + PQclear(lastResult); + lastResult = result; + + if (PQresultStatus(lastResult) == PGRES_COPY_IN || + PQresultStatus(lastResult) == PGRES_COPY_OUT || + PQresultStatus(lastResult) == PGRES_COPY_BOTH || + PQstatus(conn) == CONNECTION_BAD) + break; + } + } + PG_CATCH(); + { + PQclear(lastResult); + PG_RE_THROW(); + } + PG_END_TRY(); + + return lastResult; +} + +/* + * Perform the equivalent of PQgetResult(), but watch for interrupts. + */ +static inline PGresult * +libpqsrv_get_result(PGconn *conn, uint32 wait_event_info) +{ + /* + * Collect data until PQgetResult is ready to get the result without + * blocking. + */ + while (PQisBusy(conn)) + { + int rc; + + rc = WaitLatchOrSocket(MyLatch, + WL_POSTMASTER_DEATH | WL_LATCH_SET | + WL_SOCKET_READABLE, + PQsocket(conn), + 0, + wait_event_info); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* Consume whatever data is available from the socket */ + if (PQconsumeInput(conn) == 0) + { + /* trouble; expect PQgetResult() to return NULL */ + break; + } + } + + /* Now we can collect and return the next PGresult */ + return PQgetResult(conn); +} + + +#undef WaitLatchOrSocket +#undef MyLatch + +#endif /* PGLOGICAL_LIBPQ_BE_FE_HELPERS_H */ diff --git a/compat95/pglogical_compat.h b/compat95/pglogical_compat.h index 85cb766..7c19d05 100644 --- a/compat95/pglogical_compat.h +++ b/compat95/pglogical_compat.h @@ -9,6 +9,9 @@ #include "replication/origin.h" #include "storage/lwlock.h" +#define PG_WAIT_EXTENSION 0 +#include "compat94/pglogical_libpq-be-fe-helpers.h" + extern LWLockPadded *GetNamedLWLockTranche(const char *tranche_name); extern void RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks); diff --git a/compat96/pglogical_compat.h b/compat96/pglogical_compat.h index 509b262..5035420 100644 --- a/compat96/pglogical_compat.h +++ b/compat96/pglogical_compat.h @@ -7,6 +7,9 @@ #include "executor/executor.h" #include "replication/origin.h" +#define PG_WAIT_EXTENSION 0 +#include "compat94/pglogical_libpq-be-fe-helpers.h" + #define PGLCreateTrigger CreateTrigger #define RawStmt Node diff --git a/pglogical.c b/pglogical.c index 2e1209c..958471e 100644 --- a/pglogical.c +++ b/pglogical.c @@ -431,7 +431,8 @@ pglogical_identify_system(PGconn *streamConn, uint64* sysid, { PGresult *res; - res = PQexec(streamConn, "IDENTIFY_SYSTEM"); + res = libpqsrv_exec(streamConn, "IDENTIFY_SYSTEM", + PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) { elog(ERROR, "could not send replication command \"%s\": %s", @@ -578,7 +579,7 @@ pglogical_start_replication(PGconn *streamConn, const char *slot_name, appendStringInfoChar(&command, ')'); - res = PQexec(streamConn, command.data); + res = libpqsrv_exec(streamConn, command.data, PG_WAIT_EXTENSION); sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); if (PQresultStatus(res) != PGRES_COPY_BOTH) elog(FATAL, "could not send replication command \"%s\": %s\n, sqlstate: %s", diff --git a/pglogical.h b/pglogical.h index fc7ebb3..52e0371 100644 --- a/pglogical.h +++ b/pglogical.h @@ -20,8 +20,6 @@ #include "executor/executor.h" #include "miscadmin.h" -#include "libpq-fe.h" - #include "pglogical_fe.h" #include "pglogical_node.h" diff --git a/pglogical_conflict.c b/pglogical_conflict.c index a37e53f..4a5a626 100644 --- a/pglogical_conflict.c +++ b/pglogical_conflict.c @@ -47,6 +47,7 @@ #include "pglogical_conflict.h" #include "pglogical_proto_native.h" +#include "pglogical.h" int pglogical_conflict_resolver = PGLOGICAL_RESOLVE_APPLY_REMOTE; int pglogical_conflict_log_level = LOG; diff --git a/pglogical_output_plugin.h b/pglogical_output_plugin.h index aa843ae..406f9f1 100644 --- a/pglogical_output_plugin.h +++ b/pglogical_output_plugin.h @@ -16,9 +16,6 @@ #include "nodes/pg_list.h" #include "nodes/primnodes.h" -/* summon cross-PG-version compatibility voodoo */ -#include "pglogical_compat.h" - /* typedef appears in pglogical_output_plugin.h */ typedef struct PGLogicalOutputData { diff --git a/pglogical_output_proto.c b/pglogical_output_proto.c index 15e00bd..9739d7f 100644 --- a/pglogical_output_proto.c +++ b/pglogical_output_proto.c @@ -17,6 +17,7 @@ #include "pglogical_output_proto.h" #include "pglogical_proto_native.h" #include "pglogical_proto_json.h" +#include "pglogical.h" PGLogicalProtoAPI * pglogical_init_api(PGLogicalProtoType typ) diff --git a/pglogical_proto_json.c b/pglogical_proto_json.c index 6650cba..d1b7dd5 100644 --- a/pglogical_proto_json.c +++ b/pglogical_proto_json.c @@ -37,6 +37,7 @@ #include "pglogical_output_plugin.h" #include "pglogical_proto_json.h" +#include "pglogical.h" #ifdef HAVE_REPLICATION_ORIGINS #include "replication/origin.h" diff --git a/pglogical_proto_native.c b/pglogical_proto_native.c index 7a94036..696f809 100644 --- a/pglogical_proto_native.c +++ b/pglogical_proto_native.c @@ -29,6 +29,7 @@ #include "pglogical_output_plugin.h" #include "pglogical_output_proto.h" #include "pglogical_proto_native.h" +#include "pglogical.h" #define IS_REPLICA_IDENTITY 1 diff --git a/pglogical_rpc.c b/pglogical_rpc.c index ddb2c12..d77d5dd 100644 --- a/pglogical_rpc.c +++ b/pglogical_rpc.c @@ -80,7 +80,7 @@ pg_logical_get_remote_repset_tables(PGconn *conn, List *replication_sets) repsetarr.data); } - res = PQexec(conn, query.data); + res = libpqsrv_exec(conn, query.data, PG_WAIT_EXTENSION); /* TODO: better error message? */ if (PQresultStatus(res) != PGRES_TUPLES_OK) elog(ERROR, "could not get table list: %s", PQresultErrorMessage(res)); @@ -162,7 +162,7 @@ pg_logical_get_remote_repset_table(PGconn *conn, RangeVar *rv, repsetarr.data); } - res = PQexec(conn, query.data); + res = libpqsrv_exec(conn, query.data, PG_WAIT_EXTENSION); /* TODO: better error message? */ if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1) elog(ERROR, "could not get table list: %s", PQresultErrorMessage(res)); @@ -195,11 +195,12 @@ pglogical_remote_slot_active(PGconn *conn, const char *slot_name) values[0] = slot_name; - res = PQexecParams(conn, - "SELECT plugin, active " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_name = $1", - 1, types, values, NULL, NULL, 0); + res = libpqsrv_exec_params(conn, + "SELECT plugin, active " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_name = $1", + 1, types, values, NULL, NULL, 0, + PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -245,11 +246,12 @@ pglogical_drop_remote_slot(PGconn *conn, const char *slot_name) values[0] = slot_name; /* Check if the slot exists */ - res = PQexecParams(conn, - "SELECT plugin " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_name = $1", - 1, types, values, NULL, NULL, 0); + res = libpqsrv_exec_params(conn, + "SELECT plugin " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_name = $1", + 1, types, values, NULL, NULL, 0, + PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -277,8 +279,9 @@ pglogical_drop_remote_slot(PGconn *conn, const char *slot_name) PQclear(res); - res = PQexecParams(conn, "SELECT pg_drop_replication_slot($1)", - 1, types, values, NULL, NULL, 0); + res = libpqsrv_exec_params(conn, "SELECT pg_drop_replication_slot($1)", + 1, types, values, NULL, NULL, 0, + PG_WAIT_EXTENSION); /* And finally, drop the slot. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -297,7 +300,8 @@ pglogical_remote_node_info(PGconn *conn, Oid *nodeid, char **node_name, char **s { PGresult *res; - res = PQexec(conn, "SELECT node_id, node_name, sysid, dbname, replication_sets FROM pglogical.pglogical_node_info()"); + res = libpqsrv_exec(conn, "SELECT node_id, node_name, sysid, dbname, replication_sets FROM pglogical.pglogical_node_info()", + PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) elog(ERROR, "could not fetch remote node info: %s\n", PQerrorMessage(conn)); @@ -351,7 +355,8 @@ pglogical_remote_function_exists(PGconn *conn, const char *nspname, " AND %s = ANY (proargnames)", PQescapeLiteral(conn, argname, strlen(argname))); - res = PQexecParams(conn, query.data, 2, types, values, NULL, NULL, 0); + res = libpqsrv_exec_params(conn, query.data, 2, types, values, NULL, NULL, + 0, PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) elog(ERROR, "could not fetch remote function info: %s\n", diff --git a/pglogical_sync.c b/pglogical_sync.c index 8e63815..6ba159b 100644 --- a/pglogical_sync.c +++ b/pglogical_sync.c @@ -21,8 +21,6 @@ #include #endif -#include "libpq-fe.h" - #include "miscadmin.h" #include "access/genam.h" @@ -310,7 +308,7 @@ ensure_replication_slot_snapshot(PGconn *sql_conn, PGconn *repl_conn, use_failover_slot ? " FAILOVER" : ""); - res = PQexec(repl_conn, query.data); + res = libpqsrv_exec(repl_conn, query.data, PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -387,7 +385,7 @@ start_copy_origin_tx(PGconn *conn, const char *snapshot) appendStringInfo(&query, "SET TRANSACTION SNAPSHOT %s;\n", s); } - res = PQexec(conn, query.data); + res = libpqsrv_exec(conn, query.data, PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COMMAND_OK) elog(ERROR, "BEGIN on origin node failed: %s", PQresultErrorMessage(res)); @@ -427,7 +425,7 @@ start_copy_target_tx(PGconn *conn, const char *origin_name) appendStringInfoString(&query, setup_query); - res = PQexec(conn, query.data); + res = libpqsrv_exec(conn, query.data, PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COMMAND_OK) elog(ERROR, "BEGIN on target node failed: %s", PQresultErrorMessage(res)); @@ -440,7 +438,7 @@ finish_copy_origin_tx(PGconn *conn) PGresult *res; /* Close the transaction and connection on origin node. */ - res = PQexec(conn, "ROLLBACK"); + res = libpqsrv_exec(conn, "ROLLBACK", PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COMMAND_OK) elog(WARNING, "ROLLBACK on origin node failed: %s", PQresultErrorMessage(res)); @@ -454,7 +452,7 @@ finish_copy_target_tx(PGconn *conn) PGresult *res; /* Close the transaction and connection on target node. */ - res = PQexec(conn, "COMMIT"); + res = libpqsrv_exec(conn, "COMMIT", PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COMMAND_OK) elog(ERROR, "COMMIT on target node failed: %s", PQresultErrorMessage(res)); @@ -466,7 +464,8 @@ finish_copy_target_tx(PGconn *conn) */ if (PQserverVersion(conn) >= 90500) { - res = PQexec(conn, "SELECT pg_catalog.pg_replication_origin_session_reset();\n"); + res = libpqsrv_exec(conn, "SELECT pg_catalog.pg_replication_origin_session_reset();\n", + PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) elog(WARNING, "Resetting session origin on target node failed: %s", PQresultErrorMessage(res)); @@ -620,7 +619,7 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, /* Execute COPY TO. */ - res = PQexec(origin_conn, query.data); + res = libpqsrv_exec(origin_conn, query.data, PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COPY_OUT) { ereport(ERROR, @@ -641,7 +640,7 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, appendStringInfoString(&query, "FROM stdin"); /* Execute COPY FROM. */ - res = PQexec(target_conn, query.data); + res = libpqsrv_exec(target_conn, query.data, PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COPY_IN) { ereport(ERROR, @@ -649,6 +648,7 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, errdetail("Query '%s': %s", query.data, PQerrorMessage(origin_conn)))); } + PQclear(res); while ((bytes = PQgetCopyData(origin_conn, ©buf, false)) > 0) { @@ -671,6 +671,15 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, errdetail("source connection returned %d: %s", bytes, PQerrorMessage(origin_conn)))); } + res = libpqsrv_get_result_last(origin_conn, PG_WAIT_EXTENSION); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + ereport(ERROR, + (errmsg("reading from origin table failed"), + errdetail("Query '%s': %s", query.data, + PQerrorMessage(origin_conn)))); + } + PQclear(res); /* Send local finish */ if (PQputCopyEnd(target_conn, NULL) != 1) @@ -680,7 +689,14 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, errdetail("destination connection reported: %s", PQerrorMessage(target_conn)))); } - + res = libpqsrv_get_result_last(target_conn, PG_WAIT_EXTENSION); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + ereport(ERROR, + (errmsg("writing to target table failed"), + errdetail("destination connection reported: %s", + PQerrorMessage(target_conn)))); + } PQclear(res); elog(INFO, "finished synchronization of data for table %s.%s",