Skip to content

Commit

Permalink
test: add e2e test module
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Nov 5, 2024
1 parent c16098b commit ff1c481
Show file tree
Hide file tree
Showing 21 changed files with 1,013 additions and 88 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,8 @@ jobs:
uses: ./.github/workflows/test.yml
with:
module: flink-connector-oceanbase-directload

flink-connector-oceanbase-e2e-tests:
uses: ./.github/workflows/test.yml
with:
module: flink-connector-oceanbase-e2e-tests
10 changes: 10 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ jobs:
name: Test
runs-on: ubuntu-latest
steps:
- name: Free disk space on Ubuntu runner
uses: kfir4444/free-disk-space@main
with:
tool-cache: false
android: true
dotnet: true
haskell: true
large-packages: true
swap-storage: true

- name: Check out repository code
uses: actions/checkout@v4

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;

import java.sql.SQLException;
import java.util.Arrays;
Expand All @@ -37,19 +40,24 @@

public class OBKVHBaseConnectorITCase extends OceanBaseMySQLTestBase {

private static final Logger LOG = LoggerFactory.getLogger(OBKVHBaseConnectorITCase.class);

@BeforeClass
public static void setup() throws Exception {
CONFIG_SERVER.start();

String configServerAddress = getConfigServerAddress(CONFIG_SERVER);
String configUrlForODP = constructConfigUrlForODP(configServerAddress);
CONFIG_SERVER.withLogConsumer(new Slf4jLogConsumer(LOG)).start();

CONTAINER.withEnv("OB_CONFIGSERVER_ADDRESS", configServerAddress).start();
CONTAINER
.withEnv("OB_CONFIGSERVER_ADDRESS", getConfigServerAddress())
.withLogConsumer(new Slf4jLogConsumer(LOG))
.start();

String password = "test";
createSysUser("proxyro", password);

ODP.withPassword(password).withConfigUrl(configUrlForODP).start();
ODP.withPassword(password)
.withConfigUrl(getConfigUrlForODP())
.withLogConsumer(new Slf4jLogConsumer(LOG))
.start();
}

@AfterClass
Expand All @@ -58,7 +66,7 @@ public static void tearDown() {
}

@Before
public void before() {
public void before() throws Exception {
initialize("sql/htable.sql");
}

Expand Down Expand Up @@ -178,18 +186,4 @@ protected List<String> queryHTable(String tableName, RowConverter rowConverter)
throws SQLException {
return queryTable(tableName, Arrays.asList("K", "Q", "V"), rowConverter);
}

protected String integer(Integer n) {
if (n == null) {
return "CAST(NULL AS INT)";
}
return n.toString();
}

protected String string(String s) {
if (s == null) {
return "CAST(NULL AS STRING)";
}
return "'" + s + "'";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package com.oceanbase.connector.flink;

import com.oceanbase.connector.flink.connection.OceanBaseUserInfo;

import com.github.dockerjava.api.model.ContainerNetwork;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.oceanbase.OceanBaseCEContainer;

Expand All @@ -46,7 +48,7 @@ public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase {
private static final String SYS_PASSWORD = "123456";
private static final String TEST_PASSWORD = "654321";

private static final Network NETWORK = Network.newNetwork();
@ClassRule public static final Network NETWORK = Network.newNetwork();

@SuppressWarnings("resource")
public static final GenericContainer<?> CONFIG_SERVER =
Expand All @@ -56,8 +58,7 @@ public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase {
.waitingFor(
new HttpWaitStrategy()
.forPort(CONFIG_SERVER_PORT)
.forPath(CONFIG_URL_PATH))
.withLogConsumer(new Slf4jLogConsumer(LOG));
.forPath(CONFIG_URL_PATH));

public static final OceanBaseCEContainer CONTAINER =
new OceanBaseCEContainer("oceanbase/oceanbase-ce:latest")
Expand All @@ -69,13 +70,10 @@ public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase {
.withEnv("OB_SYS_PASSWORD", SYS_PASSWORD)
.withEnv("OB_DATAFILE_SIZE", "2G")
.withEnv("OB_LOG_DISK_SIZE", "4G")
.withStartupTimeout(Duration.ofMinutes(4))
.withLogConsumer(new Slf4jLogConsumer(LOG));
.withStartupTimeout(Duration.ofMinutes(4));

public static final OceanBaseProxyContainer ODP =
new OceanBaseProxyContainer("4.3.1.0-4")
.withNetwork(NETWORK)
.withLogConsumer(new Slf4jLogConsumer(LOG));
new OceanBaseProxyContainer("4.3.1.0-4").withNetwork(NETWORK);

public static String getContainerIP(GenericContainer<?> container) {
String ip =
Expand All @@ -90,13 +88,34 @@ public static String getContainerIP(GenericContainer<?> container) {
return ip;
}

public static String getConfigServerAddress(GenericContainer<?> container) {
String ip = getContainerIP(container);
return "http://" + ip + ":" + CONFIG_SERVER_PORT;
private static String configServerAddress;
private static String obServerIP;
private static OceanBaseUserInfo userInfo;

public static String getConfigServerAddress() {
if (configServerAddress == null) {
String ip = getContainerIP(CONFIG_SERVER);
configServerAddress = "http://" + ip + ":" + CONFIG_SERVER_PORT;
}
return configServerAddress;
}

public static String getOBServerIP() {
if (obServerIP == null) {
obServerIP = getContainerIP(CONTAINER);
}
return obServerIP;
}

public static OceanBaseUserInfo getUserInfo() {
if (userInfo == null) {
userInfo = OceanBaseUserInfo.parse(CONTAINER.getUsername());
}
return userInfo;
}

public static String constructConfigUrlForODP(String address) {
return address + CONFIG_URL_PATH;
public static String getConfigUrlForODP() {
return getConfigServerAddress() + CONFIG_URL_PATH;
}

public static Connection getSysJdbcConnection() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.oceanbase.connector.flink;

import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand Down Expand Up @@ -86,7 +88,7 @@ public Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(getJdbcUrl(), getUsername(), getPassword());
}

public void initialize(String sqlFile) {
public void initialize(String sqlFile) throws SQLException, IOException, URISyntaxException {
final URL file = getClass().getClassLoader().getResource(sqlFile);
assertNotNull("Cannot locate " + sqlFile, file);

Expand All @@ -109,8 +111,6 @@ public void initialize(String sqlFile) {
for (String stmt : statements) {
statement.execute(stmt);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -189,4 +189,18 @@ public void dropTables(String... tableNames) throws SQLException {
public interface RowConverter {
String convert(ResultSet rs, int columnCount) throws SQLException;
}

public String integer(Integer n) {
if (n == null) {
return "CAST(NULL AS INT)";
}
return n.toString();
}

public String string(String s) {
if (s == null) {
return "CAST(NULL AS STRING)";
}
return "'" + s + "'";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
import java.time.Duration;
import java.util.Map;

/** The connector options of {@linkplain flink-connector-oceanbase-directload} module. */
/** The connector options of flink-connector-oceanbase-directload connector. */
public class OBDirectLoadConnectorOptions implements Serializable {

private static final long serialVersionUID = 1L;

public static final ConfigOption<String> HOST =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,49 @@

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;

import java.util.Arrays;
import java.util.List;

public class OBKVDirectLoadMySQLModeITCase extends OceanBaseMySQLTestBase {
public class OBDirectLoadITCase extends OceanBaseMySQLTestBase {

private static final Logger LOG = LoggerFactory.getLogger(OBDirectLoadITCase.class);

@BeforeClass
public static void setup() throws Exception {
CONTAINER.start();
public static void setup() {
CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)).start();
}

@AfterClass
public static void tearDown() throws Exception {
public static void tearDown() {
CONTAINER.stop();
}

@Before
public void before() throws Exception {
initialize("sql/products.sql");
}

@After
public void after() throws Exception {
dropTables("products");
}

@Test
public void testDirectLoadSink() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

initialize("sql/mysql/products.sql");

String createTableSql =
String.format(
"CREATE TEMPORARY TABLE target ("
Expand All @@ -72,8 +88,8 @@ public void testDirectLoadSink() throws Exception {
+ ");",
getHost(),
getRpcPort(),
getUsername().split("@")[0],
getUsername().split("@")[1],
getUserInfo().getUser(),
getUserInfo().getTenant(),
getPassword());
tEnv.executeSql(createTableSql);

Expand Down Expand Up @@ -107,14 +123,10 @@ public void testDirectLoadSink() throws Exception {
List<String> actual = queryTable("products");

assertEqualsInAnyOrder(expected, actual);

dropTables("products");
}

@Test
public void testMultiNodeDirectLoadSink() throws Exception {
initialize("sql/mysql/products.sql");

// 1. get DirectLoader and execution id.
DirectLoader directLoad = getDirectLoad();
String executionId = directLoad.begin();
Expand Down Expand Up @@ -147,8 +159,8 @@ public void testMultiNodeDirectLoadSink() throws Exception {
+ ");",
getHost(),
getRpcPort(),
getUsername().split("@")[0],
getUsername().split("@")[1],
getUserInfo().getUser(),
getUserInfo().getTenant(),
getPassword(),
executionId,
true);
Expand Down Expand Up @@ -187,8 +199,6 @@ public void testMultiNodeDirectLoadSink() throws Exception {
List<String> actual = queryTable("products");

assertEqualsInAnyOrder(expected, actual);

dropTables("products");
}

private DirectLoader getDirectLoad() {
Expand All @@ -203,9 +213,9 @@ private DirectLoader getDirectLoad() {
OBDirectLoadConnectorOptions.TABLE_NAME.key(),
"products",
OBDirectLoadConnectorOptions.USERNAME.key(),
getUsername().split("@")[0],
getUserInfo().getUser(),
OBDirectLoadConnectorOptions.TENANT_NAME.key(),
getUsername().split("@")[1],
getUserInfo().getTenant(),
OBDirectLoadConnectorOptions.PASSWORD.key(),
getPassword());
OBDirectLoadConnectorOptions connectorOptions = new OBDirectLoadConnectorOptions(configMap);
Expand Down
Loading

0 comments on commit ff1c481

Please sign in to comment.