
Sometimes you need to react immediately to changes in your database. Perhaps you want to place an order with a distributor whenever an item's inventory drops below a given threshold. Or perhaps you want to send an email notification whenever the status of an order changes. Regardless of your particular use case, whenever you want to react immediately to changes in your MongoDB database, change streams and triggers are fantastic options.
If you're just joining us in this Quick Start with MongoDB and Node.js series, welcome! We began by walking through how to connect to MongoDB and perform each of the CRUD (create, read, update, and delete) operations. Then we jumped into more advanced topics like the aggregation framework and transactions. The code we write today will use the same structure as the code we built in the first post in the series, so if you have any questions about how to get started or how the code is structured, head back to that post.
And, with that, let's dive into change streams and triggers!
Get started with an M0 cluster on Atlas today. It's free forever, and it's the easiest way to try out the steps in this blog series.
#What are Change Streams?
Change streams allow you to receive notifications about changes made to your MongoDB databases and collections. When you use change streams, you can choose to program actions that will be automatically taken whenever a change event occurs.
Change streams utilize the aggregation framework, so you can choose to filter for specific change events or transform the change event documents.
For example, let's say I want to be notified whenever a new listing in the Sydney, Australia market is added to the listingsAndReviews collection. I could create a change stream that monitors the listingsAndReviews collection and use an aggregation pipeline to match on the listings I'm interested in.
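As a rough illustration, the pipeline for that example might look like the sketch below. The field paths under fullDocument.address are assumptions about the shape of the sample_airbnb documents, and getPath and matchesFilter are hypothetical local helpers that only mimic simple equality matching so the filter can be tried without a cluster. In a real change stream, the server evaluates the $match stage.

```javascript
// Pipeline sketch: keep only insert events for listings in the Sydney, Australia market.
// The field paths are assumptions about the sample_airbnb document shape.
const pipeline = [
    {
        $match: {
            operationType: "insert",
            "fullDocument.address.country": "Australia",
            "fullDocument.address.market": "Sydney"
        }
    }
];

// Hypothetical helper (not part of the driver): resolves a dotted path on an object.
function getPath(obj, path) {
    return path
        .split(".")
        .reduce((value, key) => (value == null ? undefined : value[key]), obj);
}

// Hypothetical helper: mimics equality-only $match semantics locally.
function matchesFilter(event, filter) {
    return Object.entries(filter).every(
        ([path, expected]) => getPath(event, path) === expected
    );
}

// Example change events, trimmed to the fields the filter inspects.
const sydneyInsert = {
    operationType: "insert",
    fullDocument: {
        name: "Opera House Views",
        address: { market: "Sydney", country: "Australia" }
    }
};
const londonInsert = {
    operationType: "insert",
    fullDocument: {
        name: "Private room in London",
        address: { market: "London", country: "United Kingdom" }
    }
};

console.log(matchesFilter(sydneyInsert, pipeline[0].$match)); // true
console.log(matchesFilter(londonInsert, pipeline[0].$match)); // false
```

Filtering in the pipeline means events for other markets, as well as updates and deletes, never reach the application code.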
Let's take a look at three different ways to implement this change stream.
#Set Up
As with all posts in this MongoDB and Node.js Quick Start series, you'll need to ensure you've completed the prerequisite steps outlined in the Set up section of the first post in this series.
I find it helpful to have a script that will generate sample data when I'm testing change streams. To help you quickly generate sample data, I wrote changeStreamsTestData.js. Download a copy of the file, update the uri constant to reflect your Atlas connection info, and run it by executing node changeStreamsTestData.js. The script will do the following:
- Create 3 new listings (Opera House Views, Private room in London, and Beautiful Beach House)
- Update 2 of those listings (Opera House Views and Beautiful Beach House)
- Create 2 more listings (Italian Villa and Sydney Harbour Home)
- Delete a listing (Sydney Harbour Home).
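To make that sequence concrete, here is a minimal sketch of the same eight operations. It is not the contents of changeStreamsTestData.js: the steps array, the runSteps function, the fakeCollection stand-in, and the updated field values are all hypothetical. Only insertOne, updateOne, and deleteOne are real collection methods in the Node.js driver.

```javascript
// Hypothetical sketch of the sequence described above. The documents and the
// $set values are assumptions, not the contents of changeStreamsTestData.js.
const steps = [
    { op: "insertOne", args: [{ name: "Opera House Views" }] },
    { op: "insertOne", args: [{ name: "Private room in London" }] },
    { op: "insertOne", args: [{ name: "Beautiful Beach House" }] },
    { op: "updateOne", args: [{ name: "Opera House Views" }, { $set: { beds: 2 } }] },
    { op: "updateOne", args: [{ name: "Beautiful Beach House" }, { $set: { beds: 10 } }] },
    { op: "insertOne", args: [{ name: "Italian Villa" }] },
    { op: "insertOne", args: [{ name: "Sydney Harbour Home" }] },
    { op: "deleteOne", args: [{ name: "Sydney Harbour Home" }] }
];

// Runs each step in order against any object that exposes the driver's
// insertOne/updateOne/deleteOne collection methods.
async function runSteps(collection) {
    for (const { op, args } of steps) {
        await collection[op](...args);
    }
}

// In-memory stand-in so the sketch runs without a cluster; it only records calls.
const calls = [];
const fakeCollection = {
    insertOne: async (doc) => { calls.push(`insert ${doc.name}`); },
    updateOne: async (filter) => { calls.push(`update ${filter.name}`); },
    deleteOne: async (filter) => { calls.push(`delete ${filter.name}`); }
};

runSteps(fakeCollection).then(() => console.log(calls.join("\n")));
```

Each of these operations produces one change event, so a change stream watching the collection with an empty pipeline would see eight events in this order.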
#Create a Change Stream
Now that we're set up, let's explore three different ways to work with a change stream in Node.js.
#Get a Copy of the Node.js Template
To make following along with this blog post easier, I've created a starter template for a Node.js script that accesses an Atlas cluster.
- Download a copy of template.js.
- Open template.js in your favorite code editor.
- Update the Connection URI to point to your Atlas cluster. If you're not sure how to do that, refer back to the first post in this series.
- Save the file as changeStreams.js.
You can run this file by executing node changeStreams.js in your shell. At this point, the file simply opens and closes a connection to your Atlas cluster, so no output is expected. If you see DeprecationWarnings, you can ignore them for the purposes of this post.
#Create a Helper Function to Close the Change Stream
Regardless of how we monitor changes in our change stream, we will want to close the change stream after a certain amount of time. Let's create a helper function to do just that.
Paste the following function in changeStreams.js.

    function closeChangeStream(timeInMs = 60000, changeStream) {
        return new Promise((resolve) => {
            setTimeout(() => {
                console.log("Closing the change stream");
                changeStream.close();
                resolve();
            }, timeInMs);
        });
    }

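To see how the helper behaves before wiring it to a real change stream, the sketch below calls it with a stand-in object. The helper is repeated so the example runs on its own. The fakeChangeStream object is hypothetical and only records whether close() was called.

```javascript
// Same helper as above, repeated so this sketch is self-contained.
function closeChangeStream(timeInMs = 60000, changeStream) {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log("Closing the change stream");
            changeStream.close();
            resolve();
        }, timeInMs);
    });
}

// Hypothetical stand-in for a ChangeStream; it only tracks whether close() ran.
const fakeChangeStream = {
    closed: false,
    close() {
        this.closed = true;
    }
};

// Wait 50 ms instead of the default 60000 so the demo finishes quickly.
const pending = closeChangeStream(50, fakeChangeStream);
console.log(fakeChangeStream.closed); // false: the timer has not fired yet

pending.then(() => {
    console.log(fakeChangeStream.closed); // true: close() ran before the promise resolved
});
```

Because timeInMs is the first parameter, falling back to its default while still supplying a change stream means passing undefined explicitly, as in closeChangeStream(undefined, changeStream).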
#Monitor Change Stream using EventEmitter's on()
The MongoDB Node.js Driver's ChangeStream class inherits from the Node.js built-in EventEmitter class. As a result, we can use EventEmitter's on() function to add a listener function that will be called whenever a change occurs in the change stream.
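The listener pattern itself can be tried without a database. The sketch below uses a plain EventEmitter as a stand-in for a ChangeStream, which is an assumption made purely so the example runs locally. With a real cluster, the object would come from calling watch() on a collection, and the driver would emit the "change" events.

```javascript
const { EventEmitter } = require("events");

// Stand-in for a ChangeStream (an assumption for this sketch). A real one
// would be returned by collection.watch(pipeline).
const fakeChangeStream = new EventEmitter();

// Collects every event the listener receives.
const received = [];

// Register a listener for "change" events, which a ChangeStream emits once per change.
fakeChangeStream.on("change", (next) => {
    received.push(next);
    console.log(`${next.operationType}: ${next.fullDocument.name}`);
});

// Simulate the arrival of one change event, trimmed to two fields.
fakeChangeStream.emit("change", {
    operationType: "insert",
    fullDocument: { name: "Opera House Views" }
});
```

The listener runs once for each emitted event, so the application never has to poll the stream for new changes.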
#Create the Function
Let's create a function that will monitor changes in the change stream using EventEmitter's on().
Continuing to work in changeStreams.js, create an asynchronous function named monitorListingsUsingEventEmitter. The function should have the following parameters: a connected MongoClient, a time in ms that indicates how long the change stream should be monitored, and an aggregation pipeline that the change stream will use.

    async function monitorListingsUsingEventEmitter(client, timeInMs = 60000, pipeline = []) {

    }

Now we need to access the collection we will monitor for changes. Add the following code to monitorListingsUsingEventEmitter().

    const collection = client.db("sample_airbnb").collection("listingsAndReviews");

Now we are ready to create our change stream. We can do so by using Collection