// Watch a collection for changes
const changeStream = db.collection('orders').watch();
changeStream.on('change', (event) => {
console.log('Change detected:', event.operationType);
console.log('Document:', event.fullDocument);
console.log('Document key:', event.documentKey._id);
});
// Alternative: consume with an async iterator (preferred; requires an async context).
// Pick one consumption style per stream; don't mix .on('change') with for await.
for await (const change of changeStream) {
  console.log(change.operationType); // insert, update, delete, replace, drop, invalidate, ...
}
Update events include updateDescription.updatedFields and updateDescription.removedFields, showing exactly what changed. They do not carry the full document by default; pass the fullDocument: 'updateLookup' option to receive it on update events (delete events never include fullDocument, since the document no longer exists). A sketch of applying these fields to a local copy follows the event structures below.
// Change event structure for INSERT
{
_id: { /* resume token */ },
operationType: "insert",
fullDocument: { _id: ObjectId("..."), name: "Alice", age: 30 },
ns: { db: "myapp", coll: "users" },
documentKey: { _id: ObjectId("...") },
clusterTime: Timestamp(...)
}
// Change event structure for UPDATE
{
operationType: "update",
documentKey: { _id: ObjectId("...") },
updateDescription: {
updatedFields: { age: 31, "address.city": "Mumbai" },
removedFields: ["oldField"],
truncatedArrays: []
}
}
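To consume these fields, a handler can mirror the change into a local copy of the document. A minimal sketch, assuming a plain object localDoc that holds a hypothetical local copy (dotted paths such as "address.city" would need a nested setter in real code):
// Apply an update event's updateDescription to a hypothetical local copy
function applyUpdateDescription(localDoc, updateDescription) {
  for (const [path, value] of Object.entries(updateDescription.updatedFields)) {
    localDoc[path] = value;   // set or overwrite each updated field
  }
  for (const path of updateDescription.removedFields) {
    delete localDoc[path];    // drop fields the update removed
  }
  return localDoc;
}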
To resume a stream after a restart or failure, pass the last seen resume token (the _id of a change event) via the resumeAfter or startAfter option. This ensures no events are missed. Store the resume token durably (e.g., in MongoDB itself or Redis) after each processed event. The difference: resumeAfter cannot resume past an invalidate event (such as a collection drop or rename), while startAfter can open a new stream after one; a sketch of that recovery follows the example below. Token validity depends on the oplog window: a token becomes unusable once the oplog has rolled over past its position.
// Persist resume token to avoid missing events on restart
let resumeToken = null;
async function watchWithResume() {
const options = resumeToken ? { resumeAfter: resumeToken } : {};
const changeStream = db.collection('orders').watch([], options);
for await (const change of changeStream) {
// Process the change
await processOrderChange(change);
// Save token AFTER successful processing
resumeToken = change._id;
await db.collection('_changeStreamState').updateOne(
{ _id: 'orders_stream' },
{ $set: { resumeToken } },
{ upsert: true }
);
}
}
// On startup: load saved token
const state = await db.collection('_changeStreamState').findOne({ _id: 'orders_stream' });
resumeToken = state?.resumeToken;
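When a stream is invalidated (for example, the watched collection is dropped or renamed), the invalidate event's own _id can be passed to startAfter to open a fresh stream, which resumeAfter cannot do. A hedged sketch, reusing the processOrderChange helper from above:
let invalidateToken = null;
const stream = db.collection('orders').watch();
for await (const change of stream) {
  if (change.operationType === 'invalidate') {
    invalidateToken = change._id; // the stream closes right after emitting this event
    break;
  }
  await processOrderChange(change);
}
if (invalidateToken) {
  // startAfter (unlike resumeAfter) may point at an invalidate event
  const reopened = db.collection('orders').watch([], { startAfter: invalidateToken });
  // ...consume `reopened` as usual
}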
The watch() method accepts an aggregation pipeline as its first argument, allowing you to filter and transform events server-side before they arrive. This is more efficient than filtering in application code because MongoDB only sends relevant events. The pipeline stages allowed in change streams are limited to $match, $project, $addFields, $replaceRoot, $replaceWith, $redact, $set, and $unset. Use $match on operationType, fullDocument fields, or documentKey; $project can additionally trim the event payload, as sketched after the examples below. Filtered streams consume fewer resources and reduce event-processing overhead on high-write collections.
// Only watch for order insertions with high value
const changeStream = db.collection('orders').watch([
{
$match: {
operationType: 'insert',
'fullDocument.amount': { $gte: 10000 }
}
}
]);
// Watch only specific operation types
const changeStream = db.collection('users').watch([
{ $match: { operationType: { $in: ['insert', 'delete'] } } }
]);
// Filter updates to specific fields
const changeStream = db.collection('inventory').watch([
{
$match: {
operationType: 'update',
'updateDescription.updatedFields.stock': { $exists: true }
}
}
]);
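Besides filtering, $project can shrink what each event carries over the wire. A minimal sketch (the _id field, which holds the resume token, is kept by the inclusion projection and must not be excluded):
// Trim the payload server-side to just the fields the consumer needs
const slimStream = db.collection('orders').watch([
  { $match: { operationType: 'insert' } },
  {
    $project: {
      operationType: 1,
      documentKey: 1,
      'fullDocument.amount': 1,
      'fullDocument.status': 1
    }
  }
]);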
Change streams can be opened at three levels: collection (collection.watch()), database (db.watch()), and deployment/client (client.watch()). Database-level streams emit events for all collections in the database, and client-level streams watch all databases and collections. Higher-level streams include additional context: the ns (namespace) field in each event identifies which database and collection changed. This is useful for audit logging, data replication pipelines, and event buses; a routing sketch follows the examples below. For high-traffic deployments, database- and client-level streams can generate very high event volumes, so always use filtering pipelines.
// Watch at the database level (all collections), filtered here to selected ones
const dbStream = db.watch([
{ $match: { 'ns.coll': { $in: ['orders', 'payments'] } } }
]);
dbStream.on('change', event => {
console.log(`Change in ${event.ns.coll}:`, event.operationType);
});
// Watch entire deployment (all databases)
const deploymentStream = client.watch([
{ $match: { 'ns.db': { $ne: 'admin' } } } // exclude admin db
]);
// Event includes namespace info
{
operationType: "insert",
ns: { db: "shop", coll: "orders" }, // which db and collection
fullDocument: { ... }
}
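A minimal event-bus style routing sketch, assuming hypothetical per-collection handlers handleOrderChange and handlePaymentChange:
const routes = {
  orders: handleOrderChange,     // hypothetical handler
  payments: handlePaymentChange  // hypothetical handler
};
const routedStream = db.watch([
  { $match: { 'ns.coll': { $in: Object.keys(routes) } } }
]);
for await (const change of routedStream) {
  await routes[change.ns.coll](change); // dispatch by namespace
}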
// server.js — real-time order status updates
const io = require('socket.io')(httpServer);
async function startOrderWatcher() {
const changeStream = db.collection('orders').watch([
{ $match: { operationType: { $in: ['update', 'replace'] } } }
], { fullDocument: 'updateLookup' });
for await (const change of changeStream) {
const orderId = change.documentKey._id.toString();
const order = change.fullDocument;
// Emit only to users watching this specific order
io.to(`order:${orderId}`).emit('orderUpdate', {
orderId,
status: order.status,
updatedAt: order.updatedAt
});
}
}
// Client joins room when viewing order
socket.on('watchOrder', (orderId) => {
socket.join(`order:${orderId}`);
});
By default, update events carry only updateDescription (what changed) and documentKey, not the full document. To receive the complete updated document, set the fullDocument option to 'updateLookup'. MongoDB then performs a secondary lookup to fetch the current document state. Note this is a post-change lookup: under rapid successive updates the fetched document may reflect a later state than the event describes, and it can be null if the document was deleted in the meantime. MongoDB 6.0 introduced the 'whenAvailable' and 'required' options, which use pre/post-image collections for point-in-time images on collections with changeStreamPreAndPostImages enabled; a lenient variant is sketched after the example below.
// Get full document on update events
const changeStream = db.collection('orders').watch([], {
fullDocument: 'updateLookup' // fetch full document on update
});
changeStream.on('change', event => {
if (event.operationType === 'update') {
console.log('Full updated order:', event.fullDocument); // complete document
console.log('Changed fields only:', event.updateDescription.updatedFields);
}
});
// MongoDB 6.0+: precise pre/post images
// Enable pre/post images on the collection (mongosh syntax; from the Node driver use db.command())
db.createCollection('orders');
db.runCommand({
  collMod: 'orders',
  changeStreamPreAndPostImages: { enabled: true }
});
const precise = db.collection('orders').watch([], {
fullDocument: 'required', // post-image
fullDocumentBeforeChange: 'required' // pre-image
});
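With fullDocument: 'required' or fullDocumentBeforeChange: 'required', the stream errors if an image is missing (for example, for writes made before images were enabled). 'whenAvailable' is the lenient variant, returning the image when it exists and omitting it otherwise; a short sketch:
const lenient = db.collection('orders').watch([], {
  fullDocument: 'whenAvailable',
  fullDocumentBeforeChange: 'whenAvailable'
});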
// Change stream on sharded collection — same API, mongos handles merging
const changeStream = db.collection('orders').watch([
{ $match: { 'fullDocument.region': 'APAC' } }
]);
// Change event includes clusterTime for ordering across shards
{
_id: { resumeToken... },
operationType: "insert",
clusterTime: Timestamp(1704000000, 1), // used for ordering
ns: { db: "shop", coll: "orders" },
fullDocument: { region: "APAC", amount: 5000, ... }
}
// Ordering and consistency: mongos merges the per-shard streams and returns only
// majority-committed events, in a single sequence ordered by clusterTime
const redis = require('redis').createClient();
async function startCacheInvalidationWatcher() {
  await redis.connect(); // node-redis v4+ requires an explicit connect
  const changeStream = db.collection('products').watch([
    { $match: { operationType: { $in: ['update', 'replace', 'delete'] } } }
  ]);
  for await (const change of changeStream) {
    const productId = change.documentKey._id.toString();
    // Invalidate specific cache keys
    await redis.del(`product:${productId}`);
    await redis.del(`product:${productId}:details`);
    // If the category changed, also invalidate the list cache for the new category
    // (invalidating the old category's list would require pre-images)
    if (change.updateDescription?.updatedFields?.category) {
      const newCategory = change.updateDescription.updatedFields.category;
      await redis.del(`products:category:${newCategory}`);
    }
console.log(`Cache invalidated for product ${productId}`);
}
}
// loadResumeToken / saveResumeToken / sleep are app-specific helpers
// (e.g., backed by the _changeStreamState collection shown earlier)
async function watchWithRetry(collection, pipeline, handler) {
  let resumeToken = await loadResumeToken();
  let retryDelay = 1000;
while (true) {
try {
const options = { fullDocument: 'updateLookup' };
if (resumeToken) options.resumeAfter = resumeToken;
const stream = db.collection(collection).watch(pipeline, options);
for await (const change of stream) {
await handler(change);
resumeToken = change._id;
await saveResumeToken(resumeToken);
retryDelay = 1000; // reset on success
}
} catch (err) {
      // Non-resumable: 286 = ChangeStreamHistoryLost (resume point no longer in the oplog),
      // 136 = CappedPositionLost on older servers
      if (err.code === 286 || err.code === 136) {
        console.error('Stream not resumable, stopping:', err.message);
        break;
      }
console.error(`Stream error, retrying in ${retryDelay}ms:`, err.message);
await sleep(retryDelay);
retryDelay = Math.min(retryDelay * 2, 30000); // max 30s backoff
}
}
}
// BAD — polling approach (wastes resources, adds latency)
setInterval(async () => {
const newOrders = await db.collection('orders').find({
status: 'pending',
createdAt: { $gt: lastCheckTime }
}).toArray();
if (newOrders.length) processOrders(newOrders);
lastCheckTime = new Date();
}, 5000); // 5 second delay, 17,280 queries/day even if no changes!
// GOOD — Change Stream (push-based, immediate, efficient)
const stream = db.collection('orders').watch([
{ $match: { operationType: 'insert', 'fullDocument.status': 'pending' } }
]);
for await (const change of stream) {
await processOrder(change.fullDocument); // immediate, zero wasted queries
}
// Comprehensive audit trail
const changeStream = db.collection('financialRecords').watch([], {
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'required' // MongoDB 6.0+, requires preAndPostImages enabled
});
for await (const change of changeStream) {
await db.collection('audit_log').insertOne({
timestamp: change.clusterTime,
collection: change.ns.coll,
operationType: change.operationType,
documentId: change.documentKey._id,
before: change.fullDocumentBeforeChange || null,
after: change.fullDocument || null,
changedFields: change.updateDescription?.updatedFields || null,
// Capture user context if passed via session variables
modifiedBy: change.fullDocument?.lastModifiedBy || 'system'
});
}
const { Client } = require('@elastic/elasticsearch');
const esClient = new Client({ node: 'http://localhost:9200' });
const changeStream = db.collection('products').watch([], {
fullDocument: 'updateLookup'
});
for await (const change of changeStream) {
const { operationType, documentKey, fullDocument } = change;
  if (operationType === 'insert' || operationType === 'update' || operationType === 'replace') {
    if (!fullDocument) continue; // updateLookup can return null if the doc was deleted before the lookup
    // Sync to Elasticsearch
await esClient.index({
index: 'products',
id: documentKey._id.toString(),
document: {
name: fullDocument.name,
price: fullDocument.price,
category: fullDocument.category,
updatedAt: new Date()
}
});
} else if (operationType === 'delete') {
await esClient.delete({ index: 'products', id: documentKey._id.toString() });
}
}
// Check oplog configuration (mongosh)
use local
db.oplog.rs.stats()        // oplog collection size
rs.printReplicationInfo()  // oplog window: earliest/latest entry, i.e. how far back a token can resume
// Requirements check
db.hello().setName                   // must be set: change streams need a replica set (or sharded cluster)
db.serverStatus().storageEngine.name // must be "wiredTiger"
// Unsupported pipeline stages (will throw error):
db.collection('data').watch([
{ $lookup: { ... } } // ERROR: not allowed in change stream pipeline
]);
// Performance: limit open streams
// BAD — opening stream per user request
app.get('/watch/:id', (req, res) => {
const stream = db.collection('orders').watch(); // opens a new server-side cursor per request
});
// GOOD — single shared stream per process, broadcast via WebSocket rooms
// (see the order-status watcher with socket.io rooms above)
The startAtOperationTime option opens a stream from a specific cluster time rather than a stored token. The timestamp can be taken from db.hello().operationTime or from the operationTime field of any server response. This differs from resumeAfter (which uses a stored token from a previous stream): startAtOperationTime uses an absolute cluster time. It's commonly used when deploying a new consumer service that needs to catch up from a known point without a stored token; the chosen time must still fall within the oplog window.
const { Timestamp } = require('mongodb');
// Get the current cluster time before starting an operation
// (from the Node driver, take operationTime from a replica-set command response such as hello)
const { operationTime: startTime } = await db.admin().command({ hello: 1 });
// Perform some operations that we want to capture
await db.collection('orders').insertMany(bulkOrders);
// Open change stream from BEFORE those operations
// (will capture them and all future changes)
const changeStream = db.collection('orders').watch([], {
startAtOperationTime: startTime
});
// Replay from 1 hour ago (within oplog window)
const oneHourAgo = new Timestamp({
  t: Math.floor((Date.now() - 3600000) / 1000), // high 32 bits: seconds since the epoch
  i: 0                                          // low 32 bits: ordinal within that second
});
const historicalStream = db.collection('orders').watch([], {
startAtOperationTime: oneHourAgo
});