[Iceberg] Add BQMS test that validates using a BQ query #33625

Merged · 5 commits · Jan 24, 2025
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 3
"modification": 1
}
2 changes: 2 additions & 0 deletions sdks/java/io/iceberg/build.gradle
@@ -84,6 +84,8 @@ dependencies {

// BigQueryMetastore catalog dep
testImplementation project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")
+ testImplementation project(":sdks:java:io:google-cloud-platform")
+ testImplementation library.java.google_api_services_bigquery

testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
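The two new test dependencies support the query-based validation added below: ":sdks:java:io:google-cloud-platform" provides BigqueryClient and BigQueryUtils, and google_api_services_bigquery provides the TableRow model class. A minimal sketch of the round trip they enable (queryAsBeamRows is a hypothetical helper; the project and table names are placeholders, not values from this PR):

import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

// Hypothetical helper mirroring the validation pattern used in the new IT:
// run a BigQuery query, then convert each TableRow into a Beam Row for assertions.
static List<Row> queryAsBeamRows(String project, String table, Schema schema)
    throws IOException, InterruptedException {
  BigqueryClient client = new BigqueryClient("ExampleIT"); // client label used in logs
  List<TableRow> tableRows =
      client.queryUnflattened(
          String.format("SELECT * FROM `%s.%s`", project, table),
          project,
          true,  // typed results
          true); // standard SQL -- both flags mirror the test's own call below
  return tableRows.stream()
      .map(tr -> BigQueryUtils.toBeamRow(schema, tr))
      .collect(Collectors.toList());
}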
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
@@ -17,19 +17,49 @@
*/
package org.apache.beam.sdk.io.iceberg.catalog;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.RowFilter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryMetastoreCatalogIT");
static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
- static final String DATASET = "managed_iceberg_bqms_tests_no_delete";
+ static final String DATASET = "managed_iceberg_bqms_tests_" + System.nanoTime();
static final long SALT = System.nanoTime();

@BeforeClass
public static void createDataset() throws IOException, InterruptedException {
BQ_CLIENT.createNewDataset(OPTIONS.getProject(), DATASET);
}

@AfterClass
public static void deleteDataset() {
BQ_CLIENT.deleteDataset(OPTIONS.getProject(), DATASET);
}
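
// Each run of this test class gets its own BigQuery dataset (the name embeds
// System.nanoTime()), created in @BeforeClass and deleted in @AfterClass, so
// repeated or concurrent runs cannot collide on leftover tables.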

@Override
public String tableId() {
return DATASET + "." + testName.getMethodName() + "_" + SALT;
@@ -41,7 +71,7 @@ public Catalog createCatalog() {
BQMS_CATALOG,
"bqms_" + catalogName,
ImmutableMap.<String, String>builder()
.put("gcp_project", options.getProject())
.put("gcp_project", OPTIONS.getProject())
.put("gcp_location", "us-central1")
.put("warehouse", warehouse)
.build(),
@@ -65,12 +95,53 @@ public Map<String, Object> managedIcebergConfig(String tableId) {
.put(
"catalog_properties",
ImmutableMap.<String, String>builder()
.put("gcp_project", options.getProject())
.put("gcp_project", OPTIONS.getProject())
.put("gcp_location", "us-central1")
.put("warehouse", warehouse)
.put("catalog-impl", BQMS_CATALOG)
.put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO")
.build())
.build();
}

@Test
public void testWriteToPartitionedAndValidateWithBQQuery()
throws IOException, InterruptedException {
// For an example row where bool=true, datetime=2025-01-24T03:00:00, str=value_303, this
// partition spec will create a partition like: /bool=true/datetime_hour=2025-01-24-03/str_trunc=value_3/
PartitionSpec partitionSpec =
PartitionSpec.builderFor(ICEBERG_SCHEMA)
.identity("bool")
.hour("datetime")
.truncate("str", "value_x".length())
.build();
catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec);

// Write with Beam
Map<String, Object> config = managedIcebergConfig(tableId());
PCollection<Row> input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();

// Fetch records using a BigQuery query and validate
BigqueryClient bqClient = new BigqueryClient(getClass().getSimpleName());
String query = String.format("SELECT * FROM `%s.%s`", OPTIONS.getProject(), tableId());
List<TableRow> rows = bqClient.queryUnflattened(query, OPTIONS.getProject(), true, true);
List<Row> beamRows =
rows.stream()
.map(tr -> BigQueryUtils.toBeamRow(BEAM_SCHEMA, tr))
.collect(Collectors.toList());

assertThat(beamRows, containsInAnyOrder(inputRows.toArray()));

String queryByPartition =
String.format("SELECT bool, datetime FROM `%s.%s`", OPTIONS.getProject(), tableId());
rows = bqClient.queryUnflattened(queryByPartition, OPTIONS.getProject(), true, true);
RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool", "datetime"));
beamRows =
rows.stream()
.map(tr -> BigQueryUtils.toBeamRow(rowFilter.outputSchema(), tr))
.collect(Collectors.toList());
assertThat(beamRows, containsInAnyOrder(inputRows.stream().map(rowFilter::filter).toArray()));
}
}
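
The second assertion checks that partition columns survive the BigQuery round trip: the query projects only bool and datetime, so the expected rows must be projected the same way. RowFilter (org.apache.beam.sdk.util.RowFilter, imported above) performs that projection; a short sketch of its contract, reusing the test's own names:

// Keep only the named fields; everything else in BEAM_SCHEMA is dropped.
RowFilter filter = new RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool", "datetime"));

Schema projected = filter.outputSchema();       // schema containing just "bool" and "datetime"
Row expected = filter.filter(inputRows.get(0)); // a Row conforming to that projected schema

Converting the query results with toBeamRow(rowFilter.outputSchema(), tr) and the expected rows with rowFilter::filter keeps both sides of the containsInAnyOrder assertion in the same shape.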
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
@@ -37,7 +37,7 @@ public Integer numRecords() {
@Override
public Catalog createCatalog() {
Configuration catalogHadoopConf = new Configuration();
- catalogHadoopConf.set("fs.gs.project.id", options.getProject());
+ catalogHadoopConf.set("fs.gs.project.id", OPTIONS.getProject());
catalogHadoopConf.set("fs.gs.auth.type", "APPLICATION_DEFAULT");

HadoopCatalog catalog = new HadoopCatalog();
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -147,7 +147,12 @@ public static String warehouse(Class<? extends IcebergCatalogBaseIT> testClass)

@Before
public void setUp() throws Exception {
- options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
- warehouse = warehouse(getClass());
+ warehouse =
+ String.format(
+ "%s/%s/%s",
+ TestPipeline.testingPipelineOptions().getTempLocation(),
+ getClass().getSimpleName(),
+ RANDOM);
catalogSetup();
catalog = createCatalog();
@@ -162,7 +167,7 @@ public void cleanUp() throws Exception {
}

try {
- GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
+ GcsUtil gcsUtil = OPTIONS.as(GcsOptions.class).getGcsUtil();
GcsPath path = GcsPath.fromUri(warehouse);

@Nullable
@@ -190,7 +195,8 @@ public void cleanUp() throws Exception {

protected static String warehouse;
public Catalog catalog;
- protected static GcpOptions options;
+ protected static final GcpOptions OPTIONS =
+ TestPipeline.testingPipelineOptions().as(GcpOptions.class);
private static final String RANDOM = UUID.randomUUID().toString();
@Rule public TestPipeline pipeline = TestPipeline.create();
@Rule public TestName testName = new TestName();
@@ -210,7 +216,7 @@ public void cleanUp() throws Exception {
.addInt32Field("nested_int")
.addFloatField("nested_float")
.build();
- private static final Schema BEAM_SCHEMA =
+ protected static final Schema BEAM_SCHEMA =
Schema.builder()
.addStringField("str")
.addStringField("char")
@@ -262,16 +268,16 @@ public Row apply(Long num) {
}
};

- private static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
+ protected static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
- private static final SimpleFunction<Row, Record> RECORD_FUNC =
+ protected static final SimpleFunction<Row, Record> RECORD_FUNC =
new SimpleFunction<Row, Record>() {
@Override
public Record apply(Row input) {
return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input);
}
};
- private final List<Row> inputRows =
+ protected final List<Row> inputRows =
LongStream.range(0, numRecords()).boxed().map(ROW_FUNC::apply).collect(Collectors.toList());

/** Populates the Iceberg table and returns a {@link List<Row>} of expected elements. */
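
Widening BEAM_SCHEMA, ICEBERG_SCHEMA, RECORD_FUNC, and inputRows from private to protected is what lets BigQueryMetastoreCatalogIT build its own pipeline against the shared test data. A sketch of the pattern a subclass can now use; the class name is illustrative and the catalog-specific overrides are elided:

// Illustrative subclass reusing the base class's shared schema and input rows.
public class ExampleCatalogIT extends IcebergCatalogBaseIT {
  // ... createCatalog(), catalogSetup(), managedIcebergConfig(...) omitted ...

  @Test
  public void testWriteSharedRows() {
    Map<String, Object> config = managedIcebergConfig(tableId());
    PCollection<Row> input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
    input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
    pipeline.run().waitUntilFinish();
  }
}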