From 9bd1a1dd2c08ada9d17fa241f17021758943a34d Mon Sep 17 00:00:00 2001
From: Davis Zhang
Date: Fri, 17 Jan 2025 15:36:06 -0800
Subject: [PATCH] Make sure table version is set when creating metaclient

---
 .../apache/hudi/HoodieSparkSqlWriter.scala    |  6 +-
 .../apache/hudi/util/SparkConfigUtils.scala   |  4 +-
 .../catalyst/catalog/HoodieCatalogTable.scala |  4 +-
 .../hudi/TestHoodieSparkSqlWriter.scala       | 44 +++++++++++++--
 .../TestWriteTableVersionConfig.scala         | 56 +++++++++++++++++++
 5 files changed, 105 insertions(+), 9 deletions(-)
 create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestWriteTableVersionConfig.scala

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c84d95abff90..c6a9559b2c84 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -49,7 +49,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
 import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
 import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
-import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_TABLE_VERSION}
+import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -66,6 +66,7 @@ import org.apache.hudi.storage.HoodieStorage
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
 import org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
+import org.apache.hudi.util.SparkConfigUtils.getStringWithAltKeys
 import org.apache.hudi.util.SparkKeyGenUtils
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.sql.HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty
@@ -302,6 +303,7 @@ class HoodieSparkSqlWriterInternal {
         else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
       HoodieTableMetaClient.newTableBuilder()
         .setTableType(tableType)
+        .setTableVersion(Integer.valueOf(getStringWithAltKeys(parameters, HoodieWriteConfig.WRITE_TABLE_VERSION)))
         .setDatabaseName(databaseName)
         .setTableName(tblName)
         .setBaseFileFormat(baseFileFormat)
@@ -748,7 +750,7 @@ class HoodieSparkSqlWriterInternal {
       .setTableType(HoodieTableType.valueOf(tableType))
       .setTableName(tableName)
       .setRecordKeyFields(recordKeyFields)
-      .setTableVersion(hoodieConfig.getIntOrDefault(WRITE_TABLE_VERSION))
+      .setTableVersion(Integer.valueOf(getStringWithAltKeys(parameters, HoodieWriteConfig.WRITE_TABLE_VERSION)))
      .setArchiveLogFolder(archiveLogFolder)
      .setPayloadClassName(payloadClass)
      .setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE)))
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
index 88f223284ddc..107a3968c82e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
@@ -28,11 +28,11 @@ object SparkConfigUtils {
    * the alternate config keys for the specified key as well.
    *
    * @param props          Configs in scala map
-   * @param configProperty {@link ConfigProperty} config of String type to fetch.
+   * @param configProperty {@link ConfigProperty} config of type T to fetch.
    * @return String value if the config exists; default String value if the config does not exist
    *         and there is default value defined in the {@link ConfigProperty} config; {@code null} otherwise.
    */
-  def getStringWithAltKeys(props: Map[String, String], configProperty: ConfigProperty[String]): String = {
+  def getStringWithAltKeys[T](props: Map[String, String], configProperty: ConfigProperty[T]): String = {
     ConfigUtils.getStringWithAltKeys(JFunction.toJavaFunction[String, Object](key => props.getOrElse(key, null)),
       configProperty)
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 1a23356381fd..dc819560e36a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -28,12 +28,13 @@ import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
 import org.apache.hudi.common.table.timeline.TimelineUtils
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.keygen.constant.{KeyGeneratorOptions, KeyGeneratorType}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.storage.HoodieStorageUtils
 import org.apache.hudi.util.SparkConfigUtils
-
+import org.apache.hudi.util.SparkConfigUtils.getStringWithAltKeys
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.avro.SchemaConverters
@@ -225,6 +226,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
 
     HoodieTableMetaClient.newTableBuilder()
       .fromProperties(properties)
+      .setTableVersion(Integer.valueOf(getStringWithAltKeys(tableConfigs, HoodieWriteConfig.WRITE_TABLE_VERSION)))
       .setDatabaseName(catalogDatabaseName)
       .setTableName(table.identifier.table)
       .setTableCreateSchema(schema.toString())
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 9cfc01ae6b2a..b020598aa245 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -237,6 +237,30 @@ class TestHoodieSparkSqlWriter extends HoodieSparkWriterTestBase {
     assert(HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier2, dataFrame2)._1)
   }
 
+  /**
+   * Test case for making sure the configured write table version is adopted.
+   */
+  @Test
+  def testChangeWriteTableVersion(): Unit = {
+    Seq(6, 8).foreach { tableVersion =>
+      val tempPath = s"$tempBasePath/${tableVersion}"
+      val tableModifier1 = Map(
+        "path" -> tempPath,
+        HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+        "hoodie.write.table.version" -> s"$tableVersion",
+        "hoodie.datasource.write.recordkey.field" -> "uuid",
+        "hoodie.datasource.write.partitionpath.field" -> "ts"
+      )
+      val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame)
+
+      // Make sure table version is adopted.
+      val metaClient = HoodieTableMetaClient.builder().setBasePath(tempPath)
+        .setConf(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf())).build()
+      assertEquals(tableVersion, metaClient.getTableConfig.getTableVersion.versionCode())
+    }
+  }
+
   /**
    * Test case for each bulk insert sort mode
    *
@@ -518,13 +542,14 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
 
   /**
    * Test cases for HoodieSparkSqlWriter functionality with datasource bootstrap
-   * for different type of tables.
+   * for different types of tables and table versions.
    *
    * @param tableType Type of table
+   * @param tableVersion Version of table
    */
   @ParameterizedTest
-  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
-  def testWithDatasourceBootstrapForTableType(tableType: String): Unit = {
+  @MethodSource(Array("bootstrapTestParams"))
+  def testWithDatasourceBootstrapForTableType(tableType: String, tableVersion: Int): Unit = {
     val srcPath = java.nio.file.Files.createTempDirectory("hoodie_bootstrap_source_path")
     try {
       val sourceDF = TestBootstrap.generateTestRawTripDataset(Instant.now.toEpochMilli, 0, 100, Collections.emptyList(), sc,
@@ -542,6 +567,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
         DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
         DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName,
         DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getCanonicalName,
+        "hoodie.write.table.version" -> tableVersion.toString,
         HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false")
       val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
       initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true, initBasePath = false)
@@ -564,6 +590,9 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
       assertFalse(ignoreResult)
       verify(client, times(2)).close()
 
+      // Assert the table version is adopted.
+      val metaClient = createMetaClient(spark, tempBasePath)
+      assertEquals(tableVersion, metaClient.getTableConfig.getTableVersion.versionCode())
       // fetch all records from parquet files generated from write to hudi
       val actualDf = sqlContext.read.parquet(tempBasePath)
       assert(actualDf.count == 100)
@@ -1137,7 +1166,6 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
     assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[NonpartitionedKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
   }
 
-
   @Test
   def testNoKeyGenToSimpleKeyGen(): Unit = {
     val _spark = spark
@@ -1325,4 +1353,12 @@ object TestHoodieSparkSqlWriter {
       Arguments.arguments("*5/03/1*", Seq("2016/03/15")),
       Arguments.arguments("2016/03/*", Seq("2015/03/16", "2015/03/17")))
   }
+
+  def bootstrapTestParams(): java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.arguments("MERGE_ON_READ", Integer.valueOf(8)),
+      Arguments.arguments("MERGE_ON_READ", Integer.valueOf(6)),
+      Arguments.arguments("COPY_ON_WRITE", Integer.valueOf(8))
+    )
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestWriteTableVersionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestWriteTableVersionConfig.scala
new file mode 100644
index 000000000000..515e62ea72ba
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestWriteTableVersionConfig.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals}
+import org.apache.hudi.common.table.HoodieTableConfig.VERSION
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+
+class TestWriteTableVersionConfig extends HoodieSparkSqlTestBase {
+
+  test("Test create table with various write versions") {
+    Seq(6, 8).foreach { tableVersion =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val basePath = tmp.getCanonicalPath
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |   id int,
+             |   ts long,
+             |   dt string
+             | ) using hudi
+             | tblproperties (
+             |   type = 'mor',
+             |   primaryKey = 'id',
+             |   preCombineField = 'ts',
+             |   hoodie.write.table.version = $tableVersion
+             | )
+             | partitioned by(dt)
+             | location '$basePath'
+       """.stripMargin)
+        val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
+          .setConf(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf())).build()
+        assertEquals(tableVersion, metaClient.getTableConfig.getTableVersion.versionCode())
+      }
+    }
+  }
+}
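
Usage note (editorial, not part of the patch): with this change, `hoodie.write.table.version` is honored at metaclient creation time on both the datasource write path (HoodieSparkSqlWriter) and the SQL DDL path (HoodieCatalogTable), rather than falling back to the default table version. Below is a minimal sketch of how a caller might pin a table to version 6 on its initial datasource write; the option keys are the ones exercised by the tests above, while the table name, base path, and input DataFrame `df` are hypothetical:

    // Minimal sketch, assuming a running SparkSession and a DataFrame `df`
    // with columns `uuid` (record key) and `ts` (partition path field).
    df.write.format("hudi")
      .option("hoodie.table.name", "my_table")                      // hypothetical table name
      .option("hoodie.write.table.version", "6")                    // pin the created table to version 6
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.partitionpath.field", "ts")
      .mode("overwrite")
      .save("/tmp/my_table")                                        // hypothetical base path

After the write, `hoodie.table.version` recorded in the table's hoodie.properties should match the requested version, which is exactly what the new tests assert via `metaClient.getTableConfig.getTableVersion.versionCode()`.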