diff --git a/.obsidian/workspace.json b/.obsidian/workspace.json index 35218bcc..26c5012d 100644 --- a/.obsidian/workspace.json +++ b/.obsidian/workspace.json @@ -13,12 +13,12 @@ "state": { "type": "markdown", "state": { - "file": "docs/Kafka/Kafka 快速开始.md", + "file": "docs/Doris/Doris 接入 Kafka 的数据.md", "mode": "source", "source": false }, "icon": "lucide-file", - "title": "Kafka 快速开始" + "title": "Doris 接入 Kafka 的数据" } } ] @@ -140,13 +140,13 @@ "state": { "type": "outline", "state": { - "file": "docs/Kafka/Kafka 快速开始.md", + "file": "docs/Doris/Doris 接入 Kafka 的数据.md", "followCursor": false, "showSearch": false, "searchQuery": "" }, "icon": "lucide-list", - "title": "Kafka 快速开始 的大纲" + "title": "Doris 接入 Kafka 的数据 的大纲" } } ], @@ -169,9 +169,13 @@ }, "active": "3f2894de142cae47", "lastOpenFiles": [ + "docs/Flink/未命名.md", + "docs/Flink/快速开始.md", + "docs/MySQL/MySQL 安装.md", + "docs/Java/反射和动态代理.md", + "docs/Kafka/Kafka 快速开始.md", "docs/Kafka/Kafka 常用命令.md", "docs/Kafka/Kafka 集群搭建.md", - "docs/Kafka/Kafka 快速开始.md", "mkdocs.yml", "docs/Kafka/Kafka 保证消息不丢失.md", "docs/Kafka/Kafka 配置.md", @@ -187,14 +191,11 @@ "docs/Doris/Doris 接入 MySQL 的数据.md", "docs/Doris/Doris 分区表.md", "docs/Java/虚拟机类加载机制.md", - "docs/Java/反射和动态代理.md", "docs/Java/日期时间操作.md", "docs/Java/重复提交.md", "docs/Java/MyBatis 快速入门.md", "docs/Java/Java 并发.md", "docs/Java/Java 打包.md", - "docs/Java/Java 队列.md", - "docs/Java/NIO Tips.md", "docs/stylesheets/extra.css", "overrides/partials/footer.html", "overrides/partials/comments.html", diff --git "a/docs/Doris/Doris \346\216\245\345\205\245 Kafka \347\232\204\346\225\260\346\215\256.md" "b/docs/Doris/Doris \346\216\245\345\205\245 Kafka \347\232\204\346\225\260\346\215\256.md" index 78152c08..e3448b37 100644 --- "a/docs/Doris/Doris \346\216\245\345\205\245 Kafka \347\232\204\346\225\260\346\215\256.md" +++ "b/docs/Doris/Doris \346\216\245\345\205\245 Kafka \347\232\204\346\225\260\346\215\256.md" @@ -71,6 +71,8 @@ show routine load 能够查询导入作业的状态。 可以通过修改 Routine Load 导入任务,从指定的 Kafka Topic 分区和偏移开始消费数据,但是这样定位脏数据的位置比较麻烦。 或者,可以通过修改 Routine Load 导入任务 load_properties 的 WHERE 子模块,来过滤掉不符合条件的数据。 + +目前还不支持直接修改 RoutineLoad 增加 Where 子句,所以要 STOP RoutineLoad, 然后在重新创建。重新创建时,可以使用 kafka_partitions 和 kafka_offsets 指定每个分区的起始消费 offset,来避免重复或者遗漏消费数据。 ```sql CREATE ROUTINE LOAD `yunlu_iot_data`.`kafka-quickstart_events` ON `quickstart_events` WHERE born_time > '1900-01-01 00:00:00' @@ -80,15 +82,18 @@ PROPERTIES ( FROM KAFKA ( "kafka_broker_list" = "172.31.8.164:9092,172.31.8.165:9092,172.31.8.166:9092", "kafka_topic" = "quickstart-events", + "kafka_partitions" = "0,1,2", + "kafka_offsets" = "1239,1235,1237", "property.group.id" = "xxxxx", - "property.kafka_default_offsets" = "OFFSET_BEGINNING", "property.security.protocol" = "SASL_PLAINTEXT", "property.sasl.mechanism" = "PLAIN", "property.sasl.username" = "xxxx", "property.sasl.password" = "xxxx"); ``` -参考:https://doris.apache.org/zh-CN/docs/2.0/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD +参考: +https://doris.apache.org/zh-CN/docs/2.0/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD。 +https://doris.apache.org/zh-CN/docs/sql-manual/sql-statements/data-modification/load-and-export/ALTER-ROUTINE-LOAD 导入任务报错 TOO_MANY_TASKS,参考官方公众号“2.0.9和2.1.3之前都存在已知的bug导致TOO_MANY_TASKS的问题”。实际使用下来 2.0.12 也存在这个问题,解决方案是升级至 2.0.15 或者 2.1.8,最终 2.0.12 直接升级到 2.1.8 解决报错的问题。升级参考: \ No newline at end of file diff --git "a/docs/Flink/\345\277\253\351\200\237\345\274\200\345\247\213.md" "b/docs/Flink/\345\277\253\351\200\237\345\274\200\345\247\213.md" index ccb71215..3be48ce8 100644 --- "a/docs/Flink/\345\277\253\351\200\237\345\274\200\345\247\213.md" +++ "b/docs/Flink/\345\277\253\351\200\237\345\274\200\345\247\213.md" @@ -97,7 +97,7 @@ JobManager 调度作业,默认进程为 org.apache.flink.runtime.entrypoint.St TaskManager 执行作业,连接到 JobManager 宣布自己可用,并被分配工作。默认进程为 org.apache.flink.runtime.taskexecutor.TaskManagerRunner。 -启动 Flink 服务,Web 地址默认为 http://localhost:8081/#/overview +启动 Flink 服务,Web 地址默认为 http://localhost:8081/#/overview 默认只有 localhost 访问,编辑配置文件 rest.bin-address 绑定 0.0.0.0 即可。 ``` ./bin/start-cluster.sh ``` diff --git "a/docs/Flink/\346\234\252\345\221\275\345\220\215.md" "b/docs/Flink/\346\234\252\345\221\275\345\220\215.md" new file mode 100644 index 00000000..7c6c9846 --- /dev/null +++ "b/docs/Flink/\346\234\252\345\221\275\345\220\215.md" @@ -0,0 +1,15 @@ +## Hadoop +### HDFS +NameNode 存储文件的元数据,记录文件属性以及文件块所在的 DataNode 信息。 +DataNode 具体存储数据及校验和。 +SecondaryNameNode NameNode 的备份节点。 +### YARN +ResourceManager 管理所有集群的资源。处理客户端的请求,监控 NodeManager,启动监控 ApplicationMaster。 +NodeManager 管理单个节点的资源。处理来自 ResourceManager 的命令,处理来自 ApplicationMaster 的命令。 +ApplicationMaster 负责管理单个运行任务。 +Container 任务运行的容器。 +### MapReduce + +## Flink +JobManager 对作业进行调度,分发给 TaskManager。 +TaskManager 指定任务。 \ No newline at end of file diff --git "a/docs/Java/\345\217\215\345\260\204\345\222\214\345\212\250\346\200\201\344\273\243\347\220\206.md" "b/docs/Java/\345\217\215\345\260\204\345\222\214\345\212\250\346\200\201\344\273\243\347\220\206.md" index f818ec56..de004e0d 100644 --- "a/docs/Java/\345\217\215\345\260\204\345\222\214\345\212\250\346\200\201\344\273\243\347\220\206.md" +++ "b/docs/Java/\345\217\215\345\260\204\345\222\214\345\212\250\346\200\201\344\273\243\347\220\206.md" @@ -287,4 +287,54 @@ java -jar arthas-boot.jar > sc *otto* ``` -结论:程序启动后默认会加载 App 类,输入 one 之后加载 TestLoadOne 类,输入 two 之后加载 TestLoadTwo 类。 \ No newline at end of file +结论:程序启动后默认会加载 App 类,输入 one 之后加载 TestLoadOne 类,输入 two 之后加载 TestLoadTwo 类。 + +获取类的所有方法,不包括 Object 集成而来的方法。 +```java + private Method[] getClassMethods(Class clazz) { + Map uniqueMethods = new HashMap<>(); + Class currentClass = clazz; + while (currentClass != null && currentClass != Object.class) { + // 由于 getMethods() 方法只会返回 public 修饰的方法, + // 所以这里使用 getDeclaredMethods() + 循环的方式获取所有的方法 + addUniqueMethods(uniqueMethods, currentClass.getDeclaredMethods()); + + // 获取所有实现接口的方法接口都是 public 的 + Class[] interfaces = currentClass.getInterfaces(); + for (Class anInterface : interfaces) { + addUniqueMethods(uniqueMethods, anInterface.getMethods()); + } + + currentClass = currentClass.getSuperclass(); + } + + Collection methods = uniqueMethods.values(); + + return methods.toArray(new Method[0]); + } + + private void addUniqueMethods(Map uniqueMethods, Method[] methods) { + for (Method currentMethod : methods) { + // 桥接方法是编译器在类型擦除时确保类型安全生成的方法,忽略掉 + if (!currentMethod.isBridge()) { + String signature = getSignature(currentMethod); + if (!uniqueMethods.containsKey(signature)) { + uniqueMethods.put(signature, currentMethod); + } + } + } + } + + // 生成方法签名,类似 + private String getSignature(Method method) { + StringBuilder sb = new StringBuilder(); + Class returnType = method.getReturnType(); + sb.append(returnType.getName()).append('#'); + sb.append(method.getName()); + Class[] parameters = method.getParameterTypes(); + for (int i = 0; i < parameters.length; i++) { + sb.append(i == 0 ? ':' : ',').append(parameters[i].getName()); + } + return sb.toString(); + } +``` \ No newline at end of file