TungDaDev's Blog

Activiti 8

Temp img.png
Published on
/18 mins read/

# activiti là gì và tại sao cần nó?

Bạn có thể code mọi workflow bằng if-else, state machine, hay event-driven architecture. Nhưng khi business process có 20+ steps, conditional branches, parallel paths, human approvals, timeouts, error handling, và cần thay đổi thường xuyên mà KHÔNG redeploy code — bạn cần một process engine.

Activiti là BPMN 2.0 engine chạy trên JVM. Nó nhận file BPMN XML (vẽ bằng visual designer), parse thành executable logic, và điều phối execution xuyên suốt process lifecycle. Developer không code flow control — chỉ code individual tasks (delegates). Flow logic nằm trong BPMN diagram mà BA có thể thay đổi.

Ví dụ thực tế: Một ngân hàng dùng Activiti cho quy trình duyệt khoản vay. Khi khách hàng submit đơn trên web portal, BPMN XML được execute — tự động gọi credit scoring API, route đến nhân viên phê duyệt phù hợp, gửi notification, và track SLA.

# activiti vs camunda vs flowable

Cả 3 đều fork từ cùng codebase (jBPM → Activiti 5 → fork). Điểm khác biệt:

Activiti 8Camunda 7/8Flowable 7
LicenseApache 2.0Community + EnterpriseApache 2.0
Cloud-nativeActiviti Cloud (K8s)Camunda 8 (Zeebe)Flowable Cloud
Spring BootNative integrationGood integrationNative integration
Runtime modelShared DB (RDBMS)Shared DB / Event-sourced (C8)Shared DB
Used by✓ (many enterprises)

# architecture — activiti bên trong làm gì?

Khi bạn gọi runtimeService.startProcessInstanceByKey("loan-approval"), đây là điều xảy ra bên dưới:

1. Engine đọc Process Definition từ ACT_RE_PROCDEF table
2. Tạo Process Instance (row trong ACT_RU_EXECUTION)
3. Tạo execution token ở Start Event
4. Token di chuyển theo Sequence Flow
5. Gặp Service Task → gọi Java Delegate → execute business logic
6. Gặp User Task → INSERT vào ACT_RU_TASK → DỪNG (wait state)
7. Khi user complete task → token tiếp tục di chuyển
8. Gặp Gateway → evaluate conditions → chọn paths
9. Gặp End Event → complete instance → move to history tables (ACT_HI_*)

# database tables

Activiti tạo ~28 tables, chia thành prefixes:

PrefixTênMục đíchVí dụ
ACTRE*RepositoryProcess definitions, deploymentsACT_RE_PROCDEF, ACT_RE_DEPLOYMENT
ACTRU*RuntimeRunning instances, tasks, variables, jobsACT_RU_EXECUTION, ACT_RU_TASK
ACTHI*HistoryCompleted instances, audit trailACT_HI_PROCINST, ACT_HI_ACTINST
ACTGE*GeneralByte arrays, propertiesACT_GE_BYTEARRAY
ACTID*IdentityUsers, groups (optional)ACT_ID_USER, ACT_ID_GROUP

Quan trọng cho performance: ACT_RU_* tables chỉ chứa data đang running — khi instance complete, data chuyển sang ACT_HI_*. Nếu bạn có 1 triệu instances hoàn thành, ACT_RU_EXECUTION vẫn nhỏ (chỉ instances đang chạy). Nhưng ACT_HI_* sẽ phình to — cần archiving strategy.

# spring boot integration — setup

# dependencies

<dependency>
   <groupId>org.activiti</groupId>
   <artifactId>activiti-spring-boot-starter</artifactId>
   <version>8.6.0</version>
</dependency>

# configuration

spring:
  activiti:
    database-schema-update: true # Auto create/update schema
    history-level: full # Lưu full audit (activity, variable, task details)
    async-executor-activate: true # Enable async job execution
    check-process-definitions: false # Không auto-deploy từ classpath
 
  datasource:
    url: jdbc:postgresql://localhost:5432/bpm_db
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    hikari:
      maximum-pool-size: 30 # Activiti cần nhiều connections cho async jobs

# configuration class

@Configuration
public class ActivitiConfig {
 
