diff --git a/tools/etl/tg-spark-connector/LICENSE b/tools/etl/tg-spark-connector/LICENSE new file mode 100644 index 00000000..989e2c59 --- /dev/null +++ b/tools/etl/tg-spark-connector/LICENSE @@ -0,0 +1,201 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. 
Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/tools/etl/tg-spark-connector/pom.xml b/tools/etl/tg-spark-connector/pom.xml new file mode 100644 index 00000000..e95980d4 --- /dev/null +++ b/tools/etl/tg-spark-connector/pom.xml @@ -0,0 +1,257 @@ + + + + 4.0.0 + + com.tigergraph + tigergraph-spark-connector + 0.1.0 + + tigergraph-spark-connector + + + UTF-8 + 1.8 + 1.8 + 3.2.0 + 12.5 + + + + + org.apache.spark + spark-core_2.13 + ${spark.version} + provided + + + org.apache.spark + spark-sql_2.13 + ${spark.version} + provided + + + io.github.openfeign + feign-core + + + io.github.openfeign + feign-jackson + + + io.github.openfeign + feign-hc5 + + + org.junit.jupiter + junit-jupiter-engine + 5.8.2 + test + + + + + + + org.codehaus.mojo + animal-sniffer-maven-plugin + 1.19 + + + check-java-8-compatibility + compile + + check + + + + org.codehaus.mojo.signature + java18 + 1.0 + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.2.3 + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.3 + + + package + + shade + + + true + jar-with-dependencies + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + false + + + + + + + + + + + + publish + + + ossrh + https://s01.oss.sonatype.org/content/repositories/snapshots + + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.12 + true + + ossrh + https://s01.oss.sonatype.org/ + true + + + + maven-assembly-plugin + + false + + src/assembly/package.xml + + + + + package + + single + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 3.0.1 + + + sign-artifacts + verify + + sign + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.3.2 + + + attach-javadocs + + jar + + + -Xdoclint:none + + + + + + + + + check + + + + + org.owasp + dependency-check-maven + 7.0.4 + + + + check + + + + + + + + + + + + + io.github.openfeign + feign-bom + ${feign.version} + pom + import + + + + \ No newline at end of file diff --git a/tools/etl/tg-spark-connector/src/assembly/package.xml b/tools/etl/tg-spark-connector/src/assembly/package.xml new file mode 100644 index 00000000..09886049 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/assembly/package.xml @@ -0,0 +1,26 @@ + + + release + false + + + tar.gz + + + + ${project.build.directory}/${project.artifactId}-${project.version}.jar + ${project.artifactId}-${project.version}.jar + / + + + + + /lib + + ${project.groupId}:${project.artifactId}:jar:* + + + + diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/TigerGraphConnection.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/TigerGraphConnection.java new file mode 100644 index 00000000..1859a0dd --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/TigerGraphConnection.java @@ -0,0 +1,299 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark; + +import java.io.Serializable; +import java.time.Instant; +import java.util.Base64; +import com.tigergraph.spark.client.Builder; +import com.tigergraph.spark.client.Auth; +import com.tigergraph.spark.client.Misc; +import com.tigergraph.spark.client.Write; +import com.tigergraph.spark.client.Auth.AuthResponse; +import com.tigergraph.spark.client.common.RestppResponse; +import com.tigergraph.spark.util.Options; +import com.tigergraph.spark.util.Utils; +import feign.FeignException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Initialize the TG connection, including:
+ * 1. init authentication;
+ * 2. init the clients needed for corresponding operations. + * + *

Note that this is not a real DB connection; no network connection is cached. + + *&#10;

This connection will be initialized in the driver, then serialized and sent to executors. Transient + * variables will be rebuilt in executors. + */ +public class TigerGraphConnection implements Serializable { + private static final Logger logger = LoggerFactory.getLogger(TigerGraphConnection.class); + + private Options opts; + // Common connection variables + private final String graph; + private final String url; + private final long creationTime; + private String version; + private transient Misc misc; + // Authentication variables + private String basicAuth; + private String secret; + private String token; + private boolean restAuthEnabled; + private boolean restAuthInited; + private transient Auth auth; + // Loading job variables/consts + // the spark job type is supported for TG versions [3.10.0,) and [3.9.4,) + static final String JOB_IDENTIFIER = "spark"; + static final String JOB_MACHINE = "all"; + private String loadingJobId = null; + private transient Write write; + + /** + * Called only in the driver, then serialized and sent to executors.&#10;
+ * 1. build http client, set SSLSocketFactory if SSL enabled&#10;
+ * 2. based on 1, build {@link Auth} client
+ * 3. based on 2, detect if auth is enabled and request token if not given
+ * 4. based on 3, we can build requestInterceptor(add auth header) and retryer(refresh token) for + * other clients
+ * 5. init for specific operations, e.g., loading job id + * + * @param opts + */ + public TigerGraphConnection(Options opts, long creationTime) { + this.opts = opts; + this.creationTime = creationTime; + graph = opts.getString(Options.GRAPH); + url = opts.getString(Options.URL); + initAuth(); + // get TG version + version = opts.getString(Options.VERSION); + if (Utils.isEmpty(version)) { + RestppResponse verResp = getMisc().version(); + verResp.panicOnFail(); + version = Utils.extractVersion(verResp.message); + } + if (Utils.versionCmp(version, "3.6.0") <= 0) { + throw new UnsupportedOperationException( + "TigerGraph version under 3.6.0 is unsupported, current version: " + version); + } + logger.info("TigerGraph version: {}", version); + + if (Options.OptionType.WRITE.equals(opts.getOptionType()) + && Utils.versionCmp(version, "3.9.4") >= 0) { + loadingJobId = generateJobId(graph, opts.getString(Options.LOADING_JOB), creationTime); + } + } + + public TigerGraphConnection(Options opts) { + this(opts, Instant.now().toEpochMilli()); + } + + private void initAuth() { + if (!restAuthInited) { + this.secret = opts.getString(Options.SECRET); + this.token = opts.getString(Options.TOKEN); + // 1. encode username:password to basic auth + if (!Utils.isEmpty(opts.getString(Options.USERNAME)) + && !Utils.isEmpty(opts.getString(Options.PASSWORD))) { + this.basicAuth = + new String( + Base64.getEncoder() + .encode( + (opts.getString(Options.USERNAME) + ":" + opts.getString(Options.PASSWORD)) + .getBytes())); + } + // 2. init Auth client + getAuth(); + // 3. check if restpp auth is enabled + restAuthEnabled = true; + try { + auth.checkAuthEnabled(); + } catch (FeignException e) { + if (e.status() == 404) { + restAuthEnabled = false; + logger.warn( + "RESTPP authentication is not enabled, you can enable it via `gadmin config set" + + " RESTPP.Factory.EnableAuth true`"); + } else { + throw e; + } + } + // 4. 
request token if username/password or secret is provided but token is empty + if (restAuthEnabled && Utils.isEmpty(token)) { + AuthResponse resp; + if (!Utils.isEmpty(basicAuth)) { + resp = auth.requestTokenWithUserPass(graph, basicAuth, Auth.TOKEN_LIFETIME_SEC); + resp.panicOnFail(); + token = resp.results.get("token").asText(); + } else if (!Utils.isEmpty(secret)) { + resp = auth.requestTokenWithSecret(secret, Auth.TOKEN_LIFETIME_SEC); + resp.panicOnFail(); + token = resp.token; + } else { + throw new IllegalArgumentException( + "Restpp authentication is enabled, please provide at least one of the 'token'," + + " 'secret' or 'username/password' pair."); + } + logger.info( + "Requested new token {} for RESTPP authentication, expiration: {}", + Utils.maskString(token, 2), + resp.expiration); + } + restAuthInited = true; + } + } + + /** Get auth client for requesting/refreshing token */ + private Auth getAuth() { + if (auth == null) { + Builder builder = + new Builder() + .setRequestOptions( + opts.getInt(Options.IO_CONNECT_TIMEOUT_MS), + opts.getInt(Options.IO_READ_TIMEOUT_MS)) + .setRetryerWithoutAuth( + opts.getInt(Options.IO_RETRY_INTERVAL_MS), + opts.getInt(Options.IO_MAX_RETRY_INTERVAL_MS), + opts.getInt(Options.IO_MAX_RETRY_ATTEMPTS), + opts.getInt(Options.IO_RETRY_INTERVAL_MS), + opts.getInt(Options.IO_MAX_RETRY_INTERVAL_MS), + opts.getInt(Options.IO_MAX_RETRY_ATTEMPTS)); + if (url.trim().toLowerCase().startsWith("https://")) { + builder.setSSL( + opts.getString(Options.SSL_MODE), + opts.getString(Options.SSL_TRUSTSTORE), + opts.getString(Options.SSL_TRUSTSTORE_TYPE), + opts.getString(Options.SSL_TRUSTSTORE_PASSWORD)); + } + auth = builder.build(Auth.class, url); + } + return auth; + } + + public Misc getMisc() { + if (!restAuthInited) { + initAuth(); + } + + if (misc == null) { + Builder builder = + new Builder() + .setRequestOptions( + opts.getInt(Options.IO_CONNECT_TIMEOUT_MS), + opts.getInt(Options.IO_READ_TIMEOUT_MS)) + .setRetryer( + getAuth(), + basicAuth, + secret, + token, + opts.getInt(Options.IO_RETRY_INTERVAL_MS), + opts.getInt(Options.IO_MAX_RETRY_INTERVAL_MS), + opts.getInt(Options.IO_MAX_RETRY_ATTEMPTS), + opts.getInt(Options.IO_RETRY_INTERVAL_MS), + opts.getInt(Options.IO_MAX_RETRY_INTERVAL_MS), + opts.getInt(Options.IO_MAX_RETRY_ATTEMPTS)) + .setRequestInterceptor(basicAuth, token, restAuthEnabled); + if (url.trim().toLowerCase().startsWith("https://")) { + builder.setSSL( + opts.getString(Options.SSL_MODE), + opts.getString(Options.SSL_TRUSTSTORE), + opts.getString(Options.SSL_TRUSTSTORE_TYPE), + opts.getString(Options.SSL_TRUSTSTORE_PASSWORD)); + } + misc = builder.build(Misc.class, url); + } + return misc; + } + + /** Get write client (/restpp/ddl) */ + public Write getWrite() { + if (!Options.OptionType.WRITE.equals(opts.getOptionType())) { + throw new UnsupportedOperationException( + "Can't build write client for OptionType " + opts.getOptionType()); + } + + if (!restAuthInited) { + initAuth(); + } + + if (write == null) { + Builder builder = + new Builder() + .setRequestOptions( + opts.getInt(Options.IO_CONNECT_TIMEOUT_MS), + opts.getInt(Options.IO_READ_TIMEOUT_MS)) + .setRetryer( + getAuth(), + basicAuth, + secret, + token, + opts.getInt(Options.IO_RETRY_INTERVAL_MS), + opts.getInt(Options.IO_MAX_RETRY_INTERVAL_MS), + opts.getInt(Options.IO_MAX_RETRY_ATTEMPTS), + opts.getInt(Options.LOADING_RETRY_INTERVAL_MS), + opts.getInt(Options.LOADING_MAX_RETRY_INTERVAL_MS), + opts.getInt(Options.LOADING_MAX_RETRY_ATTEMPTS)) + .setRequestInterceptor(basicAuth, token, 
restAuthEnabled); + if (url.trim().toLowerCase().startsWith("https://")) { + builder.setSSL( + opts.getString(Options.SSL_MODE), + opts.getString(Options.SSL_TRUSTSTORE), + opts.getString(Options.SSL_TRUSTSTORE_TYPE), + opts.getString(Options.SSL_TRUSTSTORE_PASSWORD)); + } + write = builder.build(Write.class, url); + } + return write; + } + + /** + * Generate loading job id:
{graph}.{jobname}.spark.all.{creationTime}, e.g. Social.load_person.spark.all.1700000000000 (names and timestamp illustrative) + * + * @param graph the graph name + * @param jobname the loading job name + * @param creationTime the connection creation time in epoch milliseconds + */ + protected static String generateJobId(String graph, String jobname, long creationTime) { + return new StringBuilder() + .append(graph) + .append(".") + .append(jobname) + .append(".") + .append(JOB_IDENTIFIER) + .append(".") + .append(JOB_MACHINE) + .append(".") + .append(creationTime) + .toString(); + } + + public Options getOpts() { + return this.opts; + } + + public String getVersion() { + return this.version; + } + + public String getLoadingJobId() { + return this.loadingJobId; + } + + public String getGraph() { + return this.graph; + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/TigerGraphTable.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/TigerGraphTable.java new file mode 100644 index 00000000..7232fc74 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/TigerGraphTable.java @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *&#10;

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark; + +import org.apache.spark.sql.connector.catalog.SupportsWrite; +import org.apache.spark.sql.connector.catalog.TableCapability; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.types.StructType; +import java.time.Instant; +import java.util.HashSet; +import java.util.Set; +import com.tigergraph.spark.write.TigerGraphWriteBuilder; + +/** The representation of logical structured data set of a TG, with supported capabilities. */ +public class TigerGraphTable implements SupportsWrite { + + private static final String TABLE_NAME = "TigerGraphTable"; + private final StructType schema; + private final long creationTime = Instant.now().toEpochMilli(); + + TigerGraphTable(StructType schema) { + this.schema = schema; + } + + @Override + public String name() { + return TABLE_NAME; + } + + @Override + public StructType schema() { + return schema; + } + + @Override + public Set capabilities() { + return new HashSet(2) { + { + add(TableCapability.BATCH_WRITE); + add(TableCapability.STREAMING_WRITE); + } + }; + } + + @Override + public TigerGraphWriteBuilder newWriteBuilder(LogicalWriteInfo info) throws RuntimeException { + return new TigerGraphWriteBuilder(info, creationTime); + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/TigerGraphTableProvider.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/TigerGraphTableProvider.java new file mode 100644 index 00000000..19743af8 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/TigerGraphTableProvider.java @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark; + +import java.util.Map; +import org.apache.spark.sql.connector.catalog.TableProvider; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * A pure implementation of Spark Data Source V2 that applies data operations to existing TG + * objects, e.g., loading job, pre-installed query, vertex or edge. DDL is unsupported. + */ +public class TigerGraphTableProvider implements TableProvider, DataSourceRegister { + + private static final String SHORT_NAME = "tigergraph"; + + /** + * For a write operation, the schema is the schema of the input dataframe; for a read + * operation, it is the user-given schema. + */ + @Override + public boolean supportsExternalMetadata() { + return true; + } + + @Override + public StructType inferSchema(CaseInsensitiveStringMap options) { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'inferSchema'"); + } + + @Override + public TigerGraphTable getTable( + StructType schema, Transform[] partitioning, Map properties) { + return new TigerGraphTable(schema); + } + + @Override + public String shortName() { + return SHORT_NAME; + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/Auth.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/Auth.java new file mode 100644 index 00000000..3fa9d353 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/Auth.java @@ -0,0 +1,64 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *&#10;

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.client; + +import feign.*; +import com.tigergraph.spark.client.common.RestppResponse; + +/** APIs for RESTPP authentication */ +public interface Auth { + public static final long TOKEN_LIFETIME_SEC = 6 * 60 * 60; // 6h + + /** + * A helper function to check whether RESTPP auth is enabled, if not, an exception of 404 error + * will be thrown. + */ + @RequestLine("GET /restpp/requesttoken") + AuthResponse checkAuthEnabled(); + + @RequestLine("POST /restpp/requesttoken") + @Headers({"Content-Type: application/json", "Authorization: Basic {basicAuth}"}) + @Body("%7B\"graph\": \"{graph}\", \"lifetime\": \"{lifetime}\"%7D") + AuthResponse requestTokenWithUserPass( + @Param("graph") String graph, + @Param("basicAuth") String basicAuth, + @Param("lifetime") long lifetime); + + @RequestLine("POST /restpp/requesttoken") + @Headers({"Content-Type: application/json"}) + @Body("%7B\"secret\": \"{secret}\", \"lifetime\": \"{lifetime}\"%7D") + AuthResponse requestTokenWithSecret( + @Param("secret") String secret, @Param("lifetime") long lifetime); + + @RequestLine("PUT /restpp/requesttoken") + @Headers({"Content-Type: application/json", "Authorization: Basic {basicAuth}"}) + @Body("%7B\"token\": \"{token}\", \"lifetime\": \"{lifetime}\"%7D") + AuthResponse refreshTokenWithUserPass( + @Param("token") String token, + @Param("basicAuth") String basicAuth, + @Param("lifetime") long lifetime); + + @RequestLine("PUT /restpp/requesttoken") + @Headers({"Content-Type: application/json"}) + @Body("%7B\"secret\": \"{secret}\", \"token\": \"{token}\", \"lifetime\": \"{lifetime}\"%7D") + AuthResponse refreshTokenWithSecrect( + @Param("token") String token, + @Param("secret") String secret, + @Param("lifetime") long lifetime); + + public class AuthResponse extends RestppResponse { + public long expiration; + public String token; + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/Builder.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/Builder.java new file mode 100644 index 00000000..1b2ec1de --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/Builder.java @@ -0,0 +1,208 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.client; + +import java.util.List; +import java.util.Random; +import java.io.InputStream; +import java.io.File; +import java.io.FileInputStream; +import java.security.KeyStore; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import javax.net.ssl.HostnameVerifier; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.DefaultHostnameVerifier; +import org.apache.hc.client5.http.ssl.NoopHostnameVerifier; +import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; +import org.apache.hc.client5.http.ssl.TrustAllStrategy; +import org.apache.hc.core5.ssl.SSLContextBuilder; +import org.apache.hc.core5.ssl.SSLContexts; +import org.apache.spark.SparkFiles; +import com.tigergraph.spark.client.common.RestppAuthInterceptor; +import com.tigergraph.spark.client.common.RestppDecoder; +import com.tigergraph.spark.client.common.RestppEncoder; +import com.tigergraph.spark.client.common.RestppErrorDecoder; +import com.tigergraph.spark.client.common.RestppRetryer; +import com.tigergraph.spark.util.Options; +import com.tigergraph.spark.util.Utils; +import feign.*; +import feign.Target.HardCodedTarget; +import feign.codec.Decoder; +import feign.codec.Encoder; +import feign.codec.ErrorDecoder; +import feign.hc5.ApacheHttp5Client; + +/** Builder for all client, with custom client settings. */ +public class Builder { + + private Feign.Builder builder = new Feign.Builder(); + // default client settings + private HttpClientBuilder hc5builder = HttpClientBuilder.create(); + private PoolingHttpClientConnectionManagerBuilder connMgrBuilder = + PoolingHttpClientConnectionManagerBuilder.create(); + private Encoder encoder = RestppEncoder.INSTANCE; + private Decoder decoder = RestppDecoder.INSTANCE; + private ErrorDecoder errDecoder = new RestppErrorDecoder(RestppDecoder.INSTANCE); + private Retryer retryer = new Retryer.Default(); + private RequestInterceptor reqInterceptor; + private Request.Options reqOpts = new Request.Options(); + + public Builder setRequestOptions(int connectTimeoutMs, int readTimeoutMs) { + this.reqOpts = + new Request.Options( + connectTimeoutMs, TimeUnit.MILLISECONDS, readTimeoutMs, TimeUnit.MILLISECONDS, false); + return this; + } + + /** Set response error decoder with the HTTP error codes that will be retried. */ + public Builder setRetryableCode(Integer... 
code) { + this.errDecoder = new RestppErrorDecoder(decoder, code); + return this; + } + + /** Set retryer for token expiration, io exception and server errors */ + public Builder setRetryer( + Auth auth, + String basicAuth, + String secret, + String token, + int ioPeriod, + int ioMaxPeriod, + int ioMaxAttempts, + int serverPeriod, + int serverMaxPeriod, + int serverMaxAttempts) { + this.retryer = + new RestppRetryer( + auth, + basicAuth, + secret, + token, + ioPeriod, + ioMaxPeriod, + ioMaxAttempts, + serverPeriod, + serverMaxPeriod, + serverMaxAttempts); + return this; + } + + /** Set retryer for io exception and server errors */ + public Builder setRetryerWithoutAuth( + int ioPeriod, + int ioMaxPeriod, + int ioMaxAttempts, + int serverPeriod, + int serverMaxPeriod, + int serverMaxAttempts) { + this.retryer = + new RestppRetryer( + ioPeriod, ioMaxPeriod, ioMaxAttempts, serverPeriod, serverMaxPeriod, serverMaxAttempts); + return this; + } + + /** Set request interceptor for adding authorization header */ + public Builder setRequestInterceptor(String basicAuth, String token, boolean restAuthEnabled) { + this.reqInterceptor = new RestppAuthInterceptor(basicAuth, token, restAuthEnabled); + return this; + } + + /** Set SSL context for the client */ + public Builder setSSL( + String mode, String trustStoreFile, String trustStoreType, String password) { + HostnameVerifier hostnameVerifier = NoopHostnameVerifier.INSTANCE; + SSLContextBuilder sslContextBuilder = SSLContexts.custom(); + try { + switch (mode) { + case Options.SSL_MODE_BASIC: + sslContextBuilder.loadTrustMaterial(null, new TrustAllStrategy()); + break; + case Options.SSL_MODE_VERIFY_HOSTNAME: + hostnameVerifier = new DefaultHostnameVerifier(); + // the security level of hostname verification is higher than + // CA verification, so need to continue to the next case + case Options.SSL_MODE_VERIFY_CA: + if (Utils.isEmpty(trustStoreFile)) { + throw new IllegalArgumentException("\"ssl.truststore\" is required for mode " + mode); + } + String path = SparkFiles.get(trustStoreFile); + final InputStream in = new FileInputStream(new File(path)); + final KeyStore truststore = KeyStore.getInstance(trustStoreType); + if (Utils.isEmpty(password)) { + truststore.load(in, new char[0]); + } else { + truststore.load(in, password.toCharArray()); + } + sslContextBuilder.loadTrustMaterial(truststore, null); + break; + default: + throw new IllegalArgumentException("Invalid SSL mode: " + mode); + } + connMgrBuilder.setSSLSocketFactory( + new SSLConnectionSocketFactory(sslContextBuilder.build(), hostnameVerifier)); + } catch (Exception e) { + throw new RuntimeException("Failed to configure SSL", e); + } + + return this; + } + + public T build(Class apiType, String url) { + builder + .encoder(encoder) + .decoder(decoder) + .errorDecoder(errDecoder) + .retryer(retryer) + .options(reqOpts) + .client( + new ApacheHttp5Client(hc5builder.setConnectionManager(connMgrBuilder.build()).build())); + if (reqInterceptor != null) { + builder.requestInterceptor(reqInterceptor); + } + return builder.target(new LoadBalanceTarget(apiType, url)); + } + + // The target that support load balancing + public static class LoadBalanceTarget extends HardCodedTarget { + + private final List urls; + private final Random rand = new Random(); + + public LoadBalanceTarget(Class type, String url) { + super(type, url); + urls = + Arrays.stream(url.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .distinct() + .collect(Collectors.toList()); + } + + // Randomly pick an address 
to build the HTTP request + @Override + public String url() { + return urls.get(rand.nextInt(urls.size())); + } + + @Override + public Request apply(RequestTemplate input) { + // Randomize URLs on every request, including on retries + input.target(url()); + return input.request(); + } + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/Misc.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/Misc.java new file mode 100644 index 00000000..9ea52328 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/Misc.java @@ -0,0 +1,30 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.client; + +import com.tigergraph.spark.client.common.RestppResponse; +import feign.*; + +/** + * Restpp API declaration used for connectivity check, token request and cluster basic info + * detection. + */ +public interface Misc { + @RequestLine("GET /restpp/version") + RestppResponse version(); + + @RequestLine("GET /gsqlserver/gsql/loading-jobs?action={action}&graph={graph}&jobId={jobId}") + RestppResponse loadingAction( + @Param("action") String action, @Param("graph") String graph, @Param("jobId") String jobId); +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/Write.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/Write.java new file mode 100644 index 00000000..f6bf9caf --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/Write.java @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *&#10;

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.client; + +import feign.*; +import java.util.Map; +import com.fasterxml.jackson.databind.JsonNode; +import com.tigergraph.spark.client.common.RestppResponse; + +/** Write service declaration used for data loading. */ +public interface Write { + @RequestLine("POST /restpp/ddl/{graph}") + @Headers({"Content-Type: text/plain"}) + @Body("{data}") + LoadingResponse ddl( + @Param("graph") String graph, + @Param("data") String data, + @QueryMap Map queryMap); + + public class LoadingResponse extends RestppResponse { + public boolean hasInvalidRecord() { + return hasInvalidRecord(results); + } + + // Any numeric element whose key is not validLine or validObject but whose value is non-zero + // represents an invalid-record count, e.g., notEnoughToken: 123 + protected static boolean hasInvalidRecord(JsonNode in) { + return in.toString().matches(".*\"(?!validLine|validObject)[^\"]+\": *[1-9]\\d*.*"); + } + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppAuthInterceptor.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppAuthInterceptor.java new file mode 100644 index 00000000..80bfd6c9 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppAuthInterceptor.java @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *&#10;

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.client.common; + +import com.tigergraph.spark.util.Utils; +import feign.RequestInterceptor; +import feign.RequestTemplate; + +/** + * The request interceptor which is responsible for:
+ * 1. attach the bearer auth header to every request when RESTPP auth is enabled 2. attach the bearer or basic auth header to the + * /gsqlserver request, which requires authentication even when RESTPP auth is disabled + */ +public class RestppAuthInterceptor implements RequestInterceptor { + + static final String GSQL_ENDPOINT = "/gsqlserver"; + + private final String basicAuth; + private final String token; + private final boolean restAuthEnabled; + + public RestppAuthInterceptor(String basicAuth, String token, boolean restAuthEnabled) { + this.basicAuth = basicAuth; + this.token = token; + this.restAuthEnabled = restAuthEnabled; + } + + @Override + public void apply(RequestTemplate template) { + // If rest auth is enabled, a token has been provided or requested, + // so every request should carry the auth header. + if (restAuthEnabled) { + template.header("Authorization", "Bearer " + token); + } else if (template.path().contains(GSQL_ENDPOINT)) { + // If restpp auth is disabled, the /gsqlserver endpoint still needs authentication. + // A user/pass pair and a token (requested by user/pass, not a system token) are equivalent + if (!Utils.isEmpty(token)) { + template.header("Authorization", "Bearer " + token); + } else if (!Utils.isEmpty(basicAuth)) { + template.header("Authorization", "Basic " + basicAuth); + } else { + throw new IllegalArgumentException( + "Failed to send request to " + + template.path() + + ", no username/password or token provided."); + } + } + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppDecoder.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppDecoder.java new file mode 100644 index 00000000..4db9dc40 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppDecoder.java @@ -0,0 +1,29 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *&#10;

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.client.common; + +import com.fasterxml.jackson.core.json.JsonReadFeature; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import feign.codec.Decoder; +import feign.jackson.JacksonDecoder; + +public class RestppDecoder { + public static final Decoder INSTANCE = + new JacksonDecoder( + new ObjectMapper() + // restpp can respond newline-delimited json + .configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)); +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppEncoder.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppEncoder.java new file mode 100644 index 00000000..3e20d023 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppEncoder.java @@ -0,0 +1,21 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.client.common; + +import feign.codec.Encoder; +import feign.jackson.JacksonEncoder; + +public class RestppEncoder { + public static final Encoder INSTANCE = new JacksonEncoder(); +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppErrorDecoder.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppErrorDecoder.java new file mode 100644 index 00000000..1588335d --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppErrorDecoder.java @@ -0,0 +1,95 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.client.common; + +import java.util.Arrays; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.tigergraph.spark.constant.ErrorCode; +import feign.Response; +import feign.RetryableException; +import feign.codec.Decoder; +import feign.codec.ErrorDecoder; +import org.apache.hc.core5.http.HttpStatus; + +/** + * Responsible for checking the HTTP status code to determine whether the request is retryable, + * throw a {@link RetryableException} or not. + */ +public class RestppErrorDecoder implements ErrorDecoder { + private static final Logger logger = LoggerFactory.getLogger(RestppErrorDecoder.class); + + static final List DEFAULT_RETRYABLE_CODE = + Arrays.asList( + HttpStatus.SC_REQUEST_TIMEOUT, + HttpStatus.SC_BAD_GATEWAY, + HttpStatus.SC_SERVICE_UNAVAILABLE, + HttpStatus.SC_GATEWAY_TIMEOUT); + final List retryableCode; + final Decoder decoder; + final ErrorDecoder errDecoder = new ErrorDecoder.Default(); + + public RestppErrorDecoder(Decoder decoder) { + this.decoder = decoder; + this.retryableCode = DEFAULT_RETRYABLE_CODE; + } + + public RestppErrorDecoder(Decoder decoder, Integer... retryableCode) { + this.decoder = decoder; + this.retryableCode = Arrays.asList(retryableCode); + } + + /** + * Wrap the exception from default decoder into the {@link RetryableException} or directly throw + * it depending on HTTP status code. + */ + @Override + public Exception decode(String methodKey, Response response) { + Exception e = errDecoder.decode(methodKey, response); + if (!(e instanceof RetryableException)) { + boolean shouldRetry = false; + // Retry on Server Timeout 408, 502, 503 and 504 + // or token expiration + if (response.status() == HttpStatus.SC_FORBIDDEN) { + try { + // Retrieve the body from exception and decode to get the RESTPP error code. + // Can't directly decode the response because the inputstream has been closed. + byte[] body = ((feign.FeignException) e).responseBody().get().array(); + RestppResponse resp = + (RestppResponse) + decoder.decode(response.toBuilder().body(body).build(), RestppResponse.class); + if (ErrorCode.TOKEN_EXPIRATION.equals(resp.code)) { + logger.info("{} token expiration, attempt to refresh and retry.", resp.code); + shouldRetry = true; + } + } catch (Exception ex) { + // no-op if failed to decode body of 403 response, let it fail fast. 
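+ // For reference, a 403 body that takes this refresh-and-retry path would look roughly
+ // like the following (message text illustrative; the code is ErrorCode.TOKEN_EXPIRATION):
+ // {"error": true, "code": "REST-10019", "message": "the token has expired"}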
+ } + } else if (retryableCode.contains(response.status())) { + shouldRetry = true; + } + if (shouldRetry) { + return new RetryableException( + response.status(), + e.getMessage(), + response.request().httpMethod(), + e, + null, + response.request()); + } + } + return e; + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppErrorException.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppErrorException.java new file mode 100644 index 00000000..2fe22d22 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppErrorException.java @@ -0,0 +1,7 @@ +package com.tigergraph.spark.client.common; + +public class RestppErrorException extends RuntimeException { + public RestppErrorException(String code, String message) { + super(String.format("RESTPP error response, code: %s, message: %s", code, message)); + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppResponse.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppResponse.java new file mode 100644 index 00000000..4c557a4d --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppResponse.java @@ -0,0 +1,31 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.client.common; + +import com.fasterxml.jackson.databind.JsonNode; + +/** Standard TG RESTPP response POJO */ +public class RestppResponse { + public String code; + public boolean error; + public String message; + public JsonNode results; + + /** Throw exception when HTTP status code is 200 but RESTPP error=true */ + public void panicOnFail() { + if (error) { + throw new RestppErrorException(code, message); + } + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppRetryer.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppRetryer.java new file mode 100644 index 00000000..af8b88da --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/client/common/RestppRetryer.java @@ -0,0 +1,203 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.client.common; + +import java.io.IOException; +import java.util.Random; +import com.tigergraph.spark.client.Auth; +import com.tigergraph.spark.util.Utils; +import feign.RetryableException; +import feign.Retryer; +import org.apache.hc.core5.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A mixed retryer for 3 types of errors:
+ * 1. token expiration
+ * 2. transport exception, e.g. read timeout, connect timeout
+ * 3. server timeout/busy, e.g. 502, 503, 504
+ * + *

Each of them has its own retry interval and max-attempts setting. + */ +public class RestppRetryer implements Retryer { + private static final Logger logger = LoggerFactory.getLogger(RestppRetryer.class); + + private static final Random rand = new Random(); + private static int TYPE_AUTH = 0; // token expiration + private static int TYPE_IO = 1; // transport exception, e.g. read timeout, connect timeout + private static int TYPE_SERVER = 2; // server timeout/busy, e.g. 502, 503, 504 + + // refresh token + private static final int REFRESH_MAX_ATTEMPTS = 1; + private static final int REFRESH_PERIOD_MS = 3000; // 3s + private final Auth auth; + private final String basicAuth; + private final String secret; + private final String token; + // arrays to record the retry status for different retry types + private final int[] period = new int[3]; + private final int[] maxPeriod = new int[3]; + private final int[] maxAttempts = new int[3]; + private final int[] attempts = new int[3]; + private final int[] sleptForMillis = new int[3]; + + public RestppRetryer( + Auth auth, + String basicAuth, + String secret, + String token, + int ioPeriod, + int ioMaxPeriod, + int ioMaxAttempts, + int serverPeriod, + int serverMaxPeriod, + int serverMaxAttempts) { + this.auth = auth; + this.basicAuth = basicAuth; + this.secret = secret; + this.token = token; + + period[TYPE_AUTH] = REFRESH_PERIOD_MS; + maxPeriod[TYPE_AUTH] = REFRESH_PERIOD_MS; + maxAttempts[TYPE_AUTH] = REFRESH_MAX_ATTEMPTS; + + period[TYPE_IO] = ioPeriod; + maxPeriod[TYPE_IO] = ioMaxPeriod; + maxAttempts[TYPE_IO] = ioMaxAttempts; + + period[TYPE_SERVER] = serverPeriod; + maxPeriod[TYPE_SERVER] = serverMaxPeriod; + maxAttempts[TYPE_SERVER] = serverMaxAttempts; + } + + /** + * Shortcut for creating a retryer that doesn't support token refresh. &#10;
E.g., we don't need that + * when creating {@link Auth} client + */ + public RestppRetryer( + int ioPeriod, + int ioMaxPeriod, + int ioMaxAttempts, + int serverPeriod, + int serverMaxPeriod, + int serverMaxAttempts) { + this( + null, + null, + null, + null, + ioPeriod, + ioMaxPeriod, + ioMaxAttempts, + serverPeriod, + serverMaxPeriod, + serverMaxAttempts); + } + + public void continueOrPropagate(RetryableException e) { + // infer the retry type + int retryType; + String reason; + if (e.getCause() instanceof IOException) { + retryType = TYPE_IO; + reason = e.getMessage(); + } else if (e.status() == HttpStatus.SC_FORBIDDEN) { + retryType = TYPE_AUTH; + reason = + String.format( + "Token %s expired, attempt to retry after refresh", Utils.maskString(token, 2)); + } else { + retryType = TYPE_SERVER; + reason = e.getMessage(); + } + if (attempts[retryType]++ >= maxAttempts[retryType]) { + throw e; + } + + long interval; + // Set the interval according to HTTP `Retry-After` header if any + if (e.retryAfter() != null) { + interval = e.retryAfter().getTime() - currentTimeMillis(); + if (interval > maxPeriod[retryType]) { + interval = maxPeriod[retryType]; + } + if (interval < 0) { + return; + } + } else { + interval = jitter(nextMaxInterval(retryType)); + logger.info("{}, retry in {} ms, attempt {}", reason, interval, attempts[retryType]); + } + try { + Thread.sleep(interval); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + throw e; + } + sleptForMillis[retryType] += interval; + + if (retryType == TYPE_AUTH && auth != null) { + if (!Utils.isEmpty(basicAuth)) { + auth.refreshTokenWithUserPass(token, basicAuth, Auth.TOKEN_LIFETIME_SEC).panicOnFail(); + } else if (!Utils.isEmpty(secret)) { + auth.refreshTokenWithSecrect(token, secret, Auth.TOKEN_LIFETIME_SEC).panicOnFail(); + } else { + // Don't support refresh token, throw it directly + throw e; + } + logger.info( + "Successfully refreshed token {} for {} seconds", + Utils.maskString(token, 2), + Auth.TOKEN_LIFETIME_SEC); + } + } + + @Override + public RestppRetryer clone() { + return new RestppRetryer( + auth, + basicAuth, + secret, + token, + period[TYPE_IO], + maxPeriod[TYPE_IO], + maxAttempts[TYPE_IO], + period[TYPE_SERVER], + maxPeriod[TYPE_SERVER], + maxAttempts[TYPE_SERVER]); + } + + protected long currentTimeMillis() { + return System.currentTimeMillis(); + } + + // visible for testing; + // 0.75 * interval ~ 1.25 * interval + protected static long jitter(long interval) { + return (long) (0.75 * interval + 0.5 * interval * rand.nextDouble()); + } + + /** + * Calculates the time interval to a retry attempt.
+ * The interval increases exponentially with each attempt, at a rate of nextInterval *= 1.5 (where + * 1.5 is the backoff factor), to the maximum interval. + * + * @return time in milliseconds from now until the next attempt. + */ + private long nextMaxInterval(int retryType) { + long interval = (long) (period[retryType] * Math.pow(1.5, attempts[retryType] - 1)); + return interval > maxPeriod[retryType] ? maxPeriod[retryType] : interval; + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/constant/ErrorCode.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/constant/ErrorCode.java new file mode 100644 index 00000000..cebfcabb --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/constant/ErrorCode.java @@ -0,0 +1,19 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *
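To make the retry schedule of RestppRetryer above concrete, here is a minimal, self-contained sketch of what nextMaxInterval() plus jitter() compute together; the class and method names below are local to the example, and 5000/300000 ms are simply the connector's loading-retry defaults:

    import java.util.Random;

    public class BackoffSketch {
      // Mirrors RestppRetryer: interval_n = min(period * 1.5^(n-1), maxPeriod),
      // then jittered into [0.75 * interval, 1.25 * interval).
      static long backoffMillis(long periodMs, long maxPeriodMs, int attempt, Random rand) {
        long interval = (long) (periodMs * Math.pow(1.5, attempt - 1));
        interval = Math.min(interval, maxPeriodMs);
        return (long) (0.75 * interval + 0.5 * interval * rand.nextDouble());
      }

      public static void main(String[] args) {
        Random rand = new Random();
        // Pre-jitter schedule with period=5000, maxPeriod=300000: 5000, 7500, 11250, 16875, ...
        for (int attempt = 1; attempt <= 5; attempt++) {
          System.out.println(backoffMillis(5000, 300000, attempt, rand));
        }
      }
    }

Note that the Retry-After branch bypasses this schedule entirely: the server-supplied wait is honored as-is, capped at maxPeriod.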
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.constant; + +/** RESTPP Error Codes */ +public class ErrorCode { + public static final String TOKEN_EXPIRATION = "REST-10019"; +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/OptionDef.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/OptionDef.java new file mode 100644 index 00000000..2739a032 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/OptionDef.java @@ -0,0 +1,144 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *
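The REST-10019 constant above is what turns an otherwise fatal 403 into a retryable, token-refreshing failure (see RestppErrorDecoderTest near the end of this diff). A sketch of that check, assuming the RESTPP error body carries the code in a top-level "code" field, as the tests do:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.tigergraph.spark.constant.ErrorCode;

    public class TokenExpirationSketch {
      private static final ObjectMapper MAPPER = new ObjectMapper();

      // True only for a 403 whose body looks like {"code":"REST-10019", ...}
      static boolean isTokenExpiration(int httpStatus, String body) {
        if (httpStatus != 403 || body == null) {
          return false;
        }
        try {
          return ErrorCode.TOKEN_EXPIRATION.equals(MAPPER.readTree(body).path("code").asText());
        } catch (Exception e) {
          return false; // unparsable body: not a recognizable expiration error
        }
      }

      public static void main(String[] args) {
        System.out.println(isTokenExpiration(403, "{\"code\":\"REST-10019\"}")); // true
        System.out.println(isTokenExpiration(403, "{\"code\":\"not_expire\"}")); // false
      }
    }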
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.util; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.io.Serializable; +import java.util.UUID; + +public class OptionDef implements Serializable { + // A unique Java object which represents the lack of a default value. + public static final Serializable NO_DEFAULT_VALUE = UUID.randomUUID(); + + // Options' definitions + private final Map optionKeys = new HashMap<>(); + + public Map optionKeys() { + return optionKeys; + } + + /** + * Define a new option + * + * @param name the name of the option + * @param type the type of the option + * @param defaultValue the default value to use if this option isn't present + * @param required Whether this option must have + * @param validator To use in checking the correctness of the option + * @param group the group this option belongs to + * @return This OptionDef so you can chain calls + */ + public OptionDef define( + String name, + Type type, + Serializable defaultValue, + boolean required, + Validator validator, + String group) { + OptionKey key = new OptionKey(name, type, defaultValue, required, validator, group); + optionKeys.put(name, key); + return this; + } + + public OptionDef define(String name, Type type, String group) { + return define(name, type, false, group); + } + + public OptionDef define(String name, Type type, boolean required, String group) { + return define(name, type, NO_DEFAULT_VALUE, required, null, group); + } + + /* + * the definition for option + */ + public static class OptionKey implements Serializable { + public final String name; + public final Type type; + public final Serializable defaultValue; + public final boolean required; + public final Validator validator; + public final String group; + + /** + * @param name the name of the option + * @param type the type of the option + * @param defaultValue the default value to use if this option isn't present + * @param required Whether this option must have + * @param validator To use in checking the correctness of the option + * @param group the group this option belongs to + */ + public OptionKey( + String name, + Type type, + Serializable defaultValue, + boolean required, + Validator validator, + String group) { + this.name = name; + this.type = type; + this.defaultValue = NO_DEFAULT_VALUE.equals(defaultValue) ? NO_DEFAULT_VALUE : defaultValue; + this.required = required; + this.validator = validator; + this.group = group; + } + + public boolean hasDefault() { + return !NO_DEFAULT_VALUE.equals(this.defaultValue); + } + } + + public enum Type { + BOOLEAN, + STRING, + INT, + SHORT, + LONG, + DOUBLE; + } + + public interface Validator extends Serializable { + void ensureValid(String name, Serializable value); + } + + /* + * validate for String type Option + */ + public static class ValidString implements Validator { + final List validStrings; + + private ValidString(List validStrings) { + this.validStrings = validStrings; + } + + public static ValidString in(String... 
validStrings) { + return new ValidString(Arrays.asList(validStrings)); + } + + @Override + public void ensureValid(String name, Serializable value) { + if (validStrings.size() > 0 && !validStrings.contains(value)) { + if (validStrings.size() == 1) { + throw new IllegalArgumentException( + "Option(" + name + ") must be: " + validStrings.get(0)); + } else { + throw new IllegalArgumentException( + "Option(" + name + ") must be one of: " + String.join(", ", validStrings)); + } + } + } + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/OptionError.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/OptionError.java new file mode 100644 index 00000000..eff07fc1 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/OptionError.java @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *
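As a quick usage sketch of the OptionDef API above (the option names and groups here are illustrative, not part of the diff): definitions chain, and ValidString.in constrains enumerated values:

    import com.tigergraph.spark.util.OptionDef;
    import com.tigergraph.spark.util.OptionDef.Type;

    public class OptionDefSketch {
      public static void main(String[] args) {
        OptionDef def =
            new OptionDef()
                .define("graph", Type.STRING, true, "general") // required, no default
                .define(
                    "ssl.mode",
                    Type.STRING,
                    "basic",
                    true,
                    OptionDef.ValidString.in("basic", "verifyCA", "verifyHostname"),
                    "ssl");
        System.out.println(def.optionKeys().size()); // 2

        try {
          OptionDef.ValidString.in("basic").ensureValid("ssl.mode", "verifyCA");
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage()); // Option(ssl.mode) must be: basic
        }
      }
    }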
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.tigergraph.spark.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * The result of an option validation; an option carries multiple messages when it has multiple
+ * errors.
+ */
+public class OptionError {
+  private String key;
+  private Object originalValue;
+  private List<String> errorMsgs;
+
+  public OptionError(String key, Object originalValue, String errorMsg) {
+    this.key = key;
+    this.originalValue = originalValue;
+    this.errorMsgs = new ArrayList<>();
+    this.errorMsgs.add(errorMsg);
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public Object getOriginalValue() {
+    return originalValue;
+  }
+
+  public List<String> getErrorMsgs() {
+    return errorMsgs;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "Option %s, value: %s, error: %s", key, originalValue, String.join("; ", errorMsgs));
+  }
+}
diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/Options.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/Options.java
new file mode 100644
index 00000000..0331d434
--- /dev/null
+++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/Options.java
@@ -0,0 +1,375 @@
+/**
+ * Copyright (c) 2023 TigerGraph Inc.
+
+ *
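OptionError.toString() above produces the exact line users see when Options.validate() (next file) rejects their configuration; for example:

    import com.tigergraph.spark.util.OptionError;

    public class OptionErrorSketch {
      public static void main(String[] args) {
        OptionError err =
            new OptionError(
                "io.connect.timeout.ms", "not_integer", "failed to convert the value to type INT");
        err.getErrorMsgs().add("a second message for the same option is joined with a semicolon");
        System.out.println(err);
        // Option io.connect.timeout.ms, value: not_integer, error: failed to convert the value to
        // type INT; a second message for the same option is joined with a semicolon
      }
    }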
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import com.tigergraph.spark.util.OptionDef.OptionKey; +import com.tigergraph.spark.util.OptionDef.Type; + +/** Validate and transform Spark DataFrame options(configurations) */ +public class Options implements Serializable { + + public static enum OptionType { + WRITE, + READ + } + + private final OptionType optionType; + + public static final String GRAPH = "graph"; + public static final String URL = "url"; + public static final String VERSION = "version"; + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + public static final String SECRET = "secret"; + public static final String TOKEN = "token"; + // loading + public static final String LOADING_JOB = "loading.job"; + public static final String LOADING_FILENAME = "loading.filename"; + public static final String LOADING_SEPARATOR = "loading.separator"; + public static final String LOADING_EOL = "loading.eol"; + public static final String LOADING_BATCH_SIZE_BYTES = "loading.batch.size.bytes"; + public static final String LOADING_TIMEOUT_MS = "loading.timeout.ms"; + public static final String LOADING_MAX_PERCENT_ERROR = "loading.max.percent.error"; + public static final String LOADING_MAX_NUM_ERROR = "loading.max.num.error"; + public static final String LOADING_RETRY_INTERVAL_MS = "loading.retry.interval.ms"; + public static final String LOADING_MAX_RETRY_INTERVAL_MS = "loading.max.retry.interval.ms"; + public static final String LOADING_MAX_RETRY_ATTEMPTS = "loading.max.retry.attempts"; + // loading - default + public static final String LOADING_SEPARATOR_DEFAULT = ","; + public static final String LOADING_EOL_DEFAULT = "\n"; + public static final int LOADING_BATCH_SIZE_BYTES_DEFAULT = 2 * 1024 * 1024; // 2mb + public static final int LOADING_TIMEOUT_MS_DEFAULT = 0; // restpp default + public static final int LOADING_RETRY_INTERVAL_MS_DEFAULT = 5 * 1000; // 5s + public static final int LOADING_MAX_RETRY_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5min + public static final int LOADING_MAX_RETRY_ATTEMPTS_DEFAULT = 10; + // http transport + public static final String IO_CONNECT_TIMEOUT_MS = "io.connect.timeout.ms"; + public static final String IO_READ_TIMEOUT_MS = "io.read.timeout.ms"; + public static final String IO_RETRY_INTERVAL_MS = "io.retry.interval.ms"; + public static final String IO_MAX_RETRY_INTERVAL_MS = "io.max.retry.interval.ms"; + public static final String IO_MAX_RETRY_ATTEMPTS = "io.max.retry.attempts"; + // http transport - default + public static final int IO_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000; // 30s + public static final int IO_READ_TIMEOUT_MS_DEFAULT = 60 * 1000; // 1min + public static final int IO_RETRY_INTERVAL_MS_DEFAULT = 5 * 1000; // 5s + public static final int IO_MAX_RETRY_INTERVAL_MS_DEFAULT = 10 * 1000; // 10s + public static final int IO_MAX_RETRY_ATTEMPTS_DEFAULT = 5; + // SSL + public static final String SSL_MODE = "ssl.mode"; + public static final String SSL_MODE_BASIC = "basic"; + public static final String SSL_MODE_VERIFY_CA = "verifyCA"; + public static final String 
SSL_MODE_VERIFY_HOSTNAME = "verifyHostname"; + public static final String SSL_TRUSTSTORE = "ssl.truststore"; + public static final String SSL_TRUSTSTORE_TYPE = "ssl.truststore.type"; + public static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password"; + public static final String SSL_TRUSTSTORE_TYPE_DEFAULT = "JKS"; + + // Options' group name + public static final String GROUP_GENERAL = "general"; + public static final String GROUP_AUTH = "auth"; + public static final String GROUP_LOADING_JOB = "loading.job"; + public static final String GROUP_TRANSPORT_TIMEOUT = "transport.timeout"; + public static final String GROUP_SSL = "ssl"; + + private final Map originals; + private final Map transformed = new HashMap<>(); + private final OptionDef definition; + + public Options(Map originals, OptionType ot) { + this.optionType = ot; + this.originals = originals != null ? originals : new HashMap<>(); + this.definition = + new OptionDef() + .define(GRAPH, Type.STRING, true, GROUP_GENERAL) + .define(URL, Type.STRING, true, GROUP_GENERAL) + .define(VERSION, Type.STRING, GROUP_GENERAL) + .define(USERNAME, Type.STRING, GROUP_AUTH) + .define(PASSWORD, Type.STRING, GROUP_AUTH) + .define(SECRET, Type.STRING, GROUP_AUTH) + .define(TOKEN, Type.STRING, GROUP_AUTH) + .define( + IO_READ_TIMEOUT_MS, + Type.INT, + IO_READ_TIMEOUT_MS_DEFAULT, + true, + null, + GROUP_TRANSPORT_TIMEOUT) + .define( + IO_CONNECT_TIMEOUT_MS, + Type.INT, + IO_CONNECT_TIMEOUT_MS_DEFAULT, + true, + null, + GROUP_TRANSPORT_TIMEOUT) + .define( + IO_RETRY_INTERVAL_MS, + Type.INT, + IO_RETRY_INTERVAL_MS_DEFAULT, + true, + null, + GROUP_TRANSPORT_TIMEOUT) + .define( + IO_MAX_RETRY_INTERVAL_MS, + Type.INT, + IO_MAX_RETRY_INTERVAL_MS_DEFAULT, + true, + null, + GROUP_TRANSPORT_TIMEOUT) + .define( + IO_MAX_RETRY_ATTEMPTS, + Type.INT, + IO_MAX_RETRY_ATTEMPTS_DEFAULT, + true, + null, + GROUP_TRANSPORT_TIMEOUT) + .define( + SSL_MODE, + Type.STRING, + SSL_MODE_BASIC, + true, + OptionDef.ValidString.in( + SSL_MODE_BASIC, SSL_MODE_VERIFY_CA, SSL_MODE_VERIFY_HOSTNAME), + GROUP_SSL) + .define(SSL_TRUSTSTORE, Type.STRING, null, false, null, GROUP_SSL) + .define( + SSL_TRUSTSTORE_TYPE, + Type.STRING, + SSL_TRUSTSTORE_TYPE_DEFAULT, + false, + null, + GROUP_SSL) + .define(SSL_TRUSTSTORE_PASSWORD, Type.STRING, null, false, null, GROUP_SSL); + + if (OptionType.WRITE.equals(ot)) { + this.definition + .define(LOADING_JOB, Type.STRING, true, GROUP_LOADING_JOB) + .define(LOADING_FILENAME, Type.STRING, true, GROUP_LOADING_JOB) + .define( + LOADING_SEPARATOR, + Type.STRING, + LOADING_SEPARATOR_DEFAULT, + true, + null, + GROUP_LOADING_JOB) + .define(LOADING_EOL, Type.STRING, LOADING_EOL_DEFAULT, true, null, GROUP_LOADING_JOB) + .define( + LOADING_BATCH_SIZE_BYTES, + Type.INT, + LOADING_BATCH_SIZE_BYTES_DEFAULT, + true, + null, + GROUP_LOADING_JOB) + .define( + LOADING_TIMEOUT_MS, + Type.INT, + LOADING_TIMEOUT_MS_DEFAULT, + true, + null, + GROUP_LOADING_JOB) + .define(LOADING_MAX_PERCENT_ERROR, Type.DOUBLE, GROUP_LOADING_JOB) + .define(LOADING_MAX_NUM_ERROR, Type.INT, GROUP_LOADING_JOB) + .define( + LOADING_RETRY_INTERVAL_MS, + Type.INT, + LOADING_RETRY_INTERVAL_MS_DEFAULT, + true, + null, + GROUP_LOADING_JOB) + .define( + LOADING_MAX_RETRY_INTERVAL_MS, + Type.INT, + LOADING_MAX_RETRY_INTERVAL_MS_DEFAULT, + true, + null, + GROUP_LOADING_JOB) + .define( + LOADING_MAX_RETRY_ATTEMPTS, + Type.INT, + LOADING_MAX_RETRY_ATTEMPTS_DEFAULT, + true, + null, + GROUP_LOADING_JOB); + } + } + + /** + * validate all the Options on their type and validator, then put the 
parsed value into map + * 'transformed'. + * + *
Visible for testing + * + * @return the errors of validation, If the returned List's size is 0, there is no validation + * error. + */ + protected List validateOpts() { + List errors = new ArrayList<>(); + this.definition + .optionKeys() + .forEach( + (k, v) -> { + OptionError err = null; + String key = v.name; + Serializable value = null; + try { + value = this.parse(v.name); + transformed.put(key, value); + } catch (Exception e) { + err = new OptionError(key, value, e.getMessage()); + } + try { + if (v.validator != null && this.containsOption(key)) { + v.validator.ensureValid(key, value); + } + } catch (Exception e) { + if (err == null) { + err = new OptionError(key, value, e.getMessage()); + } else { + err.getErrorMsgs().add(e.getMessage()); + } + } + if (err != null) { + errors.add(err); + } + }); + return errors; + } + + public void validate() { + List errors = validateOpts(); + if (errors != null && errors.size() > 0) { + throw new IllegalArgumentException( + "Invalid input options: " + + errors.stream().map(e -> e.toString()).reduce(". ", String::concat)); + } + } + + /** + * Determine if the Option is contained. The required option will be considered to always exist. + */ + public boolean containsOption(String key) { + if (!this.originals.containsKey(key)) { + if (this.definition.optionKeys().containsKey(key)) { + OptionKey optionKey = this.definition.optionKeys().get(key); + if (optionKey.required || optionKey.hasDefault()) { + return true; + } + } + return false; + } + return true; + } + + /** + * Gets the value of the specified Option, converts the original value to the corresponding type, + * but does not attempt to convert the default value. + * + * @param key the name of Option + * @return the value of Option + */ + private Serializable parse(String key) { + if (this.definition.optionKeys().containsKey(key)) { + OptionKey optionKey = this.definition.optionKeys().get(key); + if (this.originals.containsKey(key)) { + String value = this.originals.get(key); + String trimmed = null; + if (value != null) { + trimmed = value.trim(); + } + Type type = optionKey.type; + try { + switch (type) { + case BOOLEAN: + if (trimmed != null && trimmed.equalsIgnoreCase("true")) return true; + else if (trimmed != null && trimmed.equalsIgnoreCase("false")) return false; + else throw new IllegalArgumentException("Expected value to be either true or false"); + case STRING: + return value; + case INT: + return Integer.parseInt(trimmed); + case LONG: + return Long.parseLong(trimmed); + case DOUBLE: + return Double.parseDouble(trimmed); + default: + throw new IllegalStateException("Unknown type."); + } + } catch (Exception e) { + throw new IllegalArgumentException( + "Option(" + + key + + ") failed to convert the value to type " + + type + + ", error is " + + e.toString()); + } + } else { + if (optionKey.hasDefault()) { + return optionKey.defaultValue; + } else if (optionKey.required) { + throw new IllegalArgumentException( + "Option(" + key + ") has no default value and has not been set to a value"); + } else { + return null; + } + } + } else { + throw new IllegalArgumentException("Option(" + key + ") is not defined"); + } + } + + /** + * Retrive the value from transformed option map. Retrive it from the original options if not in + * transformed map. 
+ * + * @param key + */ + public Object get(String key) { + if (transformed.containsKey(key)) { + return transformed.get(key); + } else if (originals.containsKey(key)) { + return originals.get(key); + } else { + return null; + } + } + + public String getString(String key) { + return (String) get(key); + } + + public Integer getInt(String key) { + return (Integer) get(key); + } + + public Long getLong(String key) { + return (Long) get(key); + } + + public Double getDouble(String key) { + return (Double) get(key); + } + + public Map getOriginals() { + return originals; + } + + public OptionType getOptionType() { + return this.optionType; + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/Utils.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/Utils.java new file mode 100644 index 00000000..d0711636 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/util/Utils.java @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *
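A usage sketch of Options for the WRITE path (all option values here are placeholders): validate() either passes or throws a single IllegalArgumentException aggregating every OptionError, and the typed getters fall back to the defined defaults:

    import java.util.HashMap;
    import java.util.Map;
    import com.tigergraph.spark.util.Options;

    public class OptionsSketch {
      public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put(Options.GRAPH, "Social"); // placeholder graph name
        raw.put(Options.URL, "http://localhost:14240"); // placeholder endpoint
        raw.put(Options.LOADING_JOB, "load_social");
        raw.put(Options.LOADING_FILENAME, "file1");

        Options opts = new Options(raw, Options.OptionType.WRITE);
        opts.validate(); // throws if anything is missing, mistyped, or out of range

        System.out.println(opts.getInt(Options.IO_READ_TIMEOUT_MS)); // 60000 (default)
        System.out.println(opts.getString(Options.LOADING_SEPARATOR)); // "," (default)
      }
    }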
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** Utilities */ +public class Utils { + public static final String DEFAULT_VERSION = "999.999.999"; + public static final Pattern VERSION_PARTTERN = Pattern.compile("(\\d+\\.\\d+\\.\\d+)"); + + /***************** VERSION *****************/ + + /** Extract the TG version from the response msg of /version endpoint */ + public static String extractVersion(String input) { + Matcher matcher = VERSION_PARTTERN.matcher(input); + if (matcher.find()) { + return matcher.group(1); + } else { + return DEFAULT_VERSION; + } + } + + /** + * Compare the two input versions + * + * @return positive v1 > v2; 0 if v1 == v2; negative if v1 < v2 + */ + public static int versionCmp(String v1, String v2) { + return fmtVersion(v1).compareTo(fmtVersion(v2)); + } + + /** Format version string to fixed length: 3.10.1 => 003010001 */ + private static String fmtVersion(String version) { + final List ver = new ArrayList<>(); + final String[] verSplit = version.split("\\."); + // major, minor, patch + for (int i = 0; i < 3; i++) { + if (i >= verSplit.length) { + ver.add("0"); + } else { + ver.add(verSplit[i]); + } + } + return ver.stream() + .map(v -> String.format("%03d", Integer.valueOf(v))) + .collect(Collectors.joining()); + } + + /***************** STRING *****************/ + + public static boolean isEmpty(String s) { + return s == null || s.length() == 0; + } + + /** + * Mask the string input(sensitive info). You can keep several chars of the head and tail: + * maskString("abcdef123456", 2) => "ab********56" + * + * @param s string to be mask + * @param keepHeadTail how many chars of the head and tail of the string will be kept + * @return masked string + */ + public static String maskString(String s, int keepHeadTail) { + if (s == null) { + return ""; + } + if (2 * keepHeadTail >= s.length()) { + return s; + } + return s.substring(0, keepHeadTail) + + String.join("", Collections.nCopies(s.length() - 2 * keepHeadTail, "*")) + + s.substring(s.length() - keepHeadTail); + } + + /** Avoid exposing user data from the loading stats to log. */ + public static void removeUserData(JsonNode in) { + // prior to v3.9.0 + List badDataV1 = in.findParents("invalidAttributeLinesData"); + badDataV1.forEach(json -> ((ObjectNode) json).remove("invalidAttributeLinesData")); + // after v3.9.0 + List badDataV2 = in.findParents("lineData"); + badDataV2.forEach(json -> ((ObjectNode) json).remove("lineData")); + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphBatchWrite.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphBatchWrite.java new file mode 100644 index 00000000..346373bd --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphBatchWrite.java @@ -0,0 +1,84 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *
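Utils.versionCmp above works by padding every version component to three digits so that plain string comparison agrees with numeric comparison ("3.10.1" becomes "003010001", "3.9" becomes "003009000"). A small demonstration, reusing values from the javadoc and the tests at the end of this diff:

    import com.tigergraph.spark.util.Utils;

    public class UtilsSketch {
      public static void main(String[] args) {
        System.out.println(Utils.versionCmp("3.10.1", "3.9.3") > 0); // true
        System.out.println(Utils.versionCmp("3.9", "3.9.0") == 0); // true: missing parts pad to 0
        System.out.println(Utils.maskString("abcdef123456", 2)); // ab********56
        System.out.println(Utils.extractVersion("TigerGraph version: 3.9.3")); // 3.9.3
      }
    }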
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.write; + +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.tigergraph.spark.TigerGraphConnection; +import com.tigergraph.spark.client.common.RestppResponse; +import com.tigergraph.spark.util.Utils; + +/** + * Define how to write the data to TG for batch processing. + * + *
The writing procedure is: + * + *
    + *
  1. Create a writer factory by {@link #createBatchWriterFactory(PhysicalWriteInfo)}, serialize + * and send it to all the partitions of the input data (RDD). + *
  2. For each partition, create the data writer, and write the data of the partition with this + * writer. If all the data are written successfully, call {@link DataWriter#commit()}. If an + * exception happens during writing, call {@link DataWriter#abort()}.
  3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If + * some writers are aborted, or the job fails for an unknown reason, call {@link + * #abort(WriterCommitMessage[])}.
+ * + *
+ */ +public class TigerGraphBatchWrite extends TigerGraphWriteBase implements BatchWrite { + private static final Logger logger = LoggerFactory.getLogger(TigerGraphBatchWrite.class); + + TigerGraphBatchWrite(StructType schema, TigerGraphConnection conn) { + super(schema, conn); + } + + @Override + public TigerGraphBatchWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + return new TigerGraphBatchWriterFactory(schema, conn); + } + + @Override + public void commit(WriterCommitMessage[] messages) { + logger.info( + "Finished batch loading job {}", + conn.getLoadingJobId() == null ? "" : conn.getLoadingJobId()); + logger.info("Total processed rows: {}", getTotalProcessedRows(messages)); + logger.info("Processed rows of each task:\n{}", getTaskSummury(messages)); + RestppResponse resp = getLoadingStatistics(); + if (resp != null) { + Utils.removeUserData(resp.results); + logger.info("Overall loading statistics: {}", resp.results.toPrettyString()); + } + } + + @Override + public void abort(WriterCommitMessage[] messages) { + logger.error( + "Aborted batch loading job {}", + conn.getLoadingJobId() == null ? "" : conn.getLoadingJobId()); + logger.info("Total processed rows: {}", getTotalProcessedRows(messages)); + logger.info("Processed rows of each task:\n{}", getTaskSummury(messages)); + RestppResponse resp = getLoadingStatistics(); + if (resp != null) { + Utils.removeUserData(resp.results); + logger.info("Overall loading statistics: {}", resp.results.toPrettyString()); + } + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphBatchWriterFactory.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphBatchWriterFactory.java new file mode 100644 index 00000000..638b7579 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphBatchWriterFactory.java @@ -0,0 +1,44 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *
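On the driver side, commit() and abort() above fold the per-task TigerGraphWriterCommitMessage array (defined later in this diff) into the totals they log. A sketch of that aggregation; since the message constructor is package-private, this assumes code living in com.tigergraph.spark.write:

    package com.tigergraph.spark.write;

    import java.util.Arrays;
    import org.apache.spark.sql.connector.write.WriterCommitMessage;

    public class CommitSummarySketch {
      public static void main(String[] args) {
        WriterCommitMessage[] messages = {
          new TigerGraphWriterCommitMessage(1200, 0, 0),
          new TigerGraphWriterCommitMessage(800, 1, 1)
        };
        long total =
            Arrays.stream(messages)
                .filter(m -> m instanceof TigerGraphWriterCommitMessage)
                .mapToLong(m -> ((TigerGraphWriterCommitMessage) m).getLoadedRows())
                .sum();
        System.out.println(total); // 2000
        // Per-task summary lines look like: "PartitionId: 0, taskId: 0, loaded rows: 1,200"
      }
    }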
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.write; + +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.tigergraph.spark.TigerGraphConnection; + +/** + * A factory of {@link TigerGraphDataWriter} for batch write, which is responsible for creating and + * initializing the actual data writer at executor side. + */ +public class TigerGraphBatchWriterFactory implements DataWriterFactory { + private static final Logger logger = LoggerFactory.getLogger(TigerGraphBatchWriterFactory.class); + + private final StructType schema; + private final TigerGraphConnection conn; + + TigerGraphBatchWriterFactory(StructType schema, TigerGraphConnection conn) { + this.schema = schema; + this.conn = conn; + logger.info("Created {} for executor", TigerGraphBatchWriterFactory.class); + } + + @Override + public TigerGraphDataWriter createWriter(int partitionId, long taskId) { + logger.info( + "Creating TigerGraph batch writer for partitionId {}, taskId {}.", partitionId, taskId); + return new TigerGraphDataWriter(schema, conn, partitionId, taskId); + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphDataWriter.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphDataWriter.java new file mode 100644 index 00000000..b6906f2e --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphDataWriter.java @@ -0,0 +1,154 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.tigergraph.spark.write;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.tigergraph.spark.TigerGraphConnection;
+import com.tigergraph.spark.client.Write;
+import com.tigergraph.spark.client.Write.LoadingResponse;
+import com.tigergraph.spark.util.Options;
+import com.tigergraph.spark.util.Utils;
+
+/** The data writer of an executor responsible for writing data for an input RDD partition. */
+public class TigerGraphDataWriter implements DataWriter<InternalRow> {
+  private static final Logger logger = LoggerFactory.getLogger(TigerGraphDataWriter.class);
+
+  private final StructType schema;
+  private final int partitionId;
+  private final long taskId;
+  private final long epochId;
+
+  private final Write write;
+  private final String version;
+  private final String jobId;
+  private final String graph;
+  private final String sep;
+  private final String eol;
+  private final int maxBatchSizeInBytes;
+  private final Map<String, Object> queryMap;
+
+  private final StringBuilder sb = new StringBuilder();
+  private int sbOffset = 0;
+  // Metrics
+  private long totalLines = 0;
+
+  /** For Streaming write */
+  TigerGraphDataWriter(
+      StructType schema, TigerGraphConnection conn, int partitionId, long taskId, long epochId) {
+    this.schema = schema;
+    this.partitionId = partitionId;
+    this.taskId = taskId;
+    this.epochId = epochId;
+    this.write = conn.getWrite();
+    this.version = conn.getVersion();
+    this.jobId = conn.getLoadingJobId();
+
+    Options opts = conn.getOpts();
+    this.graph = opts.getString(Options.GRAPH);
+    this.maxBatchSizeInBytes = opts.getInt(Options.LOADING_BATCH_SIZE_BYTES);
+    this.sep = opts.getString(Options.LOADING_SEPARATOR);
+    this.eol = opts.getString(Options.LOADING_EOL);
+
+    queryMap = new HashMap<>();
+    queryMap.put("tag", opts.getString(Options.LOADING_JOB));
+    queryMap.put("filename", opts.getString(Options.LOADING_FILENAME));
+    queryMap.put("sep", opts.getString(Options.LOADING_SEPARATOR));
+    queryMap.put("eol", opts.getString(Options.LOADING_EOL));
+    queryMap.put("timeout", opts.getInt(Options.LOADING_TIMEOUT_MS));
+    if (Utils.versionCmp(version, "3.9.4") >= 0) {
+      queryMap.put("jobid", jobId);
+      if (opts.containsOption(Options.LOADING_MAX_NUM_ERROR)) {
+        queryMap.put("max_num_error", opts.getInt(Options.LOADING_MAX_NUM_ERROR));
+      }
+      if (opts.containsOption(Options.LOADING_MAX_PERCENT_ERROR)) {
+        // the option is defined as DOUBLE, so it must be read back as a Double
+        queryMap.put("max_percent_error", opts.getDouble(Options.LOADING_MAX_PERCENT_ERROR));
+      }
+    }
+    logger.info(
+        "Created data writer for partition {}, task {}, epochId {}", partitionId, taskId, epochId);
+  }
+
+  /** For Batch write */
+  TigerGraphDataWriter(StructType schema, TigerGraphConnection conn, int partitionId, long taskId) {
+    this(schema, conn, partitionId, taskId, (long) -1);
+  }
+
+  @Override
+  public void close() throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void write(InternalRow record) throws IOException {
+    String line =
IntStream.range(0, record.numFields()) + .mapToObj(i -> record.isNullAt(i) ? "" : record.getString(i)) + .collect(Collectors.joining(sep)); + if (sb.length() + line.length() + eol.length() > maxBatchSizeInBytes) { + postToDDL(); + } + sb.append(line).append(eol); + sbOffset++; + } + + private void postToDDL() { + LoadingResponse resp = write.ddl(graph, sb.toString(), queryMap); + logger.info("Upsert {} rows to TigerGraph graph {}", sbOffset, graph); + resp.panicOnFail(); + Utils.removeUserData(resp.results); + // process stats + if (resp.hasInvalidRecord()) { + logger.error("Found rejected rows, it won't abort the loading: "); + logger.error(resp.results.toPrettyString()); + } else { + logger.debug(resp.results.toPrettyString()); + } + totalLines += sbOffset; + sbOffset = 0; + sb.setLength(0); + } + + @Override + public TigerGraphWriterCommitMessage commit() throws IOException { + if (sb.length() > 0) { + postToDDL(); + } + logger.info( + "Finished writing {} rows to TigerGraph. Partition {}, task {}, epoch {}.", + totalLines, + partitionId, + taskId, + epochId); + return new TigerGraphWriterCommitMessage(totalLines, partitionId, taskId); + } + + @Override + public void abort() throws IOException { + logger.error( + "Write aborted with {} records loaded. Partition {}, task {}, epoch {}", + totalLines, + partitionId, + taskId, + epochId); + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphStreamWriterFactory.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphStreamWriterFactory.java new file mode 100644 index 00000000..a2b912ee --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphStreamWriterFactory.java @@ -0,0 +1,47 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *
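Isolated from Spark, the flush rule in write() above is: append CSV lines until the next line would push the buffer past loading.batch.size.bytes, then POST the accumulated payload and start a new batch. A self-contained sketch of just that bookkeeping (collecting payloads into a list where the connector calls Write.ddl):

    import java.util.ArrayList;
    import java.util.List;

    public class BatchBufferSketch {
      private final int maxBatchSize; // stands in for loading.batch.size.bytes
      private final String eol = "\n";
      private final StringBuilder sb = new StringBuilder();
      final List<String> batches = new ArrayList<>();

      BatchBufferSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
      }

      void write(String line) {
        // Flush first if this line would overflow the current batch.
        if (sb.length() + line.length() + eol.length() > maxBatchSize) {
          flush();
        }
        sb.append(line).append(eol);
      }

      void flush() {
        if (sb.length() > 0) {
          batches.add(sb.toString());
          sb.setLength(0);
        }
      }

      public static void main(String[] args) {
        BatchBufferSketch buf = new BatchBufferSketch(16);
        buf.write("a,b,c");
        buf.write("d,e,f");
        buf.write("g,h,i"); // would exceed 16 bytes, so the first two lines are flushed
        buf.flush();
        System.out.println(buf.batches.size()); // 2
      }
    }

One consequence of checking before appending: a single line larger than the cap is still sent, just as a batch of its own.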
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.tigergraph.spark.write;
+
+import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.tigergraph.spark.TigerGraphConnection;
+
+/**
+ * A factory of {@link TigerGraphDataWriter} for streaming write, which is responsible for creating
+ * and initializing the actual data writer at executor side.
+ */
+public class TigerGraphStreamWriterFactory implements StreamingDataWriterFactory {
+  private static final Logger logger = LoggerFactory.getLogger(TigerGraphStreamWriterFactory.class);
+
+  private final StructType schema;
+  private final TigerGraphConnection conn;
+
+  TigerGraphStreamWriterFactory(StructType schema, TigerGraphConnection conn) {
+    this.schema = schema;
+    this.conn = conn;
+    logger.info("Created {} for executor", TigerGraphStreamWriterFactory.class);
+  }
+
+  @Override
+  public TigerGraphDataWriter createWriter(int partitionId, long taskId, long epochId) {
+    logger.info(
+        "Creating TigerGraph streaming writer for partitionId {}, taskId {}, epochId {}.",
+        partitionId,
+        taskId,
+        epochId);
+    return new TigerGraphDataWriter(schema, conn, partitionId, taskId, epochId);
+  }
+}
diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphStreamingWrite.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphStreamingWrite.java
new file mode 100644
index 00000000..5723f32d
--- /dev/null
+++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphStreamingWrite.java
@@ -0,0 +1,85 @@
+/**
+ * Copyright (c) 2023 TigerGraph Inc.
+
+ *
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.write; + +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.connector.write.streaming.StreamingWrite; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.tigergraph.spark.TigerGraphConnection; +import com.tigergraph.spark.client.common.RestppResponse; +import com.tigergraph.spark.util.Utils; + +/** + * Defines how to write the data to TG in streaming queries. + * + *
The writing procedure is: + * + *
    + *
  1. Create a writer factory by {@link #createStreamingWriterFactory(PhysicalWriteInfo)}, + * serialize and send it to all the partitions of the input data (RDD).
  2. For each epoch in each partition, create the data writer, and write the data of the epoch + * in the partition with this writer. If all the data are written successfully, call {@link + * DataWriter#commit()}. If an exception happens during writing, call {@link + * DataWriter#abort()}.
  3. If writers in all partitions of one epoch are successfully committed, call {@link + * #commit(long, WriterCommitMessage[])}. If some writers are aborted, or the job fails for + * an unknown reason, call {@link #abort(long, WriterCommitMessage[])}.
+ */ +public class TigerGraphStreamingWrite extends TigerGraphWriteBase implements StreamingWrite { + private static final Logger logger = LoggerFactory.getLogger(TigerGraphStreamingWrite.class); + + TigerGraphStreamingWrite(StructType schema, TigerGraphConnection conn) { + super(schema, conn); + } + + @Override + public TigerGraphStreamWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info) { + return new TigerGraphStreamWriterFactory(schema, conn); + } + + @Override + public void commit(long epochId, WriterCommitMessage[] messages) { + logger.info( + "Finished writing streaming updates({}) to TigerGraph {}", + epochId, + conn.getLoadingJobId() == null ? "" : ", Job ID: " + conn.getLoadingJobId()); + logger.info("Total processed rows by this update: {}", getTotalProcessedRows(messages)); + logger.info("Processed rows of each task by this update:\n{}", getTaskSummury(messages)); + RestppResponse resp = getLoadingStatistics(); + if (resp != null) { + Utils.removeUserData(resp.results); + logger.info("The up-to-date overall loading statistics: {}", resp.results.toPrettyString()); + } + } + + @Override + public void abort(long epochId, WriterCommitMessage[] messages) { + logger.error( + "Aborted when writing streaming updates({}) to TigerGraph {}", + epochId, + conn.getLoadingJobId() == null ? "" : ", Job ID: " + conn.getLoadingJobId()); + logger.info("Total processed rows by this update: {}", getTotalProcessedRows(messages)); + logger.info("Processed rows of each task by this update:\n{}", getTaskSummury(messages)); + RestppResponse resp = getLoadingStatistics(); + if (resp != null) { + Utils.removeUserData(resp.results); + logger.info("The overall loading statistics: {}", resp.results.toPrettyString()); + } + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphWriteBase.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphWriteBase.java new file mode 100644 index 00000000..fe41ca75 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphWriteBase.java @@ -0,0 +1,75 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *
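A hedged end-to-end sketch of the streaming path above. The provider's short format name comes from TigerGraphTableProvider.shortName(), which is not part of this diff, so the fully qualified class registered under META-INF/services later in the diff is used instead; every option value and path is a placeholder:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;

    public class StreamingWriteSketch {
      public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("tg-stream").getOrCreate();
        Dataset<Row> source =
            spark
                .readStream()
                .format("rate")
                .load()
                .selectExpr("CAST(value AS STRING) AS id", "CAST(timestamp AS STRING) AS ts");

        StreamingQuery query =
            source
                .writeStream()
                .format("com.tigergraph.spark.TigerGraphTableProvider")
                .option("checkpointLocation", "/tmp/tg-checkpoint") // placeholder path
                .option("graph", "Social") // placeholder values below
                .option("url", "http://localhost:14240")
                .option("username", "tigergraph")
                .option("password", "secret")
                .option("loading.job", "load_social")
                .option("loading.filename", "file1")
                .start();
        query.awaitTermination();
      }
    }

Each micro-batch becomes one epochId, so commit(long, WriterCommitMessage[]) above runs once per trigger.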
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.write; + +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.tigergraph.spark.TigerGraphConnection; +import com.tigergraph.spark.client.common.RestppResponse; +import com.tigergraph.spark.util.Utils; +import java.util.Arrays; +import java.util.stream.Collectors; + +/** Base class for {@link TigerGraphBatchWrite} and {@link TigerGraphStreamingWrite}. */ +public class TigerGraphWriteBase { + private static final Logger logger = LoggerFactory.getLogger(TigerGraphWriteBase.class); + + protected static String GSQL_GET_PROGRESS = "getprogress"; + + protected final StructType schema; + protected final TigerGraphConnection conn; + + public TigerGraphWriteBase(StructType schema, TigerGraphConnection conn) { + this.schema = schema; + this.conn = conn; + } + + protected RestppResponse getLoadingStatistics() { + if (Utils.versionCmp(conn.getVersion(), "3.9.4") >= 0) { + try { + RestppResponse resp = + conn.getMisc() + .loadingAction(GSQL_GET_PROGRESS, conn.getGraph(), conn.getLoadingJobId()); + resp.panicOnFail(); + return resp; + } catch (Exception e) { + logger.info( + "Failed to query loading statistics of job {}: {}, it won't block the loading" + + " and you can manually query it via `curl -X GET -u : " + + "\"localhost:8123/gsql/loading-jobs?action=getprogress&jobId={}&graph={}\"", + conn.getLoadingJobId(), + e.getMessage(), + conn.getLoadingJobId(), + conn.getGraph()); + } + } + return null; + } + + protected long getTotalProcessedRows(WriterCommitMessage[] messages) { + return Arrays.stream(messages) + .filter(msg -> msg != null && msg instanceof TigerGraphWriterCommitMessage) + .map(msg -> ((TigerGraphWriterCommitMessage) msg).getLoadedRows()) + .reduce(0L, (a, b) -> a + b); + } + + protected String getTaskSummury(WriterCommitMessage[] messages) { + return Arrays.stream(messages) + .filter(msg -> msg != null && msg instanceof TigerGraphWriterCommitMessage) + .map(msg -> msg.toString()) + .collect(Collectors.joining("\n")); + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphWriteBuilder.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphWriteBuilder.java new file mode 100644 index 00000000..dcc38db1 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphWriteBuilder.java @@ -0,0 +1,53 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.write; + +import java.time.Instant; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.tigergraph.spark.TigerGraphConnection; +import com.tigergraph.spark.util.Options; + +/** Builder for Batch Write or Streaming Write */ +public class TigerGraphWriteBuilder implements WriteBuilder { + private static final Logger logger = LoggerFactory.getLogger(TigerGraphWriteBuilder.class); + private final StructType schema; + private final TigerGraphConnection conn; + + public TigerGraphWriteBuilder(LogicalWriteInfo info, long creationTime) { + logger.info("Start to build TigerGraph data writer with queryId {}", info.queryId()); + schema = info.schema(); + Options opts = new Options(info.options().asCaseSensitiveMap(), Options.OptionType.WRITE); + opts.validate(); + conn = new TigerGraphConnection(opts, creationTime); + if (conn.getLoadingJobId() != null) { + logger.info("Loading job ID: {}", conn.getLoadingJobId()); + } + } + + public TigerGraphWriteBuilder(LogicalWriteInfo info) { + this(info, Instant.now().toEpochMilli()); + } + + public TigerGraphBatchWrite buildForBatch() { + return new TigerGraphBatchWrite(schema, conn); + } + + public TigerGraphStreamingWrite buildForStreaming() { + return new TigerGraphStreamingWrite(schema, conn); + } +} diff --git a/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphWriterCommitMessage.java b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphWriterCommitMessage.java new file mode 100644 index 00000000..b0161e41 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/java/com/tigergraph/spark/write/TigerGraphWriterCommitMessage.java @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2023 TigerGraph Inc. + * + *
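And the batch counterpart, which is what drives TigerGraphWriteBuilder.buildForBatch() above; note that a comma-separated url is load-balanced across requests (see BuilderTest near the end of this diff). All values are placeholders:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class BatchWriteSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("tg-batch").getOrCreate();
        Dataset<Row> df =
            spark.read().option("header", "true").csv("/tmp/people.csv"); // placeholder input

        df.write()
            .format("com.tigergraph.spark.TigerGraphTableProvider")
            .option("graph", "Social")
            .option("url", "http://host1:14240,http://host2:14240") // load-balanced
            .option("username", "tigergraph")
            .option("password", "secret")
            .option("loading.job", "load_social")
            .option("loading.filename", "file1")
            .mode("append")
            .save();
      }
    }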
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tigergraph.spark.write; + +import org.apache.spark.sql.connector.write.WriterCommitMessage; + +/** + * A commit message returned by TigerGraphDataWriter.commit() and will be sent back to the driver + * side as the input parameter of TigerGraphBatchWrite.commit(WriterCommitMessage []) or + * TigerGraphStreamingWrite.commit(long, WriterCommitMessage []). + */ +public class TigerGraphWriterCommitMessage implements WriterCommitMessage { + private final long loadedRows; + private final int partitionId; + private final long taskId; + + TigerGraphWriterCommitMessage(long loadedRows, int partitionId, long taskId) { + this.loadedRows = loadedRows; + this.partitionId = partitionId; + this.taskId = taskId; + } + + public String toString() { + return String.format( + "PartitionId: %,d, taskId: %,d, loaded rows: %,d", partitionId, taskId, loadedRows); + } + + public long getLoadedRows() { + return this.loadedRows; + } + + public int getPartitionId() { + return this.partitionId; + } + + public long getTaskId() { + return this.taskId; + } +} diff --git a/tools/etl/tg-spark-connector/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/tools/etl/tg-spark-connector/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 00000000..2fca6f96 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1 @@ +com.tigergraph.spark.TigerGraphTableProvider \ No newline at end of file diff --git a/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/TigerGraphConnectionTest.java b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/TigerGraphConnectionTest.java new file mode 100644 index 00000000..d4171230 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/TigerGraphConnectionTest.java @@ -0,0 +1,13 @@ +package com.tigergraph.spark; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.Test; + +public class TigerGraphConnectionTest { + @Test + public void testGenerateJobId() { + assertTrue( + TigerGraphConnection.generateJobId("graph", "load_social", 1234567) + .equals("graph.load_social.spark.all.1234567")); + } +} diff --git a/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/BuilderTest.java b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/BuilderTest.java new file mode 100644 index 00000000..e6e7f047 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/BuilderTest.java @@ -0,0 +1,32 @@ +package com.tigergraph.spark.client; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.junit.jupiter.api.Test; +import com.tigergraph.spark.client.Builder.LoadBalanceTarget; + +public class BuilderTest { + + @Test + public void LoadBalanceTest() { + List urlList = Arrays.asList("a , b ,,,,c,c", "a", " a a ,a,"); + List> urlsList = + Arrays.asList( + new ArrayList<>(Arrays.asList("a", "b", "c")), + new ArrayList<>(Arrays.asList("a")), + new 
ArrayList<>(Arrays.asList("a a", "a"))); + for (int i = 0; i < urlList.size(); i++) { + LoadBalanceTarget target = new LoadBalanceTarget(Write.class, urlList.get(i)); + // Iterate 100 times, then it should meet all the elements(very high probability) + for (int j = 0; j < 100; j++) { + String randUrl = target.url(); + if (urlsList.get(i).contains(randUrl)) { + urlsList.get(i).remove(randUrl); + } + } + assertEquals(Arrays.asList(), urlsList.get(i)); + } + } +} diff --git a/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/WriteTest.java b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/WriteTest.java new file mode 100644 index 00000000..8619dc39 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/WriteTest.java @@ -0,0 +1,69 @@ +package com.tigergraph.spark.client; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.Test; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class WriteTest { + @Test + public void testCheckInvalidData() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + // don't have invalid data + String jsonString = + "{\"results\": [\n" + + "{\"sourceFileName\": \"Online_POST\",\n" + + "\"statistics\": {\n" + + "\"validLine\": 7927,\n" + + "\"rejectLine\": 0,\n" + + "\"failedConditionLine\": 0,\n" + + "\"notEnoughToken\": 0,\n" + + "\"invalidJson\": 0,\n" + + "\"oversizeToken\": 0,\n" + + "\"vertex\": [\n" + + "{\"typeName\": \"company\",\n" + + "\"validObject\": 7,\n" + + "\"noIdFound\": 0,\n" + + "\"invalidAttribute\": 0,\n" + + "\"invalidPrimaryId\": 0,\n" + + "\"invalidSecondaryId\": 0,\n" + + "\"incorrectFixedBinaryLength\": 0\n" + + "}\n" + + "],\n" + + "\"edge\": [],\n" + + "\"deleteVertex\": [],\n" + + "\"deleteEdge\": []\n" + + "}\n" + + "}\n" + + "]}"; + assertFalse(Write.LoadingResponse.hasInvalidRecord(mapper.readTree(jsonString))); + // have invalid data + jsonString = + "{\"results\": [\n" + + "{\"sourceFileName\": \"Online_POST\",\n" + + "\"statistics\": {\n" + + "\"validLine\": 7927,\n" + + "\"rejectLine\": 1,\n" + + "\"failedConditionLine\": 0,\n" + + "\"notEnoughToken\": 0,\n" + + "\"invalidJson\": 0,\n" + + "\"oversizeToken\": 0,\n" + + "\"vertex\": [\n" + + "{\"typeName\": \"company\",\n" + + "\"validObject\": 7,\n" + + "\"noIdFound\": 0,\n" + + "\"invalidAttribute\": 3,\n" + + "\"invalidPrimaryId\": 0,\n" + + "\"invalidSecondaryId\": 0,\n" + + "\"incorrectFixedBinaryLength\": 0\n" + + "}\n" + + "],\n" + + "\"edge\": [],\n" + + "\"deleteVertex\": [],\n" + + "\"deleteEdge\": []\n" + + "}\n" + + "}\n" + + "]}"; + assertTrue(Write.LoadingResponse.hasInvalidRecord(mapper.readTree(jsonString))); + } +} diff --git a/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/common/RestppErrorDecoderTest.java b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/common/RestppErrorDecoderTest.java new file mode 100644 index 00000000..d5320809 --- /dev/null +++ b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/common/RestppErrorDecoderTest.java @@ -0,0 +1,60 @@ +package com.tigergraph.spark.client.common; + +import java.nio.charset.Charset; +import java.util.HashMap; + +import feign.RetryableException; +import org.apache.hc.core5.http.HttpStatus; +import org.junit.jupiter.api.Test; +import feign.Request; +import feign.Request.HttpMethod; +import feign.Response; +import 
com.tigergraph.spark.constant.ErrorCode; + +import static org.junit.jupiter.api.Assertions.*; + +public class RestppErrorDecoderTest { + + @Test + public void testDecode() { + // test whether the decoded exception is RetryableException + RestppErrorDecoder decoder = + new RestppErrorDecoder( + RestppDecoder.INSTANCE, HttpStatus.SC_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY); + Request req = + Request.create(HttpMethod.GET, "", new HashMap<>(), null, Charset.defaultCharset(), null); + // not in the retryable code + assertFalse( + decoder.decode( + "GET", + Response.builder().request(req).status(HttpStatus.SC_REQUEST_TIMEOUT).build()) + instanceof RetryableException); + // in retryable code + assertTrue( + decoder.decode( + "GET", Response.builder().request(req).status(HttpStatus.SC_SERVER_ERROR).build()) + instanceof RetryableException); + // 403 forbidden, but not due to token expiration + assertFalse( + decoder.decode( + "GET", + Response.builder() + .request(req) + .status(HttpStatus.SC_FORBIDDEN) + .body("{\"code\":\"not_expire\"}", Charset.forName("UTF-8")) + .build()) + instanceof RetryableException); + // 403 forbidden, token expiration + assertTrue( + decoder.decode( + "GET", + Response.builder() + .request(req) + .status(HttpStatus.SC_FORBIDDEN) + .body( + "{\"code\":\"" + ErrorCode.TOKEN_EXPIRATION + "\"}", + Charset.forName("UTF-8")) + .build()) + instanceof RetryableException); + } +} diff --git a/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/common/RestppRetryerTest.java b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/common/RestppRetryerTest.java new file mode 100644 index 00000000..8e1cefcd --- /dev/null +++ b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/common/RestppRetryerTest.java @@ -0,0 +1,65 @@ +package com.tigergraph.spark.client.common; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import org.apache.hc.core5.http.HttpStatus; +import org.junit.jupiter.api.Test; +import feign.Request; +import feign.RetryableException; +import feign.Request.HttpMethod; + +public class RestppRetryerTest { + @Test + public void testMixedRetryer() { + // Token exception: sleep 3s, retry 1 times + // IO exception: sleep 7s, retry 2 times + // Server exception: sleep 12s, retry 2 times + RestppRetryer retryer = new RestppRetryer(7000, 7000, 2, 12000, 12000, 2); + Request req = + Request.create(HttpMethod.GET, "", new HashMap<>(), null, Charset.defaultCharset(), null); + RetryableException ioEx = new RetryableException(0, null, null, new IOException(), null, req); + RetryableException tokenEx = + new RetryableException(HttpStatus.SC_FORBIDDEN, null, null, null, req); + RetryableException serverEx = + new RetryableException(HttpStatus.SC_SERVICE_UNAVAILABLE, null, null, null, req); + // 1. attempt 1 on ioEx + long duration = getRetryDuration(retryer, ioEx); + assertTrue(duration <= 10000 && duration >= 5000); + // 2. attempt 1 on serverEx + duration = getRetryDuration(retryer, serverEx); + assertTrue(duration <= 16000 && duration >= 8000); + // 3. attempt 1 on tokenEx + duration = getRetryDuration(retryer, tokenEx); + assertTrue(duration <= 5000 && duration >= 2000); + // 4. attempt 2 on ioEx + duration = getRetryDuration(retryer, ioEx); + assertTrue(duration <= 10000 && duration >= 5000); + // 5. 
diff --git a/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/common/RestppErrorDecoderTest.java b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/common/RestppErrorDecoderTest.java
new file mode 100644
index 00000000..d5320809
--- /dev/null
+++ b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/common/RestppErrorDecoderTest.java
@@ -0,0 +1,60 @@
+package com.tigergraph.spark.client.common;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+
+import feign.RetryableException;
+import org.apache.hc.core5.http.HttpStatus;
+import org.junit.jupiter.api.Test;
+import feign.Request;
+import feign.Request.HttpMethod;
+import feign.Response;
+import com.tigergraph.spark.constant.ErrorCode;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class RestppErrorDecoderTest {
+
+  @Test
+  public void testDecode() {
+    // the decoder should produce a RetryableException only for retryable failures
+    RestppErrorDecoder decoder =
+        new RestppErrorDecoder(
+            RestppDecoder.INSTANCE, HttpStatus.SC_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY);
+    Request req =
+        Request.create(HttpMethod.GET, "", new HashMap<>(), null, Charset.defaultCharset(), null);
+    // status code not in the retryable set
+    assertFalse(
+        decoder.decode(
+                "GET",
+                Response.builder().request(req).status(HttpStatus.SC_REQUEST_TIMEOUT).build())
+            instanceof RetryableException);
+    // status code in the retryable set
+    assertTrue(
+        decoder.decode(
+                "GET", Response.builder().request(req).status(HttpStatus.SC_SERVER_ERROR).build())
+            instanceof RetryableException);
+    // 403 Forbidden, but not caused by token expiration
+    assertFalse(
+        decoder.decode(
+                "GET",
+                Response.builder()
+                    .request(req)
+                    .status(HttpStatus.SC_FORBIDDEN)
+                    .body("{\"code\":\"not_expire\"}", Charset.forName("UTF-8"))
+                    .build())
+            instanceof RetryableException);
+    // 403 Forbidden caused by token expiration, which is retryable after a token refresh
+    assertTrue(
+        decoder.decode(
+                "GET",
+                Response.builder()
+                    .request(req)
+                    .status(HttpStatus.SC_FORBIDDEN)
+                    .body(
+                        "{\"code\":\"" + ErrorCode.TOKEN_EXPIRATION + "\"}",
+                        Charset.forName("UTF-8"))
+                    .build())
+            instanceof RetryableException);
+  }
+}
diff --git a/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/common/RestppRetryerTest.java b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/common/RestppRetryerTest.java
new file mode 100644
index 00000000..8e1cefcd
--- /dev/null
+++ b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/client/common/RestppRetryerTest.java
@@ -0,0 +1,65 @@
+package com.tigergraph.spark.client.common;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import org.apache.hc.core5.http.HttpStatus;
+import org.junit.jupiter.api.Test;
+import feign.Request;
+import feign.RetryableException;
+import feign.Request.HttpMethod;
+
+public class RestppRetryerTest {
+  @Test
+  public void testMixedRetryer() {
+    // Token exception: sleep ~3s, retry once
+    // IO exception: sleep ~7s, retry twice
+    // Server exception: sleep ~12s, retry twice
+    RestppRetryer retryer = new RestppRetryer(7000, 7000, 2, 12000, 12000, 2);
+    Request req =
+        Request.create(HttpMethod.GET, "", new HashMap<>(), null, Charset.defaultCharset(), null);
+    RetryableException ioEx = new RetryableException(0, null, null, new IOException(), null, req);
+    RetryableException tokenEx =
+        new RetryableException(HttpStatus.SC_FORBIDDEN, null, null, null, req);
+    RetryableException serverEx =
+        new RetryableException(HttpStatus.SC_SERVICE_UNAVAILABLE, null, null, null, req);
+    // 1. attempt 1 on ioEx
+    long duration = getRetryDuration(retryer, ioEx);
+    assertTrue(duration <= 10000 && duration >= 5000);
+    // 2. attempt 1 on serverEx
+    duration = getRetryDuration(retryer, serverEx);
+    assertTrue(duration <= 16000 && duration >= 8000);
+    // 3. attempt 1 on tokenEx
+    duration = getRetryDuration(retryer, tokenEx);
+    assertTrue(duration <= 5000 && duration >= 2000);
+    // 4. attempt 2 on ioEx
+    duration = getRetryDuration(retryer, ioEx);
+    assertTrue(duration <= 10000 && duration >= 5000);
+    // 5. attempt 2 on serverEx
+    duration = getRetryDuration(retryer, serverEx);
+    assertTrue(duration <= 16000 && duration >= 8000);
+    // 6. attempt 2 on tokenEx should propagate the exception: its attempts are exhausted
+    assertThrows(RetryableException.class, () -> retryer.continueOrPropagate(tokenEx));
+    // 7. attempt 3 on ioEx should propagate the exception: its attempts are exhausted
+    assertThrows(RetryableException.class, () -> retryer.continueOrPropagate(ioEx));
+    // 8. attempt 3 on serverEx should propagate the exception: its attempts are exhausted
+    assertThrows(RetryableException.class, () -> retryer.continueOrPropagate(serverEx));
+  }
+
+  private long getRetryDuration(RestppRetryer retryer, RetryableException e) {
+    long start = System.currentTimeMillis();
+    retryer.continueOrPropagate(e);
+    long duration = System.currentTimeMillis() - start;
+    return duration;
+  }
+
+  @Test
+  public void testJitter() {
+    for (int i = 0; i < 10000; i++) {
+      long next = RestppRetryer.jitter(10000);
+      assertTrue(next >= 7500 && next <= 12500, () -> String.valueOf(next));
+    }
+  }
+}
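The retryer consumes the RetryableExceptions produced by RestppErrorDecoder above and applies a separate backoff budget per failure class; testJitter additionally fixes the jitter window at plus or minus 25% of the nominal interval. A minimal sketch of a jitter function satisfying exactly that bound (hypothetical; the test does not constrain the distribution inside the window):

import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch matching the bound asserted in testJitter:
// jitter(10000) must land in [7500, 12500], i.e. within 25% of the interval.
public final class JitterSketch {
  static long jitter(long intervalMs) {
    long halfWindow = intervalMs / 4; // 25% of the nominal interval
    // uniform offset in [-halfWindow, +halfWindow]
    long offset = ThreadLocalRandom.current().nextLong(2 * halfWindow + 1) - halfWindow;
    return intervalMs + offset;
  }
}

Randomizing the sleep this way prevents many executors from retrying in lockstep against the same RESTPP endpoint.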
diff --git a/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/util/OptionsTest.java b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/util/OptionsTest.java
new file mode 100644
index 00000000..eb847b51
--- /dev/null
+++ b/tools/etl/tg-spark-connector/src/test/java/com/tigergraph/spark/util/OptionsTest.java
@@ -0,0 +1,61 @@
+package com.tigergraph.spark.util;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OptionsTest {
+  static Map<String, String> defaultOptions;
+
+  @BeforeAll
+  static void prepare() {
+    defaultOptions = new HashMap<>();
+    defaultOptions.put(Options.GRAPH, "graph_test");
+    defaultOptions.put(Options.URL, "url_test");
+    defaultOptions.put(Options.LOADING_JOB, "job_test");
+    defaultOptions.put(Options.LOADING_FILENAME, "file_test");
+  }
+
+  @Test
+  public void shouldHaveMultipleErrorsCausedByMissingSettings() {
+    Options options = new Options(null, Options.OptionType.WRITE);
+    List errors = options.validateOpts();
+    assertEquals(4, errors.size());
+  }
+
+  @Test
+  public void shouldReportSslModeError() {
+    Map<String, String> originals = new HashMap<>(defaultOptions);
+    originals.put(Options.SSL_MODE, "ssl_mode_test");
+    Options options = new Options(originals, Options.OptionType.WRITE);
+    List errors = options.validateOpts();
+    assertEquals(1, errors.size());
+    assertEquals(
+        "Option(ssl.mode) must be one of: basic, verifyCA, verifyHostname",
+        errors.get(0).getErrorMsgs().get(0));
+  }
+
+  @Test
+  public void shouldReportTypeConversionError() {
+    Map<String, String> originals = new HashMap<>(defaultOptions);
+    originals.put(Options.IO_CONNECT_TIMEOUT_MS, "not_integer");
+    Options options = new Options(originals, Options.OptionType.WRITE);
+    List errors = options.validateOpts();
+    assertEquals(1, errors.size());
+    assertEquals(
+        "Option(io.connect.timeout.ms) failed to convert the value to type INT, error is"
+            + " java.lang.NumberFormatException: For input string: \"not_integer\"",
+        errors.get(0).getErrorMsgs().get(0));
+  }
+
+  @Test
+  public void shouldPassValidation() {
+    Options options = new Options(defaultOptions, Options.OptionType.WRITE);
+    List errors = options.validateOpts();
+    assertEquals(0, errors.size());
+  }
+}
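OptionsTest expects validation to collect every problem instead of failing fast: a null option map yields four separate errors, one per required WRITE option. A tiny sketch of that collect-all style, with hypothetical key names patterned after the dotted option names in the expected error messages above (the actual Options constants and error type are not shown in this diff):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of collect-all validation; key names are assumptions.
public class RequiredKeyValidator {
  private static final List<String> REQUIRED =
      Arrays.asList("graph", "url", "loading.job", "loading.filename");

  public static List<String> validate(Map<String, String> opts) {
    List<String> errors = new ArrayList<>();
    for (String key : REQUIRED) {
      if (opts == null || !opts.containsKey(key)) {
        errors.add("Option(" + key + ") is required for WRITE");
      }
    }
    // a null map yields one error per required key, matching the first test
    return errors;
  }
}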
[{\"count\": 1, \"lineData\": [1,2,3]}, {\"count\": 1, \"lineData\":" + + " [1,2,3]}]}"); + ArrayList transformed = new ArrayList<>(); + transformed.add("{\"key1\":{}, \"key3\": {}}"); + transformed.add("{\"key1\": 1, \"nested\": {},\"key3\":{},\"key5\":{\"key4\":\"value4\"}}"); + transformed.add("{\"vertex\": [{\"count\": 1}, {\"count\": 1}]}"); + for (int i = 0; i < originals.size(); i++) { + JsonNode original = mapper.readTree(originals.get(i)); + Utils.removeUserData(original); + // System.out.println(original.toPrettyString()); + assertTrue(original.equals(mapper.readTree(transformed.get(i)))); + } + } +}