diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c2c7fbf62..3217d8126 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,7 +60,9 @@ jobs: - name: Set up Java uses: actions/setup-java@v4 with: - java-version: "11" + java-version: | + 11 + 21 distribution: "temurin" - name: Set up Gradle diff --git a/temporal-sdk/build.gradle b/temporal-sdk/build.gradle index 050612a25..c794d7455 100644 --- a/temporal-sdk/build.gradle +++ b/temporal-sdk/build.gradle @@ -31,6 +31,35 @@ dependencies { testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}" } +// Temporal SDK supports Java 8 or later so to support virutal threads +// we need to compile the code with Java 21 and package it in a multi-release jar. +sourceSets { + java21 { + java { + srcDirs = ['src/main/java21'] + } + } +} + +dependencies { + java21Implementation files(sourceSets.main.output.classesDirs) { builtBy compileJava } +} + +tasks.named('compileJava21Java') { + javaCompiler = javaToolchains.compilerFor { + languageVersion = JavaLanguageVersion.of(21) + } +} + +jar { + into('META-INF/versions/21') { + from sourceSets.java21.output + } + manifest.attributes( + 'Multi-Release': 'true' + ) +} + task registerNamespace(type: JavaExec) { getMainClass().set('io.temporal.internal.docker.RegisterTestNamespace') classpath = sourceSets.test.runtimeClasspath @@ -49,4 +78,40 @@ task testResourceIndependent(type: Test) { includeCategories 'io.temporal.worker.IndependentResourceBasedTests' maxParallelForks = 1 } +} + +// To test the virtual thread support we need to run a seprate test suite with Java 21 +testing { + suites { + virtualThreadTests(JvmTestSuite) { + useJUnit(junitVersion) + dependencies { + implementation project() + implementation "ch.qos.logback:logback-classic:${logbackVersion}" + implementation project(':temporal-testing') + + implementation "junit:junit:${junitVersion}" + implementation "org.mockito:mockito-core:${mockitoVersion}" + implementation 'pl.pragmatists:JUnitParams:1.1.1' + implementation("com.jayway.jsonpath:json-path:$jsonPathVersion"){ + exclude group: 'org.slf4j', module: 'slf4j-api' + } + } + + targets { + all { + testTask.configure { + javaLauncher = javaToolchains.launcherFor { + languageVersion = JavaLanguageVersion.of(21) + } + shouldRunAfter(test) + } + } + } + } + } +} + +tasks.named('check') { + dependsOn(testing.suites.virtualThreadTests) } \ No newline at end of file diff --git a/temporal-sdk/src/main/java/io/temporal/internal/task/ThreadConfigurator.java b/temporal-sdk/src/main/java/io/temporal/internal/task/ThreadConfigurator.java new file mode 100644 index 000000000..dda32ae97 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/task/ThreadConfigurator.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.task; + +/** + * Function interface for {@link VirtualThreadDelegate#newVirtualThreadExecutor(ThreadConfigurator)} + * called for every thread created. + */ +@FunctionalInterface +public interface ThreadConfigurator { + /** Invoked for every thread created by {@link VirtualThreadDelegate#newVirtualThreadExecutor}. */ + void configure(Thread t); +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/task/VirtualThreadDelegate.java b/temporal-sdk/src/main/java/io/temporal/internal/task/VirtualThreadDelegate.java new file mode 100644 index 000000000..3ddaed11a --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/task/VirtualThreadDelegate.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.task; + +import java.util.concurrent.ExecutorService; + +/** + * Internal delegate for virtual thread handling on JDK 21. This is a dummy version for reachability + * on JDK <21. + */ +public final class VirtualThreadDelegate { + public static ExecutorService newVirtualThreadExecutor(ThreadConfigurator configurator) { + throw new UnsupportedOperationException("Virtual threads not supported on JDK <21"); + } + + private VirtualThreadDelegate() {} +} diff --git a/temporal-sdk/src/main/java/io/temporal/worker/ActiveThreadReportingExecutor.java b/temporal-sdk/src/main/java/io/temporal/worker/ActiveThreadReportingExecutor.java index 1a32eade0..0aad84ffa 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/ActiveThreadReportingExecutor.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/ActiveThreadReportingExecutor.java @@ -22,6 +22,7 @@ import com.uber.m3.tally.Scope; import io.temporal.internal.sync.WorkflowThreadExecutor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; @@ -34,11 +35,11 @@ * reasons. {@link ThreadPoolExecutor#getActiveCount()} take a pool-wide lock. */ class ActiveThreadReportingExecutor implements WorkflowThreadExecutor { - private final ThreadPoolExecutor workflowThreadPool; + private final ExecutorService workflowThreadPool; private final Scope metricsScope; private final AtomicInteger tasksInFlight = new AtomicInteger(); - ActiveThreadReportingExecutor(ThreadPoolExecutor workflowThreadPool, Scope metricsScope) { + ActiveThreadReportingExecutor(ExecutorService workflowThreadPool, Scope metricsScope) { this.workflowThreadPool = workflowThreadPool; this.metricsScope = metricsScope; } diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java index 4de582ac0..30874c3dd 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java @@ -29,16 +29,14 @@ import io.temporal.common.converter.DataConverter; import io.temporal.internal.client.WorkflowClientInternal; import io.temporal.internal.sync.WorkflowThreadExecutor; +import io.temporal.internal.task.VirtualThreadDelegate; import io.temporal.internal.worker.*; import io.temporal.internal.worker.WorkflowExecutorCache; import io.temporal.serviceclient.MetricsTag; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -55,7 +53,7 @@ public final class WorkerFactory { private final Map workers = new HashMap<>(); private final WorkflowClient workflowClient; - private final ThreadPoolExecutor workflowThreadPool; + private final ExecutorService workflowThreadPool; private final WorkflowThreadExecutor workflowThreadExecutor; private final AtomicInteger workflowThreadCounter = new AtomicInteger(); private final WorkerFactoryOptions factoryOptions; @@ -98,15 +96,23 @@ private WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factor .getMetricsScope() .tagged(MetricsTag.defaultTags(namespace)); - this.workflowThreadPool = - new ThreadPoolExecutor( - 0, - this.factoryOptions.getMaxWorkflowThreadCount(), - 1, - TimeUnit.MINUTES, - new SynchronousQueue<>()); - this.workflowThreadPool.setThreadFactory( - r -> new Thread(r, "workflow-thread-" + workflowThreadCounter.incrementAndGet())); + if (this.factoryOptions.isEnableVirtualWorkflowThreads()) { + this.workflowThreadPool = + VirtualThreadDelegate.newVirtualThreadExecutor( + (t) -> t.setName("workflow-thread-" + workflowThreadCounter.incrementAndGet())); + } else { + ThreadPoolExecutor workflowThreadPoolExecutor = + new ThreadPoolExecutor( + 0, + this.factoryOptions.getMaxWorkflowThreadCount(), + 1, + TimeUnit.MINUTES, + new SynchronousQueue<>()); + workflowThreadPoolExecutor.setThreadFactory( + r -> new Thread(r, "workflow-thread-" + workflowThreadCounter.incrementAndGet())); + this.workflowThreadPool = workflowThreadPoolExecutor; + } + this.workflowThreadExecutor = new ActiveThreadReportingExecutor(this.workflowThreadPool, this.metricsScope); diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java index a442a60f0..172ce0efd 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java @@ -21,6 +21,7 @@ package io.temporal.worker; import com.google.common.base.Preconditions; +import io.temporal.common.Experimental; import io.temporal.common.interceptors.WorkerInterceptor; import java.time.Duration; import javax.annotation.Nullable; @@ -55,6 +56,7 @@ public static class Builder { private int maxWorkflowThreadCount; private WorkerInterceptor[] workerInterceptors; private boolean enableLoggingInReplay; + private boolean enableVirtualWorkflowThreads; private Builder() {} @@ -68,6 +70,7 @@ private Builder(WorkerFactoryOptions options) { this.maxWorkflowThreadCount = options.maxWorkflowThreadCount; this.workerInterceptors = options.workerInterceptors; this.enableLoggingInReplay = options.enableLoggingInReplay; + this.enableVirtualWorkflowThreads = options.enableVirtualWorkflowThreads; } /** @@ -119,6 +122,19 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) { return this; } + /** + * Enable the use of Virtual Threads for workflow execution across all workers created by this + * factory. This includes cached workflows. This option is only supported for JDK >= 21. If set + * then {@link #setMaxWorkflowThreadCount(int)} is ignored. + * + *

