Skip to content

Commit

Permalink
[incubator-kie-issues-1551] Deadlines for Human Task
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Oct 24, 2024
1 parent 5c57e58 commit cad1420
Show file tree
Hide file tree
Showing 19 changed files with 76 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

import java.time.temporal.ChronoUnit;

import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.service.api.TemporalUnit;
import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipient;
import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipientJsonPayloadData;
Expand Down Expand Up @@ -53,25 +54,18 @@ public class JobCallbackResourceDef {

public static final String LIMIT_DEFAULT_VALUE = "0";

public static final String JOBS_CALLBACK_URI = "/management/jobs";


public static final String JOBS_CALLBACK_POST_URI = "{" + PROCESS_ID + "}/instances/{" + PROCESS_INSTANCE_ID + "}/timers/{" + TIMER_ID + "}";

private JobCallbackResourceDef() {
}

public static String buildCallbackURI(ProcessInstanceJobDescription description, String jobsCallbackEndpoint) {
return URIBuilder.toURI(jobsCallbackEndpoint
+ JOBS_CALLBACK_URI + "/"
+ description.processId()
+ "/instances/"
+ description.processInstanceId()
+ "/timers/"
+ description.timerId())
.toString();
public static String buildCallbackURI(JobDescription description, String jobsCallbackEndpoint) {
return URIBuilder.toURI(jobsCallbackEndpoint + description.path()).toString();
}

public static org.kie.kogito.jobs.service.api.Job buildCallbackPatternJob(ProcessInstanceJobDescription description,
public static org.kie.kogito.jobs.service.api.Job buildCallbackPatternJob(JobDescription description,
String callback, ObjectMapper objectMapper) {
return org.kie.kogito.jobs.service.api.Job.builder()
.id(description.id())
Expand All @@ -81,25 +75,20 @@ public static org.kie.kogito.jobs.service.api.Job buildCallbackPatternJob(Proces
.build();
}

private static HttpRecipient<HttpRecipientJsonPayloadData> buildRecipient(ProcessInstanceJobDescription description, String callback, ObjectMapper objectMapper) {
private static HttpRecipient<HttpRecipientJsonPayloadData> buildRecipient(JobDescription description, String callback, ObjectMapper objectMapper) {
return HttpRecipient.builder()
.forJsonPayload()
.payload(HttpRecipientJsonPayloadData.from(buildPayload(description, objectMapper)))
.url(callback)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.header(PROCESS_ID, description.processId())
.header(PROCESS_INSTANCE_ID, description.processInstanceId())
.header(ROOT_PROCESS_ID, description.rootProcessId())
.header(ROOT_PROCESS_INSTANCE_ID, description.rootProcessInstanceId())
.header(NODE_INSTANCE_ID, description.nodeInstanceId())
.build();
}

private static JsonNode buildPayload(ProcessInstanceJobDescription description, ObjectMapper objectMapper) {
private static JsonNode buildPayload(JobDescription description, ObjectMapper objectMapper) {
return objectMapper.valueToTree(new JobCallbackPayload(description.id()));
}

public static TimerSchedule buildSchedule(ProcessInstanceJobDescription description) {
public static TimerSchedule buildSchedule(JobDescription description) {
return TimerSchedule.builder()
.startTime(description.expirationTime().get().toOffsetDateTime().truncatedTo(ChronoUnit.MILLIS))
.repeatCount(translateLimit(description.expirationTime().repeatLimit()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.junit.jupiter.api.Test;
import org.kie.kogito.jobs.ExactExpirationTime;
import org.kie.kogito.jobs.ExpirationTime;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipient;
import org.kie.kogito.jobs.service.api.schedule.timer.TimerSchedule;
import org.kie.kogito.jobs.service.api.serlialization.SerializationUtils;
Expand Down Expand Up @@ -91,7 +91,7 @@ void buildCallbackPatternJob() {
}

private ProcessInstanceJobDescription mockProcessInstanceJobDescription() {
return ProcessInstanceJobDescription.builder()
return ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder()
.id(JOB_ID)
.timerId(TIMER_ID)
.expirationTime(EXPIRATION_TIME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import java.net.URI;
import java.util.Objects;

import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.api.URIBuilder;
import org.kie.kogito.jobs.service.api.Job;

Expand All @@ -46,7 +46,7 @@ public RestJobsService(String jobServiceUrl, String callbackEndpoint, ObjectMapp
this.objectMapper = objectMapper;
}

public String getCallbackEndpoint(ProcessInstanceJobDescription description) {
public String getCallbackEndpoint(JobDescription description) {
return buildCallbackURI(description, callbackEndpoint);
}

Expand All @@ -58,7 +58,7 @@ public URI getJobsServiceUri() {
return jobsServiceUri;
}

public Job buildJob(ProcessInstanceJobDescription description, String callback) {
public Job buildJob(JobDescription description, String callback) {
return buildCallbackPatternJob(description, callback, objectMapper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.jobs.ExactExpirationTime;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.ProcessJobDescription;
import org.kie.kogito.jobs.api.JobCallbackPayload;
import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessJobDescription;
import org.kie.kogito.jobs.service.api.Job;
import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipient;
import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipientJsonPayloadData;
Expand Down Expand Up @@ -64,7 +64,7 @@ void setUp() {

@Test
void testGetCallbackEndpoint() {
ProcessInstanceJobDescription description = ProcessInstanceJobDescription.builder()
ProcessInstanceJobDescription description = ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder()
.id(JOB_ID)
.timerId(TIMER_ID)
.expirationTime(ExactExpirationTime.now())
Expand Down Expand Up @@ -96,7 +96,7 @@ void testScheduleProcessJob() {
}

protected ProcessInstanceJobDescription buildProcessInstanceJobDescription() {
return ProcessInstanceJobDescription.builder()
return ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder()
.id(JOB_ID)
.timerId(TIMER_ID)
.expirationTime(ExactExpirationTime.of(EXPIRATION_TIME))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

public interface JobDescription {

static final String JOBS_CALLBACK_URI = "/management/jobs";

String id();

ExpirationTime expirationTime();

Integer priority();

String path();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface JobsService {
* @param description defines what kind of process should be started upon expiration time
* @return returns unique id of the job
*/
String scheduleProcessJob(ProcessJobDescription description);
String scheduleProcessJob(JobDescription description);

/**
* Schedules process instance related job that will signal exact same process instance
Expand All @@ -41,7 +41,7 @@ public interface JobsService {
* @param description defines the context of the process instance that should be signaled
* @return returns unique id of the job
*/
String scheduleProcessInstanceJob(ProcessInstanceJobDescription description);
String scheduleProcessInstanceJob(JobDescription description);

/**
* Cancels given job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.jobs;
package org.kie.kogito.jobs.descriptiors;

import org.kie.kogito.jobs.ExpirationTime;
import org.kie.kogito.jobs.JobDescription;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -93,10 +96,21 @@ public String nodeInstanceId() {
return nodeInstanceId;
}

public static ProcessInstanceJobDescriptionBuilder builder() {
public static ProcessInstanceJobDescriptionBuilder newProcessInstanceJobDescriptionBuilder() {
return new ProcessInstanceJobDescriptionBuilder();
}

@Override
public String path() {
return JOBS_CALLBACK_URI + "/"
+ processId()
+ "/instances/"
+ processInstanceId()
+ "/timers/"
+ timerId();

}

@Override
public String toString() {
return "ProcessInstanceJobDescription{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.jobs;
package org.kie.kogito.jobs.descriptiors;

import java.util.UUID;

import org.kie.kogito.jobs.ExpirationTime;

public class ProcessInstanceJobDescriptionBuilder {

private String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.jobs;
package org.kie.kogito.jobs.descriptiors;

import java.util.UUID;

import org.kie.kogito.jobs.ExpirationTime;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.process.Process;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -87,4 +89,9 @@ public String processId() {
public Process<?> process() {
return process;
}

@Override
public String path() {
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.kie.kogito.Model;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.ProcessJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessJobDescription;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstanceOptimisticLockingException;
import org.kie.kogito.process.Processes;
Expand Down Expand Up @@ -79,7 +79,7 @@ public static InMemoryJobService get(Processes processes, UnitOfWorkManager unit
}

@Override
public String scheduleProcessJob(ProcessJobDescription description) {
public String scheduleProcessJob(JobDescription description) {
LOGGER.debug("ScheduleProcessJob: {}", description);
ScheduledFuture<?> future;
if (description.expirationTime().repeatInterval() != null) {
Expand All @@ -92,7 +92,7 @@ public String scheduleProcessJob(ProcessJobDescription description) {
}

@Override
public String scheduleProcessInstanceJob(ProcessInstanceJobDescription description) {
public String scheduleProcessInstanceJob(JobDescription description) {
ScheduledFuture<?> future;
if (description.expirationTime().repeatInterval() != null) {
future = scheduler.scheduleAtFixedRate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.kie.api.runtime.process.ProcessInstance;
import org.kie.kogito.internal.process.runtime.KogitoProcessInstance;
import org.kie.kogito.internal.process.runtime.KogitoProcessRuntime;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.ProcessJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessJobDescription;
import org.kie.kogito.services.uow.UnitOfWorkExecutor;
import org.kie.kogito.timer.TimerInstance;
import org.kie.kogito.uow.UnitOfWorkManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@
import org.kie.kogito.jobs.DurationExpirationTime;
import org.kie.kogito.jobs.ExactExpirationTime;
import org.kie.kogito.jobs.ExpirationTime;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessJobDescription;
import org.kie.kogito.process.Processes;
import org.kie.kogito.services.jobs.impl.InMemoryJobService;
import org.kie.kogito.signal.SignalManager;
Expand Down Expand Up @@ -109,9 +110,8 @@ public void initStartTimers() {
if (startNodes != null && !startNodes.isEmpty()) {
for (StartNode startNode : startNodes) {
if (startNode != null && startNode.getTimer() != null) {

jobService.scheduleProcessJob(ProcessJobDescription.of(createTimerInstance(startNode.getTimer(), knowledgeRuntime), p.getId()));

JobDescription jobDescription = ProcessJobDescription.of(createTimerInstance(startNode.getTimer(), knowledgeRuntime), p.getId());
jobService.scheduleProcessJob(jobDescription);
}
}
}
Expand Down Expand Up @@ -167,9 +167,7 @@ public KogitoProcessInstance startProcess(String processId, CorrelationKey corre
@Override
public KogitoProcessInstance createProcessInstance(String processId, CorrelationKey correlationKey, Map<String, Object> parameters) {
return createProcessInstance(
runtimeContext.findProcess(processId)
.orElseThrow(() -> new IllegalArgumentException(
"Unknown process ID: " + processId)),
runtimeContext.findProcess(processId).orElseThrow(() -> new IllegalArgumentException("Unknown process ID: " + processId)),
correlationKey, parameters);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import org.kie.kogito.jobs.ExactExpirationTime;
import org.kie.kogito.jobs.ExpirationTime;
import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessJobDescription;
import org.kie.kogito.services.identity.NoOpIdentityProvider;
import org.kie.kogito.services.jobs.impl.LegacyInMemoryJobService;
import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.kie.kogito.jobs.ExactExpirationTime;
import org.kie.kogito.jobs.ExpirationTime;
import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.services.uow.BaseWorkUnit;
import org.kie.kogito.timer.TimerInstance;
Expand Down Expand Up @@ -99,7 +99,7 @@ public void internalTrigger(KogitoNodeInstance from, String type) {
new BaseWorkUnit<>(this, instance -> {
ExpirationTime expirationTime = ExactExpirationTime.of(ZonedDateTime.now().plus(1, ChronoUnit.MILLIS));
ProcessInstanceJobDescription jobDescription =
ProcessInstanceJobDescription.builder()
ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder()
.id(getJobId())
.timerId("-1")
.expirationTime(expirationTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
import org.kie.kogito.internal.process.runtime.MessageException;
import org.kie.kogito.jobs.DurationExpirationTime;
import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription;
import org.kie.kogito.process.BaseEventDescription;
import org.kie.kogito.process.EventDescription;
import org.kie.kogito.process.NamedDataType;
Expand Down Expand Up @@ -590,7 +590,7 @@ private TimerInstance createDurationTimer(long duration) {

private TimerInstance registerTimer(TimerInstance timerInstance) {
ProcessInstanceJobDescription description =
ProcessInstanceJobDescription.builder()
ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder()
.id(timerInstance.getId())
.timerId(timerInstance.getTimerId())
.expirationTime(DurationExpirationTime.after(timerInstance.getDelay()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.kie.kogito.jobs.ExactExpirationTime;
import org.kie.kogito.jobs.ExpirationTime;
import org.kie.kogito.jobs.JobsService;
import org.kie.kogito.jobs.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptiors.ProcessInstanceJobDescription;
import org.kie.kogito.process.expr.Expression;
import org.kie.kogito.process.expr.ExpressionHandlerFactory;
import org.kie.kogito.timer.TimerInstance;
Expand Down Expand Up @@ -95,7 +95,7 @@ public void internalTrigger(KogitoNodeInstance from, String type) {
JobsService jobService = ((KogitoProcessRuntime.Provider) getProcessInstance().getKnowledgeRuntime().getProcessRuntime()).getKogitoProcessRuntime().getJobsService();
for (Timer timer : timers.keySet()) {
ProcessInstanceJobDescription jobDescription =
ProcessInstanceJobDescription.builder()
ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder()
.generateId()
.timerId(Long.toString(timer.getId()))
.expirationTime(createTimerInstance(timer))
Expand Down
Loading

0 comments on commit cad1420

Please sign in to comment.