   @Bean
   public SpringProcessEngineConfiguration processEngineConfiguration(
           DataSource dataSource,
           PlatformTransactionManager transactionManager,
           List<JavaDelegate> delegates) {
 
       SpringProcessEngineConfiguration config = new SpringProcessEngineConfiguration();
       config.setDataSource(dataSource);
       config.setTransactionManager(transactionManager);
       config.setDatabaseSchemaUpdate("true");
       config.setAsyncExecutorActivate(true);
       config.setHistoryLevel(HistoryLevel.FULL);
 
       // Custom ID generator (UUID thay vì sequential)
       config.setIdGenerator(new StrongUuidGenerator());
 
       // Async executor tuning
       config.setAsyncExecutorCorePoolSize(5);
       config.setAsyncExecutorMaxPoolSize(20);
       config.setAsyncExecutorDefaultAsyncJobAcquireWaitTime(5000);
 
       return config;
   }
}

Activiti tự động detect Spring beans và cho phép reference chúng trong BPMN expressions (${beanName.method()}). Không cần register thủ công — Spring ApplicationContext = Activiti's bean registry.

# core services — api chính bạn cần biết

Activiti expose 7 services chính. Trong daily work, bạn dùng 4-5 cái đầu tiên thường xuyên nhất.

# repositoryservice — quản lý process definitions

Think of this as "deployment & versioning." Mỗi BPMN file deploy = 1 ProcessDefinition. Deploy lại cùng key = version mới.

@Service
@RequiredArgsConstructor
@Slf4j
public class ProcessDefinitionService {
 
   private final RepositoryService repositoryService;
 
   /**
    * Deploy BPMN XML. Mỗi lần deploy cùng key → version tăng.
    * Instances đang chạy KHÔNG bị ảnh hưởng — giữ version cũ.
    * Instances mới start sẽ dùng version mới nhất.
    */
   public DeploymentResult deploy(String bpmnXml, String processName, String workspaceId) {
       Deployment deployment = repositoryService.createDeployment()
           .name(processName)
           .tenantId(workspaceId)  // Multi-tenant isolation
           .addString(processName + ".bpmn20.xml", bpmnXml)
           .deploy();
 
       // Lấy definition mới deploy
       ProcessDefinition definition = repositoryService.createProcessDefinitionQuery()
           .deploymentId(deployment.getId())
           .singleResult();
 
       log.info("Deployed process | name={} | version={} | deploymentId={}",
           definition.getName(), definition.getVersion(), deployment.getId());
 
       return new DeploymentResult(deployment.getId(), definition.getId(), definition.getVersion());
   }
 
   /**
    * Lấy tất cả versions của 1 process definition.
    * Hữu ích cho rollback: activate version cũ, suspend version mới.
    */
   public List<ProcessDefinitionDTO> getVersions(String processKey, String workspaceId) {
       return repositoryService.createProcessDefinitionQuery()
           .processDefinitionKey(processKey)
           .processDefinitionTenantId(workspaceId)
           .orderByProcessDefinitionVersion().desc()
           .list()
           .stream()
           .map(this::toDTO)
           .toList();
   }
 
   /**
    * Suspend definition → không thể start instances mới.
    * Useful khi phát hiện bug trong process, cần "pause" nó.
    */
   public void suspend(String definitionId) {
       repositoryService.suspendProcessDefinitionById(definitionId, true, null);
       // suspendProcessInstances=true → suspend cả running instances
       log.info("Suspended process definition: {}", definitionId);
   }
 
   /**
    * Lấy BPMN XML từ deployed definition.
    * Useful cho: hiển thị diagram, audit, export.
    */
   public String getBpmnXml(String definitionId) {
       ProcessDefinition def = repositoryService.getProcessDefinition(definitionId);
       try (InputStream is = repositoryService.getResourceAsStream(
               def.getDeploymentId(), def.getResourceName())) {
           return new String(is.readAllBytes(), StandardCharsets.UTF_8);
       } catch (IOException e) {
           throw new RuntimeException("Failed to read BPMN resource", e);
       }
   }
}

# runtimeservice — start & control instances

Đây là service bạn dùng nhiều nhất. Start processes, signal events, get/set variables.

@Service
@RequiredArgsConstructor
@Slf4j
public class ProcessInstanceService {
 