Default is false + */ + @Experimental + public Builder setEnableVirtualWorkflowThreads(boolean enableVirtualWorkflowThreads) { + this.enableVirtualWorkflowThreads = enableVirtualWorkflowThreads; + return this; + } + /** * @deprecated not used anymore by JavaSDK, this value doesn't have any effect */ @@ -134,6 +150,7 @@ public WorkerFactoryOptions build() { workflowHostLocalTaskQueueScheduleToStartTimeout, workerInterceptors, enableLoggingInReplay, + enableVirtualWorkflowThreads, false); } @@ -144,6 +161,7 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() { workflowHostLocalTaskQueueScheduleToStartTimeout, workerInterceptors == null ? new WorkerInterceptor[0] : workerInterceptors, enableLoggingInReplay, + enableVirtualWorkflowThreads, true); } } @@ -153,6 +171,7 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() { private final @Nullable Duration workflowHostLocalTaskQueueScheduleToStartTimeout; private final WorkerInterceptor[] workerInterceptors; private final boolean enableLoggingInReplay; + private final boolean enableVirtualWorkflowThreads; private WorkerFactoryOptions( int workflowCacheSize, @@ -160,6 +179,7 @@ private WorkerFactoryOptions( @Nullable Duration workflowHostLocalTaskQueueScheduleToStartTimeout, WorkerInterceptor[] workerInterceptors, boolean enableLoggingInReplay, + boolean enableVirtualWorkflowThreads, boolean validate) { if (validate) { Preconditions.checkState(workflowCacheSize >= 0, "negative workflowCacheSize"); @@ -186,6 +206,7 @@ private WorkerFactoryOptions( workflowHostLocalTaskQueueScheduleToStartTimeout; this.workerInterceptors = workerInterceptors; this.enableLoggingInReplay = enableLoggingInReplay; + this.enableVirtualWorkflowThreads = enableVirtualWorkflowThreads; } public int getWorkflowCacheSize() { @@ -209,6 +230,10 @@ public boolean isEnableLoggingInReplay() { return enableLoggingInReplay; } + public boolean isEnableVirtualWorkflowThreads() { + return enableVirtualWorkflowThreads; + } + /** * @deprecated not used anymore by JavaSDK, this value doesn't have any effect */ diff --git a/temporal-sdk/src/main/java21/io/temporal/internal/task/VirtualThreadDelegate.java b/temporal-sdk/src/main/java21/io/temporal/internal/task/VirtualThreadDelegate.java new file mode 100644 index 000000000..11f75649e --- /dev/null +++ b/temporal-sdk/src/main/java21/io/temporal/internal/task/VirtualThreadDelegate.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.task; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +/** + * Internal delegate for virtual thread handling on JDK 21. + * This is the actual version compiled against JDK 21. + */ +public final class VirtualThreadDelegate { + + public static ExecutorService newVirtualThreadExecutor(ThreadConfigurator configurator) { + + return Executors.newThreadPerTaskExecutor( + r -> { + Thread.Builder threadBuilder = Thread.ofVirtual(); + Thread t = threadBuilder.unstarted(r); + configurator.configure(t); + return t; + }); + } + + private VirtualThreadDelegate() { + } +} \ No newline at end of file diff --git a/temporal-sdk/src/virtualThreadTests/java/io/temporal/worker/WorkerWithVirtualThreadsStressTests.java b/temporal-sdk/src/virtualThreadTests/java/io/temporal/worker/WorkerWithVirtualThreadsStressTests.java new file mode 100644 index 000000000..2beb6936e --- /dev/null +++ b/temporal-sdk/src/virtualThreadTests/java/io/temporal/worker/WorkerWithVirtualThreadsStressTests.java @@ -0,0 +1,255 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker; + +import static io.temporal.testing.internal.SDKTestWorkflowRule.NAMESPACE; +import static org.junit.Assert.assertEquals; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityOptions; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.testing.TestEnvironmentOptions; +import io.temporal.testing.TestWorkflowEnvironment; +import io.temporal.testing.internal.ExternalServiceTestConfigurator; +import io.temporal.workflow.*; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class WorkerWithVirtualThreadsStressTests { + + @Parameterized.Parameter public boolean useExternalService; + + @Parameterized.Parameters(name = "{1}") + public static Object[] data() { + if (!ExternalServiceTestConfigurator.isUseExternalService()) { + return new Object[][] {{false, "TestService"}}; + } else { + return new Object[][] {{true, "Docker"}}; + } + } + + @Parameterized.Parameter(1) + public String testType; + + @Rule public TestName testName = new TestName(); + + @Test + public void longHistoryWorkflowsCompleteSuccessfully() throws InterruptedException { + + // Arrange + String taskQueueName = "veryLongWorkflow"; + + TestEnvironmentWrapper wrapper = + new TestEnvironmentWrapper( + WorkerFactoryOptions.newBuilder().setEnableVirtualWorkflowThreads(true).build()); + WorkerFactory factory = wrapper.getWorkerFactory(); + Worker worker = factory.newWorker(taskQueueName, WorkerOptions.newBuilder().build()); + worker.registerWorkflowImplementationTypes(ActivitiesWorkflowImpl.class); + worker.registerActivitiesImplementations(new ActivitiesImpl()); + factory.start(); + + WorkflowOptions workflowOptions = + WorkflowOptions.newBuilder() + .setTaskQueue(taskQueueName) + .setWorkflowRunTimeout(Duration.ofSeconds(250)) + .setWorkflowTaskTimeout(Duration.ofSeconds(30)) + .build(); + WorkflowStub workflow = + wrapper.getWorkflowClient().newUntypedWorkflowStub("ActivitiesWorkflow", workflowOptions); + + // Act + // This will yield around 10000 events which is above the page limit returned by the server. + WorkflowParams w = new WorkflowParams(); + w.TemporalSleepSeconds = 0; + w.ChainSequence = 50; + w.ConcurrentCount = 50; + w.PayloadSizeBytes = 10000; + w.TaskQueueName = taskQueueName; + + workflow.start(w); + assertEquals("I'm done", workflow.getResult(String.class)); + wrapper.close(); + } + + @Test(timeout = 60000) + public void highConcurrentWorkflowsVirtualThreads() { + // Arrange + String taskQueueName = "veryLongWorkflow"; + + TestEnvironmentWrapper wrapper = + new TestEnvironmentWrapper( + WorkerFactoryOptions.newBuilder().setEnableVirtualWorkflowThreads(true).build()); + WorkerFactory factory = wrapper.getWorkerFactory(); + Worker worker = factory.newWorker(taskQueueName, WorkerOptions.newBuilder().build()); + worker.registerWorkflowImplementationTypes(ActivitiesWorkflowImpl.class); + worker.registerActivitiesImplementations(new ActivitiesImpl()); + factory.start(); + + WorkflowOptions workflowOptions = + WorkflowOptions.newBuilder() + .setTaskQueue(taskQueueName) + .setWorkflowRunTimeout(Duration.ofSeconds(250)) + .setWorkflowTaskTimeout(Duration.ofSeconds(30)) + .build(); + WorkflowStub workflow = + wrapper.getWorkflowClient().newUntypedWorkflowStub("ActivitiesWorkflow", workflowOptions); + + // Act + WorkflowParams w = new WorkflowParams(); + w.TemporalSleepSeconds = 0; + w.ChainSequence = 1; + w.ConcurrentCount = 1000; + w.PayloadSizeBytes = 100; + w.TaskQueueName = taskQueueName; + + workflow.start(w); + assertEquals("I'm done", workflow.getResult(String.class)); + wrapper.close(); + } + + // Todo: refactor TestEnvironment to toggle between real and test service. + private class TestEnvironmentWrapper { + + private TestWorkflowEnvironment testEnv; + WorkflowServiceStubs service; + private WorkerFactory factory; + + public TestEnvironmentWrapper(WorkerFactoryOptions options) { + options = WorkerFactoryOptions.newBuilder(options).validateAndBuildWithDefaults(); + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build(); + if (ExternalServiceTestConfigurator.isUseExternalService()) { + service = + WorkflowServiceStubs.newServiceStubs( + WorkflowServiceStubsOptions.newBuilder() + .setTarget(ExternalServiceTestConfigurator.getTemporalServiceAddress()) + .build()); + WorkflowClient client = WorkflowClient.newInstance(service, clientOptions); + factory = WorkerFactory.newInstance(client, options); + } else { + TestEnvironmentOptions testOptions = + TestEnvironmentOptions.newBuilder() + .setWorkflowClientOptions(clientOptions) + .setWorkerFactoryOptions(options) + .build(); + testEnv = TestWorkflowEnvironment.newInstance(testOptions); + } + } + + private WorkerFactory getWorkerFactory() { + return useExternalService ? factory : testEnv.getWorkerFactory(); + } + + private WorkflowClient getWorkflowClient() { + return useExternalService ? factory.getWorkflowClient() : testEnv.getWorkflowClient(); + } + + private void close() { + if (useExternalService) { + factory.shutdownNow(); + factory.awaitTermination(5, TimeUnit.SECONDS); + service.shutdownNow(); + service.awaitTermination(5, TimeUnit.SECONDS); + } else { + testEnv.close(); + } + } + } + + public static class WorkflowParams { + + public int ChainSequence; + public int ConcurrentCount; + public String TaskQueueName; + public int PayloadSizeBytes; + public int TemporalSleepSeconds; + } + + @WorkflowInterface + public interface ActivitiesWorkflow { + + @WorkflowMethod + String execute(WorkflowParams params); + } + + public static class ActivitiesWorkflowImpl implements ActivitiesWorkflow { + + @Override + public String execute(WorkflowParams params) { + SleepActivity activity = + Workflow.newActivityStub( + SleepActivity.class, + ActivityOptions.newBuilder() + .setTaskQueue(params.TaskQueueName) + .setScheduleToStartTimeout(Duration.ofMinutes(1)) + .setStartToCloseTimeout(Duration.ofMinutes(1)) + .setHeartbeatTimeout(Duration.ofSeconds(20)) + .build()); + + for (int i = 0; i < params.ChainSequence; i++) { + List> promises = new ArrayList<>(); + for (int j = 0; j < params.ConcurrentCount; j++) { + byte[] bytes = new byte[params.PayloadSizeBytes]; + new Random().nextBytes(bytes); + Promise promise = Async.procedure(activity::sleep, i, j, bytes); + promises.add(promise); + } + + for (Promise promise : promises) { + promise.get(); + } + + Workflow.sleep(Duration.ofSeconds(params.TemporalSleepSeconds)); + } + return "I'm done"; + } + } + + @ActivityInterface + public interface SleepActivity { + void sleep(int chain, int concurrency, byte[] bytes); + } + + public static class ActivitiesImpl implements SleepActivity { + private static final Logger log = LoggerFactory.getLogger("sleep-activity"); + + @Override + public void sleep(int chain, int concurrency, byte[] bytes) { + log.trace("sleep called"); + } + } +} diff --git a/temporal-shaded/build.gradle b/temporal-shaded/build.gradle index 316c51225..ae262b159 100644 --- a/temporal-shaded/build.gradle +++ b/temporal-shaded/build.gradle @@ -1,5 +1,5 @@ plugins { - id 'com.github.johnrengelman.shadow' version '7.1.2' + id 'com.gradleup.shadow' version '8.3.3' } description = '''Temporal Java SDK and Testing Framework with gRPC, Protobuf 3 and Guava shaded'''