
Commit 2ae9d0e

Fix the corner case where ddl and syncpoint share the same commitTs; add a new test for random ddl failover (#958)
hongyunyan authored Jan 27, 2025
1 parent 38ec3a4 commit 2ae9d0e
Showing 18 changed files with 276 additions and 51 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/integration_test_mysql.yaml
@@ -631,6 +631,11 @@ jobs:
           pwd && ls -l bin/ && ls -l tools/bin/
           export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_mix
+      - name: Test fail_over_ddl_mix_with_syncpoint
+        run: |
+          pwd && ls -l bin/ && ls -l tools/bin/
+          export TICDC_NEWARCH=true && make integration_test CASE=fail_over_ddl_mix_with_syncpoint
       - name: Upload test logs
         if: always()
         uses: ./.github/actions/upload-test-logs
10 changes: 9 additions & 1 deletion downstreamadapter/dispatcher/dispatcher.go
@@ -41,6 +41,7 @@ type EventDispatcher interface {
 	GetFilterConfig() *eventpb.FilterConfig
 	EnableSyncPoint() bool
 	GetSyncPointInterval() time.Duration
+	GetStartTsIsSyncpoint() bool
 	GetResolvedTs() uint64
 	SetInitialTableInfo(tableInfo *common.TableInfo)
 	HandleEvents(events []DispatcherEvent, wakeCallback func()) (block bool)
@@ -85,7 +86,8 @@ type Dispatcher struct {
 	schemaID  int64
 	tableSpan *heartbeatpb.TableSpan
 	// startTs is the timestamp from which the dispatcher needs to receive and flush events.
-	startTs uint64
+	startTs            uint64
+	startTsIsSyncpoint bool
 	// The ts from pd when the dispatcher is created.
 	// when downstream is mysql-class, for dml event we need to compare the commitTs with this ts
 	// to determine whether the insert event should use `Replace` or just `Insert`
@@ -154,6 +156,7 @@ func NewDispatcher(
 	schemaID int64,
 	schemaIDToDispatchers *SchemaIDToDispatchers,
 	syncPointConfig *syncpoint.SyncPointConfig,
+	startTsIsSyncpoint bool,
 	filterConfig *eventpb.FilterConfig,
 	currentPdTs uint64,
 	errCh chan error,
@@ -164,6 +167,7 @@
 		tableSpan: tableSpan,
 		sink: sink,
 		startTs: startTs,
+		startTsIsSyncpoint: startTsIsSyncpoint,
 		blockStatusesChan: blockStatusesChan,
 		syncPointConfig: syncPointConfig,
 		componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Working),
@@ -631,6 +635,10 @@ func (d *Dispatcher) GetSyncPointInterval() time.Duration {
 	return time.Duration(0)
 }
 
+func (d *Dispatcher) GetStartTsIsSyncpoint() bool {
+	return d.startTsIsSyncpoint
+}
+
 func (d *Dispatcher) Remove() {
 	log.Info("table event dispatcher component status changed to stopping",
 		zap.String("table", d.tableSpan.String()))
1 change: 1 addition & 0 deletions downstreamadapter/dispatcher/dispatcher_test.go
@@ -119,6 +119,7 @@ func newDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan) *Dispatcher {
 		SyncPointInterval:  time.Duration(5 * time.Second),
 		SyncPointRetention: time.Duration(10 * time.Minute),
 	}, // syncPointConfig
+	false,
 	nil, // filterConfig
 	common.Ts(0), // pdTs
 	make(chan error, 1),
@@ -391,9 +391,10 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo, re
 	// Besides, we batch the creation for the dispatchers,
 	// mainly because we need to batch the query for startTs when sink is mysql-class to reduce the time cost.
 	var newStartTsList []int64
+	startTsIsSyncpointList := make([]bool, len(startTsList))
 	var err error
 	if e.sink.SinkType() == common.MysqlSinkType {
-		newStartTsList, err = e.sink.(*sink.MysqlSink).GetStartTsList(tableIds, startTsList, removeDDLTs)
+		newStartTsList, startTsIsSyncpointList, err = e.sink.(*sink.MysqlSink).GetStartTsList(tableIds, startTsList, removeDDLTs)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -412,6 +413,7 @@
 			schemaIds[idx],
 			e.schemaIDToDispatchers,
 			e.syncPointConfig,
+			startTsIsSyncpointList[idx],
 			e.filterConfig,
 			pdTsList[idx],
 			e.errCh)
2 changes: 1 addition & 1 deletion downstreamadapter/eventcollector/event_collector.go
@@ -334,7 +334,7 @@ func (c *EventCollector) mustSendDispatcherRequest(target node.ID, topic string,
 	message.RegisterDispatcherRequest.FilterConfig = req.Dispatcher.GetFilterConfig()
 	message.RegisterDispatcherRequest.EnableSyncPoint = req.Dispatcher.EnableSyncPoint()
 	message.RegisterDispatcherRequest.SyncPointInterval = uint64(req.Dispatcher.GetSyncPointInterval().Seconds())
-	message.RegisterDispatcherRequest.SyncPointTs = syncpoint.CalculateStartSyncPointTs(req.StartTs, req.Dispatcher.GetSyncPointInterval())
+	message.RegisterDispatcherRequest.SyncPointTs = syncpoint.CalculateStartSyncPointTs(req.StartTs, req.Dispatcher.GetSyncPointInterval(), req.Dispatcher.GetStartTsIsSyncpoint())
 
 	err := c.mc.SendCommand(&messaging.TargetMessage{
13 changes: 7 additions & 6 deletions downstreamadapter/sink/mysql_sink.go
@@ -159,23 +159,24 @@ func (s *MysqlSink) GetStartTsList(
 	tableIds []int64,
 	startTsList []int64,
 	removeDDLTs bool,
-) ([]int64, error) {
+) ([]int64, []bool, error) {
 	if removeDDLTs {
 		// means we just need to remove the ddl ts item for this changefeed, and return startTsList directly.
 		err := s.ddlWorker.RemoveDDLTsItem()
 		if err != nil {
 			atomic.StoreUint32(&s.isNormal, 0)
-			return nil, err
+			return nil, nil, err
 		}
-		return startTsList, nil
+		isSyncpointList := make([]bool, len(startTsList))
+		return startTsList, isSyncpointList, nil
 	}
 
-	startTsList, err := s.ddlWorker.GetStartTsList(tableIds, startTsList)
+	startTsList, isSyncpointList, err := s.ddlWorker.GetStartTsList(tableIds, startTsList)
 	if err != nil {
 		atomic.StoreUint32(&s.isNormal, 0)
-		return nil, err
+		return nil, nil, err
 	}
-	return startTsList, nil
+	return startTsList, isSyncpointList, nil
 }
 
 func (s *MysqlSink) Close(removeChangefeed bool) {
3 changes: 2 additions & 1 deletion downstreamadapter/sink/mysql_sink_test.go
@@ -92,14 +92,15 @@ func TestMysqlSinkBasicFunctionality(t *testing.T) {
 		table_id bigint(21),
 		finished bool,
 		related_table_id bigint(21),
+		is_syncpoint bool,
 		created_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
 		INDEX (ticdc_cluster_id, changefeed, table_id),
 		PRIMARY KEY (ticdc_cluster_id, changefeed, table_id)
 	);`).WillReturnResult(sqlmock.NewResult(1, 1))
 	mock.ExpectCommit()
 
 	mock.ExpectBegin()
-	mock.ExpectExec("INSERT INTO tidb_cdc.ddl_ts_v1 (ticdc_cluster_id, changefeed, ddl_ts, table_id, related_table_id, finished) VALUES ('default', 'test/test', '1', 0, 1, 1), ('default', 'test/test', '1', 1, 1, 1) ON DUPLICATE KEY UPDATE finished=VALUES(finished), related_table_id=VALUES(related_table_id), ddl_ts=VALUES(ddl_ts), created_at=NOW();").WillReturnResult(sqlmock.NewResult(1, 1))
+	mock.ExpectExec("INSERT INTO tidb_cdc.ddl_ts_v1 (ticdc_cluster_id, changefeed, ddl_ts, table_id, related_table_id, finished, is_syncpoint) VALUES ('default', 'test/test', '1', 0, 1, 1, 0), ('default', 'test/test', '1', 1, 1, 1, 0) ON DUPLICATE KEY UPDATE finished=VALUES(finished), related_table_id=VALUES(related_table_id), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);").WillReturnResult(sqlmock.NewResult(1, 1))
 	mock.ExpectCommit()
 
 	mock.ExpectBegin()
4 changes: 3 additions & 1 deletion downstreamadapter/syncpoint/sync_point.go
@@ -25,13 +25,15 @@ type SyncPointConfig struct {
 	SyncPointRetention time.Duration
 }
 
-func CalculateStartSyncPointTs(startTs uint64, syncPointInterval time.Duration) uint64 {
+func CalculateStartSyncPointTs(startTs uint64, syncPointInterval time.Duration, startTsIsSyncpoint bool) uint64 {
 	if syncPointInterval == time.Duration(0) {
 		return 0
 	}
 	k := oracle.GetTimeFromTS(startTs).Sub(time.Unix(0, 0)) / syncPointInterval
 	if oracle.GetTimeFromTS(startTs).Sub(time.Unix(0, 0))%syncPointInterval != 0 || oracle.ExtractLogical(startTs) != 0 {
 		k += 1
+	} else if startTsIsSyncpoint {
+		k += 1
 	}
 	return oracle.GoTimeToTS(time.Unix(0, 0).Add(k * syncPointInterval))
}
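
This hunk is the heart of the fix. A dispatcher that resumes exactly on a syncpoint boundary (physical time divisible by the interval, logical bits zero) used to compute that same boundary as its first syncpoint and emit it again, even if the syncpoint had already been flushed, together with a DDL carrying the same commitTs, before the failover. The new startTsIsSyncpoint flag, recovered from the downstream ddl_ts table, makes the calculation skip to the next boundary in that case. Below is a minimal, self-contained sketch of the rule; the names are illustrative, and it inlines TiDB's TSO layout (physical milliseconds above 18 logical bits) instead of importing the oracle package:

package main

import (
	"fmt"
	"time"
)

const logicalBits = 18

func physicalTime(ts uint64) time.Time { return time.UnixMilli(int64(ts >> logicalBits)) }
func logicalPart(ts uint64) uint64     { return ts & (1<<logicalBits - 1) }
func toTS(t time.Time) uint64          { return uint64(t.UnixMilli()) << logicalBits }

// nextSyncPointTs mirrors CalculateStartSyncPointTs above: pick the first
// syncpoint at or after startTs, except that a startTs which is itself an
// already-flushed syncpoint is skipped so it is not written twice.
func nextSyncPointTs(startTs uint64, interval time.Duration, startTsIsSyncpoint bool) uint64 {
	if interval == 0 {
		return 0
	}
	elapsed := physicalTime(startTs).Sub(time.Unix(0, 0))
	k := elapsed / interval
	if elapsed%interval != 0 || logicalPart(startTs) != 0 {
		k++ // startTs lies between boundaries: round up to the next one
	} else if startTsIsSyncpoint {
		k++ // startTs sits exactly on an already-flushed boundary: skip it
	}
	return toTS(time.Unix(0, 0).Add(k * interval))
}

func main() {
	interval := 10 * time.Minute
	boundary := toTS(time.Unix(0, 0).Add(6 * interval)) // a ts exactly on a boundary
	fmt.Println(nextSyncPointTs(boundary, interval, false) == boundary)                             // true: first syncpoint is startTs itself
	fmt.Println(nextSyncPointTs(boundary, interval, true) == toTS(time.Unix(0, 0).Add(7*interval))) // true: advance one interval
}
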
11 changes: 7 additions & 4 deletions downstreamadapter/worker/mysql_worker.go
@@ -162,17 +162,20 @@ func (w *MysqlDDLWorker) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) {
 	w.mysqlWriter.SetTableSchemaStore(tableSchemaStore)
 }
 
-func (w *MysqlDDLWorker) GetStartTsList(tableIds []int64, startTsList []int64) ([]int64, error) {
-	ddlTsList, err := w.mysqlWriter.GetStartTsList(tableIds)
+func (w *MysqlDDLWorker) GetStartTsList(tableIds []int64, startTsList []int64) ([]int64, []bool, error) {
+	ddlTsList, isSyncpointList, err := w.mysqlWriter.GetStartTsList(tableIds)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	resTs := make([]int64, len(ddlTsList))
 	for idx, ddlTs := range ddlTsList {
+		if startTsList[idx] > ddlTs {
+			isSyncpointList[idx] = false
+		}
 		resTs[idx] = max(ddlTs, startTsList[idx])
 	}
 
-	return resTs, nil
+	return resTs, isSyncpointList, nil
 }
 
 func (w *MysqlDDLWorker) WriteBlockEvent(event commonEvent.BlockEvent) error {
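
The new loop is what keeps the recovered flag honest: the downstream ddl_ts table remembers, per table, the last flushed ts and whether it was a syncpoint, but if the changefeed's own startTs is newer, the resume point is no longer the recorded ts and the flag must be dropped. A hedged sketch of the merge rule in isolation (hypothetical helper, not the production API):

package sketch

// mergeStartTs mirrors the loop in MysqlDDLWorker.GetStartTsList: per table,
// resume from the later of the recorded ddl ts and the changefeed's startTs,
// and keep the recorded is-syncpoint flag only when the recorded ts wins.
func mergeStartTs(ddlTsList []int64, isSyncpointList []bool, startTsList []int64) ([]int64, []bool) {
	resTs := make([]int64, len(ddlTsList))
	for idx, ddlTs := range ddlTsList {
		if startTsList[idx] > ddlTs {
			// Resuming past the recorded ts: whatever was flushed at ddlTs
			// (syncpoint or not) no longer describes the resume point.
			isSyncpointList[idx] = false
		}
		resTs[idx] = max(ddlTs, startTsList[idx]) // built-in max, Go 1.21+
	}
	return resTs, isSyncpointList
}

// Worked example: ddl_ts_v1 remembers ts=100 with is_syncpoint=true.
//   startTs=90  -> resume at 100, flag stays true: CalculateStartSyncPointTs
//                  skips the boundary at 100 instead of re-writing it.
//   startTs=120 -> resume at 120, flag becomes false.
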
23 changes: 16 additions & 7 deletions maintainer/barrier.go
@@ -166,7 +166,7 @@ func (b *Barrier) HandleBootstrapResponse(bootstrapRespMap map[node.ID]*heartbea
 			key := getEventKey(blockState.BlockTs, blockState.IsSyncPoint)
 			event, ok := b.blockedEvents.Get(key)
 			if !ok {
-				event = NewBlockEvent(common.NewChangefeedIDFromPB(resp.ChangefeedID), common.NewDispatcherIDFromPB(span.ID), b.controller, blockState, b.splitTableEnabled, true)
+				event = NewBlockEvent(common.NewChangefeedIDFromPB(resp.ChangefeedID), common.NewDispatcherIDFromPB(span.ID), b.controller, blockState, b.splitTableEnabled)
 				b.blockedEvents.Set(key, event)
 			}
 			switch blockState.Stage {
@@ -212,7 +212,10 @@ func (b *Barrier) HandleBootstrapResponse(bootstrapRespMap map[node.ID]*heartbea
 			replications := b.controller.replicationDB.GetTasksByTableID(tableId)
 			for _, replication := range replications {
 				if replication.GetStatus().CheckpointTs >= barrierEvent.commitTs {
-					barrierEvent.rangeChecker.AddSubRange(replication.Span.TableID, replication.Span.StartKey, replication.Span.EndKey)
+					// one related table has already advanced its checkpointTs past the commitTs, so the block event can be advanced
+					barrierEvent.selected.Store(true)
+					barrierEvent.writerDispatcherAdvanced = true
+					return false
 				}
 			}
 		}
@@ -221,14 +224,20 @@
 		replications := b.controller.replicationDB.GetTasksBySchemaID(schemaID)
 		for _, replication := range replications {
 			if replication.GetStatus().CheckpointTs >= barrierEvent.commitTs {
-				barrierEvent.rangeChecker.AddSubRange(replication.Span.TableID, replication.Span.StartKey, replication.Span.EndKey)
+				// one related table has already advanced its checkpointTs past the commitTs, so the block event can be advanced
+				barrierEvent.selected.Store(true)
+				barrierEvent.writerDispatcherAdvanced = true
+				return false
 			}
 		}
 	case heartbeatpb.InfluenceType_All:
 		replications := b.controller.replicationDB.GetAllTasks()
 		for _, replication := range replications {
 			if replication.GetStatus().CheckpointTs >= barrierEvent.commitTs {
-				barrierEvent.rangeChecker.AddSubRange(replication.Span.TableID, replication.Span.StartKey, replication.Span.EndKey)
+				// one related table has already advanced its checkpointTs past the commitTs, so the block event can be advanced
+				barrierEvent.selected.Store(true)
+				barrierEvent.writerDispatcherAdvanced = true
+				return false
 			}
 		}
 	}
@@ -306,7 +315,7 @@ func (b *Barrier) handleEventDone(changefeedID common.ChangeFeedID, dispatcherID
 	event, ok := b.blockedEvents.Get(key)
 	if !ok {
 		// no block event found
-		be := NewBlockEvent(changefeedID, dispatcherID, b.controller, status.State, b.splitTableEnabled, false)
+		be := NewBlockEvent(changefeedID, dispatcherID, b.controller, status.State, b.splitTableEnabled)
 		// the event is a fake event, the dispatcher will not send the block event
 		be.rangeChecker = range_checker.NewBoolRangeChecker(false)
 		return be
@@ -361,7 +370,7 @@ func (b *Barrier) handleBlockState(changefeedID common.ChangeFeedID,
 		// it's not a blocked event; it must be sent by the table event trigger dispatcher just for scheduling,
 		// and the ddl has already synced to downstream, e.g. create table
 		// if the ack failed, the dispatcher will send a heartbeat again, so we do not need to care about resending the message here
-		event := NewBlockEvent(changefeedID, dispatcherID, b.controller, blockState, b.splitTableEnabled, false)
+		event := NewBlockEvent(changefeedID, dispatcherID, b.controller, blockState, b.splitTableEnabled)
 		event.scheduleBlockEvent()
 		return event, nil
 	}
@@ -372,7 +381,7 @@ func (b *Barrier) getOrInsertNewEvent(changefeedID common.ChangeFeedID, dispatch
 ) *BarrierEvent {
 	event, ok := b.blockedEvents.Get(key)
 	if !ok {
-		event = NewBlockEvent(changefeedID, dispatcherID, b.controller, blockState, b.splitTableEnabled, false)
+		event = NewBlockEvent(changefeedID, dispatcherID, b.controller, blockState, b.splitTableEnabled)
 		b.blockedEvents.Set(key, event)
 	}
 	return event
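
The bootstrap branches above share one behavior change: instead of rebuilding the event's range checker from every recovered span, a single replication whose checkpointTs has already passed the event's commitTs is treated as proof that the writer dispatcher finished the DDL, so the event is marked selected and advanced. A sketch of that decision with simplified, illustrative types:

package sketch

type replication struct{ checkpointTs uint64 }

type barrierEvent struct {
	commitTs                 uint64
	selected                 bool // the production code uses an atomic store
	writerDispatcherAdvanced bool
}

// advanceIfAlreadyWritten mirrors the bootstrap short-circuit: one related
// table past commitTs means the blocked DDL was written before the failover.
func advanceIfAlreadyWritten(event *barrierEvent, replications []replication) bool {
	for _, r := range replications {
		if r.checkpointTs >= event.commitTs {
			event.selected = true
			event.writerDispatcherAdvanced = true
			return true
		}
	}
	return false
}
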
5 changes: 2 additions & 3 deletions maintainer/barrier_event.go
@@ -68,7 +68,6 @@ func NewBlockEvent(cfID common.ChangeFeedID,
 	controller *Controller,
 	status *heartbeatpb.State,
 	dynamicSplitEnabled bool,
-	bootstraped bool,
 ) *BarrierEvent {
 	event := &BarrierEvent{
 		controller: controller,
@@ -98,15 +97,15 @@
 		// TODO: clean code
 		// create range checker if dispatcher is ddl dispatcher
 		// otherwise store dispatcherID in reportedDispatchers, and not create rangeChecker
-		if bootstraped || dispatcherID == controller.ddlDispatcherID {
+		if dispatcherID == controller.ddlDispatcherID {
 			event.createRangeCheckerForTypeDB()
 		} else {
 			event.reportedDispatchers[dispatcherID] = struct{}{}
 		}
 	case heartbeatpb.InfluenceType_All:
 		// create range checker if dispatcher is ddl dispatcher
 		// otherwise store dispatcherID in reportedDispatchers, and not create rangeChecker
-		if bootstraped || dispatcherID == controller.ddlDispatcherID {
+		if dispatcherID == controller.ddlDispatcherID {
 			event.createRangeCheckerForTypeAll()
 		} else {
 			event.reportedDispatchers[dispatcherID] = struct{}{}
 		}
16 changes: 8 additions & 8 deletions maintainer/barrier_event_test.go
@@ -50,7 +50,7 @@ func TestScheduleEvent(t *testing.T) {
 		BlockTs: 10,
 		NeedDroppedTables: &heartbeatpb.InfluencedTables{InfluenceType: heartbeatpb.InfluenceType_All},
 		NeedAddedTables: []*heartbeatpb.Table{{2, 1}, {3, 1}},
-	}, true, false)
+	}, true)
 	event.scheduleBlockEvent()
 	// drop table will be executed first
 	require.Equal(t, 2, controller.replicationDB.GetAbsentSize())
@@ -63,7 +63,7 @@
 			SchemaID: 1,
 		},
 		NeedAddedTables: []*heartbeatpb.Table{{4, 1}},
-	}, false, false)
+	}, false)
 	event.scheduleBlockEvent()
 	// drop table will be executed first, then add the new table
 	require.Equal(t, 1, controller.replicationDB.GetAbsentSize())
@@ -76,7 +76,7 @@
 			TableIDs: []int64{4},
 		},
 		NeedAddedTables: []*heartbeatpb.Table{{5, 1}},
-	}, false, false)
+	}, false)
 	event.scheduleBlockEvent()
 	// drop table will be executed first, then add the new table
 	require.Equal(t, 1, controller.replicationDB.GetAbsentSize())
@@ -111,7 +111,7 @@ func TestResendAction(t *testing.T) {
 		BlockTables: &heartbeatpb.InfluencedTables{
 			InfluenceType: heartbeatpb.InfluenceType_All,
 		},
-	}, false, false)
+	}, false)
 	// time is not reached
 	event.lastResendTime = time.Now()
 	event.selected.Store(true)
@@ -138,7 +138,7 @@
 			InfluenceType: heartbeatpb.InfluenceType_DB,
 			SchemaID: 1,
 		},
-	}, false, false)
+	}, false)
 	event.selected.Store(true)
 	event.writerDispatcherAdvanced = true
 	msgs = event.resend()
@@ -156,7 +156,7 @@
 			InfluenceType: heartbeatpb.InfluenceType_All,
 			SchemaID: 1,
 		},
-	}, false, false)
+	}, false)
 	event.selected.Store(true)
 	event.writerDispatcherAdvanced = true
 	msgs = event.resend()
@@ -175,7 +175,7 @@
 			TableIDs: []int64{1, 2},
 			SchemaID: 1,
 		},
-	}, false, false)
+	}, false)
 	event.selected.Store(true)
 	event.writerDispatcherAdvanced = true
 	msgs = event.resend()
@@ -217,7 +217,7 @@ func TestUpdateSchemaID(t *testing.T) {
 			NewSchemaID: 2,
 		},
 	},
-	}, true, false)
+	}, true)
 	event.scheduleBlockEvent()
 	require.Equal(t, 1, controller.replicationDB.GetAbsentSize())
 	// check the schema id and map is updated