# activiti là gì và tại sao cần nó?
Bạn có thể code mọi workflow bằng if-else, state machine, hay event-driven architecture. Nhưng khi business process có 20+ steps, conditional branches, parallel paths, human approvals, timeouts, error handling, và cần thay đổi thường xuyên mà KHÔNG redeploy code — bạn cần một process engine.
Activiti là BPMN 2.0 engine chạy trên JVM. Nó nhận file BPMN XML (vẽ bằng visual designer), parse thành executable logic, và điều phối execution xuyên suốt process lifecycle. Developer không code flow control — chỉ code individual tasks (delegates). Flow logic nằm trong BPMN diagram mà BA có thể thay đổi.
Ví dụ thực tế: Một ngân hàng dùng Activiti cho quy trình duyệt khoản vay. Khi khách hàng submit đơn trên web portal, BPMN XML được execute — tự động gọi credit scoring API, route đến nhân viên phê duyệt phù hợp, gửi notification, và track SLA.
# activiti vs camunda vs flowable
Cả 3 đều fork từ cùng codebase (jBPM → Activiti 5 → fork). Điểm khác biệt:
| Activiti 8 | Camunda 7/8 | Flowable 7 | |
|---|---|---|---|
| License | Apache 2.0 | Community + Enterprise | Apache 2.0 |
| Cloud-native | Activiti Cloud (K8s) | Camunda 8 (Zeebe) | Flowable Cloud |
| Spring Boot | Native integration | Good integration | Native integration |
| Runtime model | Shared DB (RDBMS) | Shared DB / Event-sourced (C8) | Shared DB |
| Used by | ✓ (many enterprises) |
# architecture — activiti bên trong làm gì?
Khi bạn gọi runtimeService.startProcessInstanceByKey("loan-approval"), đây là điều xảy ra bên dưới:
1. Engine đọc Process Definition từ ACT_RE_PROCDEF table
2. Tạo Process Instance (row trong ACT_RU_EXECUTION)
3. Tạo execution token ở Start Event
4. Token di chuyển theo Sequence Flow
5. Gặp Service Task → gọi Java Delegate → execute business logic
6. Gặp User Task → INSERT vào ACT_RU_TASK → DỪNG (wait state)
7. Khi user complete task → token tiếp tục di chuyển
8. Gặp Gateway → evaluate conditions → chọn paths
9. Gặp End Event → complete instance → move to history tables (ACT_HI_*)
# database tables
Activiti tạo ~28 tables, chia thành prefixes:
| Prefix | Tên | Mục đích | Ví dụ |
|---|---|---|---|
| ACTRE* | Repository | Process definitions, deployments | ACT_RE_PROCDEF, ACT_RE_DEPLOYMENT |
| ACTRU* | Runtime | Running instances, tasks, variables, jobs | ACT_RU_EXECUTION, ACT_RU_TASK |
| ACTHI* | History | Completed instances, audit trail | ACT_HI_PROCINST, ACT_HI_ACTINST |
| ACTGE* | General | Byte arrays, properties | ACT_GE_BYTEARRAY |
| ACTID* | Identity | Users, groups (optional) | ACT_ID_USER, ACT_ID_GROUP |
Quan trọng cho performance: ACT_RU_* tables chỉ chứa data đang running — khi instance complete, data chuyển sang ACT_HI_*. Nếu bạn có 1 triệu instances hoàn thành, ACT_RU_EXECUTION vẫn nhỏ (chỉ instances đang chạy). Nhưng ACT_HI_* sẽ phình to — cần archiving strategy.
# spring boot integration — setup
# dependencies
<dependency>
<groupId>org.activiti</groupId>
<artifactId>activiti-spring-boot-starter</artifactId>
<version>8.6.0</version>
</dependency># configuration
spring:
activiti:
database-schema-update: true # Auto create/update schema
history-level: full # Lưu full audit (activity, variable, task details)
async-executor-activate: true # Enable async job execution
check-process-definitions: false # Không auto-deploy từ classpath
datasource:
url: jdbc:postgresql://localhost:5432/bpm_db
username: ${DB_USERNAME}
password: ${DB_PASSWORD}
hikari:
maximum-pool-size: 30 # Activiti cần nhiều connections cho async jobs# configuration class
@Configuration
public class ActivitiConfig {
@Bean
public SpringProcessEngineConfiguration processEngineConfiguration(
DataSource dataSource,
PlatformTransactionManager transactionManager,
List<JavaDelegate> delegates) {
SpringProcessEngineConfiguration config = new SpringProcessEngineConfiguration();
config.setDataSource(dataSource);
config.setTransactionManager(transactionManager);
config.setDatabaseSchemaUpdate("true");
config.setAsyncExecutorActivate(true);
config.setHistoryLevel(HistoryLevel.FULL);
// Custom ID generator (UUID thay vì sequential)
config.setIdGenerator(new StrongUuidGenerator());
// Async executor tuning
config.setAsyncExecutorCorePoolSize(5);
config.setAsyncExecutorMaxPoolSize(20);
config.setAsyncExecutorDefaultAsyncJobAcquireWaitTime(5000);
return config;
}
}Activiti tự động detect Spring beans và cho phép reference chúng trong BPMN expressions (${beanName.method()}). Không cần register thủ công — Spring ApplicationContext = Activiti's bean registry.
# core services — api chính bạn cần biết
Activiti expose 7 services chính. Trong daily work, bạn dùng 4-5 cái đầu tiên thường xuyên nhất.
# repositoryservice — quản lý process definitions
Think of this as "deployment & versioning." Mỗi BPMN file deploy = 1 ProcessDefinition. Deploy lại cùng key = version mới.
@Service
@RequiredArgsConstructor
@Slf4j
public class ProcessDefinitionService {
private final RepositoryService repositoryService;
/**
* Deploy BPMN XML. Mỗi lần deploy cùng key → version tăng.
* Instances đang chạy KHÔNG bị ảnh hưởng — giữ version cũ.
* Instances mới start sẽ dùng version mới nhất.
*/
public DeploymentResult deploy(String bpmnXml, String processName, String workspaceId) {
Deployment deployment = repositoryService.createDeployment()
.name(processName)
.tenantId(workspaceId) // Multi-tenant isolation
.addString(processName + ".bpmn20.xml", bpmnXml)
.deploy();
// Lấy definition mới deploy
ProcessDefinition definition = repositoryService.createProcessDefinitionQuery()
.deploymentId(deployment.getId())
.singleResult();
log.info("Deployed process | name={} | version={} | deploymentId={}",
definition.getName(), definition.getVersion(), deployment.getId());
return new DeploymentResult(deployment.getId(), definition.getId(), definition.getVersion());
}
/**
* Lấy tất cả versions của 1 process definition.
* Hữu ích cho rollback: activate version cũ, suspend version mới.
*/
public List<ProcessDefinitionDTO> getVersions(String processKey, String workspaceId) {
return repositoryService.createProcessDefinitionQuery()
.processDefinitionKey(processKey)
.processDefinitionTenantId(workspaceId)
.orderByProcessDefinitionVersion().desc()
.list()
.stream()
.map(this::toDTO)
.toList();
}
/**
* Suspend definition → không thể start instances mới.
* Useful khi phát hiện bug trong process, cần "pause" nó.
*/
public void suspend(String definitionId) {
repositoryService.suspendProcessDefinitionById(definitionId, true, null);
// suspendProcessInstances=true → suspend cả running instances
log.info("Suspended process definition: {}", definitionId);
}
/**
* Lấy BPMN XML từ deployed definition.
* Useful cho: hiển thị diagram, audit, export.
*/
public String getBpmnXml(String definitionId) {
ProcessDefinition def = repositoryService.getProcessDefinition(definitionId);
try (InputStream is = repositoryService.getResourceAsStream(
def.getDeploymentId(), def.getResourceName())) {
return new String(is.readAllBytes(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException("Failed to read BPMN resource", e);
}
}
}# runtimeservice — start & control instances
Đây là service bạn dùng nhiều nhất. Start processes, signal events, get/set variables.
@Service
@RequiredArgsConstructor
@Slf4j
public class ProcessInstanceService {
private final RuntimeService runtimeService;
/**
* Start process instance.
* businessKey = unique identifier từ business domain (order ID, loan ID).
* Variables = initial data cho process.
*/
public ProcessInstanceDTO startProcess(StartProcessRequest request) {
Map<String, Object> variables = new HashMap<>();
variables.put("applicantName", request.getApplicantName());
variables.put("loanAmount", request.getLoanAmount());
variables.put("applicationDate", LocalDate.now().toString());
variables.put("workspaceId", request.getWorkspaceId());
ProcessInstance instance = runtimeService.startProcessInstanceByKey(
request.getProcessKey(), // BPMN process ID (VD: "loan-approval")
request.getBusinessKey(), // Business identifier (VD: "LOAN-2024-001")
variables
);
log.info("Process started | key={} | instanceId={} | businessKey={}",
request.getProcessKey(), instance.getId(), request.getBusinessKey());
return toDTO(instance);
}
/**
* Gửi signal/message đến running instance.
* Use case: external system callback, webhook response.
*/
public void sendMessage(String messageName, String businessKey, Map<String, Object> payload) {
// Find execution đang wait message
Execution execution = runtimeService.createExecutionQuery()
.messageEventSubscriptionName(messageName)
.processInstanceBusinessKey(businessKey)
.singleResult();
if (execution == null) {
throw new ProcessNotFoundException(
"No process waiting for message: " + messageName + " with key: " + businessKey);
}
runtimeService.messageEventReceived(messageName, execution.getId(), payload);
log.info("Message delivered | message={} | businessKey={}", messageName, businessKey);
}
/**
* Get variables từ running instance.
* Variables = shared state giữa tasks trong cùng instance.
*/
public Map<String, Object> getVariables(String instanceId) {
return runtimeService.getVariables(instanceId);
}
/**
* Set/update variable dynamically.
* Use case: external event update data giữa chừng process.
*/
public void setVariable(String instanceId, String varName, Object value) {
runtimeService.setVariable(instanceId, varName, value);
}
/**
* Cancel running instance.
* Use case: customer cancel application, admin abort stuck process.
*/
public void cancel(String instanceId, String reason) {
runtimeService.deleteProcessInstance(instanceId, reason);
log.info("Process cancelled | instanceId={} | reason={}", instanceId, reason);
}
}# taskservice — user tasks (human interaction)
User Tasks là "pause points" — process dừng, chờ người thao tác. TaskService quản lý: query tasks cho user, claim/release, complete với output variables.
Concept quan trọng: Assignee vs Candidate. Assignee = task thuộc về 1 người cụ thể. Candidate Groups = task available cho nhóm (ai claim trước thì người đó làm). Giống email vs shared inbox.
@Service
@RequiredArgsConstructor
@Slf4j
public class UserTaskService {
private final TaskService taskService;
/**
* Lấy tasks assigned cho user cụ thể.
* "Inbox" của user — tasks đang chờ user hoàn thành.
*/
public Page<TaskDTO> getMyTasks(String userId, Pageable pageable) {
long total = taskService.createTaskQuery()
.taskAssignee(userId)
.count();
List<Task> tasks = taskService.createTaskQuery()
.taskAssignee(userId)
.orderByTaskCreateTime().desc()
.listPage((int) pageable.getOffset(), pageable.getPageSize());
List<TaskDTO> dtos = tasks.stream().map(this::toDTO).toList();
return new PageImpl<>(dtos, pageable, total);
}
/**
* Lấy tasks available cho group (chưa ai claim).
* "Shared queue" — ai rảnh thì nhận.
*/
public List<TaskDTO> getGroupTasks(String groupId) {
return taskService.createTaskQuery()
.taskCandidateGroup(groupId)
.taskUnassigned() // Chưa ai claim
.orderByTaskCreateTime().desc()
.list()
.stream()
.map(this::toDTO)
.toList();
}
/**
* Claim task từ group queue.
* Sau khi claim, task không còn visible cho người khác trong group.
*/
public void claim(String taskId, String userId) {
taskService.claim(taskId, userId);
log.info("Task claimed | taskId={} | userId={}", taskId, userId);
}
/**
* Unclaim — trả task về queue (user bận, không thể handle).
*/
public void unclaim(String taskId) {
taskService.unclaim(taskId);
}
/**
* Complete task với output variables.
* Variables này available cho downstream tasks/gateways.
*
* Ví dụ: Approver complete với decision="approve", comments="Looks good"
* → Gateway đọc ${decision == 'approve'} → route accordingly
*/
public void complete(String taskId, Map<String, Object> variables, String userId) {
// Verify task assigned to this user
Task task = taskService.createTaskQuery()
.taskId(taskId)
.taskAssignee(userId)
.singleResult();
if (task == null) {
throw new TaskNotFoundException("Task not found or not assigned to you: " + taskId);
}
taskService.complete(taskId, variables);
log.info("Task completed | taskId={} | taskName={} | userId={} | vars={}",
taskId, task.getName(), userId, variables.keySet());
}
/**
* Delegate task cho người khác.
* Use case: manager delegate cho subordinate.
*/
public void delegate(String taskId, String fromUserId, String toUserId) {
taskService.delegateTask(taskId, toUserId);
log.info("Task delegated | taskId={} | from={} | to={}", taskId, fromUserId, toUserId);
}
/**
* Add comment to task (audit trail).
*/
public void addComment(String taskId, String userId, String message) {
Task task = taskService.createTaskQuery().taskId(taskId).singleResult();
taskService.addComment(taskId, task.getProcessInstanceId(), message);
}
private TaskDTO toDTO(Task task) {
Map<String, Object> variables = taskService.getVariables(task.getId());
return TaskDTO.builder()
.id(task.getId())
.name(task.getName())
.assignee(task.getAssignee())
.createTime(task.getCreateTime().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime())
.dueDate(task.getDueDate() != null ? task.getDueDate().toInstant().atZone(ZoneId.systemDefault()).toLocalDate() : null)
.processInstanceId(task.getProcessInstanceId())
.processDefinitionId(task.getProcessDefinitionId())
.formKey(task.getFormKey())
.variables(variables)
.build();
}
}# historyservice — audit trail & analytics
Mọi thứ đã xảy ra được lưu trong history. Khi process complete, runtime tables clean up nhưng history giữ lại. Đây là data source cho: audit compliance, performance analytics, bottleneck detection.
@Service
@RequiredArgsConstructor
public class ProcessHistoryService {
private final HistoryService historyService;
/**
* Lấy toàn bộ history của 1 process instance.
* Ai làm gì, khi nào, mất bao lâu — full timeline.
*/
public ProcessHistoryDTO getInstanceHistory(String instanceId) {
HistoricProcessInstance instance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(instanceId)
.singleResult();
List<HistoricActivityInstance> activities = historyService.createHistoricActivityInstanceQuery()
.processInstanceId(instanceId)
.orderByHistoricActivityInstanceStartTime().asc()
.list();
List<HistoricTaskInstance> tasks = historyService.createHistoricTaskInstanceQuery()
.processInstanceId(instanceId)
.orderByHistoricTaskInstanceStartTime().asc()
.list();
return ProcessHistoryDTO.builder()
.instanceId(instanceId)
.startTime(instance.getStartTime())
.endTime(instance.getEndTime())
.durationMs(instance.getDurationInMillis())
.status(instance.getEndTime() != null ? "COMPLETED" : "RUNNING")
.activities(activities.stream().map(this::toActivityDTO).toList())
.tasks(tasks.stream().map(this::toTaskDTO).toList())
.build();
}
/**
* Phân tích performance: average duration per task.
* Phát hiện bottleneck: task nào mất thời gian nhiều nhất?
*/
public List<TaskPerformanceDTO> getTaskPerformance(String processKey, LocalDate from, LocalDate to) {
return historyService.createHistoricTaskInstanceQuery()
.processDefinitionKey(processKey)
.taskCompletedAfter(toDate(from))
.taskCompletedBefore(toDate(to))
.list()
.stream()
.collect(Collectors.groupingBy(HistoricTaskInstance::getName))
.entrySet().stream()
.map(entry -> {
List<HistoricTaskInstance> tasks = entry.getValue();
double avgDuration = tasks.stream()
.mapToLong(t -> t.getDurationInMillis() != null ? t.getDurationInMillis() : 0)
.average().orElse(0);
return new TaskPerformanceDTO(
entry.getKey(),
tasks.size(),
(long) avgDuration,
tasks.stream().mapToLong(HistoricTaskInstance::getDurationInMillis).max().orElse(0)
);
})
.sorted(Comparator.comparingLong(TaskPerformanceDTO::avgDurationMs).reversed())
.toList();
}
}# java delegates — nơi business logic sống
Java Delegate là bridge giữa BPMN diagram và Java code. Mỗi Service Task trong diagram gọi 1 delegate. Delegate nhận execution context (variables, process info), thực hiện logic, và set output variables.
Design principles cho delegates:
- Single Responsibility — 1 delegate làm 1 việc. Đừng nhồi 500 dòng logic vào 1 delegate.
- Idempotent — Engine có thể retry.
chargePayment()phải check "đã charge chưa?" trước khi charge. - Fast — Synchronous delegates block engine thread. Nếu cần gọi external API mất 30s, dùng async task.
- Stateless — Delegate là Spring singleton. Không giữ state giữa các lần gọi. Dùng process variables.
/**
* Delegate tính toán credit score.
*
* Input variables: customerId, loanAmount
* Output variables: creditScore, riskLevel, autoApproved
*
* BPMN reference: activiti:delegateExpression="${creditScoreDelegate}"
*/
@Component("creditScoreDelegate")
@RequiredArgsConstructor
@Slf4j
public class CreditScoreDelegate implements JavaDelegate {
private final CreditScoringClient scoringClient; // Gọi external service
private final MetricsService metrics;
@Override
public void execute(DelegateExecution execution) {
// 1. Read input variables
String customerId = (String) execution.getVariable("customerId");
BigDecimal loanAmount = (BigDecimal) execution.getVariable("loanAmount");
String processId = execution.getProcessInstanceId();
log.info("[processId={}] Calculating credit score | customerId={} | amount={}",
processId, customerId, loanAmount);
// 2. Execute business logic
long start = System.nanoTime();
try {
CreditScoreResult result = scoringClient.calculate(customerId, loanAmount);
// 3. Set output variables (available for downstream tasks/gateways)
execution.setVariable("creditScore", result.getScore());
execution.setVariable("riskLevel", result.getRiskLevel().name());
execution.setVariable("autoApproved", result.getScore() >= 750);
execution.setVariable("scoringDetails", result.getDetails());
long durationMs = (System.nanoTime() - start) / 1_000_000;
metrics.recordTaskDuration("credit_score_calculation", durationMs);
log.info("[processId={}] Credit score calculated | score={} | risk={} | autoApproved={} | {}ms",
processId, result.getScore(), result.getRiskLevel(),
result.getScore() >= 750, durationMs);
} catch (ScoringServiceUnavailableException e) {
// Throw BpmnError → caught by boundary error event trên diagram
log.error("[processId={}] Scoring service unavailable: {}", processId, e.getMessage());
throw new BpmnError("SCORING_UNAVAILABLE",
"Credit scoring service is temporarily unavailable");
} catch (InvalidCustomerException e) {
// Business error → route to rejection path
execution.setVariable("rejectionReason", e.getMessage());
throw new BpmnError("INVALID_CUSTOMER", e.getMessage());
}
// RuntimeException (NPE, etc.) → task fails, engine creates incident
// → requires manual intervention or job retry
}
}# delegate expression vs class vs expression
3 cách reference delegate trong BPMN XML:
<!-- 1. delegateExpression: Spring bean name (RECOMMENDED) -->
<!-- Bean phải implement JavaDelegate -->
<serviceTask activiti:delegateExpression="${creditScoreDelegate}"/>
<!-- 2. class: Full qualified class name (instantiated by engine, NOT Spring-managed) -->
<!-- ❌ KHÔNG inject Spring beans được (no @Autowired) -->
<serviceTask activiti:class="com.example.bpm.delegate.CreditScoreDelegate"/>
<!-- 3. expression: Call method trên Spring bean (không cần implement JavaDelegate) -->
<!-- Return value tự động set vào biến nếu có resultVariable -->
<serviceTask activiti:expression="${orderService.calculateTotal(execution)}"
activiti:resultVariable="totalAmount"/>Recommendation: Luôn dùng delegateExpression — Spring-managed, injectable, testable.
# async execution & job executor
Mặc định, Service Tasks chạy synchronous — cùng thread với process execution. Nghĩa là nếu delegate gọi external API mất 30 giây, engine thread bị block 30 giây. Với 10 concurrent instances → 10 threads bị block → thread pool exhausted → mọi process trong engine freeze.
Async tasks giải quyết: engine tạo "job" trong database và return ngay. Job Executor (background thread pool) pick up jobs và execute async. Nếu fail → retry với exponential backoff. Nếu retry hết → move to deadletter queue.
<!-- Async service task: engine không block, delegate chạy bởi job executor -->
<serviceTask id="callExternalApi"
activiti:delegateExpression="${externalApiDelegate}"
activiti:async="true"
activiti:exclusive="true"/>
<!--
async="true": tạo job thay vì execute inline
exclusive="true": không chạy song song với jobs cùng instance (tránh optimistic lock)
--># khi nào dùng async?
| Scenario | Sync | Async | Lý do |
|---|---|---|---|
Tính toán nhanh (<100ms) | ✓ | Overhead job creation không đáng | |
| Gọi external API | ✓ | Network latency unpredictable | |
| Gửi email/notification | ✓ | Không cần block process flow | |
| Database query nội bộ | ✓ | Nhanh, cùng transaction | |
| File processing (large) | ✓ | CPU-intensive, có thể OOM | |
| Sau parallel gateway fork | ✓ | Mỗi path chạy independent job |
# job executor configuration
@Configuration
public class AsyncExecutorConfig {
@Bean
public SpringProcessEngineConfiguration engineConfig(DataSource ds,
PlatformTransactionManager txManager) {
SpringProcessEngineConfiguration config = new SpringProcessEngineConfiguration();
config.setDataSource(ds);
config.setTransactionManager(txManager);
// Async executor settings
config.setAsyncExecutorActivate(true);
config.setAsyncExecutorCorePoolSize(5); // Minimum threads
config.setAsyncExecutorMaxPoolSize(20); // Maximum threads
config.setAsyncExecutorDefaultAsyncJobAcquireWaitTime(5000); // Poll interval (ms)
config.setAsyncExecutorDefaultTimerJobAcquireWaitTime(5000);
config.setAsyncFailedJobWaitTime(10); // Seconds between retries
config.setAsyncExecutorMaxAsyncJobsDuePerAcquisition(5); // Jobs per batch
// Retry configuration
config.setDefaultFailedJobWaitTime(300); // 5 minutes between retries
config.setAsyncExecutorResetExpiredJobsInterval(60000); // 1 minute
return config;
}
}# handling failed jobs
@Service
@RequiredArgsConstructor
@Slf4j
public class FailedJobService {
private final ManagementService managementService;
/**
* Lấy jobs đang stuck (retries exhausted).
* Monitor dashboard nên alert khi có deadletter jobs.
*/
public List<DeadLetterJobDTO> getDeadLetterJobs() {
return managementService.createDeadLetterJobQuery()
.orderByJobCreateTime().desc()
.list()
.stream()
.map(job -> DeadLetterJobDTO.builder()
.jobId(job.getId())
.processInstanceId(job.getProcessInstanceId())
.exceptionMessage(job.getExceptionMessage())
.createTime(job.getCreateTime())
.retries(job.getRetries())
.build())
.toList();
}
/**
* Retry dead letter job (sau khi fix root cause).
* Move job từ deadletter → executable queue.
*/
public void retryDeadLetterJob(String jobId) {
managementService.moveDeadLetterJobToExecutableJob(jobId, 3); // 3 retries
log.info("Dead letter job moved to executable | jobId={}", jobId);
}
}# multi-tenancy — workspace isolation
Trong SaaS platforms, mỗi organization/tenant cần isolation. Process definitions và instances phải separated — tenant A không thấy processes của tenant B. Activiti hỗ trợ multi-tenancy native qua tenantId.
@Service
@RequiredArgsConstructor
public class TenantAwareProcessService {
private final RepositoryService repositoryService;
private final RuntimeService runtimeService;
private final TaskService taskService;
// Deploy với tenant isolation
public String deploy(String bpmnXml, String name, String workspaceId) {
Deployment deployment = repositoryService.createDeployment()
.name(name)
.tenantId(workspaceId) // ← Tenant isolation
.addString(name + ".bpmn20.xml", bpmnXml)
.deploy();
return deployment.getId();
}
// Start instance — tự động inherit tenant từ definition
public String startProcess(String processKey, String workspaceId, Map<String, Object> vars) {
ProcessInstance instance = runtimeService.startProcessInstanceByKeyAndTenantId(
processKey, vars, workspaceId); // ← Tenant-scoped
return instance.getId();
}
// Query tasks — chỉ trong workspace
public List<Task> getTasks(String userId, String workspaceId) {
return taskService.createTaskQuery()
.taskAssignee(userId)
.taskTenantId(workspaceId) // ← Tenant filter
.list();
}
}# testing processes
Testing BPMN processes = testing cả flow logic (diagram) + business logic (delegates). Unit test delegates riêng, integration test toàn process.
@SpringBootTest
@ActiveProfiles("test")
class LoanApprovalProcessTest {
@Autowired private RuntimeService runtimeService;
@Autowired private TaskService taskService;
@Autowired private HistoryService historyService;
@Autowired private RepositoryService repositoryService;
@MockBean private CreditScoringClient creditScoringClient;
@MockBean private NotificationService notificationService;
@BeforeEach
void deployProcess() {
repositoryService.createDeployment()
.addClasspathResource("processes/loan-approval.bpmn20.xml")
.deploy();
}
@Test
void loanApproval_highCreditScore_autoApproved() {
// Given: high credit score → auto-approve path
when(creditScoringClient.calculate(any(), any()))
.thenReturn(new CreditScoreResult(800, RiskLevel.LOW));
// When: start process
Map<String, Object> variables = Map.of(
"customerId", "CUST-001",
"loanAmount", new BigDecimal("500000000"),
"applicantName", "Nguyen Van A"
);
ProcessInstance instance = runtimeService.startProcessInstanceByKey(
"loan-approval", "LOAN-001", variables);
// Then: process should complete automatically (no user task needed)
HistoricProcessInstance completed = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(instance.getId())
.finished()
.singleResult();
assertThat(completed).isNotNull();
assertThat(completed.getEndTime()).isNotNull();
// Verify notification sent
verify(notificationService).sendApprovalNotification("CUST-001", "LOAN-001");
}
@Test
void loanApproval_lowCreditScore_requiresManualReview() {
// Given: low credit score → manual review path
when(creditScoringClient.calculate(any(), any()))
.thenReturn(new CreditScoreResult(550, RiskLevel.HIGH));
// When: start process
ProcessInstance instance = runtimeService.startProcessInstanceByKey(
"loan-approval", "LOAN-002", Map.of(
"customerId", "CUST-002",
"loanAmount", new BigDecimal("2000000000")
));
// Then: should create user task for manual review
Task reviewTask = taskService.createTaskQuery()
.processInstanceId(instance.getId())
.taskDefinitionKey("manualReviewTask")
.singleResult();
assertThat(reviewTask).isNotNull();
assertThat(reviewTask.getName()).isEqualTo("Manual Review");
// Complete review with approval
taskService.complete(reviewTask.getId(), Map.of(
"decision", "approve",
"reviewComments", "High amount but good collateral"
));
// Verify process completed
HistoricProcessInstance completed = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(instance.getId())
.finished()
.singleResult();
assertThat(completed).isNotNull();
}
@Test
void loanApproval_scoringServiceDown_errorPath() {
// Given: scoring service throws error
when(creditScoringClient.calculate(any(), any()))
.thenThrow(new ScoringServiceUnavailableException("Service down"));
// When: start process
ProcessInstance instance = runtimeService.startProcessInstanceByKey(
"loan-approval", "LOAN-003", Map.of(
"customerId", "CUST-003",
"loanAmount", new BigDecimal("100000000")
));
// Then: should route to error handling path (manual scoring task)
Task manualScoringTask = taskService.createTaskQuery()
.processInstanceId(instance.getId())
.taskDefinitionKey("manualScoringTask")
.singleResult();
assertThat(manualScoringTask).isNotNull();
assertThat(manualScoringTask.getName()).isEqualTo("Manual Credit Assessment");
}
}# production monitoring & troubleshooting
# metrics to track
@Component
@RequiredArgsConstructor
public class ProcessMetricsCollector {
private final RuntimeService runtimeService;
private final ManagementService managementService;
private final HistoryService historyService;
private final MeterRegistry meterRegistry;
@Scheduled(fixedRate = 30000) // Every 30s
public void collectMetrics() {
// Active instances
long activeInstances = runtimeService.createProcessInstanceQuery().count();
meterRegistry.gauge("bpmn.instances.active", activeInstances);
// Suspended instances (paused, needs attention)
long suspendedInstances = runtimeService.createProcessInstanceQuery().suspended().count();
meterRegistry.gauge("bpmn.instances.suspended", suspendedInstances);
// Pending user tasks (work backlog)
long pendingTasks = managementService.createJobQuery().count();
meterRegistry.gauge("bpmn.tasks.pending", pendingTasks);
// Failed jobs (needs intervention)
long failedJobs = managementService.createDeadLetterJobQuery().count();
meterRegistry.gauge("bpmn.jobs.deadletter", failedJobs);
// Timer jobs waiting (scheduled future work)
long timerJobs = managementService.createTimerJobQuery().count();
meterRegistry.gauge("bpmn.jobs.timer", timerJobs);
}
}# common issues & solutions
| Symptom | Root Cause | Solution |
|---|---|---|
| Process stuck at service task | Delegate throws uncaught exception | Check ACT_RU_JOB exceptions, fix delegate, retry |
| User task not appearing | Wrong assignee/candidateGroup | Check BPMN definition, verify user in correct group |
| Process very slow | Synchronous external calls | Make service tasks async |
| "Optimistic locking exception" | Concurrent updates on same instance | Use exclusive="true" on async tasks |
| History tables huge | No cleanup/archiving | Schedule history cleanup job |
| Deadletter jobs piling up | External service permanently down | Fix service, then moveDeadLetterJobToExecutable |
# kết luận
Activiti là powerful tool nhưng có learning curve. Key takeaways cho developer:
- BPMN diagram = executable code — respect it like production code (version, test, review)
- Delegates = your code — keep them small, idempotent, testable
- Async cho I/O — đừng block engine threads với external calls
- Monitor deadletter jobs — silent failures kill production
- Multi-tenancy từ đầu — retrofit tenant isolation sau = nightmare
- Test full flows — unit test delegates + integration test process paths
- History = gold mine — dùng cho analytics, bottleneck detection, compliance
Chỉ là những ghi chép cá nhân với hy vọng mang lại chút giá trị. Nếu thấy hữu ích, đừng ngại chia sẻ cho bạn bè & đồng nghiệp nhé!
Happy coding 😎 👍🏻 🚀 🔥.
On this page
- # activiti là gì và tại sao cần nó?
- # activiti vs camunda vs flowable
- # architecture — activiti bên trong làm gì?
- # database tables
- # spring boot integration — setup
- # dependencies
- # configuration
- # configuration class
- # core services — api chính bạn cần biết
- # repositoryservice — quản lý process definitions
- # runtimeservice — start & control instances
- # taskservice — user tasks (human interaction)
- # historyservice — audit trail & analytics
- # java delegates — nơi business logic sống
- Design principles cho delegates:
- # delegate expression vs class vs expression
- # async execution & job executor
- # khi nào dùng async?
- # job executor configuration
- # handling failed jobs
- # multi-tenancy — workspace isolation
- # testing processes
- # production monitoring & troubleshooting
- # metrics to track
- # common issues & solutions
- # kết luận