   private final RuntimeService runtimeService;
 
   /**
    * Start process instance.
    * businessKey = unique identifier từ business domain (order ID, loan ID).
    * Variables = initial data cho process.
    */
   public ProcessInstanceDTO startProcess(StartProcessRequest request) {
       Map<String, Object> variables = new HashMap<>();
       variables.put("applicantName", request.getApplicantName());
       variables.put("loanAmount", request.getLoanAmount());
       variables.put("applicationDate", LocalDate.now().toString());
       variables.put("workspaceId", request.getWorkspaceId());
 
       ProcessInstance instance = runtimeService.startProcessInstanceByKey(
           request.getProcessKey(),       // BPMN process ID (VD: "loan-approval")
           request.getBusinessKey(),      // Business identifier (VD: "LOAN-2024-001")
           variables
       );
 
       log.info("Process started | key={} | instanceId={} | businessKey={}",
           request.getProcessKey(), instance.getId(), request.getBusinessKey());
 
       return toDTO(instance);
   }
 
   /**
    * Gửi signal/message đến running instance.
    * Use case: external system callback, webhook response.
    */
   public void sendMessage(String messageName, String businessKey, Map<String, Object> payload) {
       // Find execution đang wait message
       Execution execution = runtimeService.createExecutionQuery()
           .messageEventSubscriptionName(messageName)
           .processInstanceBusinessKey(businessKey)
           .singleResult();
 
       if (execution == null) {
           throw new ProcessNotFoundException(
               "No process waiting for message: " + messageName + " with key: " + businessKey);
       }
 
       runtimeService.messageEventReceived(messageName, execution.getId(), payload);
       log.info("Message delivered | message={} | businessKey={}", messageName, businessKey);
   }
 
   /**
    * Get variables từ running instance.
    * Variables = shared state giữa tasks trong cùng instance.
    */
   public Map<String, Object> getVariables(String instanceId) {
       return runtimeService.getVariables(instanceId);
   }
 
   /**
    * Set/update variable dynamically.
    * Use case: external event update data giữa chừng process.
    */
   public void setVariable(String instanceId, String varName, Object value) {
       runtimeService.setVariable(instanceId, varName, value);
   }
 
   /**
    * Cancel running instance.
    * Use case: customer cancel application, admin abort stuck process.
    */
   public void cancel(String instanceId, String reason) {
       runtimeService.deleteProcessInstance(instanceId, reason);
       log.info("Process cancelled | instanceId={} | reason={}", instanceId, reason);
   }
}

# taskservice — user tasks (human interaction)

User Tasks là "pause points" — process dừng, chờ người thao tác. TaskService quản lý: query tasks cho user, claim/release, complete với output variables.

Concept quan trọng: Assignee vs Candidate. Assignee = task thuộc về 1 người cụ thể. Candidate Groups = task available cho nhóm (ai claim trước thì người đó làm). Giống email vs shared inbox.

@Service
@RequiredArgsConstructor
@Slf4j
public class UserTaskService {
 
   private final TaskService taskService;
 
   /**
    * Lấy tasks assigned cho user cụ thể.
    * "Inbox" của user — tasks đang chờ user hoàn thành.
    */
   public Page<TaskDTO> getMyTasks(String userId, Pageable pageable) {
       long total = taskService.createTaskQuery()
           .taskAssignee(userId)
           .count();
 
       List<Task> tasks = taskService.createTaskQuery()
           .taskAssignee(userId)
           .orderByTaskCreateTime().desc()
           .listPage((int) pageable.getOffset(), pageable.getPageSize());
 
       List<TaskDTO> dtos = tasks.stream().map(this::toDTO).toList();
       return new PageImpl<>(dtos, pageable, total);
   }
 
   /**
    * Lấy tasks available cho group (chưa ai claim).
    * "Shared queue" — ai rảnh thì nhận.
    */
   public List<TaskDTO> getGroupTasks(String groupId) {
       return taskService.createTaskQuery()
           .taskCandidateGroup(groupId)
           .taskUnassigned()  // Chưa ai claim
           .orderByTaskCreateTime().desc()
           .list()
           .stream()
           .map(this::toDTO)
           .toList();
   }
 
