Skip to content

Commit

Permalink
[enhancement]adding ParallelExternalTaskResultUtil for synchronization
Browse files Browse the repository at this point in the history
  • Loading branch information
shahryarSafizadeh authored Dec 5, 2023
1 parent 2e81ed8 commit 0e02445
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.tosan.camunda.camundaclient.external.aspect.*;
import com.tosan.camunda.camundaclient.feign.aspect.FeignUndeclaredThrowableExceptionAspect;
import com.tosan.camunda.camundaclient.util.ExternalTaskResultUtil;
import com.tosan.camunda.camundaclient.util.ParallelExternalTaskResultUtil;
import com.tosan.client.http.core.HttpClientProperties;
import com.tosan.client.http.core.factory.ConfigurableApacheHttpClientFactory;
import com.tosan.client.http.starter.configuration.AbstractFeignConfiguration;
Expand Down Expand Up @@ -296,10 +297,17 @@ public ExternalTaskParallelExecutionAspect externalTaskParallelExecutionAspect(P
}

@Bean
@ConditionalOnProperty(name = "camunda.bpm.client.execution.execution-type", havingValue = "sequential", matchIfMissing = true)
public ExternalTaskResultUtil externalTaskResultUtil(CamundaClientConfig camundaClientConfig) {
return new ExternalTaskResultUtil(camundaClientConfig.getRetry());
}

@Bean
@ConditionalOnProperty(name = "camunda.bpm.client.execution.execution-type", havingValue = "parallel", matchIfMissing = false)
public ParallelExternalTaskResultUtil parallelExternalTaskResultUtil(CamundaClientConfig camundaClientConfig) {
return new ParallelExternalTaskResultUtil(camundaClientConfig.getRetry());
}

