- Use the For Each scope to process items in a collection individually
- Use the batch job element (EE) to process individual records
- Trigger a batch job using a poll
- Use a batch job to synchronize data from a legacy database to a SaaS application
12-1) Processing items in a collection
- Create a flow that uses:
--- A splitter-aggregator pair
--- A For Each scope - splits a collection, processes each element individually, and then returns the original message.
- Use a batch job (Enterprise Edition) - it is not a flow but another top-level element.
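The splitter-aggregator option can be sketched in Mule 3 XML roughly as follows (flow and config names are illustrative, not from the walkthrough):

```xml
<!-- Sketch: split a collection, process each element, reassemble (Mule 3; names illustrative) -->
<flow name="splitterAggregatorFlow">
    <http:listener config-ref="HTTP_Listener_Configuration" path="/items" doc:name="HTTP"/>
    <collection-splitter doc:name="Collection Splitter"/>
    <logger message="Processing element: #[payload]" level="INFO" doc:name="Logger"/>
    <collection-aggregator doc:name="Collection Aggregator"/>
</flow>
```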
Walkthrough 12-1: Process items in a collection
- Use the For Each scope element to process each item in a collection individually
Image: Using the For Each scope element
Note: Logger has Message: #[payload]
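The walkthrough's For Each flow might look like this in Mule 3 XML (flow and config names are illustrative):

```xml
<!-- Sketch: For Each logs each element; afterwards the payload is the original collection -->
<flow name="forEachFlow">
    <http:listener config-ref="HTTP_Listener_Configuration" path="/foreach" doc:name="HTTP"/>
    <foreach doc:name="For Each">
        <logger message="#[payload]" level="INFO" doc:name="Logger"/>
    </foreach>
    <!-- Here the payload is again the original, unmodified collection -->
</flow>
```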
12-2) Processing records with the batch job element
Provides the ability to split large messages into records that are processed asynchronously in a batch job!
Input phase ---> Processing phase(s) ---> Report phase
Reporting phase lets you know if something went wrong.
Example use cases:
- Engineering “near real-time” data integration
- ETL (Extract Transform and Load)
- Accept data from an external source (can poll for input)
- Split messages into individual records
- Report on results (can push output)
Image: Batch scope element phases: Input, Process Records, On Complete
Triggering batch jobs:
1) Place an inbound, one-way message source at the beginning of the job
2) Use Batch Execute message processor to reference the batch job from within a Mule flow in the same application.
Phases of a batch job:
- Input (optional)
- Load and dispatch (implicit) - splits payload into a collection of records and creates a queue.
- Process (required)
- On complete (optional) - report
Note: A batch job instance does not wait for all its queued records to finish processing in one batch step before pushing any of them to the next batch step.
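The phases above map onto the batch job's XML structure roughly as follows (a sketch; job and step names are illustrative):

```xml
<!-- Sketch: phases of a Mule 3 batch job (Enterprise Edition) -->
<batch:job name="syncBatch">
    <batch:input>
        <!-- Optional: an inbound, one-way message source, e.g. a poll -->
    </batch:input>
    <!-- Load and Dispatch is implicit: the payload is split into records and queued -->
    <batch:process-records>
        <batch:step name="Step1">
            <logger message="Processing record: #[payload]" level="INFO" doc:name="Logger"/>
        </batch:step>
        <batch:step name="Step2">
            <logger message="Second step: #[payload]" level="INFO" doc:name="Logger"/>
        </batch:step>
    </batch:process-records>
    <batch:on-complete>
        <logger message="Processed: #[payload.successfulRecords], Failed: #[payload.failedRecords]"
                level="INFO" doc:name="Logger"/>
    </batch:on-complete>
</batch:job>
```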
Image: To store record-specific information, use Record Variable
Handling record-level errors during processing:
<batch:job name="Batch1" max-failed-records="?">
0: Stop processing the entire batch (default)
-1: Continue processing the batch
Positive integer: Continue processing until that number of failed records is reached
Walkthrough 12-2: Create a batch job for records in a file
- Create a batch job
- In the input phase, check for CSV files every second and convert to a collection of objects
- In the process records phase, create two batch steps for setting and tracking variables
- In the on complete phase, look at the # of records processed and failed
Image: Create a batch job for records in a file
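The walkthrough above can be sketched in Mule 3 XML like this (file paths, step names, the CSV field, and the record variable name are all illustrative assumptions):

```xml
<!-- Sketch of Walkthrough 12-2: poll for CSV files, set and track a record variable -->
<batch:job name="csvBatch">
    <batch:input>
        <!-- Poll the directory every second (1000 ms) -->
        <file:inbound-endpoint path="/tmp/input" pollingFrequency="1000"
                               moveToDirectory="/tmp/done" doc:name="File"/>
        <!-- Convert the CSV payload to a collection of Java objects -->
        <dw:transform-message doc:name="CSV to objects">
            <dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
payload]]></dw:set-payload>
        </dw:transform-message>
    </batch:input>
    <batch:process-records>
        <batch:step name="setVariableStep">
            <batch:set-record-variable variableName="fname" value="#[payload.first_name]"
                                       doc:name="Record Variable"/>
        </batch:step>
        <batch:step name="trackVariableStep">
            <logger message="#[recordVars.fname]" level="INFO" doc:name="Logger"/>
        </batch:step>
    </batch:process-records>
    <batch:on-complete>
        <logger message="Processed: #[payload.processedRecords], Failed: #[payload.failedRecords]"
                level="INFO" doc:name="Logger"/>
    </batch:on-complete>
</batch:job>
```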
12-3) Using a batch job to synchronize data
Checking for duplicate records - use an Enricher!
Configure Message Enricher:
- Specify the message enricher source and target:
--- The target specifies which part of the message to modify
--- The source specifies what to set the target to (default = payload)
Image: How message enricher works
Add logic to only insert new records.
Check if record exists
Source: #[payload.size() > 0]
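Put together, the enricher might look like this (the Salesforce query and config name are illustrative assumptions):

```xml
<!-- Sketch: run a query inside the enricher, store only a boolean in a record
     variable; the original payload is preserved (Mule 3) -->
<enricher source="#[payload.size() > 0]" target="#[recordVars.exists]" doc:name="Message Enricher">
    <sfdc:query config-ref="Salesforce" doc:name="Salesforce"
                query="SELECT Id FROM Contact WHERE MailingPostalCode = '#[payload.postal]'"/>
</enricher>
```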
Then use batch step filters to restrict which records are processed:
Accept expression: #[!recordVars.exists]
Accept Policy: NO_FAILURES (Default)
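A batch step using both filter attributes might be sketched as (step name and body are illustrative):

```xml
<!-- Sketch: only records the record variable marked as new, and that have not
     failed, enter this step (Mule 3) -->
<batch:step name="insertNewRecordsStep"
            accept-expression="#[!recordVars.exists]"
            accept-policy="NO_FAILURES">
    <logger message="Inserting new record: #[payload]" level="INFO" doc:name="Logger"/>
</batch:step>
```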
Walkthrough 12-3: Restrict processing using a message enricher and batch step filter
- Create a batch job that polls a database for records with a specific postal code
- Use a message enricher to check whether a record already exists in Salesforce and store the result in a record variable (retaining the original payload)
- Add a second batch step with a filter that only allows new records to be added to Salesforce
- Use a batch commit scope to commit records in batches
Image: Restrict processing using a message enricher and batch step filter
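The second batch step with its commit scope could be sketched as follows (step name, batch size, object type, and config name are illustrative assumptions):

```xml
<!-- Sketch: commit new records to Salesforce in batches of 100 (Mule 3) -->
<batch:step name="insertStep" accept-expression="#[!recordVars.exists]">
    <batch:commit size="100" doc:name="Batch Commit">
        <!-- Inside the commit scope, the payload is a list of records -->
        <sfdc:create type="Account" config-ref="Salesforce" doc:name="Salesforce">
            <sfdc:objects ref="#[payload]"/>
        </sfdc:create>
    </batch:commit>
</batch:step>
```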