Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split-table: Support split-table in mysql sink and ddl test for split table #968

Merged
merged 10 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -509,13 +509,18 @@ jobs:
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=overwrite_resume_with_syncpoint

- name: Test ddl_for_split_tables
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=ddl_for_split_tables

- name: Test multi_source
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=multi_source

# The 20th case in this group
# The 21th case in this group
- name: Test multi_tables_ddl
if: ${{ success() }}
run: |
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/worker/kafka_ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (w *KafkaDDLWorker) WriteBlockEvent(ctx context.Context, event *event.DDLEv
return errors.Trace(err)
}
}
log.Info("kafka ddl worker send block event", zap.Any("event", event))
// after flush all the ddl event, we call the callback function.
event.PostFlush()
return nil
Expand Down
4 changes: 0 additions & 4 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,6 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin
return err
}
}
// TODO: Remove the hack once span replication is compatible with all sinks.
if !isSinkCompatibleWithSpanReplication(sinkURI) {
c.Scheduler.EnableTableAcrossNodes = false
}

if c.Integrity != nil {
switch strings.ToLower(sinkURI.Scheme) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[scheduler]
enable-table-across-nodes = true
region-threshold = 2
28 changes: 28 additions & 0 deletions tests/integration_tests/ddl_for_split_tables/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# diff Configuration.
check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/ddl_for_split_tables/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["split_region.*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
19 changes: 19 additions & 0 deletions tests/integration_tests/ddl_for_split_tables/data/ddls.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use split_region;

alter table test1 add column c1 int;

alter table test2 drop column val;

alter table test1 add index idx_test (id);

alter table test1 modify column c1 bigint;

alter table test1 drop index idx_test;

truncate table test2;

rename table test1 to test3;

drop table test3;

recover table test3;
23 changes: 23 additions & 0 deletions tests/integration_tests/ddl_for_split_tables/data/dmls.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use split_region;

insert into test3(id) values (11110),(11111),(12111),(12221),(14111),(14221),(16111),(16221),(18111),(18221);
insert into test3(id) values (21110),(21111),(22111),(22221),(24111),(24221),(26111),(26221),(28111),(28221);
insert into test3(id) values (31110),(31111),(32111),(32221),(34111),(34221),(36111),(36221),(38111),(38221);
insert into test3(id) values (41110),(41111),(42111),(42221),(44111),(44221),(46111),(46221),(48111),(48221);
insert into test3(id) values (51110),(51111),(52111),(52221),(54111),(54221),(56111),(56221),(58111),(58221);
insert into test3(id) values (61110),(61111),(62111),(62221),(64111),(64221),(66111),(66221),(68111),(68221);
insert into test3(id) values (71110),(71111),(72111),(72221),(74111),(74221),(76111),(76221),(78111),(78221);
insert into test3(id) values (81110),(81111),(82111),(82221),(84111),(84221),(86111),(86221),(88111),(88221);
insert into test3(id) values (91110),(91111),(92111),(92221),(94111),(94221),(96111),(96221),(98111),(98221);
insert into test3(id) values (111122),(111101),(102111),(102221),(104111),(104221),(106111),(106221),(108111),(108221);

insert into test2(id) values (11110),(11111),(12111),(12221),(14111),(14221),(16111),(16221),(18111),(18221);
insert into test2(id) values (21110),(21111),(22111),(22221),(24111),(24221),(26111),(26221),(28111),(28221);
insert into test2(id) values (31110),(31111),(32111),(32221),(34111),(34221),(36111),(36221),(38111),(38221);
insert into test2(id) values (41110),(41111),(42111),(42221),(44111),(44221),(46111),(46221),(48111),(48221);
insert into test2(id) values (51110),(51111),(52111),(52221),(54111),(54221),(56111),(56221),(58111),(58221);
insert into test2(id) values (61110),(61111),(62111),(62221),(64111),(64221),(66111),(66221),(68111),(68221);
insert into test2(id) values (71110),(71111),(72111),(72221),(74111),(74221),(76111),(76221),(78111),(78221);
insert into test2(id) values (81110),(81111),(82111),(82221),(84111),(84221),(86111),(86221),(88111),(88221);
insert into test2(id) values (91110),(91111),(92111),(92221),(94111),(94221),(96111),(96221),(98111),(98221);
insert into test2(id) values (111122),(111101),(102111),(102221),(104111),(104221),(106111),(106221),(108111),(108221);
36 changes: 36 additions & 0 deletions tests/integration_tests/ddl_for_split_tables/data/pre.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
drop database if exists `split_region`;
create database `split_region`;
use `split_region`;
create table test1 (id int primary key, val int);
create table test2 (id int primary key, val int);
insert into test1(id) values (16),(32),(64),(128),(256),(512),(1024),(2048),(4096),(8192),(16384);
insert into test1(id) values (-2147483648),(-1),(0),(2147483647);

split table split_region.test1 between (1) and (100000) regions 50;
split table split_region.test2 between (1) and (100000) regions 50;

insert into test1(id) values (-100),(-99),(-98),(-97),(-96),(-95),(-94),(-93),(-92),(-91);
insert into test1(id) values (1),(2),(2000),(2001),(4000),(4001),(6000),(6001),(8000),(8001);
insert into test1(id) values (10000),(10001),(12000),(12001),(14000),(14001),(16000),(16001),(18000),(18001);
insert into test1(id) values (20000),(20001),(22000),(22001),(24000),(24001),(26000),(26001),(28000),(28001);
insert into test1(id) values (30000),(30001),(32000),(32001),(34000),(34001),(36000),(36001),(38000),(38001);
insert into test1(id) values (40000),(40001),(42000),(42001),(44000),(44001),(46000),(46001),(48000),(48001);
insert into test1(id) values (50000),(50001),(52000),(52001),(54000),(54001),(56000),(56001),(58000),(58001);
insert into test1(id) values (60000),(60001),(62000),(62001),(64000),(64001),(66000),(66001),(68000),(68001);
insert into test1(id) values (70000),(70001),(72000),(72001),(74000),(74001),(76000),(76001),(78000),(78001);
insert into test1(id) values (80000),(80001),(82000),(82001),(84000),(84001),(86000),(86001),(88000),(88001);
insert into test1(id) values (90000),(90001),(92000),(92001),(94000),(94001),(96000),(96001),(98000),(98001);
insert into test1(id) values (100000),(100001),(102000),(102001),(104000),(104001),(106000),(106001),(108000),(108001);

insert into test2(id) values (-100),(-99),(-98),(-97),(-96),(-95),(-94),(-93),(-92),(-91);
insert into test2(id) values (1),(2),(2000),(2001),(4000),(4001),(6000),(6001),(8000),(8001);
insert into test2(id) values (10000),(10001),(12000),(12001),(14000),(14001),(16000),(16001),(18000),(18001);
insert into test2(id) values (20000),(20001),(22000),(22001),(24000),(24001),(26000),(26001),(28000),(28001);
insert into test2(id) values (30000),(30001),(32000),(32001),(34000),(34001),(36000),(36001),(38000),(38001);
insert into test2(id) values (40000),(40001),(42000),(42001),(44000),(44001),(46000),(46001),(48000),(48001);
insert into test2(id) values (50000),(50001),(52000),(52001),(54000),(54001),(56000),(56001),(58000),(58001);
insert into test2(id) values (60000),(60001),(62000),(62001),(64000),(64001),(66000),(66001),(68000),(68001);
insert into test2(id) values (70000),(70001),(72000),(72001),(74000),(74001),(76000),(76001),(78000),(78001);
insert into test2(id) values (80000),(80001),(82000),(82001),(84000),(84001),(86000),(86001),(88000),(88001);
insert into test2(id) values (90000),(90001),(92000),(92001),(94000),(94001),(96000),(96001),(98000),(98001);
insert into test2(id) values (100000),(100001),(102000),(102001),(104000),(104001),(106000),(106001),(108000),(108001);
69 changes: 69 additions & 0 deletions tests/integration_tests/ddl_for_split_tables/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# this script is test all the basic ddls when the table is split into
# multiple dispatchers in multi cdc server
# TODO:This script need to add kafka-class sink

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1
check_time=60

function prepare() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300"
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301"

TOPIC_NAME="ticdc-ddl_split_table-$RANDOM"

# to make the table multi regions, to help create multiple dispatchers for the table
run_sql_file $CUR/data/pre.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/pre.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 500_

SINK_URI="mysql://root:@127.0.0.1:3306/"
#SINK_URI="kafka://127.0.0.1:9094/$TOPIC_NAME?protocol=open-protocol&partition-num=1&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760"
# case $SINK_TYPE in
# kafka) SINK_URI="kafka://127.0.0.1:9094/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
# storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
# pulsar)
# run_pulsar_cluster $WORK_DIR normal
# SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
# ;;
# *) SINK_URI="mysql://normal:[email protected]:3306/" ;;
# esac

sleep 10

run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c "test" --config="$CUR/conf/changefeed.toml"

#run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9094/$TOPIC_NAME?protocol=open-protocol&partition-num=1&version=${KAFKA_VERSION}&max-message-bytes=10485760"

# case $SINK_TYPE in
# kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
# storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
# pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
# esac
}

trap stop_tidb_cluster EXIT

prepare $*
## execute ddls
run_sql_file $CUR/data/ddls.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# ## insert some datas
run_sql_file $CUR/data/dmls.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 30
cleanup_process $CDC_BINARY

check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
Loading