Skip to content

Commit

Permalink
incubator-kie-issues#844: JobInstanceDataEvent are loosing some val…
Browse files Browse the repository at this point in the history
…ues when published by `JobInVMEventPublisher`. (apache#1963)
  • Loading branch information
pefernan authored and rgdoliveira committed Jan 25, 2024
1 parent d1af059 commit d5944dd
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ public ProcessInstanceJobDescription deserialize(JsonParser jp, DeserializationC
ProcessInstanceJobDescriptionBuilder builder = ProcessInstanceJobDescription.builder();

JsonNode node = jp.getCodec().readTree(jp);
ofNullable(node.get("id")).ifPresent(e -> builder.id(e.asText()));
ofNullable(node.get("timerId")).ifPresent(e -> builder.timerId(e.asText()));
ofNullable(node.get("id")).ifPresent(e -> builder.id(e.textValue()));
ofNullable(node.get("timerId")).ifPresent(e -> builder.timerId(e.textValue()));
ofNullable(node.get("priority")).ifPresent(e -> builder.priority(e.asInt()));
ofNullable(node.get("processInstanceId")).ifPresent(e -> builder.processInstanceId(e.asText()));
ofNullable(node.get("rootProcessInstanceId")).ifPresent(e -> builder.rootProcessInstanceId(e.asText()));
ofNullable(node.get("processId")).ifPresent(e -> builder.processId(e.asText()));
ofNullable(node.get("rootProcessId")).ifPresent(e -> builder.rootProcessId(e.asText()));
ofNullable(node.get("nodeInstanceId")).ifPresent(e -> builder.nodeInstanceId(e.asText()));
ofNullable(node.get("processInstanceId")).ifPresent(e -> builder.processInstanceId(e.textValue()));
ofNullable(node.get("rootProcessInstanceId")).ifPresent(e -> builder.rootProcessInstanceId(e.textValue()));
ofNullable(node.get("processId")).ifPresent(e -> builder.processId(e.textValue()));
ofNullable(node.get("rootProcessId")).ifPresent(e -> builder.rootProcessId(e.textValue()));
ofNullable(node.get("nodeInstanceId")).ifPresent(e -> builder.nodeInstanceId(e.textValue()));

String type = node.get("expirationTime").get("@type").asText();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.job.JobInstanceDataEvent;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter;
import org.kie.kogito.jobs.service.api.Recipient;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.ScheduledJob;
Expand Down Expand Up @@ -138,6 +140,15 @@ public void observe(@ObservesAsync EmbeddedJobServiceEvent serviceEvent) {
LOGGER.debug("Emmit in-vm publishJobStatusChange {}", jobDetails);
try {
ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails);
Recipient<InVMPayloadData> recipient = jobDetails.getRecipient().getRecipient();
ProcessInstanceJobDescription jobDescription = recipient.getPayload().getJobDescription();

scheduledJob.setProcessInstanceId(jobDescription.processInstanceId());
scheduledJob.setProcessId(jobDescription.processId());
scheduledJob.setRootProcessInstanceId(jobDescription.rootProcessInstanceId());
scheduledJob.setRootProcessId(jobDescription.rootProcessId());
scheduledJob.setNodeInstanceId(jobDescription.nodeInstanceId());

byte[] jsonContent = objectMapper.writeValueAsBytes(scheduledJob);
JobInstanceDataEvent event = new JobInstanceDataEvent(JOB_EVENT_TYPE,
url + RestApiConstants.JOBS_PATH,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.jobs.embedded;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.job.JobInstanceDataEvent;
import org.kie.kogito.jobs.DurationExpirationTime;
import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;

import io.quarkus.test.junit.QuarkusTest;

import jakarta.inject.Inject;

import static org.assertj.core.api.Assertions.assertThat;

@QuarkusTest
public class EmbeddedJobsServiceTest {

private static final String PROCESS_ID = "processId";
private static final String PROCESS_INSTANCE_ID = "1";
private static final String NODE_INSTANCE_ID = "node_1";
private static final String ROOT_PROCESS_ID = "rootProcess";
private static final String ROOT_PROCESS_INSTANCE_ID = "0";

@Inject
JobsService jobService;

@Inject
TestEventPublisher publisher;

@Test
public void testJobService() throws Exception {

// testing only when we have the full lifecycle
CountDownLatch latch = new CountDownLatch(8);
publisher.setLatch(latch);

ProcessInstanceJobDescription description = ProcessInstanceJobDescription.builder()
.generateId()
.timerId("-1")
.expirationTime(DurationExpirationTime.now())
.processInstanceId(PROCESS_INSTANCE_ID)
.rootProcessInstanceId(null)
.processId(PROCESS_ID)
.rootProcessId(null)
.nodeInstanceId(NODE_INSTANCE_ID)
.build();
jobService.scheduleProcessInstanceJob(description);

ProcessInstanceJobDescription descriptionWRootProcess = ProcessInstanceJobDescription.builder()
.generateId()
.timerId("-1")
.expirationTime(DurationExpirationTime.now())
.processInstanceId(PROCESS_INSTANCE_ID)
.rootProcessInstanceId(ROOT_PROCESS_INSTANCE_ID)
.processId(PROCESS_ID)
.rootProcessId(ROOT_PROCESS_ID)
.nodeInstanceId(NODE_INSTANCE_ID)
.build();
jobService.scheduleProcessInstanceJob(descriptionWRootProcess);

latch.await();

List<DataEvent<?>> events = publisher.getEvents();

Assertions.assertEquals(8, events.size());

Consumer<DataEvent<?>> noRootProcess = e -> assertThat(e)
.hasFieldOrPropertyWithValue("kogitoRootProcessInstanceId", null)
.hasFieldOrPropertyWithValue("kogitoRootProcessId", null);

Consumer<DataEvent<?>> withRootProcess = e -> assertThat(e)
.hasFieldOrPropertyWithValue("kogitoRootProcessInstanceId", ROOT_PROCESS_INSTANCE_ID)
.hasFieldOrPropertyWithValue("kogitoRootProcessId", ROOT_PROCESS_ID);

events.forEach(event -> {
assertThat(event)
.isInstanceOf(JobInstanceDataEvent.class)
.hasFieldOrPropertyWithValue("kogitoProcessId", PROCESS_ID)
.hasFieldOrPropertyWithValue("kogitoProcessInstanceId", PROCESS_INSTANCE_ID)
.satisfiesAnyOf(noRootProcess, withRootProcess);
});
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public void publish(Collection<DataEvent<?>> events) {
events.forEach(e -> latch.countDown());
}

public void expectedEvents(int numOfEvents) {
latch = new CountDownLatch(numOfEvents);
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}

}

0 comments on commit d5944dd

Please sign in to comment.