   /**
    * Claim task từ group queue.
    * Sau khi claim, task không còn visible cho người khác trong group.
    */
   public void claim(String taskId, String userId) {
       taskService.claim(taskId, userId);
       log.info("Task claimed | taskId={} | userId={}", taskId, userId);
   }
 
   /**
    * Unclaim — trả task về queue (user bận, không thể handle).
    */
   public void unclaim(String taskId) {
       taskService.unclaim(taskId);
   }
 
   /**
    * Complete task với output variables.
    * Variables này available cho downstream tasks/gateways.
    *
    * Ví dụ: Approver complete với decision="approve", comments="Looks good"
    * → Gateway đọc ${decision == 'approve'} → route accordingly
    */
   public void complete(String taskId, Map<String, Object> variables, String userId) {
       // Verify task assigned to this user
       Task task = taskService.createTaskQuery()
           .taskId(taskId)
           .taskAssignee(userId)
           .singleResult();
 
       if (task == null) {
           throw new TaskNotFoundException("Task not found or not assigned to you: " + taskId);
       }
 
       taskService.complete(taskId, variables);
       log.info("Task completed | taskId={} | taskName={} | userId={} | vars={}",
           taskId, task.getName(), userId, variables.keySet());
   }
 
   /**
    * Delegate task cho người khác.
    * Use case: manager delegate cho subordinate.
    */
   public void delegate(String taskId, String fromUserId, String toUserId) {
       taskService.delegateTask(taskId, toUserId);
       log.info("Task delegated | taskId={} | from={} | to={}", taskId, fromUserId, toUserId);
   }
 
   /**
    * Add comment to task (audit trail).
    */
   public void addComment(String taskId, String userId, String message) {
       Task task = taskService.createTaskQuery().taskId(taskId).singleResult();
       taskService.addComment(taskId, task.getProcessInstanceId(), message);
   }
 
   private TaskDTO toDTO(Task task) {
       Map<String, Object> variables = taskService.getVariables(task.getId());
       return TaskDTO.builder()
           .id(task.getId())
           .name(task.getName())
           .assignee(task.getAssignee())
           .createTime(task.getCreateTime().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime())
           .dueDate(task.getDueDate() != null ? task.getDueDate().toInstant().atZone(ZoneId.systemDefault()).toLocalDate() : null)
           .processInstanceId(task.getProcessInstanceId())
           .processDefinitionId(task.getProcessDefinitionId())
           .formKey(task.getFormKey())
           .variables(variables)
           .build();
   }
}

# historyservice — audit trail & analytics

Mọi thứ đã xảy ra được lưu trong history. Khi process complete, runtime tables clean up nhưng history giữ lại. Đây là data source cho: audit compliance, performance analytics, bottleneck detection.

@Service
@RequiredArgsConstructor
public class ProcessHistoryService {
 
   private final HistoryService historyService;
 
   /**
    * Lấy toàn bộ history của 1 process instance.
    * Ai làm gì, khi nào, mất bao lâu — full timeline.
    */
   public ProcessHistoryDTO getInstanceHistory(String instanceId) {
       HistoricProcessInstance instance = historyService.createHistoricProcessInstanceQuery()
           .processInstanceId(instanceId)
           .singleResult();
 
       List<HistoricActivityInstance> activities = historyService.createHistoricActivityInstanceQuery()
           .processInstanceId(instanceId)
           .orderByHistoricActivityInstanceStartTime().asc()
           .list();
 
       List<HistoricTaskInstance> tasks = historyService.createHistoricTaskInstanceQuery()
           .processInstanceId(instanceId)
           .orderByHistoricTaskInstanceStartTime().asc()
           .list();
 
       return ProcessHistoryDTO.builder()
           .instanceId(instanceId)
           .startTime(instance.getStartTime())
           .endTime(instance.getEndTime())
           .durationMs(instance.getDurationInMillis())
           .status(instance.getEndTime() != null ? "COMPLETED" : "RUNNING")
           .activities(activities.stream().map(this::toActivityDTO).toList())
           .tasks(tasks.stream().map(this::toTaskDTO).toList())
           .build();
   }
 
