From dc3bc136072964098755e3bf0eeecc8494d455c0 Mon Sep 17 00:00:00 2001
From: Gabriel Igliozzi
Date: Thu, 26 Sep 2024 10:44:00 -0700
Subject: [PATCH] ignore void transform partitions from current partition field list (#606)

* ignore void transform partitions from current partition field list

* use correct method calls to get partition schema

* test tableDTO instead of tableDto

* Add safe-guard to omit void partitions

* revert fast property changes

* Delete omitVoidPartitionFields from Config interface and impl

* revert last changes

* use equalsIgnoreCase and remove extra lines

---------

Co-authored-by: Gabriel Igliozzi
---
 .../HiveConnectorInfoConverter.java           |   2 +-
 .../hive/converters/HiveTypeConverter.java    |   9 +-
 .../HiveConnectorInfoConvertorSpec.groovy     |   4 +-
 .../converters/HiveTypeConverterSpec.groovy   |  44 +++++++
 ...-7b1a-4a6f-aec0-03d3c851088c.metadata.json | 117 ++++++++++++++++++
 .../netflix/metacat/MetacatSmokeSpec.groovy   |  45 +++++++
 6 files changed, 215 insertions(+), 6 deletions(-)
 create mode 100644 metacat-functional-tests/metacat-test-cluster/etc-metacat/data/metadata/00001-ba4b775c-7b1a-4a6f-aec0-03d3c851088c.metadata.json

diff --git a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveConnectorInfoConverter.java b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveConnectorInfoConverter.java
index 1b2f02b0f..8a897056e 100644
--- a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveConnectorInfoConverter.java
+++ b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveConnectorInfoConverter.java
@@ -188,7 +188,7 @@ public TableInfo fromIcebergTableToTableInfo(final QualifiedName name,
                                                  final TableInfo tableInfo) {
         final org.apache.iceberg.Table table = tableWrapper.getTable();
         final List<FieldInfo> allFields =
-            this.hiveTypeConverter.icebergeSchemaTofieldDtos(table.schema(), table.spec().fields());
+            this.hiveTypeConverter.icebergSchemaTofieldDtos(table.schema(), table.spec().fields());
         final Map<String, String> tableParameters = new HashMap<>();
         tableParameters.put(DirectSqlTable.PARAM_TABLE_TYPE, DirectSqlTable.ICEBERG_TABLE_TYPE);
         tableParameters.put(DirectSqlTable.PARAM_METADATA_LOCATION, tableLoc);
diff --git a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveTypeConverter.java b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveTypeConverter.java
index 625a0990e..57cda30c2 100644
--- a/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveTypeConverter.java
+++ b/metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveTypeConverter.java
@@ -108,12 +108,13 @@ public Type toMetacatType(final String type) {
      * @param partitionFields partitioned fields
      * @return list of field Info
      */
-    public List<FieldInfo> icebergeSchemaTofieldDtos(final Schema schema,
+    public List<FieldInfo> icebergSchemaTofieldDtos(final Schema schema,
                                                      final List<PartitionField> partitionFields) {
         final List<FieldInfo> fields = Lists.newArrayList();
-        final List<String> partitionNames =
-            partitionFields.stream()
-                .map(f -> schema.findField(f.sourceId()).name()).collect(Collectors.toList());
+        final List<String> partitionNames = partitionFields.stream()
+            .filter(f -> f.transform() != null && !f.transform().toString().equalsIgnoreCase("void"))
+            .map(f -> schema.findField(f.sourceId()).name())
+            .collect(Collectors.toList());
         for (Types.NestedField field : schema.columns()) {
             final FieldInfo fieldInfo = new FieldInfo();
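The filter added above is the heart of this patch: a partition field whose transform has been rewritten to "void" (the format-v1 representation of a dropped partition field) must no longer be reported as a partition key. Condensed into a standalone sketch, with a hypothetical class and method name that are not part of this patch:

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.iceberg.PartitionField;
    import org.apache.iceberg.Schema;

    // Hypothetical helper mirroring the filter added to icebergSchemaTofieldDtos.
    final class PartitionNameResolver {
        private PartitionNameResolver() {
        }

        // Returns the names of the columns that still actively partition the
        // table, skipping fields whose transform was rewritten to "void".
        static List<String> activePartitionNames(final Schema schema,
                                                 final List<PartitionField> partitionFields) {
            return partitionFields.stream()
                .filter(f -> f.transform() != null
                    && !"void".equalsIgnoreCase(f.transform().toString()))
                .map(f -> schema.findField(f.sourceId()).name())
                .collect(Collectors.toList());
        }
    }

The equalsIgnoreCase comparison against the serialized transform name keeps the guard robust to casing, and the null check protects against fields whose transform cannot be resolved.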
diff --git a/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveConnectorInfoConvertorSpec.groovy b/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveConnectorInfoConvertorSpec.groovy
index 0a611c28f..6eff0374d 100644
--- a/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveConnectorInfoConvertorSpec.groovy
+++ b/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveConnectorInfoConvertorSpec.groovy
@@ -19,6 +19,7 @@ package com.netflix.metacat.connector.hive.converters
 import org.apache.iceberg.PartitionField
 import org.apache.iceberg.PartitionSpec
 import org.apache.iceberg.Schema
+import org.apache.iceberg.transforms.Identity
 import org.apache.iceberg.types.Type
 import org.apache.iceberg.types.Types
 import com.netflix.metacat.common.QualifiedName
@@ -58,7 +59,7 @@ class HiveConnectorInfoConvertorSpec extends Specification{
     def setup() {
         // Stub this to always return true
         config.isEpochInSeconds() >> true
-        converter = new HiveConnectorInfoConverter( new HiveTypeConverter())
+        converter = new HiveConnectorInfoConverter(new HiveTypeConverter())
     }

     def 'test date to epoch seconds'() {
@@ -512,6 +513,7 @@ class HiveConnectorInfoConvertorSpec extends Specification{
         def tableInfo = converter.fromIcebergTableToTableInfo(QualifiedName.ofTable('c', 'd', 't'),
             icebergTableWrapper, "/tmp/test", TableInfo.builder().build() )
         then:
+        2 * field.transform() >> Mock(Identity)
         1 * icebergTable.properties() >> ["test":"abd"]
         2 * icebergTable.spec() >> partSpec
         1 * partSpec.fields() >> [ field]
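The new interaction "2 * field.transform() >> Mock(Identity)" reflects that the converter now reads each partition field's transform twice: once for the null guard and once for the toString() comparison. Outside of a mocking context, an identity transform can also be obtained through Iceberg's public factory; a minimal sketch, assuming the Transforms factory methods of the Iceberg version this project builds against:

    import org.apache.iceberg.transforms.Transform;
    import org.apache.iceberg.transforms.Transforms;
    import org.apache.iceberg.types.Types;

    class IdentityTransformExample {
        public static void main(String[] args) {
            // Factory equivalent of the Identity instance mocked in the spec.
            Transform<?, ?> identity = Transforms.identity(Types.StringType.get());
            System.out.println(identity); // expected to print "identity"
        }
    }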
diff --git a/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveTypeConverterSpec.groovy b/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveTypeConverterSpec.groovy
index 9a8282c3d..e916d6cc2 100644
--- a/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveTypeConverterSpec.groovy
+++ b/metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveTypeConverterSpec.groovy
@@ -13,6 +13,11 @@

 package com.netflix.metacat.connector.hive.converters

+import org.apache.iceberg.PartitionField
+import org.apache.iceberg.transforms.Identity
+import org.apache.iceberg.transforms.VoidTransform
+import org.apache.iceberg.Schema
+import org.apache.iceberg.types.Types
 import spock.lang.Shared
 import spock.lang.Specification
 import spock.lang.Unroll
@@ -255,4 +260,43 @@ class HiveTypeConverterSpec extends Specification {
         "array<struct<date:string,countryCodes:array<string>,source:string>>" | """{"type":"array","elementType":{"type":"row","fields":[{"name":"date","type":"string"},{"name":"countryCodes","type":{"type":"array","elementType":"string"}},{"name":"source","type":"string"}]}}"""
         "array<struct<Date:string,nestedArray:array<struct<date:string,countryCodes:array<string>,source:string>>>>" | """{"type":"array","elementType":{"type":"row","fields":[{"name":"Date","type":"string"},{"name":"nestedArray","type":{"type":"array","elementType":{"type":"row","fields":[{"name":"date","type":"string"},{"name":"countryCodes","type":{"type":"array","elementType":"string"}},{"name":"source","type":"string"}]}}}]}}"""
     }
+
+    def "Test treat void transforms partitions as non-partition field"() {
+        given:
+        // Initial schema with three fields
+        def initialSchema = new Schema(
+            Types.NestedField.optional(1, "field1", Types.BooleanType.get(), "added 1st - partition key"),
+            Types.NestedField.optional(2, "field2", Types.StringType.get(), "added 2nd"),
+            Types.NestedField.optional(3, "field3", Types.IntegerType.get(), "added 3rd")
+        )
+
+        // Initial partition fields
+        def initialPartitionFields = [
+            new PartitionField(1, 1, "field1", new Identity()),
+            new PartitionField(2, 2, "field2", new VoidTransform()),
+        ]
+
+        when:
+        def fieldDtos = this.converter.icebergSchemaTofieldDtos(initialSchema, initialPartitionFields)

+        then:
+        fieldDtos.size() == 3
+
+        // Validate the first field
+        def field1 = fieldDtos.find { it.name == "field1" }
+        field1 != null
+        field1.partitionKey == true
+
+        // Validate the second field
+        def field2 = fieldDtos.find { it.name == "field2" }
+        field2 != null
+        field2.partitionKey == false
+
+        // Validate the third field
+        def field3 = fieldDtos.find { it.name == "field3" }
+        field3 != null
+        field3.partitionKey == false
+
+        noExceptionThrown()
+    }
 }
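The spec above builds PartitionField instances directly, which keeps the unit test independent of spec construction. For reference, an equivalent spec can be produced through Iceberg's public builder, where alwaysNull registers a void transform; a sketch under that assumption:

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    class VoidSpecExample {
        public static void main(String[] args) {
            Schema schema = new Schema(
                Types.NestedField.optional(1, "field1", Types.BooleanType.get()),
                Types.NestedField.optional(2, "field2", Types.StringType.get()),
                Types.NestedField.optional(3, "field3", Types.IntegerType.get()));

            // field1 stays an active partition column; field2 is partitioned by
            // the void transform, the same shape the test asserts against.
            PartitionSpec spec = PartitionSpec.builderFor(schema)
                .identity("field1")
                .alwaysNull("field2")
                .build();

            System.out.println(spec);
        }
    }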
"[{\"format_version\":1,\"principals\":[{\"name\":\"123\",\"principal_type\":\"USER\"}],\"resources\":[{\"resource_type\":\"TABLE\",\"uuid\":\"77ea2333-acd1-4d8b-9870-b3dcece00c87\",\"parent\":{\"resource_type\":\"SCHEMA\",\"name\":\"owner\",\"parent\":{\"resource_type\":\"CATALOG\",\"name\":\"prodhive\",\"parent\":null}}}],\"privileges\":[\"ALL\"],\"grantee\":{\"name\":\"123\",\"principal_type\":\"USER\"},\"with_grant\":true},{\"format_version\":1,\"principals\":[{\"name\":\"123\",\"principal_type\":\"USER\"}],\"resources\":[{\"resource_type\":\"TABLE\",\"uuid\":\"77ea2333-acd1-4d8b-9870-b3dcece00c87\",\"parent\":{\"resource_type\":\"SCHEMA\",\"name\":\"owner\",\"parent\":{\"resource_type\":\"CATALOG\",\"name\":\"prodhive\",\"parent\":null}}}],\"privileges\":[\"ALL\"],\"grantee\":{\"name\":\"123\",\"principal_type\":\"USER\"},\"with_grant\":false}]", + "field.metadata.json" : "{\"1\":{},\"2\":{},\"3\":{},\"4\":{}}" + }, + "current-snapshot-id" : -1, + "refs" : { }, + "snapshots" : [ ], + "statistics" : [ ], + "snapshot-log" : [ ], + "metadata-log" : [ { + "timestamp-ms" : 1725487921770, + "metadata-file" : "file:/tmp/data/metadata/00001-abf48887-aa4f-4bcc-9219-1e1721314ee1.metadata.json" + } ] +} diff --git a/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy b/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy index c7193abd6..b7873a914 100644 --- a/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy +++ b/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy @@ -41,6 +41,7 @@ import feign.RetryableException import feign.Retryer import groovy.sql.Sql import org.apache.commons.io.FileUtils +import org.apache.iceberg.PartitionField import org.joda.time.Instant import org.skyscreamer.jsonassert.JSONAssert import spock.lang.Ignore @@ -831,6 +832,50 @@ class MetacatSmokeSpec extends Specification { api.deleteTable(catalogName, databaseName, tableName) } + @Unroll + def "Test ignore void transform as partition fields"() { + given: + def catalogName = 'embedded-fast-hive-metastore' + def databaseName = 'iceberg_db' + def tableName = 'iceberg_table_6' + def uri = isLocalEnv ? 
diff --git a/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy b/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy
index c7193abd6..b7873a914 100644
--- a/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy
+++ b/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy
@@ -41,6 +41,7 @@ import feign.RetryableException
 import feign.Retryer
 import groovy.sql.Sql
 import org.apache.commons.io.FileUtils
+import org.apache.iceberg.PartitionField
 import org.joda.time.Instant
 import org.skyscreamer.jsonassert.JSONAssert
 import spock.lang.Ignore
@@ -831,6 +832,50 @@ class MetacatSmokeSpec extends Specification {
         api.deleteTable(catalogName, databaseName, tableName)
     }

+    @Unroll
+    def "Test ignore void transform as partition fields"() {
+        given:
+        def catalogName = 'embedded-fast-hive-metastore'
+        def databaseName = 'iceberg_db'
+        def tableName = 'iceberg_table_6'
+        def uri = isLocalEnv ? String.format('file:/tmp/data/') : null
+        def tableDto = new TableDto(
+            name: QualifiedName.ofTable(catalogName, databaseName, tableName),
+            serde: new StorageDto(
+                owner: 'metacat-test',
+                inputFormat: 'org.apache.hadoop.mapred.TextInputFormat',
+                outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
+                serializationLib: 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
+                parameters: [
+                    'serialization.format': '1'
+                ],
+                uri: uri
+            ),
+            definitionMetadata: null,
+            dataMetadata: null,
+            fields: null
+        )
+
+        def metadataLocation = String.format('/tmp/data/metadata/00001-ba4b775c-7b1a-4a6f-aec0-03d3c851088c.metadata.json')
+        def metadata = [table_type: 'ICEBERG', metadata_location: metadataLocation]
+        tableDto.setMetadata(metadata)
+
+        when:
+        try {
+            api.createDatabase(catalogName, databaseName, new DatabaseCreateRequestDto())
+        } catch (Exception ignored) {
+        }
+        api.createTable(catalogName, databaseName, tableName, tableDto)
+        def tableDTO = api.getTable(catalogName, databaseName, tableName, true, true, true)
+
+        then:
+        tableDTO.getFields().size() == 4
+        tableDTO.getPartition_keys().size() == 1
+        tableDTO.getPartition_keys()[0] == "field1"
+
+        cleanup:
+        api.deleteTable(catalogName, databaseName, tableName)
+    }
+
     @Unroll
     def "Test get partitions from iceberg table using #filter"() {
         def catalogName = 'embedded-fast-hive-metastore'