Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PLUGIN-296 Add integration test for SpannerSource getSchema from import Query #1084

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code-style formatter removes this line automatically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -122,6 +121,17 @@ public class GoogleCloudSpannerTest extends DataprocETLTestBase {
Schema.Field.of("ARRAY_DATE_COL", Schema.arrayOf(Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))))
);

/**
 * Schema passed as the sink's "schema" property in the import-query test.
 * Its field names (StringCol, BoolCol, ...) are the column aliases used by that test's
 * import query rather than the source table's own column names (STRING_COL, BOOL_COL, ...).
 */
private static final Schema IMPORT_SCHEMA = Schema.recordOf(
"schema",
Schema.Field.of("ID", Schema.nullableOf(Schema.of(Schema.Type.LONG))),
Schema.Field.of("StringCol", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
Schema.Field.of("BoolCol", Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN))),
Schema.Field.of("TimestampCol", Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))),
// Array fields: the array itself is non-nullable, its elements are nullable.
Schema.Field.of("ArrayIntCol", Schema.arrayOf(Schema.nullableOf(Schema.of(Schema.Type.LONG)))),
Schema.Field.of("BytesCol", Schema.nullableOf(Schema.of(Schema.Type.BYTES))),
Schema.Field.of("DateCol", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE)))
);

private static final ZonedDateTime NOW = ZonedDateTime.now();
private static final Function<String, List<Mutation>> TEST_MUTATIONS = (tableName) -> ImmutableList.of(
Mutation.newInsertBuilder(tableName)
Expand Down Expand Up @@ -156,7 +166,17 @@ public class GoogleCloudSpannerTest extends DataprocETLTestBase {
Date.fromYearMonthDay(NOW.getYear(), NOW.getMonthValue(), NOW.getDayOfMonth()),
Date.fromYearMonthDay(NOW.getYear() + 1, NOW.getMonthValue(), NOW.getDayOfMonth()),
null))
.build()
.build(),

Mutation.newInsertBuilder(tableName)
.set("ID").to(3)
.set("STRING_COL").to("some string")
.set("BOOL_COL").to(false)
.set("TIMESTAMP_COL").to(Timestamp.ofTimeSecondsAndNanos(NOW.toEpochSecond(), NOW.getNano()))
.set("ARRAY_INT_COL").toInt64Array(Arrays.asList(1L, 2L, null))
.set("BYTES_COL").to(ByteArray.copyFrom("some value".getBytes()))
.set("DATE_COL").to(Date.fromYearMonthDay(NOW.getYear(), NOW.getMonthValue(), NOW.getDayOfMonth()))
.build()
);

private static final List<Mutation> SOURCE_TABLE_TEST_MUTATIONS = TEST_MUTATIONS.apply(SOURCE_TABLE_NAME);
Expand Down Expand Up @@ -306,6 +326,70 @@ private void testReadAndStore(Engine engine) throws Exception {
Assert.assertTrue(resultSet.isNull("NOT_IN_THE_SCHEMA_COL"));
}

/**
 * Runs the import-query read test once per execution engine, MapReduce first and then Spark.
 */
@Test
public void testReadWithImportQuery() throws Exception {
  Engine[] engines = {Engine.MAPREDUCE, Engine.SPARK};
  for (Engine engine : engines) {
    testReadWithImportQuery(engine);
  }
}

private void testReadWithImportQuery(Engine engine) throws Exception {
Map<String, String> sourceProperties = new ImmutableMap.Builder<String, String>()
.put("referenceName", "spanner_source")
.put("project", "${project}")
.put("instance", "${instance}")
.put("database", "${database}")
.put("table", "${srcTable}")
.put("importQuery","${importQuery}")
.build();

Map<String, String> sinkProperties = new ImmutableMap.Builder<String, String>()
.put("referenceName", "spanner_sink")
.put("project", "${project}")
.put("instance", "${instance}")
.put("database", "${database}")
.put("table", "${dstTable}")
.put("schema", IMPORT_SCHEMA.toString())
.put("keys", "${keys}")
.build();

String applicationName = SPANNER_PLUGIN_NAME + "-testReadWithImportQuery";
String nonExistentSinkTableName = "nonexistent_" + UUID.randomUUID().toString().replaceAll("-", "_");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should delete this table at the end of the test

ApplicationManager applicationManager = deployApplication(sourceProperties, sinkProperties,
applicationName, engine);
Map<String, String> args = new HashMap<>();
args.put("project", getProjectId());
args.put("instance", instance.getId().getInstance());
args.put("database", database.getId().getDatabase());
args.put("srcTable", SOURCE_TABLE_NAME);
args.put("dstTable", nonExistentSinkTableName);
args.put("keys", "ID");
args.put("importQuery","Select ID, STRING_COL as StringCol, BOOL_COL as BoolCol, TIMESTAMP_COL as TimestampCol, " +
"ARRAY_INT_COL as ArrayIntCol, BYTES_COL as BytesCol, DATE_COL as DateCol from " + SOURCE_TABLE_NAME);
startWorkFlow(applicationManager, ProgramRunStatus.COMPLETED, args);

ResultSet resultSet = spanner.getDatabaseClient(database.getId())
.singleUse()
.executeQuery(Statement.of(String.format("select * from %s;", nonExistentSinkTableName)));

Assert.assertTrue(resultSet.next());
Map<String, Value> firstRowExpected = SOURCE_TABLE_TEST_MUTATIONS.get(0).asMap();
Assert.assertEquals(firstRowExpected.get("ID").getInt64(), resultSet.getLong("ID"));

Assert.assertTrue(resultSet.next());
Assert.assertTrue(resultSet.next());
Map<String, Value> secondRowExpected = SOURCE_TABLE_TEST_MUTATIONS.get(2).asMap();
Assert.assertEquals(secondRowExpected.keySet().size(), resultSet.getColumnCount());
Assert.assertEquals(secondRowExpected.get("ID").getInt64(), resultSet.getLong("ID"));
Assert.assertEquals(secondRowExpected.get("STRING_COL").getString(), resultSet.getString("StringCol"));
Assert.assertEquals(secondRowExpected.get("BOOL_COL").getBool(), resultSet.getBoolean("BoolCol"));
Assert.assertEquals(secondRowExpected.get("TIMESTAMP_COL").getTimestamp(), resultSet.getTimestamp("TimestampCol"));
Assert.assertEquals(secondRowExpected.get("ARRAY_INT_COL").getInt64Array(), resultSet.getLongList("ArrayIntCol"));
Assert.assertEquals(secondRowExpected.get("BYTES_COL").getBytes(), resultSet.getBytes("BytesCol"));
Assert.assertEquals(secondRowExpected.get("DATE_COL").getDate(), resultSet.getDate("DateCol"));
spanner.getDatabaseClient(database.getId()).singleUse()
.executeQuery(Statement.of(String.format("drop table %s;", nonExistentSinkTableName)));
}

//TODO:(CDAP-16040) re-enable once plugin is fixed
//@Test
public void testReadAndStoreInNewTableWithNoSourceSchema() throws Exception {
Expand Down