Skip to content

Commit

Permalink
Take an ACCESS SHARE LOCK on the target table, in an initial attempt …
Browse files Browse the repository at this point in the history
…to prevent concurrent DDL.

This is a first pass at Daniele's suggestion in Issue #8, although it is
definitely still buggy -- it is still possible for another transaction
to get in an AccessExclusive lock and perform DDL either before the
ACCESS SHARE lock is acquired or immediately after it is released.
  • Loading branch information
schmiddy committed Oct 28, 2012
1 parent 0511137 commit b1cb82f
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 6 deletions.
72 changes: 66 additions & 6 deletions bin/pg_reorg.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@ const char *PROGRAM_EMAIL = "[email protected]";
*/
#define APPLY_COUNT 1000

/* Record the PIDs of any possibly-conflicting transactions. Ignore the PID
* of our primary connection, and our second connection holding an
* ACCESS SHARE table lock.
*/
#define SQL_XID_SNAPSHOT \
"SELECT reorg.array_accum(virtualtransaction) FROM pg_locks"\
" WHERE locktype = 'virtualxid' AND pid <> pg_backend_pid()"
" WHERE locktype = 'virtualxid' AND pid NOT IN (pg_backend_pid(), $1)"

/* Later, check whether any of the transactions we saw before are still
* alive, and wait for them to go away.
*/
#define SQL_XID_ALIVE \
"SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
" AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)"
Expand Down Expand Up @@ -76,7 +83,7 @@ static void reorg_cleanup(bool fatal, void *userdata);

static char *getstr(PGresult *res, int row, int col);
static Oid getoid(PGresult *res, int row, int col);
static void lock_exclusive(const char *relid, const char *lock_query);
static void lock_exclusive(const char *relid, const char *lock_query, bool release_conn2);

#define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
#define SQLSTATE_QUERY_CANCELED "57014"
Expand Down Expand Up @@ -371,6 +378,7 @@ reorg_one_table(const reorg_table *table, const char *orderby)
int i;
int num_waiting = 0;
char *vxid;
char *lock_conn_pid;
char buffer[12];
StringInfoData sql;

Expand Down Expand Up @@ -401,7 +409,8 @@ reorg_one_table(const reorg_table *table, const char *orderby)
* 1. Setup workspaces and a trigger.
*/
elog(DEBUG2, "---- setup ----");
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table);
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, FALSE);


/*
* Check z_reorg_trigger is the trigger executed at last so that
Expand All @@ -425,6 +434,37 @@ reorg_one_table(const reorg_table *table, const char *orderby)
command(sql.data, 0, NULL);
command("COMMIT", 0, NULL);

PQclear(PQexec(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED"));
elog(DEBUG2, "Obtaining ACCESS SHARE lock for %s", table->target_name);

/* XXX: table name escaping? */
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
table->target_name);
res = PQexec(conn2, sql.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
printf("%s", PQerrorMessage(conn2));
PQclear(res);
exit(1);
}
PQclear(res);

elog(DEBUG2, "Obtained ACCESS SHARE lock of %s", table->target_name);

/* store the backend PID of our connection keeping an ACCESS SHARE
lock on the target table.
*/
res = PQexec(conn2, "SELECT pg_backend_pid()");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
printf("%s", PQerrorMessage(conn2));
PQclear(res);
exit(1);
}
lock_conn_pid = strdup(PQgetvalue(res, 0, 0));
elog(WARNING, "Have backend PID: %s", lock_conn_pid);
PQclear(res);

/*
* Register the table to be dropped on error. We use pktype as
* an advisory lock. The registration should be done after
Expand All @@ -442,9 +482,14 @@ reorg_one_table(const reorg_table *table, const char *orderby)
command("SELECT set_config('work_mem', current_setting('maintenance_work_mem'), true)", 0, NULL);
if (orderby && !orderby[0])
command("SET LOCAL synchronize_seqscans = off", 0, NULL);
res = execute(SQL_XID_SNAPSHOT, 0, NULL);

/* Fetch an array of Virtual IDs of all transactions active right now.
*/
params[0] = lock_conn_pid;
res = execute(SQL_XID_SNAPSHOT, 1, params);
vxid = strdup(PQgetvalue(res, 0, 0));
PQclear(res);

command(table->delete_log, 0, NULL);
command(table->create_table, 0, NULL);
printfStringInfo(&sql, "SELECT reorg.disable_autovacuum('reorg.table_%u')", table->target_oid);
Expand Down Expand Up @@ -541,7 +586,7 @@ reorg_one_table(const reorg_table *table, const char *orderby)
* 5. Swap.
*/
elog(DEBUG2, "---- swap ----");
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table);
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, TRUE);
apply_log(table, 0);
params[0] = utoa(table->target_oid, buffer);
command("SELECT reorg.reorg_swap($1)", 1, params);
Expand Down Expand Up @@ -580,9 +625,15 @@ reorg_one_table(const reorg_table *table, const char *orderby)

/*
* Try acquire a table lock but avoid long time locks when conflict.
* Arguments:
*
* relid: OID of relation
* lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed
* release_conn2: whether we should issue a COMMIT in conn2 to release
* its lock.
*/
static void
lock_exclusive(const char *relid, const char *lock_query)
lock_exclusive(const char *relid, const char *lock_query, bool release_conn2)
{
time_t start = time(NULL);
int i;
Expand Down Expand Up @@ -621,6 +672,12 @@ lock_exclusive(const char *relid, const char *lock_query)
command(cancel_query, 1, &relid);
}

/* If necessary, issue a COMMIT in conn2 to release its ACCESS SHARE
* lock.
*/
if (release_conn2)
PQclear(PQexec(conn2, "COMMIT"));

/* wait for a while to lock the table. */
wait_msec = Min(1000, i * 100);
snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec);
Expand Down Expand Up @@ -670,6 +727,9 @@ reorg_cleanup(bool fatal, void *userdata)
const char *params[1];

/* Rollback current transaction */
if (conn2)
PQclear(PQexec(conn2, "ROLLBACK"));

if (connection)
command("ROLLBACK", 0, NULL);

Expand Down
2 changes: 2 additions & 0 deletions bin/pgut/pgut-fe.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ char *password = NULL;
YesNo prompt_password = DEFAULT;

PGconn *connection = NULL;
PGconn *conn2 = NULL;

static bool parse_pair(const char buffer[], char key[], char value[]);
static char *get_username(void);
Expand Down Expand Up @@ -50,6 +51,7 @@ reconnect(int elevel)
appendStringInfo(&buf, "password=%s ", password);

connection = pgut_connect(buf.data, prompt_password, elevel);
conn2 = pgut_connect(buf.data, prompt_password, elevel);

/* update password */
if (connection)
Expand Down
1 change: 1 addition & 0 deletions bin/pgut/pgut-fe.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ extern char *password;
extern YesNo prompt_password;

extern PGconn *connection;
extern PGconn *conn2;

extern void pgut_help(bool details);
extern void help(bool details);
Expand Down

0 comments on commit b1cb82f

Please sign in to comment.