diff --git a/glue-neptune/examples/export-from-mysql-to-neptune-incremental.ipynb b/glue-neptune/examples/export-from-mysql-to-neptune-incremental.ipynb deleted file mode 100644 index 1f08f41a..00000000 --- a/glue-neptune/examples/export-from-mysql-to-neptune-incremental.ipynb +++ /dev/null @@ -1,143 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import sys, boto3, os, datetime\n", - "\n", - "from awsglue.utils import getResolvedOptions\n", - "from pyspark.context import SparkContext\n", - "from awsglue.context import GlueContext\n", - "from awsglue.job import Job\n", - "from awsglue.transforms import ApplyMapping\n", - "from awsglue.transforms import RenameField\n", - "from awsglue.transforms import SelectFields\n", - "from awsglue.dynamicframe import DynamicFrame\n", - "from pyspark.sql.functions import lit\n", - "from pyspark.sql.functions import format_string\n", - "from pyspark.sql.functions import col\n", - "from glue_neptune.NeptuneConnectionInfo import NeptuneConnectionInfo\n", - "from glue_neptune.NeptuneGremlinClient import NeptuneGremlinClient\n", - "from glue_neptune.GremlinCsvTransforms import GremlinCsvTransforms\n", - "from gremlin_python import statics\n", - "from gremlin_python.structure.graph import Graph\n", - "from gremlin_python.process.graph_traversal import __\n", - "from gremlin_python.process.strategies import *\n", - "from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection\n", - "from gremlin_python.process.traversal import *\n", - "\n", - "glueContext = GlueContext(sc)\n", - " \n", - "job = Job(glueContext)\n", - "job.init('mysql-to-neptune', {})\n", - "\n", - "database = \"sales-order\"\n", - "order_table = \"salesdb_sales_order\"\n", - "order_detail_table = \"salesdb_sales_order_detail\"\n", - "\n", - "gremlin_endpoint = NeptuneConnectionInfo(glueContext).neptune_endpoint('neptune')\n", - "neptune = NeptuneGremlinClient(gremlin_endpoint)\n", - "\n", - "def get_last_checkpoint (client, tablename):\n", - " conn = client.remote_connection()\n", - " g = client.traversal_source(conn)\n", - " checkpoint= (g.V().hasLabel('Checkpoint').has('table', tablename).fold().coalesce(\n", - " __.unfold(),\n", - " __.addV('Checkpoint').\n", - " property('table', tablename).\n", - " property('value', datetime.datetime(2015, 1, 1, 0, 0))).\n", - " values('value').\n", - " next())\n", - " conn.close()\n", - " return checkpoint\n", - " \n", - "def update_checkpoint (client, tablename, checkpoint):\n", - " conn = client.remote_connection()\n", - " g = client.traversal_source(conn)\n", - " g.V().hasLabel('Checkpoint').has('table', tablename).property(Cardinality.single, 'value', checkpoint).next()\n", - " conn.close()\n", - " return True\n", - " \n", - "checkpoint = get_last_checkpoint(neptune, order_table)\n", - "newcheckpoint = checkpoint + datetime.timedelta(days=1)\n", - "\n", - "print(\"Last checkpoint: \"+ str(checkpoint))\n", - "print(\"New checkpoint : \"+ str(newcheckpoint))\n", - "\n", - "print \"Creating Order vertices...\"\n", - "\n", - "datasource0 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = order_table, transformation_ctx = \"datasource0\")\n", - "df0 = datasource0.toDF().filter(col(\"ORDER_DATE\") == checkpoint)\n", - "datasource1 = DynamicFrame.fromDF(df0, glueContext,'datasource1')\n", - "\n", - "print \"Total orders : \"+str(datasource0.count())\n", - "print \"Orders for checkpoint: \"+str(datasource1.count())\n", - "\n", 
- "applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = [(\"ORDER_DATE\", \"timestamp\", \"orderDate\", \"string\"), (\"SHIP_MODE\", \"string\", \"shipMode\", \"string\"), (\"SITE_ID\", \"double\", \"siteId\", \"int\"), (\"ORDER_ID\", \"int\", \"orderId\", \"int\")], transformation_ctx = \"applymapping1\")\n", - "applymapping1 = GremlinCsvTransforms.create_prefixed_columns(applymapping1, [('~id', 'orderId', 'o')])\n", - "selectfields1 = SelectFields.apply(frame = applymapping1, paths = [\"~id\", \"orderDate\", \"shipMode\"], transformation_ctx = \"selectfields1\")\n", - "\n", - "selectfields1.toDF().foreachPartition(neptune.add_vertices('Order'))\n", - "\n", - "print \"Creating OrderDetail vertices...\"\n", - "\n", - "datasource2 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = order_detail_table, transformation_ctx = \"datasource1\")\n", - "datasource3 = datasource2.join( [\"ORDER_ID\"],[\"ORDER_ID\"], datasource1, transformation_ctx = \"join\")\n", - "\n", - "print \"Total order details : \"+str(datasource2.count())\n", - "print \"Order details for checkpoint: \"+str(datasource3.count())\n", - "\n", - "applymapping2 = ApplyMapping.apply(frame = datasource3, mappings = [(\"DISCOUNT\", \"decimal(10,2)\", \"discount\", \"string\"), (\"UNIT_PRICE\", \"decimal(10,2)\", \"unitPrice\", \"string\"), (\"TAX\", \"decimal(10,2)\", \"tax\", \"string\"), (\"SUPPLY_COST\", \"decimal(10,2)\", \"supplyCost\", \"string\"), (\"PRODUCT_ID\", \"int\", \"productId\", \"int\"), (\"QUANTITY\", \"int\", \"quantity\", \"int\"), (\"LINE_ID\", \"int\", \"lineId\", \"int\"), (\"LINE_NUMBER\", \"int\", \"lineNumber\", \"int\"), (\"ORDER_ID\", \"int\", \"orderId\", \"int\")], transformation_ctx = \"applymapping2\")\n", - "applymapping2 = GremlinCsvTransforms.create_prefixed_columns(applymapping2, [('~id', 'lineId', 'od')])\n", - "selectfields2 = SelectFields.apply(frame = applymapping2, paths = [\"~id\", \"lineNumber\", \"quantity\", \"unitPrice\", \"discount\", \"supplyCost\", \"tax\"], transformation_ctx = \"selectfields2\")\n", - "\n", - "selectfields2.toDF().foreachPartition(neptune.add_vertices('OrderDetail'))\n", - "\n", - "print \"Creating ORDER_DETAIL edges...\"\n", - "\n", - "applymapping3 = RenameField.apply(applymapping2, \"~id\", \"~to\")\n", - "applymapping3 = GremlinCsvTransforms.create_prefixed_columns(applymapping3, [('~from', 'orderId', 'o')])\n", - "applymapping3 = GremlinCsvTransforms.create_edge_id_column(applymapping3, '~from', '~to')\n", - "selectfields3 = SelectFields.apply(frame = applymapping3, paths = [\"~id\", \"~from\", \"~to\", \"lineNumber\"], transformation_ctx = \"selectfields3\")\n", - "\n", - "selectfields3.toDF().foreachPartition(neptune.add_edges('ORDER_DETAIL'))\n", - "\n", - "print \"Creating PRODUCT edges...\"\n", - "\n", - "applymapping4 = RenameField.apply(applymapping2, \"~id\", \"~from\")\n", - "applymapping4 = GremlinCsvTransforms.create_prefixed_columns(applymapping4, [('~to', 'productId', 'p')])\n", - "applymapping4 = GremlinCsvTransforms.create_edge_id_column(applymapping4, '~from', '~to')\n", - "selectfields4 = SelectFields.apply(frame = applymapping4, paths = [\"~id\", \"~from\", \"~to\"], transformation_ctx = \"selectfields4\")\n", - "\n", - "selectfields4.toDF().foreachPartition(neptune.add_edges('PRODUCT'))\n", - "\n", - "update_checkpoint(neptune, order_table, newcheckpoint)\n", - "\n", - "job.commit()\n", - "\n", - "print('Done')" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Sparkmagic (PySpark)", 
- "language": "", - "name": "pysparkkernel" - }, - "language_info": { - "codemirror_mode": { - "name": "python", - "version": 2 - }, - "mimetype": "text/x-python", - "name": "pyspark", - "pygments_lexer": "python2" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/glue-neptune/examples/export-from-mysql-to-neptune.ipynb b/glue-neptune/examples/export-from-mysql-to-neptune.ipynb deleted file mode 100644 index 6b21f8cc..00000000 --- a/glue-neptune/examples/export-from-mysql-to-neptune.ipynb +++ /dev/null @@ -1,106 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import sys, boto3, os\n", - "\n", - "from awsglue.utils import getResolvedOptions\n", - "from pyspark.context import SparkContext\n", - "from awsglue.context import GlueContext\n", - "from awsglue.job import Job\n", - "from awsglue.transforms import ApplyMapping\n", - "from awsglue.transforms import RenameField\n", - "from awsglue.transforms import SelectFields\n", - "from awsglue.dynamicframe import DynamicFrame\n", - "from pyspark.sql.functions import lit\n", - "from pyspark.sql.functions import format_string\n", - "from gremlin_python import statics\n", - "from gremlin_python.structure.graph import Graph\n", - "from gremlin_python.process.graph_traversal import __\n", - "from gremlin_python.process.strategies import *\n", - "from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection\n", - "from gremlin_python.process.traversal import *\n", - "from glue_neptune.NeptuneConnectionInfo import NeptuneConnectionInfo\n", - "from glue_neptune.NeptuneGremlinClient import NeptuneGremlinClient\n", - "from glue_neptune.GremlinCsvTransforms import GremlinCsvTransforms\n", - "\n", - "glueContext = GlueContext(sc)\n", - " \n", - "job = Job(glueContext)\n", - "job.init('mysql-to-neptune', {})\n", - "\n", - "database = \"sales-order\"\n", - "product_table = \"salesdb_product\"\n", - "product_category_table = \"salesdb_product_category\"\n", - "supplier_table = \"salesdb_supplier\"\n", - "\n", - "gremlin_endpoint = NeptuneConnectionInfo(glueContext).neptune_endpoint('neptune')\n", - "neptune = NeptuneGremlinClient(gremlin_endpoint)\n", - "\n", - "# Product vertices\n", - "\n", - "print \"Creating Product vertices...\"\n", - "\n", - "datasource0 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = product_table, transformation_ctx = \"datasource0\")\n", - "datasource1 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = product_category_table, transformation_ctx = \"datasource1\")\n", - "datasource2 = datasource0.join( [\"CATEGORY_ID\"],[\"CATEGORY_ID\"], datasource1, transformation_ctx = \"join\")\n", - "\n", - "applymapping1 = ApplyMapping.apply(frame = datasource2, mappings = [(\"NAME\", \"string\", \"name:String\", \"string\"), (\"UNIT_PRICE\", \"decimal(10,2)\", \"unitPrice\", \"string\"), (\"PRODUCT_ID\", \"int\", \"productId\", \"int\"), (\"QUANTITY_PER_UNIT\", \"int\", \"quantityPerUnit:Int\", \"int\"), (\"CATEGORY_ID\", \"int\", \"category_id\", \"int\"), (\"SUPPLIER_ID\", \"int\", \"supplierId\", \"int\"), (\"CATEGORY_NAME\", \"string\", \"category:String\", \"string\"), (\"DESCRIPTION\", \"string\", \"description:String\", \"string\"), (\"IMAGE_URL\", \"string\", \"imageUrl:String\", \"string\")], transformation_ctx = \"applymapping1\")\n", - "applymapping1 = GremlinCsvTransforms.create_prefixed_columns(applymapping1, [('~id', 'productId', 'p'),('~to', 'supplierId', 
's')])\n", - "selectfields1 = SelectFields.apply(frame = applymapping1, paths = [\"~id\", \"name:String\", \"category:String\", \"description:String\", \"unitPrice\", \"quantityPerUnit:Int\", \"imageUrl:String\"], transformation_ctx = \"selectfields1\")\n", - "\n", - "selectfields1.toDF().foreachPartition(neptune.upsert_vertices('Product'))\n", - "\n", - "# Supplier vertices\n", - "\n", - "print \"Creating Supplier vertices...\"\n", - "\n", - "datasource3 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = supplier_table, transformation_ctx = \"datasource3\")\n", - "\n", - "applymapping2 = ApplyMapping.apply(frame = datasource3, mappings = [(\"COUNTRY\", \"string\", \"country:String\", \"string\"), (\"ADDRESS\", \"string\", \"address:String\", \"string\"), (\"NAME\", \"string\", \"name:String\", \"string\"), (\"STATE\", \"string\", \"state:String\", \"string\"), (\"SUPPLIER_ID\", \"int\", \"supplierId\", \"int\"), (\"CITY\", \"string\", \"city:String\", \"string\"), (\"PHONE\", \"string\", \"phone:String\", \"string\")], transformation_ctx = \"applymapping1\")\n", - "applymapping2 = GremlinCsvTransforms.create_prefixed_columns(applymapping2, [('~id', 'supplierId', 's')])\n", - "selectfields3 = SelectFields.apply(frame = applymapping2, paths = [\"~id\", \"country:String\", \"address:String\", \"city:String\", \"phone:String\", \"name:String\", \"state:String\"], transformation_ctx = \"selectfields3\")\n", - "\n", - "selectfields3.toDF().foreachPartition(neptune.upsert_vertices('Supplier'))\n", - "\n", - "# SUPPLIER edges\n", - "\n", - "print \"Creating SUPPLIER edges...\"\n", - "\n", - "applymapping1 = RenameField.apply(applymapping1, \"~id\", \"~from\")\n", - "applymapping1 = GremlinCsvTransforms.create_edge_id_column(applymapping1, '~from', '~to')\n", - "selectfields2 = SelectFields.apply(frame = applymapping1, paths = [\"~id\", \"~from\", \"~to\"], transformation_ctx = \"selectfields2\")\n", - " \n", - "selectfields2.toDF().foreachPartition(neptune.upsert_edges('SUPPLIER'))\n", - "\n", - "# End\n", - "\n", - "job.commit()\n", - "\n", - "print('Done')" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Sparkmagic (PySpark)", - "language": "", - "name": "pysparkkernel" - }, - "language_info": { - "codemirror_mode": { - "name": "python", - "version": 2 - }, - "mimetype": "text/x-python", - "name": "pyspark", - "pygments_lexer": "python2" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/glue-neptune/examples/export-from-mysql-to-s3.ipynb b/glue-neptune/examples/export-from-mysql-to-s3.ipynb deleted file mode 100644 index a005fcf0..00000000 --- a/glue-neptune/examples/export-from-mysql-to-s3.ipynb +++ /dev/null @@ -1,102 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import sys\n", - "from awsglue.utils import getResolvedOptions\n", - "from pyspark.context import SparkContext\n", - "from awsglue.context import GlueContext\n", - "from awsglue.job import Job\n", - "from awsglue.transforms import ApplyMapping\n", - "from awsglue.transforms import RenameField\n", - "from awsglue.transforms import SelectFields\n", - "from awsglue.dynamicframe import DynamicFrame\n", - "from pyspark.sql.functions import lit\n", - "from pyspark.sql.functions import format_string\n", - "from glue_neptune.GremlinCsvTransforms import GremlinCsvTransforms\n", - "\n", - "glueContext = GlueContext(sc)\n", - "\n", - "job = Job(glueContext)\n", - "job.init('rds-2-neptune', {})\n", - 
"\n", - "nodes_path = 's3://'\n", - "edges_path = 's3://'\n", - "\n", - "database = \"sales-order\"\n", - "product_table = \"salesdb_product\"\n", - "product_category_table = \"salesdb_product_category\"\n", - "supplier_table = \"salesdb_supplier\"\n", - " \n", - "def writeCsvFile(datasource, path):\n", - " dataframe = DynamicFrame.toDF(datasource).repartition(1)\n", - " datasource = DynamicFrame.fromDF(dataframe, glueContext, 'write-csv')\n", - " glueContext.write_dynamic_frame.from_options(frame = datasource, connection_type = \"s3\", connection_options = {\"path\": path}, format = \"csv\", transformation_ctx = \"write-csv\") \n", - "\n", - "# Product vertices\n", - "\n", - "print \"Creating Product vertices...\"\n", - "\n", - "datasource0 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = product_table, transformation_ctx = \"datasource0\")\n", - "datasource1 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = product_category_table, transformation_ctx = \"datasource1\")\n", - "datasource2 = datasource0.join( [\"CATEGORY_ID\"],[\"CATEGORY_ID\"], datasource1, transformation_ctx = \"join\")\n", - "\n", - "applymapping1 = ApplyMapping.apply(frame = datasource2, mappings = [(\"NAME\", \"string\", \"name:String\", \"string\"), (\"UNIT_PRICE\", \"decimal(10,2)\", \"unitPrice\", \"decimal(10,2)\"), (\"PRODUCT_ID\", \"int\", \"productId\", \"int\"), (\"QUANTITY_PER_UNIT\", \"int\", \"quantityPerUnit:Int\", \"int\"), (\"CATEGORY_ID\", \"int\", \"category_id\", \"int\"), (\"SUPPLIER_ID\", \"int\", \"supplierId\", \"int\"), (\"CATEGORY_NAME\", \"string\", \"category:String\", \"string\"), (\"DESCRIPTION\", \"string\", \"description:String\", \"string\"), (\"IMAGE_URL\", \"string\", \"imageUrl:String\", \"string\")], transformation_ctx = \"applymapping1\")\n", - "applymapping1 = GremlinCsvTransforms.create_prefixed_columns(applymapping1, [('~id', 'productId', 'p'),('~to', 'supplierId', 's')])\n", - "selectfields1 = SelectFields.apply(frame = applymapping1, paths = [\"~id\", \"name:String\", \"category:String\", \"description:String\", \"unitPrice\", \"quantityPerUnit:Int\", \"imageUrl:String\"], transformation_ctx = \"selectfields1\")\n", - "\n", - "writeCsvFile(GremlinCsvTransforms.addLabel(selectfields1, 'Product'), nodes_path)\n", - "\n", - "# SUPPLIER edges\n", - "\n", - "print \"Creating SUPPLIER edges...\"\n", - "\n", - "applymapping1 = RenameField.apply(applymapping1, \"~id\", \"~from\")\n", - "applymapping1 = GremlinCsvTransforms.create_edge_id_column(applymapping1, '~from', '~to')\n", - "selectfields2 = SelectFields.apply(frame = applymapping1, paths = [\"~id\", \"~from\", \"~to\"], transformation_ctx = \"selectfields2\")\n", - "\n", - "writeCsvFile(GremlinCsvTransforms.addLabel(selectfields2, 'SUPPLIER'), edges_path)\n", - "\n", - "# Supplier vertices\n", - "\n", - "print \"Creating Supplier vertices...\"\n", - "\n", - "datasource3 = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = supplier_table, transformation_ctx = \"datasource3\")\n", - "\n", - "applymapping2 = ApplyMapping.apply(frame = datasource3, mappings = [(\"COUNTRY\", \"string\", \"country:String\", \"string\"), (\"ADDRESS\", \"string\", \"address:String\", \"string\"), (\"NAME\", \"string\", \"name:String\", \"string\"), (\"STATE\", \"string\", \"state:String\", \"string\"), (\"SUPPLIER_ID\", \"int\", \"supplierId\", \"int\"), (\"CITY\", \"string\", \"city:String\", \"string\"), (\"PHONE\", \"string\", \"phone:String\", \"string\")], 
transformation_ctx = \"applymapping1\")\n", - "applymapping2 = GremlinCsvTransforms.create_prefixed_columns(applymapping2, [('~id', 'supplierId', 's')])\n", - "selectfields3 = SelectFields.apply(frame = applymapping2, paths = [\"~id\", \"country:String\", \"address:String\", \"city:String\", \"phone:String\", \"name:String\", \"state:String\"], transformation_ctx = \"selectfields3\")\n", - "\n", - "writeCsvFile(GremlinCsvTransforms.addLabel(selectfields3, 'Supplier'), nodes_path)\n", - "\n", - "# End\n", - "\n", - "job.commit()\n", - "\n", - "print \"Done\"" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Sparkmagic (PySpark)", - "language": "", - "name": "pysparkkernel" - }, - "language_info": { - "codemirror_mode": { - "name": "python", - "version": 2 - }, - "mimetype": "text/x-python", - "name": "pyspark", - "pygments_lexer": "python2" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/glue-neptune/readme.md b/glue-neptune/readme.md index 3ecb31b4..3a599595 100644 --- a/glue-neptune/readme.md +++ b/glue-neptune/readme.md @@ -18,13 +18,9 @@ You can then refer to this library from your Glue Development Endpoint or Glue j ## Examples -The _examples_ directory contains 3 example Glue jobs: - - - _export-from-mysql-to-s3_ – Shows how to export from several MySQL tables to CSV formatted in accordance with the Neptune CSV bulk load format. These files are saved to S3, ready to be bulk loaded into Neptune. - - _export-from-mysql-to-neptune_ – Shows how to export direct from several MySQL tables into Neptune. Nodes and edges are written conditionally to the database using user-supplied IDs. - - _export-from-mysql-to-neptune-incremental_ – Shows how to perform an incremental load from MySQL to Neptune using checkpoint information that is written to a Neptune vertex. +See [Migrating from MySQL to Amazon Neptune using AWS Glue](https://github.com/iansrobinson/amazon-neptune-samples/tree/master/gremlin/glue-neptune). - ## Cross Account/Region Datasources +## Cross Account/Region Datasources If you have a datasource in a different region and/or different account from Glue and your Neptune database, you can follow the instructions in this [blog](https://aws.amazon.com/blogs/big-data/create-cross-account-and-cross-region-aws-glue-connections/) to allow access. diff --git a/neptune-export/docs/export-pg-from-config.md b/neptune-export/docs/export-pg-from-config.md index c4a4e5a3..21190608 100644 --- a/neptune-export/docs/export-pg-from-config.md +++ b/neptune-export/docs/export-pg-from-config.md @@ -8,7 +8,8 @@ {-c | --config-file} [ {-cn | --concurrency} ] {-d | --dir} {-e | --endpoint} ... - [ {-el | --edge-label} ... ] [ --format ] + [ {-el | --edge-label} ... ] + [ --exclude-type-definitions ] [ --format ] [ --lb-port ] [ --log-level ] [ {-nl | --node-label} ... ] [ --nlb-endpoint ] @@ -63,6 +64,13 @@ -el , --edge-label Labels of edges to be exported (optional, default all labels) + --exclude-type-definitions + Exclude type definitions from column headers (optional, default + false) + + This option may occur a maximum of 1 times + + --format Output format (optional, default 'csv') diff --git a/neptune-export/docs/export-pg.md b/neptune-export/docs/export-pg.md index f856b913..b37d1e69 100644 --- a/neptune-export/docs/export-pg.md +++ b/neptune-export/docs/export-pg.md @@ -7,7 +7,8 @@ [ --alb-endpoint ] [ {-cn | --concurrency} ] {-d | --dir} {-e | --endpoint} ... - [ {-el | --edge-label} ... ] [ --format ] + [ {-el | --edge-label} ... 
] + [ --exclude-type-definitions ] [ --format ] [ --lb-port ] [ --log-level ] [ {-nl | --node-label} ... ] [ --nlb-endpoint ] @@ -52,6 +53,13 @@ -el , --edge-label Labels of edges to be exported (optional, default all labels) + --exclude-type-definitions + Exclude type definitions from column headers (optional, default + false) + + This option may occur a maximum of 1 times + + --format Output format (optional, default 'csv') diff --git a/neptune-export/pom.xml b/neptune-export/pom.xml index 8830d98a..3801b4cc 100644 --- a/neptune-export/pom.xml +++ b/neptune-export/pom.xml @@ -41,6 +41,18 @@ 1.11.307 + + com.amazonaws + aws-java-sdk-s3 + 1.11.307 + + + + com.amazonaws + aws-lambda-java-core + 1.2.0 + + com.amazonaws amazon-neptune-sigv4-signer @@ -75,11 +87,12 @@ junit junit 4.12 + test org.eclipse.rdf4j - rdf4j-runtime + rdf4j-repository-sparql [2.4.0,) @@ -141,7 +154,8 @@ ${uberjar.name} - + com.amazonaws.services.neptune.NeptuneExportCli diff --git a/neptune-export/readme.md b/neptune-export/readme.md index c53aadfa..4fb6ecf3 100644 --- a/neptune-export/readme.md +++ b/neptune-export/readme.md @@ -10,34 +10,14 @@ Exports Amazon Neptune property graph data to CSV or JSON, or RDF graph data to - [`export-pg-from-queries`](docs/export-pg-from-queries.md) - [`export-rdf`](docs/export-rdf.md) -### Property Graph +### Topics - [Exporting to the Bulk Loader CSV Format](#exporting-to-the-bulk-loader-csv-format) - [Exporting the Results of User-Supplied Queries](#exporting-the-results-of-user-supplied-queries) - -### RDF Graph - - [Exporting an RDF Graph](#exporting-an-rdf-graph) - -### Encryption in transit - -You can connect to Neptune from _neptune-export_ using SSL by specifying the `--use-ssl` option. - -If you are using a load balancer or a proxy server (such as HAProxy), you must [use SSL termination and have your own SSL certificate on the proxy server](https://docs.aws.amazon.com/neptune/latest/userguide/security-ssl.html). - -### IAM DB authentication - -_neptune-export_ supports exporting from databases that have [IAM database authentication](https://docs.aws.amazon.com/neptune/latest/userguide/iam-auth.html) enabled. Supply the `--use-iam-auth` option with each command. Remember to set the **SERVICE_REGION** environment variable – e.g. `export SERVICE_REGION=us-east-1`. - -_neptune-export_ also supports connecting through a load balancer to a Neptune database with IAM DB authetication enabled. However, this feature is only currently supported for property graphs, with support for RDF graphs coming soon. - -If you are connecting through a load balancer, and have IAM DB authentication enabled, you must also supply either an `--nlb-endpoint` option (if using a network load balancer) or an `--alb-endpoint` option (if using an application load balancer), and an `--lb-port`. - -For details on using a load balancer with a database with IAM DB authentication enabled, see [Connecting to Amazon Neptune from Clients Outside the Neptune VPC](https://github.com/aws-samples/aws-dbs-refarch-graph/tree/master/src/connecting-using-a-load-balancer). 
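The new `--exclude-type-definitions` option shown in both `export-pg` and `export-pg-from-config` above controls whether the Neptune bulk-load type suffixes appear in the generated CSV column headers. By default the exporter writes typed headers such as `name:String` and `quantityPerUnit:Int` (the style used by the sample Glue jobs earlier in this change); with `--exclude-type-definitions` the same columns are written simply as `name` and `quantityPerUnit`. For example, the flag is supplied alongside the existing options: `export-pg -e <neptune-endpoint> -d output --exclude-type-definitions` (the endpoint and directory values here are placeholders).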
- -## Building neptune-export - -`mvn clean install` + - [Building neptune-export](#building-neptune-export) + - [Security](#security) + - [Deploying neptune-export as an AWS Lambda Function](#deploying-neptune-export-as-an-aws-lambda-function) ## Exporting to the Bulk Loader CSV Format @@ -101,4 +81,45 @@ Queries whose results contain very large rows can sometimes trigger a `Corrupted ## Exporting an RDF Graph -At present _neptune-export_ supports exporting an RDF dataset to Turtle with a single-threaded long-running query. \ No newline at end of file +At present _neptune-export_ supports exporting an RDF dataset to Turtle with a single-threaded long-running query. + +## Security + +### Encryption in transit + +You can connect to Neptune from _neptune-export_ using SSL by specifying the `--use-ssl` option. + +If you are using a load balancer or a proxy server (such as HAProxy), you must [use SSL termination and have your own SSL certificate on the proxy server](https://docs.aws.amazon.com/neptune/latest/userguide/security-ssl.html). + +### IAM DB authentication + +_neptune-export_ supports exporting from databases that have [IAM database authentication](https://docs.aws.amazon.com/neptune/latest/userguide/iam-auth.html) enabled. Supply the `--use-iam-auth` option with each command. Remember to set the **SERVICE_REGION** environment variable – e.g. `export SERVICE_REGION=us-east-1`. + +_neptune-export_ also supports connecting through a load balancer to a Neptune database with IAM DB authetication enabled. However, this feature is only currently supported for property graphs, with support for RDF graphs coming soon. + +If you are connecting through a load balancer, and have IAM DB authentication enabled, you must also supply either an `--nlb-endpoint` option (if using a network load balancer) or an `--alb-endpoint` option (if using an application load balancer), and an `--lb-port`. + +For details on using a load balancer with a database with IAM DB authentication enabled, see [Connecting to Amazon Neptune from Clients Outside the Neptune VPC](https://github.com/aws-samples/aws-dbs-refarch-graph/tree/master/src/connecting-using-a-load-balancer). + +## Building neptune-export + +To build the jar, run: + +`mvn clean install` + +## Deploying neptune-export as an AWS Lambda Function + +The _neptune-export_ jar can be deployed as an AWS Lambda function. To access Neptune, you will either have to [configure the function to access resources inside your VPC](https://docs.aws.amazon.com/lambda/latest/dg/vpc.html), or [expose the Neptune endpoints via a load balancer](https://github.com/aws-samples/aws-dbs-refarch-graph/tree/master/src/connecting-using-a-load-balancer). + +Be mindful of the [AWS Lambda limits](https://docs.aws.amazon.com/lambda/latest/dg/limits.html), particularly with regard to function timeouts (max 15 minutes) and _/tmp_ directory storage (512 MB). Large exports can easily exceed these limits. + +When deployed as a Lambda function, _neptune-export_ will automatically copy the export files to an S3 bucket of your choosing. Optionally, it can also write a completion file to a separate S3 location (useful for triggering additional Lambda functions). You must configure your function with an IAM role that has write access to these S3 locations. + +The Lambda function expects a number of parameters, which you can supply either as [environment variables](https://docs.aws.amazon.com/lambda/latest/dg/env_variables.html) or via a JSON input parameter. 
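One way to drive a deployed function is to pass these parameters as the JSON input when invoking it with the AWS SDK for Java. The sketch below is illustrative only: the function name, Neptune endpoint and bucket names are placeholders, and it assumes the `aws-java-sdk-lambda` dependency, which is not part of this change.

```java
import com.amazonaws.services.lambda.AWSLambda;
import com.amazonaws.services.lambda.AWSLambdaClientBuilder;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;

public class InvokeNeptuneExport {
    public static void main(String[] args) {
        // JSON input parameter for the neptune-export Lambda function.
        // Any of these fields may instead be supplied as environment variables on the function.
        String payload = "{"
                + "\"command\": \"export-pg -e my-cluster.cluster-abc123.us-east-1.neptune.amazonaws.com\","
                + "\"outputS3Path\": \"s3://my-export-bucket/neptune/exports\","
                + "\"completionFileS3Path\": \"s3://my-export-bucket/neptune/completions\""
                + "}";

        AWSLambda lambda = AWSLambdaClientBuilder.defaultClient();

        InvokeRequest request = new InvokeRequest()
                .withFunctionName("neptune-export")  // placeholder function name
                .withInvocationType("Event")         // asynchronous: exports can run for many minutes
                .withPayload(payload);

        InvokeResult result = lambda.invoke(request);
        System.out.println("Status code: " + result.getStatusCode()); // 202 for an Event invocation
    }
}
```

Using the `Event` invocation type invokes the function asynchronously, so the caller does not have to hold a connection open while the export runs.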
Fields in the JSON input parameter override any environment variables you have set up. + +| Environment Variable | JSON Field | Description || +| ---- | ---- | ---- | ---- | +| `COMMAND` | `command` | Command and command-line options: e.g. `export-pg -e ` | Mandatory | +| `OUTPUT_S3_PATH` | `outputS3Path` | S3 location to which exported files will be written | Mandatory | +| `CONFIG_FILE_S3_PATH` | `configFileS3Path` | S3 location of a JSON config file to be used when exporting a property graph from a config file | Optional | +| `COMPLETION_FILE_S3_PATH` | `completionFileS3Path` | S3 location to which a completion file shuld be written once all export files have been copied to S3 | Optional | \ No newline at end of file diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraph.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraph.java index 3c0921f1..5c04e866 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraph.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraph.java @@ -88,6 +88,10 @@ public class ExportPropertyGraph extends NeptuneExportBaseCommand implements Run @AllowedValues(allowedValues = {"csv", "json"}) private Format format = Format.csv; + @Option(name = {"--exclude-type-definitions"}, description = "Exclude type definitions from column headers (optional, default false)") + @Once + private boolean excludeTypeDefinitions = false; + @Override public void run() { ConcurrencyConfig concurrencyConfig = new ConcurrencyConfig(concurrency, range); @@ -107,7 +111,14 @@ public void run() { new SaveMetadataConfig(metadataCollection, configFilePath).execute(); - ExportPropertyGraphJob exportJob = new ExportPropertyGraphJob(metadataSpecifications, metadataCollection, g, concurrencyConfig, directories, format); + ExportPropertyGraphJob exportJob = new ExportPropertyGraphJob( + metadataSpecifications, + metadataCollection, + g, + concurrencyConfig, + directories, + format, + !excludeTypeDefinitions); exportJob.execute(); System.err.println(format.description() + " files : " + directories.directory()); diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraphFromConfig.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraphFromConfig.java index d54dd8d6..59cc4d45 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraphFromConfig.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/ExportPropertyGraphFromConfig.java @@ -12,7 +12,6 @@ package com.amazonaws.services.neptune; -import com.amazonaws.services.neptune.auth.HandshakeRequestConfig; import com.amazonaws.services.neptune.io.DirectoryStructure; import com.amazonaws.services.neptune.propertygraph.ConcurrencyConfig; import com.amazonaws.services.neptune.propertygraph.NeptuneGremlinClient; @@ -77,6 +76,10 @@ public class ExportPropertyGraphFromConfig extends NeptuneExportBaseCommand impl @AllowedValues(allowedValues = {"csv", "json"}) private Format format = Format.csv; + @Option(name = {"--exclude-type-definitions"}, description = "Exclude type definitions from column headers (optional, default false)") + @Once + private boolean excludeTypeDefinitions = false; + @Override public void run() { ConcurrencyConfig concurrencyConfig = new ConcurrencyConfig(concurrency, range); @@ -90,7 +93,14 @@ public void run() { Collection> metadataSpecifications = 
scope.metadataSpecifications(nodeLabels, edgeLabels); - ExportPropertyGraphJob exportJob = new ExportPropertyGraphJob(metadataSpecifications, metadataCollection, g, concurrencyConfig, directories, format); + ExportPropertyGraphJob exportJob = new ExportPropertyGraphJob( + metadataSpecifications, + metadataCollection, + g, + concurrencyConfig, + directories, + format, + !excludeTypeDefinitions); exportJob.execute(); System.err.println(format.description() + " files : " + directories.directory()); diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportBaseCommand.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportBaseCommand.java index c62a8080..87b50787 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportBaseCommand.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportBaseCommand.java @@ -5,13 +5,14 @@ import com.github.rvesse.airline.annotations.restrictions.*; import java.io.File; +import java.util.ArrayList; import java.util.List; -public class NeptuneExportBaseCommand { +public abstract class NeptuneExportBaseCommand { - @Option(name = {"-e", "--endpoint"}, description = "Neptune endpoint(s) – supply multiple instance endpoints if you want to load balance requests across a cluster", title="endpoint") + @Option(name = {"-e", "--endpoint"}, description = "Neptune endpoint(s) – supply multiple instance endpoints if you want to load balance requests across a cluster", title = "endpoint") @Required - protected List endpoints; + protected List endpoints = new ArrayList<>(); @Option(name = {"-p", "--port"}, description = "Neptune port (optional, default 8182)") @Port(acceptablePorts = {PortType.SYSTEM, PortType.USER}) @@ -56,11 +57,11 @@ public class NeptuneExportBaseCommand { @Once protected int loadBalancerPort = 80; - public ConnectionConfig connectionConfig(){ + public ConnectionConfig connectionConfig() { return new ConnectionConfig(endpoints, port, networkLoadBalancerEndpoint, applicationLoadBalancerEndpoint, loadBalancerPort, useIamAuth, useSsl); } - public void setLoggingLevel(){ + public void applyLogLevel() { System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", logLevel); } } diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportCli.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportCli.java index 17538a8b..59a96dd3 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportCli.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportCli.java @@ -12,6 +12,7 @@ package com.amazonaws.services.neptune; +import com.github.rvesse.airline.Accessor; import com.github.rvesse.airline.annotations.Alias; import com.github.rvesse.airline.annotations.Cli; import com.github.rvesse.airline.annotations.Parser; @@ -46,7 +47,7 @@ public static void main(String[] args) { Runnable cmd = cli.parse(args); if (NeptuneExportBaseCommand.class.isAssignableFrom(cmd.getClass())) { - ((NeptuneExportBaseCommand) cmd).setLoggingLevel(); + ((NeptuneExportBaseCommand) cmd).applyLogLevel(); } cmd.run(); diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportLambda.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportLambda.java new file mode 100644 index 00000000..fb009866 --- /dev/null +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/NeptuneExportLambda.java @@ -0,0 +1,143 @@ +/* +Copyright 2019 
Amazon.com, Inc. or its affiliates. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + http://www.apache.org/licenses/LICENSE-2.0 +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +*/ + +package com.amazonaws.services.neptune; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.LambdaLogger; +import com.amazonaws.services.lambda.runtime.RequestStreamHandler; +import com.amazonaws.services.neptune.util.EnvironmentVariableUtils; +import com.amazonaws.services.neptune.util.S3ObjectInfo; +import com.amazonaws.services.s3.transfer.*; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.*; + +public class NeptuneExportLambda implements RequestStreamHandler { + + @Override + public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { + + LambdaLogger logger = context.getLogger(); + + JsonNode json = new ObjectMapper().readTree(inputStream); + + String cmd = json.has("command") ? + json.path("command").textValue() : + EnvironmentVariableUtils.getMandatoryEnv("COMMAND"); + + String outputS3Path = json.has("outputS3Path") ? + json.path("outputS3Path").textValue() : + EnvironmentVariableUtils.getMandatoryEnv("OUTPUT_S3_PATH"); + + String configFileS3Path = json.has("configFileS3Path") ? + json.path("configFileS3Path").textValue() : + EnvironmentVariableUtils.getOptionalEnv("CONFIG_FILE_S3_PATH", ""); + + String completionFileS3Path = json.has("completionFileS3Path") ? 
+ json.path("completionFileS3Path").textValue() : + EnvironmentVariableUtils.getOptionalEnv("COMPLETION_FILE_S3_PATH", ""); + + logger.log("cmd : " + cmd); + logger.log("outputS3Path : " + outputS3Path); + logger.log("configFileS3Path : " + configFileS3Path); + logger.log("completionFileS3Path : " + completionFileS3Path); + + S3ObjectInfo outputBaseS3ObjectInfo = new S3ObjectInfo(outputS3Path); + + TransferManager transferManager = TransferManagerBuilder.standard().build(); + + downloadConfigFile(context, logger, configFileS3Path, transferManager); + + File directory = executeCommand(cmd); + logger.log("DIRECTORY: " + directory.getAbsolutePath()); + + S3ObjectInfo outputS3ObjectInfo = outputBaseS3ObjectInfo.withNewKeySuffix(directory.getName()); + + uploadExportFilesToS3(logger, transferManager, directory, outputS3ObjectInfo); + uploadCompletionFileToS3(completionFileS3Path, transferManager, directory, outputS3ObjectInfo); + + } + + private void uploadCompletionFileToS3(String completionFileS3Path, TransferManager transferManager, File directory, S3ObjectInfo outputS3ObjectInfo) throws IOException { + if (!completionFileS3Path.isEmpty()) { + + File completionFile = new File("/tmp", directory.getName() + ".txt"); + try (BufferedWriter writer = new BufferedWriter(new FileWriter(completionFile))) { + writer.write(outputS3ObjectInfo.toString()); + } + + S3ObjectInfo completionFileS3ObjectInfo = new S3ObjectInfo(completionFileS3Path).withNewKeySuffix(completionFile.getName()); + + Upload upload = transferManager.upload(completionFileS3ObjectInfo.bucket(), completionFileS3ObjectInfo.key(), completionFile); + try { + upload.waitForUploadResult(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private void uploadExportFilesToS3(LambdaLogger logger, TransferManager transferManager, File directory, S3ObjectInfo outputS3ObjectInfo) { + try { + + MultipleFileUpload upload = transferManager.uploadDirectory( + outputS3ObjectInfo.bucket(), + outputS3ObjectInfo.key(), + directory, + true); + + upload.waitForCompletion(); + } catch (InterruptedException e) { + logger.log(e.getMessage()); + } + } + + private void downloadConfigFile(Context context, LambdaLogger logger, String configFileS3Path, TransferManager transferManager) { + if (!configFileS3Path.isEmpty()) { + S3ObjectInfo configFileS3ObjectInfo = new S3ObjectInfo(configFileS3Path); + + logger.log("Bucket: " + configFileS3ObjectInfo.bucket()); + logger.log("Key : " + configFileS3ObjectInfo.key()); + logger.log("File : " + configFileS3ObjectInfo.createDownloadFile("/tmp").getAbsolutePath()); + + Download download = transferManager.download( + configFileS3ObjectInfo.bucket(), + configFileS3ObjectInfo.key(), + configFileS3ObjectInfo.createDownloadFile("/tmp")); + try { + download.waitForCompletion(); + } catch (InterruptedException e) { + context.getLogger().log(e.getMessage()); + } + } + } + + private File executeCommand(String cmd) throws IOException { + String[] args = cmd.split(" "); + + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { + + PrintStream out = new PrintStream(output); + PrintStream old = System.out; + System.setOut(out); + + NeptuneExportCli.main(args); + + System.out.flush(); + System.setOut(old); + + return new File(output.toString().replace("\n", "")); + } + } +} diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphJob.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphJob.java 
index e19c5259..4b3920ec 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphJob.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphJob.java @@ -33,19 +33,22 @@ public class ExportPropertyGraphJob { private final ConcurrencyConfig concurrencyConfig; private final Directories directories; private final Format format; + private boolean includeTypeDefinitions; public ExportPropertyGraphJob(Collection> metadataSpecifications, PropertiesMetadataCollection propertiesMetadataCollection, GraphTraversalSource g, ConcurrencyConfig concurrencyConfig, Directories directories, - Format format) { + Format format, + boolean includeTypeDefinitions) { this.metadataSpecifications = metadataSpecifications; this.propertiesMetadataCollection = propertiesMetadataCollection; this.g = g; this.concurrencyConfig = concurrencyConfig; this.directories = directories; this.format = format; + this.includeTypeDefinitions = includeTypeDefinitions; } public void execute() throws Exception { @@ -68,7 +71,8 @@ public void execute() throws Exception { format, rangeFactory, status, - index); + index, + includeTypeDefinitions); taskExecutor.execute(exportTask); } diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphTask.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphTask.java index 29cf3160..3d8d5cce 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphTask.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/io/ExportPropertyGraphTask.java @@ -35,14 +35,17 @@ public class ExportPropertyGraphTask implements Runnable, GraphElementHandler private final Status status; private final int index; private final Map> labelWriters = new HashMap<>(); + private boolean includeTypeDefinitions; public ExportPropertyGraphTask(PropertiesMetadata propertiesMetadata, LabelsFilter labelsFilter, GraphClient graphClient, WriterFactory writerFactory, - Format format, RangeFactory rangeFactory, + Format format, + RangeFactory rangeFactory, Status status, - int index) { + int index, + boolean includeTypeDefinitions) { this.propertiesMetadata = propertiesMetadata; this.labelsFilter = labelsFilter; this.graphClient = graphClient; @@ -51,6 +54,7 @@ public ExportPropertyGraphTask(PropertiesMetadata propertiesMetadata, this.rangeFactory = rangeFactory; this.status = status; this.index = index; + this.includeTypeDefinitions = includeTypeDefinitions; } @Override @@ -101,7 +105,7 @@ private void createWriterFor(String label) { } Printer printer = writerFactory.createPrinter(label, index, propertyMetadata, format); - printer.printHeaderRemainingColumns(propertyMetadata.values(), true); + printer.printHeaderRemainingColumns(propertyMetadata.values(), includeTypeDefinitions); labelWriters.put(label, writerFactory.createLabelWriter(printer)); diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/metadata/MetadataSpecification.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/metadata/MetadataSpecification.java index c0272d95..7c908d47 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/metadata/MetadataSpecification.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/propertygraph/metadata/MetadataSpecification.java @@ -69,7 +69,8 @@ public 
ExportPropertyGraphTask createExportTask(PropertiesMetadataCollection Format format, RangeFactory rangeFactory, Status status, - int index) { + int index, + boolean includeTypeDefinitions) { return new ExportPropertyGraphTask<>( metadataCollection.propertyMetadataFor(metadataType), labelsFilter, @@ -78,8 +79,8 @@ public ExportPropertyGraphTask createExportTask(PropertiesMetadataCollection format, rangeFactory, status, - index - ); + index, + includeTypeDefinitions); } private static class Handler implements GraphElementHandler> { diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/rdf/NeptuneSparqlClient.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/rdf/NeptuneSparqlClient.java index db95fc59..570e1d70 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/rdf/NeptuneSparqlClient.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/rdf/NeptuneSparqlClient.java @@ -63,14 +63,14 @@ public static NeptuneSparqlClient create(Collection endpoints, int port, throw new RuntimeException(e1); } }). - peek(AbstractRepository::initialize). + peek(AbstractRepository::init). collect(Collectors.toList())); } else { return new NeptuneSparqlClient( endpoints.stream().map(e -> updateParser(new SPARQLRepository(sparqlEndpount(e, port)))). - peek(AbstractRepository::initialize). + peek(AbstractRepository::init). collect(Collectors.toList())); } } @@ -172,7 +172,7 @@ private SPARQLRepository chooseRepository() { } @Override - public void close() throws Exception { + public void close() { repositories.forEach(AbstractRepository::shutDown); } } diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/util/EnvironmentVariableUtils.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/util/EnvironmentVariableUtils.java index fb55bfd3..4a9f3452 100644 --- a/neptune-export/src/main/java/com/amazonaws/services/neptune/util/EnvironmentVariableUtils.java +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/util/EnvironmentVariableUtils.java @@ -1,3 +1,15 @@ +/* +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + http://www.apache.org/licenses/LICENSE-2.0 +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +*/ + package com.amazonaws.services.neptune.util; public class EnvironmentVariableUtils { diff --git a/neptune-export/src/main/java/com/amazonaws/services/neptune/util/S3ObjectInfo.java b/neptune-export/src/main/java/com/amazonaws/services/neptune/util/S3ObjectInfo.java new file mode 100644 index 00000000..faf21fa6 --- /dev/null +++ b/neptune-export/src/main/java/com/amazonaws/services/neptune/util/S3ObjectInfo.java @@ -0,0 +1,51 @@ +/* +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + http://www.apache.org/licenses/LICENSE-2.0 +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. 
See the License for the specific language governing +permissions and limitations under the License. +*/ + +package com.amazonaws.services.neptune.util; + +import java.io.File; +import java.net.URI; + +public class S3ObjectInfo { + private final String bucket; + private final String key; + private final String fileName; + + public S3ObjectInfo(String s3Uri) { + URI uri = URI.create(s3Uri); + + bucket = uri.getAuthority(); + key = uri.getPath().substring(1); + fileName = new File(uri.getPath()).getName(); + } + + public String bucket() { + return bucket; + } + + public String key() { + return key; + } + + public File createDownloadFile(String parent) { + return new File(parent, fileName); + } + + public S3ObjectInfo withNewKeySuffix(String suffix) { + return new S3ObjectInfo( String.format("s3://%s/%s", bucket, new File(key, suffix).getPath())); + } + + @Override + public String toString() { + return String.format("s3://%s/%s", bucket, key); + } +} diff --git a/neptune-export/src/test/java/com/amazonaws/services/neptune/propertygraph/io/JsonPrinterTest.java b/neptune-export/src/test/java/com/amazonaws/services/neptune/propertygraph/io/JsonPrinterTest.java index 384449b4..ec6233c9 100644 --- a/neptune-export/src/test/java/com/amazonaws/services/neptune/propertygraph/io/JsonPrinterTest.java +++ b/neptune-export/src/test/java/com/amazonaws/services/neptune/propertygraph/io/JsonPrinterTest.java @@ -37,5 +37,4 @@ public void shouldPrintEdge() throws Exception { "{\"~id\":\"edge-id\",\"~label\":\"edge-label\",\"~from\":\"from-id\",\"~to\":\"to-id\"}", stringWriter.toString()); } - } \ No newline at end of file diff --git a/neptune-export/src/test/java/com/amazonaws/services/neptune/util/S3ObjectInfoTest.java b/neptune-export/src/test/java/com/amazonaws/services/neptune/util/S3ObjectInfoTest.java new file mode 100644 index 00000000..8b672733 --- /dev/null +++ b/neptune-export/src/test/java/com/amazonaws/services/neptune/util/S3ObjectInfoTest.java @@ -0,0 +1,83 @@ +/* +Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + http://www.apache.org/licenses/LICENSE-2.0 +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. 
+*/ + +package com.amazonaws.services.neptune.util; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class S3ObjectInfoTest { + + @Test + public void canParseBucketFromURI(){ + String s3Uri = "s3://my-bucket/a/b/c"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("my-bucket", s3ObjectInfo.bucket()); + } + + @Test + public void canParseKeyWithoutTrailingSlashFromURI(){ + String s3Uri = "s3://my-bucket/a/b/c"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("a/b/c", s3ObjectInfo.key()); + } + + @Test + public void canParseKeyWithTrailingSlashFromURI(){ + String s3Uri = "s3://my-bucket/a/b/c/"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("a/b/c/", s3ObjectInfo.key()); + } + + @Test + public void canCreateDownloadFileForKeyWithoutTrailingSlash(){ + String s3Uri = "s3://my-bucket/a/b/c.txt"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("/temp/c.txt", s3ObjectInfo.createDownloadFile("/temp").getAbsolutePath()); + } + + @Test + public void canCreateDownloadFileForKeyWithTrailingSlash(){ + String s3Uri = "s3://my-bucket/a/b/c/"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("/temp/c", s3ObjectInfo.createDownloadFile("/temp").getAbsolutePath()); + } + + @Test + public void canCreateNewInfoForKeyWithoutTrailingSlash() { + String s3Uri = "s3://my-bucket/a/b/c"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("a/b/c/dir", s3ObjectInfo.withNewKeySuffix("dir").key()); + } + + @Test + public void canCreateNewKeyForKeyWithTrailingSlash() { + String s3Uri = "s3://my-bucket/a/b/c/"; + + S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(s3Uri); + + assertEquals("a/b/c/dir", s3ObjectInfo.withNewKeySuffix("dir").key()); + } +} \ No newline at end of file
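A short usage sketch of the new `S3ObjectInfo` helper, consistent with the behaviour pinned down by `S3ObjectInfoTest` above (the bucket, key and suffix values are illustrative):

```java
import com.amazonaws.services.neptune.util.S3ObjectInfo;

public class S3ObjectInfoExample {
    public static void main(String[] args) {
        S3ObjectInfo output = new S3ObjectInfo("s3://my-export-bucket/neptune/exports");

        System.out.println(output.bucket()); // my-export-bucket
        System.out.println(output.key());    // neptune/exports

        // Append the export directory name to the key, as NeptuneExportLambda does
        // before uploading the export files to S3.
        S3ObjectInfo run = output.withNewKeySuffix("export-run-1");
        System.out.println(run.key());        // neptune/exports/export-run-1
        System.out.println(run);              // s3://my-export-bucket/neptune/exports/export-run-1

        // Resolve a local download location for a config file stored in S3,
        // as the Lambda handler does when a config file path is supplied.
        S3ObjectInfo config = new S3ObjectInfo("s3://my-export-bucket/neptune/config.json");
        System.out.println(config.createDownloadFile("/tmp").getAbsolutePath()); // /tmp/config.json
    }
}
```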