Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix #2193] ExecuteAfter mutation to return Json #2194

Merged
merged 2 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;

import com.fasterxml.jackson.databind.JsonNode;

public interface KogitoRuntimeClient {

CompletableFuture<String> executeProcessInstance(ProcessDefinition definition, ExecuteArgs args);
CompletableFuture<JsonNode> executeProcessInstance(ProcessDefinition definition, ExecuteArgs args);

CompletableFuture<String> abortProcessInstance(String serviceURL, ProcessInstance processInstance);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.kie.kogito.index.model.Job;
Expand Down Expand Up @@ -116,13 +117,20 @@ public CompletableFuture sendDeleteClientRequest(WebClient webClient, String req
return future;
}

protected void asyncHttpResponseTreatment(AsyncResult<HttpResponse<Buffer>> res, CompletableFuture future, String logMessage) {
if (res.succeeded() && (res.result().statusCode() == 200 || res.result().statusCode() == 201)) {
String jsonMessage = res.result().bodyAsString();
protected void asyncHttpResponseTreatment(AsyncResult<HttpResponse<Buffer>> res, CompletableFuture<String> future, String logMessage) {
asyncHttpResponseTreatment(res, future, result -> {
String jsonMessage = result.bodyAsString();
LOGGER.trace("Result {}", jsonMessage);
future.complete(jsonMessage != null ? jsonMessage : "Successfully performed: " + logMessage);
return jsonMessage != null ? jsonMessage : "Successfully performed: " + logMessage;
}, logMessage);

}

protected <T> void asyncHttpResponseTreatment(AsyncResult<HttpResponse<Buffer>> res, CompletableFuture<T> future, Function<HttpResponse<Buffer>, T> function, String logMessage) {
if (res.succeeded() && (res.result().statusCode() == 200 || res.result().statusCode() == 201)) {
future.complete(function.apply(res.result()));
} else {
LOGGER.trace("Error {}", logMessage);
LOGGER.info("Error {}", logMessage);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are logging an error, why not to use do LOGGER.error ?

future.completeExceptionally(new DataIndexServiceException(getErrorMessage(logMessage, res.result())));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Map<String, DataFetcher<CompletableFuture<?>>> mutations(AbstractGraphQLS
return Map.of("ExecuteAfter", env -> sharedOutput(schemaManager, env));
}

private CompletableFuture<String> sharedOutput(AbstractGraphQLSchemaManager schemaManager, DataFetchingEnvironment env) {
private CompletableFuture<JsonNode> sharedOutput(AbstractGraphQLSchemaManager schemaManager, DataFetchingEnvironment env) {
DataIndexStorageService cacheService = schemaManager.getCacheService();
ProcessDefinitionKey key = new ProcessDefinitionKey(mandatoryArgument(env, "processId"), mandatoryArgument(env, "processVersion"));
ProcessDefinition processDefinition = cacheService.getProcessDefinitionStorage().get(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;

import io.vertx.core.AsyncResult;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
Expand Down Expand Up @@ -76,13 +78,14 @@ class KogitoRuntimeClientImpl extends KogitoRuntimeCommonClient implements Kogit
private static final Logger LOGGER = LoggerFactory.getLogger(KogitoRuntimeClientImpl.class);

@Override
public CompletableFuture<String> executeProcessInstance(ProcessDefinition definition, ExecuteArgs args) {
CompletableFuture<String> future = new CompletableFuture<>();
public CompletableFuture<JsonNode> executeProcessInstance(ProcessDefinition definition, ExecuteArgs args) {
CompletableFuture<JsonNode> future = new CompletableFuture<>();
HttpRequest<Buffer> request = getWebClient(CommonUtils.getServiceUrl(definition.getEndpoint(), definition.getId())).post("/" + definition.getId());
if (args.businessKey() != null) {
request.addQueryParam("businessKey", args.businessKey());
}
request.sendJson(args.input(), res -> asyncHttpResponseTreatment(res, future, "START ProcessInstance of type " + definition.getId()));
request.sendJson(args.input(), res -> asyncHttpResponseTreatment(res, future, result -> result.bodyAsJson(JsonNode.class),
"START ProcessInstance of type " + definition.getId()));
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.kie.kogito.source.files.SourceFilesProvider;
import org.kie.kogito.svg.ProcessSvgService;

import com.fasterxml.jackson.databind.JsonNode;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -279,7 +281,7 @@ private String executeOnProcessInstance(String processId, String processInstance
}

@Override
public CompletableFuture<String> executeProcessInstance(ProcessDefinition definition, ExecuteArgs args) {
public CompletableFuture<JsonNode> executeProcessInstance(ProcessDefinition definition, ExecuteArgs args) {
Process<?> process = processes != null ? processes.processById(definition.getId()) : null;
if (process == null) {
throw new DataIndexServiceException(String.format("Unable to find Process with id %s to perform the operation requested", definition.getId()));
Expand All @@ -288,7 +290,6 @@ public CompletableFuture<String> executeProcessInstance(ProcessDefinition defini
m.update(JsonObjectUtils.convertValue(args.input(), Map.class));
org.kie.kogito.process.ProcessInstance<? extends Model> pi = process.createInstance(m);
pi.start();
return CompletableFuture.completedFuture(
String.format(SUCCESSFULLY_OPERATION_MESSAGE, "Started Process Instance with id: " + pi.id()));
return CompletableFuture.completedFuture(JsonObjectUtils.fromValue(pi.variables().toMap()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ public class KogitoAddonRuntimeClientImplTest {
@Mock
private ProcessError error;

@Mock
private Object variables;

@Mock
Instance<Application> applicationInstance;

Expand All @@ -148,14 +145,15 @@ public void setup() {
lenient().when(process.instances()).thenReturn(instances);
lenient().when(instances.findById(anyString())).thenReturn(Optional.of(processInstance));
lenient().when(processInstance.error()).thenReturn(Optional.of(error));
lenient().when(processInstance.variables()).thenReturn(variables);
lenient().when(processInstance.variables()).thenReturn(model);
lenient().when(processInstance.id()).thenReturn(PROCESS_INSTANCE_ID);
lenient().when(processInstance.status()).thenReturn(org.kie.kogito.process.ProcessInstance.STATE_ERROR);
lenient().when(error.failedNodeId()).thenReturn(NODE_ID_ERROR);
lenient().when(error.errorMessage()).thenReturn("Test error message");
lenient().when(application.unitOfWorkManager()).thenReturn(new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory()));
lenient().when(applicationInstance.isResolvable()).thenReturn(true);
lenient().when(applicationInstance.get()).thenReturn(application);
lenient().when(model.toMap()).thenReturn(Map.of("name", "javierito"));

client = spy(new KogitoAddonRuntimeClientImpl(processSvgServiceInstance, sourceFilesProvider, processesInstance, applicationInstance));
client.setGatewayTargetUrl(Optional.empty());
Expand Down