Bump Beam to 2.30.0, update transitives (#3880)
* Bump Beam to 2.30.0, update transitives

* add confluent resolver

* fix AppliedPTransform ResourceHints breakage

* exclude kafka globally

* fix coder breakage

* fix sparkey breakage

* scalafix

* fix non-unique test pipeline names
nevillelyh authored Jun 22, 2021
1 parent 16e3538 commit 5e471d7
Showing 6 changed files with 37 additions and 26 deletions.
build.sbt: 21 changes (12 additions & 9 deletions)
@@ -33,10 +33,10 @@ val autoServiceVersion = "1.0"
 val autoValueVersion = "1.8.1"
 val avroVersion = "1.8.2"
 val beamVendorVersion = "0.1"
-val beamVersion = "2.29.0"
+val beamVersion = "2.30.0"
 val bigdataossVersion = "2.1.6"
-val bigQueryStorageVersion = "1.12.0"
-val bigtableClientVersion = "1.16.0"
+val bigQueryStorageVersion = "1.18.1"
+val bigtableClientVersion = "1.19.1"
 val breezeVersion = "1.2"
 val caffeineVersion = "2.9.1"
 val caseappVersion = "2.0.6"
@@ -58,16 +58,16 @@ val gcsVersion = "1.8.0"
 val generatedGrpcBetaVersion = "1.19.2"
 val generatedDatastoreProtoVersion = "0.88.5"
 val googleClientsVersion = "1.31.1"
-val googleApiServicesBigQueryVersion = s"v2-rev20210219-1.31.0"
-val googleApiServicesDataflowVersion = s"v1b3-rev20210217-1.31.0"
-val googleApiServicesPubsubVersion = s"v1-rev20210208-1.31.0"
+val googleApiServicesBigQueryVersion = s"v2-rev20210410-1.31.0"
+val googleApiServicesDataflowVersion = s"v1b3-rev20210408-1.31.0"
+val googleApiServicesPubsubVersion = s"v1-rev20210322-1.31.0"
 val googleApiServicesStorageVersion = s"v1-rev20210127-1.31.0"
 val googleAuthVersion = "0.22.2"
 val googleCloudCoreVersion = "1.94.0"
 val googleCloudSpannerVersion = "3.2.1"
 val googleHttpClientsVersion = "1.38.1"
 val googleOauthClientVersion = "1.31.2"
-val grpcVersion = "1.35.0"
+val grpcVersion = "1.37.0"
 val guavaVersion = "30.1-jre"
 val hadoopVersion = "2.10.1"
 val hamcrestVersion = "2.2"
@@ -87,7 +87,7 @@ val magnoliaVersion = "0.17.0"
 val magnolifyVersion = "0.4.3"
 val metricsVersion = "3.2.6"
 val nettyVersion = "4.1.52.Final"
-val nettyTcNativeVersion = "2.0.33.Final"
+val nettyTcNativeVersion = "2.0.34.Final"
 val opencensusVersion = "0.28.0"
 val parquetExtraVersion = "0.4.0"
 val parquetVersion = "1.12.0"
@@ -149,7 +149,10 @@ val commonSettings = Def
     javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint:unchecked"),
     Compile / doc / javacOptions := Seq("-source", "1.8"),
     // protobuf-lite is an older subset of protobuf-java and causes issues
-    excludeDependencies += "com.google.protobuf" % "protobuf-lite",
+    excludeDependencies ++= Seq(
+      "com.google.protobuf" % "protobuf-lite",
+      "org.apache.beam" % "beam-sdks-java-io-kafka"
+    ),
     resolvers += Resolver.sonatypeRepo("public"),
     Test / javaOptions += "-Dscio.ignoreVersionWarning=true",
     Test / testOptions += Tests.Argument("-oD"),
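The two exclusions above use sbt's ModuleID shorthand for exclusion rules. A minimal equivalent sketch using the explicit ExclusionRule form (illustrative only, not part of this commit):

    // Same effect as the ModuleID shorthand in the hunk above.
    excludeDependencies ++= Seq(
      ExclusionRule("com.google.protobuf", "protobuf-lite"),
      ExclusionRule("org.apache.beam", "beam-sdks-java-io-kafka")
    )

Since commonSettings is applied to the modules in this build, the Kafka IO artifact is dropped from every module's resolution, which is what the "exclude kafka globally" bullet in the commit message refers to.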
@@ -98,7 +98,7 @@ private[scio] object BeamCoders {

   /** Get key-value coders from a `SideInput[Map[K, Iterable[V]]]`. */
   def getMultiMapKV[K, V](si: SideInput[Map[K, Iterable[V]]]): (Coder[K], Coder[V]) = {
-    val coder = si.view.getPCollection.getCoder.asInstanceOf[beam.KvCoder[_, _]].getValueCoder
+    val coder = si.view.getPCollection.getCoder
     val (k, v) = unwrap(coder) match {
       // Beam's `View.asMultiMap`
       case (c: beam.KvCoder[K, V] @unchecked) => (c.getKeyCoder, c.getValueCoder)
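This is the "fix coder breakage" change: judging from the hunk, under Beam 2.30 the PCollection backing a multi-map side input already carries a KvCoder[K, V] at the top level, so the key and value coders are read from it directly instead of first peeling the value coder off an outer KvCoder. For context, a hedged usage sketch (the asMultiMapSideInput name is assumed from Scio's side-input API and is not shown in this diff):

    // Assumed API: builds a SideInput[Map[String, Iterable[Int]]] from a keyed SCollection.
    val si = sc.parallelize(Seq("a" -> 1, "a" -> 2, "b" -> 3)).asMultiMapSideInput
    // getMultiMapKV recovers (Coder[String], Coder[Int]) from the KvCoder
    // carried by the side input's backing PCollection.
    val (keyCoder, valueCoder) = BeamCoders.getMultiMapKV(si)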
@@ -27,6 +27,7 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
 import org.apache.beam.runners.dataflow.{DataflowClient, DataflowPipelineJob}
 import org.apache.beam.sdk.options.PipelineOptionsFactory
 import org.apache.beam.sdk.runners.AppliedPTransform
+import org.apache.beam.sdk.transforms.resourcehints.ResourceHints
 import org.apache.beam.sdk.transforms.PTransform
 import org.apache.beam.sdk.values.{PInput, POutput}
 import org.apache.beam.sdk.{Pipeline, PipelineResult}
@@ -148,6 +149,7 @@ object DataflowResult {
       Collections.emptyMap(),
       Collections.emptyMap(),
       new EmptyPTransform,
+      ResourceHints.create(),
       new EmptyPipeline
     )

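This is the "fix AppliedPTransform ResourceHints breakage" change: Beam 2.30 threads a ResourceHints argument through AppliedPTransform construction, and ResourceHints.create() supplies empty hints. A minimal sketch of the resulting call shape, with the factory method and the name argument assumed (only the argument tail is visible in the hunk above):

    val applied = AppliedPTransform.of(
      "name",                  // assumed: the full-name argument is outside the visible hunk
      Collections.emptyMap(),  // inputs
      Collections.emptyMap(),  // outputs
      new EmptyPTransform,
      ResourceHints.create(),  // new in Beam 2.30: empty resource hints
      new EmptyPipeline
    )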
@@ -27,7 +27,6 @@ import com.spotify.scio.testing._
 import com.spotify.scio.util._
 import com.spotify.sparkey._
 import org.apache.beam.sdk.io.FileSystems
-import org.apache.beam.sdk.values.KV
 import org.apache.commons.io.FileUtils

 import scala.jdk.CollectionConverters._
@@ -567,8 +566,7 @@ class SparkeyTest extends PipelineSpec {
       .tap(sparkeyMaterialized)
       .value
       .next
-      .asInstanceOf[KV[Any, SparkeyUri]]
-      .getValue
+      .asInstanceOf[SparkeyUri]
       .basePath
     FileUtils.deleteDirectory(new File(basePath))
   }
@@ -600,8 +598,7 @@
       .tap(sparkeyMaterialized)
       .value
       .next
-      .asInstanceOf[KV[Any, SparkeyUri]]
-      .getValue
+      .asInstanceOf[SparkeyUri]
       .basePath
     FileUtils.deleteDirectory(new File(basePath))
   }
@@ -632,8 +629,7 @@
       .tap(sparkeyMaterialized)
       .value
       .next
-      .asInstanceOf[KV[Any, SparkeyUri]]
-      .getValue
+      .asInstanceOf[SparkeyUri]
       .basePath
     FileUtils.deleteDirectory(new File(basePath))
   }
@@ -664,8 +660,7 @@
       .tap(sparkeyMaterialized)
       .value
       .next
-      .asInstanceOf[KV[Any, SparkeyUri]]
-      .getValue
+      .asInstanceOf[SparkeyUri]
       .basePath
     FileUtils.deleteDirectory(new File(basePath))
   }
@@ -30,6 +30,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -156,7 +157,9 @@ public void testCleansUpTempFiles() throws Exception {
             1,
             0);

-    pipeline.apply(Create.of(Stream.of(input).collect(Collectors.toList()))).apply(sink);
+    pipeline
+        .apply("CleansUpTempFiles", Create.of(Stream.of(input).collect(Collectors.toList())))
+        .apply(sink);
     pipeline.run().waitUntilFinish();

     // Assert that no files are left in the temp directory
@@ -173,7 +176,7 @@ public void testCustomFilenamePrefix() throws Exception {
         new SortedBucketSink<>(
             metadata, outputDirectory, fromFolder(temp), ".txt", new TestFileOperations(), 1);

-    pipeline.apply(Create.empty(StringUtf8Coder.of())).apply(sink);
+    pipeline.apply("CustomFilenamePrefix", Create.empty(StringUtf8Coder.of())).apply(sink);
     pipeline.run().waitUntilFinish();

     final MatchResult outputFiles =
@@ -198,7 +201,7 @@ public void testWritesEmptyBucketFiles() throws Exception {
         new SortedBucketSink<>(
             metadata, outputDirectory, fromFolder(temp), ".txt", new TestFileOperations(), 1);

-    pipeline.apply(Create.empty(StringUtf8Coder.of())).apply(sink);
+    pipeline.apply("WritesEmptyBucketFiles", Create.empty(StringUtf8Coder.of())).apply(sink);
     pipeline.run().waitUntilFinish();

     final FileAssignment dstFiles =
@@ -233,7 +236,11 @@ public void testWritesNoFilesIfPriorStepsFail() throws Exception {
             new ExceptionThrowingFileOperations(),
             1);

-    pipeline.apply(Create.of(Stream.of(input).collect(Collectors.toList()))).apply(sink);
+    pipeline
+        .apply(
+            "WritesNoFilesIfPriorStepsFail",
+            Create.of(Stream.of(input).collect(Collectors.toList())))
+        .apply(sink);

     try {
       pipeline.run();
@@ -263,7 +270,9 @@ private void test(int numBuckets, int numShards, boolean useKeyCache) throws Exc

     check(
         pipeline
-            .apply(Create.of(Stream.of(input).collect(Collectors.toList())))
+            .apply(
+                "test-" + UUID.randomUUID(),
+                Create.of(Stream.of(input).collect(Collectors.toList())))
             .apply(reshuffle)
             .apply(sink),
         metadata,
@@ -298,6 +307,7 @@ private void testKeyedCollection(int numBuckets, int numShards, boolean useKeyCa
     check(
         pipeline
             .apply(
+                "test-keyed-collection-" + UUID.randomUUID(),
                 Create.of(keyedInput)
                     .withCoder(
                         KvCoder.of(NullableCoder.of(StringUtf8Coder.of()), StringUtf8Coder.of())))
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.function.Function;
 import java.util.function.ToIntFunction;
 import java.util.stream.Collectors;
@@ -42,11 +43,9 @@
 import org.apache.beam.sdk.extensions.smb.FileOperations.Writer;
 import org.apache.beam.sdk.extensions.smb.SMBFilenamePolicy.FileAssignment;
 import org.apache.beam.sdk.extensions.smb.SortedBucketSource.Predicate;
-import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.LocalResources;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.metrics.DistributionResult;
@@ -276,6 +275,7 @@ private void testSingleSourceGbk(Predicate<String> predicate) throws Exception {

     PCollection<KV<String, CoGbkResult>> output =
         pipeline.apply(
+            "SingleSourceGbk-" + UUID.randomUUID(),
             Read.from(
                 new SortedBucketSource<>(String.class, Collections.singletonList(bucketedInput))));

@@ -704,6 +704,7 @@ private static void checkJoin(

     PCollection<KV<String, CoGbkResult>> output =
         pipeline.apply(
+            "CheckJoin-" + UUID.randomUUID(),
             Read.from(new SortedBucketSource<>(String.class, inputs, targetParallelism)));

     Function<String, String> extractKeyFn = TestBucketMetadata.of(2, 1)::extractKey;
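The SMB test hunks above all follow the same pattern for the "fix non-unique test pipeline names" bullet: each Create or Read application now passes an explicit name, suffixed with a UUID where the same helper builds several pipelines, so transform names stay unique within a pipeline. A small illustrative sketch (names are hypothetical, not from the commit):

    import java.util.UUID
    import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.transforms.Create

    val p = Pipeline.create()
    // Naming each application avoids two transforms sharing the same derived name.
    p.apply("WriteInput-" + UUID.randomUUID(), Create.of("a", "b"))
    p.apply("WriteMoreInput-" + UUID.randomUUID(), Create.of("c", "d"))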
