From 41f0d0dffe1a50affed7aee6b93e65616b53ce2c Mon Sep 17 00:00:00 2001 From: lingya <1617229094@qq.com> Date: Wed, 12 Dec 2018 21:42:35 +0800 Subject: [PATCH 01/12] support kafka09 sink --- sylph-connectors/sylph-kafka09/build.gradle | 4 +- .../plugins/kafka/flink/KafkaSink09.java | 103 ++++++++++++++++ .../plugins/kafka/flink/TestSource09.java | 116 ++++++++++++++++++ .../plugins/kafka/flink/utils/IProducer.java | 6 + .../kafka/flink/utils/KafkaProducer.java | 94 ++++++++++++++ .../kafka/flink/utils/SimplePartitioner.java | 31 +++++ 6 files changed, 353 insertions(+), 1 deletion(-) create mode 100644 sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSink09.java create mode 100644 sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/TestSource09.java create mode 100644 sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/IProducer.java create mode 100644 sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/KafkaProducer.java create mode 100644 sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/SimplePartitioner.java diff --git a/sylph-connectors/sylph-kafka09/build.gradle b/sylph-connectors/sylph-kafka09/build.gradle index b3a0879db..b7eac4a21 100644 --- a/sylph-connectors/sylph-kafka09/build.gradle +++ b/sylph-connectors/sylph-kafka09/build.gradle @@ -5,6 +5,8 @@ dependencies { //--table sql--- compileOnly group: 'org.apache.flink', name: 'flink-table_2.11', version: deps.flink - compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.9_2.11', version: deps.flink + compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.9.0.1' + compile group: 'org.apache.curator', name: 'curator-framework', version: '2.12.0' + compile group: 'com.google.code.gson', name: 'gson', version: '2.2.4' } diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSink09.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSink09.java new file mode 100644 index 000000000..237d3f651 --- /dev/null +++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSink09.java @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package ideal.sylph.plugins.kafka.flink;
+
+import com.google.gson.Gson;
+import ideal.sylph.annotation.Description;
+import ideal.sylph.annotation.Name;
+import ideal.sylph.etl.PluginConfig;
+import ideal.sylph.etl.Row;
+import ideal.sylph.etl.SinkContext;
+import ideal.sylph.etl.api.RealTimeSink;
+import ideal.sylph.plugins.kafka.flink.utils.KafkaProducer;
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
+
+
+@Name("kafka09")
+@Description("this is kafka09 Sink plugin")
+public class KafkaSink09
+        implements RealTimeSink {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaSink09.class);
+    private final Kafka09SinkConfig config;
+    private final Row.Schema schema;
+    private int idIndex = -1;
+    private KafkaProducer kafkaProducer;
+    private final String topic;
+
+    public KafkaSink09(SinkContext context, Kafka09SinkConfig config) {
+        schema = context.getSchema();
+        if (!Strings.isNullOrEmpty(config.idField)) {
+            int fieldIndex = schema.getFieldIndex(config.idField);
+            checkState(fieldIndex != -1, config.idField + " does not exist, only " + schema.getFields());
+            this.idIndex = fieldIndex;
+        }
+        this.config = config;
+        this.topic = config.topics;
+    }
+
+    @Override
+    public void process(Row value) {
+        Gson gson = new Gson();
+        Map<String, Object> map = new HashMap<>();
+        for (String fieldName : schema.getFieldNames()) {
+            map.put(fieldName, value.getAs(fieldName));
+        }
+        String message = gson.toJson(map);
+        kafkaProducer.send(message);
+    }
+
+    @Override
+    public boolean open(long partitionId, long version) throws Exception {
+        //at least one of config.zookeeper or config.brokers must be set; the zookeeper address is used for now
+        this.kafkaProducer = new KafkaProducer(config.zookeeper, config.topics);
+        return true;
+    }
+
+    @Override
+    public void close(Throwable errorOrNull) {
+        kafkaProducer.close();
+    }
+
+    public static class Kafka09SinkConfig extends PluginConfig
+    {
+        private static final long serialVersionUID = 2L;
+        @Name("kafka_topic")
+        @Description("this is kafka topic list")
+        private String topics;
+
+        @Name("kafka_broker")
+        @Description("this is kafka broker list")
+        private String brokers = "localhost:6667";
+
+        @Name("zookeeper.connect")
+        @Description("this is kafka zk list")
+        private String zookeeper;
+
+        @Name("id_field")
+        @Description("this is kafka id_field")
+        private String idField;
+    }
+}
diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/TestSource09.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/TestSource09.java
new file mode 100644
index 000000000..a084d4389
--- /dev/null
+++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/TestSource09.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright (C) 2018 The Sylph Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ideal.sylph.plugins.kafka.flink;
+
+import ideal.sylph.annotation.Description;
+import ideal.sylph.annotation.Name;
+import ideal.sylph.annotation.Version;
+import ideal.sylph.etl.api.Source;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.shaded.guava18.com.google.common.base.Supplier;
+import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers;
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.types.Row;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * test source
+ **/
+@Name("test09")
+@Description("this flink test source inputStream")
+@Version("1.0.0")
+public class TestSource09
+        implements Source<DataStream<Row>>
+{
+    private static final long serialVersionUID = 2L;
+
+    private final transient Supplier<DataStream<Row>> loadStream;
+
+    public TestSource09(StreamExecutionEnvironment execEnv)
+    {
+        this.loadStream = Suppliers.memoize(() -> execEnv.addSource(new MyDataSource()));
+    }
+
+    @Override
+    public DataStream<Row> getSource()
+    {
+        return loadStream.get();
+    }
+
+    public static class MyDataSource
+            extends RichParallelSourceFunction<Row>
+            implements ResultTypeQueryable<Row>
+    {
+        private static final ObjectMapper MAPPER = new ObjectMapper();
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Row> sourceContext)
+                throws Exception
+        {
+            Random random = new Random();
+            int numKeys = 10;
+            long count = 1L;
+            while (running) {
+                long eventTime = System.currentTimeMillis() - random.nextInt(10 * 10); //the event was already produced, with a delay of at most 10 seconds
+                String userId = "uid_" + count;
+
+                String msg = MAPPER.writeValueAsString(ImmutableMap.of("user_id", userId, "ip", "127.0.0.1", "store", 12.0));
+                Row row = Row.of("key" + random.nextInt(10), msg, eventTime);
+                sourceContext.collect(row);
+                count = count > numKeys ?
1L : count + 1; + TimeUnit.MILLISECONDS.sleep(100); + } + } + + @Override + public TypeInformation getProducedType() + { + TypeInformation[] types = new TypeInformation[] { + TypeExtractor.createTypeInfo(String.class), + TypeExtractor.createTypeInfo(String.class), + TypeExtractor.createTypeInfo(long.class) + }; + + RowTypeInfo rowTypeInfo = new RowTypeInfo(types, new String[] {"key", "message", "event_time"}); + //createTypeInformation[Row] + return rowTypeInfo; + } + + @Override + public void cancel() + { + running = false; + } + + @Override + public void close() + throws Exception + { + this.cancel(); + super.close(); + } + } +} diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/IProducer.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/IProducer.java new file mode 100644 index 000000000..062b79425 --- /dev/null +++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/IProducer.java @@ -0,0 +1,6 @@ +package ideal.sylph.plugins.kafka.flink.utils; + +public interface IProducer { + void send(String message); + void close(); +} \ No newline at end of file diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/KafkaProducer.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/KafkaProducer.java new file mode 100644 index 000000000..ba4ebc49d --- /dev/null +++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/KafkaProducer.java @@ -0,0 +1,94 @@ +package ideal.sylph.plugins.kafka.flink.utils; + +import com.google.common.base.Joiner; +import com.google.gson.Gson; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class KafkaProducer implements IProducer { + private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class); + + private String brokersString; + private String topic; + private String partitionKey = ""; + private org.apache.kafka.clients.producer.KafkaProducer producer; + + + public KafkaProducer(String zkConnect, String topic) { + this.topic = topic; + + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + CuratorFramework client = CuratorFrameworkFactory.newClient(zkConnect, retryPolicy); + client.start(); + + // Get the current kafka brokers and its IDs from ZK + List ids = Collections.emptyList(); + List hosts = new ArrayList<>(); + + try { + ids = client.getChildren().forPath("/brokers/ids"); + } catch (Exception ex) { + log.error("Couldn't get brokers ids", ex); + } + + // Get the host and port from each of the brokers + for (String id : ids) { + String jsonString = null; + + try { + jsonString = new String(client.getData().forPath("/brokers/ids/" + id), "UTF-8"); + } catch (Exception ex) { + log.error("Couldn't parse brokers data", ex); + } + + if (jsonString != null) { + try { + Gson gson = new Gson(); + Map json = gson.fromJson(jsonString, Map.class); + Double port = (Double) json.get("port"); + String host = json.get("host") + ":" + port.intValue(); + hosts.add(host); + } catch (NullPointerException e) { + log.error("Failed converting a JSON tuple to a Map 
class", e); + } + } + } + + // Close the zookeeper connection + client.close(); + + brokersString = Joiner.on(',').join(hosts); + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.ACKS_CONFIG, "1"); + props.put(ProducerConfig.RETRIES_CONFIG, "60"); + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000"); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, "10000"); + props.put(ProducerConfig.LINGER_MS_CONFIG, "500"); + props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "ideal.sylph.plugins.kafka.flink.utils.SimplePartitioner"); + + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + } + + @Override + public void close() { + producer.close(); + } + + @Override + public void send(String message) { + ProducerRecord record = new ProducerRecord<>(topic, message); + producer.send(record); + } +} diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/SimplePartitioner.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/SimplePartitioner.java new file mode 100644 index 000000000..13d54bbd3 --- /dev/null +++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/SimplePartitioner.java @@ -0,0 +1,31 @@ +package ideal.sylph.plugins.kafka.flink.utils; + + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; + +import java.util.Map; + +public class SimplePartitioner implements Partitioner { + + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + if(key != null) { + String stringKey = key.toString(); + int offset = stringKey.hashCode(); + return Math.abs(offset % cluster.partitionCountForTopic(topic)); + } else { + return 0; + } + } + + @Override + public void close() { + + } + + @Override + public void configure(Map configs) { + + } +} From 785d2ee743924950c9a77b0f6d1f0ef53ad3ff05 Mon Sep 17 00:00:00 2001 From: lingya <1617229094@qq.com> Date: Wed, 12 Dec 2018 23:12:20 +0800 Subject: [PATCH 02/12] support clickhouse sink --- settings.gradle | 4 ++-- sylph-connectors/sylph-clickhouse/build.gradle | 16 ++++++++++++++++ .../sylph/plugins/clickhouse/ClickHouseSink.java | 7 +++++++ 3 files changed, 25 insertions(+), 2 deletions(-) create mode 100644 sylph-connectors/sylph-clickhouse/build.gradle create mode 100644 sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java diff --git a/settings.gradle b/settings.gradle index e1f5d5a39..857fadf31 100644 --- a/settings.gradle +++ b/settings.gradle @@ -28,11 +28,11 @@ include 'sylph-connectors:sylph-hdfs' include 'sylph-connectors:sylph-kafka09' include 'sylph-connectors:sylph-hbase' include 'sylph-connectors:sylph-elasticsearch6' - +include 'sylph-connectors:sylph-clickhouse' //---- include 'sylph-dist' include 'sylph-parser' include 'sylph-docs' include 'sylph-yarn' - +//include 'sylph-clickhouse' diff --git a/sylph-connectors/sylph-clickhouse/build.gradle b/sylph-connectors/sylph-clickhouse/build.gradle new file mode 100644 index 000000000..db77423d7 --- /dev/null +++ b/sylph-connectors/sylph-clickhouse/build.gradle @@ -0,0 +1,16 @@ +plugins { + id 'java' +} + 
+group 'ideal'
+version '0.4.0-SNAPSHOT'
+
+sourceCompatibility = 1.8
+
+repositories {
+    mavenCentral()
+}
+
+dependencies {
+    testCompile group: 'junit', name: 'junit', version: '4.12'
+}
diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java
new file mode 100644
index 000000000..21ea16cf2
--- /dev/null
+++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java
@@ -0,0 +1,7 @@
+package ideal.sylph.plugins.clickhouse;
+
+public class ClickHouseSink {
+    public static void main(String[] args) {
+        System.err.println("kkkkkkk");
+    }
+}
From 2c3ebbd8e463ca6ff51fa15bb09aa0b2d190bc0e Mon Sep 17 00:00:00 2001
From: lingya <1617229094@qq.com>
Date: Wed, 12 Dec 2018 23:30:46 +0800
Subject: [PATCH 03/12] support elasticsearch5 sink

