Skip to content

Commit

Permalink
Update Flink-connector-starrocks.md
Browse files Browse the repository at this point in the history
Signed-off-by: hellolilyliuyi <[email protected]>
  • Loading branch information
hellolilyliuyi authored Apr 17, 2024
1 parent 810d876 commit fda2864
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions docs/zh/loading/Flink-connector-starrocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Flink connector JAR 文件的命名格式如下:
| sink.max-retries | No | 3 | Stream Load 失败后的重试次数。超过该数量上限,则数据导入任务报错。取值范围:[0, 10]。该参数只在 `sink.version` 为 `V1` 才会生效。 |
| sink.connect.timeout-ms | No | 30000 | 与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。 Flink connector v1.2.9 之前,默认值为 `1000`。 |
| sink.wait-for-continue.timeout-ms | No | 10000 | 此参数自 Flink connector 1.2.7 开始支持。等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。 |
| sink.ignore.update-before | No | TRUE | 此参数自 Flink connector 1.2.8 开始支持。将数据导入到主键表时,是否忽略来自 Flink 的 UPDATE_BEFORE 记录。如果将此参数设置为 false,则将该记录在主键表中视为 DELETE 操作。 |
| sink.ignore.update-before | No | TRUE | 此参数自 Flink connector 1.2.8 开始支持。将数据导入到主键模型表时,是否忽略来自 Flink 的 UPDATE_BEFORE 记录。如果将此参数设置为 false,则将该记录在主键模型表中视为DELETE 操作。 |
| sink.parallelism | No | NONE | 写入的并行度。仅适用于Flink SQL。如果未设置, Flink planner 将决定并行度。**在多并行度的场景中,用户需要确保数据按正确顺序写入。** |
| sink.properties.* | No | NONE | Stream Load 的参数,控制 Stream Load 导入行为。例如 参数 sink.properties.format 表示 Stream Load 所导入的数据格式,如 CSV 或者 JSON。全部参数和解释,请参见 [STREAM LOAD](../sql-reference/sql-statements/data-manipulation/STREAM_LOAD.md)。 |
| sink.properties.format | No | csv | Stream Load 导入时的数据格式。Flink connector 会将内存的数据转换为对应格式,然后通过 Stream Load 导入至 StarRocks。取值为 CSV 或者 JSON。 |
Expand Down Expand Up @@ -168,7 +168,7 @@ Flink connector JAR 文件的命名格式如下:

请注意,当您设置一个较大的值时,则建议指定 `sink.label-prefix` 的值,则 Flink connector 可以根据 label 前缀和检查点中的一些信息来清理未完成的事务,而不是因事务超时后由 StarRocks 清理(这可能会导致数据丢失)。

