diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index 9104431..3dc1457 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -21,6 +21,11 @@ jobs: with: module: spark-connector-oceanbase/spark-connector-oceanbase-base + spark-connector-obkv-hbase: + uses: ./.github/workflows/test.yml + with: + module: spark-connector-obkv-hbase/spark-connector-obkv-hbase-base + e2e-tests: strategy: matrix: diff --git a/README.md b/README.md index b28ab7c..2850fa3 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,10 @@ English | [简体中文](README_CN.md) This repository contains connectors as following: -| Connector | Description | Document | -|----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------| -| Spark Connector: OceanBase | This Connector uses the JDBC driver or the [direct load](https://en.oceanbase.com/docs/common-oceanbase-database-10000000001375568) API to write data to OceanBase. | [Sink](docs/spark-connector-oceanbase.md) | +| Connector | Description | Document | +|-----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------| +| Spark Connector: OceanBase | This Connector uses the JDBC driver or the [direct load](https://en.oceanbase.com/docs/common-oceanbase-database-10000000001375568) API to write data to OceanBase. | [Read & Write](docs/spark-connector-oceanbase.md) | +| Spark Connector: OBKV HBase | This Connector uses the [OBKV HBase API](https://github.com/oceanbase/obkv-hbase-client-java) to write data to OceanBase. 
| [Write](docs/spark-connector-obkv-hbase.md) | ## Community diff --git a/README_CN.md b/README_CN.md index 52fde47..cd12cb7 100644 --- a/README_CN.md +++ b/README_CN.md @@ -8,9 +8,10 @@ 本仓库提供了如下 Connector: -| Connector | 描述 | 使用文档 | -|----------------------------|-----------------------------------------------------------------------------------------------------------------------------|----------------------------------------------| -| Spark Connector: OceanBase | 该Connector可以通过JDBC驱动或[旁路导入](https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000001428636)API将数据写入OceanBase。 | [Sink](docs/spark-connector-oceanbase_cn.md) | +| Connector | 描述 | 使用文档 | +|-----------------------------|-----------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------| +| Spark Connector: OceanBase | 该Connector可以通过JDBC驱动或[旁路导入](https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000001428636)API将数据写入OceanBase。 | [Read & Write](docs/spark-connector-oceanbase_cn.md) | +| Spark Connector: OBKV HBase | 该Connector通过[OBKV HBase API](https://github.com/oceanbase/obkv-hbase-client-java)将数据写入OceanBase。 | [Write](docs/spark-connector-obkv-hbase_cn.md) | ## 社区 diff --git a/docs/spark-connector-obkv-hbase.md b/docs/spark-connector-obkv-hbase.md new file mode 100644 index 0000000..da334c3 --- /dev/null +++ b/docs/spark-connector-obkv-hbase.md @@ -0,0 +1,357 @@ +## Spark Connector OBKV HBase + +English | [简体中文](spark-connector-obkv-hbase_cn.md) + +This is the spark connector for OBKV HBase mode, which can be used to write data to OceanBase via [obkv-hbase-client-java](https://github.com/oceanbase/obkv-hbase-client-java). + +## Version compatibility + +
+| Connector | Spark          | OceanBase               | Java | Scala |
+|-----------|----------------|-------------------------|------|-------|
+| 1.0       | 2.4, 3.1 ~ 3.4 | 4.2.x or later versions | 8    | 2.12  |
+
+- Note: If you need a package built against another Scala version, you can build it from source code.
+
+## Get the package
+
+You can get the release packages at [Releases Page](https://github.com/oceanbase/spark-connector-oceanbase/releases) or [Maven Central](https://central.sonatype.com/artifact/com.oceanbase/spark-connector-obkv-hbase).
+
+```xml
+<dependency>
+    <groupId>com.oceanbase</groupId>
+    <artifactId>spark-connector-obkv-hbase-3.4_2.12</artifactId>
+    <version>${project.version}</version>
+</dependency>
+```
+
+If you'd rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.
+
+```xml
+<dependency>
+    <groupId>com.oceanbase</groupId>
+    <artifactId>spark-connector-obkv-hbase-3.4_2.12</artifactId>
+    <version>${project.version}</version>
+</dependency>
+
+<repositories>
+    <repository>
+        <id>sonatype-snapshots</id>
+        <name>Sonatype Snapshot Repository</name>
+        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
+        <snapshots>
+            <enabled>true</enabled>
+        </snapshots>
+    </repository>
+</repositories>
+```
+
+Of course, you can also get the package by building from source code.
+
+- By default, it is built with Scala 2.12.
+- After a successful build, the target jar is generated in the target directory of the module for each Spark version, e.g. spark-connector-obkv-hbase-3.4_2.12-1.0-SNAPSHOT.jar. Copy this file to Spark's classpath to use spark-connector-obkv-hbase.
+
+```shell
+git clone https://github.com/oceanbase/spark-connector-oceanbase.git
+cd spark-connector-oceanbase
+mvn clean package -DskipTests
+```
+
+- If you need a package built against another Scala version, refer to the command below, which builds against Scala 2.11.
+
+```shell
+git clone https://github.com/oceanbase/spark-connector-oceanbase.git
+cd spark-connector-oceanbase
+mvn clean package -Dscala.version=2.11.12 -Dscala.binary.version=2.11 -DskipTests
+```
+
+## Usage Examples
+
+Take synchronizing data from Hive to OceanBase as an example.
+
+### Preparation
+
+Create the corresponding Hive table and OceanBase table to prepare for data synchronization.
+
+- Start spark-sql by running `${SPARK_HOME}/bin/spark-sql`
+
+```sql
+CREATE TABLE test.orders (
+  order_id INT,
+  order_date TIMESTAMP,
+  customer_name string,
+  price double,
+  product_id INT,
+  order_status BOOLEAN
+) using parquet;
+
+insert into test.orders values
+(1, now(), 'zs', 12.2, 12, true),
+(2, now(), 'ls', 121.2, 12, true),
+(3, now(), 'xx', 123.2, 12, true),
+(4, now(), 'jac', 124.2, 12, false),
+(5, now(), 'dot', 111.25, 12, true);
+```
+
+- Connect to OceanBase
+
+```sql
+use test;
+CREATE TABLE `htable1$family1`
+(
+  `K` varbinary(1024) NOT NULL,
+  `Q` varbinary(256) NOT NULL,
+  `T` bigint(20) NOT NULL,
+  `V` varbinary(1048576) NOT NULL,
+  PRIMARY KEY (`K`, `Q`, `T`)
+);
+```
+
+### Config Url Mode
+
+#### Spark-SQL
+
+```sql
+CREATE TEMPORARY VIEW test_obkv
+USING `obkv-hbase`
+OPTIONS(
+  "url" = "http://localhost:8080/services?Action=ObRootServiceInfo&ObRegion=myob",
+  "sys.username" = "root",
+  "sys.password" = "password",
+  "schema-name" = "test",
+  "table-name" = "htable1",
+  "username" = "root@sys#myob",
+  "password" = "password",
+  "schema" = "{
+    'order_id': {'cf': 'rowkey','col': 'order_id','type': 'int'},
+    'order_date': {'cf': 'family1','col': 'order_date','type': 'timestamp'},
+    'customer_name': {'cf': 'family1','col': 'customer_name','type': 'string'},
+    'price': {'cf': 'family1','col': 'price','type': 'double'},
+    'product_id': {'cf': 'family1','col': 'product_id','type': 'int'},
+    'order_status': {'cf': 'family1','col': 'order_status','type': 'boolean'}
+}"
+);
+
+insert into table test_obkv
+select * from test.orders;
+```
+
+#### DataFrame
+
+```scala
+val df = spark.sql("select * from test.orders")
+
+val schema: String =
+  """
+    |{
+    |  "order_id": {"cf": "rowkey","col": "order_id","type": "int"},
+    |  "order_date": {"cf": "family1","col": "order_date","type": "timestamp"},
+    |  "customer_name": {"cf": "family1","col": "customer_name","type": "string"},
+    |  "price": {"cf": "family1","col": "price","type": "double"},
+    |  "product_id": {"cf": "family1","col": "product_id","type": "int"},
+    |  "order_status": {"cf": "family1","col": "order_status","type": "boolean"}
+    |}
+    |""".stripMargin
+
+df.write
+  .format("obkv-hbase")
+  .option("url", "http://localhost:8080/services?Action=ObRootServiceInfo&ObRegion=myob")
+  .option("sys.username", "root")
+  .option("sys.password", "password")
+  .option("username", "root@sys#myob")
+  .option("password", "password")
+  .option("schema-name", "test")
+  .option("table-name", "htable1")
+  .option("schema", schema)
+  .save()
+```
+
+### ODP Mode
+
+#### Spark-SQL
+
+```sql
+CREATE TEMPORARY VIEW test_obkv
+USING `obkv-hbase`
+OPTIONS(
+  "odp-mode" = "true",
+  "odp-ip" = "localhost",
+  "odp-port" = "2885",
+  "schema-name" = "test",
+  "table-name" = "htable1",
+  "username" = "root@sys#myob",
+  "password" = "password",
+  "schema" = "{
+    'order_id': {'cf': 'rowkey','col': 'order_id','type': 'int'},
+    'order_date': {'cf': 'family1','col': 'order_date','type': 'timestamp'},
+    'customer_name': {'cf': 'family1','col': 'customer_name','type': 'string'},
+    'price': {'cf': 'family1','col': 'price','type': 'double'},
+    'product_id': {'cf': 'family1','col': 'product_id','type': 'int'},
+    'order_status': {'cf': 'family1','col': 'order_status','type': 'boolean'}
+}"
+);
+
+insert into table test_obkv
+select * from test.orders;
+```
+
+#### DataFrame
+
+```scala
+val df = spark.sql("select * from test.orders")
+
+val schema: String =
+  """
+    |{
+    |  "order_id": {"cf": "rowkey","col": "order_id","type": "int"},
+    |  "order_date": {"cf": "family1","col": "order_date","type": "timestamp"},
+    |  "customer_name": {"cf": "family1","col": "customer_name","type": "string"},
+    |  "price": {"cf": "family1","col": "price","type": "double"},
+    |  "product_id": {"cf": "family1","col": "product_id","type": "int"},
+    |  "order_status": {"cf": "family1","col": "order_status","type": "boolean"}
+    |}
+    |""".stripMargin
+
+df.write
+  .format("obkv-hbase")
+  .option("odp-mode", true)
+  .option("odp-ip", "localhost")
+  .option("odp-port", 2885)
+  .option("username", "root@sys#myob")
+  .option("password", "password")
+  .option("schema-name", "test")
+  .option("table-name", "htable1")
+  .option("schema", schema)
+  .save()
+```
+## Configuration
+
+| Option           | Required | Default | Type    | Description |
+|------------------|----------|---------|---------|-------------|
+| schema-name      | Yes      |         | String  | The database name of OceanBase. |
+| table-name       | Yes      |         | String  | The table name of HBase. Note that the table name in OceanBase has the structure `hbase_table_name$family_name`. |
+| username         | Yes      |         | String  | The username of the non-sys tenant user. |
+| password         | Yes      |         | String  | The password of the non-sys tenant user. |
+| schema           | Yes      |         | String  | The custom schema in JSON format. Both single-quoted and double-quoted JSON are supported; with Spark-SQL, the single-quote mode does not require escaping double quotes and is more convenient. |
+| odp-mode         | No       | false   | Boolean | If set to 'true', the connector connects to OBKV via ODP; otherwise it connects via the config url. |
+| url              | No       |         | String  | The config url, which can be queried by `SHOW PARAMETERS LIKE 'obconfig_url'`. Required when 'odp-mode' is 'false'. |
+| sys.username     | No       |         | String  | The username of the sys tenant. Required when 'odp-mode' is 'false'. |
+| sys.password     | No       |         | String  | The password of the sys tenant. Required when 'odp-mode' is 'false'. |
+| odp-ip           | No       |         | String  | The IP address of the ODP. Required when 'odp-mode' is 'true'. |
+| odp-port         | No       | 2885    | Integer | The RPC port of the ODP. Required when 'odp-mode' is 'true'. |
+| hbase.properties | No       |         | String  | Properties for configuring 'obkv-hbase-client-java'; multiple values are separated by semicolons. |
+| batch-size       | No       | 10000   | Integer | The number of rows written to OceanBase in one batch. |
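+
+For example, extra client settings can be passed through `hbase.properties` as a semicolon-separated list of key=value pairs; the connector simply splits the string on ';' and '=' and forwards each pair to the client configuration. A sketch, with illustrative property names that assume keys understood by obkv-hbase-client-java:
+
+```scala
+df.write
+  .format("obkv-hbase")
+  .option("odp-mode", true)
+  .option("odp-ip", "localhost")
+  .option("odp-port", 2885)
+  .option("username", "root@sys#myob")
+  .option("password", "password")
+  .option("schema-name", "test")
+  .option("table-name", "htable1")
+  .option("schema", schema)
+  // illustrative keys; each pair is set verbatim on the underlying client
+  .option("hbase.properties", "rpc.execute.timeout=3000;rpc.login.timeout=3000")
+  .save()
+```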
+ diff --git a/docs/spark-connector-obkv-hbase_cn.md b/docs/spark-connector-obkv-hbase_cn.md new file mode 100644 index 0000000..93c5805 --- /dev/null +++ b/docs/spark-connector-obkv-hbase_cn.md @@ -0,0 +1,356 @@ +## Spark Connector OBKV HBase + +[English](spark-connector-obkv-hbase.md) | 简体中文 + +本项目是一个 OBKV HBase 的 Spark Connector,可以在 Spark 中通过 [obkv-hbase-client-java](https://github.com/oceanbase/obkv-hbase-client-java) 将数据写入到 OceanBase。 + +## 版本兼容 + +
+| Connector | Spark          | OceanBase       | Java | Scala |
+|-----------|----------------|-----------------|------|-------|
+| 1.0       | 2.4, 3.1 ~ 3.4 | 4.2.x 及以后的版本 | 8    | 2.12  |
+
+- 注意：如果需要基于其他 Scala 版本构建的程序包，您可以通过源码构建的方式获得程序包。
+
+## 获取程序包
+
+您可以在 [Releases 页面](https://github.com/oceanbase/spark-connector-oceanbase/releases) 或者 [Maven 中央仓库](https://central.sonatype.com/artifact/com.oceanbase/spark-connector-obkv-hbase) 找到正式的发布版本。
+
+```xml
+<dependency>
+    <groupId>com.oceanbase</groupId>
+    <artifactId>spark-connector-obkv-hbase-3.4_2.12</artifactId>
+    <version>${project.version}</version>
+</dependency>
+```
+
+如果你想要使用最新的快照版本，可以通过配置 Maven 快照仓库来指定：
+
+```xml
+<dependency>
+    <groupId>com.oceanbase</groupId>
+    <artifactId>spark-connector-obkv-hbase-3.4_2.12</artifactId>
+    <version>${project.version}</version>
+</dependency>
+
+<repositories>
+    <repository>
+        <id>sonatype-snapshots</id>
+        <name>Sonatype Snapshot Repository</name>
+        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
+        <snapshots>
+            <enabled>true</enabled>
+        </snapshots>
+    </repository>
+</repositories>
+```
+
+当然您也可以通过源码构建的方式获得程序包。
+
+- 默认以 Scala 2.12 版本进行构建。
+- 编译成功后，会在各个版本对应模块的 target 目录下生成目标 jar 包，如：spark-connector-obkv-hbase-3.4_2.12-1.0-SNAPSHOT.jar。将此文件复制到 Spark 的 ClassPath 中即可使用 spark-connector-obkv-hbase。
+
+```shell
+git clone https://github.com/oceanbase/spark-connector-oceanbase.git
+cd spark-connector-oceanbase
+mvn clean package -DskipTests
+```
+
+- 如果需要其他 Scala 版本，请参考下面以 Scala 2.11 版本构建的命令：
+
+```shell
+git clone https://github.com/oceanbase/spark-connector-oceanbase.git
+cd spark-connector-oceanbase
+mvn clean package -Dscala.version=2.11.12 -Dscala.binary.version=2.11 -DskipTests
+```
+
+## 使用示例
+
+以从 Hive 同步数据到 OceanBase 为例。
+
+### 准备工作
+
+创建对应的 Hive 表和 OceanBase 表，为数据同步做准备。
+
+- 通过 `${SPARK_HOME}/bin/spark-sql` 命令启动 spark-sql
+
+```sql
+CREATE TABLE test.orders (
+  order_id INT,
+  order_date TIMESTAMP,
+  customer_name string,
+  price double,
+  product_id INT,
+  order_status BOOLEAN
+) using parquet;
+
+insert into test.orders values
+(1, now(), 'zs', 12.2, 12, true),
+(2, now(), 'ls', 121.2, 12, true),
+(3, now(), 'xx', 123.2, 12, true),
+(4, now(), 'jac', 124.2, 12, false),
+(5, now(), 'dot', 111.25, 12, true);
+```
+
+- 连接到 OceanBase
+
+```sql
+use test;
+CREATE TABLE `htable1$family1`
+(
+  `K` varbinary(1024) NOT NULL,
+  `Q` varbinary(256) NOT NULL,
+  `T` bigint(20) NOT NULL,
+  `V` varbinary(1048576) NOT NULL,
+  PRIMARY KEY (`K`, `Q`, `T`)
+);
+```
+
+### Config Url 模式
+
+#### Spark-SQL
+
+```sql
+CREATE TEMPORARY VIEW test_obkv
+USING `obkv-hbase`
+OPTIONS(
+  "url" = "http://localhost:8080/services?Action=ObRootServiceInfo&ObRegion=myob",
+  "sys.username" = "root",
+  "sys.password" = "password",
+  "schema-name" = "test",
+  "table-name" = "htable1",
+  "username" = "root@sys#myob",
+  "password" = "password",
+  "schema" = "{
+    'order_id': {'cf': 'rowkey','col': 'order_id','type': 'int'},
+    'order_date': {'cf': 'family1','col': 'order_date','type': 'timestamp'},
+    'customer_name': {'cf': 'family1','col': 'customer_name','type': 'string'},
+    'price': {'cf': 'family1','col': 'price','type': 'double'},
+    'product_id': {'cf': 'family1','col': 'product_id','type': 'int'},
+    'order_status': {'cf': 'family1','col': 'order_status','type': 'boolean'}
+}"
+);
+
+insert into table test_obkv
+select * from test.orders;
+```
+
+#### DataFrame
+
+```scala
+val df = spark.sql("select * from test.orders")
+
+val schema: String =
+  """
+    |{
+    |  "order_id": {"cf": "rowkey","col": "order_id","type": "int"},
+    |  "order_date": {"cf": "family1","col": "order_date","type": "timestamp"},
+    |  "customer_name": {"cf": "family1","col": "customer_name","type": "string"},
+    |  "price": {"cf": "family1","col": "price","type": "double"},
+    |  "product_id": {"cf": "family1","col": "product_id","type": "int"},
+    |  "order_status": {"cf": "family1","col": "order_status","type": "boolean"}
+    |}
+    |""".stripMargin
+
+df.write
+  .format("obkv-hbase")
+  .option("url", "http://localhost:8080/services?Action=ObRootServiceInfo&ObRegion=myob")
.option("sys-username", "root") + .option("sys-password", "password") + .option("username", "root@sys#myob") + .option("password", "password") + .option("table-name", "htable1") + .option("schema-name", "test") + .option("schema", schema) + .save() +``` + +### ODP模式 + +#### Spark-SQL + +```sql +CREATE TEMPORARY VIEW test_obkv +USING `obkv-hbase` +OPTIONS( + "odp-mode" = true, + "odp-ip"= "root", + "odp-port" = "password", + "schema-name"="test", + "table-name"="htable1", + "username"="root@sys#myob", + "password"="password", + "catalog"="{ + 'order_id': {'cf': 'rowkey','col': 'order_id','type': 'int'}, + 'order_date': {'cf': 'family1','col': 'order_date','type': 'timestamp'}, + 'customer_name': {'cf': 'family1','col': 'customer_name','type': 'string'}, + 'price': {'cf': 'family1','col': 'price','type': 'double'}, + 'product_id': {'cf': 'family1','col': 'product_id','type': 'int'}, + 'order_status': {'cf': 'family1','col': 'order_status','type': 'boolean'} +}" +); + +insert into table test_obkv +select * from test.orders; +``` + +#### DataFrame + +```scala +val df = spark.sql("select * from test.orders") + +val schema: String = + """ + |{ + | "order_id": {"cf": "rowkey","col": "order_id","type": "int"}, + | "order_date": {"cf": "family1","col": "order_date","type": "timestamp"}, + | "customer_name": {"cf": "family1","col": "customer_name","type": "string"}, + | "price": {"cf": "family1","col": "price","type": "double"}, + | "product_id": {"cf": "family1","col": "product_id","type": "int"}, + | "order_status": {"cf": "family1","col": "order_status","type": "boolean"} + |} + |""".stripMargin + +df.write + .format("obkv-hbase") + .option("odp-mode", true) + .option("odp-ip", "localhost") + .option("odp-port", 2885) + .option("username", "root@sys#myob") + .option("password", "password") + .option("schema-name", "test") + .option("table-name", "htable1") + .option("schema", schema) + .save() +``` + +## 配置项 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+## 配置项
+
+| 参数名           | 是否必需 | 默认值 | 类型    | 描述 |
+|------------------|---------|-------|---------|------|
+| schema-name      | 是      |       | String  | OceanBase 的 db 名。 |
+| table-name       | 是      |       | String  | HBase 表名，注意在 OceanBase 中表名的结构是 `hbase_table_name$family_name`。 |
+| username         | 是      |       | String  | 非 sys 租户的用户名。 |
+| password         | 是      |       | String  | 非 sys 租户的密码。 |
+| schema           | 是      |       | String  | 自定义的 JSON 格式 Schema，支持 JSON 单引号和双引号模式。使用 Spark-SQL 时，单引号模式无需对双引号转义，更为方便。 |
+| odp-mode         | 否      | false | Boolean | 如果设置为 'true'，连接器将通过 ODP 连接到 OBKV，否则通过 config url 连接。 |
+| url              | 否      |       | String  | 集群的 config url，可以通过 `SHOW PARAMETERS LIKE 'obconfig_url'` 查询。当 'odp-mode' 为 'false' 时必填。 |
+| sys.username     | 否      |       | String  | sys 租户的用户名。当 'odp-mode' 为 'false' 时必填。 |
+| sys.password     | 否      |       | String  | sys 租户用户的密码。当 'odp-mode' 为 'false' 时必填。 |
+| odp-ip           | 否      |       | String  | ODP 的 IP。当 'odp-mode' 为 'true' 时必填。 |
+| odp-port         | 否      | 2885  | Integer | ODP 的 RPC 端口。当 'odp-mode' 为 'true' 时必填。 |
+| hbase.properties | 否      |       | String  | 配置 'obkv-hbase-client-java' 的属性，多个值用分号分隔。 |
+| batch-size       | 否      | 10000 | Integer | 一次写入 OceanBase 的批大小。 |
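+
+例如，可以通过 `hbase.properties` 以分号分隔的 key=value 形式传入额外的客户端配置；连接器只是按 ';' 和 '=' 拆分字符串，并将每个键值对原样传给客户端配置。下面给出一个示意，其中的属性名仅作举例，假设其为 obkv-hbase-client-java 可识别的配置项：
+
+```scala
+df.write
+  .format("obkv-hbase")
+  .option("odp-mode", true)
+  .option("odp-ip", "localhost")
+  .option("odp-port", 2885)
+  .option("username", "root@sys#myob")
+  .option("password", "password")
+  .option("schema-name", "test")
+  .option("table-name", "htable1")
+  .option("schema", schema)
+  // 示意性的属性名；此处的键值对会被原样设置到底层客户端配置中
+  .option("hbase.properties", "rpc.execute.timeout=3000;rpc.login.timeout=3000")
+  .save()
+```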
+ diff --git a/pom.xml b/pom.xml index 06469af..9531904 100644 --- a/pom.xml +++ b/pom.xml @@ -22,6 +22,7 @@ under the License. pom + spark-connector-obkv-hbase spark-connector-oceanbase spark-connector-oceanbase-common spark-connector-oceanbase-e2e-tests diff --git a/spark-connector-obkv-hbase/pom.xml b/spark-connector-obkv-hbase/pom.xml new file mode 100644 index 0000000..c3ed87a --- /dev/null +++ b/spark-connector-obkv-hbase/pom.xml @@ -0,0 +1,62 @@ + + + + 4.0.0 + + com.oceanbase + spark-connector-oceanbase-parent + ${revision} + + + spark-connector-obkv-hbase + pom + + spark-connector-obkv-hbase-base + spark-connector-obkv-hbase-2.4 + spark-connector-obkv-hbase-3.1 + spark-connector-obkv-hbase-3.2 + spark-connector-obkv-hbase-3.3 + spark-connector-obkv-hbase-3.4 + + + + + com.oceanbase + spark-connector-oceanbase-common + ${revision} + + + + com.oceanbase + obkv-hbase-client + 0.1.5 + + + junit + junit + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-nop + + + + + diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-2.4/pom.xml b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-2.4/pom.xml new file mode 100644 index 0000000..36d23a4 --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-2.4/pom.xml @@ -0,0 +1,115 @@ + + + + 4.0.0 + + com.oceanbase + spark-connector-obkv-hbase + ${revision} + + + spark-connector-obkv-hbase-2.4_${scala.binary.version} + spark-connector-obkv-hbase-2.4 + + + 2.4.8 + + + + + com.oceanbase + spark-connector-obkv-hbase-base + ${revision} + + + + org.apache.spark + spark-sql_2.11 + ${spark2.version} + + + org.apache.avro + avro + + + + com.fasterxml.jackson.core + * + + + com.fasterxml.jackson.module + * + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.xbean + xbean-asm6-shaded + + + org.glassfish.jersey.media + jersey-media-json-jackson + + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-oceanbase + + shade + + package + + + + com.alibaba:* + com.oceanbase:* + com.alipay.sofa:bolt + com.alipay.sofa.common:sofa-common-tools + mysql:mysql-connector-java + org.apache.hbase:* + + + false + + + com.oceanbase:obkv-table-client + + **/log4j/log-conf.xml + + + + + + + + + + diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-2.4/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-2.4/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml new file mode 100644 index 0000000..989e8c7 --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-2.4/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml @@ -0,0 +1,88 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.1/pom.xml b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.1/pom.xml new file mode 100644 index 0000000..c9b0441 --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.1/pom.xml @@ -0,0 +1,93 @@ + + + + 4.0.0 + + com.oceanbase + spark-connector-obkv-hbase + ${revision} + + + spark-connector-obkv-hbase-3.1_${scala.binary.version} + spark-connector-obkv-hbase-3.1 + + + 3.1.3 + + + + + com.oceanbase + spark-connector-obkv-hbase-base + ${revision} + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + 
shade-oceanbase + + shade + + package + + + + com.oceanbase:* + com.alibaba:* + com.alipay.sofa:bolt + com.alipay.sofa.common:sofa-common-tools + org.apache.hbase:* + mysql:mysql-connector-java + + + + + com.oceanbase:obkv-table-client + + **/log4j/log-conf.xml + + + + + + + + + + + diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.1/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.1/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml new file mode 100644 index 0000000..989e8c7 --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.1/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml @@ -0,0 +1,88 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.2/pom.xml b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.2/pom.xml new file mode 100644 index 0000000..4c319bd --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.2/pom.xml @@ -0,0 +1,93 @@ + + + + 4.0.0 + + com.oceanbase + spark-connector-obkv-hbase + ${revision} + + + spark-connector-obkv-hbase-3.2_${scala.binary.version} + spark-connector-obkv-hbase-3.2 + + + 3.2.2 + + + + + com.oceanbase + spark-connector-obkv-hbase-base + ${revision} + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-oceanbase + + shade + + package + + + + com.oceanbase:* + com.alibaba:* + com.alipay.sofa:bolt + com.alipay.sofa.common:sofa-common-tools + org.apache.hbase:* + mysql:mysql-connector-java + + + + + com.oceanbase:obkv-table-client + + **/log4j/log-conf.xml + + + + + + + + + + + diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.2/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.2/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml new file mode 100644 index 0000000..989e8c7 --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.2/src/main/resources/oceanbase-table-client/log/log4j/log-conf.xml @@ -0,0 +1,88 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.3/pom.xml b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.3/pom.xml new file mode 100644 index 0000000..9b52d2f --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.3/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + + com.oceanbase + spark-connector-obkv-hbase + ${revision} + + + spark-connector-obkv-hbase-3.3_${scala.binary.version} + spark-connector-obkv-hbase-3.3 + + + 3.3.2 + + + + + com.oceanbase + spark-connector-obkv-hbase-base + ${revision} + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-oceanbase + + shade + + package + + + + com.oceanbase:* + com.alibaba:* + com.alipay.sofa:bolt + com.alipay.sofa.common:sofa-common-tools + org.apache.hbase:* + mysql:mysql-connector-java + + + + + + org.apache.logging + shade.org.apache.logging + + + + + + + + + + diff --git 
a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.4/pom.xml b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.4/pom.xml new file mode 100644 index 0000000..bdb30b6 --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-3.4/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + + com.oceanbase + spark-connector-obkv-hbase + ${revision} + + + spark-connector-obkv-hbase-3.4_${scala.binary.version} + spark-connector-obkv-hbase-3.4 + + + 3.4.0 + + + + + com.oceanbase + spark-connector-obkv-hbase-base + ${revision} + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-oceanbase + + shade + + package + + + + com.oceanbase:* + com.alibaba:* + com.alipay.sofa:bolt + com.alipay.sofa.common:sofa-common-tools + org.apache.hbase:* + mysql:mysql-connector-java + + + + + + org.apache.logging + shade.org.apache.logging + + + + + + + + + + diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/pom.xml b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/pom.xml new file mode 100644 index 0000000..438557f --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/pom.xml @@ -0,0 +1,263 @@ + + + + 4.0.0 + + com.oceanbase + spark-connector-obkv-hbase + ${revision} + + + spark-connector-obkv-hbase-base + + + 3.4.0 + + + + + org.scala-lang + scala-library + ${scala.version} + + + org.scala-lang + scala-reflect + ${scala.version} + + + org.scala-lang + scala-compiler + ${scala.version} + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + org.apache.orc + orc-core + + + + + + + com.oceanbase + spark-connector-oceanbase-common + ${project.version} + test-jar + test + + + org.apache.spark + spark-hive_2.12 + ${spark.version} + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.orc + orc-core + + + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + tests + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.orc + orc-core + + + + + org.apache.spark + spark-core_2.12 + ${spark.version} + tests + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + org.apache.orc + orc-core + + + + + org.apache.spark + spark-catalyst_2.12 + ${spark.version} + tests + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.orc + orc-core + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + test + + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + test + + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + test + + + + + com.lmax + disruptor + 3.4.2 + + + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + -nobootcp + -target:jvm-${target.java.version} + + false + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + + scala-compile-first + + add-source + compile + + process-resources + + + + + scala-test-compile + + testCompile + + process-test-resources + + + + + com.diffplug.spotless + spotless-maven-plugin + ${spotless.version} + + + + 3.4.3 + + ${project.basedir}/../../.scalafmt.conf + + + + + + + diff --git 
a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/java/com/oceanbase/spark/config/OBKVHbaseConfig.java b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/java/com/oceanbase/spark/config/OBKVHbaseConfig.java new file mode 100644 index 0000000..7e735c4 --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/java/com/oceanbase/spark/config/OBKVHbaseConfig.java @@ -0,0 +1,202 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.spark.config; + +import java.util.Map; +import java.util.Properties; + + +import org.apache.commons.lang3.StringUtils; + +public class OBKVHbaseConfig extends Config { + public static final ConfigEntry URL = + new ConfigBuilder("url") + .doc("The connection URL") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry USERNAME = + new ConfigBuilder("username") + .doc("The username") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry PASSWORD = + new ConfigBuilder("password") + .doc("The password") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry SCHEMA_NAME = + new ConfigBuilder("schema-name") + .doc("The schema name or database name") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry TABLE_NAME = + new ConfigBuilder("table-name") + .doc("The table name") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry SYS_USERNAME = + new ConfigBuilder("sys.username") + .doc("The username of system tenant") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry SYS_PASSWORD = + new ConfigBuilder("sys.password") + .doc("The password of system tenant") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry HBASE_PROPERTIES = + new ConfigBuilder("hbase.properties") + .doc("Properties to configure 'obkv-hbase-client-java'") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .create(); + + public static final ConfigEntry ODP_MODE = + new ConfigBuilder("odp-mode") + .doc("Whether to use ODP to connect to OBKV") + .version(ConfigConstants.VERSION_1_0_0) + .booleanConf() + .createWithDefault(false); + + public static final ConfigEntry ODP_IP = + 
new ConfigBuilder("odp-ip") + .doc("ODP IP address") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry ODP_PORT = + new ConfigBuilder("odp-port") + .doc("ODP rpc port") + .version(ConfigConstants.VERSION_1_0_0) + .intConf() + .checkValue(port -> port > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(2885); + + public static final ConfigEntry BATCH_SIZE = + new ConfigBuilder("batch-size") + .doc("The batch size write to OceanBase one time") + .version(ConfigConstants.VERSION_1_0_0) + .intConf() + .checkValue(size -> size > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(10_000); + + public static final ConfigEntry SCHEMA = + new ConfigBuilder("schema") + .doc("The schema of the obkv-hbase table") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public String getURL() { + return get(URL); + } + + public String getUsername() { + return get(USERNAME); + } + + public String getPassword() { + return get(PASSWORD); + } + + public String getSchemaName() { + return get(SCHEMA_NAME); + } + + public String getTableName() { + return get(TABLE_NAME); + } + + public String getSysUserName() { + return get(SYS_USERNAME); + } + + public String getSysPassword() { + return get(SYS_PASSWORD); + } + + public Properties getHBaseProperties() { + return parseProperties(get(HBASE_PROPERTIES)); + } + + public Boolean getOdpMode() { + return get(ODP_MODE); + } + + public String getOdpIp() { + return get(ODP_IP); + } + + public Integer getOdpPort() { + return get(ODP_PORT); + } + + public Integer getBatchSize() { + return get(BATCH_SIZE); + } + + public String getSchema() { + return get(SCHEMA); + } + + public OBKVHbaseConfig(Map properties) { + super(); + loadFromMap(properties, k -> true); + } + + private Properties parseProperties(String propsStr) { + if (StringUtils.isBlank(propsStr)) { + return null; + } + Properties props = new Properties(); + for (String propStr : propsStr.split(";")) { + if (StringUtils.isBlank(propStr)) { + continue; + } + String[] pair = propStr.trim().split("="); + if (pair.length != 2) { + throw new IllegalArgumentException("properties must have one key value pair"); + } + props.put(pair[0].trim(), pair[1].trim()); + } + return props; + } +} diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/java/com/oceanbase/spark/obkv/HTableClientUtils.java b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/java/com/oceanbase/spark/obkv/HTableClientUtils.java new file mode 100644 index 0000000..0326a75 --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/java/com/oceanbase/spark/obkv/HTableClientUtils.java @@ -0,0 +1,70 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.oceanbase.spark.obkv; + +import com.oceanbase.spark.config.OBKVHbaseConfig; + +import java.util.Properties; + + +import com.alipay.oceanbase.hbase.OHTableClient; +import com.alipay.oceanbase.hbase.constants.OHConstants; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HTableClientUtils { + private static final Logger LOG = LoggerFactory.getLogger(HTableClientUtils.class); + + public static HTableInterface getHTableClient(OBKVHbaseConfig config) { + try { + OHTableClient tableClient = new OHTableClient(config.getTableName(), getConfig(config)); + tableClient.init(); + return tableClient; + } catch (Exception e) { + throw new RuntimeException("Failed to initialize OHTableClient", e); + } + } + + private static Configuration getConfig(OBKVHbaseConfig config) { + + Configuration conf = new Configuration(); + if (config.getOdpMode()) { + conf.setBoolean(OHConstants.HBASE_OCEANBASE_ODP_MODE, config.getOdpMode()); + conf.set(OHConstants.HBASE_OCEANBASE_ODP_ADDR, config.getOdpIp()); + conf.setInt(OHConstants.HBASE_OCEANBASE_ODP_PORT, config.getOdpPort()); + conf.set(OHConstants.HBASE_OCEANBASE_DATABASE, config.getSchemaName()); + } else { + String paramUrl = + String.format("%s&database=%s", config.getURL(), config.getSchemaName()); + LOG.debug("Set paramURL for database {} to {}", config.getSchemaName(), paramUrl); + conf.set(OHConstants.HBASE_OCEANBASE_PARAM_URL, paramUrl); + conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, config.getSysUserName()); + conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, config.getSysPassword()); + } + conf.set(OHConstants.HBASE_OCEANBASE_FULL_USER_NAME, config.getUsername()); + conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, config.getPassword()); + Properties hbaseProperties = config.getHBaseProperties(); + if (hbaseProperties != null) { + for (String name : hbaseProperties.stringPropertyNames()) { + conf.set(name, hbaseProperties.getProperty(name)); + } + } + return conf; + } +} diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 0000000..9579d93 --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +com.oceanbase.spark.OBKVHBaseSparkSource diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/scala/com/oceanbase/spark/HBaseRelation.scala b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/scala/com/oceanbase/spark/HBaseRelation.scala new file mode 100644 index 0000000..75cd4af --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/scala/com/oceanbase/spark/HBaseRelation.scala @@ -0,0 +1,169 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.spark + +import com.oceanbase.spark.HBaseRelation.{columnFamilyMap, convertToBytes, parseCatalog} +import com.oceanbase.spark.config.OBKVHbaseConfig +import com.oceanbase.spark.obkv.HTableClientUtils + +import com.fasterxml.jackson.core.JsonParser.Feature +import org.apache.hadoop.hbase.client.{HTableInterface, Put} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, PrunedFilteredScan} +import org.apache.spark.sql.types.{StructField, StructType} +import org.json4s.JsonAST.JObject +import org.json4s.jackson.JsonMethods +import org.json4s.jackson.JsonMethods._ + +import java.sql.{Date, Timestamp} +import java.time.{Instant, LocalDate} +import java.util + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +case class HBaseRelation( + parameters: Map[String, String], + userSpecifiedSchema: Option[StructType] +)(@transient val sqlContext: SQLContext) + extends BaseRelation + with PrunedFilteredScan + with InsertableRelation + with Logging { + + import scala.collection.JavaConverters._ + private val config = new OBKVHbaseConfig(parameters.asJava) + private val userSchema: StructType = parseCatalog(config.getSchema) + + override def schema: StructType = userSpecifiedSchema.getOrElse(userSchema) + + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + throw new NotImplementedError("Not supports reading obkv-hbase") + } + + override def insert(dataFrame: DataFrame, overwrite: Boolean): Unit = { + dataFrame.foreachPartition( + (rows: Iterator[Row]) => { + val hTableClient: HTableInterface = HTableClientUtils.getHTableClient(config) + val buffer = ArrayBuffer[Row]() + rows.foreach( + row => { + buffer += row + if (buffer.length >= config.getBatchSize) { + flush(buffer, hTableClient) + } + }) + flush(buffer, hTableClient) + hTableClient.close() + }) + } + + private def flush(buffer: ArrayBuffer[Row], hTableClient: HTableInterface): Unit = { + val putList = new util.ArrayList[Put]() + buffer.foreach( + row => { + // Get field index by col name that defined in catalog + val rowKeyIndex = row.schema.fieldIndex(HBaseRelation.rowKey) + val rowKey: Array[Byte] = 
convertToBytes(row(rowKeyIndex)) + val put: Put = new Put(rowKey) + // Mapping DataFrame's schema to User-defined schema + for (i <- 0 until (row.size)) { + if (i == rowKeyIndex) { + // do nothing + } else { + val rowFieldName = row.schema.fieldNames(i) + // Only write columns defined by the user in the schema. + if (HBaseRelation.columnFamilyMap.contains(rowFieldName)) { + val userFieldName = HBaseRelation.columnFamilyMap(rowFieldName)._1 + val cfName = HBaseRelation.columnFamilyMap(rowFieldName)._2 + val familyName: Array[Byte] = Bytes.toBytes(cfName) + val columnValue = convertToBytes(row.get(i)) + put.add(familyName, Bytes.toBytes(userFieldName), columnValue) + } + } + } + putList.add(put) + }) + hTableClient.put(putList) + buffer.clear() + } +} + +object HBaseRelation { + private val CF = "cf" + private val COLUMN_NAME = "col" + private val COLUMN_TYPE = "type" + + private var rowKey = "" + val columnFamilyMap: mutable.Map[String, (String, String)] = + mutable.LinkedHashMap.empty[String, (String, String)] + + def parseCatalog(catalogJson: String): StructType = { + JsonMethods.mapper.configure(Feature.ALLOW_SINGLE_QUOTES, true) + val jObject: JObject = parse(catalogJson).asInstanceOf[JObject] + val schemaMap = mutable.LinkedHashMap.empty[String, Field] + getColsPreservingOrder(jObject).foreach { + case (name, column) => + if (column(CF).equalsIgnoreCase("rowKey")) + rowKey = column(COLUMN_NAME) + else + columnFamilyMap.put(column(COLUMN_NAME), (name, column(CF))) + + val filed = Field(column(CF), column(COLUMN_NAME), column(COLUMN_TYPE)) + schemaMap.+=((name, filed)) + } + + val fields: Seq[StructField] = schemaMap.map { + case (name, field) => + StructField(name, CatalystSqlParser.parseDataType(field.columnType)) + }.toSeq + + StructType(fields) + } + + private def getColsPreservingOrder(jObj: JObject): Seq[(String, Map[String, String])] = { + jObj.obj.map { + case (name, jValue) => + (name, jValue.values.asInstanceOf[Map[String, String]]) + } + } + + def convertToBytes(data: Any): Array[Byte] = data match { + case null => null + case _: Boolean => Bytes.toBytes(data.asInstanceOf[Boolean]) + case _: Byte => Bytes.toBytes(data.asInstanceOf[Byte]) + case _: Short => Bytes.toBytes(data.asInstanceOf[Short]) + case _: Integer => Bytes.toBytes(data.asInstanceOf[Integer]) + case _: Long => Bytes.toBytes(data.asInstanceOf[Long]) + case _: Float => Bytes.toBytes(data.asInstanceOf[Float]) + case _: Double => Bytes.toBytes(data.asInstanceOf[Double]) + case _: String => Bytes.toBytes(data.asInstanceOf[String]) + case _: BigDecimal => Bytes.toBytes(data.asInstanceOf[java.math.BigDecimal]) + case _: Date => Bytes.toBytes(data.asInstanceOf[Date].getTime) + case _: LocalDate => Bytes.toBytes(data.asInstanceOf[LocalDate].toEpochDay) + case _: Timestamp => Bytes.toBytes(data.asInstanceOf[Timestamp].getTime) + case _: Instant => Bytes.toBytes(data.asInstanceOf[Instant].getEpochSecond * 1000) + case _: Array[Byte] => data.asInstanceOf[Array[Byte]] + case _ => + throw new UnsupportedOperationException(s"Unsupported type: ${data.getClass.getSimpleName}") + } +} + +case class Field(cf: String, columnName: String, columnType: String) diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/scala/com/oceanbase/spark/OBKVHBaseSparkSource.scala b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/scala/com/oceanbase/spark/OBKVHBaseSparkSource.scala new file mode 100644 index 0000000..51f2b41 --- /dev/null +++ 
b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/main/scala/com/oceanbase/spark/OBKVHBaseSparkSource.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.spark + +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext} +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +private class OBKVHBaseSparkSource + extends DataSourceRegister + with RelationProvider + with CreatableRelationProvider { + + override def shortName(): String = "obkv-hbase" + + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + HBaseRelation(parameters, None)(sqlContext) + } + + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { + val relation = HBaseRelation(parameters, Some(data.schema))(sqlContext) + relation.insert(data, overwrite = false) + relation + } +} diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/test/resources/log4j2-test.properties b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000..bef06db --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/test/resources/log4j2-test.properties @@ -0,0 +1,24 @@ +# Copyright 2024 OceanBase. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = INFO +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/test/resources/sql/htable.sql b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/test/resources/sql/htable.sql new file mode 100644 index 0000000..661779d --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/test/resources/sql/htable.sql @@ -0,0 +1,30 @@ +-- Copyright 2024 OceanBase. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. 
+-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +CREATE TABLE `htable$family1` +( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) +); + +CREATE TABLE `htable$family2` +( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) +); diff --git a/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/test/scala/com/oceanbase/spark/OBKVHBaseConnectorITCase.scala b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/test/scala/com/oceanbase/spark/OBKVHBaseConnectorITCase.scala new file mode 100644 index 0000000..47f3a49 --- /dev/null +++ b/spark-connector-obkv-hbase/spark-connector-obkv-hbase-base/src/test/scala/com/oceanbase/spark/OBKVHBaseConnectorITCase.scala @@ -0,0 +1,185 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.oceanbase.spark + +import com.oceanbase.spark.OceanBaseMySQLTestBase.{constructConfigUrlForODP, createSysUser, getConfigServerAddress, getSysParameter} +import com.oceanbase.spark.OceanBaseTestBase.assertEqualsInAnyOrder + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.sql.{SaveMode, SparkSession} +import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Test} + +import java.sql.ResultSet +import java.util + +class OBKVHBaseConnectorITCase extends OceanBaseMySQLTestBase { + @BeforeEach + def before(): Unit = { + initialize("sql/htable.sql") + } + + @AfterEach + def after(): Unit = { + dropTables("htable$family1", "htable$family2") + } + + @Test + def testOdpDataFrameSink(): Unit = { + val session = SparkSession.builder.master("local[*]").getOrCreate + + val newContact = + ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson", 121.11) + val newData = Seq(newContact) + val df = session.createDataFrame(newData).toDF() + df.write + .format("obkv-hbase") + .option("odp-mode", true) + .option("odp-ip", OceanBaseMySQLTestBase.ODP.getHost) + .option("odp-port", OceanBaseMySQLTestBase.ODP.getRpcPort) + .option("username", s"$getUsername#$getClusterName") + .option("password", getPassword) + .option("table-name", "htable") + .option("schema-name", getSchemaName) + .option("schema", OBKVHBaseConnectorITCase.schema) + .save() + session.stop() + + import scala.collection.JavaConverters._ + val expected1 = List( + "16891,address,40 Ellis St.", + "16891,phone,674-555-0110", + "16891,personalName,John Jackson", + "16891,personalPhone,121.11").asJava + + val actual1 = queryHTable("htable$family1", rowConverter) + assertEqualsInAnyOrder(expected1, actual1) + } + + @Test + def testOdpSqlSink(): Unit = { + val session = SparkSession.builder.master("local[*]").getOrCreate + val newContact = + ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson", 121.11) + session.createDataFrame(Seq(newContact)).createOrReplaceTempView("content") + session.sql(s""" + |CREATE TEMPORARY VIEW test_sink + |USING `obkv-hbase` + |OPTIONS( + | "odp-mode" = "true", + | "odp-ip"= "${OceanBaseMySQLTestBase.ODP.getHost}", + | "odp-port" = "${OceanBaseMySQLTestBase.ODP.getRpcPort}", + | "schema-name"="$getSchemaName", + | "table-name"="htable", + | "username"="$getUsername#$getClusterName", + | "password"="$getPassword", + | "schema"="${OBKVHBaseConnectorITCase.schemaWithSingleQuotes}" + |); + |""".stripMargin) + session.sql(""" + |INSERT INTO test_sink + |SELECT * FROM content; + |""".stripMargin) + session.stop() + + import scala.collection.JavaConverters._ + val expected1 = List( + "16891,address,40 Ellis St.", + "16891,phone,674-555-0110", + "16891,personalName,John Jackson", + "16891,personalPhone,121.11").asJava + + val actual1 = queryHTable("htable$family1", rowConverter) + assertEqualsInAnyOrder(expected1, actual1) + } + + protected def queryHTable( + tableName: String, + rowConverter: OceanBaseTestBase.RowConverter): util.List[String] = { + queryTable(tableName, util.Arrays.asList("K", "Q", "V"), rowConverter) + } + + def rowConverter: OceanBaseTestBase.RowConverter = new OceanBaseTestBase.RowConverter { + override def convert(rs: ResultSet, columnCount: Int): String = { + val k = Bytes.toString(rs.getBytes("K")) + val q = Bytes.toString(rs.getBytes("Q")) + val bytes = rs.getBytes("V") + var v: String = null + q match { + case "address" | "phone" | "personalName" => + v = Bytes.toString(bytes) + + case "personalPhone" => + v = 
String.valueOf(Bytes.toDouble(bytes)) + case _ => + throw new RuntimeException("Unknown qualifier: " + q) + } + s"$k,$q,$v" + } + } +} + +object OBKVHBaseConnectorITCase { + @BeforeAll + def setup(): Unit = { + OceanBaseMySQLTestBase.CONFIG_SERVER.start() + val configServerAddress = getConfigServerAddress(OceanBaseMySQLTestBase.CONFIG_SERVER) + val configUrlForODP = constructConfigUrlForODP(configServerAddress) + OceanBaseMySQLTestBase.CONTAINER.withEnv("OB_CONFIGSERVER_ADDRESS", configServerAddress).start() + val password = "test" + createSysUser("proxyro", password) + OceanBaseMySQLTestBase.ODP.withPassword(password).withConfigUrl(configUrlForODP).start() + } + + @AfterAll + def tearDown(): Unit = { + List( + OceanBaseMySQLTestBase.CONFIG_SERVER, + OceanBaseMySQLTestBase.CONTAINER, + OceanBaseMySQLTestBase.ODP) + .foreach(_.stop()) + } + + val schema: String = + """ + |{ + | "rowkey": {"cf": "rowkey","col": "rowkey","type": "string"}, + | "address": {"cf": "family1","col": "officeAddress","type": "string"}, + | "phone": {"cf": "family1","col": "officePhone","type": "string"}, + | "personalName": {"cf": "family1","col": "personalName","type": "string"}, + | "personalPhone": {"cf": "family1","col": "personalPhone","type": "double"} + |} + |""".stripMargin + + val schemaWithSingleQuotes: String = + """ + |{ + | 'rowkey': {'cf': 'rowkey','col': 'rowkey','type': 'string'}, + | 'address': {'cf': 'family1','col': 'address','type': 'string'}, + | 'phone': {'cf': 'family1','col': 'phone','type': 'string'}, + | 'personalName': {'cf': 'family1','col': 'personalName','type': 'string'}, + | 'personalPhone': {'cf': 'family1','col': 'personalPhone','type': 'double'} + |} + |""".stripMargin +} + +case class ContactRecord( + rowkey: String, + officeAddress: String, + officePhone: String, + personalName: String, + personalPhone: Double +) diff --git a/spark-connector-oceanbase-e2e-tests/pom.xml b/spark-connector-oceanbase-e2e-tests/pom.xml index 544efc0..9cd6f00 100644 --- a/spark-connector-oceanbase-e2e-tests/pom.xml +++ b/spark-connector-oceanbase-e2e-tests/pom.xml @@ -24,6 +24,22 @@ under the License. spark-connector-oceanbase-e2e-tests + + org.scala-lang + scala-library + ${scala.version} + + + org.scala-lang + scala-reflect + ${scala.version} + + + org.scala-lang + scala-compiler + ${scala.version} + + com.oceanbase spark-connector-oceanbase-common @@ -31,6 +47,25 @@ under the License. test-jar test + + com.oceanbase + obkv-hbase-client + 0.1.5 + + + junit + junit + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-nop + + + mysql @@ -60,6 +95,26 @@ under the License. + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + -nobootcp + -target:jvm-${target.java.version} + + false + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins @@ -98,6 +153,7 @@ under the License. maven-dependency-plugin + com.oceanbase spark-connector-oceanbase-2.4_${scala.binary.version} @@ -138,6 +194,48 @@ under the License. 
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>${scala-maven-plugin.version}</version>
+                <configuration>
+                    <args>
+                        <arg>-nobootcp</arg>
+                        <arg>-target:jvm-${target.java.version}</arg>
+                    </args>
+                    <checkMultipleScalaVersions>false</checkMultipleScalaVersions>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
 
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
@@ -98,6 +153,7 @@ under the License.
                 <artifactId>maven-dependency-plugin</artifactId>
 
+
                     <artifactItems>
                         <artifactItem>
                             <groupId>com.oceanbase</groupId>
                             <artifactId>spark-connector-oceanbase-2.4_${scala.binary.version}</artifactId>
@@ -138,6 +194,48 @@ under the License.
                             <type>jar</type>
                             <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                         </artifactItem>
+                        <artifactItem>
+                            <groupId>com.oceanbase</groupId>
+                            <artifactId>spark-connector-obkv-hbase-2.4_${scala.binary.version}</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>spark-connector-obkv-hbase-2.4_${scala.binary.version}-${project.version}.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>com.oceanbase</groupId>
+                            <artifactId>spark-connector-obkv-hbase-3.1_${scala.binary.version}</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>spark-connector-obkv-hbase-3.1_${scala.binary.version}-${project.version}.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>com.oceanbase</groupId>
+                            <artifactId>spark-connector-obkv-hbase-3.2_${scala.binary.version}</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>spark-connector-obkv-hbase-3.2_${scala.binary.version}-${project.version}.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>com.oceanbase</groupId>
+                            <artifactId>spark-connector-obkv-hbase-3.3_${scala.binary.version}</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>spark-connector-obkv-hbase-3.3_${scala.binary.version}-${project.version}.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>com.oceanbase</groupId>
+                            <artifactId>spark-connector-obkv-hbase-3.4_${scala.binary.version}</artifactId>
+                            <version>${project.version}</version>
+                            <destFileName>spark-connector-obkv-hbase-3.4_${scala.binary.version}-${project.version}.jar</destFileName>
+                            <type>jar</type>
+                            <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
@@ -168,6 +266,48 @@ under the License.
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>${scala-maven-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                        <phase>process-resources</phase>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                        <phase>process-test-resources</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>${spotless.version}</version>
+                <configuration>
+                    <scala>
+                        <scalafmt>
+                            <version>3.4.3</version>
+                            <file>${project.basedir}/../.scalafmt.conf</file>
+                        </scalafmt>
+                    </scala>
+                </configuration>
+            </plugin>
diff --git a/spark-connector-oceanbase-e2e-tests/src/test/resources/sql/htable.sql b/spark-connector-oceanbase-e2e-tests/src/test/resources/sql/htable.sql
new file mode 100644
index 0000000..661779d
--- /dev/null
+++ b/spark-connector-oceanbase-e2e-tests/src/test/resources/sql/htable.sql
@@ -0,0 +1,30 @@
+-- Copyright 2024 OceanBase.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE TABLE `htable$family1`
+(
+    `K` varbinary(1024) NOT NULL,
+    `Q` varbinary(256) NOT NULL,
+    `T` bigint(20) NOT NULL,
+    `V` varbinary(1024) DEFAULT NULL,
+    PRIMARY KEY (`K`, `Q`, `T`)
+);
+
+CREATE TABLE `htable$family2`
+(
+    `K` varbinary(1024) NOT NULL,
+    `Q` varbinary(256) NOT NULL,
+    `T` bigint(20) NOT NULL,
+    `V` varbinary(1024) DEFAULT NULL,
+    PRIMARY KEY (`K`, `Q`, `T`)
+);
diff --git a/spark-connector-oceanbase-e2e-tests/src/test/scala/com/oceanbase/spark/OBKVHBaseE2eITCase.scala b/spark-connector-oceanbase-e2e-tests/src/test/scala/com/oceanbase/spark/OBKVHBaseE2eITCase.scala
new file mode 100644
index 0000000..052dc14
--- /dev/null
+++ b/spark-connector-oceanbase-e2e-tests/src/test/scala/com/oceanbase/spark/OBKVHBaseE2eITCase.scala
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2024 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.oceanbase.spark
+
+import com.oceanbase.spark.OBKVHBaseE2eITCase.SINK_CONNECTOR_NAME
+import com.oceanbase.spark.OceanBaseMySQLTestBase.{createSysUser, getSysParameter}
+import com.oceanbase.spark.OceanBaseMySQLTestBase.getConfigServerAddress
+import com.oceanbase.spark.OceanBaseTestBase.assertEqualsInAnyOrder
+import com.oceanbase.spark.utils.SparkContainerTestEnvironment
+import com.oceanbase.spark.utils.SparkContainerTestEnvironment.getResource
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Test}
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty
+
+import java.sql.ResultSet
+import java.util
+
+class OBKVHBaseE2eITCase extends SparkContainerTestEnvironment {
+  @BeforeEach
+  override def before(): Unit = {
+    super.before()
+    initialize("sql/htable.sql")
+  }
+
+  @AfterEach
+  override def after(): Unit = {
+    super.after()
+    dropTables("htable$family1", "htable$family2")
+  }
+
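+  // Runs the connector end to end through spark-sql: a temporary view backed
+  // by the `obkv-hbase` data source in sys-tenant mode, written with a plain
+  // INSERT INTO, then verified via JDBC against the `htable$family1` table.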
+  @Test
+  @DisabledIfSystemProperty(
+    named = "spark_version",
+    matches = "^2\\.4\\.[0-9]$|^3\\.[2-9](\\.[0-9]+)?$",
+    disabledReason = "The Spark 2.x docker image fails to execute the spark-sql command. \n" +
+      "The obkv-hbase library does not support Java 11 and later versions: Unexpected version format: 11.0.22. \n" +
+      "The Spark 3.2 and later docker images do not support JDK 8."
+  )
+  def testInsertValues(): Unit = {
+    val sqlLines: util.List[String] = new util.ArrayList[String]
+    sqlLines.add(s"""
+                    |CREATE TEMPORARY VIEW test_sink
+                    |USING `obkv-hbase`
+                    |OPTIONS(
+                    |  "url" = "${getSysParameter("obconfig_url")}",
+                    |  "sys.username" = "$getSysUsername",
+                    |  "sys.password" = "$getSysPassword",
+                    |  "schema-name" = "$getSchemaName",
+                    |  "table-name" = "htable",
+                    |  "username" = "$getUsername#$getClusterName",
+                    |  "password" = "$getPassword",
+                    |  "schema" = "${OBKVHBaseE2eITCase.schemaWithSingleQuotes}"
+                    |);
+                    |""".stripMargin)
+    sqlLines.add("""
+                   |INSERT INTO test_sink VALUES
+                   |('16891', '40 Ellis St.', '674-555-0110', 'John Jackson', 121.11);
+                   |""".stripMargin)
+    submitSQLJob(sqlLines, getResource(SINK_CONNECTOR_NAME))
+
+    import scala.collection.JavaConverters._
+    val expected1 = List(
+      "16891,address,40 Ellis St.",
+      "16891,phone,674-555-0110",
+      "16891,personalName,John Jackson",
+      "16891,personalPhone,121.11").asJava
+
+    val actual1 = queryHTable("htable$family1", rowConverter)
+    assertEqualsInAnyOrder(expected1, actual1)
+  }
+
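+  // The Spark 2.x image cannot run spark-sql, so the same scenario is driven
+  // through a generated spark-shell script that uses the DataFrame writer API.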
+  @Test
+  @EnabledIfSystemProperty(
+    named = "spark_version",
+    matches = "^(2\\.4\\.[0-9])$",
+    disabledReason = "The spark-sql command fails on the Spark 2.x docker image, " +
+      "so Spark 2.x is covered by this spark-shell based test instead."
+  )
+  def testInsertValuesSpark2(): Unit = {
+    val scriptLines: util.List[String] = new util.ArrayList[String]
+    scriptLines.add(
+      s"""
+         | val schema: String =
+         |   \"\"\"
+         |     |{
+         |     |  "rowkey": {"cf": "rowkey","col": "rowkey","type": "string"},
+         |     |  "address": {"cf": "family1","col": "officeAddress","type": "string"},
+         |     |  "phone": {"cf": "family1","col": "officePhone","type": "string"},
+         |     |  "personalName": {"cf": "family1","col": "personalName","type": "string"},
+         |     |  "personalPhone": {"cf": "family1","col": "personalPhone","type": "double"}
+         |     |}
+         |     |\"\"\".stripMargin
+         | case class ContactRecord(
+         |   rowkey: String,
+         |   officeAddress: String,
+         |   officePhone: String,
+         |   personalName: String,
+         |   personalPhone: Double
+         | )
+         | val newContact =
+         |   ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson", 121.11)
+         | val newData = Seq(newContact)
+         | val df = spark.createDataFrame(newData).toDF()
+         | df.write
+         |   .format("obkv-hbase")
+         |   .option("url", "${getSysParameter("obconfig_url")}")
+         |   .option("sys.username", "$getSysUsername")
+         |   .option("sys.password", "$getSysPassword")
+         |   .option("username", "$getUsername#$getClusterName")
+         |   .option("password", "$getPassword")
+         |   .option("table-name", "htable")
+         |   .option("schema-name", "$getSchemaName")
+         |   .option("schema", schema)
+         |   .save()
+         |""".stripMargin)
+
+    submitSparkShellJob(scriptLines, getResource(SINK_CONNECTOR_NAME))
+
+    import scala.collection.JavaConverters._
+    val expected1 = List(
+      "16891,address,40 Ellis St.",
+      "16891,phone,674-555-0110",
+      "16891,personalName,John Jackson",
+      "16891,personalPhone,121.11").asJava
+
+    val actual1 = queryHTable("htable$family1", rowConverter)
+    assertEqualsInAnyOrder(expected1, actual1)
+  }
+
+  protected def queryHTable(
+      tableName: String,
+      rowConverter: OceanBaseTestBase.RowConverter): util.List[String] = {
+    queryTable(tableName, util.Arrays.asList("K", "Q", "V"), rowConverter)
+  }
+
+  def rowConverter: OceanBaseTestBase.RowConverter =
+    new OceanBaseTestBase.RowConverter {
+      override def convert(rs: ResultSet, columnCount: Int): String = {
+        val k = Bytes.toString(rs.getBytes("K"))
+        val q = Bytes.toString(rs.getBytes("Q"))
+        val bytes = rs.getBytes("V")
+        var v: String = null
+        q match {
+          case "address" | "phone" | "personalName" =>
+            v = Bytes.toString(bytes)
+          case "personalPhone" =>
+            v = String.valueOf(Bytes.toDouble(bytes))
+          case _ =>
+            throw new RuntimeException("Unknown qualifier: " + q)
+        }
+        s"$k,$q,$v"
+      }
+    }
+}
+
+object OBKVHBaseE2eITCase extends SparkContainerTestEnvironment {
+  private val SINK_CONNECTOR_NAME =
+    "^.*spark-connector-obkv-hbase-\\d+\\.\\d+_\\d+\\.\\d+-[\\d\\.]+(?:-SNAPSHOT)?\\.jar$"
+
+  @BeforeAll
+  def setup(): Unit = {
+    OceanBaseMySQLTestBase.CONFIG_SERVER.start()
+    val configServerAddress = getConfigServerAddress(OceanBaseMySQLTestBase.CONFIG_SERVER)
+    OceanBaseMySQLTestBase.CONTAINER
+      .withEnv("OB_CONFIGSERVER_ADDRESS", configServerAddress)
+      .start()
+    val password = "test"
+    createSysUser("proxyro", password)
+  }
+
+  @AfterAll
+  def tearDown(): Unit = {
+    List(OceanBaseMySQLTestBase.CONFIG_SERVER, OceanBaseMySQLTestBase.CONTAINER)
+      .foreach(_.stop())
+  }
+
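+  // The schema JSON is written with single quotes so it can be embedded
+  // verbatim inside the double-quoted OPTIONS values of the spark-sql script.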
+  val schemaWithSingleQuotes: String =
+    """
+      |{
+      |  'rowkey': {'cf': 'rowkey','col': 'rowkey','type': 'string'},
+      |  'address': {'cf': 'family1','col': 'address','type': 'string'},
+      |  'phone': {'cf': 'family1','col': 'phone','type': 'string'},
+      |  'personalName': {'cf': 'family1','col': 'personalName','type': 'string'},
+      |  'personalPhone': {'cf': 'family1','col': 'personalPhone','type': 'double'}
+      |}
+      |""".stripMargin
+}