Skip to content

Commit

Permalink
Merge pull request #261 from ascopes/task/refactor-executor
Browse files Browse the repository at this point in the history
Refactor executor out into separate class that is individually testable
  • Loading branch information
ascopes authored Jun 23, 2024
2 parents 18b9943 + 9530da6 commit a02b92b
Show file tree
Hide file tree
Showing 6 changed files with 510 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,16 @@

import static java.util.function.Predicate.not;

import io.github.ascopes.protobufmavenplugin.utils.ConcurrentExecutor;
import io.github.ascopes.protobufmavenplugin.utils.FileUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger;
Expand All @@ -46,39 +40,23 @@
* extracted to a location within the Maven build directory to enable {@code protoc} and other
* plugins to be able to view them without needing access to the Java NIO file system APIs.
*
* <p>This object maintains an internal work stealing thread pool to enable performing IO
* concurrently. This should be closed when shutting down this application to prevent leaking
* resources.
*
* @author Ashley Scopes
*/
@Named
public final class ProtoSourceResolver implements AutoCloseable {
public final class ProtoSourceResolver {

private static final Logger log = LoggerFactory.getLogger(ProtoArchiveExtractor.class);

private final ConcurrentExecutor concurrentExecutor;
private final ProtoArchiveExtractor protoArchiveExtractor;
private final ExecutorService executorService;

@Inject
public ProtoSourceResolver(ProtoArchiveExtractor protoArchiveExtractor) {
var concurrency = Runtime.getRuntime().availableProcessors() * 8;
public ProtoSourceResolver(
ConcurrentExecutor concurrentExecutor,
ProtoArchiveExtractor protoArchiveExtractor
) {
this.concurrentExecutor = concurrentExecutor;
this.protoArchiveExtractor = protoArchiveExtractor;
executorService = Executors.newWorkStealingPool(concurrency);
}

@PreDestroy
@SuppressWarnings({"auto-closeable", "ResultOfMethodCallIgnored"})
@Override
public void close() {
log.debug("Shutting down executor service");
executorService.shutdown();
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
log.warn("Shutdown was interrupted and will be aborted", ex);
Thread.currentThread().interrupt();
}
}

public Optional<ProtoFileListing> createProtoFileListing(Path path) throws IOException {
Expand Down Expand Up @@ -112,44 +90,18 @@ public Optional<ProtoFileListing> createProtoFileListing(Path path) throws IOExc

public Collection<ProtoFileListing> createProtoFileListings(
Collection<Path> originalPaths
) throws IOException {
var results = new LinkedHashSet<ProtoFileListing>();
var exceptions = new ArrayList<Exception>();

originalPaths
) {
return originalPaths
.stream()
// GH-132: Normalize to ensure different paths to the same file do not
// get duplicated across more than one extraction site.
.map(FileUtils::normalize)
// GH-132: Avoid running multiple times on the same location.
.distinct()
.map(this::submitProtoFileListingTask)
// Terminal operation to ensure all are scheduled prior to joining.
.collect(Collectors.toUnmodifiableList())
.forEach(task -> {
try {
task.get().ifPresent(results::add);
} catch (ExecutionException | InterruptedException ex) {
exceptions.add(ex);
}
});

if (!exceptions.isEmpty()) {
var causeIterator = exceptions.iterator();
var ex = new IOException(
"Failed to discover protobuf sources in some locations", causeIterator.next()
);
causeIterator.forEachRemaining(ex::addSuppressed);
throw ex;
}

return results;
}

private FutureTask<Optional<ProtoFileListing>> submitProtoFileListingTask(Path path) {
log.debug("Searching for proto files in '{}' asynchronously...", path);
var task = new FutureTask<>(() -> createProtoFileListing(path));
executorService.submit(task);
return task;
.map(path -> concurrentExecutor.submit(() -> createProtoFileListing(path)))
.collect(concurrentExecutor.awaiting())
.stream()
.flatMap(Optional::stream)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright (C) 2023 - 2024, Ashley Scopes.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.github.ascopes.protobufmavenplugin.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import javax.inject.Named;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Helper component that allows scheduling IO-bound tasks within a thread pool.
*
* @author Ashley Scopes
* @since 2.2.0
*/
@Named
public final class ConcurrentExecutor {

private static final Logger log = LoggerFactory.getLogger(ConcurrentExecutor.class);
private final ExecutorService executorService;

public ConcurrentExecutor() {
ExecutorService executorService;

try {
log.debug("Trying to create new Loom virtual thread pool");
executorService = (ExecutorService) Executors.class
.getMethod("newVirtualThreadPerTaskExecutor")
.invoke(null);

log.debug("Loom virtual thread pool creation was successful!");

} catch (Exception ex) {
var concurrency = Runtime.getRuntime().availableProcessors() * 8;
log.debug(
"Falling back to new work-stealing thread pool (concurrency={}, Loom is unavailable)",
concurrency
);
executorService = Executors.newWorkStealingPool(concurrency);
}

this.executorService = executorService;
}

/**
* Destroy the internal thread pool.
*
* @throws InterruptedException if destruction timed out or the thread was interrupted.
*/
@PreDestroy
@SuppressWarnings({"ResultOfMethodCallIgnored", "unused"})
public void destroy() throws InterruptedException {
log.debug("Shutting down executor...");
executorService.shutdown();
log.debug("Awaiting executor termination...");

// If this fails, then we can't do much about it. Force shutdown and hope threads don't
// deadlock. Not going to bother adding complicated handling here as if we get stuck, we
// likely have far bigger problems to deal with.
executorService.awaitTermination(10, TimeUnit.SECONDS);
var remaining = executorService.shutdownNow();
log.debug("Shutdown ended, stubborn remaining tasks that will be orphaned: {}", remaining);
}

public <R> FutureTask<R> submit(Callable<R> task) {
var futureTask = new FutureTask<>(task);
executorService.submit(futureTask);
return futureTask;
}

/**
* Return a reactive collector of all the results of a stream of scheduled tasks.
*
* @param <R> the task return type.
* @return the collector.
* @throws MultipleFailuresException if any of the results raised exceptions. All results are
* collected prior to this being raised.
*/
public <R> Collector<FutureTask<R>, ?, List<R>> awaiting() {
return Collectors.collectingAndThen(Collectors.toUnmodifiableList(), this::await);
}

private <R> List<R> await(List<FutureTask<R>> scheduledTasks) {
try {
var results = new ArrayList<R>();
var exceptions = new ArrayList<Throwable>();

for (var task : scheduledTasks) {
try {
results.add(task.get());
} catch (ExecutionException ex) {
exceptions.add(ex.getCause());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return results;
}
}

if (!exceptions.isEmpty()) {
throw MultipleFailuresException.create(exceptions);
}

return results;

} finally {
// Interrupt anything that didn't complete if we get interrupted on the OS level.
for (var task : scheduledTasks) {
task.cancel(true);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2023 - 2024, Ashley Scopes.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.github.ascopes.protobufmavenplugin.utils;

import java.util.List;
import java.util.NoSuchElementException;

/**
* Exception that gets raised when one or more concurrent tasks fail.
*
* @author Ashley Scopes
* @since 2.2.0
*/
public final class MultipleFailuresException extends RuntimeException {

private MultipleFailuresException(String message, Throwable cause) {
super(message, cause, true, false);
}

/**
* Initialise this exception.
*
* @param exceptions the exceptions that were thrown. Must have at least one item.
* @return the wrapper exception.
* @throws NoSuchElementException if an empty list was provided.
*/
public static MultipleFailuresException create(List<? extends Throwable> exceptions) {
var causeIterator = exceptions.iterator();
var ex = new MultipleFailuresException("Multiple failures occurred", causeIterator.next());
causeIterator.forEachRemaining(ex::addSuppressed);
return ex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.github.ascopes.protobufmavenplugin.fixtures;

import java.util.Random;
import java.util.UUID;

/**
Expand All @@ -25,11 +26,17 @@
*/
public final class RandomFixtures {

private static final Random random = new Random();

private RandomFixtures() {
// Static-only class.
}

public static String someText() {
return UUID.randomUUID().toString();
}

public static int someInt() {
return random.nextInt();
}
}
Loading

0 comments on commit a02b92b

Please sign in to comment.