MongoDB

Replication & Sharding

15 Questions

A replica set is a group of MongoDB instances (typically 3 or more) that maintain the same data through automatic replication. One member is the primary and accepts all writes; the others are secondaries that replicate by applying operations from the primary's oplog (operations log). If the primary fails, the remaining members hold an election to choose a new primary within seconds, providing automatic failover. The minimum recommended replica set size is 3 nodes so that a majority can always elect a primary.
// Initialize a replica set in the shell
rs.initiate({
  _id: "myReplicaSet",
  members: [
    { _id: 0, host: "mongo1:27017" },
    { _id: 1, host: "mongo2:27017" },
    { _id: 2, host: "mongo3:27017" }
  ]
})

// Check replica set status
rs.status()

// Connection string with replica set
mongodb://mongo1:27017,mongo2:27017,mongo3:27017/mydb?replicaSet=myReplicaSet
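
Drivers can also spread reads across the set via a read preference. A minimal Node.js sketch, assuming the hostnames above and the official mongodb driver package:

// Node.js driver — prefer secondaries for reads, fall back to primary
const { MongoClient } = require("mongodb");

const client = new MongoClient(
  "mongodb://mongo1:27017,mongo2:27017,mongo3:27017/mydb?replicaSet=myReplicaSet",
  { readPreference: "secondaryPreferred" }
);

async function run() {
  await client.connect();
  // Reads may land on a secondary; writes always go to the primary
  const doc = await client.db("mydb").collection("users").findOne({ active: true });
  console.log(doc);
  await client.close();
}
run().catch(console.error);

Note that secondaryPreferred trades read-your-own-writes consistency for read scaling, since secondaries may lag slightly behind the primary.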

Why it matters: Replica sets are the foundation of MongoDB's high availability and a core concept tested in every interview focused on production systems and reliability.

Real applications: All production MongoDB deployments use replica sets to eliminate single points of failure; when the primary crashes, a new primary is elected within seconds, ensuring continuous availability.

Common mistakes: Using fewer than 3 members (preventing majority elections), treating arbiters as a substitute for data nodes, or misconfiguring replica set names in connection strings.

The oplog (operations log) is a special capped collection (local.oplog.rs) that records every write operation as an idempotent statement; every replica set member keeps its own copy. Secondaries tail the oplog of their sync source (usually the primary, though chained replication from another secondary is possible) and replay operations in order. Because oplog entries are idempotent (an $inc in your application is recorded as a $set of the resulting value), replaying an entry multiple times yields the same result. Oplog size is critical — if a secondary falls too far behind (e.g., during a network partition), it can no longer find its last-seen position in the oplog and must perform an initial sync (full data copy). Monitor rs.printReplicationInfo() to check oplog capacity.
// Check oplog status
rs.printReplicationInfo()
// Output: configured oplog size: 990MB
//         log length start to end: 86400 secs (24hrs)
//         oplog first event time: ...

// Check secondary replication lag
rs.printSecondaryReplicationInfo()

// View raw oplog entries
use local
db.oplog.rs.find().sort({ $natural: -1 }).limit(5)
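
The oplog window that rs.printReplicationInfo() reports can also be derived by hand from the oldest and newest entries; a mongosh sketch, relying on the wall-clock Date each oplog entry carries in its wall field (MongoDB 3.6+):

// Approximate oplog window from the first and last entries
const oplog = db.getSiblingDB("local").oplog.rs;
const first = oplog.find().sort({ $natural: 1 }).limit(1).next();
const last = oplog.find().sort({ $natural: -1 }).limit(1).next();
const hours = (last.wall - first.wall) / 1000 / 3600;
print(`oplog window: ~${hours.toFixed(1)} hours`);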

Why it matters: Understanding oplog mechanics is essential for comprehending how replication actually works and recognizing when to investigate replication lag issues.

Real applications: When a secondary falls too far behind and runs out of oplog history, it must perform an expensive full data resync; monitoring oplog capacity prevents an unexpected initial sync from being triggered.

Common mistakes: Not understanding that the oplog stores idempotent transformations (MongoDB records an $inc as a $set of the resulting value), ignoring oplog size during capacity planning, or assuming small oplog sizes are acceptable on high-write systems.

An arbiter is a lightweight replica set member that participates in elections but holds no data. It provides an odd number of voting members to avoid split votes, without the cost of a full data-bearing node. Use an arbiter when you have only 2 data nodes and need a tiebreaker for elections. However, MongoDB recommends against arbiters in new deployments — adding a third data node is preferred because it also serves reads and provides another copy of the data. Arbiters cannot become primary, cannot serve reads, and do not provide data redundancy.
// Add an arbiter to existing replica set
rs.addArb("mongo-arbiter:27017")

// Or reconfigure explicitly — start from the current config so the
// required version field carries over and is incremented
cfg = rs.conf()
cfg.members.push({ _id: 2, host: "mongo-arbiter:27017", arbiterOnly: true })
rs.reconfig(cfg)

// Check member roles
rs.status().members.map(m => ({ host: m.name, state: m.stateStr }))
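
One caveat worth knowing: in a primary-secondary-arbiter (PSA) topology, majority write concern needs acknowledgment from both data-bearing members, because the arbiter holds no data. A sketch of what happens when the secondary is down:

// PSA set with the lone secondary offline: only the primary can
// acknowledge, so a majority write waits and fails with a wtimeout error
db.orders.insertOne(
  { sku: "a1", qty: 2 },
  { writeConcern: { w: "majority", wtimeout: 5000 } }
)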

Why it matters: Arbiters are a special replica set member type; understanding their role and limitations shows grasp of election mechanics and HA architecture.

Real applications: Legacy two-node setups sometimes use arbiters for cost reasons, though MongoDB now recommends always adding a third full data node for better HA and read redundancy.

Common mistakes: Using an arbiter when a third data node is feasible, configuring arbiters to hold data or accept writes, or not realizing arbiters can't provide data backup or serve reads.

Sharding is MongoDB's horizontal scaling strategy that distributes data across multiple servers called shards. Each shard is itself a replica set. When a single server can no longer handle the data volume or write throughput, sharding partitions the collection across shards using a shard key. A sharded cluster consists of: shards (data), config servers (cluster metadata, deployed as a replica set), and mongos routers (query routing). Sharding allows virtually unlimited horizontal scaling for both storage and throughput, which is why the largest MongoDB deployments rely on it for massive datasets.
// Enable sharding on a database
sh.enableSharding("myDatabase")

// Shard a collection on a key
sh.shardCollection("myDatabase.orders", { customerId: 1 })

// Check sharding status
sh.status()

// View chunk distribution (pre-5.0 servers key chunks by "ns";
// 5.0+ keys them by collection UUID)
db.getSiblingDB("config").chunks.countDocuments(
  { ns: "myDatabase.orders" }
)

Why it matters: Sharding enables unlimited horizontal scaling and is essential knowledge for engineers working with massive datasets or very high write throughput systems.

Real applications: Very large deployments shard across hundreds or thousands of nodes to handle petabytes of data and millions of operations per second.

Common mistakes: Sharding prematurely before vertical scaling limits are reached, choosing a poor shard key, or underestimating operational complexity that sharding introduces.

A good shard key must have high cardinality (many distinct values), even write distribution (avoid hotspots), and should match common query patterns for targeted queries. Monotonically increasing keys like timestamps or auto-increment IDs make terrible shard keys — all writes go to one shard (hotspot). Hashed shard keys distribute data randomly but scatter range queries across all shards. A compound shard key like { userId: 1, createdAt: 1 } balances distribution with query targeting. Consider using a zone sharding strategy for geographic or tenant-based partitioning.
// BAD — monotonically increasing (all writes to last shard)
sh.shardCollection("db.events", { createdAt: 1 })

// BAD — low cardinality (too few values = too few chunks)
sh.shardCollection("db.orders", { status: 1 })

// GOOD — hashed (even distribution, scattered ranges)
sh.shardCollection("db.users", { _id: "hashed" })

// GOOD — compound (balances distribution + targeted queries)
sh.shardCollection("db.orders", { customerId: 1, createdAt: 1 })

// Check chunk balance
sh.status()  // shows chunk counts per shard
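
To check whether a chosen key is actually spreading data evenly, mongosh offers db.collection.getShardDistribution(); the output shown in the comments is abridged and illustrative:

// Per-shard data, document, and chunk counts for a sharded collection
db.orders.getShardDistribution()
// Shard shard1: data ~12.4GiB, docs ~31.0M, chunks 97
// Shard shard2: data ~12.1GiB, docs ~30.5M, chunks 95
// Roughly equal shares suggest the key is distributing writes well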

Why it matters: Shard key selection is THE most critical decision for sharded clusters; a bad shard key creates lasting performance problems, and although live resharding exists since MongoDB 5.0, it is expensive on large collections.

Real applications: A system that accidentally uses timestamps as a shard key will have all writes concentrated on the latest timestamp chunk, creating a massive performance bottleneck and preventing horizontal scaling.

Common mistakes: Using monotonically increasing keys (timestamps, sequences) causing writes to one shard, choosing low-cardinality keys reducing parallelization, or not considering query patterns in shard key design.

Targeted queries include the shard key, allowing the mongos router to direct the query to the specific shard(s) that hold the data. Scatter-gather queries lack the shard key, so mongos broadcasts to all shards and merges results — these are expensive and should be avoided in hot paths. Always include the shard key in your most frequent queries. If not possible, consider whether your shard key choice is wrong for your access pattern. Use explain() to check if a query is a SHARD_MERGE (scatter-gather) or SINGLE_SHARD (targeted).
// TARGETED — includes shard key (customerId)
db.orders.find({ customerId: "cust123", status: "pending" })
// mongos sends to ONLY the shard holding customerId: "cust123"

// SCATTER-GATHER — no shard key
db.orders.find({ status: "pending" })
// mongos broadcasts to ALL shards, merges results

// Check query type with explain
db.orders.find({ customerId: "cust123" })
  .explain("executionStats")
// Look for: winningPlan.stage: "SINGLE_SHARD" vs "SHARD_MERGE"

Why it matters: Query targeting efficiency directly determines sharding success; scatter-gather queries can degrade performance by orders of magnitude compared to targeted queries.

Real applications: A targeted query on a 100-shard cluster hits 1 shard in milliseconds; the same query without a shard key broadcasts to all 100 shards, combines results from each, and takes seconds.

Common mistakes: Not including the shard key in queries, not using explain() to verify query targeting, or failing to redesign queries to include the shard key when possible.

A replica set election is triggered when the primary becomes unavailable (network failure, crash, or maintenance). Eligible members call for an election by requesting votes from the other members (internally, the replSetRequestVotes command). A member wins by receiving votes from a majority of all voting members (not just the reachable ones). MongoDB's election protocol is based on the Raft consensus algorithm, with member priority and oplog recency influencing which candidate wins. With default settings (electionTimeoutMillis = 10000, i.e., 10 seconds to notice a missing primary), failover typically completes within about 12 seconds. While no primary exists, the set cannot accept writes, though secondaries can still serve reads.
// Manually trigger an election (step down current primary)
rs.stepDown(60) // primary steps down for 60 seconds

// Check who is primary
db.hello() // db.isMaster() in older shells
// { isWritablePrimary: true, primary: "mongo1:27017", ... }

// Set member priority (higher = preferred primary)
cfg = rs.conf()
cfg.members[0].priority = 2  // preferred primary
cfg.members[1].priority = 1
cfg.members[2].priority = 0  // never becomes primary
rs.reconfig(cfg)

Why it matters: Elections are the core of replica set failover; understanding election mechanics and timing is crucial for designing resilient systems.

Real applications: In production, you design failover expectations knowing elections take 10-30 seconds; applications must handle temporary write unavailability gracefully.

Common mistakes: Not configuring member priorities, assuming elections are instant, misunderstanding how arbiters affect vote counts, or failing to account for election time in SLA calculations.

MongoDB divides sharded collection data into chunks (default 128 MB in MongoDB 6.0+; 64 MB in earlier versions). The balancer runs on the config server primary and automatically migrates chunks between shards to maintain even distribution. When the imbalance between shards exceeds a threshold (measured by chunk count in older versions, by data size in 6.0+), the balancer moves chunks to underloaded shards. Chunk migrations are transparent to applications but consume I/O and network resources. You can disable the balancer, or restrict it to a balancing window, during maintenance or scheduled jobs. Moving a chunk involves copying all documents in that chunk's key range to the destination shard, then updating the config server metadata.
// Check balancer status
sh.isBalancerRunning()
sh.getBalancerState()

// Disable balancer (e.g., during backup)
sh.stopBalancer()
sh.startBalancer()

// Manual chunk split and move
sh.splitAt("mydb.orders", { customerId: "cust500" })
sh.moveChunk("mydb.orders", { customerId: "cust500" }, "shard2")

// View current chunk distribution
db.getSiblingDB("config").chunks.aggregate([
  { $group: { _id: "$shard", count: { $sum: 1 } } }
])
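
Instead of toggling the balancer by hand, you can confine it to an off-peak window by writing to the config database; a sketch (window times use the config server primary's local time):

// Allow balancing only between 23:00 and 06:00
use config
db.settings.updateOne(
  { _id: "balancer" },
  { $set: { activeWindow: { start: "23:00", stop: "06:00" } } },
  { upsert: true }
)

// Remove the window to allow balancing at any time
db.settings.updateOne(
  { _id: "balancer" },
  { $unset: { activeWindow: true } }
)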

Why it matters: Chunk migration is the background process that enables sharding's load balancing; understanding its mechanics helps diagnose performance issues during data rebalancing.

Real applications: When adding a new shard to a 10-shard cluster, the balancer automatically migrates a large share of the existing chunks to the new shard to even out distribution, consuming I/O resources for days or weeks on big datasets.

Common mistakes: Permanently disabling the balancer without remembering to enable it, not considering chunk migration I/O when scheduling backups, or adding too many shards at once causing migration storms.

Zone sharding (previously tag-aware sharding) assigns specific shard key ranges to designated shards, enabling geographic data locality or tenant isolation. This is used for data residency compliance (GDPR, data sovereignty), multi-tenant SaaS applications, and tiered storage (hot/warm/cold data on different hardware). Zones are assigned to shards with sh.addShardToZone() (sh.addShardTag() in older releases), and key ranges are mapped to zones with sh.updateZoneKeyRange() (formerly sh.addTagRange()). All documents with shard keys in a defined range are routed exclusively to shards in that zone.
// Assign zones to shards
sh.addShardToZone("shard-us-east", "US")
sh.addShardToZone("shard-eu-west", "EU")

// Map key ranges to zones (countryCode as the shard key);
// ranges are min-inclusive, max-exclusive
sh.updateZoneKeyRange(
  "mydb.users",
  { countryCode: "US" },  // min (inclusive)
  { countryCode: "UT" },  // max (exclusive) — captures "US", "USA", ...
  "US"
)
sh.updateZoneKeyRange(
  "mydb.users",
  { countryCode: "DE" },  // Germany only
  { countryCode: "DF" },
  "EU"
)
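
Zone assignments can be verified via sh.status() or read directly from the config database:

// sh.status() lists zones per shard and per key range; the raw
// mappings live in the config database
sh.status()
db.getSiblingDB("config").shards.find({}, { _id: 1, tags: 1 })
db.getSiblingDB("config").tags.find()  // one document per zone key range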

Replication lag is the delay between when the primary writes an operation to the oplog and when a secondary applies it. High lag means secondaries are behind the primary, and reads from secondaries may return stale data. Common causes: high write volume, under-resourced secondary hardware, network congestion, or long-running operations blocking oplog application on the secondary. Monitor lag with rs.printSecondaryReplicationInfo(). Reduce it by upgrading secondary hardware, increasing oplog size, relying on flow control (enabled by default since MongoDB 4.2, it throttles the primary when majority-commit lag grows), or writing with majority write concern so producers cannot outrun replication.
// Monitor replication lag
rs.printSecondaryReplicationInfo()
// Output:
// source: mongo2:27017
// syncedTo: Thu Jan 01 2026 12:00:00 GMT+0000
// 5 secs (0.08 hrs) behind the primary

// Programmatic lag check (approximate — compares each member's last
// applied optime against this client's clock)
rs.status().members.forEach(m => {
  if (m.state === 2) { // state 2 = SECONDARY
    const lagSecs = (new Date() - m.optimeDate) / 1000;
    print(`${m.name} lag: ${lagSecs}s`);
  }
});

// Increase oplog size (avoid secondary needing full re-sync)
// In mongod.conf:
// replication:
//   oplogSizeMB: 10240
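
Majority write concern is the most direct throttle: each write waits for a majority of members, so a lagging set slows producers down instead of silently falling behind. A Node.js sketch (collection name assumed):

// Writer blocks until a majority of members have the write,
// failing with a write concern error after 5 seconds
await db.collection("events").insertOne(event, {
  writeConcern: { w: "majority", wtimeout: 5000 }
});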

Why it matters: Replication lag directly impacts data consistency guarantees and reliability; monitoring lag is mandatory in production for detecting problems early.

Real applications: If secondaries lag 1 hour behind the primary and the primary fails, you lose 1 hour of data not yet replicated to the secondary that gets promoted to primary.

Common mistakes: Not monitoring lag until users complain about stale reads, ignoring flow control features that prevent excessive lag, or upgrading hardware without verifying lag improves.

During a failover, write operations fail with NotWritablePrimary (formerly NotMaster) errors until a new primary is elected (typically 10–30 seconds). Drivers compatible with MongoDB 4.2 and later enable retryable writes by default, automatically retrying eligible write operations once after a failover — seamless handling for most transient errors. Operations that ran on the old primary but weren't yet replicated to the new primary are rolled back and written to a rollback directory. Design applications to expect and handle write errors during elections, and use a connection string listing all replica set members so the driver discovers the new primary automatically.
// Retryable writes enabled by default in modern drivers
const client = new MongoClient(
  "mongodb://mongo1,mongo2,mongo3/?replicaSet=rs0",
  { retryWrites: true } // default true in recent drivers
);

// Driver automatically retries this on failover:
await db.collection('orders').insertOne(order);

// Multi-document updateMany/deleteMany are NOT retryable writes,
// so handle primary stepdown manually (10107 = NotWritablePrimary)
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
try {
  await db.collection('logs').updateMany({ level: 'debug' }, { $set: { archived: true } });
} catch (err) {
  if (err.code === 10107 || err.message.includes('NotWritablePrimary')) {
    await sleep(5000); // wait out the election, then retry once
    await db.collection('logs').updateMany({ level: 'debug' }, { $set: { archived: true } });
  }
}

Why it matters: Failover handling is crucial for building resilient applications; understanding write error scenarios helps design robust error handling logic.

Real applications: During a failover, applications must gracefully handle write errors and retry; modern MongoDB drivers support retryable writes that make this transparent to most applications.

Common mistakes: Not enabling retryable writes, assuming writes never fail during normal operation, or not having retry logic in critical write paths.

A hidden member replicates data like a normal secondary but is invisible to client applications — it never receives read requests via read preferences and is never elected primary (priority must be 0). Hidden members are ideal for dedicated backups, running intensive analytics, reporting, or ETL jobs that would otherwise impact the primary or application-facing secondaries. They can participate in elections as voters or be configured as non-voting. To take a point-in-time backup, you can lock a hidden member with db.fsyncLock() without impacting application availability.
// Configure a hidden member
cfg = rs.conf()
cfg.members[3] = {
  _id: 3,
  host: "mongo-hidden:27017",
  priority: 0,   // cannot become primary
  hidden: true,  // invisible to client read preferences
  votes: 1       // still votes in elections; consider 0 to keep the voter count odd
}
rs.reconfig(cfg)

// Verify (hidden members won't appear in isMaster().hosts)
rs.conf().members.filter(m => m.hidden)

// Use hidden member for backup
// On hidden member:
db.fsyncLock()
// ... take file system snapshot ...
db.fsyncUnlock()

Why it matters: Hidden members enable dedicated workload separation; understanding specialized member types shows advanced MongoDB architecture knowledge.

Real applications: Large data warehouses use hidden members to run nightly ETL jobs and analytics without impacting user-facing read and write performance.

Common mistakes: Configuring hidden members to accept user read traffic (defeating their purpose), setting wrong priority or votes, or not securing hidden members as they still replicate all data.

The mongos router acts as a query coordinator, holding a local cache of the cluster's chunk map from the config servers. When a query arrives, mongos inspects the shard key in the query predicate. For targeted queries (shard key present), it routes to the specific shard(s) owning that key range. For scatter-gather queries, it broadcasts to all shards and merges (re-sorting if needed) the results in memory. Aggregation pipelines with $sort and $group may require a merge stage that runs on mongos or on a designated shard. Always deploy multiple mongos instances behind a load balancer for high availability and to distribute routing load.
// mongos connection (same driver API as standalone)
// Application connects to mongos, not shards directly
const client = new MongoClient("mongodb://mongos1:27017,mongos2:27017/mydb");

// mongos performs merge sort for sorted queries across shards
db.orders.find({ status: "shipped" }).sort({ createdAt: -1 }).limit(10)
// mongos: sends sort query to all shards, gets top 10 from each,
// then merges all results, re-sorts, returns final top 10

// Aggregation on a sharded cluster: shards pre-aggregate, then a
// merge stage combines partial results (on mongos or a shard)
db.orders.aggregate([
  { $match: { status: "shipped" } },
  { $group: { _id: "$customerId", total: { $sum: "$amount" } } }
])
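
To see where each stage runs, explain the pipeline; on sharded clusters the output includes a splitPipeline section (field names as observed on recent server versions):

// Inspect how the pipeline is split between shards and the merger
db.orders.explain().aggregate([
  { $match: { status: "shipped" } },
  { $group: { _id: "$customerId", total: { $sum: "$amount" } } }
])
// Look for: splitPipeline.shardsPart (runs on every targeted shard),
// splitPipeline.mergerPart (the final merge), and mergeType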

Why it matters: Understanding mongos routing is essential for diagnosing sharded cluster performance issues and designing efficient queries.

Real applications: Aggregation pipelines run as much work as possible on the shards (so filter early with $match), then a merge stage combines partial results on mongos or a designated shard.

Common mistakes: Running $sort or $group early in pipelines without upstream $match (forcing all shards to sort/group first), not realizing mongos must hold merge results in memory.

Vertical scaling (scaling up) means adding more CPU, RAM, or disk to the existing server. It's limited by hardware maximums and becomes prohibitively expensive. Horizontal scaling (scaling out) distributes load across multiple commodity servers using sharding. MongoDB is designed for horizontal scaling — adding shards linearly increases both storage capacity and write throughput. The rule of thumb: start with vertical scaling (simpler, no sharding complexity), add replica sets for HA, and introduce sharding only when a single replica set can no longer handle your workload. Sharding introduces significant operational complexity and should not be used prematurely.
// Cluster architecture decision:
// Single node → replica set → sharded cluster

// Phase 1: Single mongod (development/small apps)
// mongodb://localhost:27017

// Phase 2: Replica set (HA, read scaling)
// mongodb://m1:27017,m2:27017,m3:27017/?replicaSet=rs0

// Phase 3: Sharded cluster (write scaling beyond single RS)
// Connect to mongos:
// mongodb://mongos1:27017,mongos2:27017/mydb

// Monitor when to shard: watch for
// - Single shard CPU consistently > 80%
// - Disk I/O saturation
// - Write throughput hitting hardware limits
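
A rough mongosh sketch of those signals using db.serverStatus(); the thresholds you alert on are your own call:

// Snapshot of write throughput, connections, and cache pressure
const s = db.serverStatus();
print(`inserts+updates since startup: ${s.opcounters.insert + s.opcounters.update}`);
print(`connections in use: ${s.connections.current}`);
const wt = s.wiredTiger.cache;
const pct = wt["bytes currently in the cache"] / wt["maximum bytes configured"] * 100;
print(`WiredTiger cache used: ${pct.toFixed(1)}%  (sustained ~95%+ means pressure)`);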

Why it matters: Knowing when to scale and how (vertical vs. horizontal) determines whether you build a sustainable or problematic system; premature sharding adds massive operational complexity.

Real applications: Start with a single replica set; only introduce sharding when a single shard replica set consistently hits resource limits even after optimization.

Common mistakes: Sharding from day one (premature optimization), not maximizing vertical scaling first, or underestimating operational overhead of sharded clusters.

A delayed member replicates data with a configured time lag (e.g., 1 hour behind the primary). This provides a rolling backup window — if data is accidentally deleted or corrupted on the primary, the delayed member still holds the pre-corruption state for up to the delay duration. Delayed members must have priority 0, can be hidden, and are ideal for protection against human error (an accidental collection drop, a wrong bulk update). The delay is set via the secondaryDelaySecs field (previously slaveDelay). Note: the delayed member's data is always behind, so it should never serve application reads.
// Configure a delayed member (1 hour delay)
cfg = rs.conf()
cfg.members[2] = {
  _id: 2,
  host: "mongo-delayed:27017",
  priority: 0,              // cannot become primary
  hidden: true,             // invisible to drivers
  secondaryDelaySecs: 3600  // 1 hour delay
}
rs.reconfig(cfg)

// Verify
rs.conf().members.find(m => m.secondaryDelaySecs > 0)
// { host: 'mongo-delayed:27017', secondaryDelaySecs: 3600 }

// Use case: accidental drop recovery
// Primary: db.users.drop()  <- disaster!
// Delayed member still has users data for next 60 minutes
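
A sketch of the recovery procedure using the standard tools; hostnames are the ones assumed above, and exact steps depend on your deployment:

// Disaster at 12:00: mydb.users dropped on the primary. The delayed
// member will not apply the drop until 13:00 — act within the window.
// 1. Shut down the delayed member and restart it WITHOUT --replSet so
//    it stops applying the oplog.
// 2. Dump the intact collection from it:
//    mongodump --host mongo-delayed:27017 --db mydb --collection users --out /backup
// 3. Restore to the primary, then restart the member with --replSet:
//    mongorestore --host mongo1:27017 /backup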

Why it matters: Delayed members provide a safety net against human errors and accidental data destruction; understanding this protection mechanism is valuable for building resilient systems.

Real applications: If an automated job or developer query accidentally drops a collection or deletes millions of records, the delayed member preserves the data in a recoverable state.

Common mistakes: Using delayed members as read replicas (they have stale data), setting delays too short to be useful, or not documenting the point-in-time recovery procedure.