Module 13: Processing Records

About

[1]

Objectives

  • Process items in a collection using the For Each scope
  • Process records using the Batch Job scope
  • Use filtering and aggregation in a batch step

Notes

Intro

At the end of this module, you should be able to

  • Process items in a collection using the For Each scope
  • Process records using the Batch Job scope
  • Use filtering and aggregation in a batch step

Processing items in a collection with the For Each scope

Topic video

  • The For Each scope

    • Splits a payload collection and processes the individual elements sequentially

      • Collection can be any supported content type, including application/json, application/java, or application/xml
    • Returns the original payload, regardless of any modifications made inside the scope
    • Stops processing and invokes an error handler if one element throws an exception
    • There is also a Parallel For Each scope (see the sketch after this list)

      • Splits the payload the same way, but processes the elements in multiple parallel threads
      • Rather than returning the original payload, it outputs a collection of the messages produced by each iteration
  • Walkthrough 8-1: Process items in a collection using the For Each scope

    • Use the For Each element to process each item in a collection individually
    • Change the value of an item inside the scope
    • Examine the payload before, during, and after the scope
    • Look at the thread used to process each item
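
    A minimal sketch of the For Each scope in Mule 4 XML configuration (flow name and payload are illustrative; namespace declarations omitted for brevity):

      <flow name="forEachDemoFlow">
        <!-- assume payload is a JSON array of simple values, e.g. [1, 2, 3] -->
        <foreach collection="#[payload]">
          <!-- inside the scope, payload is the current item -->
          <logger level="INFO" message="#['Processing item: ' ++ (payload as String)]"/>
        </foreach>
        <!-- after the scope, payload is the original collection again -->
        <logger level="INFO" message="#[payload]"/>
      </flow>

    Swapping <foreach> for <parallel-foreach> processes the elements concurrently and returns a collection of the per-iteration results instead of the original payload.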

Extras

  • Looping Explained | Lightboard Series

    • For Each scope takes a collection
    • It breaks apart the collection and processes it sequentially
    • It’s a good fit when you need to preserve the order of the collection while looping over it and performing some action on each element
    • It’s not meant for data transformation, since the end result after the For Each is whatever you passed in, i.e. it won’t change the structure of the collection
    • The DataWeave map function should be used for transformation instead of the For Each scope
    • Example of usage: individual inserts into a database, or sending a notification to something for each record
    • To deal with large collections, override the batch size and do the action in batches, e.g. for bulk insertion into a database or bulk creation in Salesforce
    • Good for minimising round trips across the network
    • To process bulk records individually, use the default For Each behaviour
    • To process bulk records in chunks, set the batch size to a value greater than one and use bulk operations (as sketched below)
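
    A hedged sketch of the chunking approach, assuming a configured Database connector (the config name, table, and fields are hypothetical):

      <flow name="bulkInsertFlow">
        <!-- batchSize > 1 makes payload inside the scope a chunk (sub-list) of up to 200 records -->
        <foreach collection="#[payload]" batchSize="200">
          <!-- one bulk insert per chunk instead of one insert per record -->
          <db:bulk-insert config-ref="Database_Config">
            <db:sql>INSERT INTO accounts (name, email) VALUES (:name, :email)</db:sql>
          </db:bulk-insert>
        </foreach>
      </flow>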

Processing records with the Batch Job scope

Topic video

  • The Batch Job scope

    • Provides ability to split large messages into records that are processed asynchronously in a batch job
    • Created especially for processing data sets

      • Splits large or streamed messages into individual records
      • Performs actions upon each record
      • Handles record-level failures that occur so the batch job is not aborted
      • Reports on the results
      • Potentially pushes the processed output to other systems or queues
    • Enterprise edition only
  • Example use cases

    • Integrating data sets to parallel process records

      • Small or large data sets, streaming or not
    • Engineering “near real-time” data integration

      • Synchronising data sets between business applications
      • Like syncing contacts between NetSuite and Salesforce
    • Extracting, transforming and loading (ETL) info into a target system

      • Like uploading data from a flat file (CSV) to Hadoop
    • Handling large quantities of incoming data from an API into a legacy system
  • How a batch job works

    • A batch job contains one or more batch steps that act upon records as they move through the batch job


      [Figure: How a Batch Job works]

  • A batch job contains three different phases (a minimal configuration sketch follows this list)


    [Figure: Batch Job Phases]

    • Load and dispatch (implicit)

      • Performs “behind-the-scenes” work

        • Splits payload into a collection of records
        • Creates a persistent queue and stores each record in it
    • Process (required)

      • Asynchronously processes the records
      • Contains one or more batch steps
    • On complete (optional)

      • Reports summary of records processed
      • Provides insight into which records failed so you can address issues
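
    A minimal Batch Job skeleton showing where the phases live (names are illustrative; the Load and Dispatch phase has no XML of its own because it is implicit):

      <flow name="batchDemoFlow">
        <!-- Load and dispatch: the payload is implicitly split into records
             and stored in a persistent queue before processing starts -->
        <batch:job jobName="demoBatchJob">
          <batch:process-records>
            <batch:step name="firstStep">
              <logger level="INFO" message="#[payload]"/>
            </batch:step>
          </batch:process-records>
          <batch:on-complete>
            <!-- payload here is a summary of the job, not the records -->
            <logger level="INFO" message="#[payload]"/>
          </batch:on-complete>
        </batch:job>
      </flow>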
  • How record processing works


    [Figure: Batch Job Processing]

    • One queue exists
    • Each record

      • Keeps track of what steps it has been processed through
      • Moves through the processors in the first batch step
      • Is sent back to the queue
      • Waits to be processed by the second step
    • This repeats until each record has passed through every batch step
    • Note: records do not all have to finish processing in one step before any of them are sent to the next step
  • Batch job performance

    • Batch records are queued and scheduled in blocks of 100 (the default block size)

      • This reduces the number of I/O requests and improves performance under load
    • The Mule runtime engine determines the number of threads to use per job

      • Thread pool size is auto-tuned based on CPU cores and memory
      • Each thread processes a block of 100 records
      • Each thread iterates through its block processing each record; the block is then queued back and the process continues
    • This configuration works for most use cases, but can be customised to improve batch performance in certain use cases (see the sketch below)
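
    Both the block size and the concurrency can be overridden on the job element; a sketch (the values shown are illustrative, not recommendations):

      <batch:job jobName="tunedBatchJob" blockSize="200" maxConcurrency="4">
        <!-- larger blocks suit many small records; lower concurrency
             caps thread and memory usage -->
        <batch:process-records>
          <batch:step name="onlyStep">
            <logger level="INFO" message="#[payload]"/>
          </batch:step>
        </batch:process-records>
      </batch:job>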
  • Variables in a batch job


    [Figure: Batch Job Variables]

    • Variables created before a batch job are available in all batch steps
    • Variables created inside a batch step are record-specific

      • Persist across all batch steps in the processing phase
      • Commonly used to capture whether or not a record already exists in a database (see the sketch after this list)
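
    A sketch of the common existence-check pattern with a record variable (the variable name and steps are illustrative):

      <batch:step name="checkExistsStep">
        <!-- a variable set inside a batch step is record-scoped and
             travels with the record across all steps -->
        <set-variable variableName="exists" value="#[false]"/>
      </batch:step>
      <batch:step name="upsertStep">
        <!-- the same record variable is readable in later steps -->
        <logger level="INFO" message="#['Record exists: ' ++ (vars.exists as String)]"/>
      </batch:step>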
  • Handling record-level errors during processing

    • If a record fails to be processed by a processor in a batch step, there are three options

      • Stop processing the entire batch (default)

        • Records already in flight finish their current step, but all remaining steps are skipped and the On Complete phase is invoked
      • Continue processing the batch

        • You need to specify how subsequent batch steps should handle failed records

          • To do this, use batch step filters
      • Continue processing the batch until a max number of failed records is reached

        • At that point, the On Complete phase is invoked (the sketch below shows how the three options map to the maxFailedRecords attribute)
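
    A sketch of the three options expressed through the job’s maxFailedRecords attribute (job names are illustrative; job bodies elided):

      <!-- default (0): stop processing the batch on the first failed record -->
      <batch:job jobName="strictJob" maxFailedRecords="0"> ... </batch:job>

      <!-- -1: continue processing no matter how many records fail -->
      <batch:job jobName="tolerantJob" maxFailedRecords="-1"> ... </batch:job>

      <!-- positive value: continue until 10 records have failed -->
      <batch:job jobName="thresholdJob" maxFailedRecords="10"> ... </batch:job>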
  • Walkthrough 8-2: Processing records using the Batch Job scope

    • Use the Batch Job scope to process items in a collection
    • Examine the payload as it moves through the batch job
    • Explore variable persistence across batch steps and phases
    • Examine the payload that contains information about the job in the On Complete phase
    • Look at the threads used to process the records in each step

Extras

  • Record Processing Options Explained | Lightboard Series

    • DataWeave Transform

      • For data transformations of collections at a structural level, use DataWeave’s map function in a Transform Message component (see the sketch after this list)
      • It changes the structure
    • For executing lines of flow code or event processors for each record, use scopes

      • For Each scope

        • Lets you put pieces of flow code (event processors) inside the scope, and it will loop over the collection executing them for each record; there is also a way to do it in chunks for bulk operations
        • It loops sequentially
        • Is single-threaded
        • Doesn’t change the structure of whatever was passed into it; it just processes each item and returns the collection in the same structure
      • Batch scope

        • Is multi-threaded
        • Gets things done with a greater amount of processing, unlike the sequential For Each
        • It has a work queue that it maintains
        • It has a series of steps that you take each record through
        • It has an On Complete phase, which will give you a summary of how everything went
        • It doesn’t change the structure
        • It provides the batch job result as the output, i.e. summary information on the number of records processed, successful and failed records, etc.
      • Queues

        • They are also a way to distribute load
        • Provide asynchronous processing
        • You push messages into some sort of queue and have one or more consumers read them in
        • Examples

          • VM queues: a built-in capability, and therefore suited to lightweight messaging
          • JMS: for when you need to reach something external to the Mule ecosystem, or the producer is outside the Mule ecosystem
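
    For contrast with the scopes, a structural transformation using the DataWeave map function inside a Transform Message component (field names are illustrative):

      <ee:transform>
        <ee:message>
          <ee:set-payload><![CDATA[%dw 2.0
      output application/json
      ---
      payload map (item, index) -> {
        id: index,
        fullName: item.firstName ++ " " ++ item.lastName
      }]]></ee:set-payload>
        </ee:message>
      </ee:transform>

    Unlike the For Each scope, this does change the structure of the collection.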

Using filtering and aggregation in a batch step

Topic video

  • Using filters to specify when a batch step is executed

    • A batch step has two attributes to filter the records it processes


      [Figure: Batch Job Step Attributes]

      • An accept expression
      • An accept policy
    • Examples

      • Prevent a step from processing any records which failed processing in the preceding step
      • In one step, check whether the record exists in some data store; then in the next step, only upload it to that data store if it does not exist (see the sketch below)
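
    A sketch of the two filter attributes on a batch step (step names and the variable are illustrative):

      <!-- runs only for records that have not failed and do not yet exist -->
      <batch:step name="uploadStep"
                  acceptExpression="#[vars.exists == false]"
                  acceptPolicy="NO_FAILURES">
        <logger level="INFO" message="#[payload]"/>
      </batch:step>

      <!-- runs only for records that failed an earlier step -->
      <batch:step name="logFailuresStep" acceptPolicy="ONLY_FAILURES">
        <logger level="WARN" message="#[payload]"/>
      </batch:step>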
  • Aggregating records in a batch step for bulk insert

    • To accumulate records, use a Batch Aggregator scope inside the Aggregator section of a batch step
    • For example, instead of using a separate API call to upsert each record to a service, upload them in batches of 100 (as sketched below)
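
    A sketch of a Batch Aggregator committing records in groups of 100 (a real bulk upsert operation would replace the logger):

      <batch:step name="commitStep">
        <batch:aggregator size="100">
          <!-- payload here is an array of up to 100 records -->
          <logger level="INFO" message="#['Committing ' ++ (sizeOf(payload) as String) ++ ' records']"/>
        </batch:aggregator>
      </batch:step>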
  • Walkthrough 8-3: Use filtering and aggregation in a batch step

    • Use a batch job to synchronise database records to Salesforce
    • In a first step, check to see if the record exists in Salesforce
    • In a second batch step, add the record to Salesforce
    • Use a batch step filter so the second batch step is only executed for specific records
    • Use a Batch Aggregator scope to commit records in batches

Extras

  • Batch Processing Explained | Lightboard Series

    • Batch is an Enterprise Edition-only feature for processing very large numbers of records, not single entities
    • It can process a CSV, or a large number of line items as input i.e. a collection
    • It has a work queue it manages
    • Batch takes the collection and feeds it into the queue
    • It has a batch block size (the default is 100), so it chunks records together, pushes the blocks into the work queue, and they are pulled back out and given to a thread
    • It has the ability to grab multiple threads to process them instead of doing them one at a time like with the For Each scope
    • Batch scope consists of a series of steps
    • The batch steps are the overall workflow, or how the records are going to be processed
    • So with Batch we process a lot of records at the same time using multiple threads
    • There is a natural cap on the number of records based on the size of the machine
    • There are two parts to each batch step

      • Processing section

        • Spot to process each record individually
      • Aggregation section

        • Spot to do some aggregation of either a chunk of records or a stream of all the records to pull it back together
        • It’s for doing bulk operations
    • On Complete

      • It is a summary section
      • The final phase of the batch job
      • Its payload is a batch job result structure
      • Provides a report
      • It only contains counts (number of records, failed records) and the elapsed time, not the actual end result (see the sketch below)
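
    A sketch of reading the summary in the On Complete phase (the fields shown are commonly used ones on the batch job result; exact availability is worth verifying against your runtime version):

      <batch:on-complete>
        <!-- payload is the batch job result: counts and timings, no record payloads -->
        <logger level="INFO" message="#['Total: ' ++ (payload.totalRecords as String) ++ ', failed: ' ++ (payload.failedRecords as String) ++ ', elapsed ms: ' ++ (payload.elapsedTimeInMillis as String)]"/>
      </batch:on-complete>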

Summary

Use the For Each scope to process individual collection elements sequentially and return the original payload

Use the Batch Job scope (EE only) for complex batch jobs

  • Created especially for processing data sets
  • Splits messages into individual records and performs actions upon each record
  • Can have multiple batch steps and these can have filters
  • Record-level data is persisted across steps using variables
  • Can handle record level failures so the job is not aborted
  • The batch job returns an object with the results of the job, for insight into which records were processed or failed

Anki

References


  1. “Module 13: Processing records”. Available at: . (Accessed: 2025-03-09). ↩︎