   /**
    * Phân tích performance: average duration per task.
    * Phát hiện bottleneck: task nào mất thời gian nhiều nhất?
    */
   public List<TaskPerformanceDTO> getTaskPerformance(String processKey, LocalDate from, LocalDate to) {
       return historyService.createHistoricTaskInstanceQuery()
           .processDefinitionKey(processKey)
           .taskCompletedAfter(toDate(from))
           .taskCompletedBefore(toDate(to))
           .list()
           .stream()
           .collect(Collectors.groupingBy(HistoricTaskInstance::getName))
           .entrySet().stream()
           .map(entry -> {
               List<HistoricTaskInstance> tasks = entry.getValue();
               double avgDuration = tasks.stream()
                   .mapToLong(t -> t.getDurationInMillis() != null ? t.getDurationInMillis() : 0)
                   .average().orElse(0);
               return new TaskPerformanceDTO(
                   entry.getKey(),
                   tasks.size(),
                   (long) avgDuration,
                   tasks.stream().mapToLong(HistoricTaskInstance::getDurationInMillis).max().orElse(0)
               );
           })
           .sorted(Comparator.comparingLong(TaskPerformanceDTO::avgDurationMs).reversed())
           .toList();
   }
}

# java delegates — nơi business logic sống

Java Delegate là bridge giữa BPMN diagram và Java code. Mỗi Service Task trong diagram gọi 1 delegate. Delegate nhận execution context (variables, process info), thực hiện logic, và set output variables.

Design principles cho delegates:

  1. Single Responsibility — 1 delegate làm 1 việc. Đừng nhồi 500 dòng logic vào 1 delegate.
  2. Idempotent — Engine có thể retry. chargePayment() phải check "đã charge chưa?" trước khi charge.
  3. Fast — Synchronous delegates block engine thread. Nếu cần gọi external API mất 30s, dùng async task.
  4. Stateless — Delegate là Spring singleton. Không giữ state giữa các lần gọi. Dùng process variables.
/**
* Delegate tính toán credit score.
*
* Input variables: customerId, loanAmount
* Output variables: creditScore, riskLevel, autoApproved
*
* BPMN reference: activiti:delegateExpression="${creditScoreDelegate}"
*/
@Component("creditScoreDelegate")
@RequiredArgsConstructor
@Slf4j
public class CreditScoreDelegate implements JavaDelegate {
 
   private final CreditScoringClient scoringClient;  // Gọi external service
   private final MetricsService metrics;
 
   @Override
   public void execute(DelegateExecution execution) {
       // 1. Read input variables
       String customerId = (String) execution.getVariable("customerId");
       BigDecimal loanAmount = (BigDecimal) execution.getVariable("loanAmount");
       String processId = execution.getProcessInstanceId();
 
       log.info("[processId={}] Calculating credit score | customerId={} | amount={}",
           processId, customerId, loanAmount);
 
       // 2. Execute business logic
       long start = System.nanoTime();
       try {
           CreditScoreResult result = scoringClient.calculate(customerId, loanAmount);
 
           // 3. Set output variables (available for downstream tasks/gateways)
           execution.setVariable("creditScore", result.getScore());
           execution.setVariable("riskLevel", result.getRiskLevel().name());
           execution.setVariable("autoApproved", result.getScore() >= 750);
           execution.setVariable("scoringDetails", result.getDetails());
 
           long durationMs = (System.nanoTime() - start) / 1_000_000;
           metrics.recordTaskDuration("credit_score_calculation", durationMs);
 
           log.info("[processId={}] Credit score calculated | score={} | risk={} | autoApproved={} | {}ms",
               processId, result.getScore(), result.getRiskLevel(),
               result.getScore() >= 750, durationMs);
 
       } catch (ScoringServiceUnavailableException e) {
           // Throw BpmnError → caught by boundary error event trên diagram
           log.error("[processId={}] Scoring service unavailable: {}", processId, e.getMessage());
           throw new BpmnError("SCORING_UNAVAILABLE",
               "Credit scoring service is temporarily unavailable");
 
       } catch (InvalidCustomerException e) {
           // Business error → route to rejection path
           execution.setVariable("rejectionReason", e.getMessage());
           throw new BpmnError("INVALID_CUSTOMER", e.getMessage());
       }
       // RuntimeException (NPE, etc.) → task fails, engine creates incident
       // → requires manual intervention or job retry
   }
}

# delegate expression vs class vs expression

