Skip to content

Commit

Permalink
Add getResult to UpdateHandle
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Nov 19, 2024
1 parent 2a68883 commit 2c2b5ca
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,22 @@ public interface WorkflowUpdateHandle<T> {
*/
String getId();

/**
* Returns the result of the workflow update.
*
* @return the result of the workflow update
*/
T getResult();

/**
* Returns the result of the workflow update.
*
* @param timeout maximum time to wait and perform the background long polling
* @param unit unit of timeout
* @return the result of the workflow update
*/
T getResult(long timeout, TimeUnit unit);

/**
* Returns a {@link CompletableFuture} with the update workflow execution request result,
* potentially waiting for the update to complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ public String getId() {
return id;
}

@Override
public T getResult() {
return result;
}

@Override
public T getResult(long timeout, TimeUnit unit) {
return result;
}

@Override
public CompletableFuture<T> getResultAsync() {
return CompletableFuture.completedFuture(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

package io.temporal.internal.client;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowException;
Expand All @@ -30,10 +29,7 @@
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;

@Experimental
public final class LazyWorkflowUpdateHandleImpl<T> implements WorkflowUpdateHandle<T> {
Expand Down Expand Up @@ -97,19 +93,16 @@ public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
failure -> {
if (failure instanceof CompletionException) {
// unwrap the CompletionException
failure = ((Throwable) failure).getCause();
failure = failure.getCause();
}
failure = CheckedExceptionWrapper.unwrap((Throwable) failure);
failure = CheckedExceptionWrapper.unwrap(failure);
if (failure instanceof Error) {
throw (Error) failure;
}
if (failure instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) failure;
if (Status.Code.NOT_FOUND.equals(sre.getStatus().getCode())) {
// Currently no way to tell if the NOT_FOUND was because the workflow ID
// does not exist or because the update ID does not exist.
throw sre;
}
// Currently no way to tell if the NOT_FOUND was because the workflow ID
// does not exist or because the update ID does not exist.
throw sre;
} else if (failure instanceof WorkflowException) {
throw (WorkflowException) failure;
Expand All @@ -120,6 +113,34 @@ public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
});
}

@Override
public T getResult() {
try {
return getResultAsync().get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
throw (cause instanceof RuntimeException
? (RuntimeException) cause
: new RuntimeException(cause));
}
}

@Override
public T getResult(long timeout, TimeUnit unit) {
try {
return getResultAsync(timeout, unit).get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
throw (cause instanceof RuntimeException
? (RuntimeException) cause
: new RuntimeException(cause));
}
}

@Override
public CompletableFuture<T> getResultAsync() {
return this.getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static org.junit.Assert.*;

import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
import io.temporal.client.*;
Expand Down Expand Up @@ -54,12 +55,18 @@ public void updateNonExistentWorkflow() {
}

@Test
public void pollUpdateNonExistentWorkflow() throws ExecutionException, InterruptedException {
public void pollUpdateNonExistentWorkflow() {
WorkflowStub workflowStub =
testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub("non-existing-id");
// Getting the update handle to a nonexistent workflow is fine
WorkflowUpdateHandle<String> handle = workflowStub.getUpdateHandle("update-id", String.class);
assertThrows(Exception.class, () -> handle.getResultAsync().get());
ExecutionException e =
assertThrows(ExecutionException.class, () -> handle.getResultAsync().get());
assertTrue(e.getCause() instanceof StatusRuntimeException);
StatusRuntimeException sre = (StatusRuntimeException) e.getCause();
assertEquals(io.grpc.Status.Code.NOT_FOUND, sre.getStatus().getCode());
sre = assertThrows(StatusRuntimeException.class, () -> handle.getResult());
assertEquals(io.grpc.Status.Code.NOT_FOUND, sre.getStatus().getCode());
}

@Test
Expand Down Expand Up @@ -127,7 +134,7 @@ public void updateWorkflowDuplicateId() throws ExecutionException, InterruptedEx

// Try to get the result of an invalid update
WorkflowUpdateHandle<String> handle = workflowStub.getUpdateHandle(updateId, String.class);
assertThrows(Exception.class, () -> handle.getResultAsync().get());
assertThrows(ExecutionException.class, () -> handle.getResultAsync().get());

assertEquals(
"some-value",
Expand Down Expand Up @@ -192,9 +199,10 @@ public void updateWorkflowReuseOptions() throws ExecutionException, InterruptedE
workflowStub.startUpdate(updateOptions, 0, "some-value").getResultAsync().get());
testWorkflowRule.waitForTheEndOfWFT(execution.getWorkflowId());
// Try to send another update request with the same update options
assertEquals(
"some-other-value",
workflowStub.startUpdate(updateOptions, 0, "some-other-value").getResultAsync().get());
WorkflowUpdateHandle<String> handle =
workflowStub.startUpdate(updateOptions, 0, "some-other-value");
assertEquals("some-other-value", handle.getResultAsync().get());
assertEquals("some-other-value", handle.getResult());

// Complete the workflow
workflowStub.update("complete", void.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void LongRunningWorkflowUpdateId() throws ExecutionException, Interrupted
}

@Test
public void WorkflowUpdateGetResultTimeout() throws ExecutionException, InterruptedException {
public void WorkflowUpdateGetResultAsyncTimeout() {
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
String workflowType = BlockingWorkflow.class.getSimpleName();
WorkflowStub workflowStub =
Expand Down Expand Up @@ -123,6 +123,40 @@ public void WorkflowUpdateGetResultTimeout() throws ExecutionException, Interrup
assertEquals("complete", workflowStub.getResult(String.class));
}

@Test
public void WorkflowUpdateGetResultTimeout() {
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
String workflowType = BlockingWorkflow.class.getSimpleName();
WorkflowStub workflowStub =
workflowClient.newUntypedWorkflowStub(
workflowType,
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()));

workflowStub.start();
SDKTestWorkflowRule.waitForOKQuery(workflowStub);

WorkflowUpdateHandle<String> handle =
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, String.class, 10_000, "some-value");

// Verify get throws the correct exception in around the right amount of time
Stopwatch stopWatch = Stopwatch.createStarted();
assertThrows(
WorkflowUpdateTimeoutOrCancelledException.class,
() -> handle.getResult(2, TimeUnit.SECONDS));
stopWatch.stop();
long elapsedSeconds = stopWatch.elapsed(TimeUnit.SECONDS);
assertTrue(
"We shouldn't return too early or too late by the timeout, took "
+ elapsedSeconds
+ " seconds",
elapsedSeconds >= 1 && elapsedSeconds <= 3);

// Complete workflow, since the update is accepted it will not block completion
workflowStub.update("complete", void.class);
assertEquals("complete", workflowStub.getResult(String.class));
}

@WorkflowInterface
public interface BlockingWorkflow {
@WorkflowMethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,14 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException
WorkflowUpdateException.class,
() -> workflowStub.update("bad_update_name", String.class, 0, "Bad Update"));

// send an update request to a bad name through the async path
assertThrows(
WorkflowUpdateException.class,
() ->
workflowStub
.startUpdate("bad_update_name", WorkflowUpdateStage.COMPLETED, String.class, 0, "")
.getResult());

// send a bad update that will be rejected through the sync path
assertThrows(
WorkflowUpdateException.class,
Expand Down

0 comments on commit 2c2b5ca

Please sign in to comment.