From b1cb82feafad12575ecba239b28f85d16463d30f Mon Sep 17 00:00:00 2001 From: Josh Kupershmidt Date: Sat, 20 Oct 2012 16:27:54 -0700 Subject: [PATCH] Take an ACCESS SHARE LOCK on the target table, in an initial attempt to prevent concurrent DDL. This is a first pass at Daniele's suggestion in Issue #8, although it is definitely still buggy -- it is still possible for another transaction to get in an AccessExclusive lock and perform DDL either before the ACCESS SHARE lock is acquired or immediately after it is released. --- bin/pg_reorg.c | 72 ++++++++++++++++++++++++++++++++++++++++++---- bin/pgut/pgut-fe.c | 2 ++ bin/pgut/pgut-fe.h | 1 + 3 files changed, 69 insertions(+), 6 deletions(-) diff --git a/bin/pg_reorg.c b/bin/pg_reorg.c index 2939f4c..e11eca3 100755 --- a/bin/pg_reorg.c +++ b/bin/pg_reorg.c @@ -26,10 +26,17 @@ const char *PROGRAM_EMAIL = "reorg-general@lists.pgfoundry.org"; */ #define APPLY_COUNT 1000 +/* Record the PIDs of any possibly-conflicting transactions. Ignore the PID + * of our primary connection, and our second connection holding an + * ACCESS SHARE table lock. + */ #define SQL_XID_SNAPSHOT \ "SELECT reorg.array_accum(virtualtransaction) FROM pg_locks"\ - " WHERE locktype = 'virtualxid' AND pid <> pg_backend_pid()" + " WHERE locktype = 'virtualxid' AND pid NOT IN (pg_backend_pid(), $1)" +/* Later, check whether any of the transactions we saw before are still + * alive, and wait for them to go away. + */ #define SQL_XID_ALIVE \ "SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\ " AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)" @@ -76,7 +83,7 @@ static void reorg_cleanup(bool fatal, void *userdata); static char *getstr(PGresult *res, int row, int col); static Oid getoid(PGresult *res, int row, int col); -static void lock_exclusive(const char *relid, const char *lock_query); +static void lock_exclusive(const char *relid, const char *lock_query, bool release_conn2); #define SQLSTATE_INVALID_SCHEMA_NAME "3F000" #define SQLSTATE_QUERY_CANCELED "57014" @@ -371,6 +378,7 @@ reorg_one_table(const reorg_table *table, const char *orderby) int i; int num_waiting = 0; char *vxid; + char *lock_conn_pid; char buffer[12]; StringInfoData sql; @@ -401,7 +409,8 @@ reorg_one_table(const reorg_table *table, const char *orderby) * 1. Setup workspaces and a trigger. */ elog(DEBUG2, "---- setup ----"); - lock_exclusive(utoa(table->target_oid, buffer), table->lock_table); + lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, FALSE); + /* * Check z_reorg_trigger is the trigger executed at last so that @@ -425,6 +434,37 @@ reorg_one_table(const reorg_table *table, const char *orderby) command(sql.data, 0, NULL); command("COMMIT", 0, NULL); + PQclear(PQexec(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED")); + elog(DEBUG2, "Obtaining ACCESS SHARE lock for %s", table->target_name); + + /* XXX: table name escaping? */ + printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", + table->target_name); + res = PQexec(conn2, sql.data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + printf("%s", PQerrorMessage(conn2)); + PQclear(res); + exit(1); + } + PQclear(res); + + elog(DEBUG2, "Obtained ACCESS SHARE lock of %s", table->target_name); + + /* store the backend PID of our connection keeping an ACCESS SHARE + lock on the target table. + */ + res = PQexec(conn2, "SELECT pg_backend_pid()"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + printf("%s", PQerrorMessage(conn2)); + PQclear(res); + exit(1); + } + lock_conn_pid = strdup(PQgetvalue(res, 0, 0)); + elog(WARNING, "Have backend PID: %s", lock_conn_pid); + PQclear(res); + /* * Register the table to be dropped on error. We use pktype as * an advisory lock. The registration should be done after @@ -442,9 +482,14 @@ reorg_one_table(const reorg_table *table, const char *orderby) command("SELECT set_config('work_mem', current_setting('maintenance_work_mem'), true)", 0, NULL); if (orderby && !orderby[0]) command("SET LOCAL synchronize_seqscans = off", 0, NULL); - res = execute(SQL_XID_SNAPSHOT, 0, NULL); + + /* Fetch an array of Virtual IDs of all transactions active right now. + */ + params[0] = lock_conn_pid; + res = execute(SQL_XID_SNAPSHOT, 1, params); vxid = strdup(PQgetvalue(res, 0, 0)); PQclear(res); + command(table->delete_log, 0, NULL); command(table->create_table, 0, NULL); printfStringInfo(&sql, "SELECT reorg.disable_autovacuum('reorg.table_%u')", table->target_oid); @@ -541,7 +586,7 @@ reorg_one_table(const reorg_table *table, const char *orderby) * 5. Swap. */ elog(DEBUG2, "---- swap ----"); - lock_exclusive(utoa(table->target_oid, buffer), table->lock_table); + lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, TRUE); apply_log(table, 0); params[0] = utoa(table->target_oid, buffer); command("SELECT reorg.reorg_swap($1)", 1, params); @@ -580,9 +625,15 @@ reorg_one_table(const reorg_table *table, const char *orderby) /* * Try acquire a table lock but avoid long time locks when conflict. + * Arguments: + * + * relid: OID of relation + * lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed + * release_conn2: whether we should issue a COMMIT in conn2 to release + * its lock. */ static void -lock_exclusive(const char *relid, const char *lock_query) +lock_exclusive(const char *relid, const char *lock_query, bool release_conn2) { time_t start = time(NULL); int i; @@ -621,6 +672,12 @@ lock_exclusive(const char *relid, const char *lock_query) command(cancel_query, 1, &relid); } + /* If necessary, issue a COMMIT in conn2 to release its ACCESS SHARE + * lock. + */ + if (release_conn2) + PQclear(PQexec(conn2, "COMMIT")); + /* wait for a while to lock the table. */ wait_msec = Min(1000, i * 100); snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec); @@ -670,6 +727,9 @@ reorg_cleanup(bool fatal, void *userdata) const char *params[1]; /* Rollback current transaction */ + if (conn2) + PQclear(PQexec(conn2, "ROLLBACK")); + if (connection) command("ROLLBACK", 0, NULL); diff --git a/bin/pgut/pgut-fe.c b/bin/pgut/pgut-fe.c index 7136a2a..702088e 100755 --- a/bin/pgut/pgut-fe.c +++ b/bin/pgut/pgut-fe.c @@ -23,6 +23,7 @@ char *password = NULL; YesNo prompt_password = DEFAULT; PGconn *connection = NULL; +PGconn *conn2 = NULL; static bool parse_pair(const char buffer[], char key[], char value[]); static char *get_username(void); @@ -50,6 +51,7 @@ reconnect(int elevel) appendStringInfo(&buf, "password=%s ", password); connection = pgut_connect(buf.data, prompt_password, elevel); + conn2 = pgut_connect(buf.data, prompt_password, elevel); /* update password */ if (connection) diff --git a/bin/pgut/pgut-fe.h b/bin/pgut/pgut-fe.h index 89807d7..c58360e 100755 --- a/bin/pgut/pgut-fe.h +++ b/bin/pgut/pgut-fe.h @@ -55,6 +55,7 @@ extern char *password; extern YesNo prompt_password; extern PGconn *connection; +extern PGconn *conn2; extern void pgut_help(bool details); extern void help(bool details);