From e01988c24902d2c174a5c113d66b627e6ffcd786 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Tue, 7 Jan 2025 23:28:11 +0000 Subject: [PATCH] workload/schemachange: fix check for uniqueness when adding FK Previously, the check we made to see if the column that will be referenced by a foreign key is unique did not account for columns that appear in a hash-sharded index. Now this is fixed. Release note: None --- pkg/workload/schemachange/error_screening.go | 77 ++++++++++++++------ 1 file changed, 56 insertions(+), 21 deletions(-) diff --git a/pkg/workload/schemachange/error_screening.go b/pkg/workload/schemachange/error_screening.go index cb6678516d02..0df0f9c4370d 100644 --- a/pkg/workload/schemachange/error_screening.go +++ b/pkg/workload/schemachange/error_screening.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "regexp" + "slices" "strings" "github.com/cockroachdb/cockroach/pkg/sql/parser" @@ -898,7 +899,8 @@ func (og *operationGenerator) constraintIsPrimary( `, tableName.String()), constraintName) } -// Checks if a column has a single unique constraint. +// Checks if a column has a unique constraint or is in a primary key definition +// that will guarantee uniqueness of the column. func (og *operationGenerator) columnHasSingleUniqueConstraint( ctx context.Context, tx pgx.Tx, tableName *tree.TableName, columnName tree.Name, ) (bool, error) { @@ -906,27 +908,60 @@ func (og *operationGenerator) columnHasSingleUniqueConstraint( if columnName == "rowid" { return true, nil } - return og.scanBool(ctx, tx, ` - SELECT EXISTS( - SELECT column_name - FROM ( - SELECT table_schema, table_name, column_name, ordinal_position, - concat(table_schema,'.',table_name)::REGCLASS::INT8 AS tableid - FROM information_schema.columns - ) AS cols - JOIN ( - SELECT contype, conkey, conrelid - FROM pg_catalog.pg_constraint - ) AS cons ON cons.conrelid = cols.tableid - WHERE table_schema = $1 - AND table_name = $2 - AND column_name = $3 - AND (contype = 'u' OR contype = 'p') - AND array_length(conkey, 1) >= 1 -- unique index has to have this a prefix - AND conkey[1] = ordinal_position - ) - `, tableName.Schema(), tableName.Object(), columnName) + + rows, err := tx.Query(ctx, ` + SELECT ordinal_position, conkey, shard_column_positions + FROM ( + SELECT table_schema, table_name, column_name, ordinal_position, (SELECT array_agg(ordinal_position) + FROM information_schema.columns AS c + WHERE c.table_schema = columns.table_schema + AND c.table_name = columns.table_name + AND c.column_name ILIKE '%\_shard\_%') + AS shard_column_positions, concat(table_schema,'.',table_name)::REGCLASS::INT8 AS tableid + FROM information_schema.columns + ) AS cols + JOIN ( + SELECT contype, conkey, conrelid + FROM pg_catalog.pg_constraint + ) AS cons ON cons.conrelid = cols.tableid + WHERE table_schema = $1 + AND table_name = $2 + AND column_name = $3 + AND (contype = 'u' OR contype = 'p') + `, tableName.Schema(), tableName.Object(), columnName, + ) + if err != nil { + return false, err + } + + isColUnique, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (bool, error) { + var ordinalPosition int + var conkey []int + var shardColumnPositions []int + if err := row.Scan(&ordinalPosition, &conkey, &shardColumnPositions); err != nil { + return false, err + } + + // Find the first non-shard column in conkey, and check if it's equal + // the column we are inspecting. + for i, conkeyPos := range conkey { + if slices.Contains(shardColumnPositions, conkeyPos) { + continue + } + if conkeyPos == ordinalPosition { + // If this is the last column in the constraint, then it means the + // column is unique across the whole table. + return i == len(conkey)-1, nil + } + } + return false, nil + }) + if err != nil { + return false, err + } + return slices.Contains(isColUnique, true), nil } + func (og *operationGenerator) constraintIsUnique( ctx context.Context, tx pgx.Tx, tableName *tree.TableName, constraintName string, ) (bool, error) {