Skip to content

Commit

Permalink
Add support for virtual workflow threads
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Oct 25, 2024
1 parent 6efbde3 commit d6638f8
Show file tree
Hide file tree
Showing 10 changed files with 484 additions and 18 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ jobs:
- name: Set up Java
uses: actions/setup-java@v4
with:
java-version: "11"
java-version: |
11
21
distribution: "temurin"

- name: Set up Gradle
Expand Down
65 changes: 65 additions & 0 deletions temporal-sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,35 @@ dependencies {
testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"
}

// Temporal SDK supports Java 8 or later so to support virutal threads
// we need to compile the code with Java 21 and package it in a multi-release jar.
sourceSets {
java21 {
java {
srcDirs = ['src/main/java21']
}
}
}

dependencies {
java21Implementation files(sourceSets.main.output.classesDirs) { builtBy compileJava }
}

tasks.named('compileJava21Java') {
javaCompiler = javaToolchains.compilerFor {
languageVersion = JavaLanguageVersion.of(21)
}
}

jar {
into('META-INF/versions/21') {
from sourceSets.java21.output
}
manifest.attributes(
'Multi-Release': 'true'
)
}

task registerNamespace(type: JavaExec) {
getMainClass().set('io.temporal.internal.docker.RegisterTestNamespace')
classpath = sourceSets.test.runtimeClasspath
Expand All @@ -49,4 +78,40 @@ task testResourceIndependent(type: Test) {
includeCategories 'io.temporal.worker.IndependentResourceBasedTests'
maxParallelForks = 1
}
}

// To test the virtual thread support we need to run a seprate test suite with Java 21
testing {
suites {
virtualThreadTests(JvmTestSuite) {
useJUnit(junitVersion)
dependencies {
implementation project()
implementation "ch.qos.logback:logback-classic:${logbackVersion}"
implementation project(':temporal-testing')

implementation "junit:junit:${junitVersion}"
implementation "org.mockito:mockito-core:${mockitoVersion}"
implementation 'pl.pragmatists:JUnitParams:1.1.1'
implementation("com.jayway.jsonpath:json-path:$jsonPathVersion"){
exclude group: 'org.slf4j', module: 'slf4j-api'
}
}

targets {
all {
testTask.configure {
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.of(21)
}
shouldRunAfter(test)
}
}
}
}
}
}

