Further improvements to concurrent-DDL guard.
Fix table locking so that no race condition exists between lock release in the primary conn and lock acquisition in conn2. Also, have conn2 be in charge of performing the table swap step, to avoid a similar race. Part of work for Issue #8.
Showing 1 changed file with 119 additions and 40 deletions.
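The heart of the change is the ordering of the lock handoff: conn2 queues its ACCESS SHARE request asynchronously (PQsendQuery) while the primary connection still holds ACCESS EXCLUSIVE on the table, any backends already queued for ACCESS EXCLUSIVE are cancelled, and only then does the primary connection commit, so no DDL can slip in between the release and conn2's acquisition. The sketch below restates that handoff in isolation; it is a simplified illustration rather than the patched function, and the helper name, connection variables, and error handling are placeholders.

    /* Sketch of the lock handoff implemented in this commit (simplified;
     * "primary" and "conn2" are placeholder PGconn pointers, and error
     * handling is abbreviated).
     */
    #include <libpq-fe.h>
    #include <stdio.h>

    static int
    handoff_lock(PGconn *primary, PGconn *conn2, const char *table_name, unsigned int table_oid)
    {
        PGresult *res;
        char      sql[1024];

        /* primary already holds ACCESS EXCLUSIVE on the table inside an open
         * transaction at this point.
         */

        /* 1. Queue the ACCESS SHARE request on conn2; it will block behind
         *    primary's lock, so it cannot complete yet.
         */
        snprintf(sql, sizeof(sql), "LOCK TABLE %s IN ACCESS SHARE MODE", table_name);
        if (!PQsendQuery(conn2, sql))
            return -1;

        /* 2. Cancel any backends already queued for ACCESS EXCLUSIVE on the
         *    table, so they cannot jump in ahead of conn2 (same idea as the
         *    CANCEL_COMPETING_LOCKS query in the patch).
         */
        snprintf(sql, sizeof(sql),
                 "SELECT pg_cancel_backend(pid) FROM pg_locks"
                 " WHERE locktype = 'relation' AND granted = false"
                 " AND relation = %u AND mode = 'AccessExclusiveLock'"
                 " AND pid <> pg_backend_pid()", table_oid);
        res = PQexec(primary, sql);
        PQclear(res);

        /* 3. Only now release primary's lock; conn2 is first in the queue. */
        res = PQexec(primary, "COMMIT");
        PQclear(res);

        /* 4. Drain conn2's async result; when PQgetResult() returns NULL the
         *    LOCK TABLE has completed and conn2 holds ACCESS SHARE.
         */
        while ((res = PQgetResult(conn2)) != NULL)
        {
            if (PQresultStatus(res) != PGRES_COMMAND_OK)
                fprintf(stderr, "LOCK TABLE failed: %s", PQerrorMessage(conn2));
            PQclear(res);
        }
        return 0;
    }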
@@ -41,6 +41,24 @@ const char *PROGRAM_EMAIL = "[email protected]";
    "SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
    " AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)"

/* To be run while our main connection holds an AccessExclusive lock on the
 * target table, and our secondary conn is attempting to grab an AccessShare
 * lock. We know that "granted" must be false for these queries because
 * we already hold the AccessExclusive lock. Also, we only care about other
 * transactions trying to grab an ACCESS EXCLUSIVE lock, because that lock
 * level is needed for any of the disallowed DDL commands, e.g. ALTER TABLE
 * or TRUNCATE.
 */
#define CANCEL_COMPETING_LOCKS \
    "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
    " AND granted = false AND relation = %u"\
    " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"

#define KILL_COMPETING_LOCKS \
    "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
    " AND granted = false AND relation = %u"\
    " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"

/*
 * per-table information
 */
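Both new macros are printf-style templates whose %u placeholder takes the target table's OID; later in the patch they are expanded with printfStringInfo() and run on the primary connection. As a rough standalone sketch of how such a template is instantiated and executed (the helper name, OID value, and error handling here are hypothetical):

    /* Illustrative only: expand the lock-cancel template for a given table OID
     * and run it on an open connection.  The patch itself does the equivalent
     * through printfStringInfo() and execute().
     */
    #include <libpq-fe.h>
    #include <stdio.h>

    #define CANCEL_COMPETING_LOCKS \
        "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
        " AND granted = false AND relation = %u"\
        " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"

    static void
    cancel_competing_ddl(PGconn *conn, unsigned int table_oid)
    {
        char      sql[1024];
        PGresult *res;

        snprintf(sql, sizeof(sql), CANCEL_COMPETING_LOCKS, table_oid);
        res = PQexec(conn, sql);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            fprintf(stderr, "cancel failed: %s", PQerrorMessage(conn));
        else if (PQntuples(res) > 0)
            fprintf(stderr, "canceled %d waiting backend(s)\n", PQntuples(res));
        PQclear(res);
    }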
@@ -83,7 +101,7 @@ static void reorg_cleanup(bool fatal, void *userdata);

static char *getstr(PGresult *res, int row, int col);
static Oid getoid(PGresult *res, int row, int col);
static void lock_exclusive(const char *relid, const char *lock_query, bool release_conn2);
static void lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact);

#define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
#define SQLSTATE_QUERY_CANCELED "57014"
@@ -344,7 +362,7 @@ reorg_one_database(const char *orderby, const char *table)
}

static int
apply_log(const reorg_table *table, int count)
apply_log(PGconn *conn, const reorg_table *table, int count)
{
    int result;
    PGresult *res;
@@ -358,8 +376,9 @@ apply_log(const reorg_table *table, int count)
    params[4] = table->sql_pop;
    params[5] = utoa(count, buffer);

    res = execute("SELECT reorg.reorg_apply($1, $2, $3, $4, $5, $6)",
        6, params);
    res = pgut_execute(conn,
        "SELECT reorg.reorg_apply($1, $2, $3, $4, $5, $6)",
        6, params);
    result = atoi(PQgetvalue(res, 0, 0));
    PQclear(res);
@@ -409,8 +428,7 @@ reorg_one_table(const reorg_table *table, const char *orderby)
     * 1. Setup workspaces and a trigger.
     */
    elog(DEBUG2, "---- setup ----");
    lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, FALSE);

    lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE);

    /*
     * Check z_reorg_trigger is the trigger executed at last so that
@@ -432,39 +450,101 @@ reorg_one_table(const reorg_table *table, const char *orderby)
    command(table->alter_table, 0, NULL);
    printfStringInfo(&sql, "SELECT reorg.disable_autovacuum('reorg.log_%u')", table->target_oid);
    command(sql.data, 0, NULL);
    command("COMMIT", 0, NULL);

    /* While we are still holding an AccessExclusive lock on the table, submit
     * the request for an AccessShare lock asynchronously from conn2.
     * We want to submit this query in conn2 while connection's
     * transaction still holds its lock, so that no DDL may sneak in
     * between the time that connection commits and conn2 gets its lock.
     *
     */
    pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
    elog(DEBUG2, "Obtaining ACCESS SHARE lock for %s", table->target_name);

    /* XXX: table name escaping? */
    printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
                     table->target_name);
    res = pgut_execute(conn2, sql.data, 0, NULL);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    /* grab the backend PID of conn2; we'll need this when querying
     * pg_locks momentarily.
     */
    res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        printf("%s", PQerrorMessage(conn2));
        PQclear(res);
        exit(1);
    }
    lock_conn_pid = strdup(PQgetvalue(res, 0, 0));
    PQclear(res);

    elog(DEBUG2, "Obtained ACCESS SHARE lock of %s", table->target_name);
    printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
                     table->target_name);
    elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
    if (!(PQsendQuery(conn2, sql.data))) {
        printf("Error sending async query: %s\n%s", sql.data, PQerrorMessage(conn2));
        exit(1);
    }

    /* store the backend PID of our connection keeping an ACCESS SHARE
       lock on the target table.
    /* Now that we've submitted the LOCK TABLE request through conn2,
     * look for and cancel any (potentially dangerous) DDL commands which
     * might also be waiting on our table lock at this point --
     * it's not safe to let them wait, because they may grab their
     * AccessExclusive lock before conn2 gets its AccessShare lock,
     * and perform unsafe DDL on the table.
     *
     * XXX: maybe we should use a loop canceling queries, as in
     * lock_exclusive().
     */
    res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL);
    printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, table->target_oid);
    res = execute(sql.data, 0, NULL);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        printf("%s", PQerrorMessage(conn2));
        printf("Error canceling competing queries: %s", PQerrorMessage(connection));
        PQclear(res);
        exit(1);
    }
    lock_conn_pid = strdup(PQgetvalue(res, 0, 0));
    elog(WARNING, "Have backend PID: %s", lock_conn_pid);
    if (PQntuples(res) > 0)
    {
        elog(WARNING, "Canceled %d unsafe queries. Terminating any remaining PIDs.", PQntuples(res));

        if (PQserverVersion(connection) >= 80400)
        {
            PQclear(res);
            printfStringInfo(&sql, KILL_COMPETING_LOCKS, table->target_oid);
            res = execute(sql.data, 0, NULL);
            if (PQresultStatus(res) != PGRES_TUPLES_OK)
            {
                printf("Error killing competing queries: %s", PQerrorMessage(connection));
                PQclear(res);
                exit(1);
            }
        }

    }
    else
    {
        elog(DEBUG2, "No competing DDL to cancel.");
    }
    PQclear(res);


    /* We're finished killing off any unsafe DDL. COMMIT in our main
     * connection, so that conn2 may get its AccessShare lock.
     */
    command("COMMIT", 0, NULL);

    /* Keep looping PQgetResult() calls until it returns NULL, indicating the
     * command is done and we have obtained our lock.
     */
    while ((res = PQgetResult(conn2)))
    {
        elog(DEBUG2, "Waiting on ACCESS SHARE lock...");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
            printf("Error with LOCK TABLE: %s", PQerrorMessage(conn2));
            PQclear(res);
            exit(1);
        }
        PQclear(res);
    }


    /*
     * Register the table to be dropped on error. We use pktype as
     * an advisory lock. The registration should be done after
@@ -546,7 +626,7 @@ reorg_one_table(const reorg_table *table, const char *orderby)
     */
    for (;;)
    {
        num = apply_log(table, APPLY_COUNT);
        num = apply_log(connection, table, APPLY_COUNT);
        if (num > 0)
            continue;   /* there might be still some tuples, repeat. */
@@ -583,14 +663,17 @@ reorg_one_table(const reorg_table *table, const char *orderby)
    }

    /*
     * 5. Swap.
     * 5. Swap: will be done with conn2, since it already holds an
     *    AccessShare lock.
     */
    elog(DEBUG2, "---- swap ----");
    lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, TRUE);
    apply_log(table, 0);
    /* Bump our existing AccessShare lock to AccessExclusive */
    lock_exclusive(conn2, utoa(table->target_oid, buffer), table->lock_table,
                   FALSE);
    apply_log(conn2, table, 0);
    params[0] = utoa(table->target_oid, buffer);
    command("SELECT reorg.reorg_swap($1)", 1, params);
    command("COMMIT", 0, NULL);
    pgut_command(conn2, "SELECT reorg.reorg_swap($1)", 1, params);
    pgut_command(conn2, "COMMIT", 0, NULL);

    /*
     * 6. Drop.
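Because conn2 already holds the ACCESS SHARE lock inside an open transaction, upgrading that same connection to ACCESS EXCLUSIVE and running the swap there leaves no window in which a competing session could take the table lock between steps. A minimal sketch of that ordering on a single connection (placeholder names, a plain LOCK TABLE instead of lock_exclusive()'s retry loop, and error handling omitted):

    /* Sketch of the conn2-driven swap step: upgrade the existing ACCESS SHARE
     * lock to ACCESS EXCLUSIVE on the same connection, then swap and commit.
     * conn2, table_name and table_oid are placeholders.
     */
    #include <libpq-fe.h>
    #include <stdio.h>

    static void
    swap_on_conn2(PGconn *conn2, const char *table_name, const char *table_oid)
    {
        char        sql[1024];
        PGresult   *res;
        const char *params[1];

        /* conn2 is already inside the transaction that took ACCESS SHARE, so
         * no BEGIN is needed (this is what the new start_xact = FALSE argument
         * to lock_exclusive() expresses).
         */
        snprintf(sql, sizeof(sql), "LOCK TABLE %s IN ACCESS EXCLUSIVE MODE", table_name);
        res = PQexec(conn2, sql);
        PQclear(res);

        /* Perform the swap on conn2 (the patch first drains the remaining log
         * entries with apply_log(conn2, table, 0) before this call).
         */
        params[0] = table_oid;
        res = PQexecParams(conn2, "SELECT reorg.reorg_swap($1)",
                           1, NULL, params, NULL, NULL, 0);
        PQclear(res);

        res = PQexec(conn2, "COMMIT");
        PQclear(res);
    }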
@@ -604,6 +687,7 @@ reorg_one_table(const reorg_table *table, const char *orderby)

    pgut_atexit_pop(&reorg_cleanup, (void *) table);
    free(vxid);
    free(lock_conn_pid);

    /*
     * 7. Analyze.
@@ -627,31 +711,32 @@ reorg_one_table(const reorg_table *table, const char *orderby)
 * Try acquire a table lock but avoid long time locks when conflict.
 * Arguments:
 *
 *  conn: connection to use
 *  relid: OID of relation
 *  lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed
 *  release_conn2: whether we should issue a COMMIT in conn2 to release
 *    its lock.
 *  start_xact: whether we need to issue a BEGIN;
 */
static void
lock_exclusive(const char *relid, const char *lock_query, bool release_conn2)
lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact)
{
    time_t start = time(NULL);
    int i;

    for (i = 1; ; i++)
    {
        time_t duration;
        char sql[1024];
        PGresult *res;
        int wait_msec;

        command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
        if (start_xact)
            pgut_command(conn, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);

        duration = time(NULL) - start;
        if (duration > wait_timeout)
        {
            const char *cancel_query;
            if (PQserverVersion(connection) >= 80400 &&
            if (PQserverVersion(conn) >= 80400 &&
                duration > wait_timeout * 2)
            {
                elog(WARNING, "terminating conflicted backends");
@@ -669,21 +754,15 @@ lock_exclusive(const char *relid, const char *lock_query, bool release_conn2)
                    " AND relation = $1 AND pid <> pg_backend_pid()";
            }

            command(cancel_query, 1, &relid);
            pgut_command(conn, cancel_query, 1, &relid);
        }

        /* If necessary, issue a COMMIT in conn2 to release its ACCESS SHARE
         * lock.
         */
        if (release_conn2)
            pgut_command(conn2, "COMMIT", 0, NULL);

        /* wait for a while to lock the table. */
        wait_msec = Min(1000, i * 100);
        snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec);
        command(sql, 0, NULL);

        res = execute_elevel(lock_query, 0, NULL, DEBUG2);
        res = pgut_execute_elevel(conn, lock_query, 0, NULL, DEBUG2);
        if (PQresultStatus(res) == PGRES_COMMAND_OK)
        {
            PQclear(res);