Skip to content

Commit

Permalink
Add blackhole sink (#699)
Browse files Browse the repository at this point in the history
  • Loading branch information
wk989898 authored Dec 20, 2024
1 parent 97b0cbb commit 65f34ab
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 1 deletion.
84 changes: 84 additions & 0 deletions downstreamadapter/sink/blackhole.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/sink/util"
"go.uber.org/zap"
)

// BlackHoleSink is responsible for writing data to blackhole.
// Including DDL and DML.
type BlackHoleSink struct {
}

func NewBlackHoleSink() (*BlackHoleSink, error) {
blackholeSink := BlackHoleSink{}
return &blackholeSink, nil
}

func (s *BlackHoleSink) IsNormal() bool {
return true
}

func (s *BlackHoleSink) SinkType() common.SinkType {
return common.BlackHoleSinkType
}

func (s *BlackHoleSink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) {
}

func (s *BlackHoleSink) AddDMLEvent(event *commonEvent.DMLEvent) {
log.Debug("BlackHoleSink: DML Event", zap.Any("dml", event))
for _, callback := range event.PostTxnFlushed {
callback()
}
}

func (s *BlackHoleSink) PassBlockEvent(event commonEvent.BlockEvent) {
event.PostFlush()
}

func (s *BlackHoleSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
switch event.GetType() {
case commonEvent.TypeDDLEvent:
e := event.(*commonEvent.DDLEvent)
for _, callback := range e.PostTxnFlushed {
callback()
}
case commonEvent.TypeSyncPointEvent:
e := event.(*commonEvent.SyncPointEvent)
for _, callback := range e.PostTxnFlushed {
callback()
}
default:
log.Error("unknown event type",
zap.Any("event", event))
}
return nil
}

func (s *BlackHoleSink) AddCheckpointTs(ts uint64) {
}

func (s *BlackHoleSink) GetStartTsList(tableIds []int64, startTsList []int64) ([]int64, error) {
return []int64{}, nil
}

func (s *BlackHoleSink) Close(removeChangefeed bool) error {
return nil
}
84 changes: 84 additions & 0 deletions downstreamadapter/sink/blackhole_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
"testing"
"time"

commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/stretchr/testify/require"
)

// Test callback and tableProgress works as expected after AddDMLEvent
func TestBlacHoleSinkBasicFunctionality(t *testing.T) {
sink, err := NewBlackHoleSink()
require.NoError(t, err)

count := 0

helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")
createTableSQL := "create table t (id int primary key, name varchar(32));"
job := helper.DDL2Job(createTableSQL)
require.NotNil(t, job)

ddlEvent := &commonEvent.DDLEvent{
Query: job.Query,
SchemaName: job.SchemaName,
TableName: job.TableName,
FinishedTs: 1,
BlockedTables: &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{0},
},
NeedAddedTables: []commonEvent.Table{{TableID: 1, SchemaID: 1}},
PostTxnFlushed: []func(){
func() { count++ },
},
}

ddlEvent2 := &commonEvent.DDLEvent{
Query: job.Query,
SchemaName: job.SchemaName,
TableName: job.TableName,
FinishedTs: 4,
BlockedTables: &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{0},
},
NeedAddedTables: []commonEvent.Table{{TableID: 1, SchemaID: 1}},
PostTxnFlushed: []func(){
func() { count++ },
},
}

dmlEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test')", "insert into t values (2, 'test2');")
dmlEvent.PostTxnFlushed = []func(){
func() { count++ },
}
dmlEvent.CommitTs = 2

err = sink.WriteBlockEvent(ddlEvent)
require.NoError(t, err)

sink.AddDMLEvent(dmlEvent)
time.Sleep(1 * time.Second)

sink.PassBlockEvent(ddlEvent2)

require.Equal(t, count, 3)
}
4 changes: 3 additions & 1 deletion downstreamadapter/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func NewSink(ctx context.Context, config *config.ChangefeedConfig, changefeedID
return NewMysqlSink(ctx, changefeedID, 16, config, sinkURI, errCh)
case sink.KafkaScheme, sink.KafkaSSLScheme:
return NewKafkaSink(ctx, changefeedID, sinkURI, config.SinkConfig, errCh)
case sink.BlackHoleScheme:
return NewBlackHoleSink()
}
return nil, nil
return nil, cerror.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI)
}
1 change: 1 addition & 0 deletions pkg/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,5 @@ type SinkType int
const (
MysqlSinkType SinkType = iota
KafkaSinkType
BlackHoleSinkType
)

0 comments on commit 65f34ab

Please sign in to comment.