3 cách reference delegate trong BPMN XML:

<!-- 1. delegateExpression: Spring bean name (RECOMMENDED) -->
<!-- Bean phải implement JavaDelegate -->
<serviceTask activiti:delegateExpression="${creditScoreDelegate}"/>
 
<!-- 2. class: Full qualified class name (instantiated by engine, NOT Spring-managed) -->
<!-- ❌ KHÔNG inject Spring beans được (no @Autowired) -->
<serviceTask activiti:class="com.example.bpm.delegate.CreditScoreDelegate"/>
 
<!-- 3. expression: Call method trên Spring bean (không cần implement JavaDelegate) -->
<!-- Return value tự động set vào biến nếu có resultVariable -->
<serviceTask activiti:expression="${orderService.calculateTotal(execution)}"
            activiti:resultVariable="totalAmount"/>

Recommendation: Luôn dùng delegateExpression — Spring-managed, injectable, testable.

# async execution & job executor

Mặc định, Service Tasks chạy synchronous — cùng thread với process execution. Nghĩa là nếu delegate gọi external API mất 30 giây, engine thread bị block 30 giây. Với 10 concurrent instances → 10 threads bị block → thread pool exhausted → mọi process trong engine freeze.

Async tasks giải quyết: engine tạo "job" trong database và return ngay. Job Executor (background thread pool) pick up jobs và execute async. Nếu fail → retry với exponential backoff. Nếu retry hết → move to deadletter queue.

<!-- Async service task: engine không block, delegate chạy bởi job executor -->
<serviceTask id="callExternalApi"
            activiti:delegateExpression="${externalApiDelegate}"
            activiti:async="true"
            activiti:exclusive="true"/>
<!--
 async="true": tạo job thay vì execute inline
 exclusive="true": không chạy song song với jobs cùng instance (tránh optimistic lock)
-->

# khi nào dùng async?

ScenarioSyncAsyncLý do
Tính toán nhanh (<100ms)Overhead job creation không đáng
Gọi external APINetwork latency unpredictable
Gửi email/notificationKhông cần block process flow
Database query nội bộNhanh, cùng transaction
File processing (large)CPU-intensive, có thể OOM
Sau parallel gateway forkMỗi path chạy independent job

# job executor configuration

@Configuration
public class AsyncExecutorConfig {
 
   @Bean
   public SpringProcessEngineConfiguration engineConfig(DataSource ds,
           PlatformTransactionManager txManager) {
       SpringProcessEngineConfiguration config = new SpringProcessEngineConfiguration();
       config.setDataSource(ds);
       config.setTransactionManager(txManager);
 
       // Async executor settings
       config.setAsyncExecutorActivate(true);
       config.setAsyncExecutorCorePoolSize(5);        // Minimum threads
       config.setAsyncExecutorMaxPoolSize(20);       // Maximum threads
       config.setAsyncExecutorDefaultAsyncJobAcquireWaitTime(5000);  // Poll interval (ms)
       config.setAsyncExecutorDefaultTimerJobAcquireWaitTime(5000);
       config.setAsyncFailedJobWaitTime(10);         // Seconds between retries
       config.setAsyncExecutorMaxAsyncJobsDuePerAcquisition(5);     // Jobs per batch
 
       // Retry configuration
       config.setDefaultFailedJobWaitTime(300);      // 5 minutes between retries
       config.setAsyncExecutorResetExpiredJobsInterval(60000);      // 1 minute
 
       return config;
   }
}

# handling failed jobs

@Service
@RequiredArgsConstructor
@Slf4j
public class FailedJobService {
 
   private final ManagementService managementService;
 
   /**
    * Lấy jobs đang stuck (retries exhausted).
    * Monitor dashboard nên alert khi có deadletter jobs.
    */
   public List<DeadLetterJobDTO> getDeadLetterJobs() {
       return managementService.createDeadLetterJobQuery()
           .orderByJobCreateTime().desc()
           .list()
           .stream()
           .map(job -> DeadLetterJobDTO.builder()
               .jobId(job.getId())
               .processInstanceId(job.getProcessInstanceId())
               .exceptionMessage(job.getExceptionMessage())
               .createTime(job.getCreateTime())
               .retries(job.getRetries())
               .build())
           .toList();
   }
 
