Skip to content

Commit

Permalink
another query test
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Jan 21, 2025
1 parent aec91f1 commit 31c0d78
Showing 1 changed file with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@

import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.RowFilter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -130,5 +132,15 @@ public void testWriteToPartitionedAndValidateWithBQQuery()
.collect(Collectors.toList());

assertThat(beamRows, containsInAnyOrder(inputRows.toArray()));

String queryByPartition =
String.format("SELECT bool, datetime FROM `%s.%s`", OPTIONS.getProject(), tableId());
rows = bqClient.queryUnflattened(queryByPartition, OPTIONS.getProject(), true, true);
RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool", "datetime"));
beamRows =
rows.stream()
.map(tr -> BigQueryUtils.toBeamRow(rowFilter.outputSchema(), tr))
.collect(Collectors.toList());
assertThat(beamRows, containsInAnyOrder(inputRows.stream().map(rowFilter::filter).toArray()));
}
}

0 comments on commit 31c0d78

Please sign in to comment.