From 7f84391aeef0163dd4663beb1a96e508b1f927cc Mon Sep 17 00:00:00 2001
From: hongyunyan <649330952@qq.com>
Date: Fri, 7 Feb 2025 14:43:49 +0800
Subject: [PATCH] update

---
 downstreamadapter/worker/kafka_ddl_worker.go |  1 +
 .../ddl_for_split_tables/run.sh              | 32 +++++++++++--------
 2 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/downstreamadapter/worker/kafka_ddl_worker.go b/downstreamadapter/worker/kafka_ddl_worker.go
index 7a74b0c1..3c392315 100644
--- a/downstreamadapter/worker/kafka_ddl_worker.go
+++ b/downstreamadapter/worker/kafka_ddl_worker.go
@@ -132,6 +132,7 @@ func (w *KafkaDDLWorker) WriteBlockEvent(ctx context.Context, event *event.DDLEv
 			return errors.Trace(err)
 		}
 	}
+	log.Info("kafka ddl worker send block event", zap.Any("event", event))
 	// after flush all the ddl event, we call the callback function.
 	event.PostFlush()
 	return nil
diff --git a/tests/integration_tests/ddl_for_split_tables/run.sh b/tests/integration_tests/ddl_for_split_tables/run.sh
index 684c4146..ef3d707e 100644
--- a/tests/integration_tests/ddl_for_split_tables/run.sh
+++ b/tests/integration_tests/ddl_for_split_tables/run.sh
@@ -1,5 +1,6 @@
 # this script is test all the basic ddls when the table is split into
 # multiple dispatchers in multi cdc server
+# TODO: this script must run with a kafka-class sink; it is not run in CI yet
 
 set -eu
 
@@ -26,25 +27,28 @@ function prepare() {
 	run_sql_file $CUR/data/pre.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	run_sql_file $CUR/data/pre.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
 
-	case $SINK_TYPE in
-	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
-	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
-	pulsar)
-		run_pulsar_cluster $WORK_DIR normal
-		SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
-		;;
-	*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
-	esac
+	SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760"
+	# case $SINK_TYPE in
+	# kafka) SINK_URI="kafka://127.0.0.1:9094/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	# storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
+	# pulsar)
+	# 	run_pulsar_cluster $WORK_DIR normal
+	# 	SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
+	# 	;;
+	# *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
+	# esac
 
 	sleep 10
 
 	run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c "test" --config="$CUR/conf/changefeed.toml"
 
-	case $SINK_TYPE in
-	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
-	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
-	pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
-	esac
+	run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+
+	# case $SINK_TYPE in
+	# kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	# storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
+	# pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
+	# esac
 }
 
 trap stop_tidb_cluster EXIT
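
Note: since run.sh now hard-codes the Kafka sink instead of branching on $SINK_TYPE, the restriction in the new TODO could be made explicit with an early-exit guard near the top of the script. A minimal sketch, assuming the test harness still exports SINK_TYPE as the commented-out case blocks suggest; the guard itself is illustrative and not part of this patch:

	# skip this test for non-kafka sinks until the other sink
	# types in the commented-out case statements are re-enabled
	if [ "$SINK_TYPE" != "kafka" ]; then
		echo "skip ddl_for_split_tables: only the kafka sink is supported for now"
		exit 0
	fi

Exiting with status 0 lets the harness treat the skipped sink types as a pass rather than a failure, which matches how the script behaves today when it is simply not scheduled in CI.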