   /**
    * Retry dead letter job (sau khi fix root cause).
    * Move job từ deadletter → executable queue.
    */
   public void retryDeadLetterJob(String jobId) {
       managementService.moveDeadLetterJobToExecutableJob(jobId, 3); // 3 retries
       log.info("Dead letter job moved to executable | jobId={}", jobId);
   }
}

# multi-tenancy — workspace isolation

Trong SaaS platforms, mỗi organization/tenant cần isolation. Process definitions và instances phải separated — tenant A không thấy processes của tenant B. Activiti hỗ trợ multi-tenancy native qua tenantId.

@Service
@RequiredArgsConstructor
public class TenantAwareProcessService {
 
   private final RepositoryService repositoryService;
   private final RuntimeService runtimeService;
   private final TaskService taskService;
 
   // Deploy với tenant isolation
   public String deploy(String bpmnXml, String name, String workspaceId) {
       Deployment deployment = repositoryService.createDeployment()
           .name(name)
           .tenantId(workspaceId)  // ← Tenant isolation
           .addString(name + ".bpmn20.xml", bpmnXml)
           .deploy();
       return deployment.getId();
   }
 
   // Start instance — tự động inherit tenant từ definition
   public String startProcess(String processKey, String workspaceId, Map<String, Object> vars) {
       ProcessInstance instance = runtimeService.startProcessInstanceByKeyAndTenantId(
           processKey, vars, workspaceId);  // ← Tenant-scoped
       return instance.getId();
   }
 
   // Query tasks — chỉ trong workspace
   public List<Task> getTasks(String userId, String workspaceId) {
       return taskService.createTaskQuery()
           .taskAssignee(userId)
           .taskTenantId(workspaceId)  // ← Tenant filter
           .list();
   }
}

# testing processes

Testing BPMN processes = testing cả flow logic (diagram) + business logic (delegates). Unit test delegates riêng, integration test toàn process.

@SpringBootTest
@ActiveProfiles("test")
class LoanApprovalProcessTest {
 
   @Autowired private RuntimeService runtimeService;
   @Autowired private TaskService taskService;
   @Autowired private HistoryService historyService;
   @Autowired private RepositoryService repositoryService;
 
   @MockBean private CreditScoringClient creditScoringClient;
   @MockBean private NotificationService notificationService;
 
   @BeforeEach
   void deployProcess() {
       repositoryService.createDeployment()
           .addClasspathResource("processes/loan-approval.bpmn20.xml")
           .deploy();
   }
 
   @Test
   void loanApproval_highCreditScore_autoApproved() {
       // Given: high credit score → auto-approve path
       when(creditScoringClient.calculate(any(), any()))
           .thenReturn(new CreditScoreResult(800, RiskLevel.LOW));
 
       // When: start process
       Map<String, Object> variables = Map.of(
           "customerId", "CUST-001",
           "loanAmount", new BigDecimal("500000000"),
           "applicantName", "Nguyen Van A"
       );
       ProcessInstance instance = runtimeService.startProcessInstanceByKey(
           "loan-approval", "LOAN-001", variables);
 
       // Then: process should complete automatically (no user task needed)
       HistoricProcessInstance completed = historyService.createHistoricProcessInstanceQuery()
           .processInstanceId(instance.getId())
           .finished()
           .singleResult();
 
       assertThat(completed).isNotNull();
       assertThat(completed.getEndTime()).isNotNull();
 
       // Verify notification sent
       verify(notificationService).sendApprovalNotification("CUST-001", "LOAN-001");
   }
 
   @Test
   void loanApproval_lowCreditScore_requiresManualReview() {
       // Given: low credit score → manual review path
       when(creditScoringClient.calculate(any(), any()))
           .thenReturn(new CreditScoreResult(550, RiskLevel.HIGH));
 
       // When: start process
       ProcessInstance instance = runtimeService.startProcessInstanceByKey(
           "loan-approval", "LOAN-002", Map.of(
               "customerId", "CUST-002",
               "loanAmount", new BigDecimal("2000000000")
           ));
 
       // Then: should create user task for manual review
       Task reviewTask = taskService.createTaskQuery()
           .processInstanceId(instance.getId())
           .taskDefinitionKey("manualReviewTask")
           .singleResult();
 
       assertThat(reviewTask).isNotNull();
       assertThat(reviewTask.getName()).isEqualTo("Manual Review");
 
       // Complete review with approval
       taskService.complete(reviewTask.getId(), Map.of(
           "decision", "approve",
           "reviewComments", "High amount but good collateral"
       ));
 
       // Verify process completed
       HistoricProcessInstance completed = historyService.createHistoricProcessInstanceQuery()
           .processInstanceId(instance.getId())
           .finished()
           .singleResult();
       assertThat(completed).isNotNull();
   }
 
