
Commit

Update notes.
jacky committed Mar 3, 2025
1 parent 75cd83b commit d23795f
Showing 5 changed files with 83 additions and 12 deletions.
17 changes: 9 additions & 8 deletions .obsidian/workspace.json
@@ -13,12 +13,12 @@
"state": {
"type": "markdown",
"state": {
"file": "docs/Kafka/Kafka 快速开始.md",
"file": "docs/Doris/Doris 接入 Kafka 的数据.md",
"mode": "source",
"source": false
},
"icon": "lucide-file",
"title": "Kafka 快速开始"
"title": "Doris 接入 Kafka 的数据"
}
}
]
@@ -140,13 +140,13 @@
"state": {
"type": "outline",
"state": {
"file": "docs/Kafka/Kafka 快速开始.md",
"file": "docs/Doris/Doris 接入 Kafka 的数据.md",
"followCursor": false,
"showSearch": false,
"searchQuery": ""
},
"icon": "lucide-list",
"title": "Kafka 快速开始 的大纲"
"title": "Doris 接入 Kafka 的数据 的大纲"
}
}
],
@@ -169,9 +169,13 @@
},
"active": "3f2894de142cae47",
"lastOpenFiles": [
"docs/Flink/未命名.md",
"docs/Flink/快速开始.md",
"docs/MySQL/MySQL 安装.md",
"docs/Java/反射和动态代理.md",
"docs/Kafka/Kafka 快速开始.md",
"docs/Kafka/Kafka 常用命令.md",
"docs/Kafka/Kafka 集群搭建.md",
"docs/Kafka/Kafka 快速开始.md",
"mkdocs.yml",
"docs/Kafka/Kafka 保证消息不丢失.md",
"docs/Kafka/Kafka 配置.md",
@@ -187,14 +191,11 @@
"docs/Doris/Doris 接入 MySQL 的数据.md",
"docs/Doris/Doris 分区表.md",
"docs/Java/虚拟机类加载机制.md",
"docs/Java/反射和动态代理.md",
"docs/Java/日期时间操作.md",
"docs/Java/重复提交.md",
"docs/Java/MyBatis 快速入门.md",
"docs/Java/Java 并发.md",
"docs/Java/Java 打包.md",
"docs/Java/Java 队列.md",
"docs/Java/NIO Tips.md",
"docs/stylesheets/extra.css",
"overrides/partials/footer.html",
"overrides/partials/comments.html",
9 changes: 7 additions & 2 deletions docs/Doris/Doris 接入 Kafka 的数据.md
@@ -71,6 +71,8 @@ show routine load can query the status of import jobs.
You can modify the Routine Load import job so that it starts consuming from specified Kafka topic partitions and offsets, but locating the position of the dirty data this way is cumbersome.

Alternatively, you can modify the WHERE clause under the Routine Load job's load_properties to filter out rows that do not meet the conditions.

Directly adding a WHERE clause to an existing Routine Load job via ALTER is not yet supported, so you have to STOP the routine load and then recreate it. When recreating it, you can use kafka_partitions and kafka_offsets to specify the starting consumption offset of each partition, to avoid consuming data twice or missing data.
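
Before recreating it, a minimal stop-and-inspect sketch (the job name matches the CREATE statement below; the exact columns reported by SHOW ROUTINE LOAD vary across Doris versions):

```sql
-- Stop the existing job so its name can be reused
STOP ROUTINE LOAD FOR `yunlu_iot_data`.`kafka-quickstart_events`;

-- Inspect the stopped job; the Progress column lists the consumed offset
-- per partition, which helps choose kafka_partitions/kafka_offsets below
SHOW ALL ROUTINE LOAD FOR `yunlu_iot_data`.`kafka-quickstart_events`;
```

Then recreate the job, pinning the starting offset of each partition: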
```sql
CREATE ROUTINE LOAD `yunlu_iot_data`.`kafka-quickstart_events` ON `quickstart_events`
WHERE born_time > '1900-01-01 00:00:00'
@@ -80,15 +82,18 @@ PROPERTIES (
FROM KAFKA (
"kafka_broker_list" = "172.31.8.164:9092,172.31.8.165:9092,172.31.8.166:9092",
"kafka_topic" = "quickstart-events",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "1239,1235,1237",
"property.group.id" = "xxxxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol" = "SASL_PLAINTEXT",
"property.sasl.mechanism" = "PLAIN",
"property.sasl.username" = "xxxx",
"property.sasl.password" = "xxxx");
```

-Reference: https://doris.apache.org/zh-CN/docs/2.0/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD
+References:
+https://doris.apache.org/zh-CN/docs/2.0/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD
https://doris.apache.org/zh-CN/docs/sql-manual/sql-statements/data-modification/load-and-export/ALTER-ROUTINE-LOAD


An import job fails with TOO_MANY_TASKS. The official WeChat account notes that "all versions before 2.0.9 and 2.1.3 have a known bug that causes the TOO_MANY_TASKS problem." In practice 2.0.12 also has this problem; the solution is to upgrade to 2.0.15 or 2.1.8. In the end, upgrading 2.0.12 directly to 2.1.8 fixed the error. Upgrade reference:
2 changes: 1 addition & 1 deletion docs/Flink/快速开始.md
@@ -97,7 +97,7 @@ JobManager schedules jobs; the default process is org.apache.flink.runtime.entrypoint.St

TaskManager executes jobs: it connects to the JobManager, announces itself as available, and gets work assigned. The default process is org.apache.flink.runtime.taskexecutor.TaskManagerRunner.

-Start the Flink services; the default web UI address is http://localhost:8081/#/overview
+Start the Flink services; the default web UI address is http://localhost:8081/#/overview. By default it is accessible only from localhost; edit rest.bind-address in the configuration file and bind 0.0.0.0 to allow external access.
```
./bin/start-cluster.sh
```
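
To expose the UI beyond localhost, a minimal sketch of the config change, assuming the legacy conf/flink-conf.yaml layout (newer Flink releases read conf/config.yaml instead):

```yaml
# conf/flink-conf.yaml: bind the REST endpoint (serving the web UI) on all interfaces
rest.bind-address: 0.0.0.0
```

Restart the cluster afterwards (./bin/stop-cluster.sh, then ./bin/start-cluster.sh) for the setting to take effect.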
15 changes: 15 additions & 0 deletions docs/Flink/未命名.md
@@ -0,0 +1,15 @@
## Hadoop
### HDFS
NameNode stores file metadata: file attributes and which DataNodes hold each file's blocks.
DataNode stores the actual data blocks and their checksums.
SecondaryNameNode assists the NameNode by periodically merging its edit log into the fsimage; despite the name, it is not a hot standby.
### YARN
ResourceManager manages the resources of the entire cluster: it handles client requests, monitors NodeManagers, and starts and monitors ApplicationMasters.
NodeManager manages the resources of a single node; it handles commands from the ResourceManager and from ApplicationMasters.
ApplicationMaster manages a single running application.
Container is the unit in which tasks run.
### MapReduce

## Flink
JobManager schedules jobs and dispatches them to TaskManagers.
TaskManager executes tasks.
52 changes: 51 additions & 1 deletion docs/Java/反射和动态代理.md
@@ -287,4 +287,54 @@ java -jar arthas-boot.jar
> sc *otto*
```

Conclusion: after the program starts, the App class is loaded by default; TestLoadOne is loaded after entering one, and TestLoadTwo after entering two.

Get all methods of a class, excluding those inherited from Object.
```java
// requires: import java.lang.reflect.Method; import java.util.Collection;
// import java.util.HashMap; import java.util.Map;
private Method[] getClassMethods(Class<?> clazz) {
Map<String, Method> uniqueMethods = new HashMap<>();
Class<?> currentClass = clazz;
while (currentClass != null && currentClass != Object.class) {
// getMethods() returns only public methods, so walk up the class
// hierarchy with getDeclaredMethods() to collect methods of every visibility
addUniqueMethods(uniqueMethods, currentClass.getDeclaredMethods());

// Collect the methods of all implemented interfaces; interface methods are always public
Class<?>[] interfaces = currentClass.getInterfaces();
for (Class<?> anInterface : interfaces) {
addUniqueMethods(uniqueMethods, anInterface.getMethods());
}

currentClass = currentClass.getSuperclass();
}

Collection<Method> methods = uniqueMethods.values();

return methods.toArray(new Method[0]);
}

private void addUniqueMethods(Map<String, Method> uniqueMethods, Method[] methods) {
for (Method currentMethod : methods) {
// Bridge methods are synthesized by the compiler during type erasure to preserve type safety; skip them
if (!currentMethod.isBridge()) {
String signature = getSignature(currentMethod);
if (!uniqueMethods.containsKey(signature)) {
uniqueMethods.put(signature, currentMethod);
}
}
}
}

// Build a method signature of the form: returnType#methodName:paramType1,paramType2
private String getSignature(Method method) {
StringBuilder sb = new StringBuilder();
Class<?> returnType = method.getReturnType();
sb.append(returnType.getName()).append('#');
sb.append(method.getName());
Class<?>[] parameters = method.getParameterTypes();
for (int i = 0; i < parameters.length; i++) {
sb.append(i == 0 ? ':' : ',').append(parameters[i].getName());
}
return sb.toString();
}
```
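
A quick usage sketch, assuming the three methods above live in a host class named MethodResolver with getClassMethods made accessible (the host class name is hypothetical; the logic itself mirrors MyBatis's Reflector):

```java
import java.lang.reflect.Method;

public class MethodResolverDemo {
    interface Named {
        String name();
    }

    static class Sample implements Named {
        @Override
        public String name() { return "sample"; }

        private void helper() { }
    }

    public static void main(String[] args) {
        // MethodResolver is the hypothetical class holding the methods above
        Method[] methods = new MethodResolver().getClassMethods(Sample.class);
        for (Method m : methods) {
            // Prints name and helper, but no toString/equals/hashCode:
            // methods inherited from Object are excluded by the loop condition
            System.out.println(m);
        }
    }
}
```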
