Skip to content

Commit

Permalink
test: add e2e test module
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Nov 5, 2024
1 parent c16098b commit b7dd04a
Show file tree
Hide file tree
Showing 21 changed files with 934 additions and 104 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,8 @@ jobs:
uses: ./.github/workflows/test.yml
with:
module: flink-connector-oceanbase-directload

flink-connector-oceanbase-e2e-tests:
uses: ./.github/workflows/test.yml
with:
module: flink-connector-oceanbase-e2e-tests
10 changes: 10 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ jobs:
name: Test
runs-on: ubuntu-latest
steps:
- name: Free disk space on Ubuntu runner
uses: kfir4444/free-disk-space@main
with:
tool-cache: false
android: true
dotnet: true
haskell: true
large-packages: true
swap-storage: true

- name: Check out repository code
uses: actions/checkout@v4

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,41 +22,17 @@

import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class OBKVHBaseConnectorITCase extends OceanBaseMySQLTestBase {

@BeforeClass
public static void setup() throws Exception {
CONFIG_SERVER.start();

String configServerAddress = getConfigServerAddress(CONFIG_SERVER);
String configUrlForODP = constructConfigUrlForODP(configServerAddress);

CONTAINER.withEnv("OB_CONFIGSERVER_ADDRESS", configServerAddress).start();

String password = "test";
createSysUser("proxyro", password);

ODP.withPassword(password).withConfigUrl(configUrlForODP).start();
}

@AfterClass
public static void tearDown() {
Stream.of(CONFIG_SERVER, CONTAINER, ODP).forEach(GenericContainer::stop);
}

@Before
public void before() {
initialize("sql/htable.sql");
Expand Down Expand Up @@ -88,12 +64,18 @@ public void testSink() throws Exception {

@Test
public void testSinkWithODP() throws Exception {
startODP();

Map<String, String> options = getOptions();
options.put("odp-mode", "true");
options.put("odp-ip", ODP.getHost());
options.put("odp-port", String.valueOf(ODP.getRpcPort()));

testSinkToHTable(options);
try {
testSinkToHTable(options);
} finally {
stopODP();
}
}

private void testSinkToHTable(Map<String, String> options) throws Exception {
Expand Down Expand Up @@ -178,18 +160,4 @@ protected List<String> queryHTable(String tableName, RowConverter rowConverter)
throws SQLException {
return queryTable(tableName, Arrays.asList("K", "Q", "V"), rowConverter);
}

protected String integer(Integer n) {
if (n == null) {
return "CAST(NULL AS INT)";
}
return n.toString();
}

protected String string(String s) {
if (s == null) {
return "CAST(NULL AS STRING)";
}
return "'" + s + "'";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

package com.oceanbase.connector.flink;

import com.oceanbase.connector.flink.connection.OceanBaseUserInfo;

import com.github.dockerjava.api.model.ContainerNetwork;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
Expand All @@ -31,6 +36,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.stream.Stream;

public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase {

Expand All @@ -46,7 +52,7 @@ public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase {
private static final String SYS_PASSWORD = "123456";
private static final String TEST_PASSWORD = "654321";

private static final Network NETWORK = Network.newNetwork();
@ClassRule public static final Network NETWORK = Network.newNetwork();

@SuppressWarnings("resource")
public static final GenericContainer<?> CONFIG_SERVER =
Expand Down Expand Up @@ -77,6 +83,30 @@ public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase {
.withNetwork(NETWORK)
.withLogConsumer(new Slf4jLogConsumer(LOG));

@BeforeClass
public static void setup() throws Exception {
CONFIG_SERVER.start();

CONTAINER.withEnv("OB_CONFIGSERVER_ADDRESS", getConfigServerAddress()).start();

createSysUser("proxyro", SYS_PASSWORD);
}

@AfterClass
public static void tearDown() {
Stream.of(CONFIG_SERVER, CONTAINER).forEach(GenericContainer::stop);
}

public static void startODP() {
String url = getConfigServerAddress() + CONFIG_URL_PATH;
LOG.info("Try to start ODP with config url: {}", url);
ODP.withConfigUrl(url).withPassword(SYS_PASSWORD).start();
}

public static void stopODP() {
ODP.stop();
}

public static String getContainerIP(GenericContainer<?> container) {
String ip =
container.getContainerInfo().getNetworkSettings().getNetworks().values().stream()
Expand All @@ -90,13 +120,30 @@ public static String getContainerIP(GenericContainer<?> container) {
return ip;
}

public static String getConfigServerAddress(GenericContainer<?> container) {
String ip = getContainerIP(container);
return "http://" + ip + ":" + CONFIG_SERVER_PORT;
private static String configServerAddress;
private static String obServerIP;
private static OceanBaseUserInfo userInfo;

public static String getConfigServerAddress() {
if (configServerAddress == null) {
String ip = getContainerIP(CONFIG_SERVER);
configServerAddress = "http://" + ip + ":" + CONFIG_SERVER_PORT;
}
return configServerAddress;
}

public static String getOBServerIP() {
if (obServerIP == null) {
obServerIP = getContainerIP(CONTAINER);
}
return obServerIP;
}

public static String constructConfigUrlForODP(String address) {
return address + CONFIG_URL_PATH;
public static OceanBaseUserInfo getUserInfo() {
if (userInfo == null) {
userInfo = OceanBaseUserInfo.parse(CONTAINER.getUsername());
}
return userInfo;
}

public static Connection getSysJdbcConnection() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,18 @@ public void dropTables(String... tableNames) throws SQLException {
public interface RowConverter {
String convert(ResultSet rs, int columnCount) throws SQLException;
}

public String integer(Integer n) {
if (n == null) {
return "CAST(NULL AS INT)";
}
return n.toString();
}

public String string(String s) {
if (s == null) {
return "CAST(NULL AS STRING)";
}
return "'" + s + "'";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
import java.time.Duration;
import java.util.Map;

/** The connector options of {@linkplain flink-connector-oceanbase-directload} module. */
/** The connector options of flink-connector-oceanbase-directload connector. */
public class OBDirectLoadConnectorOptions implements Serializable {

private static final long serialVersionUID = 1L;

public static final ConfigOption<String> HOST =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

public class OBKVDirectLoadMySQLModeITCase extends OceanBaseMySQLTestBase {
@BeforeClass
public static void setup() throws Exception {
CONTAINER.start();
public class OBDirectLoadITCase extends OceanBaseMySQLTestBase {

@Before
public void before() throws Exception {
initialize("sql/products.sql");
}

@AfterClass
public static void tearDown() throws Exception {
CONTAINER.stop();
@After
public void after() throws Exception {
dropTables("products");
}

@Test
Expand All @@ -50,8 +51,6 @@ public void testDirectLoadSink() throws Exception {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

initialize("sql/mysql/products.sql");

String createTableSql =
String.format(
"CREATE TEMPORARY TABLE target ("
Expand All @@ -72,8 +71,8 @@ public void testDirectLoadSink() throws Exception {
+ ");",
getHost(),
getRpcPort(),
getUsername().split("@")[0],
getUsername().split("@")[1],
getUserInfo().getUser(),
getUserInfo().getTenant(),
getPassword());
tEnv.executeSql(createTableSql);

Expand Down Expand Up @@ -107,14 +106,10 @@ public void testDirectLoadSink() throws Exception {
List<String> actual = queryTable("products");

assertEqualsInAnyOrder(expected, actual);

dropTables("products");
}

@Test
public void testMultiNodeDirectLoadSink() throws Exception {
initialize("sql/mysql/products.sql");

// 1. get DirectLoader and execution id.
DirectLoader directLoad = getDirectLoad();
String executionId = directLoad.begin();
Expand Down Expand Up @@ -147,8 +142,8 @@ public void testMultiNodeDirectLoadSink() throws Exception {
+ ");",
getHost(),
getRpcPort(),
getUsername().split("@")[0],
getUsername().split("@")[1],
getUserInfo().getUser(),
getUserInfo().getTenant(),
getPassword(),
executionId,
true);
Expand Down Expand Up @@ -187,8 +182,6 @@ public void testMultiNodeDirectLoadSink() throws Exception {
List<String> actual = queryTable("products");

assertEqualsInAnyOrder(expected, actual);

dropTables("products");
}

private DirectLoader getDirectLoad() {
Expand All @@ -203,9 +196,9 @@ private DirectLoader getDirectLoad() {
OBDirectLoadConnectorOptions.TABLE_NAME.key(),
"products",
OBDirectLoadConnectorOptions.USERNAME.key(),
getUsername().split("@")[0],
getUserInfo().getUser(),
OBDirectLoadConnectorOptions.TENANT_NAME.key(),
getUsername().split("@")[1],
getUserInfo().getTenant(),
OBDirectLoadConnectorOptions.PASSWORD.key(),
getPassword());
OBDirectLoadConnectorOptions connectorOptions = new OBDirectLoadConnectorOptions(configMap);
Expand Down
Loading

0 comments on commit b7dd04a

Please sign in to comment.