Change Streams
Learn how to use MongoDB change streams to listen for real-time data changes with watch(), and build live features like notifications.
Change Streams
Everything we have covered so far is reactive on request — your app asks MongoDB a question, and MongoDB answers. But what if you want your app to know immediately when something changes — without constantly asking?
Change streams let your application subscribe to real-time changes happening in a collection. The moment a document is inserted, updated, or deleted, your app is notified — no polling required.
Requirements
Change streams require a replica set or sharded cluster — the same requirement as transactions, which we covered in the Transactions section. If you set up a replica set for transactions, change streams work immediately with no extra setup.
# Local replica set — same as transactions setup
rs.initiate()MongoDB Atlas runs on a replica set by default — change streams work out of the box.
How Change Streams Work
Your application opens a change stream on a collection. Whenever a write happens — from anywhere, any process, any server — MongoDB pushes an event describing that change to every app listening on that stream.
Basic Usage — watch()
const { MongoClient } = require('mongodb');
const client = new MongoClient(process.env.MONGODB_URI);
await client.connect();
const db = client.db('school');
const enrollments = db.collection('enrollments');
// Open a change stream
const changeStream = enrollments.watch();
// Listen for changes
changeStream.on('change', (change) => {
console.log('Change detected:', change);
});Now insert a document into enrollments from anywhere — mongosh, another app, anywhere:
db.enrollments.insertOne({
studentId: ObjectId("64a1f2c3e4b0a1b2c3d4e5f6"),
courseId: ObjectId("64a1f2c3e4b0a1b2c3d4e5f1"),
enrolledAt: new Date(),
status: "active"
})Your listening app immediately receives:
{
_id: { _data: "8264F8A1B2000000012B022C0100296E5A10..." },
operationType: "insert",
clusterTime: Timestamp({ t: 1234567890, i: 1 }),
fullDocument: {
_id: ObjectId("..."),
studentId: ObjectId("64a1f2c3e4b0a1b2c3d4e5f6"),
courseId: ObjectId("64a1f2c3e4b0a1b2c3d4e5f1"),
enrolledAt: ISODate("2024-09-15T10:30:00.000Z"),
status: "active"
},
ns: { db: "school", coll: "enrollments" },
documentKey: { _id: ObjectId("...") }
}No polling, no delay — the event arrives the moment the insert happens.
Operation Types
The operationType field tells you what kind of change occurred:
| operationType | Triggered by |
|---|---|
insert | A new document was inserted |
update | An existing document was modified |
replace | A document was completely replaced |
delete | A document was deleted |
drop | The collection was dropped |
rename | The collection was renamed |
invalidate | The change stream is no longer valid |
Handling Different Operation Types
changeStream.on('change', (change) => {
switch (change.operationType) {
case 'insert':
console.log('New document:', change.fullDocument);
break;
case 'update':
console.log('Updated fields:', change.updateDescription.updatedFields);
console.log('Document ID:', change.documentKey._id);
break;
case 'delete':
console.log('Deleted document ID:', change.documentKey._id);
break;
default:
console.log('Other change:', change.operationType);
}
});updateDescription — What Changed
For update operations, updateDescription tells you exactly which fields changed:
// Someone runs this update
db.enrollments.updateOne(
{ _id: ObjectId("...") },
{ $set: { status: "completed" } }
)The change event includes:
{
operationType: "update",
documentKey: { _id: ObjectId("...") },
updateDescription: {
updatedFields: { status: "completed" },
removedFields: [],
truncatedArrays: []
}
}You know exactly what changed — without re-fetching the entire document.
Watching with a Pipeline — Filtering Events
You can filter which changes you receive using an aggregation pipeline — only $match, $project, $addFields, and a few other stages are supported.
// Only watch for new enrollments (inserts), not updates or deletes
const changeStream = enrollments.watch([
{ $match: { operationType: "insert" } }
]);
changeStream.on('change', (change) => {
console.log('New enrollment:', change.fullDocument);
});Watch Only Specific Field Changes
// Only react when the 'status' field changes
const changeStream = enrollments.watch([
{
$match: {
operationType: "update",
"updateDescription.updatedFields.status": { $exists: true }
}
}
]);
changeStream.on('change', (change) => {
const newStatus = change.updateDescription.updatedFields.status;
console.log(`Enrollment status changed to: ${newStatus}`);
});Watching an Entire Database
You can watch all collections in a database at once:
const changeStream = db.watch();
changeStream.on('change', (change) => {
console.log(`Change in ${change.ns.coll}:`, change.operationType);
});This is useful for building audit logs — recording every change across your entire school database in one place.
Resume Tokens — Handling Disconnections
If your app crashes or loses connection, you do not want to miss changes that happened while you were offline — but you also do not want to replay changes from the very beginning of time.
Every change event includes a _id field called a resume token. Store the most recent one, and use it to resume exactly where you left off.
let resumeToken = null;
async function startWatching() {
const options = resumeToken ? { resumeAfter: resumeToken } : {};
const changeStream = enrollments.watch([], options);
changeStream.on('change', (change) => {
console.log('Change:', change.operationType);
// Save the resume token after processing each change
resumeToken = change._id;
saveResumeTokenToDisk(resumeToken); // persist somewhere durable
});
changeStream.on('error', async (error) => {
console.error('Change stream error:', error);
changeStream.close();
// Reconnect, resuming from the last known token
setTimeout(startWatching, 1000);
});
}
startWatching();// Persist resume token to a file or database
const fs = require('fs');
function saveResumeTokenToDisk(token) {
fs.writeFileSync('resume-token.json', JSON.stringify(token));
}
function loadResumeTokenFromDisk() {
try {
return JSON.parse(fs.readFileSync('resume-token.json'));
} catch {
return null;
}
}MongoDB only keeps change history for a limited time (determined by the oplog size, typically hours to a few days). If your app is offline longer than that, the resume token becomes invalid and you must restart the stream from the current point — potentially missing changes. For critical data, do not rely solely on change streams; combine them with periodic reconciliation queries.
Real-Time Notifications — School System Example
Let's build a real feature — notify admins in real time whenever a new student enrolls in a course.
Server — Watching for New Enrollments
const { MongoClient } = require('mongodb');
const { Server } = require('socket.io');
const client = new MongoClient(process.env.MONGODB_URI);
await client.connect();
const db = client.db('school');
const enrollments = db.collection('enrollments');
const students = db.collection('students');
const courses = db.collection('courses');
// Set up WebSocket server for real-time updates to the browser
const io = new Server(3001, { cors: { origin: "*" } });
// Watch for new enrollments
const changeStream = enrollments.watch([
{ $match: { operationType: "insert" } }
]);
changeStream.on('change', async (change) => {
const enrollment = change.fullDocument;
// Fetch related details for a meaningful notification
const student = await students.findOne({ _id: enrollment.studentId });
const course = await courses.findOne({ _id: enrollment.courseId });
const notification = {
type: 'new_enrollment',
message: `${student.name} enrolled in ${course.title}`,
studentName: student.name,
courseTitle: course.title,
timestamp: enrollment.enrolledAt
};
// Push to all connected admin dashboards
io.emit('notification', notification);
console.log('Notification sent:', notification.message);
});
console.log('Watching for new enrollments...');Client — Admin Dashboard (Browser)
import { io } from "socket.io-client";
const socket = io("http://localhost:3001");
socket.on('notification', (notification) => {
console.log('New notification:', notification.message);
// Show a toast, update a notification badge, etc.
showToast(notification.message);
});The moment a teacher enrolls a student anywhere in the system — through the API, a script, or directly in mongosh — every connected admin dashboard receives a live notification.
Other Use Cases
Live attendance dashboard — watch the attendance collection, update a live "students present today" counter on a dashboard without refreshing.
Audit logging — watch the entire database, write every change to an auditLog collection for compliance and security review.
Cache invalidation — watch a courses collection, and whenever a course is updated, invalidate the corresponding cache entry in Redis.
Sync to another system — watch for changes and sync them to a search index (like Elasticsearch) or a data warehouse in real time.
Change Streams vs Polling
| Polling | Change Streams | |
|---|---|---|
| How it works | Repeatedly query for changes | MongoDB pushes changes to you |
| Latency | Delay equal to poll interval | Near-instant |
| Database load | Constant queries, even with no changes | Only active when changes occur |
| Complexity | Simple | Requires replica set, resume token handling |
| Best for | Simple periodic checks | Real-time features — notifications, dashboards, sync |
// Polling — checks every 5 seconds, even if nothing changed
setInterval(async () => {
const recent = await enrollments.find({
enrolledAt: { $gt: fiveSecondsAgo }
}).toArray();
if (recent.length > 0) {
notifyAdmins(recent);
}
}, 5000);
// Change stream — reacts instantly, only when something happens
enrollments.watch([{ $match: { operationType: "insert" } } ])
.on('change', (change) => notifyAdmins([change.fullDocument]));Quick Reference
// Basic watch
const stream = collection.watch();
// Watch with filter pipeline
const stream = collection.watch([
{ $match: { operationType: "insert" } }
]);
// Watch entire database
const stream = db.watch();
// Handle events
stream.on('change', (change) => { ... });
stream.on('error', (error) => { ... });
// Resume after disconnect
const stream = collection.watch([], { resumeAfter: savedResumeToken });
// Close the stream
await stream.close();Change streams are powerful but add operational complexity — your app now has a long-lived connection that needs error handling, reconnection logic, and resume token persistence. Use them when real-time behavior is a genuine product requirement — live notifications, dashboards, sync pipelines. For features where a few seconds of delay is acceptable, simple polling or periodic jobs are often easier to build and maintain.
NoSQL Injection
Learn what NoSQL injection is, how MongoDB query operators can be exploited through user input, and how to defend against it.
Time Series Collections
Learn how MongoDB time series collections work, when to use them, and how they improve on the manual bucket pattern for storing time-stamped data.