diff --git a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleCloudSpannerTest.java b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleCloudSpannerTest.java index 452a9c284..49a9379ef 100644 --- a/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleCloudSpannerTest.java +++ b/integration-test-remote/src/test/java/io/cdap/cdap/app/etl/gcp/GoogleCloudSpannerTest.java @@ -53,7 +53,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,6 +121,17 @@ public class GoogleCloudSpannerTest extends DataprocETLTestBase { Schema.Field.of("ARRAY_DATE_COL", Schema.arrayOf(Schema.nullableOf(Schema.of(Schema.LogicalType.DATE)))) ); + private static final Schema IMPORT_SCHEMA = Schema.recordOf( + "schema", + Schema.Field.of("ID", Schema.nullableOf(Schema.of(Schema.Type.LONG))), + Schema.Field.of("StringCol", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("BoolCol", Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN))), + Schema.Field.of("TimestampCol", Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))), + Schema.Field.of("ArrayIntCol", Schema.arrayOf(Schema.nullableOf(Schema.of(Schema.Type.LONG)))), + Schema.Field.of("BytesCol", Schema.nullableOf(Schema.of(Schema.Type.BYTES))), + Schema.Field.of("DateCol", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))) + ); + private static final ZonedDateTime NOW = ZonedDateTime.now(); private static final Function> TEST_MUTATIONS = (tableName) -> ImmutableList.of( Mutation.newInsertBuilder(tableName) @@ -156,7 +166,17 @@ public class GoogleCloudSpannerTest extends DataprocETLTestBase { Date.fromYearMonthDay(NOW.getYear(), NOW.getMonthValue(), NOW.getDayOfMonth()), Date.fromYearMonthDay(NOW.getYear() + 1, NOW.getMonthValue(), NOW.getDayOfMonth()), null)) - .build() + .build(), + + Mutation.newInsertBuilder(tableName) + .set("ID").to(3) + .set("STRING_COL").to("some string") + .set("BOOL_COL").to(false) + .set("TIMESTAMP_COL").to(Timestamp.ofTimeSecondsAndNanos(NOW.toEpochSecond(), NOW.getNano())) + .set("ARRAY_INT_COL").toInt64Array(Arrays.asList(1L, 2L, null)) + .set("BYTES_COL").to(ByteArray.copyFrom("some value".getBytes())) + .set("DATE_COL").to(Date.fromYearMonthDay(NOW.getYear(), NOW.getMonthValue(), NOW.getDayOfMonth())) + .build() ); private static final List SOURCE_TABLE_TEST_MUTATIONS = TEST_MUTATIONS.apply(SOURCE_TABLE_NAME); @@ -306,6 +326,70 @@ private void testReadAndStore(Engine engine) throws Exception { Assert.assertTrue(resultSet.isNull("NOT_IN_THE_SCHEMA_COL")); } + @Test + public void testReadWithImportQuery() throws Exception { + testReadWithImportQuery(Engine.MAPREDUCE); + testReadWithImportQuery(Engine.SPARK); + } + + private void testReadWithImportQuery(Engine engine) throws Exception { + Map sourceProperties = new ImmutableMap.Builder() + .put("referenceName", "spanner_source") + .put("project", "${project}") + .put("instance", "${instance}") + .put("database", "${database}") + .put("table", "${srcTable}") + .put("importQuery","${importQuery}") + .build(); + + Map sinkProperties = new ImmutableMap.Builder() + .put("referenceName", "spanner_sink") + .put("project", "${project}") + .put("instance", "${instance}") + .put("database", "${database}") + .put("table", "${dstTable}") + .put("schema", IMPORT_SCHEMA.toString()) + .put("keys", "${keys}") + .build(); + + String applicationName = SPANNER_PLUGIN_NAME + "-testReadWithImportQuery"; + String nonExistentSinkTableName = "nonexistent_" + UUID.randomUUID().toString().replaceAll("-", "_"); + ApplicationManager applicationManager = deployApplication(sourceProperties, sinkProperties, + applicationName, engine); + Map args = new HashMap<>(); + args.put("project", getProjectId()); + args.put("instance", instance.getId().getInstance()); + args.put("database", database.getId().getDatabase()); + args.put("srcTable", SOURCE_TABLE_NAME); + args.put("dstTable", nonExistentSinkTableName); + args.put("keys", "ID"); + args.put("importQuery","Select ID, STRING_COL as StringCol, BOOL_COL as BoolCol, TIMESTAMP_COL as TimestampCol, " + + "ARRAY_INT_COL as ArrayIntCol, BYTES_COL as BytesCol, DATE_COL as DateCol from " + SOURCE_TABLE_NAME); + startWorkFlow(applicationManager, ProgramRunStatus.COMPLETED, args); + + ResultSet resultSet = spanner.getDatabaseClient(database.getId()) + .singleUse() + .executeQuery(Statement.of(String.format("select * from %s;", nonExistentSinkTableName))); + + Assert.assertTrue(resultSet.next()); + Map firstRowExpected = SOURCE_TABLE_TEST_MUTATIONS.get(0).asMap(); + Assert.assertEquals(firstRowExpected.get("ID").getInt64(), resultSet.getLong("ID")); + + Assert.assertTrue(resultSet.next()); + Assert.assertTrue(resultSet.next()); + Map secondRowExpected = SOURCE_TABLE_TEST_MUTATIONS.get(2).asMap(); + Assert.assertEquals(secondRowExpected.keySet().size(), resultSet.getColumnCount()); + Assert.assertEquals(secondRowExpected.get("ID").getInt64(), resultSet.getLong("ID")); + Assert.assertEquals(secondRowExpected.get("STRING_COL").getString(), resultSet.getString("StringCol")); + Assert.assertEquals(secondRowExpected.get("BOOL_COL").getBool(), resultSet.getBoolean("BoolCol")); + Assert.assertEquals(secondRowExpected.get("TIMESTAMP_COL").getTimestamp(), resultSet.getTimestamp("TimestampCol")); + Assert.assertEquals(secondRowExpected.get("ARRAY_INT_COL").getInt64Array(), resultSet.getLongList("ArrayIntCol")); + Assert.assertEquals(secondRowExpected.get("BYTES_COL").getBytes(), resultSet.getBytes("BytesCol")); + Assert.assertEquals(secondRowExpected.get("DATE_COL").getDate(), resultSet.getDate("DateCol")); + spanner.getDatabaseClient(database.getId()).singleUse() + .executeQuery(Statement.of(String.format("drop table %s;", nonExistentSinkTableName))); + } + //TODO:(CDAP-16040) re-enable once plugin is fixed //@Test public void testReadAndStoreInNewTableWithNoSourceSchema() throws Exception {