Runtime environment: Windows. The tablespace is in a PDB container, and Oracle log archiving has been enabled following the official Oracle connector documentation.

import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import oracle.jdbc.OracleDriver;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.sql.DriverManager;
import java.util.*;

public class Main {
    static final String ORACLE_HOST = "";
    static final int ORACLE_PORT = 1521;
    static final String ORACLE_USER = "";
    static final String ORACLE_PWD = "";
    static final String ORACLE_SERVICE_NAME = "";
    static final String[] ORACLE_SCHEMAS = {};
    static final String[] ORACLE_TABLES = {};

    public static void main(String[] args) throws Exception {
        DriverManager.registerDriver(new OracleDriver());

        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("debezium.database.tablename.case.insensitive", "false");
        debeziumProperties.setProperty("debezium.log.mining.strategy", "online_catalog");
        debeziumProperties.setProperty("debezium.log.mining.continuous.mine", "true");
        debeziumProperties.setProperty("scan.startup.mode", "latest-offset");
        // Log a warning for tables that cannot be monitored
        debeziumProperties.setProperty("inconsistent.schema.handling.mode", "warn");

        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname(ORACLE_HOST)
                .port(ORACLE_PORT)
                .username(ORACLE_USER)
                .password(ORACLE_PWD)
                // Database name of the Oracle server to monitor.
                .database(ORACLE_SERVICE_NAME)
                .schemaList(ORACLE_SCHEMAS)
                .tableList(ORACLE_TABLES)
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.latest())
                .debeziumProperties(debeziumProperties)
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(sourceFunction).setParallelism(1)
                .print();
        env.execute("oracleTest");
    }
}
Answered by lizhe6132 on Mar 2, 2023
In CDB+PDB mode, you must set both the database.dbname and database.pdb.name properties: database.dbname is the name of the CDB, and database.pdb.name is the name of the PDB.
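As a minimal sketch of the answer above, the two properties could be collected in a `Properties` object and handed to `.debeziumProperties(...)` in the question's builder. The helper name `forPdb` and the container names `ORCLCDB` and `ORCLPDB1` are hypothetical placeholders. The keys are written exactly as the answer spells them; the thread does not say whether the DataStream builder expects a `debezium.` prefix, so that part is an assumption.

```java
import java.util.Properties;

public class PdbDebeziumProperties {

    /**
     * Builds the two properties the answer names for CDB+PDB mode.
     * cdbName: name of the container database (CDB).
     * pdbName: name of the pluggable database (PDB) that holds the tables.
     */
    public static Properties forPdb(String cdbName, String pdbName) {
        Properties props = new Properties();
        // Assumption: keys are used as written in the answer, without a prefix.
        props.setProperty("database.dbname", cdbName);
        props.setProperty("database.pdb.name", pdbName);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder names; replace them with the real CDB and PDB names.
        Properties props = forPdb("ORCLCDB", "ORCLPDB1");
        System.out.println(props.getProperty("database.dbname"));
        System.out.println(props.getProperty("database.pdb.name"));
    }
}
```

Any other settings from the question can be added to the same object before it is passed to the builder.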
Answer selected by starrella