diff --git a/pom.xml b/pom.xml
index 8942378e1..eb23adcd4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,6 +117,7 @@
vertx-mysql-client
vertx-mssql-client
vertx-db2-client
+ vertx-clickhouse-binary-client
vertx-sql-client-templates
vertx-oracle-client
diff --git a/vertx-clickhouse-binary-client/pom.xml b/vertx-clickhouse-binary-client/pom.xml
new file mode 100644
index 000000000..d50acfa67
--- /dev/null
+++ b/vertx-clickhouse-binary-client/pom.xml
@@ -0,0 +1,204 @@
+
+
+
+
+ 4.0.0
+
+
+ io.vertx
+ vertx-sql-client-parent
+ 4.4.0-SNAPSHOT
+
+
+ vertx-clickhouse-binary-client
+
+ Vertx Clickhouse binary Client
+ https://github.com/eclipse-vertx/vertx-sql-client
+ The Reactive Clickhouse Client
+
+
+ false
+ ${project.basedir}/src/main/docs
+ ${project.basedir}/src/main/generated
+ 11
+ 11
+
+
+ 2.19.0
+
+
+
+
+
+
+ io.vertx
+ vertx-core
+
+
+ io.vertx
+ vertx-codegen
+ true
+
+
+ io.vertx
+ vertx-docgen
+ true
+
+
+ io.vertx
+ vertx-sql-client
+
+
+
+ org.lz4
+ lz4-java
+ 1.8.0
+
+
+
+ io.vertx
+ vertx-sql-client
+ test-jar
+ test
+
+
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-yaml
+ test
+
+
+
+ org.testcontainers
+ clickhouse
+ ${testcontainers.version}
+ test
+
+
+
+ com.clickhouse
+ clickhouse-jdbc
+ 0.3.2-patch11
+ test
+
+
+
+ org.slf4j
+ slf4j-api
+ 2.0.3
+ test
+
+
+
+ org.apache.logging.log4j
+ log4j-slf4j2-impl
+ ${log4j.version}
+ test
+
+
+
+ org.apache.logging.log4j
+ log4j-api
+ ${log4j.version}
+ test
+
+
+
+ org.apache.logging.log4j
+ log4j-core
+ ${log4j.version}
+ test
+
+
+
+ org.apache.logging.log4j
+ log4j-jul
+ ${log4j.version}
+ test
+
+
+
+
+ org.openjdk.jmh
+ jmh-core
+ ${jmh.version}
+ test
+
+
+ org.openjdk.jmh
+ jmh-generator-annprocess
+ ${jmh.version}
+ test
+
+
+
+
+
+
+
+ maven-surefire-plugin
+
+ -Xmx1024M
+
+ ${project.build.directory}
+ ${embedded.clickhouse.version}
+ ${connection.uri}
+
+
+
+
+
+
+
+
+
+ org.bsc.maven
+ maven-processor-plugin
+
+
+ generate-sources
+
+
+ ${project.basedir}/../vertx-sql-client/src/main/asciidoc/*.adoc,${asciidoc.dir}/*.adoc
+
+
+
+
+
+
+ maven-assembly-plugin
+
+
+
+ package-sources
+
+
+ ${project.basedir}/../assembly/sources.xml
+
+
+ none
+
+ true
+
+
+
+
+
+
+
+
diff --git a/vertx-clickhouse-binary-client/src/main/generated/io/vertx/clickhouseclient/binary/ClickhouseBinaryConnectOptionsConverter.java b/vertx-clickhouse-binary-client/src/main/generated/io/vertx/clickhouseclient/binary/ClickhouseBinaryConnectOptionsConverter.java
new file mode 100644
index 000000000..9b9f5d747
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/generated/io/vertx/clickhouseclient/binary/ClickhouseBinaryConnectOptionsConverter.java
@@ -0,0 +1,33 @@
+package io.vertx.clickhouseclient.binary;
+
+import io.vertx.core.json.JsonObject;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.impl.JsonUtil;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.Base64;
+
+/**
+ * Converter and mapper for {@link io.vertx.clickhouseclient.binary.ClickhouseBinaryConnectOptions}.
+ * NOTE: This class has been automatically generated from the {@link io.vertx.clickhouseclient.binary.ClickhouseBinaryConnectOptions} original class using Vert.x codegen.
+ */
+public class ClickhouseBinaryConnectOptionsConverter {
+
+
+ private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER;
+ private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER;
+
+ public static void fromJson(Iterable> json, ClickhouseBinaryConnectOptions obj) {
+ for (java.util.Map.Entry member : json) {
+ switch (member.getKey()) {
+ }
+ }
+ }
+
+ public static void toJson(ClickhouseBinaryConnectOptions obj, JsonObject json) {
+ toJson(obj, json.getMap());
+ }
+
+ public static void toJson(ClickhouseBinaryConnectOptions obj, java.util.Map json) {
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/examples/SqlClientExamples.java b/vertx-clickhouse-binary-client/src/main/java/examples/SqlClientExamples.java
new file mode 100644
index 000000000..1548aff47
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/examples/SqlClientExamples.java
@@ -0,0 +1,334 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package examples;
+
+import io.vertx.clickhouseclient.binary.ClickhouseBinaryConnectOptions;
+import io.vertx.clickhouseclient.binary.ClickhouseBinaryPool;
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.DeploymentOptions;
+import io.vertx.core.Vertx;
+import io.vertx.core.tracing.TracingPolicy;
+import io.vertx.docgen.Source;
+import io.vertx.sqlclient.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@Source
+public class SqlClientExamples {
+ public void queries01(SqlClient client) {
+ client
+ .query("SELECT * FROM users WHERE id='julien'")
+ .execute(ar -> {
+ if (ar.succeeded()) {
+ RowSet result = ar.result();
+ System.out.println("Got " + result.size() + " rows ");
+ } else {
+ System.out.println("Failure: " + ar.cause().getMessage());
+ }
+ });
+ }
+
+
+ public void queries02(SqlClient client) {
+ client
+ .preparedQuery("SELECT * FROM users WHERE id=?")
+ .execute(Tuple.of("julien"), ar -> {
+ if (ar.succeeded()) {
+ RowSet rows = ar.result();
+ System.out.println("Got " + rows.size() + " rows ");
+ } else {
+ System.out.println("Failure: " + ar.cause().getMessage());
+ }
+ });
+ }
+
+ public void queries03(SqlClient client) {
+ client
+ .preparedQuery("SELECT first_name, last_name FROM users")
+ .execute(ar -> {
+ if (ar.succeeded()) {
+ RowSet rows = ar.result();
+ for (Row row : rows) {
+ System.out.println("User " + row.getString(0) + " " + row.getString(1));
+ }
+ } else {
+ System.out.println("Failure: " + ar.cause().getMessage());
+ }
+ });
+ }
+
+ public void queries04(SqlClient client) {
+ client
+ .preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")
+ .execute(Tuple.of("Julien", "Viet"), ar -> {
+ if (ar.succeeded()) {
+ RowSet rows = ar.result();
+ System.out.println(rows.rowCount());
+ } else {
+ System.out.println("Failure: " + ar.cause().getMessage());
+ }
+ });
+ }
+
+ public void queries05(Row row) {
+ System.out.println("User " + row.getString(0) + " " + row.getString(1));
+ }
+
+ public void queries06(Row row) {
+ System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
+ }
+
+ public void queries07(Row row) {
+
+ String firstName = row.getString("first_name");
+ Boolean male = row.getBoolean("male");
+ Integer age = row.getInteger("age");
+
+ // ...
+
+ }
+
+ public void queries08(SqlClient client) {
+
+ // Add commands to the batch
+ List batch = new ArrayList<>();
+ batch.add(Tuple.of("julien", "Julien Viet"));
+ batch.add(Tuple.of("emad", "Emad Alblueshi"));
+
+ // Execute the prepared batch
+ client
+ .preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")
+ .executeBatch(batch, res -> {
+ if (res.succeeded()) {
+
+ // Process rows
+ RowSet rows = res.result();
+ } else {
+ System.out.println("Batch failed " + res.cause());
+ }
+ });
+ }
+
+ public void queries09(SqlClient client, SqlConnectOptions connectOptions) {
+
+ // Enable prepare statements caching
+ connectOptions.setCachePreparedStatements(true);
+ client
+ .preparedQuery("SELECT * FROM users WHERE id = ?")
+ .execute(Tuple.of("julien"), ar -> {
+ if (ar.succeeded()) {
+ RowSet rows = ar.result();
+ System.out.println("Got " + rows.size() + " rows ");
+ } else {
+ System.out.println("Failure: " + ar.cause().getMessage());
+ }
+ });
+ }
+
+ public void queries10(SqlConnection sqlConnection) {
+ sqlConnection
+ .prepare("SELECT * FROM users WHERE id = ?", ar -> {
+ if (ar.succeeded()) {
+ PreparedStatement preparedStatement = ar.result();
+ preparedStatement.query()
+ .execute(Tuple.of("julien"), ar2 -> {
+ if (ar2.succeeded()) {
+ RowSet rows = ar2.result();
+ System.out.println("Got " + rows.size() + " rows ");
+ preparedStatement.close();
+ } else {
+ System.out.println("Failure: " + ar2.cause().getMessage());
+ }
+ });
+ } else {
+ System.out.println("Failure: " + ar.cause().getMessage());
+ }
+ });
+ }
+
+ public void usingConnections01(Vertx vertx, Pool pool) {
+
+ pool
+ .getConnection()
+ .compose(connection ->
+ connection
+ .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
+ .executeBatch(Arrays.asList(
+ Tuple.of("Julien", "Viet"),
+ Tuple.of("Emad", "Alblueshi")
+ ))
+ .compose(res -> connection
+ // Do something with rows
+ .query("SELECT COUNT(*) FROM Users")
+ .execute()
+ .map(rows -> rows.iterator().next().getInteger(0)))
+ // Return the connection to the pool
+ .eventually(v -> connection.close())
+ ).onSuccess(count -> {
+ System.out.println("Insert users, now the number of users is " + count);
+ });
+ }
+
+ public void usingConnections02(SqlConnection connection) {
+ connection
+ .prepare("SELECT * FROM users WHERE first_name LIKE ?")
+ .compose(pq ->
+ pq.query()
+ .execute(Tuple.of("Julien"))
+ .eventually(v -> pq.close())
+ ).onSuccess(rows -> {
+ // All rows
+ });
+ }
+
+ public void usingConnections03(Pool pool) {
+ pool.withConnection(connection ->
+ connection
+ .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
+ .executeBatch(Arrays.asList(
+ Tuple.of("Julien", "Viet"),
+ Tuple.of("Emad", "Alblueshi")
+ ))
+ .compose(res -> connection
+ // Do something with rows
+ .query("SELECT COUNT(*) FROM Users")
+ .execute()
+ .map(rows -> rows.iterator().next().getInteger(0)))
+ ).onSuccess(count -> {
+ System.out.println("Insert users, now the number of users is " + count);
+ });
+ }
+
+ public void transaction01(Pool pool) {
+ //transactions are not supported
+ }
+
+ public void transaction02(Transaction tx) {
+ //transactions are not supported
+ }
+
+ public void transaction03(Pool pool) {
+ //transactions are not supported
+ }
+
+ public void usingCursors01(SqlConnection connection) {
+ connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
+ if (ar1.succeeded()) {
+ PreparedStatement pq = ar1.result();
+
+ // Create a cursor
+ Cursor cursor = pq.cursor(Tuple.of(18));
+
+ // Read 50 rows
+ cursor.read(50, ar2 -> {
+ if (ar2.succeeded()) {
+ RowSet rows = ar2.result();
+
+ // Check for more ?
+ if (cursor.hasMore()) {
+ // Repeat the process...
+ } else {
+ // No more rows - close the cursor
+ cursor.close();
+ }
+ }
+ });
+ }
+ });
+ }
+
+ public void usingCursors02(Cursor cursor) {
+ cursor.read(50, ar2 -> {
+ if (ar2.succeeded()) {
+ // Close the cursor
+ cursor.close();
+ }
+ });
+ }
+
+ public void usingCursors03(SqlConnection connection) {
+ connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
+ if (ar1.succeeded()) {
+ PreparedStatement pq = ar1.result();
+
+ // Fetch 50 rows at a time
+ RowStream stream = pq.createStream(50, Tuple.of(18));
+
+ // Use the stream
+ stream.exceptionHandler(err -> {
+ System.out.println("Error: " + err.getMessage());
+ });
+ stream.endHandler(v -> {
+ System.out.println("End of stream");
+ });
+ stream.handler(row -> {
+ System.out.println("User: " + row.getString("last_name"));
+ });
+ }
+ });
+ }
+
+ public void tracing01(ClickhouseBinaryConnectOptions options) {
+ options.setTracingPolicy(TracingPolicy.ALWAYS);
+ }
+
+ public void poolConfig01(ClickhouseBinaryConnectOptions server1, ClickhouseBinaryConnectOptions server2, ClickhouseBinaryConnectOptions server3, PoolOptions options) {
+ ClickhouseBinaryPool pool = ClickhouseBinaryPool.pool(Arrays.asList(server1, server2, server3), options);
+ }
+
+ public void poolConfig02(ClickhouseBinaryPool pool, String sql) {
+ pool.connectHandler(conn -> {
+ conn.query(sql).execute().onSuccess(res -> {
+ // Release the connection to the pool, ready to be used by the application
+ conn.close();
+ });
+ });
+ }
+
+ public void poolSharing1(Vertx vertx, ClickhouseBinaryConnectOptions database, int maxSize) {
+ ClickhouseBinaryPool pool = ClickhouseBinaryPool.pool(database, new PoolOptions().setMaxSize(maxSize));
+ vertx.deployVerticle(() -> new AbstractVerticle() {
+ @Override
+ public void start() throws Exception {
+ // Use the pool
+ }
+ }, new DeploymentOptions().setInstances(4));
+ }
+
+ public void poolSharing2(Vertx vertx, ClickhouseBinaryConnectOptions database, int maxSize) {
+ vertx.deployVerticle(() -> new AbstractVerticle() {
+ ClickhouseBinaryPool pool;
+ @Override
+ public void start() {
+ // Get or create a shared pool
+ // this actually creates a lease to the pool
+ // when the verticle is undeployed, the lease will be released automaticaly
+ pool = ClickhouseBinaryPool.pool(database, new PoolOptions()
+ .setMaxSize(maxSize)
+ .setShared(true)
+ .setName("my-pool"));
+ }
+ }, new DeploymentOptions().setInstances(4));
+ }
+
+ public static void poolSharing3(Vertx vertx, ClickhouseBinaryConnectOptions database, int maxSize) {
+ ClickhouseBinaryPool pool = ClickhouseBinaryPool.pool(database, new PoolOptions()
+ .setMaxSize(maxSize)
+ .setShared(true)
+ .setName("my-pool")
+ .setEventLoopSize(4));
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/ClickhouseBinaryConnectOptions.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/ClickhouseBinaryConnectOptions.java
new file mode 100644
index 000000000..b375e6383
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/ClickhouseBinaryConnectOptions.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary;
+
+import io.vertx.clickhouseclient.binary.impl.ClickhouseBinaryConnectionUriParser;
+import io.vertx.codegen.annotations.DataObject;
+import io.vertx.core.json.JsonObject;
+import io.vertx.sqlclient.SqlConnectOptions;
+
+@DataObject(generateConverter = true)
+public class ClickhouseBinaryConnectOptions extends SqlConnectOptions {
+
+ public static ClickhouseBinaryConnectOptions wrap(SqlConnectOptions options) {
+ if (options instanceof ClickhouseBinaryConnectOptions) {
+ return (ClickhouseBinaryConnectOptions) options;
+ } else {
+ return new ClickhouseBinaryConnectOptions(options);
+ }
+ }
+
+ public static ClickhouseBinaryConnectOptions fromUri(String connectionUri) throws IllegalArgumentException {
+ JsonObject parsedConfiguration = ClickhouseBinaryConnectionUriParser.parse(connectionUri);
+ return new ClickhouseBinaryConnectOptions(parsedConfiguration);
+ }
+
+ public ClickhouseBinaryConnectOptions() {
+ super();
+ }
+
+ public ClickhouseBinaryConnectOptions(JsonObject json) {
+ super(json);
+ ClickhouseBinaryConnectOptionsConverter.fromJson(json, this);
+ }
+
+ public ClickhouseBinaryConnectOptions(SqlConnectOptions other) {
+ super(other);
+ }
+
+ public ClickhouseBinaryConnectOptions(ClickhouseBinaryConnectOptions other) {
+ super(other);
+ }
+
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/ClickhouseBinaryConnection.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/ClickhouseBinaryConnection.java
new file mode 100644
index 000000000..2286fb721
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/ClickhouseBinaryConnection.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary;
+
+import io.vertx.clickhouseclient.binary.impl.ClickhouseBinaryConnectionImpl;
+import io.vertx.codegen.annotations.VertxGen;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.impl.ContextInternal;
+import io.vertx.sqlclient.SqlConnection;
+
+@VertxGen
+public interface ClickhouseBinaryConnection extends SqlConnection {
+ static void connect(Vertx vertx, ClickhouseBinaryConnectOptions connectOptions, Handler> handler) {
+ Future fut = connect(vertx, connectOptions);
+ if (handler != null) {
+ fut.onComplete(handler);
+ }
+ }
+
+ static Future connect(Vertx vertx, ClickhouseBinaryConnectOptions connectOptions) {
+ return ClickhouseBinaryConnectionImpl.connect((ContextInternal) vertx.getOrCreateContext(), connectOptions);
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/ClickhouseBinaryPool.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/ClickhouseBinaryPool.java
new file mode 100644
index 000000000..755136909
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/ClickhouseBinaryPool.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary;
+
+import io.vertx.clickhouseclient.binary.spi.ClickhouseBinaryDriver;
+import io.vertx.codegen.annotations.VertxGen;
+import io.vertx.core.Vertx;
+import io.vertx.sqlclient.Pool;
+import io.vertx.sqlclient.PoolOptions;
+
+import java.util.Collections;
+import java.util.List;
+
+@VertxGen
+public interface ClickhouseBinaryPool extends Pool {
+ static ClickhouseBinaryPool pool(ClickhouseBinaryConnectOptions database, PoolOptions options) {
+ return pool(null, database, options);
+ }
+
+ static ClickhouseBinaryPool pool(Vertx vertx, ClickhouseBinaryConnectOptions database, PoolOptions options) {
+ return pool(vertx, Collections.singletonList(database), options);
+ }
+
+ static ClickhouseBinaryPool pool(List databases, PoolOptions options) {
+ return pool(null, databases, options);
+ }
+
+ static ClickhouseBinaryPool pool(Vertx vertx, List databases, PoolOptions options) {
+ return (ClickhouseBinaryPool) ClickhouseBinaryDriver.INSTANCE.createPool(vertx, databases, options);
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/ClickhouseConstants.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/ClickhouseConstants.java
new file mode 100644
index 000000000..6b6f9e9c4
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/ClickhouseConstants.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class ClickhouseConstants {
+ public static final int DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES = 50264;
+ public static final int DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS = 51554;
+ public static final int DBMS_MIN_REVISION_WITH_BLOCK_INFO = 51903;
+
+ public static final int DBMS_MIN_REVISION_WITH_CLIENT_INFO = 54032;
+ public static final int DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE = 54058;
+ public static final int DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO = 54060;
+ public static final int DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME = 54372;
+ public static final int DBMS_MIN_REVISION_WITH_VERSION_PATCH = 54401;
+ public static final int DBMS_MIN_REVISION_WITH_SERVER_LOGS = 54406;
+ public static final int DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA = 54410;
+ public static final int DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO = 54420;
+ public static final int DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS = 54429;
+ public static final int DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET = 54441;
+
+ public static final int CLIENT_VERSION_MAJOR = 20;
+ public static final int CLIENT_VERSION_MINOR = 10;
+ public static final int CLIENT_VERSION_PATCH = 2;
+ public static final int CLIENT_REVISION = 54441;
+
+ public static final String OPTION_APPLICATION_NAME = "application_name";
+ public static final String OPTION_INITIAL_USER = "initial_user";
+ public static final String OPTION_INITIAL_QUERY_ID = "initial_query_id";
+ public static final String OPTION_INITIAL_ADDRESS = "initial_address";
+ public static final String OPTION_INITIAL_USERNAME = "initial_username";
+ public static final String OPTION_INITIAL_HOSTNAME = "initial_hostname";
+ public static final String OPTION_COMPRESSOR = "compressor";
+ public static final String OPTION_STRING_CHARSET = "string_charset";
+ public static final String OPTION_DEFAULT_ZONE_ID = "default_zone_id";
+ public static final String OPTION_YEAR_DURATION = "days_in_year";
+ public static final String OPTION_QUARTER_DURATION = "days_in_quarter";
+ public static final String OPTION_MONTH_DURATION = "days_in_month";
+ public static final String OPTION_SEND_LOGS_LEVEL = "send_logs_level";
+ public static final String OPTION_DATETIME64_EXTRA_NANOS_MODE = "dt64_extra_nanos";
+ public static final String OPTION_ENUM_RESOLUTION = "enum_resolution";
+ public static final String OPTION_MAX_BLOCK_SIZE = "max_block_size";
+ public static final String OPTION_REMOVE_TRAILING_ZEROS_WHEN_ENCODE_FIXED_STRINGS = "remove_trailing_zeros_when_encode_fixed_strings";
+
+ public static final short COMPRESSION_METHOD_LZ4 = 0x82;
+ public static final short COMPRESSION_METHOD_ZSTD = 0x90;
+
+ public static final Set NON_QUERY_OPTIONS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ OPTION_APPLICATION_NAME, OPTION_INITIAL_USER, OPTION_INITIAL_QUERY_ID, OPTION_INITIAL_ADDRESS, OPTION_INITIAL_USERNAME,
+ OPTION_INITIAL_HOSTNAME, OPTION_COMPRESSOR, OPTION_STRING_CHARSET, OPTION_DEFAULT_ZONE_ID, OPTION_YEAR_DURATION, OPTION_QUARTER_DURATION,
+ OPTION_MONTH_DURATION, OPTION_DATETIME64_EXTRA_NANOS_MODE, OPTION_ENUM_RESOLUTION, OPTION_REMOVE_TRAILING_ZEROS_WHEN_ENCODE_FIXED_STRINGS)));
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/BaseBlock.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/BaseBlock.java
new file mode 100644
index 000000000..501b3ff80
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/BaseBlock.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+import io.vertx.clickhouseclient.binary.impl.codec.ClickhouseBinaryColumnDescriptor;
+import io.vertx.clickhouseclient.binary.impl.codec.columns.ClickhouseColumnReader;
+import io.vertx.sqlclient.desc.ColumnDescriptor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class BaseBlock {
+ private final Map columnsWithTypes;
+ protected final ClickhouseBinaryRowDesc rowDesc;
+ private final List data;
+ private final BlockInfo blockInfo;
+ protected final ClickhouseBinaryDatabaseMetadata md;
+
+ public BaseBlock(Map columnsWithTypes,
+ List data, BlockInfo blockInfo, ClickhouseBinaryDatabaseMetadata md) {
+ this.columnsWithTypes = columnsWithTypes;
+ this.rowDesc = buildRowDescriptor(columnsWithTypes);
+ this.data = data;
+ this.blockInfo = blockInfo;
+ this.md = md;
+ }
+
+ public Map getColumnsWithTypes() {
+ return columnsWithTypes;
+ }
+
+ public List getData() {
+ return data;
+ }
+
+ public BlockInfo getBlockInfo() {
+ return blockInfo;
+ }
+
+ public ClickhouseBinaryDatabaseMetadata getMd() {
+ return md;
+ }
+
+ public ClickhouseBinaryRowDesc rowDesc() {
+ return rowDesc;
+ }
+
+ private ClickhouseBinaryRowDesc buildRowDescriptor(Map columnsWithTypes) {
+ ClickhouseBinaryColumnDescriptor[] columnTypes = columnsWithTypes.values().toArray(ClickhouseBinaryRowDesc.EMPTY_DESCRIPTORS);
+ return new ClickhouseBinaryRowDesc(columnTypes);
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/BlockInfo.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/BlockInfo.java
new file mode 100644
index 000000000..01e7a2bd1
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/BlockInfo.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+import io.vertx.clickhouseclient.binary.impl.codec.ClickhouseStreamDataSink;
+import io.vertx.clickhouseclient.binary.impl.codec.ClickhouseStreamDataSource;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+
+public class BlockInfo {
+ private static final Logger LOG = LoggerFactory.getLogger(BlockInfo.class);
+
+ private Boolean isOverflows;
+ private Integer bucketNum;
+ private boolean complete;
+ private Integer fieldNum;
+
+ public BlockInfo() {
+ isOverflows = false;
+ bucketNum = -1;
+ }
+
+ public BlockInfo(Boolean isOverflows, Integer bucketNum) {
+ this.isOverflows = isOverflows;
+ this.bucketNum = bucketNum;
+ }
+
+ public void serializeTo(ClickhouseStreamDataSink sink) {
+ sink.writeULeb128(1);
+ sink.writeByte(isOverflows ? 1 : 0);
+ sink.writeULeb128(2);
+ sink.writeIntLE(bucketNum);
+ sink.writeULeb128(0);
+ }
+
+ public boolean isComplete() {
+ return complete;
+ }
+
+ public boolean isPartial() {
+ return !complete;
+ }
+
+ public void readFrom(ClickhouseStreamDataSource in) {
+ while (isPartial()) {
+ if (fieldNum == null) {
+ fieldNum = in.readULeb128();
+ if (fieldNum == null) {
+ return;
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("fieldNum: " + fieldNum + "(" + Integer.toHexString(fieldNum) + ")");
+ }
+ if (fieldNum == 0) {
+ complete = true;
+ return;
+ }
+ if (fieldNum == 1) {
+ if (in.readableBytes() >= 1) {
+ isOverflows = in.readBoolean();
+ fieldNum = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("isOverflows: " + isOverflows);
+ }
+ } else {
+ return;
+ }
+ } else if (fieldNum == 2) {
+ int readable = in.readableBytes();
+ if (readable >= 4) {
+ bucketNum = in.readIntLE();
+ fieldNum = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("bucketNum: " + bucketNum);
+ }
+ } else {
+ return;
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "BlockInfo{" +
+ "isOverflows=" + isOverflows +
+ ", bucketNum=" + bucketNum +
+ '}';
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryConnectionFactory.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryConnectionFactory.java
new file mode 100644
index 000000000..2d07a98c3
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryConnectionFactory.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+import io.vertx.clickhouseclient.binary.ClickhouseConstants;
+import io.vertx.clickhouseclient.binary.ClickhouseBinaryConnectOptions;
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.impl.EventLoopContext;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.future.PromiseInternal;
+import io.vertx.core.net.NetClientOptions;
+import io.vertx.core.net.NetSocket;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.core.net.impl.NetSocketInternal;
+import io.vertx.sqlclient.SqlConnectOptions;
+import io.vertx.sqlclient.SqlConnection;
+import io.vertx.sqlclient.impl.Connection;
+import io.vertx.sqlclient.impl.ConnectionFactoryBase;
+import io.vertx.sqlclient.impl.tracing.QueryTracer;
+import net.jpountz.lz4.LZ4Factory;
+
+public class ClickhouseBinaryConnectionFactory extends ConnectionFactoryBase {
+ public static final String LZ4_FASTEST_JAVA = "lz4_fastest_java";
+
+ private final LZ4Factory lz4Factory;
+
+ public ClickhouseBinaryConnectionFactory(VertxInternal vertx, ClickhouseBinaryConnectOptions options) {
+ super(vertx, options);
+ this.lz4Factory = lz4FactoryForName(options.getProperties().getOrDefault(ClickhouseConstants.OPTION_COMPRESSOR, LZ4_FASTEST_JAVA));
+ }
+
+ private LZ4Factory lz4FactoryForName(String name) {
+ if ("lz4_native".equals(name)) {
+ return LZ4Factory.nativeInstance();
+ } else if ("lz4_fastest".equals(name)) {
+ return LZ4Factory.fastestInstance();
+ } else if (LZ4_FASTEST_JAVA.equals(name)) {
+ return LZ4Factory.fastestJavaInstance();
+ } else if ("lz4_safe".equals(name)) {
+ return LZ4Factory.safeInstance();
+ } else if ("lz4_unsafe".equals(name)) {
+ return LZ4Factory.unsafeInstance();
+ }
+ return null;
+ }
+
+ @Override
+ protected void initializeConfiguration(SqlConnectOptions connectOptions) {
+ }
+
+ @Override
+ protected void configureNetClientOptions(NetClientOptions netClientOptions) {
+ netClientOptions.setSsl(false);
+ }
+
+ @Override
+ protected Future doConnectInternal(SocketAddress server, String username, String password, String database, EventLoopContext context) {
+ return doConnect(server, context).flatMap(conn -> {
+ ClickhouseBinarySocketConnection socket = (ClickhouseBinarySocketConnection) conn;
+ socket.init();
+ return Future.future(p -> socket.sendStartupMessage(username, password, database, properties, p))
+ .map(conn);
+ });
+ }
+
+ private Future doConnect(SocketAddress server, EventLoopContext ctx) {
+ Future soFut;
+ try {
+ soFut = netClient.connect(server, (String) null);
+ } catch (Exception e) {
+ // Client is closed
+ return ctx.failedFuture(e);
+ }
+ return soFut.map(so -> newSocketConnection(ctx, (NetSocketInternal) so));
+ }
+
+ @Override
+ public Future connect(Context context) {
+ ContextInternal contextInternal = (ContextInternal) context;
+ PromiseInternal promise = contextInternal.promise();
+ connect(asEventLoopContext(contextInternal))
+ .map(conn -> {
+ QueryTracer tracer = contextInternal.tracer() == null ? null : new QueryTracer(contextInternal.tracer(), options);
+ ClickhouseBinaryConnectionImpl dbConn = new ClickhouseBinaryConnectionImpl(this, contextInternal, conn, tracer, null);
+ conn.init(dbConn);
+ return (SqlConnection)dbConn;
+ })
+ .onComplete(promise);
+ return promise.future();
+ }
+
+ private ClickhouseBinarySocketConnection newSocketConnection(EventLoopContext ctx, NetSocketInternal socket) {
+ return new ClickhouseBinarySocketConnection(socket, cachePreparedStatements, preparedStatementCacheSize,
+ preparedStatementCacheSqlFilter, ctx, lz4Factory);
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryConnectionImpl.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryConnectionImpl.java
new file mode 100644
index 000000000..584f80994
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryConnectionImpl.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+import io.vertx.clickhouseclient.binary.ClickhouseBinaryConnectOptions;
+import io.vertx.clickhouseclient.binary.ClickhouseBinaryConnection;
+import io.vertx.clickhouseclient.binary.spi.ClickhouseBinaryDriver;
+import io.vertx.core.Future;
+import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.spi.metrics.ClientMetrics;
+import io.vertx.sqlclient.Transaction;
+import io.vertx.sqlclient.impl.Connection;
+import io.vertx.sqlclient.impl.SqlConnectionBase;
+import io.vertx.sqlclient.impl.tracing.QueryTracer;
+import io.vertx.sqlclient.spi.ConnectionFactory;
+
+public class ClickhouseBinaryConnectionImpl extends SqlConnectionBase implements ClickhouseBinaryConnection {
+ public static Future connect(ContextInternal ctx, ClickhouseBinaryConnectOptions options) {
+ ClickhouseBinaryConnectionFactory client;
+ try {
+ client = new ClickhouseBinaryConnectionFactory(ctx.owner(), options);
+ } catch (Exception e) {
+ return ctx.failedFuture(e);
+ }
+ ctx.addCloseHook(client);
+ return (Future)client.connect(ctx);
+ }
+
+ ClickhouseBinaryConnectionImpl(ConnectionFactory factory, ContextInternal context, Connection conn, QueryTracer tracer, ClientMetrics metrics) {
+ super(context, factory, conn, ClickhouseBinaryDriver.INSTANCE, tracer, metrics);
+ }
+
+ @Override
+ public Future begin() {
+ return Future.failedFuture(new UnsupportedOperationException());
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryConnectionUriParser.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryConnectionUriParser.java
new file mode 100644
index 000000000..cfd6c42f6
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryConnectionUriParser.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+import io.vertx.core.json.JsonObject;
+
+import java.net.URI;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.AbstractMap.SimpleImmutableEntry;
+
+public class ClickhouseBinaryConnectionUriParser {
+ public static JsonObject parse(String connectionUri) {
+ return parse(connectionUri, true);
+ }
+
+ public static JsonObject parse(String connectionUri, boolean exact) {
+ try {
+ JsonObject configuration = new JsonObject();
+ URI location = URI.create(connectionUri);
+ String userInfo = location.getUserInfo();
+ String user = userInfo;
+ String password = "";
+ if (userInfo.contains(":")) {
+ String[] tokens = userInfo.split(":");
+ user = tokens[0];
+ password = tokens[1];
+ }
+ configuration.put("user", user);
+ configuration.put("password", password);
+ configuration.put("host", location.getHost());
+ int port = location.getPort();
+ if (port == -1) {
+ port = 9000;
+ }
+ configuration.put("port", port);
+ String path = location.getPath();
+ int startDbOffset = path.startsWith("/") ? 1 : 0;
+ int endLocOffset = path.endsWith("/") && path.length() >= 2 ? 1 : 0;
+ path = path.substring(startDbOffset, path.length() - endLocOffset);
+ configuration.put("database", path);
+
+ configuration.put("properties", queryAsMap(location.getQuery()));
+
+ return configuration;
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Cannot parse invalid connection URI: " + connectionUri, e);
+ }
+ }
+
+ public static Map queryAsMap(String query) {
+ if (query == null || query.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ return Arrays.stream(query.split("&"))
+ .map(ClickhouseBinaryConnectionUriParser::asEntry)
+ .collect(Collectors.toMap(SimpleImmutableEntry::getKey, SimpleImmutableEntry::getValue));
+ }
+
+ public static AbstractMap.SimpleImmutableEntry asEntry(String str) {
+ int idx = str.indexOf("=");
+ String key = idx > 0 ? str.substring(0, idx) : str;
+ String value = idx > 0 && str.length() > idx + 1 ? str.substring(idx + 1) : null;
+ return new AbstractMap.SimpleImmutableEntry<>(key, value);
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryDatabaseMetadata.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryDatabaseMetadata.java
new file mode 100644
index 000000000..1532210d5
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryDatabaseMetadata.java
@@ -0,0 +1,161 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+import io.vertx.clickhouseclient.binary.ClickhouseConstants;
+import io.vertx.sqlclient.spi.DatabaseMetadata;
+
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Map;
+
+public class ClickhouseBinaryDatabaseMetadata implements DatabaseMetadata {
+ private final String productName;
+ private final String fullVersion;
+ private final int major;
+ private final int minor;
+ private final int revision;
+ private final int patchVersion;
+ private final String displayName;
+ private final ZoneId serverZoneId;
+ private final ZoneId defaultZoneId;
+ private final String fullClientName;
+ private final Charset stringCharset;
+ private final Map properties;
+ private final Duration yearDuration;
+ private final Duration quarterDuration;
+ private final Duration monthDuration;
+ private final boolean saturateExtraNanos;
+ private final boolean removeTrailingZerosInFixedStrings;
+
+ public ClickhouseBinaryDatabaseMetadata(String productName, String fullVersion, int major, int minor, int revision,
+ int patchVersion, String displayName, ZoneId serverZoneId, ZoneId defaultZoneId,
+ String fullClientName, Map properties, Charset stringCharset,
+ Duration yearDuration, Duration quarterDuration, Duration monthDuration,
+ boolean saturateExtraNanos, boolean removeTrailingZerosInFixedStrings) {
+ this.productName = productName;
+ this.fullVersion = fullVersion;
+ this.major = major;
+ this.minor = minor;
+ this.revision = revision;
+ this.patchVersion = patchVersion;
+ this.displayName = displayName;
+ this.serverZoneId = serverZoneId;
+ this.defaultZoneId = defaultZoneId;
+ this.fullClientName = fullClientName;
+ this.properties = properties;
+ this.stringCharset = stringCharset;
+ this.yearDuration = yearDuration;
+ this.quarterDuration = quarterDuration;
+ this.monthDuration = monthDuration;
+ this.saturateExtraNanos = saturateExtraNanos;
+ this.removeTrailingZerosInFixedStrings = removeTrailingZerosInFixedStrings;
+ }
+
+ public static Charset charset(Map props) {
+ String desiredCharset = props.get(ClickhouseConstants.OPTION_STRING_CHARSET);
+ if (desiredCharset == null || "system_default".equals(desiredCharset)) {
+ return Charset.defaultCharset();
+ } else {
+ return Charset.forName(desiredCharset);
+ }
+ }
+
+ @Override
+ public String productName() {
+ return productName;
+ }
+
+ @Override
+ public String fullVersion() {
+ return fullVersion;
+ }
+
+ @Override
+ public int majorVersion() {
+ return major;
+ }
+
+ @Override
+ public int minorVersion() {
+ return minor;
+ }
+
+ public int getRevision() {
+ return revision;
+ }
+
+ public int getPatchVersion() {
+ return patchVersion;
+ }
+
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ public ZoneId getServerZoneId() {
+ return serverZoneId;
+ }
+
+ public ZoneId getDefaultZoneId() {
+ return defaultZoneId;
+ }
+
+ public String getFullClientName() {
+ return fullClientName;
+ }
+
+ public Map getProperties() {
+ return properties;
+ }
+
+ public Charset getStringCharset() {
+ return stringCharset;
+ }
+
+ public Duration yearDuration() {
+ return yearDuration;
+ }
+
+ public Duration quarterDuration() {
+ return quarterDuration;
+ }
+
+ public Duration monthDuration() {
+ return monthDuration;
+ }
+
+ public boolean isSaturateExtraNanos() {
+ return saturateExtraNanos;
+ }
+
+ public boolean isRemoveTrailingZerosInFixedStrings() {
+ return removeTrailingZerosInFixedStrings;
+ }
+
+ @Override
+ public String toString() {
+ return "ClickhouseNativeDatabaseMetadata{" +
+ "productName='" + productName + '\'' +
+ ", fullVersion='" + fullVersion + '\'' +
+ ", major=" + major +
+ ", minor=" + minor +
+ ", revision=" + revision +
+ ", patchVersion=" + patchVersion +
+ ", displayName='" + displayName + '\'' +
+ ", timezone='" + serverZoneId + '\'' +
+ '}';
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryPoolImpl.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryPoolImpl.java
new file mode 100644
index 000000000..bc2604d11
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryPoolImpl.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+import io.vertx.clickhouseclient.binary.ClickhouseBinaryPool;
+import io.vertx.core.impl.CloseFuture;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.sqlclient.Pool;
+import io.vertx.sqlclient.impl.PoolBase;
+
+public class ClickhouseBinaryPoolImpl extends PoolBase implements ClickhouseBinaryPool {
+ public ClickhouseBinaryPoolImpl(VertxInternal vertx, CloseFuture closeFuture, Pool delegate) {
+ super(vertx, closeFuture, delegate);
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryRowDesc.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryRowDesc.java
new file mode 100644
index 000000000..bf07f4d7c
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryRowDesc.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+import io.vertx.clickhouseclient.binary.impl.codec.ClickhouseBinaryColumnDescriptor;
+import io.vertx.sqlclient.impl.RowDesc;
+
+public class ClickhouseBinaryRowDesc extends RowDesc {
+ public static final ClickhouseBinaryColumnDescriptor[] EMPTY_DESCRIPTORS = new ClickhouseBinaryColumnDescriptor[0];
+ public static final ClickhouseBinaryRowDesc EMPTY = new ClickhouseBinaryRowDesc(EMPTY_DESCRIPTORS);
+
+ public ClickhouseBinaryRowDesc(ClickhouseBinaryColumnDescriptor[] columnDescriptors) {
+ super(columnDescriptors);
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryRowImpl.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryRowImpl.java
new file mode 100644
index 000000000..f706083bc
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinaryRowImpl.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+import io.vertx.clickhouseclient.binary.impl.codec.columns.ClickhouseColumnReader;
+import io.vertx.sqlclient.Row;
+import io.vertx.sqlclient.Tuple;
+import io.vertx.sqlclient.data.NullValue;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ClickhouseBinaryRowImpl implements Row {
+ private final int rowNo;
+ private final Charset stringCharset;
+ private final ClickhouseBinaryRowDesc rowDesc;
+ private final ColumnOrientedBlock block;
+
+ public ClickhouseBinaryRowImpl(int rowNo, ClickhouseBinaryRowDesc rowDesc, ColumnOrientedBlock block, ClickhouseBinaryDatabaseMetadata md) {
+ this.rowNo = rowNo;
+ this.rowDesc = rowDesc;
+ this.block = block;
+ this.stringCharset = md.getStringCharset();
+ }
+
+ @Override
+ public String getColumnName(int pos) {
+ return rowDesc.columnNames().get(pos);
+ }
+
+ @Override
+ public int getColumnIndex(String column) {
+ return rowDesc.columnIndex(column);
+ }
+
+ @Override
+ public Object getValue(int columnIndex) {
+ return getValue(columnIndex, Object.class);
+ }
+
+ private Object getValue(int columnIndex, Class> desired) {
+ List data = block.getData();
+ ClickhouseColumnReader column = data.get(columnIndex);
+ Object columnData = column.getElement(rowNo, desired);
+ return columnData;
+ }
+
+ @Override
+ public T get(Class type, int position) {
+ if (type == null) {
+ throw new IllegalArgumentException("Accessor type can not be null");
+ }
+ Object value = getValue(position, type);
+ if (value == null) {
+ return null;
+ }
+ if (type.isAssignableFrom(value.getClass())) {
+ return type.cast(value);
+ }
+ throw new ClassCastException("can't cast value " + value + " at position " + position + " of class " + value.getClass().getName() + " to class " + type.getName());
+ }
+
+ @Override
+ public String getString(int pos) {
+ Object val = getValue(pos);
+ if (val == null) {
+ return null;
+ } else if (val instanceof String) {
+ return (String) val;
+ } else if (val instanceof Enum>) {
+ return ((Enum>) val).name();
+ } else if (val.getClass() == byte[].class) {
+ return new String((byte[])val, stringCharset);
+ } else {
+ throw new ClassCastException("Invalid String value type " + val.getClass());
+ }
+ }
+
+ @Override
+ public Tuple addValue(Object value) {
+ throw new IllegalStateException("not implemented");
+ }
+
+ @Override
+ public int size() {
+ return block.numColumns();
+ }
+
+ @Override
+ public void clear() {
+ throw new IllegalStateException("not implemented");
+ }
+
+ @Override
+ public List> types() {
+ int len = size();
+ List> types = new ArrayList<>(len);
+ for (int i = 0; i < len; i++) {
+ Object param = getValue(i);
+ if (param instanceof NullValue) {
+ types.add(((NullValue) param).type());
+ } else if (param == null) {
+ types.add(Object.class);
+ } else {
+ types.add(param.getClass());
+ }
+ }
+ return types;
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinarySocketConnection.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinarySocketConnection.java
new file mode 100644
index 000000000..d1dc08595
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseBinarySocketConnection.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+import io.netty.channel.ChannelPipeline;
+import io.vertx.clickhouseclient.binary.impl.codec.ClickhouseBinaryCodec;
+import io.vertx.core.Promise;
+import io.vertx.core.impl.EventLoopContext;
+import io.vertx.core.net.impl.NetSocketInternal;
+import io.vertx.sqlclient.impl.Connection;
+import io.vertx.sqlclient.impl.SocketConnectionBase;
+import io.vertx.sqlclient.impl.command.InitCommand;
+import net.jpountz.lz4.LZ4Factory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+public class ClickhouseBinarySocketConnection extends SocketConnectionBase {
+ private ClickhouseBinaryCodec codec;
+ private ClickhouseBinaryDatabaseMetadata md;
+ private UUID psId;
+ private String ourCursorId;
+ private final LZ4Factory lz4Factory;
+
+ public ClickhouseBinarySocketConnection(NetSocketInternal socket,
+ boolean cachePreparedStatements,
+ int preparedStatementCacheSize,
+ Predicate preparedStatementCacheSqlFilter,
+ EventLoopContext context,
+ LZ4Factory lz4Factory) {
+ super(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, 1, context);
+ this.lz4Factory = lz4Factory;
+ }
+
+ @Override
+ public void init() {
+ codec = new ClickhouseBinaryCodec(this);
+ ChannelPipeline pipeline = socket.channelHandlerContext().pipeline();
+ pipeline.addBefore("handler", "codec", codec);
+ super.init();
+ }
+
+ void sendStartupMessage(String username, String password, String database, Map properties, Promise completionHandler) {
+ InitCommand cmd = new InitCommand(this, username, password, database, properties);
+ schedule(context, cmd).onComplete(completionHandler);
+ }
+
+ public void setDatabaseMetadata(ClickhouseBinaryDatabaseMetadata md) {
+ this.md = md;
+ }
+
+ public void lockPsOrThrow(UUID newPsId) {
+ if (psId == null) {
+ psId = newPsId;
+ } else {
+ if (newPsId != null) {
+ if (!Objects.equals(psId, newPsId)) {
+ throw new IllegalStateException("attempt to block blocked (" + psId + ") connection by ps" + newPsId);
+ }
+ }
+ }
+ }
+
+ public void lockCursorOrThrow(UUID psId, String newCursorId) {
+ lockPsOrThrow(psId);
+ if (ourCursorId == null) {
+ ourCursorId = newCursorId;
+ } else {
+ if (newCursorId != null) {
+ if (!Objects.equals(ourCursorId, newCursorId)) {
+ throw new IllegalStateException("attempt to block blocked (" + ourCursorId + ") connection by cursor " + newCursorId);
+ }
+ }
+ }
+ }
+
+ public void releaseCursor(UUID psId, String newCursorId) {
+ if (!Objects.equals(this.ourCursorId, newCursorId)) {
+ throw new IllegalStateException("can't release: pending cursor = " + ourCursorId + "; provided: " + newCursorId);
+ }
+ this.ourCursorId = null;
+ }
+
+ public void releasePs(UUID newPs) {
+ if (!Objects.equals(this.psId, newPs)) {
+ throw new IllegalStateException("can't release: pending cursor = " + psId + "; provided: " + newPs);
+ }
+ this.psId = null;
+ }
+
+ public void throwExceptionIfCursorIsBusy(String callerId) {
+ if (ourCursorId != null) {
+ if (!Objects.equals(ourCursorId, callerId)) {
+ throw new IllegalStateException("connection is busy with " + ourCursorId);
+ }
+ }
+ }
+
+ @Override
+ public ClickhouseBinaryDatabaseMetadata getDatabaseMetaData() {
+ return md;
+ }
+
+ public LZ4Factory lz4Factory() {
+ return lz4Factory;
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseServerException.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseServerException.java
new file mode 100644
index 000000000..89474406b
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ClickhouseServerException.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+public class ClickhouseServerException extends RuntimeException {
+ private final int code;
+ private final String name;
+ private final String message;
+ private final String stacktrace;
+
+ private ClickhouseServerException(Integer code, String name, String message, String stacktrace, ClickhouseServerException cause, boolean unused) {
+ super(message, cause, false, true);
+ this.code = code;
+ this.name = name;
+ this.message = message;
+ //TODO: maybe log stacktraces with specified EOL (useful for log collectors)
+ this.stacktrace = stacktrace;
+ }
+
+ private ClickhouseServerException(Integer code, String name, String message, String stacktrace, ClickhouseServerException cause) {
+ super(message, cause, false, false);
+ this.code = code;
+ this.name = name;
+ this.message = message;
+ this.stacktrace = stacktrace;
+ }
+
+ public static ClickhouseServerException build(Integer code, String name, String message, String stacktrace, ClickhouseServerException cause, boolean first) {
+ if (first) {
+ return new ClickhouseServerException(code, name, message, stacktrace, cause, first);
+ }
+ return new ClickhouseServerException(code, name, message, stacktrace, cause);
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getMessage() {
+ return message;
+ }
+
+ public String getServerStacktrace() {
+ return stacktrace;
+ }
+
+ @Override
+ public String toString() {
+ return "ClickhouseServerException{" +
+ "code=" + code +
+ ", name='" + name + '\'' +
+ ", message='" + message + '\'' +
+ ", stacktrace='" + stacktrace + '\'' +
+ '}';
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ColumnOrientedBlock.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ColumnOrientedBlock.java
new file mode 100644
index 000000000..2bf023f7c
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/ColumnOrientedBlock.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+import io.vertx.clickhouseclient.binary.impl.codec.ClickhouseBinaryColumnDescriptor;
+import io.vertx.clickhouseclient.binary.impl.codec.columns.ClickhouseColumnReader;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class ColumnOrientedBlock extends BaseBlock {
+ public ColumnOrientedBlock(Map columnsWithTypes,
+ List data, BlockInfo blockInfo, ClickhouseBinaryDatabaseMetadata md) {
+ super(columnsWithTypes, data, blockInfo, md);
+ }
+
+ public int numColumns() {
+ Collection dt = getData();
+ return dt == null ? 0 : dt.size();
+ }
+
+ public int numRows() {
+ if (numColumns() > 0) {
+ ClickhouseColumnReader firstColumn = getData().iterator().next();
+ return firstColumn.nRows();
+ } else {
+ return 0;
+ }
+ }
+
+ public List rows() {
+ int numRows = numRows();
+ List ret = new ArrayList<>(numRows);
+ for (int i = 0; i < numRows; ++i) {
+ ret.add(row(i));
+ }
+ return ret;
+ }
+
+ public ClickhouseBinaryRowImpl row(int rowNo) {
+ return new ClickhouseBinaryRowImpl(rowNo, rowDesc, this, md);
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/RowOrientedBlock.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/RowOrientedBlock.java
new file mode 100644
index 000000000..e4790ec37
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/RowOrientedBlock.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl;
+
+import io.vertx.clickhouseclient.binary.ClickhouseConstants;
+import io.vertx.clickhouseclient.binary.impl.codec.ClickhouseBinaryColumnDescriptor;
+import io.vertx.clickhouseclient.binary.impl.codec.ClickhouseStreamDataSink;
+import io.vertx.clickhouseclient.binary.impl.codec.columns.ClickhouseColumnWriter;
+import io.vertx.clickhouseclient.binary.impl.codec.columns.ClickhouseColumns;
+import io.vertx.sqlclient.Tuple;
+import io.vertx.sqlclient.impl.RowDesc;
+
+import java.util.List;
+
+public class RowOrientedBlock {
+ private final RowDesc rowDesc;
+ private final List data;
+ private final BlockInfo blockInfo;
+ private final ClickhouseBinaryDatabaseMetadata md;
+ private final ClickhouseColumnWriter[] writers;
+
+ public RowOrientedBlock(RowDesc rowDesc,
+ List data, ClickhouseBinaryDatabaseMetadata md) {
+ this.rowDesc = rowDesc;
+ this.data = data;
+ this.blockInfo = new BlockInfo();
+ this.md = md;
+ this.writers = buildWriters();
+ }
+
+ private ClickhouseColumnWriter[] buildWriters() {
+ ClickhouseColumnWriter[] ret = new ClickhouseColumnWriter[nColumns()];
+ for (int columnIndex = 0; columnIndex < nColumns(); ++columnIndex) {
+ ClickhouseBinaryColumnDescriptor descr = (ClickhouseBinaryColumnDescriptor) rowDesc.columnDescriptor().get(columnIndex);
+ ClickhouseColumnWriter writer = ClickhouseColumns.columnForSpec(descr, md).writer(data, columnIndex);
+ ret[columnIndex] = writer;
+ }
+ return ret;
+ }
+
+ public void serializeAsBlock(ClickhouseStreamDataSink sink, int fromRow, int toRow) {
+ if (md.getRevision() >= ClickhouseConstants.DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
+ blockInfo.serializeTo(sink);
+ }
+ //n_columns
+ sink.writeULeb128(nColumns());
+ //n_rows
+ int nRows = toRow - fromRow;
+ sink.writeULeb128(nRows);
+ //TODO: maybe serialize into tiny sinks/blocks here, then return to caller
+ for (int columnIndex = 0; columnIndex < nColumns(); ++columnIndex) {
+ ClickhouseBinaryColumnDescriptor descr = (ClickhouseBinaryColumnDescriptor) rowDesc.columnDescriptor().get(columnIndex);
+ sink.writePascalString(descr.name());
+ sink.writePascalString(descr.getUnparsedNativeType());
+ writers[columnIndex].serializeColumn(sink, fromRow, toRow);
+ }
+ }
+
+ public int nColumns() {
+ return rowDesc.columnDescriptor().size();
+ }
+
+ public int totalRows() {
+ return data.size();
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/codec/BlockStreamProfileInfo.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/codec/BlockStreamProfileInfo.java
new file mode 100644
index 000000000..f5e3d8e17
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/codec/BlockStreamProfileInfo.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl.codec;
+
+public class BlockStreamProfileInfo {
+ private final int rows;
+ private final int blocks;
+ private final int bytes;
+ private final boolean appliedLimit;
+ private final int rowsBeforeLimit;
+ private final boolean calculatedRowsBeforeLimit;
+
+ public BlockStreamProfileInfo(int rows, int blocks, int bytes, boolean appliedLimit, int rowsBeforeLimit,
+ boolean calculatedRowsBeforeLimit) {
+ this.rows = rows;
+ this.blocks = blocks;
+ this.bytes = bytes;
+ this.appliedLimit = appliedLimit;
+ this.rowsBeforeLimit = rowsBeforeLimit;
+ this.calculatedRowsBeforeLimit = calculatedRowsBeforeLimit;
+ }
+
+ public int getRows() {
+ return rows;
+ }
+
+ public int getBlocks() {
+ return blocks;
+ }
+
+ public int getBytes() {
+ return bytes;
+ }
+
+ public boolean getAppliedLimit() {
+ return appliedLimit;
+ }
+
+ public int getRowsBeforeLimit() {
+ return rowsBeforeLimit;
+ }
+
+ public boolean getCalculatedRowsBeforeLimit() {
+ return calculatedRowsBeforeLimit;
+ }
+
+ @Override
+ public String toString() {
+ return "BlockStreamProfileInfo{" +
+ "rows=" + rows +
+ ", blocks=" + blocks +
+ ", bytes=" + bytes +
+ ", appliedLimit=" + appliedLimit +
+ ", rowsBeforeLimit=" + rowsBeforeLimit +
+ ", calculatedRowsBeforeLimit=" + calculatedRowsBeforeLimit +
+ '}';
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/codec/BlockStreamProfileInfoReader.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/codec/BlockStreamProfileInfoReader.java
new file mode 100644
index 000000000..bc56ed327
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/codec/BlockStreamProfileInfoReader.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+
+public class BlockStreamProfileInfoReader {
+ private static final Logger LOG = LoggerFactory.getLogger(BlockStreamProfileInfoReader.class);
+
+ private Integer rows;
+ private Integer blocks;
+ private Integer bytes;
+ private Boolean appliedLimit;
+ private Integer rowsBeforeLimit;
+ private Boolean calculatedRowsBeforeLimit;
+
+ public BlockStreamProfileInfo readFrom(ByteBuf in) {
+ int idxStart = in.readerIndex();
+ if (rows == null) {
+ rows = ByteBufUtils.readULeb128(in);
+ if (rows == null) {
+ return null;
+ }
+ }
+ if (blocks == null) {
+ blocks = ByteBufUtils.readULeb128(in);
+ if (blocks == null) {
+ return null;
+ }
+ }
+ if (bytes == null) {
+ bytes = ByteBufUtils.readULeb128(in);
+ if (bytes == null) {
+ return null;
+ }
+ }
+ if (appliedLimit == null) {
+ if (in.readableBytes() == 0) {
+ return null;
+ }
+ appliedLimit = in.readBoolean();
+ }
+ if (rowsBeforeLimit == null) {
+ rowsBeforeLimit = ByteBufUtils.readULeb128(in);
+ if (rowsBeforeLimit == null) {
+ return null;
+ }
+ }
+ if (calculatedRowsBeforeLimit == null) {
+ if (in.readableBytes() == 0) {
+ return null;
+ }
+ calculatedRowsBeforeLimit = in.readBoolean();
+ }
+ if (LOG.isDebugEnabled()) {
+ int idxEnd = in.readerIndex();
+ String bufferAsStringConsumed = ByteBufUtil.hexDump(in, idxStart, idxEnd - idxStart);
+ LOG.debug("bufferAsStringConsumed: " + bufferAsStringConsumed);
+ }
+ return new BlockStreamProfileInfo(rows, blocks, bytes, appliedLimit, rowsBeforeLimit, calculatedRowsBeforeLimit);
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/codec/ByteBufUtils.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/codec/ByteBufUtils.java
new file mode 100644
index 000000000..6cab2cedb
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/codec/ByteBufUtils.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl.codec;
+
+import io.netty.buffer.ByteBuf;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+public class ByteBufUtils {
+ public static void writeULeb128(int value, ByteBuf buf) {
+ assert (value >= 0);
+ int remaining = value >>> 7;
+ while (remaining != 0) {
+ buf.writeByte((byte) ((value & 0x7f) | 0x80));
+ value = remaining;
+ remaining >>>= 7;
+ }
+ buf.writeByte((byte) (value & 0x7f));
+ }
+
+ public static Integer readULeb128(ByteBuf buf) {
+ int value = 0;
+ int read = 0;
+ int count = 0;
+ int readerIndex = buf.readerIndex();
+ boolean notEnoughData = false;
+ do {
+ if (buf.readableBytes() >= 1) {
+ read = buf.readByte() & 0xff;
+ value |= (read & 0x7f) << (count * 7);
+ count++;
+ } else {
+ notEnoughData = true;
+ break;
+ }
+ } while (((read & 0x80) == 0x80) && count < 5);
+
+ if (notEnoughData) {
+ buf.readerIndex(readerIndex);
+ return null;
+ }
+ if ((read & 0x80) == 0x80) {
+ buf.readerIndex(readerIndex);
+ throw new RuntimeException("invalid LEB128 sequence");
+ }
+ return value;
+ }
+
+ public static String readPascalString(ByteBuf buf, Charset charset) {
+ int readerIndex = buf.readerIndex();
+ Integer length = readULeb128(buf);
+ if (length == null) {
+ return null;
+ }
+ if (buf.readableBytes() >= length) {
+ byte[] b = new byte[length];
+ buf.readBytes(b);
+ return new String(b, charset);
+ }
+ buf.readerIndex(readerIndex);
+ return null;
+ }
+
+ public static void writePascalString(String str, ByteBuf buf) {
+ byte[] b = str.getBytes(StandardCharsets.UTF_8);
+ writeULeb128(b.length, buf);
+ buf.writeBytes(b);
+ }
+}
diff --git a/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/codec/ClickhouseBinaryCodec.java b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/codec/ClickhouseBinaryCodec.java
new file mode 100644
index 000000000..782258d08
--- /dev/null
+++ b/vertx-clickhouse-binary-client/src/main/java/io/vertx/clickhouseclient/binary/impl/codec/ClickhouseBinaryCodec.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright (c) 2021 Vladimir Vishnevskii
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ *
+ */
+
+package io.vertx.clickhouseclient.binary.impl.codec;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.CombinedChannelDuplexHandler;
+import io.vertx.clickhouseclient.binary.impl.ClickhouseBinarySocketConnection;
+import io.vertx.core.impl.NoStackTraceThrowable;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+import io.vertx.sqlclient.impl.command.CommandBase;
+import io.vertx.sqlclient.impl.command.CommandResponse;
+
+import java.util.ArrayDeque;
+import java.util.Iterator;
+
+public class ClickhouseBinaryCodec extends CombinedChannelDuplexHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(ClickhouseBinaryCodec.class);
+
+ private ArrayDeque> inflight;
+
+ public ClickhouseBinaryCodec(ClickhouseBinarySocketConnection conn) {
+ inflight = new ArrayDeque<>();
+ ClickhouseBinaryEncoder encoder = new ClickhouseBinaryEncoder(inflight, conn);
+ ClickhouseBinaryDecoder decoder = new ClickhouseBinaryDecoder(inflight);
+ init(decoder, encoder);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ fail(ctx, cause);
+ super.exceptionCaught(ctx, cause);
+ }
+
+ private void fail(ChannelHandlerContext ctx, Throwable cause) {
+ for (Iterator> it = inflight.iterator(); it.hasNext();) {
+ ClickhouseBinaryCommandCodec, ?> codec = it.next();
+ it.remove();
+ CommandResponse