Skip to content

Commit

Permalink
extract glue-specific createNamespace logic to its own method
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Feb 14, 2025
1 parent b77e2fc commit defa908
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class S3DataLakeChecker(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
)
s3DataLakeUtil.createNamespaceWithGlueHandling(testTableIdentifier, catalog)
val table =
s3DataLakeUtil.createTable(
testTableIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class S3DataLakeStreamLoader(
override suspend fun start() {
val properties = s3DataLakeUtil.toCatalogProperties(config = icebergConfiguration)
val catalog = s3DataLakeUtil.createCatalog(DEFAULT_CATALOG_NAME, properties)
s3DataLakeUtil.createNamespaceWithGlueHandling(stream.descriptor, catalog)
table =
s3DataLakeUtil.createTable(
streamDescriptor = stream.descriptor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,24 +109,8 @@ class S3DataLakeUtil(
return CatalogUtil.buildIcebergCatalog(catalogName, properties, Configuration())
}

/**
* Builds (if necessary) an Iceberg [Table]. This includes creating the table's namespace if it
* does not already exist. If the [Table] already exists, it is loaded from the [Catalog].
*
* @param streamDescriptor The [DestinationStream.Descriptor] that contains the Airbyte stream's
* namespace and name.
* @param catalog The Iceberg [Catalog] that contains the [Table] or should contain it once
* created.
* @param schema The Iceberg [Schema] associated with the [Table].
* @param properties The [Table] configuration properties derived from the [Catalog].
* @return The Iceberg [Table], created if it does not yet exist.
*/
fun createTable(
streamDescriptor: DestinationStream.Descriptor,
catalog: Catalog,
schema: Schema,
properties: Map<String, String>
): Table {
/** Create the namespace if it doesn't already exist. */
fun createNamespace(streamDescriptor: DestinationStream.Descriptor, catalog: Catalog) {
val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor)
synchronized(tableIdentifier.namespace()) {
if (
Expand All @@ -145,15 +129,46 @@ class S3DataLakeUtil(
logger.info {
"Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations."
}
} catch (e: ConcurrentModificationException) {
// do the same for AWS Glue
logger.info {
"Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations."
}
}
}
}
}

fun createNamespaceWithGlueHandling(
streamDescriptor: DestinationStream.Descriptor,
catalog: Catalog
) {
try {
createNamespace(streamDescriptor, catalog)
} catch (e: ConcurrentModificationException) {
// glue catalog throws its own special exception
logger.info {
"Namespace '${streamDescriptor.namespace}' was likely created by another thread during parallel operations."
}
}
}

/**
* Builds (if necessary) an Iceberg [Table]. If the [Table] already exists, it is loaded from
* the [Catalog].
*
* Assumes the namespace already exists. Use [createNamespace] if this is not guaranteed.
*
* @param streamDescriptor The [DestinationStream.Descriptor] that contains the Airbyte stream's
* namespace and name.
* @param catalog The Iceberg [Catalog] that contains the [Table] or should contain it once
* created.
* @param schema The Iceberg [Schema] associated with the [Table].
* @param properties The [Table] configuration properties derived from the [Catalog].
* @return The Iceberg [Table], created if it does not yet exist.
*/
fun createTable(
streamDescriptor: DestinationStream.Descriptor,
catalog: Catalog,
schema: Schema,
properties: Map<String, String>
): Table {
val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor)
return if (!catalog.tableExists(tableIdentifier)) {
logger.info { "Creating Iceberg table '$tableIdentifier'...." }
catalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ internal class S3DataLakeStreamLoaderTest {
every { table.manageSnapshots().createBranch(any()).commit() } just runs
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createNamespaceWithGlueHandling(any(), any()) } just runs
every { createTable(any(), any(), any(), any()) } returns table
every { toCatalogProperties(any()) } returns mapOf()
every { toIcebergSchema(any(), any<MapperPipeline>()) } answers
Expand Down Expand Up @@ -222,6 +223,7 @@ internal class S3DataLakeStreamLoaderTest {
every { table.newScan().planFiles() } returns CloseableIterable.empty()
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createNamespaceWithGlueHandling(any(), any()) } just runs
every { createTable(any(), any(), any(), any()) } returns table
every { toCatalogProperties(any()) } returns mapOf()
every { toIcebergSchema(any(), any<MapperPipeline>()) } answers
Expand Down Expand Up @@ -370,6 +372,7 @@ internal class S3DataLakeStreamLoaderTest {
every { table.newScan().planFiles() } returns CloseableIterable.empty()
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createNamespaceWithGlueHandling(any(), any()) } just runs
every { createTable(any(), any(), any(), any()) } returns table
every { toCatalogProperties(any()) } returns mapOf()
every { toIcebergSchema(any(), any<MapperPipeline>()) } answers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ internal class S3DataLakeUtilTest {
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns
false
}
s3DataLakeUtil.createNamespaceWithGlueHandling(streamDescriptor, catalog)
val table =
s3DataLakeUtil.createTable(
streamDescriptor = streamDescriptor,
Expand Down Expand Up @@ -134,6 +135,7 @@ internal class S3DataLakeUtilTest {
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns
false
}
s3DataLakeUtil.createNamespaceWithGlueHandling(streamDescriptor, catalog)
val table =
s3DataLakeUtil.createTable(
streamDescriptor = streamDescriptor,
Expand Down Expand Up @@ -161,6 +163,7 @@ internal class S3DataLakeUtilTest {
every { namespaceExists(any()) } returns true
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns true
}
s3DataLakeUtil.createNamespaceWithGlueHandling(streamDescriptor, catalog)
val table =
s3DataLakeUtil.createTable(
streamDescriptor = streamDescriptor,
Expand Down

0 comments on commit defa908

Please sign in to comment.