Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan committed Feb 7, 2025
1 parent 8122c0f commit 7f84391
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
1 change: 1 addition & 0 deletions downstreamadapter/worker/kafka_ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (w *KafkaDDLWorker) WriteBlockEvent(ctx context.Context, event *event.DDLEv
return errors.Trace(err)
}
}
log.Info("kafka ddl worker send block event", zap.Any("event", event))
// after flush all the ddl event, we call the callback function.
event.PostFlush()
return nil
Expand Down
32 changes: 18 additions & 14 deletions tests/integration_tests/ddl_for_split_tables/run.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# this script is test all the basic ddls when the table is split into
# multiple dispatchers in multi cdc server
# TODO:This script must need to run in kafka-class sink, not in the ci now

set -eu

Expand All @@ -26,25 +27,28 @@ function prepare() {
run_sql_file $CUR/data/pre.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/pre.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:[email protected]:3306/" ;;
esac
SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760"
# case $SINK_TYPE in
# kafka) SINK_URI="kafka://127.0.0.1:9094/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
# storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
# pulsar)
# run_pulsar_cluster $WORK_DIR normal
# SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
# ;;
# *) SINK_URI="mysql://normal:[email protected]:3306/" ;;
# esac

sleep 10

run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c "test" --config="$CUR/conf/changefeed.toml"

case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"

# case $SINK_TYPE in
# kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
# storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
# pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
# esac
}

trap stop_tidb_cluster EXIT
Expand Down

0 comments on commit 7f84391

Please sign in to comment.