MongoDB

Change Streams

15 Questions

Change Streams allow applications to watch for real-time data changes in MongoDB collections, databases, or entire deployments. They are built on MongoDB's oplog (operations log) and provide a push-based notification model. When a document is inserted, updated, replaced, or deleted, the change stream emits an event with details about the change. Change Streams require a replica set or sharded cluster (not available on standalone MongoDB). They use resumable cursors — if the connection drops, the stream can resume from where it left off using a resume token. They are widely used for real-time notifications, cache invalidation, and event-driven architectures.
// Watch a collection for changes
const changeStream = db.collection('orders').watch();

changeStream.on('change', (event) => {
  console.log('Change detected:', event.operationType);
  console.log('Document:', event.fullDocument);
  console.log('Document key:', event.documentKey._id);
});

// Alternative: async iterator (preferred); consume a stream either via events or iteration, not both
for await (const change of changeStream) {
  console.log(change.operationType); // insert, update, delete, replace, ...
}

Change Streams emit events with an operationType field identifying what happened. The primary types are: insert (new document added), update (fields modified via update operators), replace (entire document replaced), delete (document removed), drop (collection dropped), rename (collection renamed), dropDatabase (database dropped), and invalidate (stream closed due to drop/rename). Update events include updateDescription.updatedFields and updateDescription.removedFields showing exactly what changed. Insert and replace events carry fullDocument automatically; update events include it only when the stream is opened with the fullDocument: 'updateLookup' option, and delete events never include it (the document is already gone; use pre-images if you need the prior state).
// Change event structure for INSERT
{
  _id: { /* resume token */ },
  operationType: "insert",
  fullDocument: { _id: ObjectId("..."), name: "Alice", age: 30 },
  ns: { db: "myapp", coll: "users" },
  documentKey: { _id: ObjectId("...") },
  clusterTime: Timestamp(...)
}

// Change event structure for UPDATE
{
  operationType: "update",
  documentKey: { _id: ObjectId("...") },
  updateDescription: {
    updatedFields: { age: 31, "address.city": "Mumbai" },
    removedFields: ["oldField"],
    truncatedArrays: []
  }
}
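
The non-CRUD operation types above also deserve handling in practice. A minimal sketch, reusing the changeStream handle from the first example, that reacts to drop, rename, and invalidate events:
// Sketch: reacting to namespace-level events
changeStream.on('change', (event) => {
  switch (event.operationType) {
    case 'drop':       // the watched collection was dropped
    case 'rename':     // the watched collection was renamed
      console.warn(`Namespace event ${event.operationType} on`, event.ns);
      break;
    case 'invalidate': // the stream closes after emitting this event
      console.warn('Stream invalidated; reopen with startAfter if needed');
      break;
    default:           // insert / update / replace / delete handled elsewhere
      break;
  }
});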

Every change event includes an _id field called the resume token, a unique identifier for that specific change in the oplog. When a change stream disconnects (network error, application restart), you can resume it from the last processed event using the resumeAfter or startAfter option, so no events are missed. Store the resume token durably (e.g., in MongoDB itself or Redis) after each processed event. The difference: both options continue from the given token, but only startAfter can continue past an invalidate event (such as a collection drop); resumeAfter cannot. Token usefulness depends on oplog retention: a token becomes unusable once the oplog has rolled past the position it refers to.
// Persist resume token to avoid missing events on restart
let resumeToken = null;

async function watchWithResume() {
  const options = resumeToken ? { resumeAfter: resumeToken } : {};
  const changeStream = db.collection('orders').watch([], options);

  for await (const change of changeStream) {
    // Process the change
    await processOrderChange(change);
    
    // Save token AFTER successful processing
    resumeToken = change._id;
    await db.collection('_changeStreamState').updateOne(
      { _id: 'orders_stream' },
      { $set: { resumeToken } },
      { upsert: true }
    );
  }
}

// On startup: load the saved token, then start watching
const state = await db.collection('_changeStreamState').findOne({ _id: 'orders_stream' });
resumeToken = state?.resumeToken;
await watchWithResume();
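
To illustrate the resumeAfter/startAfter distinction, here is a hedged sketch reusing the stream and processOrderChange handler from above: after an invalidate event (say the watched collection was dropped and recreated), only startAfter accepts the invalidate event's token, while resumeAfter would be rejected.
// Sketch: surviving an invalidate with startAfter
let invalidateToken = null;

for await (const change of changeStream) {
  if (change.operationType === 'invalidate') {
    invalidateToken = change._id; // token of the invalidate event itself
    break;                        // the cursor is closed after this event
  }
  await processOrderChange(change);
}

if (invalidateToken) {
  // startAfter accepts the invalidate token; resumeAfter would fail here
  const reopened = db.collection('orders').watch([], { startAfter: invalidateToken });
}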

The watch() method accepts an aggregation pipeline as its first argument, allowing you to filter and transform events server-side before they arrive. This is more efficient than filtering in application code because MongoDB only sends relevant events. The pipeline stages allowed in change streams are limited to $match, $project, $addFields, $replaceRoot, $replaceWith, $redact, $set, and $unset. Use $match on operationType, fullDocument fields, or documentKey. Filtered streams consume fewer resources and reduce event processing overhead in high-write collections.
// Only watch for order insertions with high value
const changeStream = db.collection('orders').watch([
  {
    $match: {
      operationType: 'insert',
      'fullDocument.amount': { $gte: 10000 }
    }
  }
]);

// Watch only specific operation types
const changeStream = db.collection('users').watch([
  { $match: { operationType: { $in: ['insert', 'delete'] } } }
]);

// Filter updates to specific fields
const changeStream = db.collection('inventory').watch([
  {
    $match: {
      operationType: 'update',
      'updateDescription.updatedFields.stock': { $exists: true }
    }
  }
]);
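
Beyond $match, the transformation stages can slim down what the server sends. A sketch (field names assumed) that projects insert events down to just the fields a consumer needs:
// Sketch: reshape events server-side; keep _id (the resume token) so the
// stream stays resumable
const slimStream = db.collection('orders').watch([
  { $match: { operationType: 'insert' } },
  {
    $project: {
      operationType: 1,
      documentKey: 1,
      'fullDocument.status': 1,
      'fullDocument.amount': 1
      // _id is included by default; do not exclude it
    }
  }
]);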

Change Streams can be opened at three levels: collection (collection.watch()), database (db.watch()), and deployment/client (client.watch()). Database-level streams emit events for all collections. Client-level streams watch all databases and collections. Higher-level streams include additional context — the ns (namespace) field in the event identifies which database and collection changed. This is useful for audit logging, data replication pipelines, and event buses. For high-traffic deployments, database/client-level streams can generate very high event volumes — always use filtering pipelines.
// Watch entire database — all collections
const dbStream = db.watch([
  { $match: { 'ns.coll': { $in: ['orders', 'payments'] } } }
]);

dbStream.on('change', event => {
  console.log(`Change in ${event.ns.coll}:`, event.operationType);
});

// Watch entire deployment (all databases)
const deploymentStream = client.watch([
  { $match: { 'ns.db': { $ne: 'admin' } } }  // exclude admin db
]);

// Event includes namespace info
{
  operationType: "insert",
  ns: { db: "shop", coll: "orders" },  // which db and collection
  fullDocument: { ... }
}

Change Streams integrate naturally with WebSockets or Server-Sent Events (SSE) to push real-time updates to connected clients. When a change event is received, the Node.js server emits it to the relevant WebSocket subscribers. Use Socket.IO rooms to route events only to users watching specific documents (e.g., order status page). The change stream runs in one long-lived background process (not per-request), and the Socket.IO emission is triggered for each event. A typical pattern: user opens order tracking page → joins Socket.IO room → server watches order collection → on change event, emit to room.
// server.js — real-time order status updates
const io = require('socket.io')(httpServer);

async function startOrderWatcher() {
  const changeStream = db.collection('orders').watch([
    { $match: { operationType: { $in: ['update', 'replace'] } } }
  ], { fullDocument: 'updateLookup' });

  for await (const change of changeStream) {
    const orderId = change.documentKey._id.toString();
    const order = change.fullDocument;
    
    // Emit only to users watching this specific order
    io.to(`order:${orderId}`).emit('orderUpdate', {
      orderId,
      status: order.status,
      updatedAt: order.updatedAt
    });
  }
}

// Client joins room when viewing order
socket.on('watchOrder', (orderId) => {
  socket.join(`order:${orderId}`);
});
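
For clients where a WebSocket is unnecessary, the same watcher can feed Server-Sent Events instead. A rough sketch, assuming an Express app; the route path and broadcast helper are illustrative:
// Sketch: SSE fan-out from a single change stream watcher
const sseClients = new Set();

app.get('/orders/events', (req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    Connection: 'keep-alive'
  });
  sseClients.add(res);
  req.on('close', () => sseClients.delete(res));
});

// Call this from the change stream loop for each event
function broadcast(change) {
  const payload = `data: ${JSON.stringify({
    orderId: change.documentKey._id,
    operationType: change.operationType
  })}\n\n`;
  for (const res of sseClients) res.write(payload);
}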

By default, update events only include updateDescription (what changed) and documentKey — not the full document. To receive the complete updated document, set the fullDocument option to 'updateLookup'. MongoDB performs a secondary lookup to fetch the current document state. Note this is a post-change lookup — in rare race conditions (rapid successive updates), the fetched document may reflect a later state than the event. MongoDB 6.0 introduced 'whenAvailable' and 'required' options that use pre/post-image collections for guaranteed point-in-time snapshots on collections with changeStreamPreAndPostImages enabled.
// Get full document on update events
const changeStream = db.collection('orders').watch([], {
  fullDocument: 'updateLookup'  // fetch full document on update
});

changeStream.on('change', event => {
  if (event.operationType === 'update') {
    console.log('Full updated order:', event.fullDocument); // complete document
    console.log('Changed fields only:', event.updateDescription.updatedFields);
  }
});

// MongoDB 6.0+: precise pre/post images
// Enable pre/post images on the (existing) collection first
await db.command({
  collMod: 'orders',
  changeStreamPreAndPostImages: { enabled: true }
});

const precise = db.collection('orders').watch([], {
  fullDocument: 'required',      // post-image
  fullDocumentBeforeChange: 'required'  // pre-image
});

In a sharded cluster, change streams opened on a collection or database are managed by mongos, which opens a change stream cursor on each shard and merges them into a single ordered stream. Events are ordered by cluster time, ensuring causally consistent ordering across shards. For deployment-level streams, events from all shards are merged. This happens transparently to the application. Performance consideration: to keep the merged stream ordered, every shard must keep advancing its cluster time (idle shards write periodic no-op oplog entries) and mongos must maintain the merge cursors, so high-throughput sharded change streams can be resource-intensive. Filtering with aggregation pipelines is especially important in sharded environments to minimize event volume.
// Change stream on sharded collection — same API, mongos handles merging
const changeStream = db.collection('orders').watch([
  { $match: { 'fullDocument.region': 'APAC' } }
]);

// Change event includes clusterTime for ordering across shards
{
  _id: { resumeToken... },
  operationType: "insert",
  clusterTime: Timestamp(1704000000, 1),  // used for ordering
  ns: { db: "shop", coll: "orders" },
  fullDocument: { region: "APAC", amount: 5000, ... }
}

// Durability note: change streams only emit majority-committed writes,
// so events never reflect changes that get rolled back after a failover

Cache invalidation is one of the most practical change stream use cases. When data changes in MongoDB, the change stream event triggers deletion or update of the corresponding cache entry in Redis or an in-memory cache, keeping cached data consistent. This eliminates stale cache problems without TTL hacks. Watch only the specific fields that affect cached data to minimize cache churn. Use the document ID as the cache key for precise invalidation. For aggregate/computed caches (dashboards, report summaries), use a coalescing pattern — debounce rapid updates to recalculate once rather than on every change.
const redis = require('redis').createClient();

async function startCacheInvalidationWatcher() {
  await redis.connect(); // node-redis v4+ needs an explicit connect before commands

  const changeStream = db.collection('products').watch([
    { $match: { operationType: { $in: ['update', 'replace', 'delete'] } } }
  ]);

  for await (const change of changeStream) {
    const productId = change.documentKey._id.toString();
    
    // Invalidate specific cache keys
    await redis.del(`product:${productId}`);
    await redis.del(`product:${productId}:details`);
    
    // Invalidate the list cache for the NEW category; updatedFields only
    // carries the new value (finding the old category would need a pre-image)
    if (change.updateDescription?.updatedFields?.category) {
      const newCategory = change.updateDescription.updatedFields.category;
      await redis.del(`products:category:${newCategory}`);
    }
    
    console.log(`Cache invalidated for product ${productId}`);
  }
}
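
For the aggregate/computed caches mentioned above, a simple coalescing sketch is shown below; computeDashboardSummary is a hypothetical aggregation helper, and bursts of change events collapse into a single recomputation.
// Sketch: debounce recomputation of an aggregate cache
let recomputeTimer = null;

function scheduleDashboardRefresh() {
  clearTimeout(recomputeTimer);
  recomputeTimer = setTimeout(async () => {
    const summary = await computeDashboardSummary(); // hypothetical helper
    await redis.set('dashboard:summary', JSON.stringify(summary));
  }, 2000); // one refresh per 2-second burst of changes
}

// Inside the change stream loop, call scheduleDashboardRefresh()
// instead of recomputing on every event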

Change Streams can fail due to network errors, replica set elections (which briefly pause writes, typically for a few seconds), or oplog overwrites. A robust implementation must catch errors, store the resume token persistently, and reconnect automatically with backoff. The resumeAfter token allows picking up exactly where processing stopped. Use exponential backoff for reconnection to avoid thundering-herd problems on cluster recovery. Distinguish resumable errors (network/election related) from non-resumable ones: the resume token has aged out of the oplog, or the stream received an invalidate event because the collection was dropped. Modern MongoDB drivers handle most transient failures automatically via built-in retry logic.
// loadResumeToken / saveResumeToken are assumed helpers that persist the token
// (e.g., in the _changeStreamState collection shown earlier)
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function watchWithRetry(collectionName, pipeline, handler) {
  let resumeToken = await loadResumeToken();
  let retryDelay = 1000;

  while (true) {
    try {
      const options = { fullDocument: 'updateLookup' };
      if (resumeToken) options.resumeAfter = resumeToken;

      const stream = db.collection(collectionName).watch(pipeline, options);

      for await (const change of stream) {
        if (change.operationType === 'invalidate') {
          console.error('Stream invalidated (collection dropped/renamed). Stopping.');
          return; // non-resumable with resumeAfter
        }
        await handler(change);
        resumeToken = change._id;
        await saveResumeToken(resumeToken);
        retryDelay = 1000; // reset backoff on success
      }
    } catch (err) {
      // Non-resumable: the token's position has rolled out of the oplog
      if (err.codeName === 'ChangeStreamHistoryLost') {
        console.error('Resume token no longer in the oplog. Full re-sync required.');
        break;
      }
      console.error(`Stream error, retrying in ${retryDelay}ms:`, err.message);
      await sleep(retryDelay);
      retryDelay = Math.min(retryDelay * 2, 30000); // max 30s backoff
    }
  }
}

Polling periodically queries the database for changes (e.g., every 5 seconds), consuming database and network resources even when nothing changed, adding latency up to the poll interval, and missing rapid changes between polls. Change Streams are push-based — MongoDB notifies the application immediately when data changes, with virtually zero latency, no wasted queries, and guaranteed event ordering. Polling is simpler to implement but inefficient at scale. Change Streams require a replica set and proper error handling but are significantly more efficient. For anything requiring real-time behavior (notifications, live feeds, inventory updates), Change Streams are the correct approach.
// BAD — polling approach (wastes resources, adds latency)
let lastCheckTime = new Date();
setInterval(async () => {
  const newOrders = await db.collection('orders').find({
    status: 'pending',
    createdAt: { $gt: lastCheckTime }
  }).toArray();
  if (newOrders.length) processOrders(newOrders);
  lastCheckTime = new Date();
}, 5000); // 5 second delay, 17,280 queries/day even if no changes!

// GOOD — Change Stream (push-based, immediate, efficient)
const stream = db.collection('orders').watch([
  { $match: { operationType: 'insert', 'fullDocument.status': 'pending' } }
]);
for await (const change of stream) {
  await processOrder(change.fullDocument); // immediate, zero wasted queries
}

An audit trail records every change to sensitive data — who changed what, when, and what it looked like before and after. Change Streams automatically capture all mutations. The pattern is to watch the target collection and write each change event to an audit_log collection. For GDPR/HIPAA compliance, include user context by enabling MongoDB Auditing or by capturing user session info at write time. With MongoDB 6.0's pre and post images, the change stream includes both the before and after state without a separate lookup. Store audit logs in a time-series collection for efficient time-range queries.
// Comprehensive audit trail
const changeStream = db.collection('financialRecords').watch([], {
  fullDocument: 'updateLookup',
  fullDocumentBeforeChange: 'required' // MongoDB 6.0+, requires preAndPostImages enabled
});

for await (const change of changeStream) {
  await db.collection('audit_log').insertOne({
    timestamp: change.clusterTime,
    collection: change.ns.coll,
    operationType: change.operationType,
    documentId: change.documentKey._id,
    before: change.fullDocumentBeforeChange || null,
    after: change.fullDocument || null,
    changedFields: change.updateDescription?.updatedFields || null,
    // Capture user context if passed via session variables
    modifiedBy: change.fullDocument?.lastModifiedBy || 'system'
  });
}
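
A sketch of the time-series collection mentioned above (MongoDB 5.0+). Note that the timeField must be a BSON Date, so store the event's wallTime (available in 6.0+) or new Date() rather than the raw clusterTime Timestamp; field choices here are illustrative.
// Sketch: time-series collection for audit entries
await db.createCollection('audit_log', {
  timeseries: {
    timeField: 'timestamp',   // must be a Date, e.g. change.wallTime or new Date()
    metaField: 'collection',  // groups entries by source collection
    granularity: 'minutes'
  }
});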

Change Streams enable CDC (Change Data Capture): synchronizing MongoDB changes to Elasticsearch, a relational database, message queues, or data warehouses in real time. The pattern: watch MongoDB, transform each event to the target format, and write to the destination. For Elasticsearch sync, an insert/update in MongoDB creates/updates the corresponding Elasticsearch document. Use idempotent processing so that replaying the same event (after a resume) produces the same result, and persist the resume token only after the destination write succeeds; together these give effectively exactly-once behavior. Tools like Debezium and the MongoDB Kafka Connector implement this pattern at scale.
const { Client } = require('@elastic/elasticsearch');
const esClient = new Client({ node: 'http://localhost:9200' });

const changeStream = db.collection('products').watch([], {
  fullDocument: 'updateLookup'
});

for await (const change of changeStream) {
  const { operationType, documentKey, fullDocument } = change;
  
  if (operationType === 'insert' || operationType === 'update' || operationType === 'replace') {
    if (!fullDocument) continue; // document deleted before the updateLookup completed
    // Sync to Elasticsearch (indexing by _id is idempotent: replays overwrite)
    await esClient.index({
      index: 'products',
      id: documentKey._id.toString(),
      document: {
        name: fullDocument.name,
        price: fullDocument.price,
        category: fullDocument.category,
        updatedAt: new Date()
      }
    });
  } else if (operationType === 'delete') {
    await esClient.delete({ index: 'products', id: documentKey._id.toString() });
  }
}

Change Streams require: a replica set or sharded cluster (not standalone), MongoDB 3.6+, and the WiredTiger storage engine. Key limitations: the stream is tied to oplog retention; if the oplog rolls over your resume token position, you cannot resume and must do a full re-sync. Only the $match, $project, $addFields, $replaceRoot, $replaceWith, $redact, $set, and $unset pipeline stages are supported. Change streams do not trigger on internal MongoDB operations (e.g., chunk migrations in sharded clusters). Very high-write collections can generate substantial change stream overhead. Each open change stream is a persistent server-side cursor, so avoid opening hundreds of streams from a single deployment.
// Check oplog configuration
use local
db.oplog.rs.stats()
// Check oplog size and retention window

// Requirements check (mongosh)
db.hello().setName                        // undefined on a standalone, set on a replica set
db.serverStatus().storageEngine.name      // must be "wiredTiger"

// Unsupported pipeline stages (will throw error):
db.collection('data').watch([
  { $lookup: { ... } }  // ERROR: not allowed in change stream pipeline
]);

// Performance: limit open streams
// BAD — opening stream per user request
app.get('/watch/:id', (req, res) => {
  const stream = db.collection('orders').watch(); // opens a new server-side cursor on every request!
});

// GOOD — single shared stream, broadcast via WebSocket rooms
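
Since resumability hinges on oplog retention, it helps to check and, if needed, extend the window. A mongosh sketch follows; the size values are illustrative.
// Check the current oplog time span (first vs. last oplog entry)
rs.printReplicationInfo()

// MongoDB 4.4+: keep at least 24 hours of oplog (size is in MB)
db.adminCommand({ replSetResizeOplog: 1, size: 16384, minRetentionHours: 24 })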

The startAtOperationTime option opens a change stream starting from a specific cluster timestamp, allowing you to replay changes from a point in the past, which is useful for backfilling missed events during downtime. The timestamp must be within the oplog retention window. Get the current cluster time from the operationTime field of any server response, e.g. db.hello() in mongosh or db.admin().command({ hello: 1 }) in the Node driver. This differs from resumeAfter (which uses a stored token from a previous stream): startAtOperationTime uses an absolute time. It's commonly used when deploying a new consumer service that needs to catch up from a known point without a stored token.
const { Timestamp } = require('mongodb');

// Get cluster time before starting an operation
// (Node driver: the hello command's reply carries the current operationTime)
const { operationTime: startTime } = await db.admin().command({ hello: 1 });

// Perform some operations that we want to capture
await db.collection('orders').insertMany(bulkOrders);

// Open change stream from BEFORE those operations
// (will capture them and all future changes)
const changeStream = db.collection('orders').watch([], {
  startAtOperationTime: startTime
});

// Replay from 1 hour ago (must still be within the oplog window)
const oneHourAgo = new Timestamp({
  t: Math.floor((Date.now() - 3600000) / 1000), // seconds since epoch
  i: 0
});
const historicalStream = db.collection('orders').watch([], {
  startAtOperationTime: oneHourAgo
});