diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java index 8b60fd6444..6ccfb99f47 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/json/ProcessInstanceJobDescriptionDeserializer.java @@ -45,14 +45,14 @@ public ProcessInstanceJobDescription deserialize(JsonParser jp, DeserializationC ProcessInstanceJobDescriptionBuilder builder = ProcessInstanceJobDescription.builder(); JsonNode node = jp.getCodec().readTree(jp); - ofNullable(node.get("id")).ifPresent(e -> builder.id(e.asText())); - ofNullable(node.get("timerId")).ifPresent(e -> builder.timerId(e.asText())); + ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue())); + ofNullable(node.get("timerId")).ifPresent(e -> builder.timerId(e.textValue())); ofNullable(node.get("priority")).ifPresent(e -> builder.priority(e.asInt())); - ofNullable(node.get("processInstanceId")).ifPresent(e -> builder.processInstanceId(e.asText())); - ofNullable(node.get("rootProcessInstanceId")).ifPresent(e -> builder.rootProcessInstanceId(e.asText())); - ofNullable(node.get("processId")).ifPresent(e -> builder.processId(e.asText())); - ofNullable(node.get("rootProcessId")).ifPresent(e -> builder.rootProcessId(e.asText())); - ofNullable(node.get("nodeInstanceId")).ifPresent(e -> builder.nodeInstanceId(e.asText())); + ofNullable(node.get("processInstanceId")).ifPresent(e -> builder.processInstanceId(e.textValue())); + ofNullable(node.get("rootProcessInstanceId")).ifPresent(e -> builder.rootProcessInstanceId(e.textValue())); + ofNullable(node.get("processId")).ifPresent(e -> builder.processId(e.textValue())); + ofNullable(node.get("rootProcessId")).ifPresent(e -> builder.rootProcessId(e.textValue())); + ofNullable(node.get("nodeInstanceId")).ifPresent(e -> builder.nodeInstanceId(e.textValue())); String type = node.get("expirationTime").get("@type").asText(); try { diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java index 814db7e91d..aa8f331a8e 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java @@ -26,7 +26,9 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.kie.kogito.event.EventPublisher; import org.kie.kogito.event.job.JobInstanceDataEvent; +import org.kie.kogito.jobs.ProcessInstanceJobDescription; import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter; +import org.kie.kogito.jobs.service.api.Recipient; import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.model.JobExecutionResponse; import org.kie.kogito.jobs.service.model.ScheduledJob; @@ -138,6 +140,15 @@ public void observe(@ObservesAsync EmbeddedJobServiceEvent serviceEvent) { LOGGER.debug("Emmit in-vm publishJobStatusChange {}", jobDetails); try { ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails); + Recipient recipient = jobDetails.getRecipient().getRecipient(); + ProcessInstanceJobDescription jobDescription = recipient.getPayload().getJobDescription(); + + scheduledJob.setProcessInstanceId(jobDescription.processInstanceId()); + scheduledJob.setProcessId(jobDescription.processId()); + scheduledJob.setRootProcessInstanceId(jobDescription.rootProcessInstanceId()); + scheduledJob.setRootProcessId(jobDescription.rootProcessId()); + scheduledJob.setNodeInstanceId(jobDescription.nodeInstanceId()); + byte[] jsonContent = objectMapper.writeValueAsBytes(scheduledJob); JobInstanceDataEvent event = new JobInstanceDataEvent(JOB_EVENT_TYPE, url + RestApiConstants.JOBS_PATH, diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java new file mode 100644 index 0000000000..c854f86f7f --- /dev/null +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.jobs.embedded; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.job.JobInstanceDataEvent; +import org.kie.kogito.jobs.DurationExpirationTime; +import org.kie.kogito.jobs.JobsService; +import org.kie.kogito.jobs.ProcessInstanceJobDescription; + +import io.quarkus.test.junit.QuarkusTest; + +import jakarta.inject.Inject; + +import static org.assertj.core.api.Assertions.assertThat; + +@QuarkusTest +public class EmbeddedJobsServiceTest { + + private static final String PROCESS_ID = "processId"; + private static final String PROCESS_INSTANCE_ID = "1"; + private static final String NODE_INSTANCE_ID = "node_1"; + private static final String ROOT_PROCESS_ID = "rootProcess"; + private static final String ROOT_PROCESS_INSTANCE_ID = "0"; + + @Inject + JobsService jobService; + + @Inject + TestEventPublisher publisher; + + @Test + public void testJobService() throws Exception { + + // testing only when we have the full lifecycle + CountDownLatch latch = new CountDownLatch(8); + publisher.setLatch(latch); + + ProcessInstanceJobDescription description = ProcessInstanceJobDescription.builder() + .generateId() + .timerId("-1") + .expirationTime(DurationExpirationTime.now()) + .processInstanceId(PROCESS_INSTANCE_ID) + .rootProcessInstanceId(null) + .processId(PROCESS_ID) + .rootProcessId(null) + .nodeInstanceId(NODE_INSTANCE_ID) + .build(); + jobService.scheduleProcessInstanceJob(description); + + ProcessInstanceJobDescription descriptionWRootProcess = ProcessInstanceJobDescription.builder() + .generateId() + .timerId("-1") + .expirationTime(DurationExpirationTime.now()) + .processInstanceId(PROCESS_INSTANCE_ID) + .rootProcessInstanceId(ROOT_PROCESS_INSTANCE_ID) + .processId(PROCESS_ID) + .rootProcessId(ROOT_PROCESS_ID) + .nodeInstanceId(NODE_INSTANCE_ID) + .build(); + jobService.scheduleProcessInstanceJob(descriptionWRootProcess); + + latch.await(); + + List> events = publisher.getEvents(); + + Assertions.assertEquals(8, events.size()); + + Consumer> noRootProcess = e -> assertThat(e) + .hasFieldOrPropertyWithValue("kogitoRootProcessInstanceId", null) + .hasFieldOrPropertyWithValue("kogitoRootProcessId", null); + + Consumer> withRootProcess = e -> assertThat(e) + .hasFieldOrPropertyWithValue("kogitoRootProcessInstanceId", ROOT_PROCESS_INSTANCE_ID) + .hasFieldOrPropertyWithValue("kogitoRootProcessId", ROOT_PROCESS_ID); + + events.forEach(event -> { + assertThat(event) + .isInstanceOf(JobInstanceDataEvent.class) + .hasFieldOrPropertyWithValue("kogitoProcessId", PROCESS_ID) + .hasFieldOrPropertyWithValue("kogitoProcessInstanceId", PROCESS_INSTANCE_ID) + .satisfiesAnyOf(noRootProcess, withRootProcess); + }); + } + +} diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTests.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTests.java deleted file mode 100644 index b602a9dff7..0000000000 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/EmbeddedJobsServiceTests.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.kie.kogito.jobs.embedded; - -import java.util.List; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.kie.kogito.event.DataEvent; -import org.kie.kogito.jobs.DurationExpirationTime; -import org.kie.kogito.jobs.JobsService; -import org.kie.kogito.jobs.ProcessInstanceJobDescription; - -import io.quarkus.test.junit.QuarkusTest; - -import jakarta.inject.Inject; - -@QuarkusTest -public class EmbeddedJobsServiceTests { - - @Inject - JobsService jobService; - - @Inject - TestEventPublisher publisher; - - @Test - public void testJobService() throws Exception { - // testing only we have the full lifecycle - publisher.expectedEvents(2); - - ProcessInstanceJobDescription description = ProcessInstanceJobDescription.builder() - .generateId() - .timerId("-1") - .expirationTime(DurationExpirationTime.now()) - .processInstanceId("1") - .rootProcessInstanceId(null) - .processId("processId") - .rootProcessId(null) - .nodeInstanceId("node_1") - .build(); - jobService.scheduleProcessInstanceJob(description); - - List> events = publisher.getEvents(); - Assertions.assertEquals(2, events.size()); - - } - -} diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java index f5b43f7afd..f077205d54 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/test/java/org/kie/kogito/jobs/embedded/TestEventPublisher.java @@ -54,8 +54,8 @@ public void publish(Collection> events) { events.forEach(e -> latch.countDown()); } - public void expectedEvents(int numOfEvents) { - latch = new CountDownLatch(numOfEvents); + public void setLatch(CountDownLatch latch) { + this.latch = latch; } }