From ff1c4815a1ebaa7f59539b94ade82cd79dd4ce5d Mon Sep 17 00:00:00 2001 From: He Wang Date: Wed, 30 Oct 2024 11:51:48 +0800 Subject: [PATCH 1/2] test: add e2e test module --- .github/workflows/push_pr.yml | 5 + .github/workflows/test.yml | 10 + .../flink/OBKVHBaseConnectorITCase.java | 36 +-- .../flink/OceanBaseMySQLTestBase.java | 47 ++- .../connector/flink/OceanBaseTestBase.java | 20 +- .../flink/OBDirectLoadConnectorOptions.java | 3 +- ...odeITCase.java => OBDirectLoadITCase.java} | 46 +-- .../resources/sql/{mysql => }/products.sql | 0 flink-connector-oceanbase-e2e-tests/pom.xml | 126 ++++++++ .../flink/tests/OBDirectLoadE2eITCase.java | 131 ++++++++ .../flink/tests/OBKVHBaseE2eITCase.java | 141 +++++++++ .../flink/tests/OceanBaseE2eITCase.java | 121 ++++++++ .../utils/FlinkContainerTestEnvironment.java | 288 ++++++++++++++++++ .../src/test/resources/log4j2-test.properties | 21 ++ .../src/test/resources/sql/htable.sql | 30 ++ .../src/test/resources/sql/products.sql | 20 ++ .../flink/OceanBaseMySQLConnectorITCase.java | 11 +- flink-sql-connector-obkv-hbase/pom.xml | 11 +- .../pom.xml | 5 +- flink-sql-connector-oceanbase/pom.xml | 15 - pom.xml | 14 +- 21 files changed, 1013 insertions(+), 88 deletions(-) rename flink-connector-oceanbase-directload/src/test/java/com/oceanbase/connector/flink/{OBKVDirectLoadMySQLModeITCase.java => OBDirectLoadITCase.java} (91%) rename flink-connector-oceanbase-directload/src/test/resources/sql/{mysql => }/products.sql (100%) create mode 100644 flink-connector-oceanbase-e2e-tests/pom.xml create mode 100644 flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/OBDirectLoadE2eITCase.java create mode 100644 flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/OBKVHBaseE2eITCase.java create mode 100644 flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/OceanBaseE2eITCase.java create mode 100644 flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/utils/FlinkContainerTestEnvironment.java create mode 100644 flink-connector-oceanbase-e2e-tests/src/test/resources/log4j2-test.properties create mode 100644 flink-connector-oceanbase-e2e-tests/src/test/resources/sql/htable.sql create mode 100644 flink-connector-oceanbase-e2e-tests/src/test/resources/sql/products.sql diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index 26503496..3cac9db9 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -29,3 +29,8 @@ jobs: uses: ./.github/workflows/test.yml with: module: flink-connector-oceanbase-directload + + flink-connector-oceanbase-e2e-tests: + uses: ./.github/workflows/test.yml + with: + module: flink-connector-oceanbase-e2e-tests diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a7a6f998..6e493ee7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,6 +12,16 @@ jobs: name: Test runs-on: ubuntu-latest steps: + - name: Free disk space on Ubuntu runner + uses: kfir4444/free-disk-space@main + with: + tool-cache: false + android: true + dotnet: true + haskell: true + large-packages: true + swap-storage: true + - name: Check out repository code uses: actions/checkout@v4 diff --git a/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java b/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java index 08e07279..d8d6349b 100644 --- a/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java +++ b/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java @@ -26,7 +26,10 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; import java.sql.SQLException; import java.util.Arrays; @@ -37,19 +40,24 @@ public class OBKVHBaseConnectorITCase extends OceanBaseMySQLTestBase { + private static final Logger LOG = LoggerFactory.getLogger(OBKVHBaseConnectorITCase.class); + @BeforeClass public static void setup() throws Exception { - CONFIG_SERVER.start(); - - String configServerAddress = getConfigServerAddress(CONFIG_SERVER); - String configUrlForODP = constructConfigUrlForODP(configServerAddress); + CONFIG_SERVER.withLogConsumer(new Slf4jLogConsumer(LOG)).start(); - CONTAINER.withEnv("OB_CONFIGSERVER_ADDRESS", configServerAddress).start(); + CONTAINER + .withEnv("OB_CONFIGSERVER_ADDRESS", getConfigServerAddress()) + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .start(); String password = "test"; createSysUser("proxyro", password); - ODP.withPassword(password).withConfigUrl(configUrlForODP).start(); + ODP.withPassword(password) + .withConfigUrl(getConfigUrlForODP()) + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .start(); } @AfterClass @@ -58,7 +66,7 @@ public static void tearDown() { } @Before - public void before() { + public void before() throws Exception { initialize("sql/htable.sql"); } @@ -178,18 +186,4 @@ protected List queryHTable(String tableName, RowConverter rowConverter) throws SQLException { return queryTable(tableName, Arrays.asList("K", "Q", "V"), rowConverter); } - - protected String integer(Integer n) { - if (n == null) { - return "CAST(NULL AS INT)"; - } - return n.toString(); - } - - protected String string(String s) { - if (s == null) { - return "CAST(NULL AS STRING)"; - } - return "'" + s + "'"; - } } diff --git a/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLTestBase.java b/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLTestBase.java index 1c4854ef..e4329a72 100644 --- a/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLTestBase.java +++ b/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLTestBase.java @@ -16,12 +16,14 @@ package com.oceanbase.connector.flink; +import com.oceanbase.connector.flink.connection.OceanBaseUserInfo; + import com.github.dockerjava.api.model.ContainerNetwork; +import org.junit.ClassRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.oceanbase.OceanBaseCEContainer; @@ -46,7 +48,7 @@ public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase { private static final String SYS_PASSWORD = "123456"; private static final String TEST_PASSWORD = "654321"; - private static final Network NETWORK = Network.newNetwork(); + @ClassRule public static final Network NETWORK = Network.newNetwork(); @SuppressWarnings("resource") public static final GenericContainer CONFIG_SERVER = @@ -56,8 +58,7 @@ public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase { .waitingFor( new HttpWaitStrategy() .forPort(CONFIG_SERVER_PORT) - .forPath(CONFIG_URL_PATH)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + .forPath(CONFIG_URL_PATH)); public static final OceanBaseCEContainer CONTAINER = new OceanBaseCEContainer("oceanbase/oceanbase-ce:latest") @@ -69,13 +70,10 @@ public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase { .withEnv("OB_SYS_PASSWORD", SYS_PASSWORD) .withEnv("OB_DATAFILE_SIZE", "2G") .withEnv("OB_LOG_DISK_SIZE", "4G") - .withStartupTimeout(Duration.ofMinutes(4)) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + .withStartupTimeout(Duration.ofMinutes(4)); public static final OceanBaseProxyContainer ODP = - new OceanBaseProxyContainer("4.3.1.0-4") - .withNetwork(NETWORK) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + new OceanBaseProxyContainer("4.3.1.0-4").withNetwork(NETWORK); public static String getContainerIP(GenericContainer container) { String ip = @@ -90,13 +88,34 @@ public static String getContainerIP(GenericContainer container) { return ip; } - public static String getConfigServerAddress(GenericContainer container) { - String ip = getContainerIP(container); - return "http://" + ip + ":" + CONFIG_SERVER_PORT; + private static String configServerAddress; + private static String obServerIP; + private static OceanBaseUserInfo userInfo; + + public static String getConfigServerAddress() { + if (configServerAddress == null) { + String ip = getContainerIP(CONFIG_SERVER); + configServerAddress = "http://" + ip + ":" + CONFIG_SERVER_PORT; + } + return configServerAddress; + } + + public static String getOBServerIP() { + if (obServerIP == null) { + obServerIP = getContainerIP(CONTAINER); + } + return obServerIP; + } + + public static OceanBaseUserInfo getUserInfo() { + if (userInfo == null) { + userInfo = OceanBaseUserInfo.parse(CONTAINER.getUsername()); + } + return userInfo; } - public static String constructConfigUrlForODP(String address) { - return address + CONFIG_URL_PATH; + public static String getConfigUrlForODP() { + return getConfigServerAddress() + CONFIG_URL_PATH; } public static Connection getSysJdbcConnection() throws SQLException { diff --git a/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseTestBase.java b/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseTestBase.java index 72629afb..3063cb59 100644 --- a/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseTestBase.java +++ b/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseTestBase.java @@ -16,6 +16,8 @@ package com.oceanbase.connector.flink; +import java.io.IOException; +import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; @@ -86,7 +88,7 @@ public Connection getJdbcConnection() throws SQLException { return DriverManager.getConnection(getJdbcUrl(), getUsername(), getPassword()); } - public void initialize(String sqlFile) { + public void initialize(String sqlFile) throws SQLException, IOException, URISyntaxException { final URL file = getClass().getClassLoader().getResource(sqlFile); assertNotNull("Cannot locate " + sqlFile, file); @@ -109,8 +111,6 @@ public void initialize(String sqlFile) { for (String stmt : statements) { statement.execute(stmt); } - } catch (Exception e) { - throw new RuntimeException(e); } } @@ -189,4 +189,18 @@ public void dropTables(String... tableNames) throws SQLException { public interface RowConverter { String convert(ResultSet rs, int columnCount) throws SQLException; } + + public String integer(Integer n) { + if (n == null) { + return "CAST(NULL AS INT)"; + } + return n.toString(); + } + + public String string(String s) { + if (s == null) { + return "CAST(NULL AS STRING)"; + } + return "'" + s + "'"; + } } diff --git a/flink-connector-oceanbase-directload/src/main/java/com/oceanbase/connector/flink/OBDirectLoadConnectorOptions.java b/flink-connector-oceanbase-directload/src/main/java/com/oceanbase/connector/flink/OBDirectLoadConnectorOptions.java index 3f738298..4e33ad93 100644 --- a/flink-connector-oceanbase-directload/src/main/java/com/oceanbase/connector/flink/OBDirectLoadConnectorOptions.java +++ b/flink-connector-oceanbase-directload/src/main/java/com/oceanbase/connector/flink/OBDirectLoadConnectorOptions.java @@ -27,8 +27,9 @@ import java.time.Duration; import java.util.Map; -/** The connector options of {@linkplain flink-connector-oceanbase-directload} module. */ +/** The connector options of flink-connector-oceanbase-directload connector. */ public class OBDirectLoadConnectorOptions implements Serializable { + private static final long serialVersionUID = 1L; public static final ConfigOption HOST = diff --git a/flink-connector-oceanbase-directload/src/test/java/com/oceanbase/connector/flink/OBKVDirectLoadMySQLModeITCase.java b/flink-connector-oceanbase-directload/src/test/java/com/oceanbase/connector/flink/OBDirectLoadITCase.java similarity index 91% rename from flink-connector-oceanbase-directload/src/test/java/com/oceanbase/connector/flink/OBKVDirectLoadMySQLModeITCase.java rename to flink-connector-oceanbase-directload/src/test/java/com/oceanbase/connector/flink/OBDirectLoadITCase.java index 135b26f2..ea4b9138 100644 --- a/flink-connector-oceanbase-directload/src/test/java/com/oceanbase/connector/flink/OBKVDirectLoadMySQLModeITCase.java +++ b/flink-connector-oceanbase-directload/src/test/java/com/oceanbase/connector/flink/OBDirectLoadITCase.java @@ -25,24 +25,42 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; import java.util.Arrays; import java.util.List; -public class OBKVDirectLoadMySQLModeITCase extends OceanBaseMySQLTestBase { +public class OBDirectLoadITCase extends OceanBaseMySQLTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(OBDirectLoadITCase.class); + @BeforeClass - public static void setup() throws Exception { - CONTAINER.start(); + public static void setup() { + CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)).start(); } @AfterClass - public static void tearDown() throws Exception { + public static void tearDown() { CONTAINER.stop(); } + @Before + public void before() throws Exception { + initialize("sql/products.sql"); + } + + @After + public void after() throws Exception { + dropTables("products"); + } + @Test public void testDirectLoadSink() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -50,8 +68,6 @@ public void testDirectLoadSink() throws Exception { env.setRuntimeMode(RuntimeExecutionMode.BATCH); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - initialize("sql/mysql/products.sql"); - String createTableSql = String.format( "CREATE TEMPORARY TABLE target (" @@ -72,8 +88,8 @@ public void testDirectLoadSink() throws Exception { + ");", getHost(), getRpcPort(), - getUsername().split("@")[0], - getUsername().split("@")[1], + getUserInfo().getUser(), + getUserInfo().getTenant(), getPassword()); tEnv.executeSql(createTableSql); @@ -107,14 +123,10 @@ public void testDirectLoadSink() throws Exception { List actual = queryTable("products"); assertEqualsInAnyOrder(expected, actual); - - dropTables("products"); } @Test public void testMultiNodeDirectLoadSink() throws Exception { - initialize("sql/mysql/products.sql"); - // 1. get DirectLoader and execution id. DirectLoader directLoad = getDirectLoad(); String executionId = directLoad.begin(); @@ -147,8 +159,8 @@ public void testMultiNodeDirectLoadSink() throws Exception { + ");", getHost(), getRpcPort(), - getUsername().split("@")[0], - getUsername().split("@")[1], + getUserInfo().getUser(), + getUserInfo().getTenant(), getPassword(), executionId, true); @@ -187,8 +199,6 @@ public void testMultiNodeDirectLoadSink() throws Exception { List actual = queryTable("products"); assertEqualsInAnyOrder(expected, actual); - - dropTables("products"); } private DirectLoader getDirectLoad() { @@ -203,9 +213,9 @@ private DirectLoader getDirectLoad() { OBDirectLoadConnectorOptions.TABLE_NAME.key(), "products", OBDirectLoadConnectorOptions.USERNAME.key(), - getUsername().split("@")[0], + getUserInfo().getUser(), OBDirectLoadConnectorOptions.TENANT_NAME.key(), - getUsername().split("@")[1], + getUserInfo().getTenant(), OBDirectLoadConnectorOptions.PASSWORD.key(), getPassword()); OBDirectLoadConnectorOptions connectorOptions = new OBDirectLoadConnectorOptions(configMap); diff --git a/flink-connector-oceanbase-directload/src/test/resources/sql/mysql/products.sql b/flink-connector-oceanbase-directload/src/test/resources/sql/products.sql similarity index 100% rename from flink-connector-oceanbase-directload/src/test/resources/sql/mysql/products.sql rename to flink-connector-oceanbase-directload/src/test/resources/sql/products.sql diff --git a/flink-connector-oceanbase-e2e-tests/pom.xml b/flink-connector-oceanbase-e2e-tests/pom.xml new file mode 100644 index 00000000..75acdff6 --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/pom.xml @@ -0,0 +1,126 @@ + + + + 4.0.0 + + com.oceanbase + flink-connector-oceanbase-parent + ${revision} + + + flink-connector-oceanbase-e2e-tests + jar + + + + mysql + mysql-connector-java + test + + + com.oceanbase + flink-connector-oceanbase-base + ${project.version} + test + + + com.oceanbase + flink-connector-oceanbase-base + ${project.version} + test-jar + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + default-test + none + + + integration-tests + none + + + end-to-end-tests + + test + + integration-test + + + **/*.* + + 1 + false + + ${project.basedir} + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + com.oceanbase + flink-sql-connector-obkv-hbase + ${project.version} + flink-sql-connector-obkv-hbase.jar + jar + ${project.build.directory}/dependencies + + + com.oceanbase + flink-sql-connector-oceanbase + ${project.version} + flink-sql-connector-oceanbase.jar + jar + ${project.build.directory}/dependencies + + + com.oceanbase + flink-sql-connector-oceanbase-directload + ${project.version} + flink-sql-connector-oceanbase-directload.jar + jar + ${project.build.directory}/dependencies + + + + + + copy-jars + + copy + + process-resources + + + + + + + diff --git a/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/OBDirectLoadE2eITCase.java b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/OBDirectLoadE2eITCase.java new file mode 100644 index 00000000..9bf1a69d --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/OBDirectLoadE2eITCase.java @@ -0,0 +1,131 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.connector.flink.tests; + +import com.oceanbase.connector.flink.tests.utils.FlinkContainerTestEnvironment; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class OBDirectLoadE2eITCase extends FlinkContainerTestEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(OBDirectLoadE2eITCase.class); + + private static final String SINK_CONNECTOR_NAME = + "flink-sql-connector-oceanbase-directload.jar"; + + @BeforeClass + public static void setup() { + CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)).start(); + } + + @AfterClass + public static void tearDown() { + CONTAINER.stop(); + } + + @Before + public void before() throws Exception { + super.before(); + + initialize("sql/products.sql"); + } + + @After + public void after() throws Exception { + super.after(); + + dropTables("products"); + } + + @Test + public void testInsertValues() throws Exception { + List sqlLines = new ArrayList<>(); + + sqlLines.add("SET 'execution.checkpointing.interval' = '3s';"); + + sqlLines.add("SET 'execution.runtime-mode' = 'batch';"); + + sqlLines.add( + String.format( + "CREATE TEMPORARY TABLE target (" + + " `id` INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(20, 10)," + + " PRIMARY KEY (`id`) NOT ENFORCED" + + ") with (" + + " 'connector' = 'oceanbase-directload'," + + " 'host' = '%s', " + + " 'port' = '%d', " + + " 'username' = '%s', " + + " 'password' = '%s'," + + " 'tenant-name' = '%s', " + + " 'schema-name' = '%s', " + + " 'table-name' = 'products'" + + ");", + getHost(), + getRpcPort(), + getUserInfo().getUser(), + getPassword(), + getUserInfo().getTenant(), + getSchemaName())); + + sqlLines.add( + "INSERT INTO target " + + "VALUES (101, 'scooter', 'Small 2-wheel scooter', 3.14)," + + " (102, 'car battery', '12V car battery', 8.1)," + + " (103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8)," + + " (104, 'hammer', '12oz carpenter''s hammer', 0.75)," + + " (105, 'hammer', '14oz carpenter''s hammer', 0.875)," + + " (106, 'hammer', '16oz carpenter''s hammer', 1.0)," + + " (107, 'rocks', 'box of assorted rocks', 5.3)," + + " (108, 'jacket', 'water resistent black wind breaker', 0.1)," + + " (109, 'spare tire', '24 inch spare tire', 22.2);"); + + submitSQLJob(sqlLines, getResource(SINK_CONNECTOR_NAME)); + waitUntilJobRunning(Duration.ofSeconds(30)); + + List expected = + Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.1400000000", + "102,car battery,12V car battery,8.1000000000", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000", + "104,hammer,12oz carpenter's hammer,0.7500000000", + "105,hammer,14oz carpenter's hammer,0.8750000000", + "106,hammer,16oz carpenter's hammer,1.0000000000", + "107,rocks,box of assorted rocks,5.3000000000", + "108,jacket,water resistent black wind breaker,0.1000000000", + "109,spare tire,24 inch spare tire,22.2000000000"); + + waitingAndAssertTableCount("products", expected.size()); + + List actual = queryTable("products"); + assertEqualsInAnyOrder(expected, actual); + } +} diff --git a/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/OBKVHBaseE2eITCase.java b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/OBKVHBaseE2eITCase.java new file mode 100644 index 00000000..b2c7f2fe --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/OBKVHBaseE2eITCase.java @@ -0,0 +1,141 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.connector.flink.tests; + +import com.oceanbase.connector.flink.tests.utils.FlinkContainerTestEnvironment; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +public class OBKVHBaseE2eITCase extends FlinkContainerTestEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(OBKVHBaseE2eITCase.class); + + private static final String SINK_CONNECTOR_NAME = "flink-sql-connector-obkv-hbase.jar"; + + @BeforeClass + public static void setup() { + CONFIG_SERVER.withLogConsumer(new Slf4jLogConsumer(LOG)).start(); + + CONTAINER + .withEnv("OB_CONFIGSERVER_ADDRESS", getConfigServerAddress()) + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .start(); + } + + @AfterClass + public static void tearDown() { + Stream.of(CONFIG_SERVER, CONTAINER).forEach(GenericContainer::stop); + } + + @Override + protected String getFlinkDockerImageTag() { + // the hbase packages are not compatible with jdk 11 + return String.format("flink:%s-scala_2.12-java8", flinkVersion); + } + + @Before + public void before() throws Exception { + super.before(); + + initialize("sql/htable.sql"); + } + + @After + public void after() throws Exception { + super.after(); + + dropTables("htable$family1", "htable$family2"); + } + + @Test + public void testInsertValues() throws Exception { + List sqlLines = new ArrayList<>(); + + sqlLines.add("SET 'execution.checkpointing.interval' = '3s';"); + + sqlLines.add( + String.format( + "CREATE TEMPORARY TABLE target (" + + " rowkey STRING," + + " family1 ROW," + + " family2 ROW," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") with (" + + " 'connector'='obkv-hbase'," + + " 'url'='%s'," + + " 'sys.username'='%s'," + + " 'sys.password'='%s'," + + " 'username'='%s'," + + " 'password'='%s'," + + " 'schema-name'='%s'," + + " 'table-name'='htable'" + + ");", + getSysParameter("obconfig_url"), + getSysUsername(), + getSysPassword(), + getUsername() + "#" + getClusterName(), + getPassword(), + getSchemaName())); + + sqlLines.add( + String.format( + "INSERT INTO target VALUES " + + "(%s, ROW(%s), ROW(%s, %s)), " + + "(%s, ROW(%s), ROW(%s, %s)), " + + "(%s, ROW(%s), ROW(%s, %s)), " + + "(%s, ROW(%s), ROW(%s, %s));", + string("1"), + integer(1), + string("1"), + integer(1), + string("2"), + integer(null), + string("2"), + integer(null), + string("3"), + integer(3), + string(null), + integer(null), + string("4"), + integer(4), + string("4"), + integer(null))); + + submitSQLJob(sqlLines, getResource(SINK_CONNECTOR_NAME)); + waitUntilJobRunning(Duration.ofSeconds(30)); + + List expected1 = Arrays.asList("1,q1,1", "3,q1,3", "4,q1,4"); + List expected2 = Arrays.asList("1,q2,1", "1,q3,1", "2,q2,2", "4,q2,4"); + + waitingAndAssertTableCount("htable$family1", expected1.size()); + waitingAndAssertTableCount("htable$family2", expected2.size()); + } +} diff --git a/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/OceanBaseE2eITCase.java b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/OceanBaseE2eITCase.java new file mode 100644 index 00000000..42aa513f --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/OceanBaseE2eITCase.java @@ -0,0 +1,121 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.connector.flink.tests; + +import com.oceanbase.connector.flink.tests.utils.FlinkContainerTestEnvironment; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseE2eITCase.class); + + private static final String SINK_CONNECTOR_NAME = "flink-sql-connector-oceanbase.jar"; + + @BeforeClass + public static void setup() { + CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)).start(); + } + + @AfterClass + public static void tearDown() { + CONTAINER.stop(); + } + + @Before + public void before() throws Exception { + super.before(); + + initialize("sql/products.sql"); + } + + @After + public void after() throws Exception { + super.after(); + + dropTables("products"); + } + + @Test + public void testInsertValues() throws Exception { + List sqlLines = new ArrayList<>(); + + sqlLines.add("SET 'execution.checkpointing.interval' = '3s';"); + + sqlLines.add( + String.format( + "CREATE TEMPORARY TABLE target (" + + " `id` INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(20, 10)," + + " PRIMARY KEY (`id`) NOT ENFORCED" + + ") with (" + + " 'connector'='oceanbase'," + + " 'url'='%s'," + + " 'username'='%s'," + + " 'password'='%s'," + + " 'schema-name'='%s'," + + " 'table-name'='products'" + + ");", + getJdbcUrl(), getUsername(), getPassword(), getSchemaName())); + + sqlLines.add( + "INSERT INTO target " + + "VALUES (101, 'scooter', 'Small 2-wheel scooter', 3.14)," + + " (102, 'car battery', '12V car battery', 8.1)," + + " (103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8)," + + " (104, 'hammer', '12oz carpenter''s hammer', 0.75)," + + " (105, 'hammer', '14oz carpenter''s hammer', 0.875)," + + " (106, 'hammer', '16oz carpenter''s hammer', 1.0)," + + " (107, 'rocks', 'box of assorted rocks', 5.3)," + + " (108, 'jacket', 'water resistent black wind breaker', 0.1)," + + " (109, 'spare tire', '24 inch spare tire', 22.2);"); + + submitSQLJob(sqlLines, getResource(SINK_CONNECTOR_NAME)); + waitUntilJobRunning(Duration.ofSeconds(30)); + + List expected = + Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.1400000000", + "102,car battery,12V car battery,8.1000000000", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000", + "104,hammer,12oz carpenter's hammer,0.7500000000", + "105,hammer,14oz carpenter's hammer,0.8750000000", + "106,hammer,16oz carpenter's hammer,1.0000000000", + "107,rocks,box of assorted rocks,5.3000000000", + "108,jacket,water resistent black wind breaker,0.1000000000", + "109,spare tire,24 inch spare tire,22.2000000000"); + + waitingAndAssertTableCount("products", expected.size()); + + List actual = queryTable("products"); + assertEqualsInAnyOrder(expected, actual); + } +} diff --git a/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/utils/FlinkContainerTestEnvironment.java b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/utils/FlinkContainerTestEnvironment.java new file mode 100644 index 00000000..3dd8e8fe --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/tests/utils/FlinkContainerTestEnvironment.java @@ -0,0 +1,288 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.connector.flink.tests.utils; + +import com.oceanbase.connector.flink.OceanBaseMySQLTestBase; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.table.api.ValidationException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.MountableFile; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkState; + +@RunWith(Parameterized.class) +public abstract class FlinkContainerTestEnvironment extends OceanBaseMySQLTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkContainerTestEnvironment.class); + + private static final String MODULE_DIRECTORY = System.getProperty("moduleDir", ""); + + private static final int JOB_MANAGER_REST_PORT = 8081; + private static final String FLINK_BIN = "bin"; + private static final String INTER_CONTAINER_JM_ALIAS = "jobmanager"; + private static final String INTER_CONTAINER_TM_ALIAS = "taskmanager"; + + @Parameterized.Parameter public String flinkVersion; + + @Parameterized.Parameters + public static List getFlinkVersion() { + return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.1", "1.20.0"); + } + + protected String getFlinkDockerImageTag() { + return String.format("flink:%s-scala_2.12", flinkVersion); + } + + public String getFlinkProperties() { + return String.join( + "\n", + Arrays.asList( + "jobmanager.rpc.address: jobmanager", + "taskmanager.numberOfTaskSlots: 10", + "parallelism.default: 4", + "execution.checkpointing.interval: 300")); + } + + @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + public GenericContainer jobManager; + public GenericContainer taskManager; + + @SuppressWarnings("resource") + @Before + public void before() throws Exception { + LOG.info("Starting Flink containers..."); + jobManager = + new GenericContainer<>(getFlinkDockerImageTag()) + .withCommand("jobmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) + .withExposedPorts(JOB_MANAGER_REST_PORT) + .withEnv("FLINK_PROPERTIES", getFlinkProperties()) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + taskManager = + new GenericContainer<>(getFlinkDockerImageTag()) + .withCommand("taskmanager") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) + .withEnv("FLINK_PROPERTIES", getFlinkProperties()) + .dependsOn(jobManager) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + Startables.deepStart(Stream.of(jobManager, taskManager)).join(); + LOG.info("Flink containers started"); + } + + @After + public void after() throws Exception { + if (jobManager != null) { + jobManager.stop(); + } + if (taskManager != null) { + taskManager.stop(); + } + } + + @Override + public String getHost() { + return getOBServerIP(); + } + + @Override + public int getPort() { + return 2881; + } + + @Override + public int getRpcPort() { + return 2882; + } + + /** + * Searches for a resource file matching the given regex in the given directory. This method is + * primarily intended to be used for the initialization of static {@link Path} fields for + * resource file(i.e. jar, config file). if resolvePaths is empty, this method will search file + * under the modules {@code target} directory. if resolvePaths is not empty, this method will + * search file under resolvePaths of current project. + * + * @param resourceNameRegex regex pattern to match against + * @param resolvePaths an array of resolve paths of current project + * @return Path pointing to the matching file + * @throws RuntimeException if none or multiple resource files could be found + */ + public static Path getResource(final String resourceNameRegex, String... resolvePaths) { + Path path = Paths.get(MODULE_DIRECTORY).toAbsolutePath(); + + if (resolvePaths != null && resolvePaths.length > 0) { + path = path.getParent().getParent(); + for (String resolvePath : resolvePaths) { + path = path.resolve(resolvePath); + } + } + + try (Stream dependencyResources = Files.walk(path)) { + final List matchingResources = + dependencyResources + .filter( + jar -> + Pattern.compile(resourceNameRegex) + .matcher(jar.toAbsolutePath().toString()) + .find()) + .collect(Collectors.toList()); + switch (matchingResources.size()) { + case 0: + throw new RuntimeException( + new FileNotFoundException( + String.format( + "No resource file could be found that matches the pattern %s. " + + "This could mean that the test module must be rebuilt via maven.", + resourceNameRegex))); + case 1: + return matchingResources.get(0); + default: + throw new RuntimeException( + new IOException( + String.format( + "Multiple resource files were found matching the pattern %s. Matches=%s", + resourceNameRegex, matchingResources))); + } + } catch (final IOException ioe) { + throw new RuntimeException("Could not search for resource resource files.", ioe); + } + } + + /** + * Submits a SQL job to the running cluster. + * + *

NOTE: You should not use {@code '\t'}. + */ + public void submitSQLJob(List sqlLines, Path... jars) + throws IOException, InterruptedException { + final List commands = new ArrayList<>(); + Path script = temporaryFolder.newFile().toPath(); + Files.write(script, sqlLines); + jobManager.copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql"); + commands.add("cat /tmp/script.sql | "); + commands.add(FLINK_BIN + "/sql-client.sh"); + for (Path jar : jars) { + commands.add("--jar"); + String containerPath = + copyAndGetContainerPath(jobManager, jar.toAbsolutePath().toString()); + commands.add(containerPath); + } + + Container.ExecResult execResult = + jobManager.execInContainer("bash", "-c", String.join(" ", commands)); + LOG.info(execResult.getStdout()); + LOG.error(execResult.getStderr()); + if (execResult.getExitCode() != 0) { + throw new AssertionError("Failed when submitting the SQL job."); + } + } + + private String copyAndGetContainerPath(GenericContainer container, String filePath) { + Path path = Paths.get(filePath); + String containerPath = "/tmp/" + path.getFileName(); + container.copyFileToContainer(MountableFile.forHostPath(path), containerPath); + return containerPath; + } + + /** + * Get {@link RestClusterClient} connected to this FlinkContainer. + * + *

This method lazily initializes the REST client on-demand. + */ + public RestClusterClient getRestClusterClient() { + checkState( + jobManager.isRunning(), + "Cluster client should only be retrieved for a running cluster"); + try { + final Configuration clientConfiguration = new Configuration(); + clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost()); + clientConfiguration.set( + RestOptions.PORT, jobManager.getMappedPort(JOB_MANAGER_REST_PORT)); + return new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance()); + } catch (Exception e) { + throw new IllegalStateException( + "Failed to create client for Flink container cluster", e); + } + } + + public void waitUntilJobRunning(Duration timeout) { + try (RestClusterClient clusterClient = getRestClusterClient()) { + Deadline deadline = Deadline.fromNow(timeout); + while (deadline.hasTimeLeft()) { + Collection jobStatusMessages; + try { + jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Error when fetching job status.", e); + continue; + } + if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) { + JobStatusMessage message = jobStatusMessages.iterator().next(); + JobStatus jobStatus = message.getJobState(); + if (jobStatus.isTerminalState()) { + throw new ValidationException( + String.format( + "Job has been terminated! JobName: %s, JobID: %s, Status: %s", + message.getJobName(), + message.getJobId(), + message.getJobState())); + } else if (jobStatus == JobStatus.RUNNING) { + return; + } + } + } + } + } +} diff --git a/flink-connector-oceanbase-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-oceanbase-e2e-tests/src/test/resources/log4j2-test.properties new file mode 100644 index 00000000..68c8b84a --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/resources/log4j2-test.properties @@ -0,0 +1,21 @@ +# Copyright 2024 OceanBase. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +rootLogger.level = INFO +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/flink-connector-oceanbase-e2e-tests/src/test/resources/sql/htable.sql b/flink-connector-oceanbase-e2e-tests/src/test/resources/sql/htable.sql new file mode 100644 index 00000000..661779d5 --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/resources/sql/htable.sql @@ -0,0 +1,30 @@ +-- Copyright 2024 OceanBase. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +CREATE TABLE `htable$family1` +( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) +); + +CREATE TABLE `htable$family2` +( + `K` varbinary(1024) NOT NULL, + `Q` varbinary(256) NOT NULL, + `T` bigint(20) NOT NULL, + `V` varbinary(1024) DEFAULT NULL, + PRIMARY KEY (`K`, `Q`, `T`) +); diff --git a/flink-connector-oceanbase-e2e-tests/src/test/resources/sql/products.sql b/flink-connector-oceanbase-e2e-tests/src/test/resources/sql/products.sql new file mode 100644 index 00000000..e5b318c5 --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/resources/sql/products.sql @@ -0,0 +1,20 @@ +-- Copyright 2024 OceanBase. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +CREATE TABLE products +( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight DECIMAL(20, 10) +); diff --git a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLConnectorITCase.java b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLConnectorITCase.java index 50f5c38a..af43ab3f 100644 --- a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLConnectorITCase.java +++ b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseMySQLConnectorITCase.java @@ -45,6 +45,9 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; import java.util.Arrays; import java.util.Collections; @@ -56,13 +59,15 @@ public class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase { + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseMySQLConnectorITCase.class); + @BeforeClass - public static void setup() throws Exception { - CONTAINER.start(); + public static void setup() { + CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)).start(); } @AfterClass - public static void tearDown() throws Exception { + public static void tearDown() { CONTAINER.stop(); } diff --git a/flink-sql-connector-obkv-hbase/pom.xml b/flink-sql-connector-obkv-hbase/pom.xml index 124c5572..ea562c18 100644 --- a/flink-sql-connector-obkv-hbase/pom.xml +++ b/flink-sql-connector-obkv-hbase/pom.xml @@ -74,20 +74,21 @@ under the License. commons-logging:commons-logging org.apache.hadoop:hadoop-core org.apache.hbase:* + com.lmax:disruptor com.google - com.oceanbase.connector.flink.shaded.com.google + ${shaded.prefix}.com.google - io.netty - com.oceanbase.connector.flink.shaded.io.netty + org.apache.hadoop + ${shaded.prefix}.org.apache.hadoop - org.apache.hadoop - com.oceanbase.connector.flink.shaded.org.apache.hadoop + io.netty + ${shaded.prefix}.io.netty diff --git a/flink-sql-connector-oceanbase-directload/pom.xml b/flink-sql-connector-oceanbase-directload/pom.xml index 03314cda..80eb6b42 100644 --- a/flink-sql-connector-oceanbase-directload/pom.xml +++ b/flink-sql-connector-oceanbase-directload/pom.xml @@ -70,16 +70,17 @@ under the License. com.google.guava:guava commons-lang:commons-lang io.netty:* + com.lmax:disruptor com.google - com.oceanbase.connector.flink.shaded.com.google + ${shaded.prefix}.com.google io.netty - com.oceanbase.connector.flink.shaded.io.netty + ${shaded.prefix}.io.netty diff --git a/flink-sql-connector-oceanbase/pom.xml b/flink-sql-connector-oceanbase/pom.xml index d23ef496..5de1a38c 100644 --- a/flink-sql-connector-oceanbase/pom.xml +++ b/flink-sql-connector-oceanbase/pom.xml @@ -65,24 +65,9 @@ under the License. com.oceanbase:* com.alibaba:* - com.alipay.sofa:bolt - com.alipay.sofa.common:sofa-common-tools - com.google.guava:guava - commons-lang:commons-lang mysql:mysql-connector-java - io.netty:* - - - com.google - com.oceanbase.connector.flink.shaded.com.google - - - io.netty - com.oceanbase.connector.flink.shaded.io.netty - - diff --git a/pom.xml b/pom.xml index 69bf7a2d..5d06112b 100644 --- a/pom.xml +++ b/pom.xml @@ -18,9 +18,9 @@ under the License. 4.0.0 - io.github.zentol.flink + org.apache.flink flink-connector-parent - 1.0 + 1.1.0 com.oceanbase @@ -36,15 +36,14 @@ under the License. flink-sql-connector-obkv-hbase flink-sql-connector-oceanbase flink-sql-connector-oceanbase-directload + flink-connector-oceanbase-e2e-tests 1.4-SNAPSHOT - 8 - 8 - UTF-8 1.18.0 2.12 + com.oceanbase.connector.flink.shaded @@ -143,7 +142,6 @@ under the License. com.lmax disruptor 3.4.2 - true org.apache.flink @@ -301,6 +299,10 @@ under the License. + + org.apache.maven.plugins + maven-surefire-plugin + org.apache.maven.plugins maven-shade-plugin From 456d6610f3244c2c4e83cb19074ba0b91c9ae9a2 Mon Sep 17 00:00:00 2001 From: He Wang Date: Wed, 6 Nov 2024 00:27:56 +0800 Subject: [PATCH 2/2] modify disruptor --- flink-connector-obkv-hbase/pom.xml | 5 +++++ flink-connector-oceanbase-directload/pom.xml | 5 +++++ pom.xml | 10 +++++----- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/flink-connector-obkv-hbase/pom.xml b/flink-connector-obkv-hbase/pom.xml index 69e139aa..28924eea 100644 --- a/flink-connector-obkv-hbase/pom.xml +++ b/flink-connector-obkv-hbase/pom.xml @@ -36,6 +36,11 @@ under the License. obkv-hbase-client + + com.lmax + disruptor + + com.oceanbase flink-connector-oceanbase-base diff --git a/flink-connector-oceanbase-directload/pom.xml b/flink-connector-oceanbase-directload/pom.xml index 7d0a2e34..f86ffbaf 100644 --- a/flink-connector-oceanbase-directload/pom.xml +++ b/flink-connector-oceanbase-directload/pom.xml @@ -30,6 +30,11 @@ under the License. obkv-table-client + + com.lmax + disruptor + + com.oceanbase flink-connector-oceanbase-base diff --git a/pom.xml b/pom.xml index 5d06112b..5d323489 100644 --- a/pom.xml +++ b/pom.xml @@ -128,6 +128,11 @@ under the License. + + com.lmax + disruptor + 3.4.2 + @@ -138,11 +143,6 @@ under the License. ${flink.version} provided - - com.lmax - disruptor - 3.4.2 - org.apache.flink flink-test-utils