tasks.named('check') {
dependsOn(testing.suites.virtualThreadTests)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.task;

/**
* Function interface for {@link VirtualThreadDelegate#newVirtualThreadExecutor(ThreadConfigurator)}
* called for every thread created.
*/
@FunctionalInterface
public interface ThreadConfigurator {
/** Invoked for every thread created by {@link VirtualThreadDelegate#newVirtualThreadExecutor}. */
void configure(Thread t);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.task;

import java.util.concurrent.ExecutorService;

/**
* Internal delegate for virtual thread handling on JDK 21. This is a dummy version for reachability
* on JDK <21.
*/
public final class VirtualThreadDelegate {
public static ExecutorService newVirtualThreadExecutor(ThreadConfigurator configurator) {
throw new UnsupportedOperationException("Virtual threads not supported on JDK <21");
}

private VirtualThreadDelegate() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.uber.m3.tally.Scope;
import io.temporal.internal.sync.WorkflowThreadExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -34,11 +35,11 @@
* reasons. {@link ThreadPoolExecutor#getActiveCount()} take a pool-wide lock.
*/
class ActiveThreadReportingExecutor implements WorkflowThreadExecutor {
private final ThreadPoolExecutor workflowThreadPool;
private final ExecutorService workflowThreadPool;
private final Scope metricsScope;
private final AtomicInteger tasksInFlight = new AtomicInteger();

ActiveThreadReportingExecutor(ThreadPoolExecutor workflowThreadPool, Scope metricsScope) {
ActiveThreadReportingExecutor(ExecutorService workflowThreadPool, Scope metricsScope) {
this.workflowThreadPool = workflowThreadPool;
this.metricsScope = metricsScope;
}
Expand Down
34 changes: 20 additions & 14 deletions temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,14 @@
import io.temporal.common.converter.DataConverter;
import io.temporal.internal.client.WorkflowClientInternal;
import io.temporal.internal.sync.WorkflowThreadExecutor;
import io.temporal.internal.task.VirtualThreadDelegate;
import io.temporal.internal.worker.*;
import io.temporal.internal.worker.WorkflowExecutorCache;
import io.temporal.serviceclient.MetricsTag;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -55,7 +53,7 @@ public final class WorkerFactory {

private final Map<String, Worker> workers = new HashMap<>();
private final WorkflowClient workflowClient;
private final ThreadPoolExecutor workflowThreadPool;
private final ExecutorService workflowThreadPool;
private final WorkflowThreadExecutor workflowThreadExecutor;
private final AtomicInteger workflowThreadCounter = new AtomicInteger();
private final WorkerFactoryOptions factoryOptions;
Expand Down Expand Up @@ -98,15 +96,23 @@ private WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factor
.getMetricsScope()
.tagged(MetricsTag.defaultTags(namespace));

this.workflowThreadPool =
new ThreadPoolExecutor(
0,
this.factoryOptions.getMaxWorkflowThreadCount(),
1,
TimeUnit.MINUTES,
new SynchronousQueue<>());
this.workflowThreadPool.setThreadFactory(
r -> new Thread(r, "workflow-thread-" + workflowThreadCounter.incrementAndGet()));
if (this.factoryOptions.isEnableVirtualWorkflowThreads()) {
this.workflowThreadPool =
VirtualThreadDelegate.newVirtualThreadExecutor(
(t) -> t.setName("workflow-thread-" + workflowThreadCounter.incrementAndGet()));
} else {
ThreadPoolExecutor workflowThreadPoolExecutor =
new ThreadPoolExecutor(
0,
this.factoryOptions.getMaxWorkflowThreadCount(),
1,
TimeUnit.MINUTES,
new SynchronousQueue<>());
workflowThreadPoolExecutor.setThreadFactory(
r -> new Thread(r, "workflow-thread-" + workflowThreadCounter.incrementAndGet()));
this.workflowThreadPool = workflowThreadPoolExecutor;
}

this.workflowThreadExecutor =
new ActiveThreadReportingExecutor(this.workflowThreadPool, this.metricsScope);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.worker;

import com.google.common.base.Preconditions;
import io.temporal.common.Experimental;
import io.temporal.common.interceptors.WorkerInterceptor;
import java.time.Duration;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -55,6 +56,7 @@ public static class Builder {
private int maxWorkflowThreadCount;
private WorkerInterceptor[] workerInterceptors;
private boolean enableLoggingInReplay;
private boolean enableVirtualWorkflowThreads;

private Builder() {}

Expand All @@ -68,6 +70,7 @@ private Builder(WorkerFactoryOptions options) {
this.maxWorkflowThreadCount = options.maxWorkflowThreadCount;
this.workerInterceptors = options.workerInterceptors;
this.enableLoggingInReplay = options.enableLoggingInReplay;
this.enableVirtualWorkflowThreads = options.enableVirtualWorkflowThreads;
}

/**
Expand Down Expand Up @@ -119,6 +122,19 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) {
return this;
}

/**
* Enable the use of Virtual Threads for workflow execution across all workers created by this
* factory. This includes cached workflows. This option is only supported for JDK >= 21. If set
* then {@link #setMaxWorkflowThreadCount(int)} is ignored.
*
* <p>Default is false
*/
@Experimental
public Builder setEnableVirtualWorkflowThreads(boolean enableVirtualWorkflowThreads) {
this.enableVirtualWorkflowThreads = enableVirtualWorkflowThreads;
return this;
}

/**
* @deprecated not used anymore by JavaSDK, this value doesn't have any effect
*/
Expand All @@ -134,6 +150,7 @@ public WorkerFactoryOptions build() {
workflowHostLocalTaskQueueScheduleToStartTimeout,
workerInterceptors,
enableLoggingInReplay,
enableVirtualWorkflowThreads,
false);
}

Expand All @@ -144,6 +161,7 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() {
workflowHostLocalTaskQueueScheduleToStartTimeout,
workerInterceptors == null ? new WorkerInterceptor[0] : workerInterceptors,
enableLoggingInReplay,
enableVirtualWorkflowThreads,
true);
}
}
Expand All @@ -153,13 +171,15 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() {
private final @Nullable Duration workflowHostLocalTaskQueueScheduleToStartTimeout;
private final WorkerInterceptor[] workerInterceptors;
private final boolean enableLoggingInReplay;
private final boolean enableVirtualWorkflowThreads;

private WorkerFactoryOptions(
int workflowCacheSize,
int maxWorkflowThreadCount,
@Nullable Duration workflowHostLocalTaskQueueScheduleToStartTimeout,
WorkerInterceptor[] workerInterceptors,
boolean enableLoggingInReplay,
boolean enableVirtualWorkflowThreads,
boolean validate) {
if (validate) {
Preconditions.checkState(workflowCacheSize >= 0, "negative workflowCacheSize");
Expand All @@ -186,6 +206,7 @@ private WorkerFactoryOptions(
workflowHostLocalTaskQueueScheduleToStartTimeout;
this.workerInterceptors = workerInterceptors;
this.enableLoggingInReplay = enableLoggingInReplay;
this.enableVirtualWorkflowThreads = enableVirtualWorkflowThreads;
}

public int getWorkflowCacheSize() {
Expand All @@ -209,6 +230,10 @@ public boolean isEnableLoggingInReplay() {
return enableLoggingInReplay;
}

public boolean isEnableVirtualWorkflowThreads() {
return enableVirtualWorkflowThreads;
}

/**
* @deprecated not used anymore by JavaSDK, this value doesn't have any effect
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
* Internal delegate for virtual thread handling on JDK 21.
* This is the actual version compiled against JDK 21.
*/
public final class VirtualThreadDelegate {

public static ExecutorService newVirtualThreadExecutor(ThreadConfigurator configurator) {

return Executors.newThreadPerTaskExecutor(
r -> {
Thread.Builder threadBuilder = Thread.ofVirtual();
Thread t = threadBuilder.unstarted(r);
configurator.configure(t);
return t;
});
}

private VirtualThreadDelegate() {
}
}
Loading

0 comments on commit d6638f8

Please sign in to comment.