Skip to content

Commit

Permalink
changefeedccl: protect system.comments and system.zones
Browse files Browse the repository at this point in the history
The catalog reader reads from system.descriptor, system.comments, and
system.zones when reading a table descriptor from disk. We were only
protecting system.descriptors.

Further, the test here previously wasn't testing anything because it
was only forcing the GC-threshold to 1 second before read timestamp it
was using.

Informs #128806

Release note (bug fix): Fix bug that could prevent a CHANGEFEED from
being able to resume after being paused for prolonged period of time.
  • Loading branch information
stevendanna committed Sep 30, 2024
1 parent e875d80 commit 6676177
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 18 deletions.
22 changes: 15 additions & 7 deletions pkg/ccl/changefeedccl/protected_timestamps.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,30 @@ func createProtectedTimestampRecord(
jobsprotectedts.Jobs, targetToProtect)
}

// systemTablesToProtect holds the descriptor IDs of the system tables
// that need to be protected to ensure that a CHANGEFEED can do a
// historical read of a table descriptor.
var systemTablesToProtect = []descpb.ID{
keys.DescriptorTableID,
keys.CommentsTableID,
keys.ZonesTableID,
}

func makeTargetToProtect(targets changefeedbase.Targets) *ptpb.Target {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
tablesToProtect := make(descpb.IDs, 0, targets.NumUniqueTables()+1)
tablesToProtect := make(descpb.IDs, 0, targets.NumUniqueTables()+len(systemTablesToProtect))
_ = targets.EachTableID(func(id descpb.ID) error {
tablesToProtect = append(tablesToProtect, id)
return nil
})
tablesToProtect = append(tablesToProtect, keys.DescriptorTableID)
tablesToProtect = append(tablesToProtect, systemTablesToProtect...)
return ptpb.MakeSchemaObjectsTarget(tablesToProtect)
}

func makeSpansToProtect(codec keys.SQLCodec, targets changefeedbase.Targets) []roachpb.Span {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
spansToProtect := make([]roachpb.Span, 0, targets.NumUniqueTables()+1)
spansToProtect := make([]roachpb.Span, 0, targets.NumUniqueTables()+len(systemTablesToProtect))
addTablePrefix := func(id uint32) {
tablePrefix := codec.TablePrefix(id)
spansToProtect = append(spansToProtect, roachpb.Span{
Expand All @@ -71,6 +77,8 @@ func makeSpansToProtect(codec keys.SQLCodec, targets changefeedbase.Targets) []r
addTablePrefix(uint32(id))
return nil
})
addTablePrefix(keys.DescriptorTableID)
for _, id := range systemTablesToProtect {
addTablePrefix(uint32(id))
}
return spansToProtect
}
55 changes: 53 additions & 2 deletions pkg/ccl/changefeedccl/protected_timestamps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -456,7 +457,7 @@ func TestPTSRecordProtectsTargetsAndDescriptorTable(t *testing.T) {
defer stopServer()
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `ALTER DATABASE system CONFIGURE ZONE USING gc.ttlseconds = 1`)
sqlDB.Exec(t, "CREATE TABLE foo (a INT, b STRING)")
ts := s.Clock().Now()
ctx := context.Background()
Expand All @@ -473,10 +474,60 @@ func TestPTSRecordProtectsTargetsAndDescriptorTable(t *testing.T) {
return execCfg.ProtectedTimestampProvider.WithTxn(txn).Protect(ctx, ptr)
}))

