About
1.
Objectives
- Read and write files
- Trigger flows when files are added, created, or updated
- Trigger flows when new records are added to a database table
- Schedule flows to run at a certain time or frequency
- Persist and share data in flows using the Object Store
- Publish and consume JMS messages
Notes
Intro
Goal
How have we initiated flows so far?
- HTTP Listener
- VM Listener
In this module, we will learn new ways
- Scheduler
- On New or Updated File
- On Table Row
- On New Message
At the end of this module, you should be able to
- Read and write files
- Trigger flows when files are added, created, or updated
- Trigger flows when new records are added to a database table
- Schedule flows to run at a certain time or frequency
- Persist and share data in flows using the Object Store
- Publish and consume JMS messages
Reading and writing files
Topic video
Reading and writing files
There are 4 connectors for working with files and folders
- File (for locally mounted file system)
- FTP
- FTPS
- SFTP
- All have the same set of operations and they behave almost identically
Support for
- File matching functionality
- Locking files
- Overwriting, appending, and generating new files
Use the File connector
- Add the File module to the project
Create a global element configuration
- Not required but a best practice
- Set the working directory that will be the root for every path used with the connector
- Use one of the connector operations and specify its properties
- On CloudHub, the connector can only be used with the /tmp folder
- On customer-hosted Mule runtimes, the account running Mule must have read and/or write permissions on the specified directories
Be careful not to permanently delete or overwrite files
- Move or rename them after processing
Trigger a flow when a new file is created or updated
Use the On New or Updated File listener
- Polls a directory for files that have been created or updated
- One message is generated for each file that is found
Multiple ways to ensure a file is new
- Delete each file after it has been processed so all files in the next poll will be new
- Move each file to a different directory after it has been processed
- Rename a file after it has been processed and filter the files to be processed
- Save and compare the file creation or modification times
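The "move each file after processing" approach can be sketched outside Mule as follows. This is a minimal illustration, not the File connector's implementation: the function name `poll_once`, the directory names, and the `*.csv` filter are all assumptions made for the example.

```python
import shutil
import tempfile
from pathlib import Path


def poll_once(input_dir: Path, processed_dir: Path, pattern: str = "*.csv") -> list[str]:
    """Process every matching file in input_dir, then move it out of the way.

    Hypothetical helper: it only mimics the idea of one poll of a directory.
    """
    handled = []
    for path in sorted(input_dir.glob(pattern)):
        if not path.is_file():
            continue
        # Placeholder for real processing: here we only record the file name.
        handled.append(path.name)
        # Moving the file means the next poll will not see it again.
        shutil.move(str(path), str(processed_dir / path.name))
    return handled


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        input_dir = Path(root) / "input"
        processed_dir = Path(root) / "processed"
        input_dir.mkdir()
        processed_dir.mkdir()
        (input_dir / "a.csv").write_text("id\n1\n")
        (input_dir / "notes.txt").write_text("not matched by the filter")
        print(poll_once(input_dir, processed_dir))  # first poll
        print(poll_once(input_dir, processed_dir))  # second poll
```

Because processed files leave the input directory, every file found by a later poll is new by construction, and no timestamps need to be compared.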
Walkthrough 7-1: Trigger a flow when a new file is added to a directory
- Add and configure a File listener to watch an input directory
- Restrict the type of file read
- Rename and move the processed files
Synchronizing data with watermarks
Topic video
Synchronizing data from one system to another
The general process
- The first time, you need to sync all the data
- After that, you only need to sync the new data
- Requires a field with ordered values to identify processed items
How do you determine what is new and needs to be synched?
- On the first sync, store the highest field value for any item in the data set
- On later syncs, retrieve that and compare the value of each item and see if it is larger
The field with ordered values is often a
- Record ID
- Creation or modification timestamp
Introducing watermarks
- The timestamp or ID that is stored each sync and then retrieved and compared against in the next sync is called a watermark
Where did the name come from
- After a flood, one might record how high the water got by marking the level on a wall
- Similarly, for data, we want to look at the last value: how “high” it was in the last sync
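The store-then-compare process described above can be sketched in a few lines. This is only an illustration of the idea: the function `sync_new_items`, the `id` field, and the default watermark of 0 are assumptions, not anything defined by Mule.

```python
def sync_new_items(items, watermark, key="id"):
    """Return items whose key is above the watermark, plus the new watermark.

    Hypothetical helper illustrating the general watermark process.
    """
    new_items = [item for item in items if item[key] > watermark]
    # The new watermark is the highest value seen so far; it never decreases.
    new_watermark = max([watermark] + [item[key] for item in new_items])
    return new_items, new_watermark


source = [{"id": 1}, {"id": 2}, {"id": 3}]

# First sync: a default watermark of 0 lets every item through.
first_batch, watermark = sync_new_items(source, watermark=0)

# Later sync: only items added since the stored watermark are returned.
source.append({"id": 4})
second_batch, watermark = sync_new_items(source, watermark)
```

The same logic works with a timestamp instead of an ID, as long as the field's values are ordered.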
Types of watermarking in Mule
Automatic
- The saving, retrieving, and comparing is automatically handled for you
Available for several connector listeners
- On New or Updated File
- On Table Row
- Restricted in how you can specify what items/records are retrieved
Manual
- You handle saving, retrieving, and comparing the watermark
- More flexible in that you specify exactly what records you want retrieved
Using listeners with automatic watermarking
Topic video
Using automatic watermarking with files
- There is a watermarking option for the On New or Updated File operation for the family of file connectors
There are two watermarking modes
- CREATED_TIMESTAMP
- MODIFIED_TIMESTAMP
This implements one of the ways introduced in the previous section to ensure a file is new
- Other options: Delete, move, filter
- Save and compare the file creation or modification times
Triggering a flow for each row in a database table
- The Database connector has an On Table Row operation that is triggered for every row in a table
The operation can handle
- Generating the query
- Watermarking
- Idempotency across concurrent requests
You can specify one, both, or neither of
- Watermark column
- Id column
Using automatic watermarking with database tables
When a watermark column is specified, this query is automatically generated and used
#+begin_src sql
SELECT * FROM table WHERE table_column > :watermark
#+end_src
- On each poll, the component will go through all the retrieved rows and store the maximum value obtained
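To see the generated query and the "store the maximum value" step working together, here is a small sketch using an in-memory SQLite database. It is not how the Database connector is implemented: the `accounts` table, its columns, and the `poll_table` function are assumptions chosen for the example.

```python
import sqlite3


def poll_table(conn, watermark):
    """Fetch rows above the watermark and return them with the updated watermark.

    Hypothetical helper: `accounts` and `id` stand in for any table and watermark column.
    """
    rows = conn.execute(
        "SELECT id, name FROM accounts WHERE id > :watermark ORDER BY id",
        {"watermark": watermark},
    ).fetchall()
    # Keep the maximum value seen in this poll; keep the old one if nothing is new.
    new_watermark = max((row[0] for row in rows), default=watermark)
    return rows, new_watermark


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO accounts (id, name) VALUES (?, ?)", [(1, "Ann"), (2, "Bob")])

rows, watermark = poll_table(conn, watermark=0)       # first poll sees both rows
conn.execute("INSERT INTO accounts (id, name) VALUES (?, ?)", (3, "Cy"))
new_rows, watermark = poll_table(conn, watermark)     # later poll sees only the new row
```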
Handling idempotency across concurrent requests
A new poll can be executed before the watermark is updated if
- The poll interval is small
- The number of rows is large
- Processing one single row takes too much time
To avoid a record being processed more than once
Specify an ID column
- A unique identifier for the row
The listener will make sure the row is not processed again if
- It has already been retrieved and
- Processing of it hasn’t finished yet
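One way to picture this behaviour is a set of row IDs that are currently "in flight". The sketch below is a single-threaded illustration under that assumption; the class `InFlightTracker` and its methods are hypothetical, and a truly concurrent version would also need a lock around the set.

```python
class InFlightTracker:
    """Tracks row IDs that have been retrieved but not yet fully processed.

    Hypothetical class; it only models the idea of skipping rows in progress.
    """

    def __init__(self):
        self._in_flight = set()

    def claim(self, rows, id_key="id"):
        """Return only rows not already being processed, and mark them in flight."""
        claimed = []
        for row in rows:
            if row[id_key] not in self._in_flight:
                self._in_flight.add(row[id_key])
                claimed.append(row)
        return claimed

    def done(self, row_id):
        """Release a row once processing has finished."""
        self._in_flight.discard(row_id)


tracker = InFlightTracker()
poll_result = [{"id": 1}, {"id": 2}]

first = tracker.claim(poll_result)    # both rows are new to the tracker
overlap = tracker.claim(poll_result)  # a second poll before the watermark moved
tracker.done(1)                       # row 1 has finished processing
```

The overlapping poll returns the same rows from the database, but none of them are handed to the flow a second time.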
Walkthrough 7-2: Trigger a flow when a new record is added to a database & use automatic watermarking
- Add and configure a Database listener to check a table on a set frequency for new records
- Use the listener’s automatic watermarking to track the ID of the latest record retrieved and trigger the flow whenever a new record is added
- Output new records to a CSV file
- Use a form to add a new account to the table and see the CSV updated
Using manual watermarking and scheduling flows
Topic video
Handling watermarking manually
The general process
- Schedule when a flow should be executed
- Give the watermark a default value
On the first sync
- Determine a new watermark value
- Store the watermark value so it is available in the future to other flow executions
On later syncs
- Retrieve the watermark from storage
- Check if each item in the data set should be retrieved based on the watermark value
Triggering flows at a certain time or frequency
Some connector event sources use a scheduling strategy to trigger a flow
- Like On New or Updated File and On Table Row
- To trigger any flow at any time, use the Scheduler event source
Two types of scheduling strategies
Fixed frequency
- The default is to poll every 1000 milliseconds
Cron
- A standard for describing time and date information
Can specify either
- An event to occur just once at a certain time
- A recurring event at some frequency
Cron Job
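As a rough illustration of the fixed frequency strategy, the sketch below computes when a schedule would fire. The function `fixed_frequency_times` and its parameters are assumptions made for the example (only the 1000 millisecond default comes from the notes above); a cron strategy would instead derive trigger times from calendar fields.

```python
from datetime import datetime, timedelta


def fixed_frequency_times(start, frequency_ms=1000, start_delay_ms=0, count=3):
    """Return the first `count` trigger times for a fixed-frequency schedule.

    Hypothetical helper: triggers are evenly spaced, starting after an optional delay.
    """
    first = start + timedelta(milliseconds=start_delay_ms)
    return [first + timedelta(milliseconds=frequency_ms * n) for n in range(count)]


start = datetime(2024, 1, 1, 12, 0, 0)
# With the default frequency, the schedule fires once per second.
times = fixed_frequency_times(start, frequency_ms=1000, count=3)
```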
Persisting data across executions of flows
Use the Object Store component to store simple key-value pairs
The component was designed to store
- Synchronization information like watermarks
- Temporal information like access tokens
- User information
Retrieved values can be stored in a target variable
- When a target variable is used, the component outputs the same message as the one it received
Each Mule application has an Object Store that is
- Available without any setup or configuration
Persistent
- Saved to file for embedded Mule and standalone Mule runtime
- Saved to data storage for CloudHub
- Saved to shared distributed memory for clustered Mule runtimes
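The idea of a persistent key-value store that survives separate flow executions can be sketched with a JSON file. This is only an analogy for the "saved to file" case: the class `FileBackedStore`, its method names, and the file name are assumptions, not the Object Store's actual API or storage format.

```python
import json
import tempfile
from pathlib import Path


class FileBackedStore:
    """A tiny persistent key-value store, saved as JSON on disk (hypothetical)."""

    def __init__(self, path):
        self._path = Path(path)

    def _load(self):
        if self._path.exists():
            return json.loads(self._path.read_text())
        return {}

    def store(self, key, value):
        data = self._load()
        data[key] = value
        self._path.write_text(json.dumps(data))

    def retrieve(self, key, default=None):
        # The default covers the first execution, before anything was stored.
        return self._load().get(key, default)


store_file = Path(tempfile.mkdtemp()) / "store.json"

# One "execution" writes a value...
FileBackedStore(store_file).store("watermark", 42)
# ...and a later execution, using a fresh instance, can read it back.
value = FileBackedStore(store_file).retrieve("watermark", default=0)
missing = FileBackedStore(store_file).retrieve("accessToken", default="none")
```

Each instance reads from disk rather than from memory, which is what lets a value written in one execution be seen by the next.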
Using the Object Store connector for watermarking
- Add the Object Store module to the project
- Use the Retrieve operation to retrieve a watermark value and to assign a default value for the first poll
Use the watermark value in a processor to retrieve the desired items
- Like in a database query for records in a table
- Use the Store operation to determine and store a watermark value
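The retrieve, query, and store steps can be put together in one sketch of a scheduled execution. Everything here is illustrative: a plain dict stands in for the persistent store, SQLite stands in for the database, and the key `lastAccountID`, the `accounts` table, and the postal code filter are assumptions made for the example.

```python
import sqlite3


def run_scheduled_sync(conn, store, postal):
    """One scheduled execution: retrieve the watermark, query the delta, store the new value.

    Hypothetical helper; `store` is any dict-like key-value store.
    """
    # Retrieve, with a default value of 0 for the first poll.
    last_id = store.get("lastAccountID", 0)
    # Because the query is hand-written, it can add any extra filter it needs.
    rows = conn.execute(
        "SELECT id, name FROM accounts WHERE postal = :postal AND id > :last_id ORDER BY id",
        {"postal": postal, "last_id": last_id},
    ).fetchall()
    if rows:
        # Store the highest ID from this batch for the next execution.
        store["lastAccountID"] = max(row[0] for row in rows)
    return rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, name TEXT, postal TEXT)")
conn.executemany(
    "INSERT INTO accounts (id, name, postal) VALUES (?, ?, ?)",
    [(1, "Ann", "94105"), (2, "Bob", "10001"), (3, "Cy", "94105")],
)

store = {}  # stand-in for a persistent key-value store
first_run = run_scheduled_sync(conn, store, "94105")   # all matching rows
second_run = run_scheduled_sync(conn, store, "94105")  # nothing new since the last run
```

The extra `postal` condition is the kind of flexibility manual watermarking allows, at the cost of handling the watermark yourself.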
Walkthrough 7-3: Schedule a flow and use manual watermarking
- Use the Scheduler component to create a new flow that executes at a specific frequency
- Retrieve accounts with a specific postal code from the accounts table
- Use the Object Store component to store the ID of the latest record and then use it to only retrieve new records
Extras
Triggering Flows from a Database Explained | Lightboard Series
- Databases are very good at storing data and persisting state, but not as good when it comes to getting data out of them
- Options are limited when it comes to communicating with them in Mule
- Mule needs to poll that data and detect the change
There is a built-in Scheduler that triggers on a time
On Table Row is an event source, which will call the flow depending on the number of changes detected: it works on a row-by-row basis and doesn’t work when trying to get all the rows together
- It has watermarking capabilities, which use the Object Store to keep a value so that it knows the last state
- Limited when it comes to doing bulk operations since it only focuses on one row
Scheduler
- Gets bulk operations without doing a round trip to the source system
- It has the repeating task
- It calls what’s in the event processors
- Use the Object Store retrieve to get the watermark, and do an operation such as a database select to get the delta (changes) of the rows
Object Store Explained | Lightboard Series
- Databases and File systems are very good at storing state
- But when we need to store little snippets of values that we need to keep track of we use the Object Store
- This is built into the Mule runtime and is not a database
- It consists of key-value pairs
- It can be segmented into different chunks
- To pick up the new entry in an app, for example, we can use a Watermark, which is a value that always goes up
- Anything that increases can use a watermark
- Caching uses the object store in Mule
- Tokens can also use object store, e.g. to avoid making roundtrips to get values, and reuse values
Publishing and consuming JMS messages
Topic video
Java Message Service (JMS)
- Is a widely-used API for enabling applications to communicate through the exchange of messages
- Simplifies application development by providing a standard interface for creating, sending and receiving messages
- Communication is done asynchronously
Supports two messaging models
Queues: PTP (point-to-point or 1:1)
- A sender sends messages to a queue and a single receiver pulls the message off the queue
- The receiver does not need to be listening to the queue at the time the message is sent
Topics: Pub-Sub (publish/subscribe or 1:many)
- A publisher sends a message to a topic and all active subscribers receive the message
- Subscribers not actively listening will miss the published message (unless their subscription is durable)
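The difference between the two messaging models can be sketched with two small in-memory classes. This is a conceptual illustration only, not the JMS API: the class names `PointToPointQueue` and `PubSubTopic` and their methods are assumptions, and durable subscriptions are not modeled.

```python
from collections import deque


class PointToPointQueue:
    """Point-to-point: each message is held until one receiver takes it (hypothetical)."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def receive(self):
        return self._messages.popleft() if self._messages else None


class PubSubTopic:
    """Publish/subscribe: each message goes to every currently active subscriber (hypothetical)."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, inbox):
        self._subscribers.append(inbox)

    def publish(self, message):
        for inbox in self._subscribers:
            inbox.append(message)


queue = PointToPointQueue()
queue.send("order-1")          # no receiver is listening yet
first_take = queue.receive()   # a receiver arriving later still gets the message
second_take = queue.receive()  # nobody else receives the same message

topic = PubSubTopic()
early, late = [], []
topic.subscribe(early)
topic.publish("price-update")  # only `early` is subscribed at this point
topic.subscribe(late)          # `late` missed the message published above
```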
Using the JMS connector
- Add the JMS module to the project
Configure a global element configuration
- By default, it is set up with a finely tuned set of values for both publishing and consuming messages
- Typically, you just need to configure which connection should be used
- Use operations to publish and/or consume messages to destinations
Walkthrough 7-4: Publish and listen for JMS messages
- Add and configure a JMS connector for ActiveMQ (that uses an in-memory broker)
- Send messages to a JMS queue
- Listen for and process messages from a JMS queue
Extras
Messaging Options Explained | Lightboard Series
VMQ
- They are part of the Mule ecosystem
- Can be used when you want to break up your flow, push a message into a queue, and consume it as part of the Mule application
- VM : Mule to Mule
- Consists of queues
- Lightweight but limited admin visibility
To handle quality of service (i.e. not deleting messages until they have been dealt with), use:
- Transaction manager can be used for consumption
JMS
- When using the Java interface that talks to messaging providers
- Message queue providers supply a JMS driver to communicate over JMS, similar to how database connectors use JDBC
- Mule can communicate as a consumer or a publisher, and so can other systems
- Consists of queues and topics (one-to-many, Pub/Sub style of invocation)
- Admin capabilities depend on the system, but external systems generally have advanced monitoring capabilities
To handle quality of service (i.e. not deleting messages until they have been dealt with), use:
- Acknowledgements
- Transaction manager can be used for consumption
Anypoint MQ
- Gives a cloud hosted solution
- Flexible: clients can be Mule applications (using a connector) or other systems
- It works via a REST interface: unlike JMS, where a driver is needed, HTTPS can be used to both publish and receive messages
- It has queues and exchanges (a one-to-many copy operation: publish to one exchange and the message goes to multiple queues)
- Also provides First In First Out (FIFO) queues
To handle quality of service (i.e. not deleting messages until they have been dealt with), use:
- Acknowledgements
Summary
Use watermarks to synchronise data across data stores
- Use either manual or the automatic watermarking available for some connectors
Use the family of File, FTP, FTPS, & SFTP connectors to work with files and folders
Use the On New or Updated File listener to trigger flows when files are added, created, or updated
- Use the connector’s automatic watermarking to determine if a file is new based on a creation or modification timestamp
Use the On Table Row listener when new records are added to a database table
- Use the connector’s automatic watermarking to determine if the record is new
Use the Scheduler component to schedule flows to run at a certain time or frequency
- Use a watermark to keep a persistent variable between scheduling events
Use the Object Store connector to persist and share a watermark (or other data) across flow executions
Use the JMS connector to publish and consume messages
- Connect to any JMS messaging service that implements the JMS spec
Test your knowledge
Question 1: A flow has a JMS Publish Consume operation followed by a JMS Publish operation. Both of these operations have the default configurations. Which operation is asynchronous (does not wait for a response before continuing to the next event processor) and which operation is synchronous (blocks and waits for a response or timeout before continuing to the next event processor)?
- Publish Consume: Synchronous. Publish: Asynchronous
Question 2: Refer to the exhibit. What is the object type returned by the File List operation?

- Array of Mule event objects
Question 3: A Flow Reference component sends a non-empty JSON object payload to another flow named childFlow, which then returns an XML body. A Flow Reference component saves the payload returned from childFlow to its target attribute named payload. Refer to the exhibit. What is true about the Mule event’s payload at the next event processor after the Flow Reference component?

- The payload is the original JSON object
Question 4: In the Database On Table Row operation, what does the Watermark column enable the On Table Row operation to do?
- To avoid duplicate processing of records in a database
Question 5: Assume that a database table contains a recordID column that always increases as new records get added to the table. In a Mule application, what is the key process to enable manual watermarking for requests to a database using the Scheduler endpoint and the Database SELECT operation?
- Save the max recordID from the set of recordIDs in an ObjectStore and reference this recordID in subsequent database requests
Anki
Links
- Anypoint Platform Development: Fundamentals - Part 3: Building applications to synchronize data
- Module 11: Writing DataWeave transformations
- Module 13: Processing records
References
“Module 12: Triggering flows”.