-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
updated/PLUGIN-296 #457
updated/PLUGIN-296 #457
Conversation
// listing table's schema documented at https://cloud.google.com/spanner/docs/information-schema | ||
private static final Statement.Builder SCHEMA_STATEMENT_BUILDER = Statement.newBuilder( | ||
String.format("SELECT t.column_name,t.spanner_type, t.is_nullable FROM information_schema.columns AS t WHERE " + | ||
" t.table_catalog = '' AND t.table_schema = '' AND t.table_name = @%s", TABLE_NAME)); | ||
private static final Statement.Builder SCHEMA_STATEMENT_BUILDER_WITH_COLUMNS = Statement.newBuilder( | ||
String.format("SELECT t.column_name,t.spanner_type, t.is_nullable FROM information_schema.columns AS t WHERE " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if prefix of this string is same as above string, we can just concat 2 strings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
@@ -241,11 +255,40 @@ private Schema getSchema(FailureCollector collector) { | |||
projectId)) { | |||
DatabaseClient databaseClient = | |||
spanner.getDatabaseClient(DatabaseId.of(projectId, config.instance, config.database)); | |||
Statement getTableSchemaStatement = SCHEMA_STATEMENT_BUILDER.bind(TABLE_NAME).to(config.table).build(); | |||
Map<String, String> columnNameMap = new HashMap<>(); | |||
// get columns from import query when query does not contain the '*' or 'case' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we making this assumption? also why are we checking for 'case'?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding *
it indicates that user selects all column of the table.
Regarding case
user can type select statement like in example:
SELECT A, B, CASE A WHEN 90 THEN 'red' WHEN 50 THEN 'blue' ELSE 'green' END AS result FROM Numbers
in these cases we fallback to getting full schema of the table
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is case
used for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If user types a query like:
SELECT A, B, CASE A WHEN 90 THEN 'red' WHEN 50 THEN 'blue' ELSE 'green' END AS result FROM Numbers
in these cases we fallback to getting full schema of the table
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm this approach is not ideal. You can construct queries where we unnecessarily fallback to getting full table schema.
For example, if the query is SELECT COUNT(*) FROM table
, we'll get the full schema.
Instead of parsing the query, can we execute the query with LIMIT 1
and get the schema from the query result?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's implement this correctly so we won't need to rework the fix later.
Ideally, we would get the correct schema for all queries.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rmstar With the suggested implementation we face the following issue:
Currently with Spanner cloud library we can get column name and colum type. We are missing the information whether column is nullable or not.
Spanner has this information in 'ResultSetMetadata' class -however this class is onlyavailable in 'com.google.spanner' package (https://googleapis.dev/java/google-cloud-spanner/3.2.1/com/google/spanner/v1/ResultSetMetadata.html) and not in 'com.google.cloud.spanner' package ('https://googleapis.dev/java/google-cloud-spanner/latest/com/google/cloud/spanner/ResultSet.html')
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the effort involved to use com.google.spanner
library to get column metadata, including whether it's nullable or not?
If that's not feasible, can we use com.google.cloud.spanner
to get the column names, and then use the existing approach in this PR (query information_schema) to check if the column is nullable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will not be able to use the alternative 1 com.google.spanner
library.
Yes we can use the com.google.cloud.spanner
to get the column names, and then use the existing approach (information_schema) to check if the column is nullable. However, there are cases where we can't get the nullable information like:
- Querying using aliases
- Aggregate Functions
For these cases should we default to nullable column?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, if you can't get nullable information, then default to nullable column.
Schema should be populated in accordance with import query |
We found a Contributor License Agreement for you (the sender of this pull request), but were unable to find agreements for all the commit author(s) or Co-authors. If you authored these, maybe you used a different email address in the git commits than was used to sign the CLA (login here to double check)? If these were authored by someone else, then they will need to sign a CLA as well, and confirm that they're okay with these being contributed to Google. ℹ️ Googlers: Go here for more info. |
update the PR to populated the schema accordance with import query |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please provide list of select statements this fix has been tested with?
@@ -84,10 +84,6 @@ public void validate(FailureCollector collector) { | |||
if (!containsMacro(NAME_SCHEMA) && schema != null) { | |||
SpannerUtil.validateSchema(schema, SUPPORTED_TYPES, collector); | |||
} | |||
if (!containsMacro(NAME_SCHEMA) && schema == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will undo the changes for https://cdap.atlassian.net/browse/PLUGIN-251
Correct fix is to do something similar to bigquery source:
https://github.com/data-integrations/google-cloud/blob/develop/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java#L194
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
Statement getTableSchemaStatement = SCHEMA_STATEMENT_BUILDER.bind(TABLE_NAME).to(config.table).build(); | ||
Map<String, String> columnNameMap = new HashMap<>(); | ||
// get columns from import query when query does not contain the '*' or 'case' | ||
if (config.importQuery != null && !config.importQuery.contains("*") && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check for null and empty query
@@ -241,11 +255,40 @@ private Schema getSchema(FailureCollector collector) { | |||
projectId)) { | |||
DatabaseClient databaseClient = | |||
spanner.getDatabaseClient(DatabaseId.of(projectId, config.instance, config.database)); | |||
Statement getTableSchemaStatement = SCHEMA_STATEMENT_BUILDER.bind(TABLE_NAME).to(config.table).build(); | |||
Map<String, String> columnNameMap = new HashMap<>(); | |||
// get columns from import query when query does not contain the '*' or 'case' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is case
used for?
} | ||
} | ||
} | ||
Statement getTableSchemaStatement = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no need to initialize this as null
src/main/java/io/cdap/plugin/gcp/spanner/source/SpannerSource.java
Outdated
Show resolved
Hide resolved
try (ResultSet resultSet = databaseClient.singleUse().executeQuery(getTableSchemaStatement)) { | ||
List<Schema.Field> schemaFields = new ArrayList<>(); | ||
while (resultSet.next()) { | ||
String columnName = resultSet.getString("column_name"); | ||
// remap column name to alias | ||
if (!columnNameMap.isEmpty() && columnNameMap.containsKey(columnName)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is !columnNameMap.isEmpty()
check needed? Why not just do columnNameMap.containsKey(columnName)
?
// check for column aliases | ||
if (column.toLowerCase().contains(" as ")) { | ||
String[] columnNameAndAlias = column.split(COLUMN_ALIAS_SPLIT_PATTERN); | ||
columnNameMap.put(columnNameAndAlias[0].trim(), columnNameAndAlias[1].trim()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should check for the size of columnNameAndAlias
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
We found a Contributor License Agreement for you (the sender of this pull request), but were unable to find agreements for all the commit author(s) or Co-authors. If you authored these, maybe you used a different email address in the git commits than was used to sign the CLA (login here to double check)? If these were authored by someone else, then they will need to sign a CLA as well, and confirm that they're okay with these being contributed to Google. ℹ️ Googlers: Go here for more info. |
Output schema null and for import query:
|
What is the behavior in case of following queries (in case of invalid queries)?
|
also please test with queries like:
|
This should also have an integration test |
|
We found a Contributor License Agreement for you (the sender of this pull request), but were unable to find agreements for all the commit author(s) or Co-authors. If you authored these, maybe you used a different email address in the git commits than was used to sign the CLA (login here to double check)? If these were authored by someone else, then they will need to sign a CLA as well, and confirm that they're okay with these being contributed to Google. ℹ️ Googlers: Go here for more info. |
if (StringUtils.containsIgnoreCase(importQuery, LIMIT)) { | ||
int total = StringUtils.lastIndexOf(importQuery, LIMIT); | ||
String substringToReplace = importQuery.substring(total); | ||
query = importQuery.replace(substringToReplace, "limit 1"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can break in some corner cases where the table name contains "limit" substring. For example something like SELECT <columns> from limited
.
Can you make sure we only replace "limit < number >" with "limit 1".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
We found a Contributor License Agreement for you (the sender of this pull request), but were unable to find agreements for all the commit author(s) or Co-authors. If you authored these, maybe you used a different email address in the git commits than was used to sign the CLA (login here to double check)? If these were authored by someone else, then they will need to sign a CLA as well, and confirm that they're okay with these being contributed to Google. ℹ️ Googlers: Go here for more info. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, please address minor comment and squash commits.
@@ -272,6 +309,20 @@ private Schema getSchema(FailureCollector collector) { | |||
|
|||
} | |||
|
|||
private Statement getStatementForOneRow(String importQuery) { | |||
String query; | |||
String regex = "^(?:[^;']|(?:'[^']+'))+ LIMIT +\\d+(.*)"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a comment that explains what this regex matches.
2b7396a
to
0f66e6e
Compare
Done |
added: Output schema to match fields from import query
Jira Ticket: https://cdap.atlassian.net/browse/PLUGIN-296