Skip to content

Commit

Permalink
[doc](stream load) optimize stream load doc (#1330)
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored Nov 13, 2024
1 parent 624a648 commit 84d8762
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 1,516 deletions.
479 changes: 6 additions & 473 deletions docs/data-operate/import/import-way/stream-load-manual.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,12 @@ Stream Load 支持通过 HTTP 协议将本地文件或数据流导入到 Doris
:::tip
提示

相比于直接使用 `curl` 的单并发导入,更推荐使用 专用导入工具 Doris Streamloader 该工具是一款用于将数据导入 Doris 数据库的专用客户端工具,可以提供**多并发导入**的功能,降低大数据量导入的耗时。拥有以下功能:

- 并发导入,实现 Stream Load 的多并发导入。可以通过 `workers` 值设置并发数。
- 多文件导入,一次导入可以同时导入多个文件及目录,支持设置通配符以及会自动递归获取文件夹下的所有文件。
- 断点续传,在导入过程中可能出现部分失败的情况,支持在失败点处进行继续传输。
- 自动重传,在导入出现失败的情况后,无需手动重传,工具会自动重传默认的次数,如果仍然不成功,打印出手动重传的命令。

点击 [Doris Streamloader 文档](../../../ecosystem/doris-streamloader) 了解使用方法与实践详情。
相比于直接使用 `curl` 的单并发导入,更推荐使用专用导入工具 Doris Streamloader。该工具是一款用于将数据导入 Doris 数据库的专用客户端工具,可以提供**多并发导入**的功能,降低大数据量导入的耗时。点击 [Doris Streamloader 文档](../../../ecosystem/doris-streamloader) 了解使用方法与实践详情。
:::

## 使用场景

### 支持格式

Stream Load 支持导入 CSV、JSON、Parquet 与 ORC 格式的数据。

### 使用限制
Stream Load 支持从本地或远程通过 HTTP 的方式导入 CSV、JSON、Parquet 与 ORC 格式的数据。

在导入 CSV 文件时,需要明确区分空值(null)与空字符串:

Expand All @@ -55,7 +44,7 @@ Stream Load 支持导入 CSV、JSON、Parquet 与 ORC 格式的数据。

## 基本原理

在使用 Stream Load 时,需要通过 HTTP 协议发起导入作业给 FE 节点,FE 会以轮询方式,重定向(redirect)请求给一个 BE 节点以达到负载均衡的效果。也可以直接发送 HTTP 请求作业给指定的 BE 节点。在 Stream Load 中,Doris 会选定一个节点做为 Coordinator 节点。Coordinator 节点负责接受数据并分发数据到其他节点上。
在使用 Stream Load 时,需要通过 HTTP 协议发起导入作业给 FE 节点,FE 会以轮询方式,重定向(redirect)请求给一个 BE 节点以达到负载均衡的效果。也可以直接发送 HTTP 请求作业给指定的 BE 节点。在 Stream Load 中,Doris 会选定一个节点作为 Coordinator 节点。Coordinator 节点负责接受数据并分发数据到其他节点上。

下图展示了 Stream Load 的主要流程:

Expand Down Expand Up @@ -87,7 +76,7 @@ Stream Load 需要对目标表的 INSERT 权限。如果没有 INSERT 权限,

1. 创建导入数据

创建 csv 文件 streamload_example.csv 文件。具体内容如下
创建 CSV 文件 streamload_example.csv 文件。具体内容如下

```sql
1,Emily,25
Expand Down Expand Up @@ -290,10 +279,6 @@ Stream Load 操作支持 HTTP 分块导入(HTTP chunked)与 HTTP 非分块

参数描述:Stream Load 默认的超时时间。导入任务的超时时间(以秒为单位),导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。如果导入的源文件无法在规定时间内完成导入,用户可以在 Stream Load 请求中设置单独的超时时间。或者调整 FE 的参数`stream_load_default_timeout_second` 来设置全局的默认超时时间。

2. enable_pipeline_load

是否开启 Pipeline 引擎执行 Streamload 任务。详见[导入](../load-manual)文档。

**BE 配置**

1. streaming_load_max_mb
Expand Down Expand Up @@ -321,16 +306,16 @@ Stream Load 操作支持 HTTP 分块导入(HTTP chunked)与 HTTP 非分块
| strict_mode | 用户指定此次导入是否开启严格模式,默认为关闭。例如,指定开启严格模式,需要指定命令 `-H "strict_mode:true"`。 |
| timezone | 指定本次导入所使用的时区。默认为东八区。该参数会影响所有导入涉及的和时区有关的函数结果。例如,指定导入时区为 Africa/Abidjan,需要指定命令 `-H "timezone:Africa/Abidjan"`。 |
| exec_mem_limit | 导入内存限制。默认为 2GB。单位为字节。 |
| format | 指定导入数据格式,默认是 CSV 格式。目前支持以下格式:csv, json, arrow, csv_with_names(支持 csv 文件行首过滤)csv_with_names_and_types(支持 csv 文件前两行过滤)parquet, orc 例如,指定导入数据格式为 json,需要指定命令 `-H "format:json"`。 |
| format | 指定导入数据格式,默认是 CSV 格式。目前支持以下格式:CSV, JSON, arrow, csv_with_names(支持 csv 文件行首过滤)csv_with_names_and_types(支持 CSV 文件前两行过滤)Parquet, ORC 例如,指定导入数据格式为 JSON,需要指定命令 `-H "format:json"`。 |
| jsonpaths | 导入 JSON 数据格式有两种方式:简单模式:没有指定 jsonpaths 为简单模式,这种模式要求 JSON 数据是对象类型匹配模式:用于 JSON 数据相对复杂,需要通过 jsonpaths 参数匹配对应的 value 在简单模式下,要求 JSON 中的 key 列与表中的列名是一一对应的,如 JSON 数据 {"k1":1, "k2":2, "k3":"hello"},其中 k1、k2 及 k3 分别对应表中的列。 |
| strip_outer_array | 指定 strip_outer_array 为 true 时表示 JSON 数据以数组对象开始且将数组对象中进行展平,默认为 false。在 JSON 数据的最外层是 [] 表示的数组时,需要设置 strip_outer_array 为 true。如以下示例数据,在设置 strip_outer_array 为 true 后,导入 Doris 中生成两行数据` [{"k1" : 1, "v1" : 2},{"k1" : 3, "v1" : 4}]` |
| json_root | json_root 为合法的 jsonpath 字符串,用于指定 json document 的根节点,默认值为 ""。 |
| merge_type | 数据的合并类型,一共支持三种类型 APPENDDELETE、MERGE;APPEND 是默认值,表示这批数据全部需要追加到现有数据中;DELETE 表示删除与这批数据 key 相同的所有行 MERGE 语义 需要与 DELETE 条件联合使用,表示满足 DELETE 条件的数据按照 DELETE 语义处理其余的按照 APPEND 语义处理例如,指定合并模式为 MERGE,需要指定命令`-H "merge_type: MERGE" -H "delete: flag=1"` |
| merge_type | 数据的合并类型,支持三种类型:<br/>- APPEND(默认值):表示这批数据全部追加到现有数据中<br/>- DELETE表示删除与这批数据 Key 相同的所有行<br/>- MERGE需要与 DELETE 条件联合使用,表示满足 DELETE 条件的数据按照 DELETE 语义处理,其余的按照 APPEND 语义处理<br/>例如,指定合并模式为 MERGE`-H "merge_type: MERGE" -H "delete: flag=1"` |
| delete | 仅在 MERGE 下有意义,表示数据的删除条件 |
| function_column.sequence_col | 只适用于 UNIQUE KEYS 模型,相同 Key 列下,保证 Value 列按照 source_sequence 列进行 REPLACE。source_sequence 可以是数据源中的列,也可以是表结构中的一列。 |
| fuzzy_parse | 布尔类型,为 true 表示 JSON 将以第一行为 schema 进行解析。开启这个选项可以提高 json 导入效率,但是要求所有 json 对象的 key 的顺序和第一行一致,默认为 false,仅用于 JSON 格式 |
| num_as_string | 布尔类型,为 true 表示在解析 JSON 数据时会将数字类型转为字符串,确保不会出现精度丢失的情况下进行导入。 |
| read_json_by_line | 布尔类型,为 true 表示支持每行读取一个 json 对象,默认值为 false。 |
| read_json_by_line | 布尔类型,为 true 表示支持每行读取一个 JSON 对象,默认值为 false。 |
| send_batch_parallelism | 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 |
| hidden_columns | 用于指定导入数据中包含的隐藏列,在 Header 中不包含 Columns 时生效,多个 hidden column 用逗号分割。系统会使用用户指定的数据导入数据。在下例中,导入数据中最后一列数据为 `__DORIS_SEQUENCE_COL__``hidden_columns: __DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__` |
| load_to_single_tablet | 布尔类型,为 true 表示支持一个任务只导入数据到对应分区的一个 Tablet,默认值为 false。该参数只允许在对带有 random 分桶的 OLAP 表导数的时候设置。 |
Expand Down Expand Up @@ -453,7 +438,7 @@ curl --location-trusted -u root: -T test.csv -H "sql:insert into demo.example_
```Shell
curl --location-trusted -u <doris_user>:<doris_password> \
-H "Expect:100-continue" \
-H "timeout:3000"
-H "timeout:3000" \
-H "column_separator:," \
-H "columns:user_id,name,age" \
-T streamload_example.csv \
Expand Down Expand Up @@ -643,9 +628,9 @@ curl --location-trusted -u <doris_user>:<doris_password> \
curl --location-trusted -u <doris_user>:<doris_password> \
-H "Expect:100-continue" \
-H "merge_type: DELETE" \
-H "function_column.sequence_col: age"
-H "function_column.sequence_col: age" \
-H "column_separator:," \
-H "columns: name, gender, age"
-H "columns: name, gender, age" \
-T streamload_example.csv \
-XPUT http://<fe_ip>:<fe_http_port>/api/testdb/test_streamload/_stream_load
```
Expand Down Expand Up @@ -865,7 +850,7 @@ curl --location-trusted -u <doris_user>:<doris_password> \

### 指定 JSON 根节点导入数据

如果 JSON 数据包含了嵌套 JSON 字段,需要指定导入 json 的根节点。默认值为“”。
如果 JSON 数据包含了嵌套 JSON 字段,需要指定导入 JSON 的根节点。默认值为“”。

如下列数据,期望将 comment 列中的数据导入到表中:

Expand Down Expand Up @@ -1015,7 +1000,7 @@ DISTRIBUTED BY HASH(typ_id,hou) BUCKETS 10;
```sql
curl --location-trusted -u <doris_user>:<doris_password> \
-H "Expect:100-continue" \
-H "columns:typ_id,hou,arr,arr=to_bitmap(arr)"
-H "columns:typ_id,hou,arr,arr=to_bitmap(arr)" \
-T streamload_example.csv \
-XPUT http://<fe_ip>:<fe_http_port>/api/testdb/test_streamload/_stream_load
```
Expand Down Expand Up @@ -1059,13 +1044,9 @@ curl --location-trusted -u <doris_user>:<doris_password> \
-XPUT http://<fe_ip>:<fe_http_port>/api/testdb/test_streamload/_stream_load
```

### Label、导入事务、多表原子性

Doris 中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,Doris 还可以通过 Label 的机制来保证数据导入的不丢不重。具体说明可以参阅 [导入事务和原子性](../../../data-operate/import/load-atomicity) 文档。

### 列映射、衍生列和过滤

Doris 可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数和 UDF。关于如何正确的使用这个功能,可参阅 [数据转换](../../../data-operate/import/load-data-convert) 文档。
Doris 可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数。关于如何正确的使用这个功能,可参阅 [数据转换](../../../data-operate/import/load-data-convert) 文档。

### 启用严格模式导入

Expand Down
Loading

0 comments on commit 84d8762

Please sign in to comment.