@Bean
public ExternalTaskInfoAspect externalTaskInfoAspect() {
return new ExternalTaskInfoAspect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ private void waitForTaskCompletion(List<ExternalTask> externalTasks) throws Inte
throw new RuntimeException("camunda.bpm.client.execution.wait-duration-per-task is not set or is not valid.");
} else {
Date expireDate = new Date(System.currentTimeMillis() + waitDurationPerTask * externalTasks.size());
log.debug("Waiting for list of polled external tasks:{} to be completed till:{}", externalTasks.toArray(), expireDate);
while (!isAllTasksCompleted(parallelTaskExecutor.getFutures())) {
Date currentTime = new Date(System.currentTimeMillis());
if (expireDate.before(currentTime)) {
Expand All @@ -52,6 +53,7 @@ private void waitForTaskCompletion(List<ExternalTask> externalTasks) throws Inte
Thread.sleep(5000);
}
cancelAllFutures(parallelTaskExecutor.getFutures());
log.info("Canceling all the futures with size of:{}", parallelTaskExecutor.getFutures().size());
parallelTaskExecutor.getFutures().clear();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.camunda.bpm.client.exception.ConnectionLostException;
import org.camunda.bpm.client.exception.NotResumedException;
import org.camunda.bpm.client.task.ExternalTask;
import org.springframework.core.annotation.Order;

/**
Expand All @@ -29,20 +32,30 @@ public Object sendResults(ProceedingJoinPoint pjp) throws Throwable {
boolean convertToBpmnError = checkConvertToBpmnErrorInCaseOfIncident(pjp);
try {
Object proceed = pjp.proceed();
Object[] args = pjp.getArgs();
ExternalTask externalTask = (ExternalTask) args[0];
if (Thread.currentThread().isInterrupted()) {
log.error("Thread has been interrupted before completion.");
log.error("Thread has been interrupted before completion of task with business key:{}", externalTask.getBusinessKey());
throw new InterruptedException("Thread has been interrupted before completion.");
} else {
externalTaskResultUtil.declareTaskCompleted(pjp.getArgs());
externalTaskResultUtil.declareTaskCompleted(args);
}
return proceed;
} catch (NotResumedException | ConnectionLostException e) {
log.error("Error occurred in task completion.", e);
return null;
} catch (Exception e) {
if (Thread.currentThread().isInterrupted() || e.getCause() instanceof InterruptedException) {
throw new InterruptedException("Task has been interrupted before completion.");
} else if (e instanceof CamundaClientRuntimeIncident runtimeIncident) {
externalTaskResultUtil.handleException(runtimeIncident.getExceptionIncidentState(), e, pjp.getArgs(), convertToBpmnError);
} else {
externalTaskResultUtil.handleException(ExceptionIncidentState.NON_REPEATABLE, e, pjp.getArgs(), convertToBpmnError);
try {
if (Thread.currentThread().isInterrupted() || e.getCause() instanceof InterruptedException) {
throw new InterruptedException("Task has been interrupted before completion.");
} else if (e instanceof CamundaClientRuntimeIncident) {
CamundaClientRuntimeIncident runtimeIncident = (CamundaClientRuntimeIncident) e;
externalTaskResultUtil.handleException(runtimeIncident.getExceptionIncidentState(), e, pjp.getArgs(), convertToBpmnError);
} else {
externalTaskResultUtil.handleException(ExceptionIncidentState.NON_REPEATABLE, e, pjp.getArgs(), convertToBpmnError);
}
} catch (NotResumedException | ConnectionLostException exception) {
log.error("Error occurred in task completion.", exception);
}
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ public class ExternalTaskResultUtil {
private final RetryConfig retryConfig;

public void declareTaskCompleted(Object[] args) {
ExternalTask externalTask = (ExternalTask) args[0];
ExternalTaskService externalTaskService = (ExternalTaskService) args[1];
ExternalTask externalTask = getExternalTask(args);
ExternalTaskService externalTaskService = getExternalTaskService(args);
ExternalTaskInfo taskInfo = getTaskInfo(externalTask);
externalTaskService.complete(externalTask, taskInfo.getVariables());
}

public void handleBpmnException(ExternalTask externalTask, ExternalTaskService externalTaskService, String errorCode, String errorMessage) {
externalTaskService.handleBpmnError(externalTask, errorCode, errorMessage, getTaskInfo(externalTask).getVariables());
}

public void handleException(ExceptionIncidentState exceptionIncidentState,
Exception e, Object[] args, boolean convertToBpmnError) {
ExternalTask externalTask = (ExternalTask) args[0];
ExternalTaskService externalTaskService = (ExternalTaskService) args[1];
ExternalTask externalTask = getExternalTask(args);
ExternalTaskService externalTaskService = getExternalTaskService(args);
ExternalTaskInfo taskInfo = getTaskInfo(externalTask);
int retryCount = exceptionIncidentState.equals(ExceptionIncidentState.NON_REPEATABLE) ? 0 :
calculateRetries(externalTask.getRetries());
Expand All @@ -51,6 +55,14 @@ public void handleException(ExceptionIncidentState exceptionIncidentState,
}
}

protected ExternalTask getExternalTask(Object[] args) {
return (ExternalTask) args[0];
}

protected ExternalTaskService getExternalTaskService(Object[] args) {
return (ExternalTaskService) args[1];
}

private int calculateRetries(Integer retries) {
if (retries == null) {
return retryConfig.getRetryCount();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.tosan.camunda.camundaclient.util;

import com.tosan.camunda.api.ExceptionIncidentState;
import com.tosan.camunda.camundaclient.config.RetryConfig;
import lombok.extern.slf4j.Slf4j;
import org.camunda.bpm.client.task.ExternalTask;
import org.camunda.bpm.client.task.ExternalTaskService;

/**
* @author Shahryar Safizadeh
* @since 12/5/2023
*/
@Slf4j
public class ParallelExternalTaskResultUtil extends ExternalTaskResultUtil {

public ParallelExternalTaskResultUtil(RetryConfig retryConfig) {
super(retryConfig);
}

@Override
public void declareTaskCompleted(Object[] args) {
ExternalTask externalTask = getExternalTask(args);
String businessKey = externalTask.getBusinessKey().intern();
log.info("Acquiring lock for business key:{}", businessKey);
synchronized (businessKey) {
log.debug("Lock has been acquired by task with business key: {}", businessKey);
super.declareTaskCompleted(args);
}
log.info("Lock has been released from task with business key: {}", businessKey);
}

@Override
public void handleBpmnException(ExternalTask externalTask, ExternalTaskService externalTaskService, String errorCode, String errorMessage) {
String businessKey = externalTask.getBusinessKey().intern();
log.info("Acquiring lock for business key:{}", businessKey);
synchronized (businessKey) {
log.debug("Lock has been acquired by task with business key: {}", businessKey);
super.handleBpmnException(externalTask, externalTaskService, errorCode, errorMessage);
}
log.info("Lock has been released from task with business key: {}", businessKey);
}

@Override
public void handleException(ExceptionIncidentState exceptionIncidentState, Exception e, Object[] args, boolean convertToBpmnError) {
ExternalTask externalTask = getExternalTask(args);
String businessKey = externalTask.getBusinessKey().intern();
log.info("Acquiring lock for business key:{}", businessKey);
synchronized (businessKey) {
log.debug("Lock has been acquired by task with business key: {}", businessKey);
super.handleException(exceptionIncidentState, e, args, convertToBpmnError);
}
log.info("Lock has been released from task with business key: {}", businessKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ server.port=8098
#type of execution: parallel , sequential
camunda.bpm.client.execution.execution-type=parallel
#size of thread pool in parallel execution
camunda.bpm.client.execution.thread-pool-size=2
camunda.bpm.client.execution.thread-pool-size=4
#wait duration per task in parallel backoff strategy in milliseconds
camunda.bpm.client.execution.wait-duration-per-task=27000
camunda.bpm.client.execution.wait-duration-per-task=120000
#is multi instance clients enabled: ture , false
camunda.bpm.client.execution.multi-instance-enabled=true
camunda.bpm.client.execution.multi-instance-enabled=false

0 comments on commit 0e02445

Please sign in to comment.