Skip to content

Commit

Permalink
Merge pull request #138663 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.3-138556

release-24.3: workload/schemachange: fix check for uniqueness when adding FK
  • Loading branch information
rafiss authored Jan 8, 2025
2 parents 8c89c90 + e01988c commit 3770f88
Showing 1 changed file with 56 additions and 21 deletions.
77 changes: 56 additions & 21 deletions pkg/workload/schemachange/error_screening.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"regexp"
"slices"
"strings"

"github.com/cockroachdb/cockroach/pkg/sql/parser"
Expand Down Expand Up @@ -898,35 +899,69 @@ func (og *operationGenerator) constraintIsPrimary(
`, tableName.String()), constraintName)
}

// Checks if a column has a single unique constraint.
// Checks if a column has a unique constraint or is in a primary key definition
// that will guarantee uniqueness of the column.
func (og *operationGenerator) columnHasSingleUniqueConstraint(
ctx context.Context, tx pgx.Tx, tableName *tree.TableName, columnName tree.Name,
) (bool, error) {
// Rowid will always be unique, though the index is hidden.
if columnName == "rowid" {
return true, nil
}
return og.scanBool(ctx, tx, `
SELECT EXISTS(
SELECT column_name
FROM (
SELECT table_schema, table_name, column_name, ordinal_position,
concat(table_schema,'.',table_name)::REGCLASS::INT8 AS tableid
FROM information_schema.columns
) AS cols
JOIN (
SELECT contype, conkey, conrelid
FROM pg_catalog.pg_constraint
) AS cons ON cons.conrelid = cols.tableid
WHERE table_schema = $1
AND table_name = $2
AND column_name = $3
AND (contype = 'u' OR contype = 'p')
AND array_length(conkey, 1) >= 1 -- unique index has to have this a prefix
AND conkey[1] = ordinal_position
)
`, tableName.Schema(), tableName.Object(), columnName)

rows, err := tx.Query(ctx, `
SELECT ordinal_position, conkey, shard_column_positions
FROM (
SELECT table_schema, table_name, column_name, ordinal_position, (SELECT array_agg(ordinal_position)
FROM information_schema.columns AS c
WHERE c.table_schema = columns.table_schema
AND c.table_name = columns.table_name
AND c.column_name ILIKE '%\_shard\_%')
AS shard_column_positions, concat(table_schema,'.',table_name)::REGCLASS::INT8 AS tableid
FROM information_schema.columns
) AS cols
JOIN (
SELECT contype, conkey, conrelid
FROM pg_catalog.pg_constraint
) AS cons ON cons.conrelid = cols.tableid
WHERE table_schema = $1
AND table_name = $2
AND column_name = $3
AND (contype = 'u' OR contype = 'p')
`, tableName.Schema(), tableName.Object(), columnName,
)
if err != nil {
return false, err
}

isColUnique, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (bool, error) {
var ordinalPosition int
var conkey []int
var shardColumnPositions []int
if err := row.Scan(&ordinalPosition, &conkey, &shardColumnPositions); err != nil {
return false, err
}

// Find the first non-shard column in conkey, and check if it's equal
// the column we are inspecting.
for i, conkeyPos := range conkey {
if slices.Contains(shardColumnPositions, conkeyPos) {
continue
}
if conkeyPos == ordinalPosition {
// If this is the last column in the constraint, then it means the
// column is unique across the whole table.
return i == len(conkey)-1, nil
}
}
return false, nil
})
if err != nil {
return false, err
}
return slices.Contains(isColUnique, true), nil
}

func (og *operationGenerator) constraintIsUnique(
ctx context.Context, tx pgx.Tx, tableName *tree.TableName, constraintName string,
) (bool, error) {
Expand Down

0 comments on commit 3770f88

Please sign in to comment.