Module 12: Triggering Flows

About

1.

Objectives

  • Read and write files
  • Trigger flows when files are added, created, or updated
  • Trigger flows when new records are added to a database table
  • Schedule flows to run at a certain time or frequency
  • Persist and share data in flows using the Object Store
  • Publish and consume JMS messages

Notes

Intro

Goal

  • How have we initiated flows so far?

    • HTTP Listener
    • VM Listener
  • In this module, we will learn new ways to trigger flows

    • Scheduler
    • On New or Updated File
    • On Table Row
    • On New Message
  • At the end of this module, you should be able to

    • Read and write files
    • Trigger flows when files are added, created, or updated
    • Trigger flows when new records are added to a database table
    • Schedule flows to run at a certain time or frequency
    • Persist and share data in flows using the Object Store
    • Publish and consume JMS messages

Reading and writing files

Topic video

  • Reading and writing files

    • There are 4 connectors for working with files and folders

      • File (for locally mounted file system)
      • FTP
      • FTPS
      • SFTP
    • All have the same set of operations and they behave almost identically
    • Support for

      • File matching functionality
      • Locking files
      • Overwriting, appending, and generating new files
  • Use the File connector

    • Add the File module to the project
    • Create a global element configuration

      • Not required but a best practice
      • Set the working directory that will be the root for every path used with the connector
    • Use one of the connector operations and specify its properties
    • On CloudHub, the connector can only be used with the /tmp folder
    • On customer-hosted Mule runtimes, the account running Mule must have read and/or write permissions on the specified directories
    • Be careful not to permanently delete or overwrite files

      • Move or rename them after processing
  • Trigger a flow when a new file is created or updated

    • Use the On New or Updated File listener

      • Polls a directory for files that have been created or updated
      • One message is generated for each file that is found
    • Multiple ways to ensure a file is new

      • Delete each file after it has been processed so all files in the next poll will be new
      • Move each file to a different directory after it has been processed
      • Rename a file after it has been processed and filter the files to be processed
      • Save and compare the file creation or modification times
  • Walkthrough 7-1: Trigger a flow when a new file is added to a directory

    • Add and configure a File listener to watch an input directory
    • Restrict the type of file read
    • Rename and move the processed files
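
The "poll a directory, emit one message per file, then move the file" pattern described above can be pictured with a short Python sketch. It is not Mule code and does not call the File connector; it only imitates the behaviour in plain Python. The function name `poll_once`, the `input`/`output` directory names and the `*.csv` pattern are hypothetical choices for the example.

```python
"""Conceptual sketch (not Mule code): poll a directory, emit one message per
matching file, then move each processed file so the next poll only sees new files."""
import fnmatch
import os
import shutil
import tempfile
from typing import List


def poll_once(input_dir: str, processed_dir: str, pattern: str = "*.csv") -> List[str]:
    """Return one message (here: the file's text) per matching file, then move the file."""
    messages: List[str] = []
    for name in sorted(os.listdir(input_dir)):
        path = os.path.join(input_dir, name)
        # Skip subdirectories and files that do not match the pattern (file matching).
        if not os.path.isfile(path) or not fnmatch.fnmatch(name, pattern):
            continue
        with open(path, encoding="utf-8") as f:
            messages.append(f.read())
        # Move instead of delete, so nothing is lost permanently.
        shutil.move(path, os.path.join(processed_dir, name))
    return messages


if __name__ == "__main__":
    root = tempfile.mkdtemp()  # stands in for the connector's working directory
    inbox, done = os.path.join(root, "input"), os.path.join(root, "output")
    os.makedirs(inbox)
    os.makedirs(done)
    for name, text in [("a.csv", "1,2"), ("b.txt", "skip me"), ("c.csv", "3,4")]:
        with open(os.path.join(inbox, name), "w", encoding="utf-8") as f:
            f.write(text)
    print(poll_once(inbox, done))  # first poll: contents of a.csv and c.csv
    print(poll_once(inbox, done))  # second poll: nothing new
```

Moving rather than deleting keeps the original file available if downstream processing fails, which is the point of the advice not to permanently delete or overwrite files.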

Synchronizing data with watermarks

Topic video

  • Synchronizing data from one system to another

    • The general process

      • The first time, you need to sync all the data
      • After that, you only need to sync the new data
      • Requires a field with ordered values to identify processed items
    • How do you determine what is new and needs to be synced?

      • On the first sync, store the highest field value for any item in the data set
      • On later syncs, retrieve that value and compare each item’s value against it to see whether it is larger
    • The field with ordered values is often a

      • Record ID
      • Creation or modification timestamp
  • Introducing watermarks

    • The timestamp or ID that is stored each sync and then retrieved and compared against in the next sync is called a watermark
    • Where did the name come from?

      • After a flood, one might record how high the water got by marking the level on a wall
      • Similarly, for data, we want to look at the last value: how “high” it was in the last sync
  • Types of watermarking in Mule

    • Automatic

      • The saving, retrieving, and comparing is automatically handled for you
      • Available for several connector listeners

        • On New or Updated File
        • On Table Row
      • Restricted in how you can specify what items/records are retrieved
    • Manual

      • You handle saving, retrieving, and comparing the watermark
      • More flexible in that you specify exactly what records you want retrieved
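
A minimal Python sketch of the sync-with-watermark process, assuming each item is a dict with an ordered `id` field (a hypothetical name). With automatic watermarking Mule does this bookkeeping for you; with manual watermarking you write the equivalent logic yourself. The function `sync` is illustrative only.

```python
from typing import List, Optional, Tuple


def sync(source: List[dict], watermark: Optional[int], field: str = "id") -> Tuple[List[dict], Optional[int]]:
    """Return the items that still need syncing and the updated watermark."""
    if watermark is None:
        # First sync: no watermark yet, so every item is new.
        new_items = list(source)
    else:
        # Later syncs: keep only items whose ordered value is above the watermark.
        new_items = [item for item in source if item[field] > watermark]
    if new_items:
        # Remember the highest value seen; it becomes the watermark for the next sync.
        watermark = max(item[field] for item in new_items)
    return new_items, watermark


records = [{"id": 1}, {"id": 2}]
batch, mark = sync(records, None)   # first sync: both records, mark == 2
records.append({"id": 3})
batch, mark = sync(records, mark)   # later sync: only {"id": 3}, mark == 3
```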

Using listeners with automatic watermarking

Topic video

  • Using automatic watermarking with files

    • There is a watermarking option for the On New or Updated File operation in the family of file connectors
    • There are two watermarking modes

      [Figure: Watermarking modes]

      • CREATED_TIMESTAMP
      • MODIFIED_TIMESTAMP
    • This implements one of the ways introduced in the previous section to ensure a file is new

      • Other options: delete, move, or rename and filter
      • Save and compare the file creation or modification times
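
As an illustration of the MODIFIED_TIMESTAMP idea, the hypothetical function below treats a file as new when its modification time (`os.stat(...).st_mtime`) is later than the stored watermark. This is a sketch of the concept, not the connector's implementation. CREATED_TIMESTAMP would need a file creation time, which Python exposes only on some platforms (`st_birthtime`), so the sketch sticks to modification times.

```python
import os
import tempfile
from typing import List, Optional, Tuple


def new_or_updated(directory: str, watermark: Optional[float]) -> Tuple[List[str], Optional[float]]:
    """Return files modified after the watermark, plus the new (highest) watermark."""
    found: List[str] = []
    highest = watermark
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if not os.path.isfile(path):
            continue
        modified = os.stat(path).st_mtime
        # No watermark yet (first poll) or a newer timestamp means the file is new.
        if watermark is None or modified > watermark:
            found.append(name)
            highest = modified if highest is None else max(highest, modified)
    return found, highest


folder = tempfile.mkdtemp()
for name, stamp in [("a.txt", 100), ("b.txt", 200)]:
    path = os.path.join(folder, name)
    open(path, "w").close()
    os.utime(path, (stamp, stamp))  # pin access/modification times for the demo
files, mark = new_or_updated(folder, None)  # both files, mark == 200.0
```

The strict `>` comparison means a file written with exactly the same timestamp as the watermark would be skipped, a known limitation of timestamp-based watermarks.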
  • Triggering a flow for each row in a database table

    [Figure: On Table Row watermark]

    • The Database connector has an On Table Row operation that is triggered for every row in a table
    • The operation can handle

      • Generating the query
      • Watermarking
      • Idempotency across concurrent requests
    • You can specify one, both, or neither of

      • Watermark column
      • Id column
  • Using automatic watermarking with database tables

    [Figure: Automatic watermarking with database tables]

    • When a watermark column is specified, this query is automatically generated and used:

      ```sql
      SELECT * FROM table WHERE table_column > :watermark
      ```

    • On each poll, the component will go through all the retrieved rows and store the maximum value obtained
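
The generated query and the "store the maximum value" step can be mimicked with Python's built-in `sqlite3` module. The `accounts` table, its `accountID` column and the `poll_table` function are hypothetical; the sketch also adds an `ORDER BY` for predictable output, which is not part of the generated query above.

```python
import sqlite3
from typing import Any, List, Tuple


def poll_table(conn: sqlite3.Connection, table: str, column: str, watermark: Any) -> Tuple[List[tuple], Any]:
    """Run one watermark poll and return (new rows, updated watermark).

    table and column are pasted into the SQL as identifiers, so they must come
    from trusted configuration; only the watermark is a bound parameter."""
    query = f"SELECT * FROM {table} WHERE {column} > :watermark ORDER BY {column}"
    cursor = conn.execute(query, {"watermark": watermark})
    position = [description[0] for description in cursor.description].index(column)
    rows = cursor.fetchall()
    # Go through every retrieved row and keep the maximum watermark-column value.
    for row in rows:
        watermark = max(watermark, row[position])
    return rows, watermark


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (accountID INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1, "Ann"), (2, "Bo")])
rows, mark = poll_table(conn, "accounts", "accountID", 0)     # both rows, mark == 2
conn.execute("INSERT INTO accounts VALUES (3, 'Cy')")
rows, mark = poll_table(conn, "accounts", "accountID", mark)  # only (3, 'Cy'), mark == 3
```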
  • Handling idempotency across concurrent requests

    • A new poll can be executed before the watermark is updated if

      • The poll interval is small
      • The amount of rows is big
      • Processing a single row takes a long time
    • To avoid a record being processed more than once

      • Specify an ID column

        • A unique identifier for the row
      • The listener will make sure the row is not processed again if

        • It has already been retrieved and
        • Processing of it hasn’t finished yet

        [Figure: Avoid a record being processed more than once]
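
The ID-column guard can be sketched as a thread-safe set of row IDs that are currently being processed: a row retrieved again by an overlapping poll is skipped while its ID is still in flight. `InFlightGuard` and its methods are hypothetical names used for illustration, not part of the Database connector.

```python
import threading
from typing import Set


class InFlightGuard:
    """Hypothetical guard: a row ID is processed by at most one poll at a time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._in_flight: Set[int] = set()

    def try_acquire(self, row_id: int) -> bool:
        # True if no other poll is currently processing this row; marks it as in flight.
        with self._lock:
            if row_id in self._in_flight:
                return False
            self._in_flight.add(row_id)
            return True

    def release(self, row_id: int) -> None:
        # Called once processing of the row has finished.
        with self._lock:
            self._in_flight.discard(row_id)


guard = InFlightGuard()
first_poll = [r for r in (7, 8) if guard.try_acquire(r)]      # [7, 8]
second_poll = [r for r in (7, 8, 9) if guard.try_acquire(r)]  # [9]: 7 and 8 still in flight
guard.release(7)  # processing of row 7 has finished
```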
  • Walkthrough 7-2: Trigger a flow when a new record is added to a database & use automatic watermarking

    • Add and configure a Database listener to check a table on a set frequency for new records
    • Use the listener’s automatic watermarking to track the ID of the latest record retrieved and trigger the flow whenever a new record is added
    • Output new records to a CSV file
    • Use a form to add a new account to the table and see the CSV updated

Using manual watermarking and scheduling flows

Topic video

  • Handling watermarking manually

    • The general process

      • Schedule when a flow should be executed
      • Give the watermark a default value
      • On the first sync

        • Determine a new watermark value
        • Store the watermark value so it is available in the future to other flow executions
      • On later syncs

        • Retrieve the watermark from storage
        • Check if each item in the data set should be retrieved based on the watermark value
  • Triggering flows at a certain time or frequency

    [Figure: Scheduler event source]

    • Some connector event sources use a scheduling strategy to trigger a flow

      • Like On New or Updated File and On Table Row
    • To trigger any flow at any time, use the Scheduler event source
  • Two types of scheduling strategies

    • Fixed frequency

      • The default is to poll every 1000 milliseconds
    • Cron

      • A standard for describing time and date information
      • Can specify either

        • An event to occur just once at a certain time
        • A recurring event on some frequency

      [Figure: Cron job]
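
To make the two strategies concrete, here is a Python sketch: `fixed_frequency` lists fire times for a fixed-frequency strategy, and `cron_matches` checks a simplified five-field cron expression against a moment in time. Both function names are hypothetical and the matcher is deliberately simplified. Mule's cron strategy uses Quartz-style expressions, which add a seconds field and extra syntax such as `?`, so treat this only as intuition for how a cron expression selects times.

```python
from datetime import datetime, timedelta
from typing import List


def fixed_frequency(start: datetime, frequency_ms: int = 1000, count: int = 3) -> List[datetime]:
    """First `count` fire times of a fixed-frequency strategy (default 1000 ms)."""
    return [start + timedelta(milliseconds=frequency_ms * i) for i in range(count)]


def _field_matches(spec: str, value: int, minimum: int) -> bool:
    # Supports "*", "*/n", "a", "a-b" and comma-separated lists of those.
    for part in spec.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif "-" in part:
            low, high = (int(x) for x in part.split("-"))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr: str, moment: datetime) -> bool:
    """Simplified cron check: minute hour day-of-month month day-of-week (0 = Sunday).

    Every field must match; classic cron treats day-of-month and day-of-week
    differently when both are restricted, which this sketch ignores."""
    minute, hour, dom, month, dow = expr.split()
    cron_dow = (moment.weekday() + 1) % 7  # Python uses Monday=0, cron uses Sunday=0
    checks = [
        (minute, moment.minute, 0),
        (hour, moment.hour, 0),
        (dom, moment.day, 1),
        (month, moment.month, 1),
        (dow, cron_dow, 0),
    ]
    return all(_field_matches(spec, value, low) for spec, value, low in checks)


noon_monday = datetime(2025, 3, 10, 12, 0)
print(cron_matches("0 12 * * 1-5", noon_monday))  # True: 12:00 on a weekday
print(fixed_frequency(noon_monday)[1])            # 2025-03-10 12:00:01
```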

  • Persisting data across executions of flows

    • Use the Object Store component to store simple key-value pairs

      • The component was designed to store

        • Synchronization information like watermarks
        • Temporal information like access tokens
        • User information
      • Retrieved values can be saved to a target variable

        • The payload is then left unchanged, so the component outputs the same message it received
    • Each Mule application has an Object Store that is

      • Available without any setup or configuration
      • Persistent

        • Saved to file for embedded Mule and standalone Mule runtime
        • Saved to data storage for CloudHub
        • Saved to shared distributed memory for clustered Mule runtimes
    • Using the Object Store connector for watermarking

      [Figure: Object Store connector for watermarking]

      • Add the ObjectStore module to the project
      • Use the Retrieve operation to retrieve a watermark value and to assign a default value for the first poll
      • Use the watermark value in a processor to retrieve the desired items

        • Like in a database query for records in a table
      • Use the Store operation to determine and store a watermark value
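
The Retrieve, query, Store cycle can be sketched in Python with a dictionary standing in for the Object Store and `sqlite3` standing in for the database. `ObjectStoreStandIn`, `scheduled_run`, the `lastAccountID` key and the `accounts` table columns are hypothetical; in Mule the Scheduler would trigger each run, and the Object Store's Retrieve and Store operations would take the place of the two methods.

```python
import sqlite3
from typing import Any, Dict, List


class ObjectStoreStandIn:
    """Hypothetical stand-in for the Object Store's Retrieve and Store operations."""

    def __init__(self) -> None:
        self._entries: Dict[str, Any] = {}

    def retrieve(self, key: str, default: Any) -> Any:
        # Like Retrieve with a default value: the default is used on the first poll.
        return self._entries.get(key, default)

    def store(self, key: str, value: Any) -> None:
        self._entries[key] = value


def scheduled_run(db: sqlite3.Connection, store: ObjectStoreStandIn, postal_code: str) -> List[tuple]:
    """One Scheduler-triggered execution with a manually managed watermark."""
    watermark = store.retrieve("lastAccountID", 0)
    # Use the watermark in the query so only records newer than it come back.
    rows = db.execute(
        "SELECT accountID, name FROM accounts"
        " WHERE postal = ? AND accountID > ? ORDER BY accountID",
        (postal_code, watermark),
    ).fetchall()
    if rows:
        # Determine the new watermark (highest ID retrieved) and store it for later runs.
        store.store("lastAccountID", max(row[0] for row in rows))
    return rows
```

Each call behaves like one scheduled execution: the first returns all matching accounts, and later calls return only accounts added since the stored ID.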
  • Walkthrough 7-3: Schedule a flow and use manual watermarking

    • Use the Scheduler component to create a new flow that executes at a specific frequency
    • Retrieve accounts with a specific postal code from the accounts table
    • Use the Object Store component to store the ID of the latest record and then use it to only retrieve new records

Extras

  • Triggering Flows from a Database Explained | Lightboard Series

    • Databases are very good at storing data and persisting state, but not great when we are trying to get data out of them
    • Options are limited when it comes to communicating with them in Mule
    • Mule needs to poll that data and detect the change
    • Mule has built-in event sources that poll on a schedule:

      • On Table Row is an event source that calls the flow once for each changed row it detects: it works on a row-by-row basis and doesn’t work when trying to get all the rows together

        • It has watermarking capabilities, which use the Object Store to keep a value so that it knows the last state
        • Limited when it comes to doing bulk operations since it only focuses on one row
      • Scheduler

        • Supports bulk operations, getting many rows without a round trip to the source system for each one
        • It runs as a repeating task
        • On each run, it calls the flow’s event processors
        • Use the Object Store retrieve to get the watermark, and do an operation such as a database select to get the delta (changes) of the rows
  • Object Store Explained | Lightboard Series

    • Databases and file systems are very good at storing state
    • But when we need to store little snippets of values that we need to keep track of we use the Object Store
    • This is built into the Mule runtime and is not a database
    • It consists of key-value pairs
    • It can be segmented into different chunks (partitions)
    • For example, to pick up new entries in an app, we can use a watermark, which is a value that always goes up
    • Anything that only increases can serve as a watermark
    • Caching uses the Object Store in Mule
    • Tokens can also be kept in the Object Store, e.g. to reuse values instead of making round trips to fetch them again
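
To picture key-value storage, partitions and token reuse together, here is a hypothetical in-memory store with an optional expiry per entry. It is not the Mule Object Store API; the class, the `tokens` partition name and the one-hour lifetime are assumptions made for the example. The clock is injectable so expiry can be demonstrated without waiting.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class KeyValueStore:
    """Hypothetical key-value store with partitions and optional per-entry expiry."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        # partition name -> {key: (value, expiry time or None)}
        self._partitions: Dict[str, Dict[str, Tuple[Any, Optional[float]]]] = {}

    def put(self, partition: str, key: str, value: Any, ttl_seconds: Optional[float] = None) -> None:
        expires_at = None if ttl_seconds is None else self._clock() + ttl_seconds
        self._partitions.setdefault(partition, {})[key] = (value, expires_at)

    def get(self, partition: str, key: str) -> Any:
        entry = self._partitions.get(partition, {}).get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at is not None and self._clock() >= expires_at:
            # Expired: forget it so the caller fetches a fresh value.
            del self._partitions[partition][key]
            return None
        return value


def get_token(store: KeyValueStore, fetch_token: Callable[[], str]) -> str:
    """Reuse a cached access token; call fetch_token() only when none is cached."""
    token = store.get("tokens", "accessToken")
    if token is None:
        token = fetch_token()  # the round trip we want to avoid repeating
        store.put("tokens", "accessToken", token, ttl_seconds=3600)
    return token
```

Keeping tokens and watermarks in separate partitions lets the same key name exist in both without collisions.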

Publishing and consuming JMS messages

Topic video

  • Java Message Service (JMS)

    • A widely used API for enabling applications to communicate through the exchange of messages
    • Simplifies application development by providing a standard interface for creating, sending and receiving messages
    • Communication is done asynchronously
    • Supports two messaging models

      • Queues: PTP (point-to-point or 1:1)

        • A sender sends messages to a queue and a single receiver pulls the message off the queue
        • The receiver does not need to be listening to the queue at the time the message is sent
      • Topics: Pub-Sub (publish/subscribe or 1:many)

        • A publisher sends a message to a topic and all active subscribers receive the message
        • Subscribers not actively listening will miss the published message (unless messages are made durable)
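
The difference between the two models can be shown with a tiny in-memory broker. `ToyBroker` is a hypothetical illustration, not a JMS implementation: there are no threads, persistence, durable subscriptions or acknowledgements.

```python
from collections import deque
from typing import Deque, Dict, List, Optional


class ToyBroker:
    """In-memory illustration of queues (1:1) and topics (1:many)."""

    def __init__(self) -> None:
        self.queues: Dict[str, Deque[str]] = {}             # queue -> pending messages
        self.subscribers: Dict[str, List[List[str]]] = {}  # topic -> active subscriber inboxes

    def send(self, queue: str, message: str) -> None:
        # The message waits in the queue even if no receiver is listening yet.
        self.queues.setdefault(queue, deque()).append(message)

    def receive(self, queue: str) -> Optional[str]:
        # Exactly one receiver takes each message off the queue.
        pending = self.queues.get(queue)
        return pending.popleft() if pending else None

    def subscribe(self, topic: str) -> List[str]:
        inbox: List[str] = []
        self.subscribers.setdefault(topic, []).append(inbox)
        return inbox

    def publish(self, topic: str, message: str) -> None:
        # Every currently active subscriber gets a copy; later subscribers miss it.
        for inbox in self.subscribers.get(topic, []):
            inbox.append(message)
```

A queue message is consumed once, while a topic message is copied to each subscriber that was listening when it was published.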
  • Using the JMS connector

    • Add the JMS module to the project
    • Configure a global element configuration

      • By default, it is set up with a finely tuned set of values for both publishing and consuming messages
      • Typically, you just need to configure which connection should be used
    • Use operations to publish and/or consume messages to and from destinations
  • Walkthrough 7-4: Publish and listen for JMS messages

    • Add and configure a JMS connector for ActiveMQ (that uses an in-memory broker)
    • Send messages to a JMS queue
    • Listen for and process messages from a JMS queue

Extras

  • Messaging Options Explained | Lightboard Series

    • VM queues

      • They are part of the Mule ecosystem
      • Can be used when you want to break up your flow: push a message onto a queue and consume it as part of the same Mule application
      • VM: Mule to Mule
      • Consists of queues
      • Lightweight but limited admin visibility
      • For quality of service (i.e. not deleting messages until they have been handled), use:

        • Transaction manager can be used for consumption
    • JMS

      • A Java API for talking to messaging providers
      • Message brokers provide a JMS driver, similar to how database connectors use JDBC
      • Mule can act as a consumer or a publisher, alongside other systems
      • Consists of queues and topics (topics give a one-to-many, pub/sub style of invocation)
      • Admin capabilities depend on the system, but external systems generally have advanced monitoring capabilities
      • For quality of service (i.e. not deleting messages until they have been handled), use:

        • Acknowledgements
        • Transaction manager can be used for consumption
    • Anypoint MQ

      • A cloud-hosted solution
      • Flexible: clients can be Mule applications (using a connector) or other systems
      • Access is via a REST interface: unlike JMS, where a driver is needed, HTTPS can be used to both publish and receive messages
      • It has queues and exchanges (one-to-many: publish to one exchange and the message goes to multiple queues)
      • Also provides First In First Out (FIFO) queues
      • For quality of service (i.e. not deleting messages until they have been handled), use:

        • Acknowledgements
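
The acknowledgement idea shared by JMS and Anypoint MQ (a message is not deleted until it has been dealt with) can be sketched as a queue that parks received messages until they are acknowledged and requeues them on failure. `AckQueue`, `ack` and `nack` are hypothetical names for this illustration; real brokers add timeouts, redelivery limits and persistence.

```python
from collections import deque
from typing import Deque, Dict, Optional, Tuple


class AckQueue:
    """Toy queue: a received message is deleted only after it is acknowledged."""

    def __init__(self) -> None:
        self._ready: Deque[str] = deque()
        self._unacked: Dict[int, str] = {}
        self._next_tag = 0

    def send(self, message: str) -> None:
        self._ready.append(message)

    def receive(self) -> Optional[Tuple[int, str]]:
        if not self._ready:
            return None
        message = self._ready.popleft()
        self._next_tag += 1
        # Park the message until the consumer reports success or failure.
        self._unacked[self._next_tag] = message
        return self._next_tag, message

    def ack(self, tag: int) -> None:
        # Processing succeeded: now the message can be deleted for good.
        self._unacked.pop(tag, None)

    def nack(self, tag: int) -> None:
        # Processing failed: put the message back at the front for redelivery.
        message = self._unacked.pop(tag, None)
        if message is not None:
            self._ready.appendleft(message)
```

Because a failed message is redelivered rather than lost, a consumer may see the same message more than once, so processing should tolerate duplicates.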

Summary

Use watermarks to synchronize data across data stores

  • Use either manual or the automatic watermarking available for some connectors

Use the family of File, FTP, & SFTP connectors to work with files and folders

Use the On New or Updated File listener to trigger flows when files are added, created, or updated

  • Use the connector’s automatic watermarking to determine if a file is new based on a creation or modification timestamp

Use the On Table Row listener to trigger flows when new records are added to a database table

  • Use the connector’s automatic watermarking to determine if the record is new

Use the Scheduler component to schedule flows to run at a certain time or frequency

  • Use a watermark to keep a persistent variable between scheduling events

Use the Object Store connector to persist and share a watermark (or other data) across flow executions

Use the JMS connector to publish and consume messages

  • Connect to any JMS messaging service that implements the JMS spec

Test your knowledge

Question 1: A flow has a JMS Publish Consume operation followed by a JMS Publish operation. Both of these operations have the default configurations. Which operation is asynchronous (does not wait for a response before continuing to the next event processor) and which operation is synchronous (blocks and waits for a response or timeout before continuing to the next event processor)?

  • Publish Consume: Synchronous. Publish: Asynchronous

Question 2: Refer to the exhibit. What is the object type returned by the File List operation?

  • Array of Mule event objects

Question 3: A Flow Reference component sends a non-empty JSON object payload to another flow named childFlow, which then returns an XML body. The Flow Reference component saves the result returned from childFlow to a target variable named payload. Refer to the exhibit. What is true about the Mule event’s payload at the next event processor after the Flow Reference component?

  • The payload is the original JSON object

Question 4: In the Database On Table Row operation, what does the Watermark column enable the On Table Row operation to do?

  • To avoid duplicate processing of records in a database

Question 5: Assume that a database table contains a recordID column that always increases as new records get added to the table. In a Mule application, what is the key process to enable manual watermarking for requests to a database using the Scheduler endpoint and the Database SELECT operation?

  • Save the max recordID from the set of recordIDs in an Object Store and reference this recordID in subsequent database requests


References


  1. “Module 12: Triggering flows”. Available at: . (Accessed: 2025-03-09 21:14).
