Change Streams

Learn how to use MongoDB change streams to listen for real-time data changes with watch(), and build live features like notifications.

Everything we have covered so far is request-driven: your app asks MongoDB a question, and MongoDB answers. But what if you want your app to know immediately when something changes, without constantly asking?

Change streams let your application subscribe to real-time changes happening in a collection. The moment a document is inserted, updated, or deleted, your app is notified — no polling required.


Requirements

Change streams require a replica set or sharded cluster — the same requirement as transactions, which we covered in the Transactions section. If you set up a replica set for transactions, change streams work immediately with no extra setup.

// In mongosh, connected to a mongod started with --replSet (same as the transactions setup)
rs.initiate()

MongoDB Atlas runs on a replica set by default — change streams work out of the box.


How Change Streams Work

[Diagram] Teacher enrolls a student (db.enrollments.insertOne(...)) → MongoDB enrollments collection → Change stream (watch()) → Your app receives the change event in real time → Send notification to admin dashboard ('New enrollment: Ali Hassan')

Your application opens a change stream on a collection. Whenever a write happens — from anywhere, any process, any server — MongoDB pushes an event describing that change to every app listening on that stream.


Basic Usage — watch()

const { MongoClient } = require('mongodb');

async function main() {
  const client = new MongoClient(process.env.MONGODB_URI);
  await client.connect();

  const db = client.db('school');
  const enrollments = db.collection('enrollments');

  // Open a change stream on the collection
  const changeStream = enrollments.watch();

  // Listen for changes
  changeStream.on('change', (change) => {
    console.log('Change detected:', change);
  });
}

// In a CommonJS script, await is only valid inside an async function
main().catch(console.error);

Now insert a document into enrollments from anywhere — mongosh, another app, anywhere:

db.enrollments.insertOne({
  studentId: ObjectId("64a1f2c3e4b0a1b2c3d4e5f6"),
  courseId: ObjectId("64a1f2c3e4b0a1b2c3d4e5f1"),
  enrolledAt: new Date(),
  status: "active"
})

Your listening app immediately receives:

{
  _id: { _data: "8264F8A1B2000000012B022C0100296E5A10..." },
  operationType: "insert",
  clusterTime: Timestamp({ t: 1234567890, i: 1 }),
  fullDocument: {
    _id: ObjectId("..."),
    studentId: ObjectId("64a1f2c3e4b0a1b2c3d4e5f6"),
    courseId: ObjectId("64a1f2c3e4b0a1b2c3d4e5f1"),
    enrolledAt: ISODate("2024-09-15T10:30:00.000Z"),
    status: "active"
  },
  ns: { db: "school", coll: "enrollments" },
  documentKey: { _id: ObjectId("...") }
}

No polling is involved. The event arrives shortly after the insert has been committed to a majority of the replica set.
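
As an alternative to the 'change' event listener, events can also be consumed one at a time in a loop. The sketch below assumes a driver version in which the object returned by watch() is async-iterable. consumeChanges and fakeStream are hypothetical names, and the fake stream only stands in for a real change stream so the loop can be tried without a database.

```javascript
// Sketch: consume change events sequentially with for await...of.
// `stream` can be any async iterable; with the real driver it would be
// the object returned by collection.watch() (assumption: a driver version
// whose change stream supports async iteration).
async function consumeChanges(stream, handleChange) {
  for await (const change of stream) {
    // The next event is not pulled until this handler resolves,
    // so events are processed strictly in order.
    await handleChange(change);
  }
}

// Stand-in stream with made-up events, for trying the loop locally.
async function* fakeStream() {
  yield { operationType: 'insert', fullDocument: { status: 'active' } };
  yield { operationType: 'delete', documentKey: { _id: 1 } };
}

const seen = [];
consumeChanges(fakeStream(), async (change) => {
  seen.push(change.operationType);
}).then(() => console.log(seen)); // logs [ 'insert', 'delete' ]
```

The loop style makes ordering explicit: a slow handler delays the next event instead of running concurrently with it, which is often what you want when each event updates shared state.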


Operation Types

The operationType field tells you what kind of change occurred:

| operationType | Triggered by |
| --- | --- |
| insert | A new document was inserted |
| update | An existing document was modified |
| replace | A document was completely replaced |
| delete | A document was deleted |
| drop | The collection was dropped |
| rename | The collection was renamed |
| invalidate | The change stream is no longer valid |

Handling Different Operation Types

changeStream.on('change', (change) => {
  switch (change.operationType) {
    case 'insert':
      console.log('New document:', change.fullDocument);
      break;

    case 'update':
      console.log('Updated fields:', change.updateDescription.updatedFields);
      console.log('Document ID:', change.documentKey._id);
      break;

    case 'delete':
      console.log('Deleted document ID:', change.documentKey._id);
      break;

    default:
      console.log('Other change:', change.operationType);
  }
});

updateDescription — What Changed

For update operations, updateDescription tells you exactly which fields changed:

// Someone runs this update
db.enrollments.updateOne(
  { _id: ObjectId("...") },
  { $set: { status: "completed" } }
)

The change event includes:

{
  operationType: "update",
  documentKey: { _id: ObjectId("...") },
  updateDescription: {
    updatedFields: { status: "completed" },
    removedFields: [],
    truncatedArrays: []
  }
}

You know exactly what changed — without re-fetching the entire document.
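
To illustrate how updateDescription might be used, here is a minimal sketch that turns an update event into a one-line summary. summarizeUpdate is a hypothetical helper, not part of the driver, and the sample event is made up to match the shape shown above.

```javascript
// Sketch: summarize which fields an update event set or removed.
function summarizeUpdate(change) {
  if (change.operationType !== 'update') {
    return null; // only update events carry updateDescription
  }
  const { updatedFields = {}, removedFields = [] } = change.updateDescription;
  const parts = [];
  for (const [field, value] of Object.entries(updatedFields)) {
    parts.push(`${field} -> ${JSON.stringify(value)}`);
  }
  for (const field of removedFields) {
    parts.push(`${field} removed`);
  }
  return parts.join(', ');
}

// Made-up event in the same shape as the one above
const sampleEvent = {
  operationType: 'update',
  documentKey: { _id: 'abc' },
  updateDescription: {
    updatedFields: { status: 'completed' },
    removedFields: ['notes'],
    truncatedArrays: []
  }
};

console.log(summarizeUpdate(sampleEvent)); // status -> "completed", notes removed
```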


Watching with a Pipeline — Filtering Events

You can filter which changes you receive by passing an aggregation pipeline to watch(). Only a subset of stages is supported: $match, $project, $addFields, $set, $unset, $replaceRoot, $replaceWith, and $redact.

// Only watch for new enrollments (inserts), not updates or deletes
const changeStream = enrollments.watch([
  { $match: { operationType: "insert" } }
]);

changeStream.on('change', (change) => {
  console.log('New enrollment:', change.fullDocument);
});

Watch Only Specific Field Changes

// Only react when the 'status' field changes
const changeStream = enrollments.watch([
  {
    $match: {
      operationType: "update",
      "updateDescription.updatedFields.status": { $exists: true }
    }
  }
]);

changeStream.on('change', (change) => {
  const newStatus = change.updateDescription.updatedFields.status;
  console.log(`Enrollment status changed to: ${newStatus}`);
});
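
The same idea extends to several fields. The sketch below builds the $match stage for a list of field names. buildFieldWatchPipeline is a hypothetical helper, and it assumes the fields are top-level and updated directly (an update to a nested path shows up under a dotted key in updatedFields, which this filter would not catch).

```javascript
// Sketch: build a pipeline that matches updates touching any of `fields`.
function buildFieldWatchPipeline(fields) {
  if (!Array.isArray(fields) || fields.length === 0) {
    // An empty $or is rejected by the server, so fail early.
    throw new Error('At least one field name is required');
  }
  return [
    {
      $match: {
        operationType: 'update',
        $or: fields.map((field) => ({
          [`updateDescription.updatedFields.${field}`]: { $exists: true }
        }))
      }
    }
  ];
}

const pipeline = buildFieldWatchPipeline(['status', 'grade']);
console.log(JSON.stringify(pipeline, null, 2));
```

The result can be passed straight to watch(), for example enrollments.watch(buildFieldWatchPipeline(['status'])).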

Watching an Entire Database

You can watch all collections in a database at once:

const changeStream = db.watch();

changeStream.on('change', (change) => {
  console.log(`Change in ${change.ns.coll}:`, change.operationType);
});

This is useful for building audit logs — recording every change across your entire school database in one place.
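
A minimal sketch of the audit-log idea follows. The collection name auditLog, the helper toAuditEntry, and the fields of the audit record are all assumptions chosen for illustration. One detail matters when the audit collection lives in the watched database: its own inserts must be filtered out, or each audit write would generate another event to audit.

```javascript
// Hypothetical name for the collection that stores audit records.
const AUDIT_COLLECTION = 'auditLog';

// Pipeline for db.watch(): skip events from the audit collection itself,
// otherwise every audit insert would produce another event to record.
const auditPipeline = [
  { $match: { 'ns.coll': { $ne: AUDIT_COLLECTION } } }
];

// Sketch: convert a change event into a flat audit record.
function toAuditEntry(change, recordedAt = new Date()) {
  return {
    collection: change.ns && change.ns.coll ? change.ns.coll : null,
    operation: change.operationType,
    // Events such as drop have no documentKey
    documentId: change.documentKey ? change.documentKey._id : null,
    updatedFields: change.updateDescription
      ? change.updateDescription.updatedFields
      : null,
    recordedAt
  };
}

// Made-up event for demonstration
const entry = toAuditEntry({
  operationType: 'update',
  ns: { db: 'school', coll: 'enrollments' },
  documentKey: { _id: 'e1' },
  updateDescription: { updatedFields: { status: 'completed' }, removedFields: [] }
});
console.log(entry.collection, entry.operation); // enrollments update
```

With a live connection, the wiring would look roughly like opening db.watch(auditPipeline) and calling insertOne(toAuditEntry(change)) on the audit collection inside the 'change' handler.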


Resume Tokens — Handling Disconnections

If your app crashes or loses connection, you do not want to miss changes that happened while you were offline — but you also do not want to replay changes from the very beginning of time.

Every change event includes an _id field, which is its resume token. Store the most recent one somewhere durable, and pass it back when you reopen the stream to resume exactly where you left off.

const fs = require('fs');

// Persist the resume token to a file (a database would work as well)
function saveResumeTokenToDisk(token) {
  fs.writeFileSync('resume-token.json', JSON.stringify(token));
}

function loadResumeTokenFromDisk() {
  try {
    return JSON.parse(fs.readFileSync('resume-token.json', 'utf8'));
  } catch {
    return null; // no saved token yet, or the file is unreadable
  }
}

// Start from the last persisted token, if there is one
let resumeToken = loadResumeTokenFromDisk();

function startWatching() {
  const options = resumeToken ? { resumeAfter: resumeToken } : {};

  const changeStream = enrollments.watch([], options);

  changeStream.on('change', (change) => {
    console.log('Change:', change.operationType);

    // Save the resume token after processing each change
    resumeToken = change._id;
    saveResumeTokenToDisk(resumeToken); // persist somewhere durable
  });

  changeStream.on('error', (error) => {
    console.error('Change stream error:', error);
    changeStream.close().catch(() => {}); // ignore errors while closing

    // Reconnect, resuming from the last known token
    setTimeout(startWatching, 1000);
  });
}

startWatching();

MongoDB only keeps change history for a limited time (determined by the oplog size, typically hours to a few days). If your app is offline longer than that, the resume token becomes invalid and you must restart the stream from the current point — potentially missing changes. For critical data, do not rely solely on change streams; combine them with periodic reconciliation queries.
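
A reconciliation pass can be as simple as re-querying recent documents and comparing them with what the stream handler has already processed. The sketch below assumes enrollments carry an enrolledAt timestamp (as in the earlier example) and that your app tracks processed IDs in a Set. findMissedEnrollments and fakeCollection are hypothetical names, and the fake collection only mimics find(...).toArray() so the logic can be tried without a database.

```javascript
// Sketch: find enrollments created since `since` that were never processed.
// `collection` only needs to support find(query).toArray().
async function findMissedEnrollments(collection, since, processedIds) {
  const recent = await collection
    .find({ enrolledAt: { $gte: since } })
    .toArray();
  return recent.filter((doc) => !processedIds.has(String(doc._id)));
}

// In-memory stand-in for a collection (illustration only).
function fakeCollection(docs) {
  return {
    find(query) {
      const since = query.enrolledAt.$gte;
      return { toArray: async () => docs.filter((d) => d.enrolledAt >= since) };
    }
  };
}

// Made-up data
const docs = [
  { _id: 'a', enrolledAt: new Date('2024-09-15T10:00:00Z') },
  { _id: 'b', enrolledAt: new Date('2024-09-15T11:00:00Z') },
  { _id: 'c', enrolledAt: new Date('2024-09-15T12:00:00Z') }
];

findMissedEnrollments(
  fakeCollection(docs),
  new Date('2024-09-15T10:30:00Z'),
  new Set(['b'])
).then((missed) => console.log(missed.map((d) => d._id))); // logs [ 'c' ]
```

Running a check like this on a timer, or once after every reconnect, catches events that fell outside the oplog window.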


Real-Time Notifications — School System Example

Let's build a real feature — notify admins in real time whenever a new student enrolls in a course.

Server — Watching for New Enrollments

const { MongoClient } = require('mongodb');
const { Server } = require('socket.io');

async function main() {
  const client = new MongoClient(process.env.MONGODB_URI);
  await client.connect();

  const db = client.db('school');
  const enrollments = db.collection('enrollments');
  const students = db.collection('students');
  const courses = db.collection('courses');

  // Set up WebSocket server for real-time updates to the browser
  // (origin "*" is convenient locally; restrict it in production)
  const io = new Server(3001, { cors: { origin: "*" } });

  // Watch for new enrollments
  const changeStream = enrollments.watch([
    { $match: { operationType: "insert" } }
  ]);

  changeStream.on('change', async (change) => {
    try {
      const enrollment = change.fullDocument;

      // Fetch related details for a meaningful notification
      const student = await students.findOne({ _id: enrollment.studentId });
      const course = await courses.findOne({ _id: enrollment.courseId });

      // findOne returns null when the referenced document does not exist
      if (!student || !course) {
        console.warn('Skipping enrollment with missing student or course:', enrollment._id);
        return;
      }

      const notification = {
        type: 'new_enrollment',
        message: `${student.name} enrolled in ${course.title}`,
        studentName: student.name,
        courseTitle: course.title,
        timestamp: enrollment.enrolledAt
      };

      // Push to all connected admin dashboards
      io.emit('notification', notification);

      console.log('Notification sent:', notification.message);
    } catch (error) {
      // An error thrown in an async listener would otherwise be an unhandled rejection
      console.error('Failed to send notification:', error);
    }
  });

  console.log('Watching for new enrollments...');
}

main().catch(console.error);

Client — Admin Dashboard (Browser)

import { io } from "socket.io-client";

const socket = io("http://localhost:3001");

socket.on('notification', (notification) => {
  console.log('New notification:', notification.message);
  // Show a toast, update a notification badge, etc.
  showToast(notification.message);
});

The moment a teacher enrolls a student anywhere in the system — through the API, a script, or directly in mongosh — every connected admin dashboard receives a live notification.


Other Use Cases

Live attendance dashboard — watch the attendance collection, update a live "students present today" counter on a dashboard without refreshing.

Audit logging — watch the entire database, write every change to an auditLog collection for compliance and security review.

Cache invalidation — watch a courses collection, and whenever a course is updated, invalidate the corresponding cache entry in Redis.

Sync to another system — watch for changes and sync them to a search index (like Elasticsearch) or a data warehouse in real time.
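
As one illustration, the cache-invalidation case might be sketched as follows. A Map stands in for Redis so the example runs on its own. The course:<id> key format and the names cacheKeyFor and invalidateOnChange are assumptions for illustration.

```javascript
// Hypothetical key format for cached courses.
function cacheKeyFor(courseId) {
  return `course:${courseId}`;
}

// Sketch: drop the cached entry when a course changes or disappears.
// Returns true if an entry was actually removed.
function invalidateOnChange(cache, change) {
  switch (change.operationType) {
    case 'update':
    case 'replace':
    case 'delete':
      // Map.prototype.delete returns true if the key existed
      return cache.delete(cacheKeyFor(change.documentKey._id));
    default:
      // Inserts and collection-level events leave the cache untouched
      return false;
  }
}

// A Map stands in for Redis here
const cache = new Map([['course:42', { title: 'Algebra' }]]);
invalidateOnChange(cache, { operationType: 'update', documentKey: { _id: 42 } });
console.log(cache.has('course:42')); // false
```

With a real cache, the body of the switch would issue a delete against Redis instead, and the function would be called from the 'change' handler of a stream opened on the courses collection.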


Change Streams vs Polling

| | Polling | Change Streams |
| --- | --- | --- |
| How it works | Repeatedly query for changes | MongoDB pushes changes to you |
| Latency | Delay equal to poll interval | Near-instant |
| Database load | Constant queries, even with no changes | Only active when changes occur |
| Complexity | Simple | Requires replica set, resume token handling |
| Best for | Simple periodic checks | Real-time features: notifications, dashboards, sync |

// notifyAdmins is a placeholder for your own notification function

// Polling: checks every 5 seconds, even if nothing changed
setInterval(async () => {
  const fiveSecondsAgo = new Date(Date.now() - 5000);
  const recent = await enrollments.find({
    enrolledAt: { $gt: fiveSecondsAgo }
  }).toArray();

  if (recent.length > 0) {
    notifyAdmins(recent);
  }
}, 5000);

// Change stream: reacts almost instantly, only when something happens
enrollments.watch([{ $match: { operationType: "insert" } }])
  .on('change', (change) => notifyAdmins([change.fullDocument]));

Quick Reference

// Basic watch
const stream = collection.watch();

// Watch with filter pipeline
const insertStream = collection.watch([
  { $match: { operationType: "insert" } }
]);

// Watch entire database
const dbStream = db.watch();

// Handle events
stream.on('change', (change) => { /* handle the change event */ });
stream.on('error', (error) => { /* log, close, and reopen the stream */ });

// Resume after disconnect
const resumedStream = collection.watch([], { resumeAfter: savedResumeToken });

// Close the stream (inside an async function)
await stream.close();

Change streams are powerful but add operational complexity — your app now has a long-lived connection that needs error handling, reconnection logic, and resume token persistence. Use them when real-time behavior is a genuine product requirement — live notifications, dashboards, sync pipelines. For features where a few seconds of delay is acceptable, simple polling or periodic jobs are often easier to build and maintain.
