
refactor and remove non-mysql cdc source
whhe committed Jan 20, 2025
1 parent 35ec8ea commit 71fc043
Showing 49 changed files with 1,525 additions and 4,112 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -22,6 +22,7 @@ This repository contains connectors as following:
| Flink Connector: OceanBase | This Connector uses the JDBC driver supported by OceanBase to write data to OceanBase, and supports MySQL and Oracle compatibility modes. | [Sink](docs/sink/flink-connector-oceanbase.md) |
| Flink Connector: OceanBase Direct Load | This Connector uses the [direct load](https://en.oceanbase.com/docs/common-oceanbase-database-10000000001375568) API to write data to OceanBase. | [Sink](docs/sink/flink-connector-oceanbase-directload.md) |
| Flink Connector: OBKV HBase | This Connector uses the [OBKV HBase API](https://github.com/oceanbase/obkv-hbase-client-java) to write data to OceanBase. | [Sink](docs/sink/flink-connector-obkv-hbase.md) |
| Flink Connector: Cli | This command-line tool migrates data from other data sources to OceanBase. | [Sink](docs/sink/flink-connector-oceanbase-cli.md) |

### Other External Projects

1 change: 1 addition & 0 deletions README_CN.md
@@ -22,6 +22,7 @@
| Flink Connector: OceanBase | This connector writes data to OceanBase via the JDBC driver supported by OceanBase, and supports the MySQL and Oracle compatibility modes. | [Sink](docs/sink/flink-connector-oceanbase_cn.md) |
| Flink Connector: OceanBase Direct Load | This connector writes data to OceanBase via the [direct load](https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000001428636) API. | [Sink](docs/sink/flink-connector-oceanbase-directload_cn.md) |
| Flink Connector: OBKV HBase | This connector writes data to OceanBase via the [OBKV HBase API](https://github.com/oceanbase/obkv-hbase-client-java). | [Sink](docs/sink/flink-connector-obkv-hbase_cn.md) |
| Flink Connector: Cli | A command-line tool for migrating data from other data sources to OceanBase. | [Sink](docs/sink/flink-connector-oceanbase-cli_cn.md) |

### Other External Projects

222 changes: 157 additions & 65 deletions docs/sink/flink-connector-oceanbase-cli.md

Large diffs are not rendered by default.

222 changes: 157 additions & 65 deletions docs/sink/flink-connector-oceanbase-cli_cn.md
@@ -35,24 +35,41 @@ mvn clean package -DskipTests
Then prepare the tables and data in the MySQL database.

```mysql
USE test_db;

CREATE TABLE products
(
  id          INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name        VARCHAR(255) NOT NULL DEFAULT 'flink',
  description VARCHAR(512),
  weight      FLOAT
);

ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default, "scooter", "Small 2-wheel scooter", 3.14),
       (default, "car battery", "12V car battery", 8.1),
       (default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8),
       (default, "hammer", "12oz carpenter's hammer", 0.75),
       (default, "hammer", "14oz carpenter's hammer", 0.875),
       (default, "hammer", "16oz carpenter's hammer", 1.0),
       (default, "rocks", "box of assorted rocks", 5.3),
       (default, "jacket", "water resistent black wind breaker", 0.1),
       (default, "spare tire", "24 inch spare tire", 22.2);

CREATE TABLE customers
(
  id         INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name  VARCHAR(255) NOT NULL,
  email      VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT = 1001;

INSERT INTO customers
VALUES (default, "Sally", "Thomas", "[email protected]"),
       (default, "George", "Bailey", "[email protected]"),
       (default, "Edward", "Walker", "[email protected]"),
       (default, "Anne", "Kretchmar", "[email protected]");
```

##### Submit the Job via CLI
@@ -65,14 +82,15 @@ $FLINK_HOME/bin/flink run \
-Dparallelism.default=1 \
-c com.oceanbase.connector.flink.CdcCli \
lib/flink-connector-oceanbase-cli-xxx.jar \
mysql-cdc \
--database test_db \
--source-conf hostname=xxxx \
--source-conf port=3306 \
--source-conf username=root \
--source-conf password=xxxx \
--source-conf database-name=test_db \
--source-conf table-name=.* \
--including-tables ".*" \
--sink-conf username=xxxx \
--sink-conf password=xxxx \
--sink-conf url=jdbc:mysql://xxxx:xxxx
@@ -82,49 +100,123 @@

##### Check and Verify

Check the target OceanBase database; you should find the two tables and multiple rows of data.

You can keep inserting test data into the MySQL database. Since this is a CDC job, once new data has been inserted into MySQL you can query OceanBase to verify that it has been synchronized, as sketched below.
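A minimal sketch of such a check, assuming the target database in OceanBase is also named `test_db` and the table names are kept unchanged (the `wrench` row is a hypothetical example value):

```sql
-- On the MySQL source side: add one more (hypothetical) product.
INSERT INTO test_db.products
VALUES (default, "wrench", "hypothetical 10oz wrench", 0.5);

-- On the OceanBase target side (MySQL mode): the new row should show up
-- once the CDC job has processed the change.
SELECT * FROM test_db.products WHERE name = 'wrench';
```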

#### Configuration Options

<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 10%">Parameter</th>
<th class="text-left" style="width: 5%">Required</th>
<th class="text-left" style="width: 15%">Type</th>
<th class="text-left" style="width: 10%">Default</th>
<th class="text-left" style="width: 50%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>${job-type}</td>
<td>Yes</td>
<td>Enum</td>
<td style="word-wrap: break-word;"></td>
<td>The job type, which can be <code>mysql-cdc</code>.</td>
</tr>
<tr>
<td>--source-conf</td>
<td>Yes</td>
<td>Multi-value parameter</td>
<td style="word-wrap: break-word;"></td>
<td>Configuration options of the Flink CDC source connector of the specified type.</td>
</tr>
<tr>
<td>--sink-conf</td>
<td>Yes</td>
<td>Multi-value parameter</td>
<td style="word-wrap: break-word;"></td>
<td>Configuration options of the OceanBase sink connector.</td>
</tr>
<tr>
<td>--job-name</td>
<td>No</td>
<td>String</td>
<td style="word-wrap: break-word;">${job-type} Sync</td>
<td>The Flink job name.</td>
</tr>
<tr>
<td>--database</td>
<td>No</td>
<td>String</td>
<td style="word-wrap: break-word;"></td>
<td>The name of the target database in OceanBase. If not set, the source database name is used.</td>
</tr>
<tr>
<td>--table-prefix</td>
<td>No</td>
<td>String</td>
<td style="word-wrap: break-word;"></td>
<td>The prefix of the target table names in OceanBase.</td>
</tr>
<tr>
<td>--table-suffix</td>
<td>No</td>
<td>String</td>
<td style="word-wrap: break-word;"></td>
<td>The suffix of the target table names in OceanBase.</td>
</tr>
<tr>
<td>--including-tables</td>
<td>No</td>
<td>String</td>
<td style="word-wrap: break-word;"></td>
<td>The whitelist pattern for source tables.</td>
</tr>
<tr>
<td>--excluding-tables</td>
<td>No</td>
<td>String</td>
<td style="word-wrap: break-word;"></td>
<td>The blacklist pattern for source tables.</td>
</tr>
<tr>
<td>--multi-to-one-origin</td>
<td>No</td>
<td>String</td>
<td style="word-wrap: break-word;"></td>
<td>The name patterns of the source tables that are mapped to one target table, with multiple values separated by <code>|</code>.</td>
</tr>
<tr>
<td>--multi-to-one-target</td>
<td>No</td>
<td>String</td>
<td style="word-wrap: break-word;"></td>
<td>The target table names corresponding to <code>--multi-to-one-origin</code>, with multiple values separated by <code>|</code>; the number of values must equal that of <code>--multi-to-one-origin</code>.</td>
</tr>
<tr>
<td>--create-table-only</td>
<td>No</td>
<td>Boolean</td>
<td style="word-wrap: break-word;">false</td>
<td>Whether to only synchronize the table schemas.</td>
</tr>
<tr>
<td>--ignore-default-value</td>
<td>No</td>
<td>Boolean</td>
<td style="word-wrap: break-word;">false</td>
<td>Whether to ignore default values.</td>
</tr>
<tr>
<td>--ignore-incompatible</td>
<td>No</td>
<td>Boolean</td>
<td style="word-wrap: break-word;">false</td>
<td>Whether to ignore incompatible data types.</td>
</tr>
</tbody>
</table>
</div>
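As an illustrative sketch of how the options above can be combined (the hosts, passwords, jar version, and the sharded `order_*` source tables are all placeholders, not values from this repository), the following invocation routes every source table matching `order_.*` into a single target table `order` and prepends `ods_` to target table names:

```shell
# Hypothetical example (placeholders throughout): map sharded source tables
# matching "order_.*" to one target table "order"; --table-prefix additionally
# prepends "ods_" to target table names.
$FLINK_HOME/bin/flink run \
    -c com.oceanbase.connector.flink.CdcCli \
    lib/flink-connector-oceanbase-cli-xxx.jar \
    mysql-cdc \
    --database test_db \
    --table-prefix ods_ \
    --including-tables "order_.*" \
    --multi-to-one-origin "order_.*" \
    --multi-to-one-target "order" \
    --source-conf hostname=xxxx \
    --source-conf port=3306 \
    --source-conf username=root \
    --source-conf password=xxxx \
    --source-conf database-name=test_db \
    --source-conf table-name=order_.* \
    --sink-conf username=xxxx \
    --sink-conf password=xxxx \
    --sink-conf url=jdbc:mysql://xxxx:xxxx
```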
