From b2a82c370aee9d93c5390ff240abe59e9e9c7b2a Mon Sep 17 00:00:00 2001 From: Nahian-Al Hasan Date: Tue, 7 Jan 2025 18:25:45 +1100 Subject: [PATCH 1/6] Update DatastoreWriterFn to output WriteSummary --- .../sdk/io/gcp/datastore/DatastoreV1.java | 187 ++++++++++++++---- 1 file changed, 150 insertions(+), 37 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 1563b0b059f2..88861cbaad16 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -35,19 +35,8 @@ import com.google.auth.http.HttpCredentialsAdapter; import com.google.auto.value.AutoValue; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; -import com.google.datastore.v1.CommitRequest; -import com.google.datastore.v1.Entity; -import com.google.datastore.v1.EntityResult; -import com.google.datastore.v1.GqlQuery; -import com.google.datastore.v1.Key; +import com.google.datastore.v1.*; import com.google.datastore.v1.Key.PathElement; -import com.google.datastore.v1.Mutation; -import com.google.datastore.v1.PartitionId; -import com.google.datastore.v1.Query; -import com.google.datastore.v1.QueryResultBatch; -import com.google.datastore.v1.ReadOptions; -import com.google.datastore.v1.RunQueryRequest; -import com.google.datastore.v1.RunQueryResponse; import com.google.datastore.v1.client.Datastore; import com.google.datastore.v1.client.DatastoreException; import com.google.datastore.v1.client.DatastoreFactory; @@ -64,8 +53,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.NoSuchElementException; import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.concurrent.Immutable; import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers; import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; import org.apache.beam.runners.core.metrics.ServiceCallMetric; @@ -89,6 +81,7 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.BackOffUtils; import org.apache.beam.sdk.util.FluentBackoff; @@ -104,6 +97,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -126,7 +120,7 @@ *

To read a {@link PCollection} from a query to Cloud Datastore, use {@link DatastoreV1#read} * and its methods {@link DatastoreV1.Read#withProjectId} and {@link DatastoreV1.Read#withQuery} to * specify the project to query and the query to read from. You can optionally provide a namespace - * to query within using {@link DatastoreV1.Read#withDatabase} or {@link + * to query within using {@link DatastoreV1.Read#withDatabaseId} or {@link * DatastoreV1.Read#withNamespace}. You could also optionally specify how many splits you want for * the query using {@link DatastoreV1.Read#withNumQuerySplits}. * @@ -1224,6 +1218,51 @@ public void populateDisplayData(DisplayData.Builder builder) { } } + /** + * Summary object produced when a number of writes are successfully written to Datastore in a + * single Mutation. + */ + @Immutable + public static final class WriteSuccessSummary implements Serializable { + private final int numWrites; + private final long numBytes; + + public WriteSuccessSummary(int numWrites, long numBytes) { + this.numWrites = numWrites; + this.numBytes = numBytes; + } + + public int getNumWrites() { + return numWrites; + } + + public long getNumBytes() { + return numBytes; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof WriteSuccessSummary)) { + return false; + } + WriteSuccessSummary that = (WriteSuccessSummary) o; + return numWrites == that.numWrites && numBytes == that.numBytes; + } + + @Override + public int hashCode() { + return Objects.hash(numWrites, numBytes); + } + + @Override + public String toString() { + return "WriteSummary{" + "numWrites=" + numWrites + ", numBytes=" + numBytes + '}'; + } + } + /** * Returns an empty {@link DatastoreV1.Write} builder. Configure the destination {@code projectId} * using {@link DatastoreV1.Write#withProjectId}. @@ -1515,7 +1554,6 @@ public DeleteKey withHintNumWorkers(ValueProvider hintNumWorkers) { * DoFn} provided, as the commits are retried when failures occur. */ private abstract static class Mutate extends PTransform, PDone> { - protected ValueProvider projectId; protected ValueProvider databaseId; protected @Nullable String localhost; @@ -1682,6 +1720,33 @@ public int nextBatchSize(long timeSinceEpochMillis) { private transient MovingAverage meanLatencyPerEntityMs; } + static class DatastoreWriterFn extends BaseDatastoreWriterFn { + + DatastoreWriterFn(String projectId, @Nullable String localhost) { + super(projectId, localhost); + } + + DatastoreWriterFn(ValueProvider projectId, @Nullable String localhost) { + super(projectId, localhost); + } + + @VisibleForTesting + DatastoreWriterFn(ValueProvider projectId, @Nullable String localhost, V1DatastoreFactory datastoreFactory, WriteBatcher writeBatcher) { + super(projectId, localhost, datastoreFactory, writeBatcher); + } + + @VisibleForTesting + DatastoreWriterFn(ValueProvider projectId, ValueProvider databaseId, @Nullable String localhost, V1DatastoreFactory datastoreFactory, WriteBatcher writeBatcher) { + super(projectId, databaseId, localhost, datastoreFactory, writeBatcher); + } + + @Override + void handleWriteSummary(ContextAdapter context, Instant timestamp, KV tuple, Runnable logMessage) { + logMessage.run(); + context.output(tuple.getKey(), timestamp, tuple.getValue()); + } + } + /** * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in * batches; see {@link DatastoreV1.WriteBatcherImpl}. 
@@ -1690,25 +1755,25 @@ public int nextBatchSize(long timeSinceEpochMillis) { * See Datastore Entities, Properties, and Keys for information about entity keys and mutations.

Commits are non-transactional. If a commit fails because of a conflict over an entity group, - * the commit will be retried (up to {@link DatastoreV1.DatastoreWriterFn#MAX_RETRIES} times). + * the commit will be retried (up to {@link DatastoreV1.BaseDatastoreWriterFn#MAX_RETRIES} times). * This means that the mutation operation should be idempotent. Thus, the writer should only be * used for {@code upsert} and {@code delete} mutation operations, as these are the only two Cloud * Datastore mutations that are idempotent. */ - @VisibleForTesting - static class DatastoreWriterFn extends DoFn { + abstract static class BaseDatastoreWriterFn extends DoFn{ - private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class); + private static final Logger LOG = LoggerFactory.getLogger(BaseDatastoreWriterFn.class); private final ValueProvider projectId; private final ValueProvider databaseId; private final @Nullable String localhost; private transient Datastore datastore; private final V1DatastoreFactory datastoreFactory; // Current batch of mutations to be written. - private final List mutations = new ArrayList<>(); + private final List> mutations = new ArrayList<>(); private final HashSet uniqueMutationKeys = new HashSet<>(); private int mutationsSize = 0; // Accumulated size of protos in mutations. private WriteBatcher writeBatcher; + private transient AdaptiveThrottler adaptiveThrottler; private final Counter throttlingMsecs = Metrics.counter(DatastoreWriterFn.class, Metrics.THROTTLE_TIME_COUNTER_NAME); @@ -1729,7 +1794,7 @@ static class DatastoreWriterFn extends DoFn { .withMaxRetries(MAX_RETRIES) .withInitialBackoff(Duration.standardSeconds(5)); - DatastoreWriterFn(String projectId, @Nullable String localhost) { + BaseDatastoreWriterFn(String projectId, @Nullable String localhost) { this( StaticValueProvider.of(projectId), null, @@ -1738,12 +1803,11 @@ static class DatastoreWriterFn extends DoFn { new WriteBatcherImpl()); } - DatastoreWriterFn(ValueProvider projectId, @Nullable String localhost) { + BaseDatastoreWriterFn(ValueProvider projectId, @Nullable String localhost) { this(projectId, null, localhost, new V1DatastoreFactory(), new WriteBatcherImpl()); } - @VisibleForTesting - DatastoreWriterFn( + BaseDatastoreWriterFn( ValueProvider projectId, @Nullable String localhost, V1DatastoreFactory datastoreFactory, @@ -1751,8 +1815,7 @@ static class DatastoreWriterFn extends DoFn { this(projectId, null, localhost, datastoreFactory, writeBatcher); } - @VisibleForTesting - DatastoreWriterFn( + BaseDatastoreWriterFn( ValueProvider projectId, ValueProvider databaseId, @Nullable String localhost, @@ -1765,6 +1828,46 @@ static class DatastoreWriterFn extends DoFn { this.writeBatcher = writeBatcher; } + /** + * Adapter interface which provides a common parent for {@link ProcessContext} and {@link + * FinishBundleContext} so that we are able to use a single common invocation to output from. 
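+     *
+     * <p>A minimal sketch of how the two adapters below are wired (illustration only, mirroring
+     * how {@code processElement} and {@code finishBundle} in this class call {@code flushBatch}):
+     *
+     * <pre>{@code
+     * // In processElement: wrap the per-element context so flushBatch can emit summaries.
+     * flushBatch(new ProcessContextAdapter<>(c));
+     * // In finishBundle: wrap the bundle-completion context the same way.
+     * flushBatch(new FinishBundleContextAdapter<>(c));
+     * }</pre>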
+ */ + interface ContextAdapter { + void output(T t, Instant timestamp, BoundedWindow window); + } + + private static final class ProcessContextAdapter implements DatastoreV1.BaseDatastoreWriterFn.ContextAdapter { + private final DoFn.ProcessContext context; + + private ProcessContextAdapter(DoFn.ProcessContext context) { + this.context = context; + } + + @Override + public void output(T t, Instant timestamp, BoundedWindow window) { + context.outputWithTimestamp(t, timestamp); + } + } + + private static final class FinishBundleContextAdapter implements DatastoreV1.BaseDatastoreWriterFn.ContextAdapter { + private final DoFn.FinishBundleContext context; + + private FinishBundleContextAdapter(DoFn.FinishBundleContext context) { + this.context = context; + } + + @Override + public void output(T t, Instant timestamp, BoundedWindow window) { + context.output(t, timestamp, window); + } + } + + abstract void handleWriteSummary( + ContextAdapter context, + Instant timestamp, + KV tuple, + Runnable logMessage); + @StartBundle public void startBundle(StartBundleContext c) { String databaseIdOrDefaultDatabase = databaseId == null ? DEFAULT_DATABASE : databaseId.get(); @@ -1794,29 +1897,30 @@ private static com.google.datastore.v1.Key getKey(Mutation m) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { Mutation mutation = c.element(); int size = mutation.getSerializedSize(); + ProcessContextAdapter contextAdapter = new ProcessContextAdapter<>(c); if (!uniqueMutationKeys.add(getKey(mutation))) { - flushBatch(); + flushBatch(contextAdapter); } if (mutations.size() > 0 && mutationsSize + size >= DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT) { - flushBatch(); + flushBatch(contextAdapter); } - mutations.add(c.element()); + mutations.add(KV.of(c.element(), window)); mutationsSize += size; if (mutations.size() >= writeBatcher.nextBatchSize(System.currentTimeMillis())) { - flushBatch(); + flushBatch(contextAdapter); } } @FinishBundle - public void finishBundle() throws Exception { + public void finishBundle(FinishBundleContext c) throws Exception { if (!mutations.isEmpty()) { - flushBatch(); + flushBatch(new FinishBundleContextAdapter<>(c)); } } @@ -1830,8 +1934,9 @@ public void finishBundle() throws Exception { * @throws DatastoreException if the commit fails or IOException or InterruptedException if * backing off between retries fails. */ - private synchronized void flushBatch() + private synchronized void flushBatch(ContextAdapter context) throws DatastoreException, IOException, InterruptedException { + LOG.debug("Writing batch of {} mutations", mutations.size()); Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff(); @@ -1839,10 +1944,14 @@ private synchronized void flushBatch() batchSize.update(mutations.size()); String databaseIdOrDefaultDatabase = databaseId == null ? DEFAULT_DATABASE : databaseId.get(); + CommitResponse response; + BoundedWindow okWindow; + Instant end; + while (true) { // Batch upsert entities. 
CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); - commitRequest.addAllMutations(mutations); + commitRequest.addAllMutations(mutations.stream().map(KV::getKey).collect(Collectors.toList())); commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); commitRequest.setProjectId(projectId.get()); commitRequest.setDatabaseId(databaseIdOrDefaultDatabase); @@ -1868,8 +1977,10 @@ private synchronized void flushBatch() new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); try { - datastore.commit(commitRequest.build()); + response = datastore.commit(commitRequest.build()); endTime = System.currentTimeMillis(); + end = Instant.ofEpochMilli(endTime); + okWindow = Iterables.getLast(mutations).getValue(); serviceCallMetric.call("ok"); writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size()); @@ -1877,7 +1988,6 @@ private synchronized void flushBatch() latencyMsPerMutation.update((endTime - startTime) / mutations.size()); rpcSuccesses.inc(); entitiesMutated.inc(mutations.size()); - // Break if the commit threw no exception. break; } catch (DatastoreException exception) { @@ -1908,7 +2018,10 @@ private synchronized void flushBatch() } } } - LOG.debug("Successfully wrote {} mutations", mutations.size()); + int okCount = mutations.size(); + long okBytes = response.getSerializedSize(); + handleWriteSummary(context, end, KV.of(new WriteSuccessSummary(okCount, okBytes), okWindow), () -> LOG.debug("Successfully wrote {} mutations", mutations.size())); + mutations.clear(); uniqueMutationKeys.clear(); mutationsSize = 0; From 5a754d34d8b0523d6e71963b8f01eb8c3d7e34b9 Mon Sep 17 00:00:00 2001 From: Nahian-Al Hasan Date: Tue, 14 Jan 2025 14:33:52 +1100 Subject: [PATCH 2/6] Add WriteWithSummary and withResults API interface --- .../sdk/io/gcp/datastore/DatastoreV1.java | 558 +++++++++++++++--- .../sdk/io/gcp/datastore/DatastoreV1Test.java | 63 +- .../beam/sdk/io/gcp/datastore/V1WriteIT.java | 4 +- 3 files changed, 544 insertions(+), 81 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 88861cbaad16..d05f115141b4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -35,8 +35,20 @@ import com.google.auth.http.HttpCredentialsAdapter; import com.google.auto.value.AutoValue; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; -import com.google.datastore.v1.*; +import com.google.datastore.v1.CommitRequest; +import com.google.datastore.v1.CommitResponse; +import com.google.datastore.v1.Entity; +import com.google.datastore.v1.EntityResult; +import com.google.datastore.v1.GqlQuery; +import com.google.datastore.v1.Key; import com.google.datastore.v1.Key.PathElement; +import com.google.datastore.v1.Mutation; +import com.google.datastore.v1.PartitionId; +import com.google.datastore.v1.Query; +import com.google.datastore.v1.QueryResultBatch; +import com.google.datastore.v1.ReadOptions; +import com.google.datastore.v1.RunQueryRequest; +import com.google.datastore.v1.RunQueryResponse; import com.google.datastore.v1.client.Datastore; import com.google.datastore.v1.client.DatastoreException; import com.google.datastore.v1.client.DatastoreFactory; @@ -53,8 +65,8 @@ import java.util.HashMap; 
import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import javax.annotation.concurrent.Immutable; @@ -79,6 +91,7 @@ import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.Wait; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -1218,7 +1231,7 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - /** + /** * Summary object produced when a number of writes are successfully written to Datastore in a * single Mutation. */ @@ -1287,13 +1300,104 @@ public DeleteKey deleteKey() { return new DeleteKey(null, null, true, StaticValueProvider.of(DEFAULT_HINT_NUM_WORKERS)); } + /** + * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore and returns {@link + * WriteSuccessSummary} for each successful write. + * + * @see DatastoreIO + */ + public static class WriteWithSummary extends Mutate { + + /** + * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if it + * is {@code null} at instantiation time, an error will be thrown. + */ + WriteWithSummary( + @Nullable ValueProvider projectId, + @Nullable String localhost, + boolean throttleRampup, + ValueProvider hintNumWorkers) { + super(projectId, null, localhost, new UpsertFn(), throttleRampup, hintNumWorkers); + } + + WriteWithSummary( + @Nullable ValueProvider projectId, + @Nullable ValueProvider databaseId, + @Nullable String localhost, + boolean throttleRampup, + ValueProvider hintNumWorkers) { + super(projectId, databaseId, localhost, new UpsertFn(), throttleRampup, hintNumWorkers); + } + + /** + * Returns a new {@link WriteWithSummary} that writes to the Cloud Datastore for the default + * database. + */ + public WriteWithSummary withProjectId(String projectId) { + checkArgument(projectId != null, "projectId can not be null"); + return withProjectId(StaticValueProvider.of(projectId)); + } + + /** + * Returns a new {@link WriteWithSummary} that writes to the Cloud Datastore for the database + * id. + */ + public WriteWithSummary withDatabaseId(String databaseId) { + checkArgument(databaseId != null, "databaseId can not be null"); + return withDatabaseId(StaticValueProvider.of(databaseId)); + } + + /** Same as {@link WriteWithSummary#withProjectId(String)} but with a {@link ValueProvider}. */ + public WriteWithSummary withProjectId(ValueProvider projectId) { + checkArgument(projectId != null, "projectId can not be null"); + return new WriteWithSummary(projectId, localhost, throttleRampup, hintNumWorkers); + } + + /** Same as {@link WriteWithSummary#withDatabaseId(String)} but with a {@link ValueProvider}. */ + public WriteWithSummary withDatabaseId(ValueProvider databaseId) { + checkArgument(databaseId != null, "databaseId can not be null"); + return new WriteWithSummary(projectId, databaseId, localhost, throttleRampup, hintNumWorkers); + } + + /** + * Returns a new {@link WriteWithSummary} that writes to the Cloud Datastore Emulator running + * locally on the specified host port. 
+ */ + public WriteWithSummary withLocalhost(String localhost) { + checkArgument(localhost != null, "localhost can not be null"); + return new WriteWithSummary(projectId, localhost, throttleRampup, hintNumWorkers); + } + + /** Returns a new {@link WriteWithSummary} that does not throttle during ramp-up. */ + public WriteWithSummary withRampupThrottlingDisabled() { + return new WriteWithSummary(projectId, localhost, false, hintNumWorkers); + } + + /** + * Returns a new {@link WriteWithSummary} with a different worker count hint for ramp-up + * throttling. Value is ignored if ramp-up throttling is disabled. + */ + public WriteWithSummary withHintNumWorkers(int hintNumWorkers) { + return withHintNumWorkers(StaticValueProvider.of(hintNumWorkers)); + } + + /** + * Same as {@link WriteWithSummary#withHintNumWorkers(int)} but with a {@link ValueProvider}. + */ + public WriteWithSummary withHintNumWorkers(ValueProvider hintNumWorkers) { + checkArgument(hintNumWorkers != null, "hintNumWorkers can not be null"); + return new WriteWithSummary(projectId, localhost, throttleRampup, hintNumWorkers); + } + } + /** * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore. * * @see DatastoreIO */ - public static class Write extends Mutate { + public static class Write extends PTransform, PDone> { + WriteWithSummary inner; /** * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if it * is {@code null} at instantiation time, an error will be thrown. @@ -1303,7 +1407,7 @@ public static class Write extends Mutate { @Nullable String localhost, boolean throttleRampup, ValueProvider hintNumWorkers) { - super(projectId, null, localhost, new UpsertFn(), throttleRampup, hintNumWorkers); + this.inner = new WriteWithSummary(projectId, localhost, throttleRampup, hintNumWorkers); } Write( @@ -1312,31 +1416,32 @@ public static class Write extends Mutate { @Nullable String localhost, boolean throttleRampup, ValueProvider hintNumWorkers) { - super(projectId, databaseId, localhost, new UpsertFn(), throttleRampup, hintNumWorkers); + this.inner = + new WriteWithSummary(projectId, databaseId, localhost, throttleRampup, hintNumWorkers); + } + + Write(WriteWithSummary inner) { + this.inner = inner; } /** Returns a new {@link Write} that writes to the Cloud Datastore for the default database. */ public Write withProjectId(String projectId) { - checkArgument(projectId != null, "projectId can not be null"); - return withProjectId(StaticValueProvider.of(projectId)); + return new Write(this.inner.withProjectId(projectId)); } /** Returns a new {@link Write} that writes to the Cloud Datastore for the database id. */ public Write withDatabaseId(String databaseId) { - checkArgument(databaseId != null, "databaseId can not be null"); - return withDatabaseId(StaticValueProvider.of(databaseId)); + return new Write(this.inner.withDatabaseId(databaseId)); } /** Same as {@link Write#withProjectId(String)} but with a {@link ValueProvider}. */ public Write withProjectId(ValueProvider projectId) { - checkArgument(projectId != null, "projectId can not be null"); - return new Write(projectId, localhost, throttleRampup, hintNumWorkers); + return new Write(this.inner.withProjectId(projectId)); } /** Same as {@link Write#withDatabaseId(String)} but with a {@link ValueProvider}. 
*/ public Write withDatabaseId(ValueProvider databaseId) { - checkArgument(databaseId != null, "databaseId can not be null"); - return new Write(projectId, databaseId, localhost, throttleRampup, hintNumWorkers); + return new Write(this.inner.withDatabaseId(databaseId)); } /** @@ -1344,13 +1449,12 @@ public Write withDatabaseId(ValueProvider databaseId) { * the specified host port. */ public Write withLocalhost(String localhost) { - checkArgument(localhost != null, "localhost can not be null"); - return new Write(projectId, localhost, throttleRampup, hintNumWorkers); + return new Write(this.inner.withLocalhost(localhost)); } /** Returns a new {@link Write} that does not throttle during ramp-up. */ public Write withRampupThrottlingDisabled() { - return new Write(projectId, localhost, false, hintNumWorkers); + return new Write(this.inner.withRampupThrottlingDisabled()); } /** @@ -1358,13 +1462,150 @@ public Write withRampupThrottlingDisabled() { * is ignored if ramp-up throttling is disabled. */ public Write withHintNumWorkers(int hintNumWorkers) { - return withHintNumWorkers(StaticValueProvider.of(hintNumWorkers)); + return new Write(this.inner.withHintNumWorkers(hintNumWorkers)); } /** Same as {@link Write#withHintNumWorkers(int)} but with a {@link ValueProvider}. */ public Write withHintNumWorkers(ValueProvider hintNumWorkers) { + return new Write(this.inner.withHintNumWorkers(hintNumWorkers)); + } + + /** + * Returns {@link WriteWithSummary} transform which can be used in {@link + * Wait#on(PCollection[])} to wait until all data is written. + * + *

Example: write a {@link PCollection} to one database and then to another database, making + * sure that writing a window of data to the second database starts only after the respective + * window has been fully written to the first database. + * + *

+     * <pre>{@code
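+     * // Illustrative sketch only (not part of this patch); "project", "firstDb" and "secondDb"
+     * // are placeholders for your own values.
+     * PCollection<Entity> entities = ...;
+     * PCollection<WriteSuccessSummary> writeSummary =
+     *     entities.apply(
+     *         DatastoreIO.v1().write().withProjectId(project).withDatabaseId(firstDb).withResults());
+     *
+     * entities
+     *     .apply(Wait.on(writeSummary))
+     *     .setCoder(entities.getCoder())
+     *     .apply(DatastoreIO.v1().write().withProjectId(project).withDatabaseId(secondDb));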
+     * }</pre>
+ */ + public WriteWithSummary withResults() { + return inner; + } + + @Override + public String toString() { + return this.inner.toString(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + this.inner.populateDisplayData(builder); + } + + public String getProjectId() { + return this.inner.getProjectId(); + } + + public String getDatabaseId() { + return this.inner.getDatabaseId(); + } + + @Override + public PDone expand(PCollection input) { + inner.expand(input); + return PDone.in(input.getPipeline()); + } + } + + /** + * A {@link PTransform} that deletes {@link Entity Entities} from Cloud Datastore and returns + * {@link WriteSuccessSummary} for each successful write. + * + * @see DatastoreIO + */ + public static class DeleteEntityWithSummary extends Mutate { + + /** + * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if it + * is {@code null} at instantiation time, an error will be thrown. + */ + DeleteEntityWithSummary( + @Nullable ValueProvider projectId, + @Nullable String localhost, + boolean throttleRampup, + ValueProvider hintNumWorkers) { + super(projectId, null, localhost, new DeleteEntityFn(), throttleRampup, hintNumWorkers); + } + + DeleteEntityWithSummary( + @Nullable ValueProvider projectId, + @Nullable ValueProvider databaseId, + @Nullable String localhost, + boolean throttleRampup, + ValueProvider hintNumWorkers) { + super(projectId, databaseId, localhost, new DeleteEntityFn(), throttleRampup, hintNumWorkers); + } + + /** + * Returns a new {@link DeleteEntityWithSummary} that deletes entities from the Cloud Datastore + * for the specified project. + */ + public DeleteEntityWithSummary withProjectId(String projectId) { + checkArgument(projectId != null, "projectId can not be null"); + return withProjectId(StaticValueProvider.of(projectId)); + } + + /** + * Returns a new {@link DeleteEntityWithSummary} that deletes entities from the Cloud Datastore + * for the specified database. + */ + public DeleteEntityWithSummary withDatabaseId(String databaseId) { + checkArgument(databaseId != null, "databaseId can not be null"); + return withDatabaseId(StaticValueProvider.of(databaseId)); + } + + /** + * Same as {@link DeleteEntityWithSummary#withProjectId(String)} but with a {@link + * ValueProvider}. + */ + public DeleteEntityWithSummary withProjectId(ValueProvider projectId) { + checkArgument(projectId != null, "projectId can not be null"); + return new DeleteEntityWithSummary(projectId, localhost, throttleRampup, hintNumWorkers); + } + + /** + * Same as {@link DeleteEntityWithSummary#withDatabaseId(String)} but with a {@link + * ValueProvider}. + */ + public DeleteEntityWithSummary withDatabaseId(ValueProvider databaseId) { + checkArgument(databaseId != null, "databaseId can not be null"); + return new DeleteEntityWithSummary( + projectId, databaseId, localhost, throttleRampup, hintNumWorkers); + } + + /** + * Returns a new {@link DeleteEntityWithSummary} that deletes entities from the Cloud Datastore + * Emulator running locally on the specified host port. + */ + public DeleteEntityWithSummary withLocalhost(String localhost) { + checkArgument(localhost != null, "localhost can not be null"); + return new DeleteEntityWithSummary(projectId, localhost, throttleRampup, hintNumWorkers); + } + + /** Returns a new {@link DeleteEntityWithSummary} that does not throttle during ramp-up. 
*/ + public DeleteEntityWithSummary withRampupThrottlingDisabled() { + return new DeleteEntityWithSummary(projectId, localhost, false, hintNumWorkers); + } + + /** + * Returns a new {@link DeleteEntityWithSummary} with a different worker count hint for ramp-up + * throttling. Value is ignored if ramp-up throttling is disabled. + */ + public DeleteEntityWithSummary withHintNumWorkers(int hintNumWorkers) { + checkArgument(hintNumWorkers > 0, "hintNumWorkers must be positive"); + return withHintNumWorkers(StaticValueProvider.of(hintNumWorkers)); + } + + /** + * Same as {@link DeleteEntityWithSummary#withHintNumWorkers(int)} but with a {@link + * ValueProvider}. + */ + public DeleteEntityWithSummary withHintNumWorkers(ValueProvider hintNumWorkers) { checkArgument(hintNumWorkers != null, "hintNumWorkers can not be null"); - return new Write(projectId, localhost, throttleRampup, hintNumWorkers); + return new DeleteEntityWithSummary(projectId, localhost, throttleRampup, hintNumWorkers); } } @@ -1373,7 +1614,9 @@ public Write withHintNumWorkers(ValueProvider hintNumWorkers) { * * @see DatastoreIO */ - public static class DeleteEntity extends Mutate { + public static class DeleteEntity extends PTransform, PDone> { + + DeleteEntityWithSummary inner; /** * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if it @@ -1384,7 +1627,8 @@ public static class DeleteEntity extends Mutate { @Nullable String localhost, boolean throttleRampup, ValueProvider hintNumWorkers) { - super(projectId, null, localhost, new DeleteEntityFn(), throttleRampup, hintNumWorkers); + this.inner = + new DeleteEntityWithSummary(projectId, localhost, throttleRampup, hintNumWorkers); } DeleteEntity( @@ -1393,7 +1637,13 @@ public static class DeleteEntity extends Mutate { @Nullable String localhost, boolean throttleRampup, ValueProvider hintNumWorkers) { - super(projectId, databaseId, localhost, new DeleteEntityFn(), throttleRampup, hintNumWorkers); + this.inner = + new DeleteEntityWithSummary( + projectId, databaseId, localhost, throttleRampup, hintNumWorkers); + } + + DeleteEntity(DeleteEntityWithSummary inner) { + this.inner = inner; } /** @@ -1401,8 +1651,7 @@ public static class DeleteEntity extends Mutate { * specified project. */ public DeleteEntity withProjectId(String projectId) { - checkArgument(projectId != null, "projectId can not be null"); - return withProjectId(StaticValueProvider.of(projectId)); + return new DeleteEntity(this.inner.withProjectId(projectId)); } /** @@ -1410,20 +1659,17 @@ public DeleteEntity withProjectId(String projectId) { * specified database. */ public DeleteEntity withDatabaseId(String databaseId) { - checkArgument(databaseId != null, "databaseId can not be null"); - return withDatabaseId(StaticValueProvider.of(databaseId)); + return new DeleteEntity(this.inner.withDatabaseId(databaseId)); } /** Same as {@link DeleteEntity#withProjectId(String)} but with a {@link ValueProvider}. */ public DeleteEntity withProjectId(ValueProvider projectId) { - checkArgument(projectId != null, "projectId can not be null"); - return new DeleteEntity(projectId, localhost, throttleRampup, hintNumWorkers); + return new DeleteEntity(this.inner.withProjectId(projectId)); } /** Same as {@link DeleteEntity#withDatabaseId(String)} but with a {@link ValueProvider}. 
*/ public DeleteEntity withDatabaseId(ValueProvider databaseId) { - checkArgument(databaseId != null, "databaseId can not be null"); - return new DeleteEntity(projectId, databaseId, localhost, throttleRampup, hintNumWorkers); + return new DeleteEntity(this.inner.withDatabaseId(databaseId)); } /** @@ -1431,13 +1677,12 @@ public DeleteEntity withDatabaseId(ValueProvider databaseId) { * running locally on the specified host port. */ public DeleteEntity withLocalhost(String localhost) { - checkArgument(localhost != null, "localhost can not be null"); - return new DeleteEntity(projectId, localhost, throttleRampup, hintNumWorkers); + return new DeleteEntity(this.inner.withLocalhost(localhost)); } /** Returns a new {@link DeleteEntity} that does not throttle during ramp-up. */ public DeleteEntity withRampupThrottlingDisabled() { - return new DeleteEntity(projectId, localhost, false, hintNumWorkers); + return new DeleteEntity(this.inner.withRampupThrottlingDisabled()); } /** @@ -1445,14 +1690,140 @@ public DeleteEntity withRampupThrottlingDisabled() { * Value is ignored if ramp-up throttling is disabled. */ public DeleteEntity withHintNumWorkers(int hintNumWorkers) { - checkArgument(hintNumWorkers > 0, "hintNumWorkers must be positive"); - return withHintNumWorkers(StaticValueProvider.of(hintNumWorkers)); + return new DeleteEntity(this.inner.withHintNumWorkers(hintNumWorkers)); } /** Same as {@link DeleteEntity#withHintNumWorkers(int)} but with a {@link ValueProvider}. */ public DeleteEntity withHintNumWorkers(ValueProvider hintNumWorkers) { + return new DeleteEntity(this.inner.withHintNumWorkers(hintNumWorkers)); + } + + /** + * Returns {@link DeleteEntityWithSummary} transform which can be used in {@link + * Wait#on(PCollection[])} to wait until all data is deleted. + * + *

Example: delete a {@link PCollection} from one database and then from another database, + * making sure that deleting a window of data from the second database starts only after the + * respective window has been fully deleted from the first database. + *

+     * <pre>{@code
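+     * // Illustrative sketch only (not part of this patch); "project", "firstDb" and "secondDb"
+     * // are placeholders for your own values.
+     * PCollection<Entity> entities = ...;
+     * PCollection<WriteSuccessSummary> deleteSummary =
+     *     entities.apply(
+     *         DatastoreIO.v1().deleteEntity().withProjectId(project).withDatabaseId(firstDb).withResults());
+     *
+     * entities
+     *     .apply(Wait.on(deleteSummary))
+     *     .setCoder(entities.getCoder())
+     *     .apply(DatastoreIO.v1().deleteEntity().withProjectId(project).withDatabaseId(secondDb));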
+     * }</pre>
+ */ + public DeleteEntityWithSummary withResults() { + return inner; + } + + @Override + public String toString() { + return this.inner.toString(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + this.inner.populateDisplayData(builder); + } + + @Override + public PDone expand(PCollection input) { + inner.expand(input); + return PDone.in(input.getPipeline()); + } + } + + /** + * A {@link PTransform} that deletes {@link Entity Entities} associated with the given {@link Key + * Keys} from Cloud Datastore and returns {@link WriteSuccessSummary} for each successful delete. + * + * @see DatastoreIO + */ + public static class DeleteKeyWithSummary extends Mutate { + + /** + * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if it + * is {@code null} at instantiation time, an error will be thrown. + */ + DeleteKeyWithSummary( + @Nullable ValueProvider projectId, + @Nullable String localhost, + boolean throttleRampup, + ValueProvider hintNumWorkers) { + super(projectId, null, localhost, new DeleteKeyFn(), throttleRampup, hintNumWorkers); + } + + DeleteKeyWithSummary( + @Nullable ValueProvider projectId, + @Nullable ValueProvider databaseId, + @Nullable String localhost, + boolean throttleRampup, + ValueProvider hintNumWorkers) { + super(projectId, databaseId, localhost, new DeleteKeyFn(), throttleRampup, hintNumWorkers); + } + + /** + * Returns a new {@link DeleteKeyWithSummary} that deletes entities from the Cloud Datastore for + * the specified project. + */ + public DeleteKeyWithSummary withProjectId(String projectId) { + checkArgument(projectId != null, "projectId can not be null"); + return withProjectId(StaticValueProvider.of(projectId)); + } + + /** + * Returns a new {@link DeleteKeyWithSummary} that deletes entities from the Cloud Datastore for + * the specified database. + */ + public DeleteKeyWithSummary withDatabaseId(String databaseId) { + checkArgument(databaseId != null, "databaseId can not be null"); + return withDatabaseId(StaticValueProvider.of(databaseId)); + } + + /** + * Returns a new {@link DeleteKeyWithSummary} that deletes entities from the Cloud Datastore + * Emulator running locally on the specified host port. + */ + public DeleteKeyWithSummary withLocalhost(String localhost) { + checkArgument(localhost != null, "localhost can not be null"); + return new DeleteKeyWithSummary(projectId, localhost, throttleRampup, hintNumWorkers); + } + + /** + * Same as {@link DeleteKeyWithSummary#withProjectId(String)} but with a {@link ValueProvider}. + */ + public DeleteKeyWithSummary withProjectId(ValueProvider projectId) { + checkArgument(projectId != null, "projectId can not be null"); + return new DeleteKeyWithSummary(projectId, localhost, throttleRampup, hintNumWorkers); + } + + /** + * Same as {@link DeleteKeyWithSummary#withDatabaseId(String)} but with a {@link ValueProvider}. + */ + public DeleteKeyWithSummary withDatabaseId(ValueProvider databaseId) { + checkArgument(databaseId != null, "databaseId can not be null"); + return new DeleteKeyWithSummary( + projectId, databaseId, localhost, throttleRampup, hintNumWorkers); + } + + /** Returns a new {@link DeleteKeyWithSummary} that does not throttle during ramp-up. */ + public DeleteKeyWithSummary withRampupThrottlingDisabled() { + return new DeleteKeyWithSummary(projectId, localhost, false, hintNumWorkers); + } + + /** + * Returns a new {@link DeleteKeyWithSummary} with a different worker count hint for ramp-up + * throttling. 
Value is ignored if ramp-up throttling is disabled. + */ + public DeleteKeyWithSummary withHintNumWorkers(int hintNumWorkers) { + checkArgument(hintNumWorkers > 0, "hintNumWorkers must be positive"); + return withHintNumWorkers(StaticValueProvider.of(hintNumWorkers)); + } + + /** + * Same as {@link DeleteKeyWithSummary#withHintNumWorkers(int)} but with a {@link + * ValueProvider}. + */ + public DeleteKeyWithSummary withHintNumWorkers(ValueProvider hintNumWorkers) { checkArgument(hintNumWorkers != null, "hintNumWorkers can not be null"); - return new DeleteEntity(projectId, localhost, throttleRampup, hintNumWorkers); + return new DeleteKeyWithSummary(projectId, localhost, throttleRampup, hintNumWorkers); } } @@ -1462,7 +1833,9 @@ public DeleteEntity withHintNumWorkers(ValueProvider hintNumWorkers) { * * @see DatastoreIO */ - public static class DeleteKey extends Mutate { + public static class DeleteKey extends PTransform, PDone> { + + DeleteKeyWithSummary inner; /** * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if it @@ -1473,7 +1846,7 @@ public static class DeleteKey extends Mutate { @Nullable String localhost, boolean throttleRampup, ValueProvider hintNumWorkers) { - super(projectId, null, localhost, new DeleteKeyFn(), throttleRampup, hintNumWorkers); + this.inner = new DeleteKeyWithSummary(projectId, localhost, throttleRampup, hintNumWorkers); } DeleteKey( @@ -1482,7 +1855,13 @@ public static class DeleteKey extends Mutate { @Nullable String localhost, boolean throttleRampup, ValueProvider hintNumWorkers) { - super(projectId, databaseId, localhost, new DeleteKeyFn(), throttleRampup, hintNumWorkers); + this.inner = + new DeleteKeyWithSummary( + projectId, databaseId, localhost, throttleRampup, hintNumWorkers); + } + + DeleteKey(DeleteKeyWithSummary inner) { + this.inner = inner; } /** @@ -1490,8 +1869,7 @@ public static class DeleteKey extends Mutate { * specified project. */ public DeleteKey withProjectId(String projectId) { - checkArgument(projectId != null, "projectId can not be null"); - return withProjectId(StaticValueProvider.of(projectId)); + return new DeleteKey(this.inner.withProjectId(projectId)); } /** @@ -1499,8 +1877,7 @@ public DeleteKey withProjectId(String projectId) { * specified database. */ public DeleteKey withDatabaseId(String databaseId) { - checkArgument(databaseId != null, "databaseId can not be null"); - return withDatabaseId(StaticValueProvider.of(databaseId)); + return new DeleteKey(this.inner.withDatabaseId(databaseId)); } /** @@ -1508,25 +1885,22 @@ public DeleteKey withDatabaseId(String databaseId) { * running locally on the specified host port. */ public DeleteKey withLocalhost(String localhost) { - checkArgument(localhost != null, "localhost can not be null"); - return new DeleteKey(projectId, localhost, throttleRampup, hintNumWorkers); + return new DeleteKey(this.inner.withLocalhost(localhost)); } /** Same as {@link DeleteKey#withProjectId(String)} but with a {@link ValueProvider}. */ public DeleteKey withProjectId(ValueProvider projectId) { - checkArgument(projectId != null, "projectId can not be null"); - return new DeleteKey(projectId, localhost, throttleRampup, hintNumWorkers); + return new DeleteKey(this.inner.withProjectId(projectId)); } /** Same as {@link DeleteKey#withDatabaseId(String)} but with a {@link ValueProvider}. 
*/ public DeleteKey withDatabaseId(ValueProvider databaseId) { - checkArgument(databaseId != null, "databaseId can not be null"); - return new DeleteKey(projectId, databaseId, localhost, throttleRampup, hintNumWorkers); + return new DeleteKey(this.inner.withDatabaseId(databaseId)); } /** Returns a new {@link DeleteKey} that does not throttle during ramp-up. */ public DeleteKey withRampupThrottlingDisabled() { - return new DeleteKey(projectId, localhost, false, hintNumWorkers); + return new DeleteKey(this.inner.withRampupThrottlingDisabled()); } /** @@ -1534,14 +1908,43 @@ public DeleteKey withRampupThrottlingDisabled() { * Value is ignored if ramp-up throttling is disabled. */ public DeleteKey withHintNumWorkers(int hintNumWorkers) { - checkArgument(hintNumWorkers > 0, "hintNumWorkers must be positive"); - return withHintNumWorkers(StaticValueProvider.of(hintNumWorkers)); + return new DeleteKey(this.inner.withHintNumWorkers(hintNumWorkers)); } /** Same as {@link DeleteKey#withHintNumWorkers(int)} but with a {@link ValueProvider}. */ public DeleteKey withHintNumWorkers(ValueProvider hintNumWorkers) { - checkArgument(hintNumWorkers != null, "hintNumWorkers can not be null"); - return new DeleteKey(projectId, localhost, throttleRampup, hintNumWorkers); + return new DeleteKey(this.inner.withHintNumWorkers(hintNumWorkers)); + } + + /** + * Returns {@link DeleteKeyWithSummary} transform which can be used in {@link + * Wait#on(PCollection[])} to wait until all data is deleted. + * + *

Example: delete a {@link PCollection} of {@link Key} from one database and then from + * another database, making sure that deleting a window of data from the second database starts + * only after the respective window has been fully deleted from the first database. + *

+     * <pre>{@code
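+     * // Illustrative sketch only (not part of this patch); "project", "firstDb" and "secondDb"
+     * // are placeholders for your own values.
+     * PCollection<Key> keys = ...;
+     * PCollection<WriteSuccessSummary> deleteSummary =
+     *     keys.apply(
+     *         DatastoreIO.v1().deleteKey().withProjectId(project).withDatabaseId(firstDb).withResults());
+     *
+     * keys
+     *     .apply(Wait.on(deleteSummary))
+     *     .setCoder(keys.getCoder())
+     *     .apply(DatastoreIO.v1().deleteKey().withProjectId(project).withDatabaseId(secondDb));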
+     * }</pre>
+ */ + public DeleteKeyWithSummary withResults() { + return inner; + } + + @Override + public String toString() { + return this.inner.toString(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + this.inner.populateDisplayData(builder); + } + + @Override + public PDone expand(PCollection input) { + inner.expand(input); + return PDone.in(input.getPipeline()); } } @@ -1553,7 +1956,8 @@ public DeleteKey withHintNumWorkers(ValueProvider hintNumWorkers) { * idempotent Cloud Datastore mutation operations (upsert and delete) should be used by the {@code * DoFn} provided, as the commits are retried when failures occur. */ - private abstract static class Mutate extends PTransform, PDone> { + private abstract static class Mutate + extends PTransform, PCollection> { protected ValueProvider projectId; protected ValueProvider databaseId; protected @Nullable String localhost; @@ -1584,7 +1988,7 @@ private abstract static class Mutate extends PTransform, PDone } @Override - public PDone expand(PCollection input) { + public PCollection expand(PCollection input) { checkArgument(projectId != null, "withProjectId() is required"); if (projectId.isAccessible()) { checkArgument(projectId.get() != null, "projectId can not be null"); @@ -1617,7 +2021,7 @@ public PCollectionView expand(PBegin input) { "Enforce ramp-up through throttling", ParDo.of(rampupThrottlingFn).withSideInputs(startTimestampView)); } - intermediateOutput.apply( + return intermediateOutput.apply( "Write Mutation to Datastore", ParDo.of( new DatastoreWriterFn( @@ -1626,8 +2030,6 @@ public PCollectionView expand(PBegin input) { localhost, new V1DatastoreFactory(), new WriteBatcherImpl()))); - - return PDone.in(input.getPipeline()); } @Override @@ -1731,17 +2133,30 @@ static class DatastoreWriterFn extends BaseDatastoreWriterFn projectId, @Nullable String localhost, V1DatastoreFactory datastoreFactory, WriteBatcher writeBatcher) { + DatastoreWriterFn( + ValueProvider projectId, + @Nullable String localhost, + V1DatastoreFactory datastoreFactory, + WriteBatcher writeBatcher) { super(projectId, localhost, datastoreFactory, writeBatcher); } @VisibleForTesting - DatastoreWriterFn(ValueProvider projectId, ValueProvider databaseId, @Nullable String localhost, V1DatastoreFactory datastoreFactory, WriteBatcher writeBatcher) { + DatastoreWriterFn( + ValueProvider projectId, + ValueProvider databaseId, + @Nullable String localhost, + V1DatastoreFactory datastoreFactory, + WriteBatcher writeBatcher) { super(projectId, databaseId, localhost, datastoreFactory, writeBatcher); } @Override - void handleWriteSummary(ContextAdapter context, Instant timestamp, KV tuple, Runnable logMessage) { + void handleWriteSummary( + ContextAdapter context, + Instant timestamp, + KV tuple, + Runnable logMessage) { logMessage.run(); context.output(tuple.getKey(), timestamp, tuple.getValue()); } @@ -1760,7 +2175,7 @@ void handleWriteSummary(ContextAdapter context, Instant tim * used for {@code upsert} and {@code delete} mutation operations, as these are the only two Cloud * Datastore mutations that are idempotent. 
*/ - abstract static class BaseDatastoreWriterFn extends DoFn{ + abstract static class BaseDatastoreWriterFn extends DoFn { private static final Logger LOG = LoggerFactory.getLogger(BaseDatastoreWriterFn.class); private final ValueProvider projectId; @@ -1836,7 +2251,8 @@ interface ContextAdapter { void output(T t, Instant timestamp, BoundedWindow window); } - private static final class ProcessContextAdapter implements DatastoreV1.BaseDatastoreWriterFn.ContextAdapter { + private static final class ProcessContextAdapter + implements DatastoreV1.BaseDatastoreWriterFn.ContextAdapter { private final DoFn.ProcessContext context; private ProcessContextAdapter(DoFn.ProcessContext context) { @@ -1849,7 +2265,8 @@ public void output(T t, Instant timestamp, BoundedWindow window) { } } - private static final class FinishBundleContextAdapter implements DatastoreV1.BaseDatastoreWriterFn.ContextAdapter { + private static final class FinishBundleContextAdapter + implements DatastoreV1.BaseDatastoreWriterFn.ContextAdapter { private final DoFn.FinishBundleContext context; private FinishBundleContextAdapter(DoFn.FinishBundleContext context) { @@ -1920,7 +2337,7 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { if (!mutations.isEmpty()) { - flushBatch(new FinishBundleContextAdapter<>(c)); + flushBatch(new FinishBundleContextAdapter<>(c)); } } @@ -1951,7 +2368,8 @@ private synchronized void flushBatch(ContextAdapter context) while (true) { // Batch upsert entities. CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); - commitRequest.addAllMutations(mutations.stream().map(KV::getKey).collect(Collectors.toList())); + commitRequest.addAllMutations( + mutations.stream().map(KV::getKey).collect(Collectors.toList())); commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); commitRequest.setProjectId(projectId.get()); commitRequest.setDatabaseId(databaseIdOrDefaultDatabase); @@ -2020,7 +2438,11 @@ private synchronized void flushBatch(ContextAdapter context) } int okCount = mutations.size(); long okBytes = response.getSerializedSize(); - handleWriteSummary(context, end, KV.of(new WriteSuccessSummary(okCount, okBytes), okWindow), () -> LOG.debug("Successfully wrote {} mutations", mutations.size())); + handleWriteSummary( + context, + end, + KV.of(new WriteSuccessSummary(okCount, okBytes), okWindow), + () -> LOG.debug("Successfully wrote {} mutations", mutations.size())); mutations.clear(); uniqueMutationKeys.clear(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index ab6f5567dd9a..af3721dfd4f7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -608,17 +608,28 @@ private void datastoreWriterFnTest(int numMutations) throws Exception { makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); } + int start = 0; + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(CommitRequest.class); + CommitResponse response = CommitResponse.getDefaultInstance(); + while (start < numMutations) { + int end = Math.min(numMutations, start + DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START); + 
when(mockDatastore.commit(requestCaptor.capture())).thenReturn(response); + start = end; + } + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn( StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher()); - DoFnTester doFnTester = DoFnTester.of(datastoreWriter); + DoFnTester doFnTester = + DoFnTester.of(datastoreWriter); doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); doFnTester.processBundle(mutations); - int start = 0; + start = 0; + List requests = new ArrayList<>(); while (start < numMutations) { int end = Math.min(numMutations, start + DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START); CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); @@ -627,9 +638,12 @@ private void datastoreWriterFnTest(int numMutations) throws Exception { commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); commitRequest.addAllMutations(mutations.subList(start, end)); // Verify all the batch requests were made with the expected mutations. - verify(mockDatastore, times(1)).commit(commitRequest.build()); + CommitRequest expectedRequest = commitRequest.build(); + verify(mockDatastore, times(1)).commit(expectedRequest); + requests.add(expectedRequest); start = end; } + assertTrue(requestCaptor.getAllValues().containsAll(requests)); } /** @@ -651,20 +665,31 @@ public void testDatastoreWriterFnWithLargeEntities() throws Exception { mutations.add(makeUpsert(entity).build()); } + int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize; + int start = 0; + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(CommitRequest.class); + CommitResponse response = CommitResponse.getDefaultInstance(); + while (start < mutations.size()) { + int end = Math.min(mutations.size(), start + entitiesPerRpc); + when(mockDatastore.commit(requestCaptor.capture())).thenReturn(response); + start = end; + } + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn( StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher()); - DoFnTester doFnTester = DoFnTester.of(datastoreWriter); + DoFnTester doFnTester = + DoFnTester.of(datastoreWriter); doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); doFnTester.processBundle(mutations); // This test is over-specific currently; it requires that we split the 12 entity writes into 3 // requests, but we only need each CommitRequest to be less than 10MB in size. - int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize; - int start = 0; + start = 0; + List requests = new ArrayList<>(); while (start < mutations.size()) { int end = Math.min(mutations.size(), start + entitiesPerRpc); CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); @@ -673,9 +698,12 @@ public void testDatastoreWriterFnWithLargeEntities() throws Exception { commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); commitRequest.addAllMutations(mutations.subList(start, end)); // Verify all the batch requests were made with the expected mutations. - verify(mockDatastore).commit(commitRequest.build()); + CommitRequest expectedRequest = commitRequest.build(); + verify(mockDatastore).commit(expectedRequest); + requests.add(expectedRequest); start = end; } + assertTrue(requestCaptor.getAllValues().containsAll(requests)); } /** Tests {@link DatastoreWriterFn} correctly flushes batch upon receive same entity keys. 
*/ @@ -693,13 +721,18 @@ public void testDatastoreWriterFnWithDuplicateEntities() throws Exception { .build()); } + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(CommitRequest.class); + CommitResponse response = CommitResponse.getDefaultInstance(); + when(mockDatastore.commit(requestCaptor.capture())).thenReturn(response).thenReturn(response); + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn( StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher()); - DoFnTester doFnTester = DoFnTester.of(datastoreWriter); + DoFnTester doFnTester = + DoFnTester.of(datastoreWriter); doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); doFnTester.processBundle(mutations); @@ -709,7 +742,8 @@ public void testDatastoreWriterFnWithDuplicateEntities() throws Exception { commitRequest.addAllMutations(mutations.subList(0, 2)); commitRequest.setProjectId(PROJECT_ID); commitRequest.setDatabaseId(DATABASE_ID); - verify(mockDatastore, times(1)).commit(commitRequest.build()); + CommitRequest expectedRequest1 = commitRequest.build(); + verify(mockDatastore, times(1)).commit(expectedRequest1); // second invocation has key [0, 2] because the second 0 triggered a flush batch commitRequest = CommitRequest.newBuilder(); @@ -717,8 +751,14 @@ public void testDatastoreWriterFnWithDuplicateEntities() throws Exception { commitRequest.addAllMutations(mutations.subList(2, 4)); commitRequest.setProjectId(PROJECT_ID); commitRequest.setDatabaseId(DATABASE_ID); - verify(mockDatastore, times(1)).commit(commitRequest.build()); + CommitRequest expectedRequest2 = commitRequest.build(); + verify(mockDatastore, times(1)).commit(expectedRequest2); verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2); + + assertTrue( + requestCaptor + .getAllValues() + .containsAll(Arrays.asList(expectedRequest1, expectedRequest2))); } /** Tests {@link DatastoreWriterFn} with a failed request which is retried. 
*/ @@ -743,7 +783,8 @@ public void testDatastoreWriterFnRetriesErrors() throws Exception { null, mockDatastoreFactory, new FakeWriteBatcher()); - DoFnTester doFnTester = DoFnTester.of(datastoreWriter); + DoFnTester doFnTester = + DoFnTester.of(datastoreWriter); doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); doFnTester.processBundle(mutations); verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java index 0062208630f6..fb1abd096a4c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java @@ -119,8 +119,8 @@ public void testDatastoreWriterFnWithDuplicatedEntities() throws Exception { new DatastoreV1.DatastoreWriterFn( TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(), null); - PTransform, PCollection> datastoreWriterTransform = - ParDo.of(datastoreWriter); + PTransform, PCollection> + datastoreWriterTransform = ParDo.of(datastoreWriter); /** Following three lines turn the original arrayList into a member of the first PCollection */ List newArrayList = new ArrayList<>(mutations); From e8abfaa09a33e5990ff9fc40a03e22a289f6c0fc Mon Sep 17 00:00:00 2001 From: Nahian-Al Hasan Date: Wed, 15 Jan 2025 12:47:46 +1100 Subject: [PATCH 3/6] Add integration tests withResults API interface --- .../beam/sdk/io/gcp/datastore/V1WriteIT.java | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java index fb1abd096a4c..9b506aa828da 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.countEntities; import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.deleteAllEntities; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import com.google.datastore.v1.Entity; import com.google.datastore.v1.Key; @@ -39,11 +40,13 @@ import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Wait; import org.apache.beam.sdk.values.PCollection; import org.junit.After; import org.junit.Before; @@ -196,6 +199,99 @@ public void testE2EV1WriteWithLargeEntities() throws Exception { assertEquals(numLargeEntities, numEntitiesWritten); } + /** Tests {@link DatastoreV1.WriteWithSummary} using {@link DatastoreV1.Write#withResults()}. 
*/ + @Test + public void testE2EV1WriteWithResults() throws Exception { + Pipeline p = Pipeline.create(options); + + PCollection firstBatch = + p.apply("First GenerateSequence", GenerateSequence.from(0).to(numEntities)) + .apply( + "First CreateEntityFn", + ParDo.of( + new V1TestUtil.CreateEntityFn( + options.getKind(), options.getNamespace(), ancestor, 0))); + PCollection secondBatch = + p.apply("Second GenerateSequence", GenerateSequence.from(numEntities).to(numEntities * 2)) + .apply( + "Second CreateEntityFn", + ParDo.of( + new V1TestUtil.CreateEntityFn( + options.getKind(), options.getNamespace(), ancestor, 0))); + PCollection firstWriteResults = + firstBatch.apply(DatastoreIO.v1().write().withProjectId(project).withResults()); + + secondBatch + .apply(Wait.on(firstWriteResults)) + .setCoder(secondBatch.getCoder()) + .apply(DatastoreIO.v1().write().withProjectId(project)); + + PAssert.that(firstWriteResults) + .satisfies( + results -> { + for (DatastoreV1.WriteSuccessSummary result : results) { + assertNotNull(result); + } + return null; + }); + + p.run(); + + long numEntitiesWritten = countEntities(options, project, database, ancestor); + + assertEquals(numEntities * 2, numEntitiesWritten); + } + + /** + * Tests {@link DatastoreV1.WriteWithSummary} using {@link DatastoreV1.Write#withResults()} and + * {@link DatastoreV1.DeleteEntity#withResults()}. + */ + @Test + public void testE2EV1WriteWithResultsAndDeleteWithResults() throws Exception { + Pipeline p = Pipeline.create(options); + + PCollection entities = + p.apply("First GenerateSequence", GenerateSequence.from(0).to(numEntities)) + .apply( + "First CreateEntityFn", + ParDo.of( + new V1TestUtil.CreateEntityFn( + options.getKind(), options.getNamespace(), ancestor, 0))); + + PCollection writeResults = + entities.apply(DatastoreIO.v1().write().withProjectId(project).withResults()); + + PCollection deleteResults = + entities + .apply(Wait.on(writeResults)) + .setCoder(entities.getCoder()) + .apply(DatastoreIO.v1().deleteEntity().withProjectId(project).withResults()); + + PAssert.that(writeResults) + .satisfies( + results -> { + for (DatastoreV1.WriteSuccessSummary result : results) { + assertNotNull(result); + } + return null; + }); + + PAssert.that(deleteResults) + .satisfies( + results -> { + for (DatastoreV1.WriteSuccessSummary result : results) { + assertNotNull(result); + } + return null; + }); + + p.run(); + + long numEntitiesWritten = countEntities(options, project, database, ancestor); + + assertEquals(0, numEntitiesWritten); + } + @After public void tearDown() throws Exception { deleteAllEntities(options, project, database, ancestor); From 1b157d2a446ed3f7def2065c40987876c01b5342 Mon Sep 17 00:00:00 2001 From: Nahian-Al Hasan Date: Wed, 15 Jan 2025 12:48:36 +1100 Subject: [PATCH 4/6] Change comments from abstract class to concrete class --- .../sdk/io/gcp/datastore/DatastoreV1.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index d05f115141b4..c3057af5a3d5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -2122,6 +2122,19 @@ public int nextBatchSize(long timeSinceEpochMillis) { private transient 
MovingAverage meanLatencyPerEntityMs; } + /** + * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in + * batches; see {@link DatastoreV1.WriteBatcherImpl}. + * + *

See Datastore: Entities, + * Properties, and Keys for information about entity keys and mutations. + * + *

Commits are non-transactional. If a commit fails because of a conflict over an entity group, + * the commit will be retried (up to {@link DatastoreV1.BaseDatastoreWriterFn#MAX_RETRIES} times). + * This means that the mutation operation should be idempotent. Thus, the writer should only be + * used for {@code upsert} and {@code delete} mutation operations, as these are the only two Cloud + * Datastore mutations that are idempotent. + */ static class DatastoreWriterFn extends BaseDatastoreWriterFn { DatastoreWriterFn(String projectId, @Nullable String localhost) { @@ -2162,19 +2175,6 @@ void handleWriteSummary( } } - /** - * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in - * batches; see {@link DatastoreV1.WriteBatcherImpl}. - * - *

See Datastore: Entities, - * Properties, and Keys for information about entity keys and mutations. - * - *

Commits are non-transactional. If a commit fails because of a conflict over an entity group, - * the commit will be retried (up to {@link DatastoreV1.BaseDatastoreWriterFn#MAX_RETRIES} times). - * This means that the mutation operation should be idempotent. Thus, the writer should only be - * used for {@code upsert} and {@code delete} mutation operations, as these are the only two Cloud - * Datastore mutations that are idempotent. - */ abstract static class BaseDatastoreWriterFn extends DoFn { private static final Logger LOG = LoggerFactory.getLogger(BaseDatastoreWriterFn.class); From 9279adc2fc1ff486d486ac97e54b287f2abab5a2 Mon Sep 17 00:00:00 2001 From: Nahian-Al Hasan Date: Wed, 15 Jan 2025 13:12:37 +1100 Subject: [PATCH 5/6] Add code examples in Javadoc for withResults() API --- .../apache/beam/sdk/io/gcp/datastore/DatastoreV1.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index c3057af5a3d5..d3eda28ed814 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -1479,6 +1479,9 @@ public Write withHintNumWorkers(ValueProvider hintNumWorkers) { * window has been fully written to the first database. * *

<pre>{@code
+     * PCollection<Entity> entities = ... ;
+     * PCollection<WriteSuccessSummary> writeSummary =
+     *         entities.apply(DatastoreIO.v1().write().withProjectId(project).withResults());
      * }</pre>
*/ public WriteWithSummary withResults() { @@ -1707,6 +1710,9 @@ public DeleteEntity withHintNumWorkers(ValueProvider hintNumWorkers) { * respective window has been fully deleted from the first database. * *
<pre>{@code
+     * PCollection<Entity> entities = ... ;
+     * PCollection<WriteSuccessSummary> deleteSummary =
+     *         entities.apply(DatastoreIO.v1().deleteEntity().withProjectId(project).withResults());
      * }</pre>
*/ public DeleteEntityWithSummary withResults() { @@ -1923,9 +1929,6 @@ public DeleteKey withHintNumWorkers(ValueProvider hintNumWorkers) { *

Example: delete a {@link PCollection} of {@link Key} from one database and then from * another database, making sure that deleting a window of data to the second database starts * only after the respective window has been fully deleted from the first database. - * - *

<pre>{@code
-     * }</pre>
*/ public DeleteKeyWithSummary withResults() { return inner; From e68b9eb94dccdb23a13d093474d1227aebc5de10 Mon Sep 17 00:00:00 2001 From: Nahian-Al Hasan Date: Wed, 15 Jan 2025 16:14:15 +1100 Subject: [PATCH 6/6] Add getProjectId and getDatabaseId functions --- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index d3eda28ed814..471675a2c988 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -1729,6 +1729,14 @@ public void populateDisplayData(DisplayData.Builder builder) { this.inner.populateDisplayData(builder); } + public String getProjectId() { + return this.inner.getProjectId(); + } + + public String getDatabaseId() { + return this.inner.getDatabaseId(); + } + @Override public PDone expand(PCollection input) { inner.expand(input); @@ -1944,6 +1952,14 @@ public void populateDisplayData(DisplayData.Builder builder) { this.inner.populateDisplayData(builder); } + public String getProjectId() { + return this.inner.getProjectId(); + } + + public String getDatabaseId() { + return this.inner.getDatabaseId(); + } + @Override public PDone expand(PCollection input) { inner.expand(input);
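
For reference, below is a minimal usage sketch (not part of the patches above) of how a pipeline might consume the withResults() API this series introduces, mirroring the Javadoc example and testE2EV1WriteWithResults. The method name writeThenWriteMore and the entities/moreEntities/project parameters are illustrative placeholders, not identifiers from the patch.

    // Assumed imports: com.google.datastore.v1.Entity,
    // org.apache.beam.sdk.io.gcp.datastore.DatastoreIO,
    // org.apache.beam.sdk.io.gcp.datastore.DatastoreV1,
    // org.apache.beam.sdk.transforms.Wait, org.apache.beam.sdk.values.PCollection.
    static void writeThenWriteMore(
        PCollection<Entity> entities, PCollection<Entity> moreEntities, String project) {
      // The first write emits one WriteSuccessSummary per successfully committed batch.
      PCollection<DatastoreV1.WriteSuccessSummary> writeSummary =
          entities.apply(
              "WriteFirstBatch", DatastoreIO.v1().write().withProjectId(project).withResults());

      // Wait.on holds back the second write until the corresponding window of the first
      // write has been fully committed, the same pattern exercised in testE2EV1WriteWithResults.
      moreEntities
          .apply(Wait.on(writeSummary))
          .setCoder(moreEntities.getCoder())
          .apply("WriteSecondBatch", DatastoreIO.v1().write().withProjectId(project));
    }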