- `label_keep_max_second` 和 `label_keep_max_num`:StarRocks FE 参数,默认值分别为 `259200` 和 `1000`。更多信息,参见[FE 配置](./loading_introduction/loading_considerations.md#fe-配置)。`label_keep_max_second` 的值需要大于 Flink job 的停止时间。否则,Flink connector 无法使用保存在 Flink 的 savepoint 或 checkpoint 中的事务 label 来检查事务在 StarRocks 中的状态,并判断这些事务是否已提交,最终可能导致数据丢失。
- `label_keep_max_second` 和 `label_keep_max_num`:StarRocks FE 参数,默认值分别为 `259200` 和 `1000`。更多信息,参见[FE 配置](../loading/Loading_intro.md#fe-配置)。`label_keep_max_second` 的值需要大于 Flink job 的停止时间。否则,Flink connector 无法使用保存在 Flink 的 savepoint 或 checkpoint 中的事务 label 来检查事务在 StarRocks 中的状态,并判断这些事务是否已提交,最终可能导致数据丢失。

您可以使用 `ADMIN SET FRONTEND CONFIG` 修改上述配置。

Expand All @@ -180,7 +180,7 @@ Flink connector JAR 文件的命名格式如下:

### Flush 策略

Flink connector 先在内存中 buffer 数据,然后通过 Stream Load 将其一次性 flush 到 StarRocks。在 at-least-once 和 exactly-once 场景中使用不同的方式触发 flush
Flink connector 先在内存中 buffer 数据,然后通过 Stream Load 将其一次性 flush 到 StarRocks。在 at-least-once 和 exactly-once 场景中使用不同的方式触发 flush。

对于 at-least-once,在满足以下任何条件时触发 flush:

Expand Down Expand Up @@ -209,7 +209,7 @@ Flink connector 提供以下指标来监控导入情况。

#### 创建 StarRocks 表

创建数据库 `test`,并创建主键表 `score_board`。
创建数据库 `test`,并创建主键模型表 `score_board`。

```SQL
CREATE DATABASE test;
Expand Down Expand Up @@ -247,7 +247,7 @@ DISTRIBUTED BY HASH(id);
./bin/sql-client.sh
```

- 在 Flink SQL 客户端,创建一个表 `score_board`,并且插入数据。 注意,如果您想将数据导入到 StarRocks 主键表中,您必须在 Flink 表的 DDL 中定义主键。对于其他类型的 StarRocks 表,这是可选的。
- 在 Flink SQL 客户端,创建一个表 `score_board`,并且插入数据。 注意,如果您想将数据导入到 StarRocks 主键模型表中,您必须在 Flink 表的 DDL 中定义主键。对于其他类型的 StarRocks 表,这是可选的。

```sql
CREATE TABLE `score_board` (
Expand All @@ -272,7 +272,7 @@ DISTRIBUTED BY HASH(id);

根据 input records 的类型,编写对应 Flink DataStream 作业,例如 input records 为 CSV 格式的 Java `String`、JSON 格式的 Java `String` 或自定义的 Java 对象。

- 如果 input records 为 CSV 格式的 `String`,对应的 Flink DataStream 作业的主要代码如下所示,完整代码请参见 [LoadCsvRecords](https://github.com/StarRocks/starrocks-connector-for-apache-flink/tree/cd8086cfedc64d5181785bdf5e89a847dc294c1d/examples/src/main/java/com/starrocks/connector/flink/examples/datastream)
- 如果 input records 为 CSV 格式的 `String`,对应的 Flink DataStream 作业的主要代码如下所示,完整代码请参见 [LoadCsvRecords](https://github.com/StarRocks/starrocks-connector-for-apache-flink/tree/main/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCsvRecords.java)。

```Java
/**
Expand Down Expand Up @@ -307,7 +307,7 @@ DISTRIBUTED BY HASH(id);
source.addSink(starRockSink);
```

- 如果 input records 为 JSON 格式的 `String`,对应的 Flink DataStream 作业的主要代码如下所示,完整代码请参见[LoadJsonRecords](https://github.com/StarRocks/starrocks-connector-for-apache-flink/tree/cd8086cfedc64d5181785bdf5e89a847dc294c1d/examples/src/main/java/com/starrocks/connector/flink/examples/datastream)
- 如果 input records 为 JSON 格式的 `String`,对应的 Flink DataStream 作业的主要代码如下所示,完整代码请参见[LoadJsonRecords](https://github.com/StarRocks/starrocks-connector-for-apache-flink/tree/main/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadJsonRecords.java)。

```Java
/**
Expand Down Expand Up @@ -340,7 +340,7 @@ DISTRIBUTED BY HASH(id);
source.addSink(starRockSink);
```

- 如果 input records 为自定义的 Java 对象,对应的 Flink DataStream 作业的主要代码如下所示,完整代码请参见[LoadCustomJavaRecords](https://github.com/StarRocks/starrocks-connector-for-apache-flink/tree/cd8086cfedc64d5181785bdf5e89a847dc294c1d/examples/src/main/java/com/starrocks/connector/flink/examples/datastream)
- 如果 input records 为自定义的 Java 对象,对应的 Flink DataStream 作业的主要代码如下所示,完整代码请参见[LoadCustomJavaRecords](https://github.com/StarRocks/starrocks-connector-for-apache-flink/tree/main/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCustomJavaRecords.java)。

- 本示例中,input record 是一个简单的 POJO `RowData`

Expand Down Expand Up @@ -431,17 +431,15 @@ DISTRIBUTED BY HASH(id);
快速上手教程可以参考[从 MySQL 到 StarRocks 的流式 ELT 管道](https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/quickstart/mysql-to-starrocks)。
建议您使用 StarRocks v3.2.1 及以后的版本,以开启 [fast_schema_evolution](../sql-reference/sql-statements/data-definition/CREATE_TABLE.md#设置-fast-schema-evolution),来提高加减列的速度并降低资源使用。
## 最佳实践
### 导入至主键表
### 导入至主键模型表
本节将展示如何将数据导入到 StarRocks 主键表中,以实现部分更新和条件更新。以下示例使用 Flink SQL。 部分更新和条件更新的更多介绍,请参见[通过导入实现数据变更](./Load_to_Primary_Key_tables.md)。
本节将展示如何将数据导入到 StarRocks 主键模型表中,以实现部分更新和条件更新。以下示例使用 Flink SQL。 部分更新和条件更新的更多介绍,请参见[通过导入实现数据变更](./Load_to_Primary_Key_tables.md)。
#### 准备工作
在StarRocks中创建一个名为`test`的数据库,并在其中创建一个名为`score_board`的主键表
在StarRocks中创建一个名为`test`的数据库,并在其中创建一个名为`score_board`的主键模型表
```sql
CREATE DATABASE `test`;
Expand Down

0 comments on commit fda2864

Please sign in to comment.