   @Test
   void loanApproval_scoringServiceDown_errorPath() {
       // Given: scoring service throws error
       when(creditScoringClient.calculate(any(), any()))
           .thenThrow(new ScoringServiceUnavailableException("Service down"));
 
       // When: start process
       ProcessInstance instance = runtimeService.startProcessInstanceByKey(
           "loan-approval", "LOAN-003", Map.of(
               "customerId", "CUST-003",
               "loanAmount", new BigDecimal("100000000")
           ));
 
       // Then: should route to error handling path (manual scoring task)
       Task manualScoringTask = taskService.createTaskQuery()
           .processInstanceId(instance.getId())
           .taskDefinitionKey("manualScoringTask")
           .singleResult();
 
       assertThat(manualScoringTask).isNotNull();
       assertThat(manualScoringTask.getName()).isEqualTo("Manual Credit Assessment");
   }
}

# production monitoring & troubleshooting

# metrics to track

@Component
@RequiredArgsConstructor
public class ProcessMetricsCollector {
 
   private final RuntimeService runtimeService;
   private final ManagementService managementService;
   private final HistoryService historyService;
   private final MeterRegistry meterRegistry;
 
   @Scheduled(fixedRate = 30000)  // Every 30s
   public void collectMetrics() {
       // Active instances
       long activeInstances = runtimeService.createProcessInstanceQuery().count();
       meterRegistry.gauge("bpmn.instances.active", activeInstances);
 
       // Suspended instances (paused, needs attention)
       long suspendedInstances = runtimeService.createProcessInstanceQuery().suspended().count();
       meterRegistry.gauge("bpmn.instances.suspended", suspendedInstances);
 
       // Pending user tasks (work backlog)
       long pendingTasks = managementService.createJobQuery().count();
       meterRegistry.gauge("bpmn.tasks.pending", pendingTasks);
 
       // Failed jobs (needs intervention)
       long failedJobs = managementService.createDeadLetterJobQuery().count();
       meterRegistry.gauge("bpmn.jobs.deadletter", failedJobs);
 
       // Timer jobs waiting (scheduled future work)
       long timerJobs = managementService.createTimerJobQuery().count();
       meterRegistry.gauge("bpmn.jobs.timer", timerJobs);
   }
}

# common issues & solutions

SymptomRoot CauseSolution
Process stuck at service taskDelegate throws uncaught exceptionCheck ACT_RU_JOB exceptions, fix delegate, retry
User task not appearingWrong assignee/candidateGroupCheck BPMN definition, verify user in correct group
Process very slowSynchronous external callsMake service tasks async
"Optimistic locking exception"Concurrent updates on same instanceUse exclusive="true" on async tasks
History tables hugeNo cleanup/archivingSchedule history cleanup job
Deadletter jobs piling upExternal service permanently downFix service, then moveDeadLetterJobToExecutable

# kết luận

Activiti là powerful tool nhưng có learning curve. Key takeaways cho developer:

  • BPMN diagram = executable code — respect it like production code (version, test, review)
  • Delegates = your code — keep them small, idempotent, testable
  • Async cho I/O — đừng block engine threads với external calls
  • Monitor deadletter jobs — silent failures kill production
  • Multi-tenancy từ đầu — retrofit tenant isolation sau = nightmare
  • Test full flows — unit test delegates + integration test process paths
  • History = gold mine — dùng cho analytics, bottleneck detection, compliance

Chỉ là những ghi chép cá nhân với hy vọng mang lại chút giá trị. Nếu thấy hữu ích, đừng ngại chia sẻ cho bạn bè & đồng nghiệp nhé!

Happy coding 😎 👍🏻 🚀 🔥.