Skip to content

Commit

Permalink
[feature]implementing parallel task execution
Browse files Browse the repository at this point in the history
PRD-202368
  • Loading branch information
minakh1993 committed Dec 5, 2023
1 parent 34d5809 commit 6ca7821
Show file tree
Hide file tree
Showing 15 changed files with 443 additions and 15 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,34 @@ worker name in above log will be provided from getWorker method:
}
```

### Parallel External Task Execution Capability
With the limitations of conventional single-threaded, sequential execution of the polling external tasks, we decided to enable the execution of the polling external tasks through a multi-threaded approach. By doing so, we not only overcome the limitations of the sequential way but also enhanced overall performance.
Explore the details of this transition below.

In our latest changes, we introduced a new Aspect, designed with the lowest order, takes the lead in submitting each external task to a thread.
Each task either allocated its own thread within the executerService or waits to allocate its own thread.

We’ve added this config in order of enabling this new parallel task execution aspect properly designed :
> camunda.bpm.client.execution.execution-type
specifying type of execution between options: parallel, sequential.
By selecting the parallel option for this config the parallel mode is enabaled, and you use the next provided configs.

And by the config below, you can specify the thread pool size :
>camunda.bpm.client.execution.thread-pool-size
We’ve also provided an option for situations where we have multiple instances of the client, and we want to check if any of polling external tasks has reached its lockExpirationDate, it cancels the task so that other instances of the client could be able to fetch that task.
By this config you can specify if you have multiple instance or not :
>camunda.bpm.client.execution.multi-instance-enabled
possible values of above config : true , false

Addressing the imperative need for all fetched and locked tasks completion within specified timeouts, we’ve implemented a special Backoff strategy mechanism for parallel execution. This mechanism extends exponential strategy and ensures that the main thread waits for all the tasks to be either completed or canceled by timely interruption. So we’ve added this config to specify the wait duration per each task:
> camunda.bpm.client.execution.wait-duration-per-task
that takes wait duration in milliseconds and multiplied it by number of polled tasks and then wait for their execution within the time.


## 🚀 camunda rest client
there are other camunda rest services such as messageApi or deploymentApi that camunda-external client doesn't cover but usually need to be used in external project. this project provide a rest client based on camunda official openApi json and creates rest client for these restControllers with tosan-httpclient-feign-spring-boot-starter project.
in order to call a rest service you can easily inject selected api into your class and call each method with proper inputs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,8 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.tosan.camunda.camundaclient.config.CamundaClientConfig;
import com.tosan.camunda.camundaclient.config.CamundaFeignConfig;
import com.tosan.camunda.camundaclient.config.CamundaRestServiceConfig;
import com.tosan.camunda.camundaclient.external.aspect.ExternalTaskInfoAspect;
import com.tosan.camunda.camundaclient.external.aspect.ExternalTaskLogAspect;
import com.tosan.camunda.camundaclient.external.aspect.ExternalTaskMdcAspect;
import com.tosan.camunda.camundaclient.external.aspect.ExternalTaskResultAspect;
import com.tosan.camunda.camundaclient.config.*;
import com.tosan.camunda.camundaclient.external.aspect.*;
import com.tosan.camunda.camundaclient.feign.aspect.FeignUndeclaredThrowableExceptionAspect;
import com.tosan.camunda.camundaclient.util.ExternalTaskResultUtil;
import com.tosan.client.http.core.HttpClientProperties;
Expand All @@ -31,6 +26,7 @@
import org.apache.http.impl.client.HttpClientBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.commons.httpclient.ApacheHttpClientConnectionManagerFactory;
import org.springframework.cloud.commons.httpclient.ApacheHttpClientFactory;
Expand All @@ -48,6 +44,8 @@
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author M.khoshnevisan
Expand All @@ -59,6 +57,7 @@ public class CamundaEngineClientConfig extends AbstractFeignConfiguration {

public static final String DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
private static HashMap<String, Class<? extends Exception>> exceptionMap = new HashMap<>();

static {
exceptionMap.put("Validation.invalid", Exception.class);
}
Expand Down Expand Up @@ -104,6 +103,28 @@ public CamundaClientConfig camundaClientConfig() {
return new CamundaClientConfig();
}

@Bean
@ConditionalOnProperty(name = "camunda.bpm.client.execution.execution-type", havingValue = "parallel", matchIfMissing = false)
public ParallelBackoffStrategy backoffStrategy(ParallelTaskExecutor parallelTaskExecutor) {
return new ParallelBackoffStrategy(parallelTaskExecutor);
}

@Bean
@ConditionalOnProperty(name = "camunda.bpm.client.execution.execution-type", havingValue = "parallel", matchIfMissing = false)
public ParallelTaskExecutor parallelTaskExecutor(@Qualifier("camunda-executorService") ExecutorService executorService) {
return new ParallelTaskExecutor(executorService);
}

@Bean(name = "camunda-executorService")
@ConditionalOnProperty(name = "camunda.bpm.client.execution.execution-type", havingValue = "parallel", matchIfMissing = false)
public ExecutorService executorService(CamundaClientConfig camundaClientConfig) {
if (camundaClientConfig.getExecution() != null && camundaClientConfig.getExecution().getThreadPoolSize() > 0) {
return Executors.newFixedThreadPool(camundaClientConfig.getExecution().getThreadPoolSize());
} else {
throw new RuntimeException("camunda.bpm.client.execution.thread-pool-size is not set or is not valid.");
}
}

@Bean("camunda-client-clientConfig")
@ConditionalOnMissingBean(name = "camunda-client-clientConfig")
public HttpClientProperties camundaFeignConfig(CamundaClientConfig camundaClientConfig) {
Expand Down Expand Up @@ -267,6 +288,12 @@ public ExternalTaskResultAspect externalTaskResultAspect(ExternalTaskResultUtil
return new ExternalTaskResultAspect(externalTaskResultUtil);
}

@Bean
@ConditionalOnProperty(name = "camunda.bpm.client.execution.execution-type", havingValue = "parallel", matchIfMissing = false)
public ExternalTaskParallelExecutionAspect externalTaskParallelExecutionAspect(ParallelTaskExecutor parallelTaskExecutor) {
return new ExternalTaskParallelExecutionAspect(parallelTaskExecutor);
}

@Bean
public ExternalTaskResultUtil externalTaskResultUtil(CamundaClientConfig camundaClientConfig) {
return new ExternalTaskResultUtil(camundaClientConfig.getRetry());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ public class CamundaClientConfig extends ClientProperties {
@Valid
private CamundaRestServiceConfig restServiceConfig;

/**
* execution configuration
*/
@NestedConfigurationProperty
@Valid
private ExecutionConfig execution;

/**
* @return retry config for technical repeatable exceptions
*/
Expand Down Expand Up @@ -58,11 +65,26 @@ public void setRestServiceConfig(CamundaRestServiceConfig restServiceConfig) {
this.restServiceConfig = restServiceConfig;
}

/**
* @return execution configuration
*/
public ExecutionConfig getExecution() {
return execution;
}

/**
* @param execution execution configuration
*/
public void setExecution(ExecutionConfig execution) {
this.execution = execution;
}

@Override
public String toString() {
return "CamundaClientConfig{\n" +
"retry=" + retry + "\n" +
", restServiceConfig=" + restServiceConfig + "\n" +
return "CamundaClientConfig{" +
"retry=" + retry +
", restServiceConfig=" + restServiceConfig +
", execution=" + execution +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.tosan.camunda.camundaclient.config;

import com.tosan.camunda.camundaclient.config.enumuration.ExecutionType;

/**
* @author Shahryar Safizadeh
* @since 12/4/2023
*/
public class ExecutionConfig {
/**
* wait duration per task in parallel backoff strategy
*/
private Long waitDurationPerTask;

/**
* thread pool size for parallel executor
*/
private int threadPoolSize;

/**
* type of execution
*/
private ExecutionType executionType;

/**
* is multi-instance enabled
*/
private boolean isMultiInstanceEnabled;

/**
* @return wait duration per task in parallel backoff strategy
*/
public Long getWaitDurationPerTask() {
return waitDurationPerTask;
}

/**
* @param waitDurationPerTask wait duration per task in parallel backoff strategy
*/
public void setWaitDurationPerTask(Long waitDurationPerTask) {
this.waitDurationPerTask = waitDurationPerTask;
}

/**
* @return thread pool size for parallel executor
*/
public int getThreadPoolSize() {
return threadPoolSize;
}

/**
* @param threadPoolSize thread pool size for parallel executor
*/
public void setThreadPoolSize(int threadPoolSize) {
this.threadPoolSize = threadPoolSize;
}

/**
* @return type of execution
*/
public ExecutionType getExecutionType() {
return executionType;
}

/**
* @param executionType type of execution
*/
public void setExecutionType(ExecutionType executionType) {
this.executionType = executionType;
}

/**
* @return is multi-instance enabled
*/
public boolean isMultiInstanceEnabled() {
return isMultiInstanceEnabled;
}

/**
* @param multiInstanceEnabled is multi-instance enabled
*/
public void setMultiInstanceEnabled(boolean multiInstanceEnabled) {
isMultiInstanceEnabled = multiInstanceEnabled;
}

@Override
public String toString() {
return "ExecutionConfig{" +
"waitDurationPerTask=" + waitDurationPerTask +
", threadPoolSize=" + threadPoolSize +
", executionType=" + executionType +
", isMultiInstanceEnabled=" + isMultiInstanceEnabled +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.tosan.camunda.camundaclient.config;

import lombok.extern.slf4j.Slf4j;
import org.camunda.bpm.client.backoff.ExponentialBackoffStrategy;
import org.camunda.bpm.client.task.ExternalTask;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.validation.annotation.Validated;

import java.util.Date;
import java.util.List;
import java.util.concurrent.Future;

/**
* @author Shahryar Safizadeh
* @since 11/20/2023
*/
@Slf4j
@Validated
public class ParallelBackoffStrategy extends ExponentialBackoffStrategy {

@Value("${camunda.bpm.client.execution.wait-duration-per-task}")
private Long waitDurationPerTask;

private ParallelTaskExecutor parallelTaskExecutor;

public ParallelBackoffStrategy(ParallelTaskExecutor parallelTaskExecutor) {
this.parallelTaskExecutor = parallelTaskExecutor;
}

@Override
public void reconfigure(List<ExternalTask> externalTasks) {
super.reconfigure(externalTasks);
if (!externalTasks.isEmpty()) {
try {
waitForTaskCompletion(externalTasks);
} catch (InterruptedException | RuntimeException e) {
log.error("Error happened in main thread while waiting for tasks in backoff strategy!!!", e);
}
}
}

private void waitForTaskCompletion(List<ExternalTask> externalTasks) throws InterruptedException {
if (waitDurationPerTask == null) {
throw new RuntimeException("camunda.bpm.client.execution.wait-duration-per-task is not set or is not valid.");
} else {
Date expireDate = new Date(System.currentTimeMillis() + waitDurationPerTask * externalTasks.size());
while (!isAllTasksCompleted(parallelTaskExecutor.getFutures())) {
Date currentTime = new Date(System.currentTimeMillis());
if (expireDate.before(currentTime)) {
break;
}
Thread.sleep(5000);
}
cancelAllFutures(parallelTaskExecutor.getFutures());
parallelTaskExecutor.getFutures().clear();
}
}

private void cancelAllFutures(List<Future<?>> futures) {
for (Future future : futures) {
if (!future.isDone() && !future.isCancelled()) {
log.error("Future : {} did not complete before wait duration time and got canceled.", future);
future.cancel(true);
}
}
}

private boolean isAllTasksCompleted(List<Future<?>> futures) {
for (Future future : futures) {
if (!future.isDone()) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.tosan.camunda.camundaclient.config;

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.camunda.bpm.client.task.ExternalTask;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
* @author Shahryar Safizadeh
* @since 11/20/2023
*/
@Slf4j
public class ParallelTaskExecutor {
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
private ExecutorService executorService;
private static List<Future<?>> futures = new ArrayList<>();

public ParallelTaskExecutor(ExecutorService executorService) {
this.executorService = executorService;
}

public Future<?> submitToExecutor(ProceedingJoinPoint pjp) {
Future<?> future = executorService.submit(() -> {
try {
pjp.proceed();
} catch (Throwable e) {
if (e instanceof RuntimeException) {
RuntimeException runtimeException = (RuntimeException) e;
throw runtimeException;
}
throw new RuntimeException(e);
}
});
return future;
}

public void scheduledTaskCancellation(Future<?> future, ExternalTask externalTask) {
scheduledExecutorService.schedule(() -> {
if (!future.isDone()) {
log.info("Task : {} is canceled by scheduledCanceller!!!", externalTask.getBusinessKey());
future.cancel(true);
}
}, getLockExpirationTimeDuration(externalTask), TimeUnit.MILLISECONDS);
}

public long getLockExpirationTimeDuration(ExternalTask externalTask) {
return externalTask.getLockExpirationTime().getTime() - System.currentTimeMillis();
}

public List<Future<?>> getFutures() {
return futures;
}
}
Loading

0 comments on commit 6ca7821

Please sign in to comment.