About
Objectives
- Process items in a collection using the For Each scope
- Process records using the Batch Job scope
- Use filtering and aggregation in a batch step
Notes
Intro
At the end of this module, you should be able to
- Process items in a collection using the For Each scope
- Process records using the Batch Job scope
- Use filtering and aggregation in a batch step
Processing items in a collection with the For Each scope
Topic video
The For Each scope
Splits a payload collection and processes the individual elements sequentially
- Collection can be any supported content type, including application/json, application/java, or application/xml
- Returns the original payload, regardless of any modifications made inside the scope
- Stops processing and invokes an error handler if one element throws an exception
There is also a Parallel For Each scope
- Same payload split, but the elements are processed in multiple parallel threads
- Instead of returning the original payload, it outputs a collection of the result messages from each iteration
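A minimal sketch of both scopes as Mule 4 XML fragments (flow name and logger contents are illustrative; the fragments assume they sit inside a standard Mule configuration file):

```xml
<flow name="forEachExampleFlow">
    <!-- Sequential: iterates over the collection one element at a time;
         after the scope, the original payload is returned unchanged -->
    <foreach collection="#[payload]">
        <logger level="INFO" message="#['Processing: ' ++ write(payload)]"/>
    </foreach>

    <!-- Parallel: processes elements concurrently and returns a collection
         of the result messages instead of the original payload -->
    <parallel-foreach collection="#[payload]">
        <logger level="INFO" message="#['Processing: ' ++ write(payload)]"/>
    </parallel-foreach>
</flow>
```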
Walkthrough 8-1: Process items in a collection using the For Each scope
- Use the For Each element to process each item in a collection individually
- Change the value of an item inside the scope
- Examine the payload before, during, and after the scope
- Look at the thread used to process each item
Extras
Looping Explained | Lightboard Series
- For Each scope takes a collection
- It breaks apart the collection and processes it sequentially
- It’s a good fit when you need to preserve the order of the collection while looping over it to perform some action
- It’s not for data transformation: the result after the For Each scope is whatever you passed in, i.e. it won’t override the structure of the collection
- A map should be used for transformation instead of the For Each scope
- Example of usage: individual insert into a database, sending a notification to something for each record
- To deal with large collections, override the batch size and do the action in batches e.g. for bulk insertion into a database, or bulk creation in Salesforce
- Good for minimising roundtrip across a network
- To process bulk records individually use the default For Each behaviour
- To process bulk records in chunks, set the batch size to a value and use bulk operations, as shown in the sketch below
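A sketch of the chunked variant, assuming a Database connector configuration named Database_Config and an accounts table (both illustrative):

```xml
<foreach collection="#[payload]" batchSize="100">
    <!-- With batchSize set, the payload inside the scope is a list of up
         to 100 items, ready for a single bulk operation per iteration -->
    <db:bulk-insert config-ref="Database_Config">
        <db:sql><![CDATA[INSERT INTO accounts (name) VALUES (:name)]]></db:sql>
    </db:bulk-insert>
</foreach>
```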
Processing records with the Batch Job scope
Topic video
The Batch Job scope
- Provides the ability to split large messages into records that are processed asynchronously in a batch job
Created especially for processing data sets
- Splits large or streamed messages into individual records
- Performs actions upon each record
- Handles record-level failures so the batch job is not aborted
- Reports on the results
- Potentially pushes the processed output to other systems or queues
- Enterprise edition only
Example use cases
Integrating data sets to process records in parallel
- Small or large data sets, streaming or not
Engineering “near real-time” data integration
- Synchronising data sets between business applications
- Like syncing contacts between NetSuite and Salesforce
Extracting, transforming and loading (ETL) info into a target system
- Like uploading data from a flat file (CSV) to Hadoop
- Handling large quantities of incoming data from an API into a legacy system
How a batch job works
A batch job contains one or more batch steps that act upon records as they move through the batch job
A batch job contains three different phases
Load and dispatch (implicit)
Performs “behind-the-scenes” work
- Splits payload into a collection of records
- Creates a persistent queue and stores each record in it
Process (required)
- Asynchronously processes the records
- Contains one or more batch steps
On complete (optional)
- Reports summary of records processed
- Provides insight into which records failed so you can address issues
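A minimal sketch of the two explicit phases in Mule 4 XML (job, step, and logger contents are illustrative):

```xml
<batch:job jobName="exampleBatchJob">
    <batch:process-records>
        <batch:step name="firstStep">
            <!-- Runs once per record, asynchronously -->
            <logger level="INFO" message="#[payload]"/>
        </batch:step>
    </batch:process-records>
    <batch:on-complete>
        <!-- The payload here is a BatchJobResult summary object -->
        <logger level="INFO" message="#[payload]"/>
    </batch:on-complete>
</batch:job>
```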
How record processing works
- One queue exists
Each record
- Keeps track of what steps it has been processed through
- Moves through the processors in the first batch step
- Is sent back to the queue
- Waits to be processed by the second step
- This repeats until each record has passed through every batch step
- Note: records do not all have to finish one batch step before any of them are sent to the next step
Batch job performance
Batch records are queued and scheduled in blocks of 100
- This lessens the amount of I/O requests and improves an operation’s load
The Mule runtime engine determines the number of threads to use per job
- Thread pool size is auto-tuned based on CPU cores and memory
- Each thread processes a block of 100 records
- Each thread iterates through that block processing each record, and then each block is queued back and the process continues
- This configuration works for most use cases but can be customised to improve batch performance in certain use cases (see the block size sketch below)
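One such customisation is the block size, an attribute on the batch job; a sketch (the value 200 is illustrative and should only be changed after measuring):

```xml
<batch:job jobName="tunedBatchJob" blockSize="200">
    <batch:process-records>
        <batch:step name="onlyStep">
            <logger level="INFO" message="#[payload]"/>
        </batch:step>
    </batch:process-records>
</batch:job>
```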
Variables in a batch job
- Variables created before a batch job are available in all batch steps
Variables created inside a batch step are record-specific
- Persist across all batch steps in the processing phase
- Commonly used to capture whether or not a record already exists in a database
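A sketch of that pattern, assuming a Database connector configuration named Database_Config and an accounts table (both illustrative); the variable set here is record-scoped and visible to later steps:

```xml
<batch:step name="checkExistsStep">
    <!-- Query the data store without overwriting the record payload -->
    <db:select config-ref="Database_Config" target="matches">
        <db:sql><![CDATA[SELECT id FROM accounts WHERE name = :name]]></db:sql>
        <db:input-parameters><![CDATA[#[{name: payload.name}]]]></db:input-parameters>
    </db:select>
    <!-- Record variable: persists across all later batch steps -->
    <set-variable variableName="exists" value="#[not isEmpty(vars.matches)]"/>
</batch:step>
```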
Handling record-level errors during processing
If a record fails to be processed by a processor in a batch step, there are three options
Stop processing the entire batch (default)
- In-flight steps are finished, but all other steps are skipped and the on complete phase is invoked
Continue processing the batch
You need to specify how subsequent batch steps should handle failed records
- To do this, use batch step filters
Continue processing the batch until a max number of failed records is reached
- At that point, the on complete phase is invoked
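The three options map to the maxFailedRecords attribute of the batch job; a sketch (job name is illustrative):

```xml
<!-- maxFailedRecords="0"  : stop processing on the first failed record (default)
     maxFailedRecords="-1" : continue processing regardless of failures
     maxFailedRecords="10" : continue until 10 records have failed -->
<batch:job jobName="tolerantBatchJob" maxFailedRecords="-1">
    <batch:process-records>
        <batch:step name="firstStep">
            <logger level="INFO" message="#[payload]"/>
        </batch:step>
    </batch:process-records>
</batch:job>
```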
Walkthrough 8-2: Processing records using the Batch Job scope
- Use the Batch Job scope to process items in a collection
- Examine the payload as it moves through the batch job
- Explore variable persistence across batch steps and phases
- Examine the payload that contains information about the job in the On Complete phase
- Look at the threads used to process the records in each step
Extras
Record Processing Options Explained | Lightboard Series
DataWeave Transform
- For structural data transformations of collections, use DataWeave’s map function in a Transform Message component
- It changes the structure
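A sketch of that approach, as a Transform Message component with an illustrative field mapping:

```xml
<ee:transform doc:name="Transform Message">
    <ee:message>
        <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
// map visits every element and builds a new structure
payload map (item, index) -> {
    id: index,
    name: upper(item.name default "")
}]]></ee:set-payload>
    </ee:message>
</ee:transform>
```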
For executing lines of flow code or event processors for each record, use scopes
For Each scope
- Lets you put bits of flow code (event processors) inside the scope, which it loops over, executing them for each record; there is also a way to do it in chunks for bulk operations
- It loops sequentially
- Is single-threaded
- Doesn’t change the structure of whatever was passed into it; it processes the items and returns the same structure (with any changes made along the way)
Batch scope
- Is multi-threaded
- Gets things done with a greater amount of processing, unlike the sequential For Each
- It has a work queue that it maintains
- It has a series of steps that you take each record through
- It has an On Complete phase, which gives you a summary of how everything went
- It doesn’t change the structure
- It provides the batch job result as the output, i.e. summary information such as the number of records processed, successful records, failed records, etc.
Queues
- They are also a way to distribute load
- Provide asynchronous processing
- You push messages into some sort of queue and have one or more consumers read them in
Examples
- VM queues: a built-in capability, so they suit lightweight messaging
- JMS: when you need to go to something external to the Mule ecosystem, or the producer is outside the Mule ecosystem
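A sketch of the VM option (queue, config, and flow names are illustrative):

```xml
<vm:config name="VM_Config">
    <vm:queues>
        <vm:queue queueName="recordsQueue"/>
    </vm:queues>
</vm:config>

<flow name="producerFlow">
    <!-- Publish is fire-and-forget: the consumer flow below picks the
         message up asynchronously, distributing the load -->
    <vm:publish config-ref="VM_Config" queueName="recordsQueue"/>
</flow>

<flow name="consumerFlow">
    <vm:listener config-ref="VM_Config" queueName="recordsQueue"/>
    <logger level="INFO" message="#[payload]"/>
</flow>
```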
Using filtering and aggregation in a batch step
Topic video
Using filters to specify when a batch step is executed
A batch step has two attributes to filter the records it processes
- An accept expression
- An accept policy
Examples
- Prevent a step from processing any records which failed processing in the preceding step
- In one step, check whether the record exists in some data store; in the next step, upload it to that data store only if it does not exist
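A sketch of the second example, assuming a record variable named exists was set in the preceding step (as in the variables sketch above):

```xml
<batch:step name="uploadStep"
            acceptExpression="#[not vars.exists]"
            acceptPolicy="NO_FAILURES">
    <!-- acceptPolicy NO_FAILURES (the default) skips records that failed
         earlier; the accept expression additionally skips records that
         already exist in the target data store -->
    <logger level="INFO" message="#['Uploading: ' ++ write(payload)]"/>
</batch:step>
```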
Aggregating records in a batch step for bulk insert
- To accumulate records, use a Batch Aggregator scope inside the Aggregator section of a batch step
- For example, instead of using a separate API call to upsert each record to a service, upload them in a batch of 100
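A sketch of a batch step that commits aggregated records in groups of 100, assuming a Salesforce connector configuration named Salesforce_Config (illustrative):

```xml
<batch:step name="commitStep">
    <batch:aggregator size="100">
        <!-- Inside the aggregator the payload is an array of up to
             100 records, suitable for one bulk call -->
        <salesforce:create type="Account" config-ref="Salesforce_Config"/>
    </batch:aggregator>
</batch:step>
```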
Walkthrough 8-3: Use filtering and aggregation in a batch step
- Use a batch job to synchronise database records to Salesforce
- In a first step, check to see if the record exists in Salesforce
- In a second batch step, add the record to Salesforce
- Use a batch step filter so the second batch step is only executed for specific records
- Use a Batch Aggregator scope to commit records in batches
Extras
Batch Processing Explained | Lightboard Series
- Batch is an Enterprise Edition-only feature for processing very large numbers of records, not single entities
- It can process a CSV, or a large number of line items as input i.e. a collection
- It has a work queue it manages
- Batch takes the collection and feeds it into the work queue
- It has a batch block size (the default is 100), so it chunks records together, pushes the blocks into the work queue, and each block is then pulled back out and given to a thread
- It has the ability to grab multiple threads to process records instead of doing them one at a time like with the For Each scope
- Batch scope consists of a series of steps
- The batch steps are the overall workflow, or how the records are going to be processed
- So with Batch we process a lot of records at the same time using multiple threads
- There is a natural cap on the number of records based on the size of the machine
There are two parts to each batch step
Processing section
- Spot to process each record individually
Aggregation section
- Spot to do some aggregation of either a chunk of records or a stream of all the records to pull it back together
- It’s for doing bulk operations
On Complete
- It is a summary section
- The final phase of a batch job
- It’s got a batch job results structure
- Provides a report
- It only contains the number of records, elapsed time, and number of failed records, not the actual end result
Summary
Use the For Each scope to process individual collection elements sequentially and return the original payload
Use the Batch Job scope (EE only) for complex batch jobs
- Created especially for processing data sets
- Splits messages into individual records and performs actions upon each record
- Can have multiple batch steps and these can have filters
- Record-level data is persisted across steps using variables
- Can handle record level failures so the job is not aborted
- The batch job returns an object with the results of the job, for insight into which records were processed or failed
Anki
Links