Camunda BPM Engine Taskpool Collector
Taskpool Collector#
Purpose#
Taskpool Collector is a component deployed as a part of the process application (alongside the Camunda BPM Engine) that is responsible for collecting information from the Camunda BPM Engine. It detects the intent of the operations executed inside the engine and creates the corresponding commands for the taskpool. The commands are enriched with data and transmitted to other taskpool components (via the Axon Command Bus).
In the following description, we use the terms event and command. Event denotes an entity received from the Camunda BPM Engine (from a delegate event listener or a history event listener) which is passed to the Taskpool Collector using the internal Spring eventing mechanism. The Taskpool Collector converts a series of such events into a Taskpool Command - an entity carrying the intent of a change inside the taskpool core. Please note that event has another meaning in CQRS/ES systems and in other components of the taskpool, but in the context of the Taskpool Collector an event always originates from Spring eventing.
Features#
- Collection of process definitions
- Collection of process instance events
- Collection of process variable change events
- Collection of task events and history events
- Creation of task engine commands
- Enrichment of task engine commands with process variables
- Attachment of correlation information to task engine commands
- Transmission of commands to Axon command bus
- Provision of properties for process application
Architecture#
The Taskpool Collector consists of several components which can be divided into the following groups:
- Event collectors are responsible for gathering information and forming commands
- Processors perform the command enrichment with payload and data correlation
- Command senders are responsible for accumulating commands and sending them to the Command Gateway
Usage and configuration#
In order to enable the collector component, include the following Maven dependency in your process application:
<dependency>
  <groupId>io.holunda.taskpool</groupId>
  <artifactId>camunda-bpm-taskpool-collector</artifactId>
  <version>${camunda-taskpool.version}</version>
</dependency>
Then activate the taskpool collector by putting the following annotation on any Spring configuration class:
@Configuration
@EnableTaskpoolCollector
class MyProcessApplicationConfiguration {
}
Event collection#
The Taskpool Collector registers Spring event listeners for the following events, fired by the Camunda Eventing Engine Plugin:
- DelegateTask events: create, assign, delete, complete
- HistoryEvent events: HistoricTaskInstanceEvent, HistoricIdentityLinkLogEvent, HistoricProcessInstanceEventEntity, HistoricVariableUpdateEventEntity, HistoricDetailVariableInstanceUpdateEntity
The events are transformed into corresponding commands and passed over to the processor layer.
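For illustration only, here is a minimal sketch of what such a Spring event listener looks like when the Camunda eventing plugin is active. This is not the collector's actual implementation; the class name and the handler bodies are hypothetical.
import org.camunda.bpm.engine.delegate.DelegateTask
import org.camunda.bpm.engine.impl.history.event.HistoryEvent
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Component

@Component
class TaskEventListenerSketch {

  // task lifecycle events (create, assign, delete, complete) published by the eventing plugin
  @EventListener
  fun onTaskEvent(task: DelegateTask) {
    // the collector inspects the event name and forms an engine task command from it
    println("Received task event '${task.eventName}' for task '${task.id}'")
  }

  // history events (task instance, identity link, process instance, variable updates)
  @EventListener
  fun onHistoryEvent(event: HistoryEvent) {
    println("Received history event of type '${event.eventType}'")
  }
}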
Task commands enrichment#
Alongside the attributes received from the Camunda BPM engine, the engine task commands can be enriched with additional attributes.
There are three enrichment modes available, controlled by the camunda.taskpool.collector.task.enricher.type property:
- no: No enrichment takes place.
- process-variables: Enrichment of engine task commands with process variables.
- custom: The user provides an own implementation.
Process variable enrichment#
In particular cases, the data enclosed in the task attributes is not sufficient for the task list or other user-related components. The required information may be available as process variables and needs to be attached to the task in the taskpool. This is where the Process Variable Task Enricher can be used. For this purpose, activate it by setting the property camunda.taskpool.collector.task.enricher.type to process-variables, and the enricher will put process variables into the task payload.
You can control which variables are put into the task command payload by providing the Process Variables Filter. The ProcessVariablesFilter is a Spring bean holding a list of individual VariableFilter entries - at most one per process definition key and optionally one without a process definition key (a global filter). If no filter is provided, a default filter is used, which is an empty EXCLUDE filter, resulting in all process variables being attached to the user task.
A VariableFilter can be of the following types:
- TaskVariableFilter:
  - INCLUDE: task-level include filter, denoting a list of variables to be added for the task defined in the filter.
  - EXCLUDE: task-level exclude filter, denoting a list of variables to be ignored for the task defined in the filter. All other variables are included.
- ProcessVariableFilter with process definition key:
  - INCLUDE: process-level include filter, denoting a list of variables to be added for all tasks of the process.
  - EXCLUDE: process-level exclude filter, denoting a list of variables to be ignored for all tasks of the process.
- ProcessVariableFilter without process definition key:
  - INCLUDE: global include filter, denoting a list of variables to be added for all tasks of all processes for which no dedicated ProcessVariableFilter is defined.
  - EXCLUDE: global exclude filter, denoting a list of variables to be ignored for all tasks of all processes for which no dedicated ProcessVariableFilter is defined.
Here is an example of how the process variable filter can configure the enrichment:
@Configuration
public class MyTaskCollectorConfiguration {

  @Bean
  public ProcessVariablesFilter myProcessVariablesFilter() {
    return new ProcessVariablesFilter(
      // define a variable filter for every process
      new VariableFilter[]{
        // define for every process definition
        // either a TaskVariableFilter or ProcessVariableFilter
        new TaskVariableFilter(
          ProcessApproveRequest.KEY,
          // filter type
          FilterType.INCLUDE,
          ImmutableMap.<String, List<String>>builder()
            // define a variable filter for every task of the process
            .put(ProcessApproveRequest.Elements.APPROVE_REQUEST, Lists.newArrayList(
              ProcessApproveRequest.Variables.REQUEST_ID,
              ProcessApproveRequest.Variables.ORIGINATOR)
            )
            // and again
            .put(ProcessApproveRequest.Elements.AMEND_REQUEST, Lists.newArrayList(
              ProcessApproveRequest.Variables.REQUEST_ID,
              ProcessApproveRequest.Variables.COMMENT,
              ProcessApproveRequest.Variables.APPLICANT)
            ).build()
        ),
        // optionally add a global filter for all processes
        // for which no individual filter was created
        new ProcessVariableFilter(FilterType.INCLUDE,
          Lists.newArrayList(CommonProcessVariables.CUSTOMER_ID))
      }
    );
  }
}
TIP: If you want to implement a custom enrichment, provide your own implementation of the interface VariablesEnricher (register it as a Spring component of that type) and set the property camunda.taskpool.collector.task.enricher.type to custom.
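As a rough sketch of such a custom enricher (the exact interface signature and package may differ between taskpool versions - please check the VariablesEnricher interface of your version; the added attribute is purely illustrative):
import io.holunda.camunda.taskpool.api.task.TaskIdentityWithPayloadAndCorrelations
import io.holunda.camunda.taskpool.enricher.VariablesEnricher
import org.springframework.stereotype.Component

// Assumption: VariablesEnricher exposes a single enrich(command) method that
// returns the command with a (possibly extended) payload.
@Component
class MyCustomEnricher : VariablesEnricher {

  override fun <T : TaskIdentityWithPayloadAndCorrelations> enrich(command: T): T {
    // attach additional attributes, e.g. loaded from an external system (hypothetical attribute name)
    command.payload["customAttribute"] = "some value"
    return command
  }
}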
Data Correlation#
Apart from the task payload attached by the enricher, a so-called correlation with data entries can be configured. Data correlation allows attaching one or several references (that is, a pair of values entryType and entryId) of business data entries to a task. In the projection (which is used for querying tasks), these correlations are resolved, and the information from business data events can be shown together with the task information.
The correlation to data events can be configured by providing a ProcessVariablesCorrelator bean. Here is an example of how this can be done:
@Bean
fun processVariablesCorrelator() = ProcessVariablesCorrelator(
  ProcessVariableCorrelation(ProcessApproveRequest.KEY,         // one correlation per process definition key
    mapOf(
      ProcessApproveRequest.Elements.APPROVE_REQUEST to mapOf(  // correlation map for an individual user task
        ProcessApproveRequest.Variables.REQUEST_ID to BusinessDataEntry.REQUEST
      )
    ),
    mapOf(ProcessApproveRequest.Variables.REQUEST_ID to BusinessDataEntry.REQUEST) // correlation map applied to all tasks of the process
  )
)
The process variable correlator holds a list of process variable correlations - one for every process definition key. Every ProcessVariableCorrelation configures the correlation either for all tasks of the process or for an individual task by providing a so-called correlation map. A correlation map is keyed by the name of a process variable inside the Camunda Process Engine and holds the type of the business data entry as its value.
Here is an example. Imagine the process instance stores the id of an approval request in a process variable called varRequestId. The system responsible for storing approval requests fires data entry events supplying the data, using the entry type io.my.approvalRequest and the id of the request as entryId. In order to create a correlation in the task task_approve_request of the process process_approval_process, we would provide the following configuration of the correlator:
@Bean
fun processVariablesCorrelator() = ProcessVariablesCorrelator(
  ProcessVariableCorrelation("process_approval_process",
    mapOf(
      "task_approve_request" to mapOf(
        "varRequestId" to "io.my.approvalRequest" // process variable 'varRequestId' holds the id of a data entry of type 'io.my.approvalRequest'
      )
    )
  )
)
"4711"
in the process variable varRequestId
and the process reaches the task task_approve_request
, the task will get the following correlation created
(here written in JSON):
"correlations": [
{ "entryType": "approvalRequest", "entryId": "4711" }
]
Command aggregation#
In order to control the sending of commands by the command sender, the command sender activation property camunda.taskpool.collector.task.enabled is available. If disabled, the command sender will log every command instead of aggregating and sending it to the command gateway.
In addition, the property camunda.taskpool.collector.task.sender.type controls whether the default command sender is used or you provide your own implementation.
The default command sender (type: tx) collects all task commands during one transaction, groups them by task id and accumulates them into one command reflecting the intent of the task operation. It uses the Axon Command Bus (encapsulated by the AxonCommandListGateway) to send the result to the Axon command gateway.
TIP: If you want to implement custom command sending, provide your own implementation of the interface EngineTaskCommandSender (register it as a Spring component of that type) and set the property camunda.taskpool.collector.task.sender.type to custom.
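A sketch of such a custom sender could look as follows (the exact interface signature and package may differ between taskpool versions - check the EngineTaskCommandSender interface of your version):
import io.holunda.camunda.taskpool.api.task.EngineTaskCommand
import io.holunda.camunda.taskpool.sender.task.EngineTaskCommandSender
import org.springframework.stereotype.Component

// Assumption: EngineTaskCommandSender exposes a single send(command) method.
@Component
class MyCommandSender : EngineTaskCommandSender {

  override fun send(command: EngineTaskCommand) {
    // e.g. hand the command over to a message broker instead of the Axon command gateway
    println("Would send engine task command: $command")
  }
}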
The Spring event listeners receiving events from the Camunda Engine plugin are called before the engine commits the transaction. Since all processing inside the collector component and the enricher is performed synchronously, the sender must wait for the transaction to be successfully committed before sending any commands to the Command Gateway. Otherwise, on any error, the transaction would be rolled back and the already sent command would create an inconsistency between the taskpool and the engine.
Depending on your deployment scenario, you may want to control the exact point in time when the commands are sent to the command gateway. The property camunda.taskpool.collector.task.sender.send-within-transaction is designed to influence this. If set to true, the commands are sent before the process engine transaction is committed; otherwise, the commands are sent after the process engine transaction is committed.
WARNING: Never send commands over remote messaging before the transaction is committed, since you may produce unexpected results if Camunda fails to commit the transaction.
Handling command transmission#
The commands sent via the gateway (e.g. the AxonCommandListGateway) are received by command handlers. The latter may accept or reject commands, depending on the state of the aggregate and other components. The AxonCommandListGateway is informed about the command outcome. By default, it logs the outcome to the console (success is logged at DEBUG level, errors at ERROR level).
In some situations it is required to react to the command outcome. A prominent example is including a metric for command dispatching errors in your monitoring. To do so, it is possible to provide your own handlers for the success and error command outcomes. For this purpose, provide Spring beans implementing CommandSuccessHandler and CommandErrorHandler accordingly.
Here is an example of how such an error handler may look:
@Bean
@Primary
fun taskCommandErrorHandler(): TaskCommandErrorHandler = object : LoggingTaskCommandErrorHandler(logger) {
  override fun apply(commandMessage: Any, commandResultMessage: CommandResultMessage<out Any?>) {
    logger.info { "<--------- CUSTOM ERROR HANDLER REPORT --------->" }
    super.apply(commandMessage, commandResultMessage)
    logger.info { "<------------------- END ----------------------->" }
  }
}
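A handler for the success outcome can be registered in the same way. The following sketch mirrors the error handler above and assumes that the analogous types TaskCommandSuccessHandler and LoggingTaskCommandSuccessHandler exist in your taskpool version:
@Bean
@Primary
fun taskCommandSuccessHandler(): TaskCommandSuccessHandler = object : LoggingTaskCommandSuccessHandler(logger) {
  override fun apply(commandMessage: Any, commandResultMessage: CommandResultMessage<out Any?>) {
    // e.g. increment a metric counter for successfully dispatched commands here
    super.apply(commandMessage, commandResultMessage)
  }
}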
Message codes#
Please note that the logger root hierarchy is io.holunda.camunda.taskpool.collector. The Logger column (marked with *) shows the suffix appended to this root.
Message Code | Severity | Logger* | Description | Meaning |
---|---|---|---|---|
COLLECTOR-001 | INFO | | Task commands will be collected. | |
COLLECTOR-002 | INFO | | Task commands will not be collected. | |
COLLECTOR-005 | DEBUG | .process.definition | Process definition collecting has been disabled by property, skipping ${command.processDefinitionId}. | |
COLLECTOR-006 | DEBUG | .process.instance | Process instance collecting has been disabled by property, skipping ${command.processInstanceId}. | |
COLLECTOR-007 | DEBUG | .process.variable | Process variable collecting has been disabled by property, skipping ${command.processInstanceId}. | |
COLLECTOR-008 | DEBUG | .task | Task command collecting is disabled by property, would have enriched and sent command $command. | |
ENRICHER-001 | INFO | | Task commands will be enriched with process variables. | |
ENRICHER-002 | INFO | | Task commands will not be enriched. | |
ENRICHER-003 | INFO | | Task commands will be enriched by a custom enricher. | |
ENRICHER-004 | DEBUG | .task.enricher | Could not enrich variables from running execution ${command.sourceReference.executionId}, since it doesn't exist (anymore). | |