// The following code was shameless stolen from
// TestShowTenantFingerprintsProtectsTimestamp which almost
// surely copied it from the 2-3 other tests that have
// something similar. We should put this in a helper. We have
// ForceTableGC, but in ad-hoc testing that appeared to bypass
// the PTS record making it useless for this test.
//
// TODO(ssd): Make a helper that does this.
refreshPTSReaderCache := func(asOf hlc.Timestamp, tableName, databaseName string) {
tableID, err := s.QueryTableID(ctx, username.RootUserName(), tableName, databaseName)
require.NoError(t, err)
tableKey := s.Codec().TablePrefix(uint32(tableID))
store, err := s.StorageLayer().GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)
var repl *kvserver.Replica
testutils.SucceedsSoon(t, func() error {
repl = store.LookupReplica(roachpb.RKey(tableKey))
if repl == nil {
return errors.New("could not find replica")
}
return nil
})
ptsReader := store.GetStoreConfig().ProtectedTimestampReader
t.Logf("updating PTS reader cache to %s", asOf)
require.NoError(
t,
spanconfigptsreader.TestingRefreshPTSState(ctx, t, ptsReader, asOf),
)
require.NoError(t, repl.ReadProtectedTimestampsForTesting(ctx))
}
gcTestTableRange := func(tableName, databaseName string) {
row := sqlDB.QueryRow(t, fmt.Sprintf("SELECT range_id FROM [SHOW RANGES FROM TABLE %s.%s]", tableName, databaseName))
var rangeID int64
row.Scan(&rangeID)
refreshPTSReaderCache(s.Clock().Now(), tableName, databaseName)
t.Logf("enqueuing range %d for mvccGC", rangeID)
sqlDB.Exec(t, `SELECT crdb_internal.kv_enqueue_replica($1, 'mvccGC', true)`, rangeID)
}

// Alter foo few times, then force GC at ts-1.
sqlDB.Exec(t, "ALTER TABLE foo ADD COLUMN c STRING")
sqlDB.Exec(t, "ALTER TABLE foo ADD COLUMN d STRING")
require.NoError(t, s.ForceTableGC(ctx, "system", "descriptor", ts.Add(-1, 0)))
time.Sleep(2 * time.Second)
// If you want to GC all system tables:
//
// tabs := systemschema.MakeSystemTables()
// for _, t := range tabs {
// if t.IsPhysicalTable() && !t.IsSequence() {
// gcTestTableRange("system", t.GetName())
// }
// }
gcTestTableRange("system", "descriptor")
gcTestTableRange("system", "zones")
gcTestTableRange("system", "comments")

// We can still fetch table descriptors because of protected timestamp record.
asOf := ts
Expand Down
19 changes: 10 additions & 9 deletions pkg/upgrade/upgrades/v24_1_migrate_pts_records_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,17 @@ func TestMigrateOldStlePTSRecords(t *testing.T) {
tableDesc := desctestutils.TestingGetTableDescriptor(
s.DB(), execCfg.Codec, "defaultdb", "public", tbl,
)
allTargets = append(allTargets, []catid.DescID{keys.DescriptorTableID, tableDesc.GetID()})
allTargets = append(allTargets, []catid.DescID{
keys.DescriptorTableID,
keys.ZonesTableID,
keys.CommentsTableID,
tableDesc.GetID()})
descIDsArr = append(descIDsArr, tableDesc.GetID())
allTables = append(allTables, tbl)
}
_, err = sqlDB.Exec(fmt.Sprintf("create changefeed for %s INTO 'null://'", strings.Join(allTables, ",")))
require.NoError(t, err)
descIDsArr = append(descIDsArr, keys.DescriptorTableID)
descIDsArr = append(descIDsArr, keys.DescriptorTableID, keys.ZonesTableID, keys.CommentsTableID)
sort.Slice(descIDsArr, func(i int, j int) bool {
return descIDsArr[i] < descIDsArr[j]
})
Expand Down Expand Up @@ -141,13 +145,10 @@ func TestMigrateOldStlePTSRecords(t *testing.T) {
if len(b) < len(a) {
return false
}
// Since each slice was sorted, the descriptor table will be at index
// 0 since it has the lowest descriptor ID. Then, order by the protected
// data table descriptor at index 1.
return a[1] < b[1]
// Since each slice was sorted, the data data should
// be last. Sort on its descriptor ID.
return a[len(a)-1] < b[len(b)-1]
})

t.Logf("%v", allTargets)
t.Logf("%v", seenTargets)
require.Equal(t, seenTargets, allTargets)
require.Equal(t, allTargets, seenTargets)
}

0 comments on commit 6676177

Please sign in to comment.