
Commit

update
hongyunyan committed Dec 19, 2024
1 parent 5f40996 commit 7fc7fc7
Showing 4 changed files with 23 additions and 27 deletions.
38 changes: 16 additions & 22 deletions downstreamadapter/dispatchermanager/helper.go
@@ -14,6 +14,7 @@ package dispatchermanager

import (
"sync"
"sync/atomic"
"time"

"github.com/pingcap/log"
@@ -29,30 +30,30 @@ import (

type DispatcherMap struct {
m sync.Map
// mutex is used to protect the seq.
// Here we don't need to make seq changes always atmoic with the m changed.
// Our target is :
// The seq get from ForEach should be smaller than the seq get from Set
// when ForEach is not access the new dispatcher just Set.
// So we add seq after the dispatcher is add in the m for Set, and get the seq before do range for ForRange.
mutex sync.Mutex
// the sequence number increases when a dispatcher is added.
//
// Seq is used to prevent the fallback of the changefeed's checkpointTs.
// When a new dispatcher (table) is being added, the maintainer will block the advance of the changefeed's checkpointTs
// until the maintainer receive the message that the new dispatcher's component status change to working
// until the maintainer receives the message that the new dispatcher's component status has changed to working.
//
// Besides, there is no strict order between the heartbeat messages and the table status messages, which means
// it can happen that when dispatcher A is created, the event dispatcher manager may first send a table status message
// to show that the new dispatcher is working, and then send a heartbeat message with the current watermark,
// which is calculated without the new dispatcher.
// When the checkpoinTs of the watermark is large than the startTs of the new dispatcher,
// When the checkpointTs of the watermark is larger than the startTs of the new dispatcher,
// the watermark of the next heartbeat, which is calculated with the new dispatcher, can be less than the previous watermark.
// This can cause the fallback of the changefeed's checkpointTs.
// To avoid fallback, we add a seq number in each heartbeat message(including table span messages)
// To avoid fallback, we add a seq number to each heartbeat message (both the ones collected from collectComponentStatusWhenChanged and from aggregateDispatcherHeartbeats).
// When a table is added, the seq number is increased,
// and when the maintainer receives an outdated seq, it knows the heartbeat message is outdated and ignores it.
// In this way, even if the above case happens, the changefeed's checkpointTs will not fall back.
seq uint64
//
// Here we don't need to keep seq changes strictly atomic with the changes to m.
// Our target is just:
// the seq obtained in ForEach should be smaller than the seq returned by Set
// when ForEach does not visit the dispatcher that Set just added.
// So in Set we increase seq after the dispatcher is added to m, and in ForEach we read seq before ranging over m.
seq atomic.Uint64
}

func newDispatcherMap() *DispatcherMap {
@@ -80,9 +81,7 @@ func (d *DispatcherMap) Get(id common.DispatcherID) (*dispatcher.Dispatcher, bool) {
}

func (d *DispatcherMap) GetSeq() uint64 {
d.mutex.Lock()
defer d.mutex.Unlock()
return d.seq
return d.seq.Load()
}

func (d *DispatcherMap) Delete(id common.DispatcherID) {
@@ -91,17 +90,12 @@ func (d *DispatcherMap) Delete(id common.DispatcherID) {

func (d *DispatcherMap) Set(id common.DispatcherID, dispatcher *dispatcher.Dispatcher) uint64 {
d.m.Store(id, dispatcher)
d.mutex.Lock()
defer d.mutex.Unlock()
d.seq++
return d.seq
return d.seq.Add(1)
}

func (d *DispatcherMap) ForEach(fn func(id common.DispatcherID, dispatcher *dispatcher.Dispatcher)) uint64 {
var seq uint64
d.mutex.Lock()
seq = d.seq
d.mutex.Unlock()
seq := d.seq.Load()
d.m.Range(func(key, value interface{}) bool {
fn(key.(common.DispatcherID), value.(*dispatcher.Dispatcher))
return true
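A minimal sketch of the seq check described in the DispatcherMap comment above, on the receiving (maintainer) side. The `heartbeat` and `maintainerState` types below are hypothetical, assuming only that each heartbeat carries the seq read from the DispatcherMap when its watermark was calculated; this is not the actual maintainer code.

```go
package main

import "fmt"

// heartbeat is a hypothetical message carrying the seq read from the
// DispatcherMap at the time its watermark (checkpointTs) was calculated.
type heartbeat struct {
	seq          uint64
	checkpointTs uint64
}

// maintainerState is a hypothetical receiver-side state: it remembers the
// largest seq seen so far and ignores heartbeats with a smaller seq, so a
// watermark computed before a new dispatcher was added can never pull the
// changefeed's checkpointTs backwards.
type maintainerState struct {
	lastSeq      uint64
	checkpointTs uint64
}

func (m *maintainerState) onHeartbeat(h heartbeat) {
	if h.seq < m.lastSeq {
		// Outdated: the sender had not yet observed a dispatcher whose
		// table status message already carried a larger seq.
		fmt.Printf("drop outdated heartbeat: seq=%d < lastSeq=%d\n", h.seq, m.lastSeq)
		return
	}
	m.lastSeq = h.seq
	if h.checkpointTs > m.checkpointTs {
		m.checkpointTs = h.checkpointTs
	}
}

func main() {
	m := &maintainerState{}
	m.onHeartbeat(heartbeat{seq: 1, checkpointTs: 100}) // accepted
	m.onHeartbeat(heartbeat{seq: 2, checkpointTs: 105}) // table status after a dispatcher is added
	m.onHeartbeat(heartbeat{seq: 1, checkpointTs: 110}) // computed before the add: dropped
	fmt.Println("checkpointTs:", m.checkpointTs)
}
```

With this check, a heartbeat whose watermark was computed before a newly added dispatcher (and therefore carries a smaller seq) is simply ignored, so the checkpointTs never falls back.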
2 changes: 1 addition & 1 deletion pkg/common/event/ddl_event.go
@@ -224,7 +224,7 @@ func (t DDLEvent) Marshal() ([]byte, error) {
data = append(data, dispatcherIDDataSize...)

if t.TableInfo != nil {
tableInfoData, err := json.Marshal(t.TableInfo)
tableInfoData, err := t.TableInfo.Marshal()
if err != nil {
return nil, err
}
4 changes: 3 additions & 1 deletion pkg/common/event/ddl_event_test.go
@@ -1,6 +1,7 @@
package event

import (
"reflect"
"testing"

"github.com/pingcap/ticdc/pkg/common"
@@ -35,5 +36,6 @@ func TestDDLEvent(t *testing.T) {
err = reverseEvent.Unmarshal(data)
reverseEvent.eventSize = 0
require.Nil(t, err)
require.Equal(t, ddlEvent, reverseEvent)
equal := reflect.DeepEqual(ddlEvent, reverseEvent)
require.True(t, equal)
}
6 changes: 3 additions & 3 deletions pkg/common/table_info.go
@@ -287,8 +287,8 @@ type TableInfo struct {

preSQLs struct {
isInitialized atomic.Bool
m [4]string `json:"-"`
}
m [4]string
} `json:"-"`
}

var count atomic.Int64
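For context on the struct-tag move above: in Go, a `json:"-"` tag on the outer field excludes the whole nested struct from encoding/json output, whereas the old placement on the inner `m` field only applied to that single inner field. A minimal sketch with a hypothetical type (note the real preSQLs field is unexported, which encoding/json skips anyway):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// example is a hypothetical type (not the real TableInfo). The `json:"-"` tag
// on the outer Pre field excludes the whole nested struct from the output;
// placing the tag on an inner field would exclude only that inner field.
type example struct {
	Name string
	Pre  struct {
		Initialized bool
		M           [4]string
	} `json:"-"`
}

func main() {
	e := example{Name: "t1"}
	data, err := json.Marshal(e)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // {"Name":"t1"}
}
```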
@@ -312,7 +312,7 @@ func (ti *TableInfo) InitPrivateFields() {
log.Info("fizz: init private fields", zap.String("tableName", ti.TableName.Table), zap.Int64("count", count.Add(1)))
}

func (ti *TableInfo) MarshalJSON() ([]byte, error) {
func (ti *TableInfo) Marshal() ([]byte, error) {
// otherField | columnSchemaData | columnSchemaDataSize
data, err := json.Marshal(ti)
if err != nil {
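The layout comment in Marshal above (`otherField | columnSchemaData | columnSchemaDataSize`) suggests a size-suffix framing: the column-schema bytes are appended after the other fields, followed by their length, so the buffer can be split again from the end. A hypothetical sketch of that pattern, not the actual TableInfo encoding:

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
)

// encodeWithSizeSuffix appends payload to data and then appends the payload
// length as 8 little-endian bytes, so the two parts can be separated again
// without a separate header. Hypothetical helper, not the real implementation.
func encodeWithSizeSuffix(data, payload []byte) []byte {
	data = append(data, payload...)
	var size [8]byte
	binary.LittleEndian.PutUint64(size[:], uint64(len(payload)))
	return append(data, size[:]...)
}

// decodeSizeSuffix splits the buffer back into the leading part and the payload.
func decodeSizeSuffix(data []byte) (rest, payload []byte) {
	n := int(binary.LittleEndian.Uint64(data[len(data)-8:]))
	body := data[:len(data)-8]
	return body[:len(body)-n], body[len(body)-n:]
}

func main() {
	other, _ := json.Marshal(map[string]string{"name": "t1"})
	schema := []byte(`["col-a","col-b"]`)
	buf := encodeWithSizeSuffix(other, schema)

	rest, payload := decodeSizeSuffix(buf)
	fmt.Println(string(rest))    // {"name":"t1"}
	fmt.Println(string(payload)) // ["col-a","col-b"]
}
```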