---
 .../sylph-elasticsearch5/build.gradle         | 16 +++++++++++++
 .../elasticsearch5/Elasticsearch5Sink.java    | 23 +++++++++++++++++++
 2 files changed, 39 insertions(+)
 create mode 100644 sylph-connectors/sylph-elasticsearch5/build.gradle
 create mode 100644 sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java

diff --git a/sylph-connectors/sylph-elasticsearch5/build.gradle b/sylph-connectors/sylph-elasticsearch5/build.gradle
new file mode 100644
index 000000000..db77423d7
--- /dev/null
+++ b/sylph-connectors/sylph-elasticsearch5/build.gradle
@@ -0,0 +1,16 @@
+plugins {
+    id 'java'
+}
+
+group 'ideal'
+version '0.4.0-SNAPSHOT'
+
+sourceCompatibility = 1.8
+
+repositories {
+    mavenCentral()
+}
+
+dependencies {
+    testCompile group: 'junit', name: 'junit', version: '4.12'
+}
diff --git a/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java b/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java
new file mode 100644
index 000000000..4de59c9af
--- /dev/null
+++ b/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (C) 2018 The Sylph Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package ideal.sylph.plugins.elasticsearch5; + + +public class Elasticsearch5Sink { + public static void main(String[] args) { + System.err.println("kkkkkkk"); + } +} \ No newline at end of file From 1a0803438167b99e13c4da6f9dd437099eb2e84f Mon Sep 17 00:00:00 2001 From: lingya <1617229094@qq.com> Date: Thu, 13 Dec 2018 14:53:49 +0800 Subject: [PATCH 04/12] support clickhouse sink --- .../sylph-clickhouse/build.gradle | 5 +- .../plugins/clickhouse/ClickHouseSink.java | 179 +++++++++++++++++- .../plugins/clickhouse/utils/JdbcUtils.java | 56 ++++++ 3 files changed, 236 insertions(+), 4 deletions(-) create mode 100644 sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/utils/JdbcUtils.java diff --git a/sylph-connectors/sylph-clickhouse/build.gradle b/sylph-connectors/sylph-clickhouse/build.gradle index db77423d7..5cf2d5ab8 100644 --- a/sylph-connectors/sylph-clickhouse/build.gradle +++ b/sylph-connectors/sylph-clickhouse/build.gradle @@ -12,5 +12,8 @@ repositories { } dependencies { - testCompile group: 'junit', name: 'junit', version: '4.12' + compileOnly group: 'org.apache.flink', name: 'flink-table_2.11', version: deps.flink + compileOnly group: 'org.slf4j', name: 'slf4j-api', version: deps.log4j12 + compile group: 'com.github.housepower', name: 'clickhouse-native-jdbc', version: '1.5-stable' + } diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java index 21ea16cf2..ef8fd0042 100644 --- a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java +++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java @@ -1,7 +1,180 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package ideal.sylph.plugins.clickhouse; -public class ClickHouseSink { - public static void main(String[] args) { - System.err.println("kkkkkkk"); +import ideal.sylph.annotation.Description; +import ideal.sylph.annotation.Name; +import ideal.sylph.etl.PluginConfig; +import ideal.sylph.etl.Row; +import ideal.sylph.etl.SinkContext; +import ideal.sylph.etl.api.RealTimeSink; + +import java.sql.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.flink.calcite.shaded.com.google.common.base.Preconditions.checkState; + + +@Name("ClickHouseSink") +@Description("this is ClickHouseSink sink plugin") +public class ClickHouseSink + implements RealTimeSink +{ + private static final Logger logger = LoggerFactory.getLogger(ClickHouseSink.class); + + private final ClickHouseSinkConfig config; + private final String prepareStatementQuery; + private final String[] keys; + + private transient Connection connection; + private transient PreparedStatement statement; + private int num = 0; + + public ClickHouseSink(ClickHouseSinkConfig clickHouseSinkConfig) + { + this.config = clickHouseSinkConfig; + checkState(config.getQuery() != null, "insert into query not setting"); + this.prepareStatementQuery = config.getQuery().replaceAll("\\$\\{.*?}", "?"); + // parser sql query ${key} + Matcher matcher = Pattern.compile("(?<=\\$\\{)(.+?)(?=\\})").matcher(config.getQuery()); + List builder = new ArrayList<>(); + while (matcher.find()) { + builder.add(matcher.group()); + } + this.keys = builder.toArray(new String[0]); + } + + @Override + public void process(Row row){ + + // type convert + +// case "DateTime" | "Date" | "String" => statement.setString(i + 1, item.getAs[String](field)) +// case "Int8" | "Int16" | "Int32" | "UInt8" | "UInt16" => statement.setInt(i + 1, item.getAs[Int](field)) +// case "UInt64" | "Int64" | "UInt32" => statement.setLong(i + 1, item.getAs[Long](field)) +// case "Float32" | "Float64" => statement.setDouble(i + 1, item.getAs[Double](field)) +// case _ => statement.setString(i + 1, item.getAs[String](field)) + + +// pstmt.setString(1, lines[1]); +// pstmt.setString(2, lines[3]); +// pstmt.setString(3, lines[4]); +// pstmt.setString(4, lines[6]); +// pstmt.addBatch(); + + + try { + int i = 1; + for (String key : keys) { + Object value = isNumeric(key) ? 
row.getAs(Integer.parseInt(key)) : row.getAs(key); + statement.setObject(i, value); + i += 1; + } + statement.addBatch(); + // submit batch + if (num++ >= 50) { + statement.executeBatch(); + num = 0; + } + } + catch (SQLException e) { + throw new RuntimeException(e); + } + + } + + @Override + public boolean open(long partitionId, long version) throws SQLException, ClassNotFoundException + { + Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); + this.connection = DriverManager.getConnection(config.jdbcUrl, config.user, config.password); + this.statement = connection.prepareStatement(prepareStatementQuery); + return true; } + + @Override + public void close(Throwable errorOrNull){ + + try (Connection conn = connection) { + try (Statement stmt = statement) { + if (stmt != null) { + stmt.executeBatch(); + } + } + catch (SQLException e) { + logger.error("close executeBatch fail", e); + } + } + catch (SQLException e) { + logger.error("close connection fail", e); + } + } + + public static class ClickHouseSinkConfig extends PluginConfig{ + + @Name("url") + @Description("this is ck jdbc url") + private String jdbcUrl = "jdbc:clickhouse://localhost:9000"; + + @Name("userName") + @Description("this is ck userName") + private String user = "default"; + + @Name("password") + @Description("this is ck password") + private String password = "default"; + + @Name("query") + @Description("this is ck save query") + private String query = null; + + public String getJdbcUrl() { + return jdbcUrl; + } + + public String getUser() { + return user; + } + + public String getPassword() { + return password; + } + + public String getQuery() { + return query; + } + } + + private static boolean isNumeric(String str) + { + for (int i = str.length(); --i >= 0; ) { + if (!Character.isDigit(str.charAt(i))) { + return false; + } + } + return true; + } + + } diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/utils/JdbcUtils.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/utils/JdbcUtils.java new file mode 100644 index 000000000..487be43c9 --- /dev/null +++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/utils/JdbcUtils.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ideal.sylph.plugins.clickhouse.utils; + +import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList; +import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; + +public class JdbcUtils +{ + private JdbcUtils() {} + + /** + * jdbc ResultSet to List + * + * @param rs input jdbc ResultSet + * @return List + */ + public static List> resultToList(ResultSet rs) + throws SQLException + { + ImmutableList.Builder> listBuilder = ImmutableList.builder(); + ResultSetMetaData metaData = rs.getMetaData(); + int columnCount = metaData.getColumnCount(); + while (rs.next()) { + ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); + for (int i = 1; i <= columnCount; i++) { + String columnName = metaData.getColumnLabel(i); + Object value = rs.getObject(i); + if (value != null) { + mapBuilder.put(columnName, value); + } + } + listBuilder.add(mapBuilder.build()); + } + return listBuilder.build(); + } +} From c1fe879e9b9142f61647642a30f7719ffc314123 Mon Sep 17 00:00:00 2001 From: lingya <1617229094@qq.com> Date: Fri, 14 Dec 2018 16:09:43 +0800 Subject: [PATCH 05/12] support clickhouse sink --- .../sylph-clickhouse/build.gradle | 3 + .../plugins/clickhouse/ClickHouseSink.java | 59 +++++---- .../plugins/clickhouse/TestCKSource.java | 118 ++++++++++++++++++ 3 files changed, 153 insertions(+), 27 deletions(-) create mode 100644 sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java diff --git a/sylph-connectors/sylph-clickhouse/build.gradle b/sylph-connectors/sylph-clickhouse/build.gradle index 5cf2d5ab8..cc390f241 100644 --- a/sylph-connectors/sylph-clickhouse/build.gradle +++ b/sylph-connectors/sylph-clickhouse/build.gradle @@ -12,6 +12,9 @@ repositories { } dependencies { + compileOnly(group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: deps.flink) { + exclude(module: 'flink-shaded-hadoop2') + } compileOnly group: 'org.apache.flink', name: 'flink-table_2.11', version: deps.flink compileOnly group: 'org.slf4j', name: 'slf4j-api', version: deps.log4j12 compile group: 'com.github.housepower', name: 'clickhouse-native-jdbc', version: '1.5-stable' diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java index ef8fd0042..848c70bd2 100644 --- a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java +++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java @@ -30,6 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,15 +49,17 @@ public class ClickHouseSink private final ClickHouseSinkConfig config; private final String prepareStatementQuery; private final String[] keys; - + private final Row.Schema schema; + private int idIndex = -1; private transient Connection connection; private transient PreparedStatement statement; private int num = 0; - public ClickHouseSink(ClickHouseSinkConfig clickHouseSinkConfig) + public 
ClickHouseSink(SinkContext context,ClickHouseSinkConfig clickHouseSinkConfig) { this.config = clickHouseSinkConfig; checkState(config.getQuery() != null, "insert into query not setting"); + logger.info("query >>> " +config.getQuery()); this.prepareStatementQuery = config.getQuery().replaceAll("\\$\\{.*?}", "?"); // parser sql query ${key} Matcher matcher = Pattern.compile("(?<=\\$\\{)(.+?)(?=\\})").matcher(config.getQuery()); @@ -62,46 +67,38 @@ public ClickHouseSink(ClickHouseSinkConfig clickHouseSinkConfig) while (matcher.find()) { builder.add(matcher.group()); } + + schema = context.getSchema(); + if (!Strings.isNullOrEmpty(config.idField)) { + int fieldIndex = schema.getFieldIndex(config.idField); + Preconditions.checkState(fieldIndex != -1, config.idField + " does not exist, only " + schema.getFields()); + this.idIndex = fieldIndex; + } this.keys = builder.toArray(new String[0]); } @Override public void process(Row row){ - // type convert - -// case "DateTime" | "Date" | "String" => statement.setString(i + 1, item.getAs[String](field)) -// case "Int8" | "Int16" | "Int32" | "UInt8" | "UInt16" => statement.setInt(i + 1, item.getAs[Int](field)) -// case "UInt64" | "Int64" | "UInt32" => statement.setLong(i + 1, item.getAs[Long](field)) -// case "Float32" | "Float64" => statement.setDouble(i + 1, item.getAs[Double](field)) -// case _ => statement.setString(i + 1, item.getAs[String](field)) - - -// pstmt.setString(1, lines[1]); -// pstmt.setString(2, lines[3]); -// pstmt.setString(3, lines[4]); -// pstmt.setString(4, lines[6]); -// pstmt.addBatch(); - - try { int i = 1; - for (String key : keys) { - Object value = isNumeric(key) ? row.getAs(Integer.parseInt(key)) : row.getAs(key); - statement.setObject(i, value); - i += 1; + //后期利用反射 CK 类型转换 + for (String fieldName : schema.getFieldNames()) { + if (fieldName.equals("event_time")){ + statement.setDate(i, java.sql.Date.valueOf(row.getAs(fieldName).toString())); + }else{ + statement.setString(i, row.getAs(fieldName)); + } + i += 1; } statement.addBatch(); - // submit batch - if (num++ >= 50) { + if (num++ >= 100000) {//暂时 statement.executeBatch(); num = 0; } - } - catch (SQLException e) { + }catch (SQLException e) { throw new RuntimeException(e); } - } @Override @@ -149,6 +146,14 @@ public static class ClickHouseSinkConfig extends PluginConfig{ @Description("this is ck save query") private String query = null; + @Name("id_field") + @Description("this is ck id_field") + private String idField; + + @Name("eventDate_field") + @Description("this is your data eventDate_field, 必须是 YYYY-mm--dd位时间戳") + private String eventTimeName; + public String getJdbcUrl() { return jdbcUrl; } diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java new file mode 100644 index 000000000..34a610c83 --- /dev/null +++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ideal.sylph.plugins.clickhouse; + +import ideal.sylph.annotation.Description; +import ideal.sylph.annotation.Name; +import ideal.sylph.annotation.Version; +import ideal.sylph.etl.api.Source; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.shaded.guava18.com.google.common.base.Supplier; +import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers; +import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDate; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +/** + * test source + **/ +@Name("testCK") +@Description("this flink test source inputStream") +@Version("1.0.0") +public class TestCKSource + implements Source> +{ + private static final long serialVersionUID = 2L; + private static final Logger logger = LoggerFactory.getLogger(TestCKSource.class); + private final transient Supplier> loadStream; + + public TestCKSource(StreamExecutionEnvironment execEnv) + { + this.loadStream = Suppliers.memoize(() -> execEnv.addSource(new MyDataSource())); + } + + @Override + public DataStream getSource() + { + return loadStream.get(); + } + + public static class MyDataSource + extends RichParallelSourceFunction + implements ResultTypeQueryable + { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private volatile boolean running = true; + + @Override + public void run(SourceContext sourceContext) + throws Exception + { + Random random = new Random(1000000); + int numKeys = 10; +// long count = 1L; + while (running) { + LocalDate now=LocalDate.now(); +// String userId = "uid_" + count; + String msg="https://mp.weixin.qq.com/s"; + Row row = Row.of("https://mp.weixin.qq.com/s" + random.nextLong(), msg, now.toString()); + sourceContext.collect(row); +// count = count > numKeys ? 
1L : count + 1; +// TimeUnit.MILLISECONDS.sleep(100); + } + } + + @Override + public TypeInformation getProducedType() + { + TypeInformation[] types = new TypeInformation[] { + TypeExtractor.createTypeInfo(String.class), + TypeExtractor.createTypeInfo(String.class), + TypeExtractor.createTypeInfo(String.class) + }; + + RowTypeInfo rowTypeInfo = new RowTypeInfo(types, new String[] {"key", "message", "event_time"}); + //createTypeInformation[Row] + return rowTypeInfo; + } + + @Override + public void cancel() + { + running = false; + } + + @Override + public void close() + throws Exception + { + this.cancel(); + super.close(); + } + } +} From 0e61fc50e6e073a88ed7daf561369d30b52bd405 Mon Sep 17 00:00:00 2001 From: lingya <1617229094@qq.com> Date: Fri, 14 Dec 2018 16:25:41 +0800 Subject: [PATCH 06/12] support es5 sink testing --- .../elasticsearch5/Elasticsearch5Sink.java | 147 +++++++++++++++++- 1 file changed, 142 insertions(+), 5 deletions(-) diff --git a/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java b/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java index 4de59c9af..a4d6e88a5 100644 --- a/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java +++ b/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java @@ -1,4 +1,4 @@ -/* + /* * Copyright (C) 2018 The Sylph Authors * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,9 +15,146 @@ */ package ideal.sylph.plugins.elasticsearch5; +import ideal.sylph.annotation.Description; +import ideal.sylph.annotation.Name; +import ideal.sylph.etl.PluginConfig; +import ideal.sylph.etl.Row; +import ideal.sylph.etl.SinkContext; +import ideal.sylph.etl.api.RealTimeSink; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.transport.client.PreBuiltTransportClient; -public class Elasticsearch5Sink { - public static void main(String[] args) { - System.err.println("kkkkkkk"); +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState; + +@Name("elasticsearch5") +@Description("this is elasticsearch5 sink plugin") +public class Elasticsearch5Sink + implements RealTimeSink +{ + private static final int MAX_BATCH_BULK = 50; + private final Row.Schema schema; + private final ElasticsearchSinkConfig config; + + private TransportClient client; + private int idIndex = -1; + private final AtomicInteger cnt = new AtomicInteger(0); + private BulkRequestBuilder bulkBuilder; + + public Elasticsearch5Sink(SinkContext context, ElasticsearchSinkConfig config) + { + schema = context.getSchema(); + this.config = config; + if (!Strings.isNullOrEmpty(config.idField)) { + int fieldIndex = schema.getFieldIndex(config.idField); + checkState(fieldIndex != -1, config.idField + " does not exist, only " + schema.getFields()); + this.idIndex 
= fieldIndex; + } + if (config.update) { + checkState(idIndex != -1, "This is Update mode, `id_field` must be set"); + } + } + + @Override + public void process(Row value) + { + Map map = new HashMap<>(); + for (String fieldName : schema.getFieldNames()) { + map.put(fieldName, value.getAs(fieldName)); + } + if (config.update) { //is update + Object id = value.getAs(idIndex); + if (id == null) { + return; + } + UpdateRequestBuilder requestBuilder = client.prepareUpdate(config.index, config.type, id.toString()); + requestBuilder.setDoc(map); + requestBuilder.setDocAsUpsert(true); + bulkBuilder.add(requestBuilder.request()); + } + else { + IndexRequestBuilder requestBuilder = client.prepareIndex(config.index, config.type); + if (idIndex != -1) { + Object id = value.getAs(idIndex); + if (id != null) { + requestBuilder.setId(id.toString()); + } + } + + requestBuilder.setSource(map); + bulkBuilder.add(requestBuilder.request()); + } + if (cnt.getAndIncrement() > MAX_BATCH_BULK) { + client.bulk(bulkBuilder.request()).actionGet(); + cnt.set(0); + bulkBuilder = client.prepareBulk(); + } + } + + @Override + public boolean open(long partitionId, long version) + throws Exception + { + String clusterName = config.clusterName; + String hosts = config.hosts; + Settings settings = Settings.builder().put("cluster.name", clusterName) + .put("client.transport.sniff", true).build(); + + TransportClient client = new PreBuiltTransportClient(settings); + for (String ip : hosts.split(",")) { + client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip.split(":")[0]), Integer.valueOf(ip.split(":")[1]))); + } + this.client = client; + this.bulkBuilder = client.prepareBulk(); + return true; + } + + @Override + public void close(Throwable errorOrNull) + { + try (TransportClient closeClient = client) { + if (bulkBuilder != null && closeClient != null) { + closeClient.bulk(bulkBuilder.request()); + } + } + } + + public static class ElasticsearchSinkConfig + extends PluginConfig + { + @Name("cluster_name") + @Description("this is es cluster name") + private String clusterName; + + @Name("cluster_hosts") + @Description("this is es cluster hosts") + private String hosts; + + @Name("es_index") + @Description("this is es index") + private String index; + + @Name("id_field") + @Description("this is es id_field") + private String idField; + + @Name("update") + @Description("update or insert") + private boolean update = false; + + @Name("index_type") + @Description("this is es index_type, Do not set") + private String type = "default"; } -} \ No newline at end of file +} From 508681e3c2f0ddba2b759bf605cd73df8385f058 Mon Sep 17 00:00:00 2001 From: lingya <1617229094@qq.com> Date: Wed, 19 Dec 2018 15:06:43 +0800 Subject: [PATCH 07/12] support clickhouse sink --- settings.gradle | 2 + .../sylph-clickhouse/build.gradle | 13 ---- .../plugins/clickhouse/ClickHouseSink.java | 71 ++++++++++--------- .../plugins/clickhouse/TestCKSource.java | 16 ++--- .../sylph-elasticsearch5/build.gradle | 30 +++++--- 5 files changed, 66 insertions(+), 66 deletions(-) diff --git a/settings.gradle b/settings.gradle index 857fadf31..d8850fb81 100644 --- a/settings.gradle +++ b/settings.gradle @@ -28,6 +28,7 @@ include 'sylph-connectors:sylph-hdfs' include 'sylph-connectors:sylph-kafka09' include 'sylph-connectors:sylph-hbase' include 'sylph-connectors:sylph-elasticsearch6' +include 'sylph-connectors:sylph-elasticsearch5' include 'sylph-connectors:sylph-clickhouse' //---- include 'sylph-dist' @@ -35,4 +36,5 @@ include 
'sylph-parser' include 'sylph-docs' include 'sylph-yarn' //include 'sylph-clickhouse' +//include 'sylph-elasticsearch5' diff --git a/sylph-connectors/sylph-clickhouse/build.gradle b/sylph-connectors/sylph-clickhouse/build.gradle index cc390f241..4f4520160 100644 --- a/sylph-connectors/sylph-clickhouse/build.gradle +++ b/sylph-connectors/sylph-clickhouse/build.gradle @@ -1,16 +1,3 @@ -plugins { - id 'java' -} - -group 'ideal' -version '0.4.0-SNAPSHOT' - -sourceCompatibility = 1.8 - -repositories { - mavenCentral() -} - dependencies { compileOnly(group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: deps.flink) { exclude(module: 'flink-shaded-hadoop2') diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java index 848c70bd2..63d25cc36 100644 --- a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java +++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java @@ -31,6 +31,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +//import ideal.sylph.plugins.hbase.tuple.Tuple2; +//import ideal.sylph.plugins.clickhouse.tuple.Tuple2; import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; import org.apache.flink.shaded.guava18.com.google.common.base.Strings; import org.slf4j.Logger; @@ -48,56 +50,52 @@ public class ClickHouseSink private final ClickHouseSinkConfig config; private final String prepareStatementQuery; - private final String[] keys; private final Row.Schema schema; private int idIndex = -1; private transient Connection connection; private transient PreparedStatement statement; private int num = 0; + private final Map nametypes; public ClickHouseSink(SinkContext context,ClickHouseSinkConfig clickHouseSinkConfig) { this.config = clickHouseSinkConfig; checkState(config.getQuery() != null, "insert into query not setting"); - logger.info("query >>> " +config.getQuery()); this.prepareStatementQuery = config.getQuery().replaceAll("\\$\\{.*?}", "?"); - // parser sql query ${key} - Matcher matcher = Pattern.compile("(?<=\\$\\{)(.+?)(?=\\})").matcher(config.getQuery()); - List builder = new ArrayList<>(); - while (matcher.find()) { - builder.add(matcher.group()); - } - schema = context.getSchema(); - if (!Strings.isNullOrEmpty(config.idField)) { - int fieldIndex = schema.getFieldIndex(config.idField); - Preconditions.checkState(fieldIndex != -1, config.idField + " does not exist, only " + schema.getFields()); - this.idIndex = fieldIndex; + Map nt =new HashMap(); + for (int i=0;i= 100000) {//暂时 - statement.executeBatch(); - num = 0; + //Byte Double String Date Long + if (nametypes.get(fieldName).equals("java.sql.Date")) { + statement.setDate(ith, java.sql.Date.valueOf(row.getAs(fieldName).toString())); + } else if ((nametypes.get(fieldName).equals("java.lang.Long"))) { + statement.setLong(ith, row.getAs(fieldName)); + } else if ((nametypes.get(fieldName).equals("java.lang.Double"))) { + statement.setDouble(ith, row.getAs(fieldName)); + } else if ((nametypes.get(fieldName).equals("java.lang.Integer"))) { + statement.setByte(ith, Byte.valueOf(row.getAs(fieldName))); + } else { + statement.setString(ith, row.getAs(fieldName)); + } + ith += 1; } - }catch (SQLException e) { - throw new RuntimeException(e); + statement.addBatch(); + if (num++ >= config.bulkSize) { + statement.executeBatch(); + num = 0; + } 
+ } catch (SQLException e) { + e.printStackTrace(); } } @@ -146,9 +144,9 @@ public static class ClickHouseSinkConfig extends PluginConfig{ @Description("this is ck save query") private String query = null; - @Name("id_field") - @Description("this is ck id_field") - private String idField; + @Name("bulkSize") + @Description("this is ck bulkSize") + private int bulkSize; @Name("eventDate_field") @Description("this is your data eventDate_field, 必须是 YYYY-mm--dd位时间戳") @@ -181,5 +179,10 @@ private static boolean isNumeric(String str) return true; } + public enum MyStrings{ + + + } + } diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java index 34a610c83..f469a8d0b 100644 --- a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java +++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java @@ -75,15 +75,12 @@ public void run(SourceContext sourceContext) { Random random = new Random(1000000); int numKeys = 10; -// long count = 1L; while (running) { - LocalDate now=LocalDate.now(); -// String userId = "uid_" + count; - String msg="https://mp.weixin.qq.com/s"; - Row row = Row.of("https://mp.weixin.qq.com/s" + random.nextLong(), msg, now.toString()); + java.time.LocalDate date = java.time.LocalDate.now(); + java.sql.Date now = java.sql.Date.valueOf(date); + String msg="https://github.com/harbby/sylph/"+ random.nextLong(); + Row row = Row.of("github.com" + random.nextLong(), msg, now); sourceContext.collect(row); -// count = count > numKeys ? 1L : count + 1; -// TimeUnit.MILLISECONDS.sleep(100); } } @@ -93,11 +90,10 @@ public TypeInformation getProducedType() TypeInformation[] types = new TypeInformation[] { TypeExtractor.createTypeInfo(String.class), TypeExtractor.createTypeInfo(String.class), - TypeExtractor.createTypeInfo(String.class) + TypeExtractor.createTypeInfo(java.sql.Date.class) }; - RowTypeInfo rowTypeInfo = new RowTypeInfo(types, new String[] {"key", "message", "event_time"}); - //createTypeInformation[Row] + RowTypeInfo rowTypeInfo = new RowTypeInfo(types, new String[] {"key", "message", "mes_time"}); return rowTypeInfo; } diff --git a/sylph-connectors/sylph-elasticsearch5/build.gradle b/sylph-connectors/sylph-elasticsearch5/build.gradle index db77423d7..b17d19810 100644 --- a/sylph-connectors/sylph-elasticsearch5/build.gradle +++ b/sylph-connectors/sylph-elasticsearch5/build.gradle @@ -1,16 +1,28 @@ plugins { - id 'java' + id "com.github.johnrengelman.shadow" version "4.0.3" } -group 'ideal' -version '0.4.0-SNAPSHOT' +dependencies { + shadow group: 'org.apache.flink', name: 'flink-shaded-guava', version: '18.0-4.0' + compile 'org.elasticsearch.client:transport:5.6.0' +} -sourceCompatibility = 1.8 +shadowJar { + baseName = project.name + classifier = 'shaded' + version = project.version -repositories { - mavenCentral() -} + configurations = [project.configurations.compile] -dependencies { - testCompile group: 'junit', name: 'junit', version: '4.12' + dependencies { + exclude(dependency('junit:junit:')) + } + + //relocate 'com.google.protobuf', 'shaded.com.google.protobuf' + relocate 'com.google.common', 'shaded.elasticsearch6.com.google.common' + relocate 'io.netty', 'shaded.elasticsearch5.io.netty' + relocate 'io.netty', 'shaded.elasticsearch5.io.netty' + relocate 'org.apache.logging', 'shaded.elasticsearch5.org.apache.logging' } +assemble.dependsOn 
shadowJar +buildPlugins.dependsOn shadowJar \ No newline at end of file From d3e26b99c9c1c8f3d4336fc10de14fa42e2c9bc4 Mon Sep 17 00:00:00 2001 From: lingya <1617229094@qq.com> Date: Mon, 24 Dec 2018 10:11:36 +0800 Subject: [PATCH 08/12] support es5 sink testing --- .../plugins/clickhouse/ClickHouseSink.java | 53 +++++++------------ 1 file changed, 18 insertions(+), 35 deletions(-) diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java index 63d25cc36..6e86230a1 100644 --- a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java +++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java @@ -73,29 +73,29 @@ public ClickHouseSink(SinkContext context,ClickHouseSinkConfig clickHouseSinkCon @Override public void process(Row row) { int ith=1; - try { - for (String fieldName : schema.getFieldNames()) { - //Byte Double String Date Long - if (nametypes.get(fieldName).equals("java.sql.Date")) { - statement.setDate(ith, java.sql.Date.valueOf(row.getAs(fieldName).toString())); - } else if ((nametypes.get(fieldName).equals("java.lang.Long"))) { - statement.setLong(ith, row.getAs(fieldName)); - } else if ((nametypes.get(fieldName).equals("java.lang.Double"))) { - statement.setDouble(ith, row.getAs(fieldName)); - } else if ((nametypes.get(fieldName).equals("java.lang.Integer"))) { - statement.setByte(ith, Byte.valueOf(row.getAs(fieldName))); - } else { - statement.setString(ith, row.getAs(fieldName)); - } - ith += 1; - } + try { + for (String fieldName : schema.getFieldNames()) { + //Byte Double String Date Long ..... + if (nametypes.get(fieldName).equals("java.sql.Date")) { + statement.setDate(ith, java.sql.Date.valueOf(row.getAs(fieldName).toString())); + } else if ((nametypes.get(fieldName).equals("java.lang.Long"))) { + statement.setLong(ith, row.getAs(fieldName)); + } else if ((nametypes.get(fieldName).equals("java.lang.Double"))) { + statement.setDouble(ith, row.getAs(fieldName)); + } else if ((nametypes.get(fieldName).equals("java.lang.Integer"))) { + statement.setByte(ith, Byte.valueOf(row.getAs(fieldName))); + } else { + statement.setString(ith, row.getAs(fieldName)); + } + ith += 1; + } statement.addBatch(); if (num++ >= config.bulkSize) { statement.executeBatch(); num = 0; } } catch (SQLException e) { - e.printStackTrace(); + e.printStackTrace(); } } @@ -146,7 +146,7 @@ public static class ClickHouseSinkConfig extends PluginConfig{ @Name("bulkSize") @Description("this is ck bulkSize") - private int bulkSize; + private int bulkSize=20000; @Name("eventDate_field") @Description("this is your data eventDate_field, 必须是 YYYY-mm--dd位时间戳") @@ -168,21 +168,4 @@ public String getQuery() { return query; } } - - private static boolean isNumeric(String str) - { - for (int i = str.length(); --i >= 0; ) { - if (!Character.isDigit(str.charAt(i))) { - return false; - } - } - return true; - } - - public enum MyStrings{ - - - } - - } From 392d36b9e662111990aaacb8a762899a9f2a2b64 Mon Sep 17 00:00:00 2001 From: lingya <1617229094@qq.com> Date: Mon, 24 Dec 2018 10:44:28 +0800 Subject: [PATCH 09/12] support ck sink testing --- .../ideal/sylph/plugins/clickhouse/ClickHouseSink.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java 
b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java index 6e86230a1..ad044ccfa 100644 --- a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java +++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java @@ -23,18 +23,8 @@ import ideal.sylph.etl.api.RealTimeSink; import java.sql.*; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -//import ideal.sylph.plugins.hbase.tuple.Tuple2; -//import ideal.sylph.plugins.clickhouse.tuple.Tuple2; -import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; -import org.apache.flink.shaded.guava18.com.google.common.base.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 1d772ee94da82d2e90140f0499ff1b0880aa1514 Mon Sep 17 00:00:00 2001 From: ideal Date: Mon, 24 Dec 2018 11:54:31 +0800 Subject: [PATCH 10/12] core format form new connectors --- build.gradle | 15 +++ .../plugins/clickhouse/ClickHouseSink.java | 105 +++++++++------- .../plugins/clickhouse/TestCKSource.java | 5 +- .../plugins/clickhouse/utils/JdbcUtils.java | 56 --------- .../elasticsearch5/Elasticsearch5Sink.java | 3 +- .../plugins/hdfs/parquet/ApacheParquet.java | 6 - .../plugins/kafka/flink/KafkaSink09.java | 27 ++-- .../plugins/kafka/flink/TestSource09.java | 116 ------------------ .../plugins/kafka/flink/utils/IProducer.java | 21 +++- .../kafka/flink/utils/KafkaProducer.java | 44 +++++-- .../kafka/flink/utils/SimplePartitioner.java | 37 ++++-- .../sylph/plugins/mysql/utils/JdbcUtils.java | 4 +- .../controller/utils/JsonFormatUtil.java | 18 +-- .../java/ideal/sylph/main/SylphMaster.java | 12 +- .../sylph/spi/utils/GenericTypeReference.java | 7 +- 15 files changed, 188 insertions(+), 288 deletions(-) delete mode 100644 sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/utils/JdbcUtils.java delete mode 100644 sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/TestSource09.java diff --git a/build.gradle b/build.gradle index d13b82ca1..26d568511 100644 --- a/build.gradle +++ b/build.gradle @@ -104,4 +104,19 @@ subprojects { //assemble.dependsOn 'licenseMain','licenseTest' //licenseMain.includes //license.mapping('javascript', 'JAVADOC_STYLE') + + task sourcesJar(type: Jar, dependsOn: classes) { + classifier = 'sources' + from sourceSets.main.allSource + } + + task javadocJar(type: Jar, dependsOn: javadoc) { + classifier = 'javadoc' + from javadoc.destinationDir + //javadoc.failOnError = false + } + + artifacts { + archives sourcesJar, javadocJar + } } diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java index ad044ccfa..28bd11c87 100644 --- a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java +++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/ClickHouseSink.java @@ -21,16 +21,19 @@ import ideal.sylph.etl.Row; import ideal.sylph.etl.SinkContext; import ideal.sylph.etl.api.RealTimeSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.sql.*; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; 
+import java.sql.SQLException; +import java.sql.Statement; import java.util.HashMap; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.apache.flink.calcite.shaded.com.google.common.base.Preconditions.checkState; - @Name("ClickHouseSink") @Description("this is ClickHouseSink sink plugin") public class ClickHouseSink @@ -45,52 +48,59 @@ public class ClickHouseSink private transient Connection connection; private transient PreparedStatement statement; private int num = 0; - private final Map nametypes; + private final Map nametypes; - public ClickHouseSink(SinkContext context,ClickHouseSinkConfig clickHouseSinkConfig) + public ClickHouseSink(SinkContext context, ClickHouseSinkConfig clickHouseSinkConfig) { this.config = clickHouseSinkConfig; checkState(config.getQuery() != null, "insert into query not setting"); this.prepareStatementQuery = config.getQuery().replaceAll("\\$\\{.*?}", "?"); schema = context.getSchema(); - Map nt =new HashMap(); - for (int i=0;i nt = new HashMap(); + for (int i = 0; i < schema.getFieldNames().size(); i++) { + nt.put(schema.getFieldNames().get(i), schema.getFieldTypes().get(i).toString().split(" ")[1]); } - this.nametypes=nt; + this.nametypes = nt; } @Override - public void process(Row row) { - int ith=1; - try { - for (String fieldName : schema.getFieldNames()) { - //Byte Double String Date Long ..... - if (nametypes.get(fieldName).equals("java.sql.Date")) { - statement.setDate(ith, java.sql.Date.valueOf(row.getAs(fieldName).toString())); - } else if ((nametypes.get(fieldName).equals("java.lang.Long"))) { - statement.setLong(ith, row.getAs(fieldName)); - } else if ((nametypes.get(fieldName).equals("java.lang.Double"))) { - statement.setDouble(ith, row.getAs(fieldName)); - } else if ((nametypes.get(fieldName).equals("java.lang.Integer"))) { - statement.setByte(ith, Byte.valueOf(row.getAs(fieldName))); - } else { - statement.setString(ith, row.getAs(fieldName)); - } - ith += 1; - } - statement.addBatch(); - if (num++ >= config.bulkSize) { - statement.executeBatch(); - num = 0; - } - } catch (SQLException e) { - e.printStackTrace(); + public void process(Row row) + { + int ith = 1; + try { + for (String fieldName : schema.getFieldNames()) { + //Byte Double String Date Long ..... 
+ if (nametypes.get(fieldName).equals("java.sql.Date")) { + statement.setDate(ith, java.sql.Date.valueOf(row.getAs(fieldName).toString())); + } + else if ((nametypes.get(fieldName).equals("java.lang.Long"))) { + statement.setLong(ith, row.getAs(fieldName)); + } + else if ((nametypes.get(fieldName).equals("java.lang.Double"))) { + statement.setDouble(ith, row.getAs(fieldName)); + } + else if ((nametypes.get(fieldName).equals("java.lang.Integer"))) { + statement.setByte(ith, Byte.valueOf(row.getAs(fieldName))); + } + else { + statement.setString(ith, row.getAs(fieldName)); + } + ith += 1; + } + statement.addBatch(); + if (num++ >= config.bulkSize) { + statement.executeBatch(); + num = 0; + } + } + catch (SQLException e) { + e.printStackTrace(); } } @Override - public boolean open(long partitionId, long version) throws SQLException, ClassNotFoundException + public boolean open(long partitionId, long version) + throws SQLException, ClassNotFoundException { Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); this.connection = DriverManager.getConnection(config.jdbcUrl, config.user, config.password); @@ -99,8 +109,8 @@ public boolean open(long partitionId, long version) throws SQLException, ClassNo } @Override - public void close(Throwable errorOrNull){ - + public void close(Throwable errorOrNull) + { try (Connection conn = connection) { try (Statement stmt = statement) { if (stmt != null) { @@ -116,8 +126,9 @@ public void close(Throwable errorOrNull){ } } - public static class ClickHouseSinkConfig extends PluginConfig{ - + public static class ClickHouseSinkConfig + extends PluginConfig + { @Name("url") @Description("this is ck jdbc url") private String jdbcUrl = "jdbc:clickhouse://localhost:9000"; @@ -136,25 +147,29 @@ public static class ClickHouseSinkConfig extends PluginConfig{ @Name("bulkSize") @Description("this is ck bulkSize") - private int bulkSize=20000; + private int bulkSize = 20000; @Name("eventDate_field") @Description("this is your data eventDate_field, 必须是 YYYY-mm--dd位时间戳") private String eventTimeName; - public String getJdbcUrl() { + public String getJdbcUrl() + { return jdbcUrl; } - public String getUser() { + public String getUser() + { return user; } - public String getPassword() { + public String getPassword() + { return password; } - public String getQuery() { + public String getQuery() + { return query; } } diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java index f469a8d0b..fb35392e8 100644 --- a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java +++ b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/TestCKSource.java @@ -25,7 +25,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.shaded.guava18.com.google.common.base.Supplier; import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers; -import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -34,9 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.LocalDate; import java.util.Random; -import java.util.concurrent.TimeUnit; /** * test source @@ -78,7 +75,7 @@ 
public void run(SourceContext sourceContext) while (running) { java.time.LocalDate date = java.time.LocalDate.now(); java.sql.Date now = java.sql.Date.valueOf(date); - String msg="https://github.com/harbby/sylph/"+ random.nextLong(); + String msg = "https://github.com/harbby/sylph/" + random.nextLong(); Row row = Row.of("github.com" + random.nextLong(), msg, now); sourceContext.collect(row); } diff --git a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/utils/JdbcUtils.java b/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/utils/JdbcUtils.java deleted file mode 100644 index 487be43c9..000000000 --- a/sylph-connectors/sylph-clickhouse/src/main/java/ideal/sylph/plugins/clickhouse/utils/JdbcUtils.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (C) 2018 The Sylph Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ideal.sylph.plugins.clickhouse.utils; - -import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList; -import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap; - -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.util.List; -import java.util.Map; - -public class JdbcUtils -{ - private JdbcUtils() {} - - /** - * jdbc ResultSet to List - * - * @param rs input jdbc ResultSet - * @return List - */ - public static List> resultToList(ResultSet rs) - throws SQLException - { - ImmutableList.Builder> listBuilder = ImmutableList.builder(); - ResultSetMetaData metaData = rs.getMetaData(); - int columnCount = metaData.getColumnCount(); - while (rs.next()) { - ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); - for (int i = 1; i <= columnCount; i++) { - String columnName = metaData.getColumnLabel(i); - Object value = rs.getObject(i); - if (value != null) { - mapBuilder.put(columnName, value); - } - } - listBuilder.add(mapBuilder.build()); - } - return listBuilder.build(); - } -} diff --git a/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java b/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java index a4d6e88a5..85752cfb1 100644 --- a/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java +++ b/sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java @@ -1,4 +1,4 @@ - /* +/* * Copyright (C) 2018 The Sylph Authors * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -28,7 +28,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import java.net.InetAddress; diff --git 
a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java index 91c98c4d2..c0fdae1cf 100644 --- a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java +++ b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java @@ -124,9 +124,6 @@ public long getDataSize() return writer.getDataSize(); } - /** - * 入参list - */ @Override public void writeLine(List evalRow) { @@ -146,9 +143,6 @@ public void writeLine(List evalRow) } } - /** - * 入参list - */ @Override public void writeLine(Row row) { diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSink09.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSink09.java index 237d3f651..56fa34b1e 100644 --- a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSink09.java +++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSink09.java @@ -32,20 +32,20 @@ import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState; - @Name("kafka09") @Description("this is kafka09 Sink plugin") public class KafkaSink09 - implements RealTimeSink { - + implements RealTimeSink +{ private static final Logger logger = LoggerFactory.getLogger(KafkaSink09.class); private final Kafka09SinkConfig config; private final Row.Schema schema; private int idIndex = -1; private KafkaProducer kafkaProducer; private final String topic; - public KafkaSink09(SinkContext context, Kafka09SinkConfig config) { + public KafkaSink09(SinkContext context, Kafka09SinkConfig config) + { schema = context.getSchema(); if (!Strings.isNullOrEmpty(config.idField)) { int fieldIndex = schema.getFieldIndex(config.idField); @@ -53,13 +53,12 @@ public KafkaSink09(SinkContext context, Kafka09SinkConfig config) { this.idIndex = fieldIndex; } this.config = config; - this.topic=config.topics; - + this.topic = config.topics; } @Override - public void process(Row value) { - + public void process(Row value) + { Gson gson = new Gson(); Map map = new HashMap<>(); for (String fieldName : schema.getFieldNames()) { @@ -70,18 +69,22 @@ public void process(Row value) { } @Override - public boolean open(long partitionId, long version) throws Exception { + public boolean open(long partitionId, long version) + throws Exception + { //config.zookeeper,config.brokers 至少一个 暂时 zookeeper - this.kafkaProducer = new KafkaProducer(config.zookeeper,config.topics); + this.kafkaProducer = new KafkaProducer(config.zookeeper, config.topics); return true; } @Override - public void close(Throwable errorOrNull) { + public void close(Throwable errorOrNull) + { kafkaProducer.close(); } - public static class Kafka09SinkConfig extends PluginConfig + public static class Kafka09SinkConfig + extends PluginConfig { private static final long serialVersionUID = 2L; @Name("kafka_topic") diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/TestSource09.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/TestSource09.java deleted file mode 100644 index a084d4389..000000000 --- a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/TestSource09.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright (C) 2018 The Sylph Authors - * - * Licensed under the Apache License, Version 2.0 (the 
"License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ideal.sylph.plugins.kafka.flink; - -import ideal.sylph.annotation.Description; -import ideal.sylph.annotation.Name; -import ideal.sylph.annotation.Version; -import ideal.sylph.etl.api.Source; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.shaded.guava18.com.google.common.base.Supplier; -import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers; -import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.types.Row; - -import java.util.Random; -import java.util.concurrent.TimeUnit; - -/** - * test source - **/ -@Name("test09") -@Description("this flink test source inputStream") -@Version("1.0.0") -public class TestSource09 - implements Source> -{ - private static final long serialVersionUID = 2L; - - private final transient Supplier> loadStream; - - public TestSource09(StreamExecutionEnvironment execEnv) - { - this.loadStream = Suppliers.memoize(() -> execEnv.addSource(new MyDataSource())); - } - - @Override - public DataStream getSource() - { - return loadStream.get(); - } - - public static class MyDataSource - extends RichParallelSourceFunction - implements ResultTypeQueryable - { - private static final ObjectMapper MAPPER = new ObjectMapper(); - private volatile boolean running = true; - - @Override - public void run(SourceContext sourceContext) - throws Exception - { - Random random = new Random(); - int numKeys = 10; - long count = 1L; - while (running) { - long eventTime = System.currentTimeMillis() - random.nextInt(10 * 10); //表示数据已经产生了 但是会有10秒以内的延迟 - String userId = "uid_" + count; - - String msg = MAPPER.writeValueAsString(ImmutableMap.of("user_id", userId, "ip", "127.0.0.1", "store", 12.0)); - Row row = Row.of("key" + random.nextInt(10), msg, eventTime); - sourceContext.collect(row); - count = count > numKeys ? 
1L : count + 1; - TimeUnit.MILLISECONDS.sleep(100); - } - } - - @Override - public TypeInformation getProducedType() - { - TypeInformation[] types = new TypeInformation[] { - TypeExtractor.createTypeInfo(String.class), - TypeExtractor.createTypeInfo(String.class), - TypeExtractor.createTypeInfo(long.class) - }; - - RowTypeInfo rowTypeInfo = new RowTypeInfo(types, new String[] {"key", "message", "event_time"}); - //createTypeInformation[Row] - return rowTypeInfo; - } - - @Override - public void cancel() - { - running = false; - } - - @Override - public void close() - throws Exception - { - this.cancel(); - super.close(); - } - } -} diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/IProducer.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/IProducer.java index 062b79425..ef9d7b862 100644 --- a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/IProducer.java +++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/IProducer.java @@ -1,6 +1,23 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package ideal.sylph.plugins.kafka.flink.utils; -public interface IProducer { +public interface IProducer +{ void send(String message); + void close(); -} \ No newline at end of file +} diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/KafkaProducer.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/KafkaProducer.java index ba4ebc49d..73a611195 100644 --- a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/KafkaProducer.java +++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/KafkaProducer.java @@ -1,3 +1,18 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package ideal.sylph.plugins.kafka.flink.utils; import com.google.common.base.Joiner; @@ -11,9 +26,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; -public class KafkaProducer implements IProducer { +public class KafkaProducer + implements IProducer +{ private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class); private String brokersString; @@ -21,8 +42,8 @@ public class KafkaProducer implements IProducer { private String partitionKey = ""; private org.apache.kafka.clients.producer.KafkaProducer producer; - - public KafkaProducer(String zkConnect, String topic) { + public KafkaProducer(String zkConnect, String topic) + { this.topic = topic; RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); @@ -35,7 +56,8 @@ public KafkaProducer(String zkConnect, String topic) { try { ids = client.getChildren().forPath("/brokers/ids"); - } catch (Exception ex) { + } + catch (Exception ex) { log.error("Couldn't get brokers ids", ex); } @@ -45,7 +67,8 @@ public KafkaProducer(String zkConnect, String topic) { try { jsonString = new String(client.getData().forPath("/brokers/ids/" + id), "UTF-8"); - } catch (Exception ex) { + } + catch (Exception ex) { log.error("Couldn't parse brokers data", ex); } @@ -56,7 +79,8 @@ public KafkaProducer(String zkConnect, String topic) { Double port = (Double) json.get("port"); String host = json.get("host") + ":" + port.intValue(); hosts.add(host); - } catch (NullPointerException e) { + } + catch (NullPointerException e) { log.error("Failed converting a JSON tuple to a Map class", e); } } @@ -82,12 +106,14 @@ public KafkaProducer(String zkConnect, String topic) { } @Override - public void close() { + public void close() + { producer.close(); } @Override - public void send(String message) { + public void send(String message) + { ProducerRecord record = new ProducerRecord<>(topic, message); producer.send(record); } diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/SimplePartitioner.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/SimplePartitioner.java index 13d54bbd3..df780e2f2 100644 --- a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/SimplePartitioner.java +++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/utils/SimplePartitioner.java @@ -1,31 +1,48 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package ideal.sylph.plugins.kafka.flink.utils; - import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; -public class SimplePartitioner implements Partitioner { - +public class SimplePartitioner + implements Partitioner +{ @Override - public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { - if(key != null) { + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) + { + if (key != null) { String stringKey = key.toString(); int offset = stringKey.hashCode(); return Math.abs(offset % cluster.partitionCountForTopic(topic)); - } else { + } + else { return 0; } } @Override - public void close() { - + public void close() + { } @Override - public void configure(Map configs) { - + public void configure(Map configs) + { } } diff --git a/sylph-connectors/sylph-mysql/src/main/java/ideal/sylph/plugins/mysql/utils/JdbcUtils.java b/sylph-connectors/sylph-mysql/src/main/java/ideal/sylph/plugins/mysql/utils/JdbcUtils.java index 6a7df18b3..b98a2803a 100644 --- a/sylph-connectors/sylph-mysql/src/main/java/ideal/sylph/plugins/mysql/utils/JdbcUtils.java +++ b/sylph-connectors/sylph-mysql/src/main/java/ideal/sylph/plugins/mysql/utils/JdbcUtils.java @@ -29,10 +29,10 @@ public class JdbcUtils private JdbcUtils() {} /** - * jdbc ResultSet to List + * jdbc ResultSet to List[Map] * * @param rs input jdbc ResultSet - * @return List + * @return List[Map] */ public static List> resultToList(ResultSet rs) throws SQLException diff --git a/sylph-controller/src/main/java/ideal/sylph/controller/utils/JsonFormatUtil.java b/sylph-controller/src/main/java/ideal/sylph/controller/utils/JsonFormatUtil.java index 1dba62bd3..9b040e623 100644 --- a/sylph-controller/src/main/java/ideal/sylph/controller/utils/JsonFormatUtil.java +++ b/sylph-controller/src/main/java/ideal/sylph/controller/utils/JsonFormatUtil.java @@ -26,9 +26,7 @@ private JsonFormatUtil() {} /** * 打印输入到控制台 * - * @param jsonStr - * @author lizhgb - * @Date 2015-10-14 下午1:17:22 + * @param jsonStr json text */ public static String printJson(String jsonStr) { @@ -38,10 +36,8 @@ public static String printJson(String jsonStr) /** * 格式化 * - * @param jsonStr - * @return - * @author lizhgb - * @Date 2015-10-14 下午1:17:35 + * @param jsonStr json text + * @return String */ public static String formatJson(String jsonStr) { @@ -85,14 +81,6 @@ public static String formatJson(String jsonStr) return sb.toString(); } - /** - * 添加space - * - * @param sb - * @param indent - * @author lizhgb - * @Date 2015-10-14 上午10:38:04 - */ private static void addIndentBlank(StringBuilder sb, int indent) { for (int i = 0; i < indent; i++) { diff --git a/sylph-main/src/main/java/ideal/sylph/main/SylphMaster.java b/sylph-main/src/main/java/ideal/sylph/main/SylphMaster.java index cc7a0e75d..f0ad5fdda 100644 --- a/sylph-main/src/main/java/ideal/sylph/main/SylphMaster.java +++ b/sylph-main/src/main/java/ideal/sylph/main/SylphMaster.java @@ -58,16 +58,16 @@ public static void main(String[] args) /*2 Initialize Guice Injector */ try { logger.info("========={} Bootstrap initialize...========", SylphMaster.class.getCanonicalName()); - IocFactory injector = IocFactory.create(sylphBean, + IocFactory app = IocFactory.create(sylphBean, binder -> binder.bind(ControllerApp.class).withSingle() ); - injector.getInstance(PipelinePluginLoader.class).loadPlugins(); - injector.getInstance(RunnerLoader.class).loadPlugins(); - 
injector.getInstance(JobStore.class).loadJobs(); + app.getInstance(PipelinePluginLoader.class).loadPlugins(); + app.getInstance(RunnerLoader.class).loadPlugins(); + app.getInstance(JobStore.class).loadJobs(); - injector.getInstance(JobManager.class).start(); - injector.getInstance(ControllerApp.class).start(); + app.getInstance(JobManager.class).start(); + app.getInstance(ControllerApp.class).start(); //ProcessHandle.current().pid() logger.info("\n" + logo); logger.info("======== SERVER STARTED this pid is {}========"); diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/utils/GenericTypeReference.java b/sylph-spi/src/main/java/ideal/sylph/spi/utils/GenericTypeReference.java index 2bc29331e..cdce7ef99 100644 --- a/sylph-spi/src/main/java/ideal/sylph/spi/utils/GenericTypeReference.java +++ b/sylph-spi/src/main/java/ideal/sylph/spi/utils/GenericTypeReference.java @@ -16,14 +16,14 @@ package ideal.sylph.spi.utils; import com.fasterxml.jackson.core.type.TypeReference; -import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; +import com.github.harbby.gadtry.base.JavaType; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; /** * demo: - * Map config = MAPPER.readValue(json, new GenericTypeReference(Map.class, String.class, Object.class)); + * Map[String, Object] config = MAPPER.readValue(json, new GenericTypeReference(Map.class, String.class, Object.class)); */ public class GenericTypeReference extends TypeReference @@ -33,7 +33,8 @@ public class GenericTypeReference public GenericTypeReference(Class rawType, Type... typeArguments) { //this.type = new MoreTypes.ParameterizedTypeImpl(null, rawType, typeArguments); - this.type = ParameterizedTypeImpl.make(rawType, typeArguments, null); + //sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl.make(rawType, typeArguments, null); + this.type = JavaType.make(rawType, typeArguments, null); } @Override From 382a65cda43bf25f6d1b01be3b207e567e0a2f2d Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 27 Dec 2018 16:55:56 +0800 Subject: [PATCH 11/12] Cancel the function immediately when adding a task submission --- ...gerResurce.java => JobManagerResurce.java} | 6 +- .../ideal/sylph/main/service/JobManager.java | 82 ++++++++----------- .../runner/flink/FlinkContainerFactory.java | 16 +--- .../flink/yarn/FlinkYarnJobLauncher.java | 35 ++++---- .../flink/yarn/YarnClusterDescriptor.java | 36 ++++---- .../runner/spark/SparkContainerFactory.java | 16 +--- .../runner/spark/yarn/SparkAppLauncher.java | 26 +++++- .../deploy/yarn/SylphSparkYarnClient.java | 2 +- .../main/java/ideal/sylph/spi/job/Job.java | 3 +- .../ideal/sylph/spi/job/JobContainer.java | 3 + .../ideal/sylph/spi/job/JobContainerAbs.java | 46 ----------- .../sylph/runtime/local/LocalContainer.java | 10 ++- .../sylph/runtime/yarn/YarnJobContainer.java | 79 +++++++++++++----- 13 files changed, 168 insertions(+), 192 deletions(-) rename sylph-controller/src/main/java/ideal/sylph/controller/action/{JobMangerResurce.java => JobManagerResurce.java} (98%) delete mode 100644 sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainerAbs.java diff --git a/sylph-controller/src/main/java/ideal/sylph/controller/action/JobMangerResurce.java b/sylph-controller/src/main/java/ideal/sylph/controller/action/JobManagerResurce.java similarity index 98% rename from sylph-controller/src/main/java/ideal/sylph/controller/action/JobMangerResurce.java rename to sylph-controller/src/main/java/ideal/sylph/controller/action/JobManagerResurce.java index 6672e9892..f4e9533bf 100644 --- 
a/sylph-controller/src/main/java/ideal/sylph/controller/action/JobMangerResurce.java +++ b/sylph-controller/src/main/java/ideal/sylph/controller/action/JobManagerResurce.java @@ -45,15 +45,15 @@ @javax.inject.Singleton @Path("/job_manger") -public class JobMangerResurce +public class JobManagerResurce { - private static final Logger logger = LoggerFactory.getLogger(JobMangerResurce.class); + private static final Logger logger = LoggerFactory.getLogger(JobManagerResurce.class); @Context private ServletContext servletContext; @Context private UriInfo uriInfo; private SylphContext sylphContext; - public JobMangerResurce( + public JobManagerResurce( @Context ServletContext servletContext, @Context UriInfo uriInfo) { diff --git a/sylph-main/src/main/java/ideal/sylph/main/service/JobManager.java b/sylph-main/src/main/java/ideal/sylph/main/service/JobManager.java index 82cfbc90a..2f64a9859 100644 --- a/sylph-main/src/main/java/ideal/sylph/main/service/JobManager.java +++ b/sylph-main/src/main/java/ideal/sylph/main/service/JobManager.java @@ -33,14 +33,15 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static ideal.sylph.spi.exception.StandardErrorCode.ILLEGAL_OPERATION; import static ideal.sylph.spi.exception.StandardErrorCode.JOB_START_ERROR; -import static ideal.sylph.spi.job.Job.Status.KILLING; import static ideal.sylph.spi.job.Job.Status.RUNNING; import static ideal.sylph.spi.job.Job.Status.STARTED_ERROR; import static ideal.sylph.spi.job.Job.Status.STARTING; +import static ideal.sylph.spi.job.Job.Status.STOP; /** * JobManager @@ -54,53 +55,35 @@ public final class JobManager @Autowired private RunnerManager runnerManger; @Autowired private MetadataManager metadataManager; - private final ConcurrentMap runningContainers = new ConcurrentHashMap<>(); - - private volatile boolean run; + private final ConcurrentMap containers = new ConcurrentHashMap<>(); /** - * 用来做耗时的->任务启动提交到yarn的操作 + * Used to do time-consuming task submit operations */ private ExecutorService jobStartPool = Executors.newFixedThreadPool(MaxSubmitJobNum); private final Thread monitorService = new Thread(() -> { - while (run) { + while (true) { Thread.currentThread().setName("job_monitor"); - runningContainers.forEach((jobId, container) -> { - try { - Job.Status status = container.getStatus(); - switch (status) { - case STOP: { - jobStartPool.submit(() -> { - try { - Thread.currentThread().setName("job_submit_" + jobId); - logger.warn("Job {}[{}] Status is {}, Soon to start", jobId, - container.getRunId(), status); - container.setStatus(STARTING); - Optional runId = container.run(); - if (container.getStatus() == KILLING) { - container.shutdown(); - } - else { - container.setStatus(RUNNING); - runId.ifPresent(result -> metadataManager.addMetadata(jobId, result)); - } - } - catch (Exception e) { - container.setStatus(STARTED_ERROR); - logger.warn("job {} start error", jobId, e); - } - }); //需要重启 Job + containers.forEach((jobId, container) -> { + Job.Status status = container.getStatus(); + if (status == STOP) { + Future future = jobStartPool.submit(() -> { + try { + Thread.currentThread().setName("job_submit_" + jobId); + logger.warn("Job {}[{}] Status is {}, Soon to start", jobId, + container.getRunId(), status); + container.setStatus(STARTING); + Optional runId = container.run(); + container.setStatus(RUNNING); + runId.ifPresent(result -> 
metadataManager.addMetadata(jobId, result)); } - case RUNNING: - case STARTED_ERROR: - case STARTING: - case KILLING: - default: - } - } - catch (Exception e) { - logger.warn("Check job {} status error", jobId, e); + catch (Exception e) { + container.setStatus(STARTED_ERROR); + logger.warn("job {} start error", jobId, e); + } + }); + container.setFuture(future); } }); @@ -118,12 +101,12 @@ public final class JobManager */ public synchronized void startJob(String jobId) { - if (runningContainers.containsKey(jobId)) { + if (containers.containsKey(jobId)) { throw new SylphException(JOB_START_ERROR, "Job " + jobId + " already started"); } Job job = this.getJob(jobId).orElseThrow(() -> new SylphException(JOB_START_ERROR, "Job " + jobId + " not found with jobStore")); - runningContainers.computeIfAbsent(jobId, k -> runnerManger.createJobContainer(job, null)); - logger.info("runningContainers size:{}", runningContainers.size()); + containers.computeIfAbsent(jobId, k -> runnerManger.createJobContainer(job, null)); + logger.info("deploy job :{}", jobId); } /** @@ -132,8 +115,9 @@ public synchronized void startJob(String jobId) public synchronized void stopJob(String jobId) throws Exception { - JobContainer container = runningContainers.remove(jobId); + JobContainer container = containers.remove(jobId); if (container != null) { + logger.warn("job {} Cancel submission", jobId); metadataManager.removeMetadata(jobId); container.shutdown(); } @@ -147,7 +131,7 @@ public void saveJob(@NotNull Job job) public void removeJob(String jobId) throws IOException { - if (runningContainers.containsKey(jobId)) { + if (containers.containsKey(jobId)) { throw new SylphException(ILLEGAL_OPERATION, "Can only delete tasks that have been offline"); } jobStore.removeJob(jobId); @@ -176,15 +160,13 @@ public Collection listJobs() public void start() throws IOException { - this.run = true; monitorService.setDaemon(false); monitorService.start(); //--------- init read metadata job status --------------- Map metadatas = metadataManager.loadMetadata(); metadatas.forEach((jobId, jobInfo) -> this.getJob(jobId).ifPresent(job -> { JobContainer container = runnerManger.createJobContainer(job, jobInfo); - runningContainers.put(job.getId(), container); - logger.info("runningContainers size:{}", runningContainers.size()); + containers.put(job.getId(), container); })); } @@ -193,7 +175,7 @@ public void start() */ public Optional getJobContainer(@NotNull String jobId) { - return Optional.ofNullable(runningContainers.get(jobId)); + return Optional.ofNullable(containers.get(jobId)); } /** @@ -201,7 +183,7 @@ public Optional getJobContainer(@NotNull String jobId) */ public Optional getJobContainerWithRunId(@NotNull String runId) { - for (JobContainer container : runningContainers.values()) { + for (JobContainer container : containers.values()) { if (runId.equals(container.getRunId())) { return Optional.ofNullable(container); } diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/FlinkContainerFactory.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/FlinkContainerFactory.java index 5e6a94daf..f65ebf9a3 100644 --- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/FlinkContainerFactory.java +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/FlinkContainerFactory.java @@ -30,7 +30,6 @@ import ideal.sylph.spi.job.JobContainer; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.records.ApplicationId; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,20 +64,7 @@ public JobContainer getYarnContainer(Job job, String lastRunid) { FlinkYarnJobLauncher jobLauncher = yarnLauncher.get(); - JobContainer yarnJobContainer = new YarnJobContainer(jobLauncher.getYarnClient(), lastRunid) - { - @Override - public Optional run() - throws Exception - { - logger.info("Instantiating SylphFlinkJob {} at yarnId {}", job.getId()); - this.setYarnAppId(null); - ApplicationId applicationId = jobLauncher.start(job); - this.setYarnAppId(applicationId); - return Optional.of(applicationId.toString()); - } - }; - return YarnJobContainer.proxy(yarnJobContainer); + return YarnJobContainer.of(jobLauncher.getYarnClient(), lastRunid, () -> jobLauncher.start(job)); } @Override diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/FlinkYarnJobLauncher.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/FlinkYarnJobLauncher.java index 62d1612e8..91b321f00 100644 --- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/FlinkYarnJobLauncher.java +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/FlinkYarnJobLauncher.java @@ -15,6 +15,7 @@ */ package ideal.sylph.runner.flink.yarn; +import com.github.harbby.gadtry.base.Throwables; import com.github.harbby.gadtry.ioc.Autowired; import ideal.sylph.runner.flink.FlinkJobConfig; import ideal.sylph.runner.flink.FlinkJobHandle; @@ -22,7 +23,6 @@ import ideal.sylph.runner.flink.actuator.JobParameter; import ideal.sylph.spi.job.Job; import org.apache.flink.api.common.JobID; -import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -38,10 +38,12 @@ import java.io.File; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.URI; import java.net.URL; import java.util.Collection; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -65,7 +67,7 @@ public YarnClient getYarnClient() return yarnClient; } - public ApplicationId start(Job job) + public Optional start(Job job) throws Exception { FlinkJobHandle jobHandle = (FlinkJobHandle) job.getJobHandle(); @@ -81,31 +83,35 @@ public ApplicationId start(Job job) JobGraph jobGraph = jobHandle.getJobGraph(); //todo: How to use `savepoints` to restore a job //jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("hdfs:///tmp/sylph/apps/savepoints")); - return start(descriptor, jobGraph).getClusterId(); + return start(descriptor, jobGraph); } - private ClusterClient start(YarnClusterDescriptor descriptor, JobGraph job) + private Optional start(YarnClusterDescriptor descriptor, JobGraph job) throws Exception { ApplicationId applicationId = null; try { - ClusterClient client = descriptor.deploy(); //create app master + ClusterClient client = descriptor.deploy(); //create yarn appMaster applicationId = client.getClusterId(); - ClusterSpecification specification = new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(1024) - .setNumberTaskManagers(2) - .setSlotsPerTaskManager(2) - .setTaskManagerMemoryMB(1024) - .createClusterSpecification(); - client.runDetached(job, null); //submit graph to yarn appMaster 并运行分离 + client.runDetached(job, null); //submit graph to appMaster 并分离 
stopAfterJob(client, job.getJobID()); - return client; + client.shutdown(); + return Optional.of(applicationId); } catch (Exception e) { if (applicationId != null) { yarnClient.killApplication(applicationId); } - throw e; + Thread thread = Thread.currentThread(); + if (e instanceof InterruptedIOException || + thread.isInterrupted() || + Throwables.getRootCause(e) instanceof InterruptedException) { + logger.warn("job {} Canceled submission", job.getJobID()); + return Optional.empty(); + } + else { + throw e; + } } finally { //Clear temporary directory @@ -114,6 +120,7 @@ private ClusterClient start(YarnClusterDescriptor descriptor, Job FileSystem hdfs = FileSystem.get(clusterConf.yarnConf()); Path appDir = new Path(clusterConf.appRootDir(), applicationId.toString()); hdfs.delete(appDir, true); + logger.info("clear tmp dir: {}", appDir); } } catch (IOException e) { diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/YarnClusterDescriptor.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/YarnClusterDescriptor.java index 03f593dca..4ae7df1d9 100644 --- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/YarnClusterDescriptor.java +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/yarn/YarnClusterDescriptor.java @@ -131,27 +131,23 @@ public YarnClient getYarnClient() } public ClusterClient deploy() + throws Exception { - try { - YarnClientApplication application = yarnClient.createApplication(); - ApplicationReport report = startAppMaster(application); - - Configuration flinkConfiguration = getFlinkConfiguration(); - flinkConfiguration.setString(JobManagerOptions.ADDRESS.key(), report.getHost()); - flinkConfiguration.setInteger(JobManagerOptions.PORT.key(), report.getRpcPort()); - - flinkConfiguration.setString(RestOptions.ADDRESS, report.getHost()); - flinkConfiguration.setInteger(RestOptions.PORT, report.getRpcPort()); - - //return new RestClusterClient<>(flinkConfiguration, report.getApplicationId()).getMaxSlots(); - return new YarnClusterClient(this, - appConf.getTaskManagerCount(), - appConf.getTaskManagerSlots(), - report, clusterConf.flinkConfiguration(), false); - } - catch (Exception e) { - throw new RuntimeException(e); - } + YarnClientApplication application = yarnClient.createApplication(); + ApplicationReport report = startAppMaster(application); + + Configuration flinkConfiguration = getFlinkConfiguration(); + flinkConfiguration.setString(JobManagerOptions.ADDRESS.key(), report.getHost()); + flinkConfiguration.setInteger(JobManagerOptions.PORT.key(), report.getRpcPort()); + + flinkConfiguration.setString(RestOptions.ADDRESS, report.getHost()); + flinkConfiguration.setInteger(RestOptions.PORT, report.getRpcPort()); + + //return new RestClusterClient<>(flinkConfiguration, report.getApplicationId()).getMaxSlots(); + return new YarnClusterClient(this, + appConf.getTaskManagerCount(), + appConf.getTaskManagerSlots(), + report, clusterConf.flinkConfiguration(), false); } private ApplicationReport startAppMaster(YarnClientApplication application) diff --git a/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/SparkContainerFactory.java b/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/SparkContainerFactory.java index 53d6823f3..da5941549 100644 --- a/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/SparkContainerFactory.java +++ b/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/SparkContainerFactory.java @@ -26,13 +26,11 @@ import ideal.sylph.spi.job.ContainerFactory; import 
ideal.sylph.spi.job.Job; import ideal.sylph.spi.job.JobContainer; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.streaming.StreamingContext; -import java.util.Optional; import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkArgument; @@ -50,20 +48,8 @@ public class SparkContainerFactory public JobContainer getYarnContainer(Job job, String lastRunid) { SparkAppLauncher appLauncher = yarnLauncher.get(); - final JobContainer yarnJobContainer = new YarnJobContainer(appLauncher.getYarnClient(), lastRunid) - { - @Override - public Optional run() - throws Exception - { - this.setYarnAppId(null); - ApplicationId yarnAppId = appLauncher.run(job); - this.setYarnAppId(yarnAppId); - return Optional.of(yarnAppId.toString()); - } - }; //----create JobContainer Proxy - return YarnJobContainer.proxy(yarnJobContainer); + return YarnJobContainer.of(appLauncher.getYarnClient(), lastRunid, () -> appLauncher.run(job)); } @Override diff --git a/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/yarn/SparkAppLauncher.java b/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/yarn/SparkAppLauncher.java index 391b1e594..8731777c5 100644 --- a/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/yarn/SparkAppLauncher.java +++ b/sylph-runners/spark/src/main/java/ideal/sylph/runner/spark/yarn/SparkAppLauncher.java @@ -16,8 +16,9 @@ package ideal.sylph.runner.spark.yarn; import com.github.harbby.gadtry.base.Serializables; +import com.github.harbby.gadtry.base.Throwables; +import com.github.harbby.gadtry.ioc.Autowired; import com.google.common.collect.ImmutableList; -import com.google.inject.Inject; import ideal.sylph.runner.spark.SparkJobHandle; import ideal.sylph.spi.job.Job; import org.apache.commons.lang3.StringUtils; @@ -27,17 +28,22 @@ import org.apache.spark.deploy.yarn.Client; import org.apache.spark.deploy.yarn.ClientArguments; import org.apache.spark.ideal.deploy.yarn.SylphSparkYarnClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; public class SparkAppLauncher { - @Inject private YarnClient yarnClient; + private static final Logger logger = LoggerFactory.getLogger(SparkAppLauncher.class); + + @Autowired private YarnClient yarnClient; private static final String sparkHome = System.getenv("SPARK_HOME"); public YarnClient getYarnClient() @@ -45,7 +51,7 @@ public YarnClient getYarnClient() return yarnClient; } - public ApplicationId run(Job job) + public Optional run(Job job) throws Exception { System.setProperty("SPARK_YARN_MODE", "true"); @@ -63,7 +69,19 @@ public ApplicationId run(Job job) ClientArguments clientArguments = new ClientArguments(args); // spark-2.0.0 //yarnClient.getConfig().iterator().forEachRemaining(x -> sparkConf.set("spark.hadoop." 
+ x.getKey(), x.getValue())); Client appClient = new SylphSparkYarnClient(clientArguments, sparkConf, yarnClient); - return appClient.submitApplication(); + try { + return Optional.of(appClient.submitApplication()); + } + catch (Exception e) { + Thread thread = Thread.currentThread(); + if (thread.isInterrupted() || Throwables.getRootCause(e) instanceof InterruptedException) { + logger.warn("job {} Canceled submission", job.getId()); + return Optional.empty(); + } + else { + throw e; + } + } } private static void setDistJars(Job job, SparkConf sparkConf) diff --git a/sylph-runners/spark/src/main/java/org/apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java b/sylph-runners/spark/src/main/java/org/apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java index d16ed5bda..cb134669a 100644 --- a/sylph-runners/spark/src/main/java/org/apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java +++ b/sylph-runners/spark/src/main/java/org/apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java @@ -37,7 +37,7 @@ public SylphSparkYarnClient(ClientArguments clientArgs, SparkConf spConf, YarnCl throws NoSuchFieldException, IllegalAccessException { super(clientArgs, spConf); - String key = DRIVER_MEMORY; //test + //String key = DRIVER_MEMORY; //test Field field = this.getClass().getSuperclass().getDeclaredField("org$apache$spark$deploy$yarn$Client$$hadoopConf"); field.setAccessible(true); YarnConfiguration yarnConfiguration = new YarnConfiguration(yarnClient.getConfig()); diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/job/Job.java b/sylph-spi/src/main/java/ideal/sylph/spi/job/Job.java index 72abc8f9e..af710317b 100644 --- a/sylph-spi/src/main/java/ideal/sylph/spi/job/Job.java +++ b/sylph-spi/src/main/java/ideal/sylph/spi/job/Job.java @@ -52,8 +52,7 @@ public enum Status RUNNING(0), //运行中 STARTING(1), // 启动中 STOP(2), // 停止运行 - STARTED_ERROR(3), // 启动失败 - KILLING(4); // Killing job + STARTED_ERROR(3); // 启动失败 private final int status; diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainer.java b/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainer.java index 06ad5c328..5e277391b 100644 --- a/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainer.java +++ b/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainer.java @@ -18,6 +18,7 @@ import javax.validation.constraints.NotNull; import java.util.Optional; +import java.util.concurrent.Future; /** * Job Container @@ -44,6 +45,8 @@ Optional run() */ void shutdown(); + void setFuture(Future future); + /** * 获取job的状态 */ diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainerAbs.java b/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainerAbs.java deleted file mode 100644 index 8b77ab1ce..000000000 --- a/sylph-spi/src/main/java/ideal/sylph/spi/job/JobContainerAbs.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (C) 2018 The Sylph Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package ideal.sylph.spi.job; - -import javax.validation.constraints.NotNull; - -import static ideal.sylph.spi.job.Job.Status.RUNNING; -import static ideal.sylph.spi.job.Job.Status.STOP; -import static java.util.Objects.requireNonNull; - -public abstract class JobContainerAbs - implements JobContainer -{ - private volatile Job.Status status = STOP; - - @Override - public synchronized void setStatus(Job.Status status) - { - this.status = requireNonNull(status, "status is null"); - } - - @NotNull - @Override - public synchronized Job.Status getStatus() - { - if (status == RUNNING) { - return isRunning() ? RUNNING : STOP; - } - return status; - } - - public abstract boolean isRunning(); -} diff --git a/sylph-yarn/src/main/java/ideal/sylph/runtime/local/LocalContainer.java b/sylph-yarn/src/main/java/ideal/sylph/runtime/local/LocalContainer.java index fc9824427..afbe503d0 100644 --- a/sylph-yarn/src/main/java/ideal/sylph/runtime/local/LocalContainer.java +++ b/sylph-yarn/src/main/java/ideal/sylph/runtime/local/LocalContainer.java @@ -26,13 +26,14 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; public class LocalContainer implements JobContainer { private static final Logger logger = LoggerFactory.getLogger(LocalContainer.class); - private final ExecutorService pool = Executors.newSingleThreadExecutor(); + private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final JVMLaunchers.VmBuilder vmBuilder; @@ -74,7 +75,7 @@ public String getRunId() public synchronized Optional run() throws Exception { - pool.submit(() -> { + executor.submit(() -> { launcher.startAndGet(); return true; }); @@ -91,6 +92,11 @@ public void shutdown() } } + @Override + public void setFuture(Future future) + { + } + @Override public Job.Status getStatus() { diff --git a/sylph-yarn/src/main/java/ideal/sylph/runtime/yarn/YarnJobContainer.java b/sylph-yarn/src/main/java/ideal/sylph/runtime/yarn/YarnJobContainer.java index 620fbbc84..a2d41c9d1 100644 --- a/sylph-yarn/src/main/java/ideal/sylph/runtime/yarn/YarnJobContainer.java +++ b/sylph-yarn/src/main/java/ideal/sylph/runtime/yarn/YarnJobContainer.java @@ -18,8 +18,8 @@ import com.github.harbby.gadtry.aop.AopFactory; import com.github.harbby.gadtry.classloader.ThreadContextClassLoader; import ideal.sylph.spi.exception.SylphException; +import ideal.sylph.spi.job.Job; import ideal.sylph.spi.job.JobContainer; -import ideal.sylph.spi.job.JobContainerAbs; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -31,20 +31,29 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; import static ideal.sylph.spi.exception.StandardErrorCode.CONNECTION_ERROR; -import static ideal.sylph.spi.job.Job.Status.KILLING; import static ideal.sylph.spi.job.Job.Status.RUNNING; +import static ideal.sylph.spi.job.Job.Status.STOP; +import static java.util.Objects.requireNonNull; -public abstract class YarnJobContainer - extends JobContainerAbs +public class YarnJobContainer + implements JobContainer { private static final Logger logger = LoggerFactory.getLogger(YarnJobContainer.class); private ApplicationId yarnAppId; private YarnClient yarnClient; + private volatile Job.Status status = STOP; + private volatile Future future; - 
protected YarnJobContainer(YarnClient yarnClient, String jobInfo) + private final Callable> runnable; + + private YarnJobContainer(YarnClient yarnClient, String jobInfo, Callable> runnable) { + this.runnable = runnable; this.yarnClient = yarnClient; if (jobInfo != null) { this.yarnAppId = Apps.toAppID(jobInfo); @@ -55,8 +64,11 @@ protected YarnJobContainer(YarnClient yarnClient, String jobInfo) @Override public synchronized void shutdown() { + if (!future.isDone() && !future.isCancelled()) { + future.cancel(true); + } + try { - this.setStatus(KILLING); if (yarnAppId != null) { yarnClient.killApplication(yarnAppId); } @@ -66,6 +78,16 @@ public synchronized void shutdown() } } + @Override + public Optional run() + throws Exception + { + this.setYarnAppId(null); + Optional applicationId = runnable.call(); + applicationId.ifPresent(this::setYarnAppId); + return applicationId.map(ApplicationId::toString); + } + @Override public String getRunId() { @@ -82,13 +104,6 @@ public ApplicationId getYarnAppId() return yarnAppId; } - @Override - public boolean isRunning() - { - YarnApplicationState yarnAppStatus = getYarnAppStatus(yarnAppId); - return YarnApplicationState.ACCEPTED.equals(yarnAppStatus) || YarnApplicationState.RUNNING.equals(yarnAppStatus); - } - @Override public String getJobUrl() { @@ -101,35 +116,59 @@ public String getJobUrl() } } + @Override + public synchronized void setStatus(Job.Status status) + { + this.status = requireNonNull(status, "status is null"); + } + + @Override + public synchronized Job.Status getStatus() + { + if (status == RUNNING) { + return isRunning() ? RUNNING : STOP; + } + return status; + } + + @Override + public void setFuture(Future future) + { + this.future = future; + } + /** * 获取yarn Job运行情况 */ - private YarnApplicationState getYarnAppStatus(ApplicationId applicationId) + private boolean isRunning() { try { - ApplicationReport app = yarnClient.getApplicationReport(applicationId); //获取某个指定的任务 - return app.getYarnApplicationState(); + ApplicationReport app = yarnClient.getApplicationReport(getYarnAppId()); //获取某个指定的任务 + YarnApplicationState state = app.getYarnApplicationState(); + return YarnApplicationState.ACCEPTED.equals(state) || YarnApplicationState.RUNNING.equals(state); } catch (ApplicationNotFoundException e) { //app 不存在与yarn上面 - return null; + return false; } catch (YarnException | IOException e) { throw new SylphException(CONNECTION_ERROR, e); } } - public static JobContainer proxy(JobContainer yarnJobContainer) + public static JobContainer of(YarnClient yarnClient, String jobInfo, Callable> runnable) { + JobContainer container = new YarnJobContainer(yarnClient, jobInfo, runnable); + //----create JobContainer Proxy return AopFactory.proxy(JobContainer.class) - .byInstance(yarnJobContainer) + .byInstance(container) .around(proxyContext -> { /* * 通过这个 修改当前YarnClient的ClassLoader的为当前sdk的加载器 * 默认hadoop Configuration使用jvm的AppLoader,会出现 akka.version not setting的错误 原因是找不到akka相关jar包 * 原因是hadoop Configuration 初始化: this.classLoader = Thread.currentThread().getContextClassLoader(); * */ - try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(yarnJobContainer.getClass().getClassLoader())) { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(YarnJobContainer.class.getClassLoader())) { proxyContext.proceed(); } }); From 3b392de7d5fe69d00443b11a6d105fdcf69c63e7 Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 27 Dec 2018 18:16:31 +0800 Subject: [PATCH 12/12] Cancel the function immediately when adding a task submission --- 
.../apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sylph-runners/spark/src/main/java/org/apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java b/sylph-runners/spark/src/main/java/org/apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java index cb134669a..dd10840d4 100644 --- a/sylph-runners/spark/src/main/java/org/apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java +++ b/sylph-runners/spark/src/main/java/org/apache/spark/ideal/deploy/yarn/SylphSparkYarnClient.java @@ -27,8 +27,6 @@ import java.lang.reflect.Field; -import static org.apache.spark.launcher.SparkLauncher.DRIVER_MEMORY; - public class SylphSparkYarnClient extends Client {