From 70ee61ecac166c56776644a311048b562125553b Mon Sep 17 00:00:00 2001 From: jacky Date: Sun, 1 Dec 2024 23:49:26 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20Kafka=20=E7=AC=94=E8=AE=B0?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .obsidian/workspace.json | 45 ++++++------ ...a \347\232\204\346\225\260\346\215\256.md" | 41 ++++++++--- docs/Kafka/Kafka Tips.md | 3 + ...70\347\224\250\345\221\275\344\273\244.md" | 9 ++- ...53\351\200\237\345\274\200\345\247\213.md" | 69 +++++++++++++++++++ 5 files changed, 135 insertions(+), 32 deletions(-) create mode 100644 docs/Kafka/Kafka Tips.md create mode 100644 "docs/Kafka/Kafka \345\277\253\351\200\237\345\274\200\345\247\213.md" diff --git a/.obsidian/workspace.json b/.obsidian/workspace.json index 4a636ce7..899f3d62 100644 --- a/.obsidian/workspace.json +++ b/.obsidian/workspace.json @@ -13,12 +13,12 @@ "state": { "type": "markdown", "state": { - "file": "docs/MySQL/MySQL 配置主从.md", + "file": "docs/Kafka/Kafka 快速开始.md", "mode": "source", "source": false }, "icon": "lucide-file", - "title": "MySQL 配置主从" + "title": "Kafka 快速开始" } } ] @@ -52,7 +52,7 @@ "state": { "type": "search", "state": { - "query": "atmos", + "query": "VIM", "matchingCase": false, "explainSearch": false, "collapseAll": false, @@ -139,10 +139,10 @@ "state": { "type": "outline", "state": { - "file": "docs/MySQL/MySQL 配置主从.md" + "file": "docs/Kafka/Kafka Tips.md" }, "icon": "lucide-list", - "title": "MySQL 配置主从 的大纲" + "title": "Kafka Tips 的大纲" } } ], @@ -150,50 +150,51 @@ } ], "direction": "horizontal", - "width": 300 + "width": 300, + "collapsed": true }, "left-ribbon": { "hiddenItems": { - "vscode-editor:新建代码文件": false, "switcher:打开快速切换": false, "graph:查看关系图谱": false, "canvas:新建白板": false, "daily-notes:打开/创建今天的日记": false, "templates:插入模板": false, - "command-palette:打开命令面板": false + "command-palette:打开命令面板": false, + "vscode-editor:新建代码文件": false } }, "active": 
"0743fed77e619d96", "lastOpenFiles": [ + "docs/Kafka/Kafka Tips.md", + "docs/Kafka/Kafka 快速开始.md", + "docs/Kafka/Kafka 集群搭建.md", + "docs/Kafka/Kafka 消费者.md", + "docs/Kafka/Kafka 常用命令.md", + "docs/Linux/Nginx 使用自签名证书.md", + "docs/Linux/Linux Tips.md", + "docs/Linux/Debian 软件安装.md", + "docs/Linux/VIM.md", + "docs/Server/本地服务器192.168.31.11.md", + "docs/Kafka/保证消息不丢失.md", + "docs/Doris/Doris 接入 Kafka 的数据.md", + "mkdocs.yml", + "docs/MySQL/MySQL 配置主从.md", "docs/MySQL/MySQL 备份.md", "docs/MySQL/MySQL Tips.md", "docs/Doris/Doris 接入 MySQL 的数据.md", - "docs/Doris/Doris 接入 Kafka 的数据.md", "docs/Doris/Doris 分区表.md", "docs/Doris/Doris 日期时间类型.md", "docs/Doris/Doris 用户角色权限.md", "docs/DM/达梦安装.md", "docs/DM/达梦备份.md", "docs/DM/达梦 Tips.md", - "docs/Kafka/Kafka 集群搭建.md", - "docs/Kafka/Kafka 常用命令.md", - "docs/Kafka/Kafka 消费者.md", "docs/Java/Tips.md", "docs/Java/重复提交.md", "未命名.md", "docs/Docker/安装 Docker.md", - "docs/Docker/镜像操作.md", - "docs/Arts/满江红-岳飞.md", - "docs/Arts/大观楼长联-孙髯.md", - "docs/Arts/侠客行-李白.md", - "docs/Algorithms/求解算数表达式的值.md", - "docs/Algorithms/字符串匹配算法.md", - "docs/Algorithms/Base64 算法.md", "overrides/partials/comments.html", "overrides/partials/footer.html", - "template/Doris.md", - "docs/OS/线程.md", - "mkdocs.yml", "docs/LocalFile/Picture/ReentrantReadWriteLock类图.svg", "docs/LocalFile/Picture/ReentrantLock类图.svg", "docs/LocalFile/Picture/ReentrantLock类图.png", diff --git "a/docs/Doris/Doris \346\216\245\345\205\245 Kafka \347\232\204\346\225\260\346\215\256.md" "b/docs/Doris/Doris \346\216\245\345\205\245 Kafka \347\232\204\346\225\260\346\215\256.md" index 405b0227..802a7125 100644 --- "a/docs/Doris/Doris \346\216\245\345\205\245 Kafka \347\232\204\346\225\260\346\215\256.md" +++ "b/docs/Doris/Doris \346\216\245\345\205\245 Kafka \347\232\204\346\225\260\346\215\256.md" @@ -5,7 +5,7 @@ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User ``` ### 创建唯一模型 -``` +```sql CREATE TABLE test_json_load( user_id BIGINT NOT NULL COMMENT "用户 ID", name 
VARCHAR(20) COMMENT "用户姓名", @@ -15,12 +15,10 @@ DISTRIBUTED BY HASH(user_id) BUCKETS 1; ``` ### 创建导入作业 -``` -CREATE ROUTINE LOAD `yunlu_iot_data`.`kafka-quickstart_events` ON -`quickstart_events` +```sql +CREATE ROUTINE LOAD `yunlu_iot_data`.`kafka-quickstart_events` ON `quickstart_events` PROPERTIES ( - "desired_concurrent_number" = "3", "format" = "json", "strict_mode" = "false" ) @@ -40,7 +38,7 @@ KAFKA ### 往 Kafka 中写入数据测试 值为 null 或者日期格式都兼容,unique key 模式也能唯一去重保留新的数据。 -``` +```json { "user_id" : 1, "name" : "Benjamin", "born_time": "2023-01-01 00:00:00" } { "user_id" : 2, "name" : "Emily", "born_time": "20240101122323" } { "user_id" : 3, "name" : "Emily", "born_time": "20240101122323" } @@ -57,5 +55,32 @@ show routine load 能够查询导入作业的状态。 测试完毕之后可以通过 pause 暂停导入任务:`pause routine load for task_name;`。 -## 调优 -Kafka 多 \ No newline at end of file + +## 案例调优 + +前端上传的数据日期格式是对的,但是值不对,比如 0024-01-01 00:00:00,这种数据由于没有对应的分区,导致 Routine Load 导入任务暂停。 + +可以通过修改 Routine Load 导入任务,从指定的 Kafka Topic 分区和偏移开始消费数据,但是这样定位脏数据的位置比较麻烦。 + +或者,可以通过修改 Routine Load 导入任务 load_properties 的 WHERE 子句,来过滤掉不符合条件的数据。 +```sql +CREATE ROUTINE LOAD `yunlu_iot_data`.`kafka-quickstart_events` ON `quickstart_events` +WHERE born_time > '1900-01-01 00:00:00' + PROPERTIES + ( + "format" = "json", + "strict_mode" = "false" + ) +FROM +KAFKA + ( + "kafka_broker_list" = "172.31.8.164:9092,172.31.8.165:9092,172.31.8.166:9092", + "kafka_topic" = "quickstart-events", + "property.group.id" = "xxxxx", + "property.kafka_default_offsets" = "OFFSET_BEGINNING", + "property.security.protocol" = "SASL_PLAINTEXT", + "property.sasl.mechanism" = "PLAIN", + "property.sasl.username" = "xxxx", + "property.sasl.password" = "xxxx" + ); +``` \ No newline at end of file diff --git a/docs/Kafka/Kafka Tips.md b/docs/Kafka/Kafka Tips.md new file mode 100644 index 00000000..df8f08b8 --- /dev/null +++ b/docs/Kafka/Kafka Tips.md @@ -0,0 +1,3 @@ + +为什么要使用 Kafka? +1. 
解耦,将进程之间的直接交互调整为统一面向 Kafka Topic 的标准对接。 \ No newline at end of file diff --git "a/docs/Kafka/Kafka \345\270\270\347\224\250\345\221\275\344\273\244.md" "b/docs/Kafka/Kafka \345\270\270\347\224\250\345\221\275\344\273\244.md" index 56b6a4d4..85aa89c4 100644 --- "a/docs/Kafka/Kafka \345\270\270\347\224\250\345\221\275\344\273\244.md" +++ "b/docs/Kafka/Kafka \345\270\270\347\224\250\345\221\275\344\273\244.md" @@ -7,7 +7,7 @@ tags: 创建 topic ``` -bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic_name --command-config admin-jass +bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic topic_name --create ``` 查看所有的 topic @@ -17,13 +17,18 @@ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list 查看 topic 的信息 ``` -bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic +bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic topic_name --describe ``` 查看 topic 每个分区的 offset ``` bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic ``` + +修改 topic 的分区数量 +``` +bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic topic_name --alter --partitions 2 +``` ## Consumer 控制台消费者 diff --git "a/docs/Kafka/Kafka \345\277\253\351\200\237\345\274\200\345\247\213.md" "b/docs/Kafka/Kafka \345\277\253\351\200\237\345\274\200\345\247\213.md" new file mode 100644 index 00000000..233cd04b --- /dev/null +++ "b/docs/Kafka/Kafka \345\277\253\351\200\237\345\274\200\345\247\213.md" @@ -0,0 +1,69 @@ +### Kafka 安装 +准备两个文件 +kafka_2.13-3.9.0.tgz +OpenJDK21U-jdk_aarch64_linux_hotspot_21.0.5_11.tar.gz + +解压缩 + tar -zxvf OpenJDK21U-jdk_aarch64_linux_hotspot_21.0.5_11.tar.gz -C /usr/local/ + tar -zxvf kafka_2.13-3.9.0.tgz -C /usr/local/ + +配置 Java 环境 /etc/profile.d/java.sh, 执行 source /etc/profile,java -version 验证配置。 +``` +export JAVA_HOME=/usr/local/jdk-21.0.5+11 +export PATH=$PATH:$JAVA_HOME/bin +``` + +进入到 Kafka 目录下,新建目录 kafka-files,编辑配置文件 config/kraft/reconfig-server.properties,修改 log.dirs=/usr/local/kafka_2.13-3.9.0/kafka-files。 + +初始化启动 Kafka +``` +KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" 
+bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties +bin/kafka-server-start.sh -daemon config/kraft/reconfig-server.properties +``` + + +创建 Topic +``` +# bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic quickstart-events --create +Created topic quickstart-events. +``` + +查看 Topic 列表 +``` +# bin/kafka-topics.sh --bootstrap-server localhost:9092 --list +quickstart-events +``` + +查看 Topic 详情 +``` +# bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic quickstart-events --describe +Topic: quickstart-events TopicId: l9MVAfeWQ1am7pCdFHbdiA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 +Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr: +``` + +修改 Topic 的分区 +``` +# bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic quickstart-events --alter --partitions 2 +# bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic quickstart-events --describe +Topic: quickstart-events TopicId: l9MVAfeWQ1am7pCdFHbdiA PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824 +Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr: +Topic: quickstart-events Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr: +``` + +命令行消费消息 +``` +# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events +``` + +命令行生产消息 +``` +# bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic quickstart-events +``` + +删除 Topic +``` +# bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic quickstart-events --delete +``` + +### 生产消费代码示例