Skip to content

Commit

Permalink
更新 Kafka 笔记。
Browse files Browse the repository at this point in the history
  • Loading branch information
jacky committed Dec 1, 2024
1 parent 2abcc72 commit 70ee61e
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 32 deletions.
45 changes: 23 additions & 22 deletions .obsidian/workspace.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
"state": {
"type": "markdown",
"state": {
"file": "docs/MySQL/MySQL 配置主从.md",
"file": "docs/Kafka/Kafka 快速开始.md",
"mode": "source",
"source": false
},
"icon": "lucide-file",
"title": "MySQL 配置主从"
"title": "Kafka 快速开始"
}
}
]
Expand Down Expand Up @@ -52,7 +52,7 @@
"state": {
"type": "search",
"state": {
"query": "atmos",
"query": "VIM",
"matchingCase": false,
"explainSearch": false,
"collapseAll": false,
Expand Down Expand Up @@ -139,61 +139,62 @@
"state": {
"type": "outline",
"state": {
"file": "docs/MySQL/MySQL 配置主从.md"
"file": "docs/Kafka/Kafka Tips.md"
},
"icon": "lucide-list",
"title": "MySQL 配置主从 的大纲"
"title": "Kafka Tips 的大纲"
}
}
],
"currentTab": 3
}
],
"direction": "horizontal",
"width": 300
"width": 300,
"collapsed": true
},
"left-ribbon": {
"hiddenItems": {
"vscode-editor:新建代码文件": false,
"switcher:打开快速切换": false,
"graph:查看关系图谱": false,
"canvas:新建白板": false,
"daily-notes:打开/创建今天的日记": false,
"templates:插入模板": false,
"command-palette:打开命令面板": false
"command-palette:打开命令面板": false,
"vscode-editor:新建代码文件": false
}
},
"active": "0743fed77e619d96",
"lastOpenFiles": [
"docs/Kafka/Kafka Tips.md",
"docs/Kafka/Kafka 快速开始.md",
"docs/Kafka/Kafka 集群搭建.md",
"docs/Kafka/Kafka 消费者.md",
"docs/Kafka/Kafka 常用命令.md",
"docs/Linux/Nginx 使用自签名证书.md",
"docs/Linux/Linux Tips.md",
"docs/Linux/Debian 软件安装.md",
"docs/Linux/VIM.md",
"docs/Server/本地服务器192.168.31.11.md",
"docs/Kafka/保证消息不丢失.md",
"docs/Doris/Doris 接入 Kafka 的数据.md",
"mkdocs.yml",
"docs/MySQL/MySQL 配置主从.md",
"docs/MySQL/MySQL 备份.md",
"docs/MySQL/MySQL Tips.md",
"docs/Doris/Doris 接入 MySQL 的数据.md",
"docs/Doris/Doris 接入 Kafka 的数据.md",
"docs/Doris/Doris 分区表.md",
"docs/Doris/Doris 日期时间类型.md",
"docs/Doris/Doris 用户角色权限.md",
"docs/DM/达梦安装.md",
"docs/DM/达梦备份.md",
"docs/DM/达梦 Tips.md",
"docs/Kafka/Kafka 集群搭建.md",
"docs/Kafka/Kafka 常用命令.md",
"docs/Kafka/Kafka 消费者.md",
"docs/Java/Tips.md",
"docs/Java/重复提交.md",
"未命名.md",
"docs/Docker/安装 Docker.md",
"docs/Docker/镜像操作.md",
"docs/Arts/满江红-岳飞.md",
"docs/Arts/大观楼长联-孙髯.md",
"docs/Arts/侠客行-李白.md",
"docs/Algorithms/求解算数表达式的值.md",
"docs/Algorithms/字符串匹配算法.md",
"docs/Algorithms/Base64 算法.md",
"overrides/partials/comments.html",
"overrides/partials/footer.html",
"template/Doris.md",
"docs/OS/线程.md",
"mkdocs.yml",
"docs/LocalFile/Picture/ReentrantReadWriteLock类图.svg",
"docs/LocalFile/Picture/ReentrantLock类图.svg",
"docs/LocalFile/Picture/ReentrantLock类图.png",
Expand Down
41 changes: 33 additions & 8 deletions docs/Doris/Doris 接入 Kafka 的数据.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User
```

### 创建唯一模型
```
```sql
CREATE TABLE test_json_load(
user_id BIGINT NOT NULL COMMENT "用户 ID",
name VARCHAR(20) COMMENT "用户姓名",
Expand All @@ -15,12 +15,10 @@ DISTRIBUTED BY HASH(user_id) BUCKETS 1;
```

### 创建导入作业
```
CREATE ROUTINE LOAD `yunlu_iot_data`.`kafka-quickstart_events` ON
`quickstart_events`
```sql
CREATE ROUTINE LOAD `yunlu_iot_data`.`kafka-quickstart_events` ON `quickstart_events`
PROPERTIES
(
"desired_concurrent_number" = "3",
"format" = "json",
"strict_mode" = "false"
)
Expand All @@ -40,7 +38,7 @@ KAFKA

### 往 Kafka 中写入数据测试
值为 null 或者日期格式都兼容,unique key 模式也能唯一去重保留新的数据。
```
```json
{ "user_id" : 1, "name" : "Benjamin", "born_time": "2023-01-01 00:00:00" }
{ "user_id" : 2, "name" : "Emily", "born_time": "20240101122323" }
{ "user_id" : 3, "name" : "Emily", "born_time": "20240101122323" }
Expand All @@ -57,5 +55,32 @@ show routine load 能够查询导入作业的状态。

测试完毕之后可以通过 pause 暂停导入任务:`pause routine load for task_name;`

## 调优
Kafka 多

## 案例调优

前端上传的数据日期格式是对的,但是值不对,比如 0024-01-01 00:00:00,这种数据由于没有对应的分区表,导致 Routine Load 导入任务暂停。

可以通过修改 Routine Load 导入任务,从指定的 Kafka Topic 分区和偏移开始消费数据,但是这样定位脏数据的位置比较麻烦。

或者,可以通过修改 Routine Load 导入任务 load_properties 的 WHERE 子模块,来过滤掉不符合条件的数据。
```sql
CREATE ROUTINE LOAD `yunlu_iot_data`.`kafka-quickstart_events` ON `quickstart_events`
WHERE born_time > '1900-01-01 00:00:00'
PROPERTIES
(
"format" = "json",
"strict_mode" = "false"
)
FROM
KAFKA
(
"kafka_broker_list" = "172.31.8.164:9092,172.31.8.165:9092,172.31.8.166:9092",
"kafka_topic" = "quickstart-events",
"property.group.id" = "xxxxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol" = "SASL_PLAINTEXT",
"property.sasl.mechanism" = "PLAIN",
"property.sasl.username" = "xxxx",
"property.sasl.password" = "xxxx"
);
```
3 changes: 3 additions & 0 deletions docs/Kafka/Kafka Tips.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

为什么要使用 Kafka。
1. 解耦,将进程之间的直接交互调整为统一面向 Kafka Topic 的标准对接。
9 changes: 7 additions & 2 deletions docs/Kafka/Kafka 常用命令.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ tags:

创建 topic
```
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic_name --command-config admin-jass
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic <topic_name> --create
```

查看所有的 topic
Expand All @@ -17,13 +17,18 @@ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

查看 topic 的信息
```
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic <topic_name>
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic <topic_name> --describe
```

查看 topic 每个分区的 offset
```
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic_name>
```

修改 topic 的分区数量
```
bin/kafka
```
## Consumer

控制台消费者
Expand Down
69 changes: 69 additions & 0 deletions docs/Kafka/Kafka 快速开始.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
### Kafka 安装
准备两个文件
kafka_2.13-3.9.0.tgz
OpenJDK21U-jdk_aarch64_linux_hotspot_21.0.5_11.tar.gz

解压缩
tar -zxvf OpenJDK21U-jdk_aarch64_linux_hotspot_21.0.5_11.tar.gz -C /usr/local/
tar -zxvf kafka_2.13-3.9.0.tgz -C /usr/local/

配置 Java 环境 /etc/profile.d/java.sh, 执行 source /etc/profile,java -version 验证配置。
```
export JAVA_HOME=/usr/local/jdk-21.0.5+11
export PATH=$PATH:$JAVA_HOME/bin
```

进入到 Kafka 目录下,新建目录 kafka-files,编辑配置文件 config/kraft/server.properties,修改 log.dirs=/usr/local/kafka_2.13-3.9.0/kafka-files。

初始化启动 Kafka
```
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
bin/kafka-server-start.sh -daemon config/kraft/reconfig-server.properties
```


创建 Topic
```
# bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic quickstart-events --create
Created topic quickstart-events.
```

查看 Topic 列表
```
# bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
quickstart-events
```

查看 Topic 详情
```
# bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic quickstart-events --describe
Topic: quickstart-events TopicId: l9MVAfeWQ1am7pCdFHbdiA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
```

修改 Topic 的分区
```
# bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic quickstart-events --alter --partitions 2
# bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic quickstart-events --describe
Topic: quickstart-events TopicId: l9MVAfeWQ1am7pCdFHbdiA PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr: 
Topic: quickstart-events Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
```

命令行消费消息
```
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events
```

命令行生产消息
```
# bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic quickstart-events
```

删除 Topic
```
# bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic quickstart-events --delete
```

### 生产消费代码示例

0 comments on commit 70ee61e

Please sign in to comment.