Chapter 7 Flashcards — Sharding
flashcards ddia-2e chapter7 sharding
Foundations
What is sharding and why does the 2nd edition use “sharding” instead of “partitioning”?
?
- Sharding: Splitting a large dataset across multiple nodes so each node holds a subset of the data
- Why rename: The 2nd edition adopts industry terminology — Elasticsearch calls them shards, MongoDB calls them shards, Redis Cluster calls them slots backed by shards. “Partitioning” was academic; “sharding” is what practitioners say
- Why shard: Dataset too large for one node; read/write throughput too high for one node; latency requires geographic distribution; regulatory data residency requirements
- Key orthogonality: Sharding and replication are independent — each shard is typically replicated to multiple nodes for availability
What is a hot spot in a sharded database and what are its two root causes?
?
- Hot spot: One shard receives disproportionately high read or write load compared to others
- Cause 1 — Sequential keys: Timestamp keys → all new writes go to “today’s” shard; auto-increment IDs → always hit the highest shard
- Fix: Prefix timestamp with entity ID (sensor_id, user_id) to spread across shards
- Cause 2 — Access frequency skew: Celebrity/viral content — a few keys are read by millions simultaneously; hash sharding doesn’t help because it’s the access frequency, not key distribution, that’s skewed
- Fix: Caching layer (Redis/CDN) + optional random suffix for write spreading
- Key insight: Hash sharding solves distribution imbalance but not access-frequency imbalance
Sharding Strategies
What are the two main sharding strategies and their fundamental trade-off?
?
Key-range sharding:
- Assign continuous key ranges to shards (like encyclopedia volumes A-D, E-H, etc.)
- Keys stored sorted within shards → range queries scan one shard
- ✅ Range scans efficient; ✅ Co-locate related keys (same user’s events)
- ❌ Sequential key hot spots (timestamps → all writes to latest shard)
- Used by: HBase, Bigtable, CockroachDB, Spanner
Hash sharding:
- Hash the key; hash value determines shard
- Even distribution regardless of key patterns
- ✅ Uniform distribution; ✅ No sequential hot spots
- ❌ Range queries require scatter/gather across all shards
- Used by: Cassandra, DynamoDB, MongoDB (hash), Redis Cluster
Fundamental trade-off: Range query efficiency vs uniform distribution — you cannot have both simultaneously without compound keys
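A minimal sketch of the two routing rules, assuming four shards, hypothetical split points, and MD5 as a stand-in hash. It illustrates why sequential timestamp keys pile onto one key-range shard while hashing scatters them.

```python
import bisect
import hashlib

# Hypothetical split points: shard 0 holds keys < "e", shard 1 holds "e" up to "i", and so on.
RANGE_BOUNDARIES = ["e", "i", "q"]  # 3 boundaries -> 4 shards
NUM_HASH_SHARDS = 4


def range_shard(key: str) -> int:
    """Key-range sharding: locate the range that contains the key."""
    return bisect.bisect_right(RANGE_BOUNDARIES, key)


def hash_shard(key: str) -> int:
    """Hash sharding: a stable hash of the key picks the shard."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_HASH_SHARDS


def range_shards_for_scan(start: str, end: str) -> list[int]:
    """Shards a scan over [start, end] must touch under key-range sharding."""
    return list(range(range_shard(start), range_shard(end) + 1))


timestamps = [f"2024-01-01T00:00:{s:02d}" for s in range(60)]
print({range_shard(t) for t in timestamps})          # {0}: every write hits one shard
print(sorted({hash_shard(t) for t in timestamps}))   # spread over several shards
print(range_shards_for_scan("a", "c"))               # [0]: a short range scan stays local
```

Under hash sharding there is no equivalent of `range_shards_for_scan`: adjacent keys land on unrelated shards, so a range scan has to ask all of them.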
What is consistent hashing and how does it minimize rebalancing overhead?
?
- Concept: Map the hash space to a ring (0 to 2^64); place nodes at random positions on the ring
- Key routing: A key hashes to a point on the ring → goes to the first node clockwise from that point
- Adding a node:
- New node placed at a random ring position
- Only keys between the new node and its predecessor on the ring need to move
- All other keys remain unchanged
- Moves only ~1/N of the total data (N = new node count)
- Removing a node: Only that node’s keys move to its successor
- Virtual nodes (vnodes): Each physical node owns multiple ring positions → smoother distribution and finer-grained rebalancing
- Used by: Amazon Dynamo, Cassandra (vnodes), Ketama/libketama
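The ring can be sketched in a few lines. This is an illustration under assumptions, not any particular database's implementation: MD5 stands in for the hash function, `HashRing` and the node names are hypothetical, and each node gets 64 vnodes.

```python
import bisect
import hashlib


def ring_hash(value: str) -> int:
    """Stable 64-bit ring position (MD5 is only a stand-in hash)."""
    return int.from_bytes(hashlib.md5(value.encode("utf-8")).digest()[:8], "big")


class HashRing:
    def __init__(self, nodes, vnodes_per_node=64):
        self.vnodes_per_node = vnodes_per_node
        self._positions = []  # sorted ring positions
        self._owner = {}      # ring position -> physical node
        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str) -> None:
        # Each physical node owns several positions (vnodes) on the ring.
        for i in range(self.vnodes_per_node):
            pos = ring_hash(f"{node}#vnode{i}")
            bisect.insort(self._positions, pos)
            self._owner[pos] = node

    def node_for(self, key: str) -> str:
        """First ring position clockwise from the key's hash, wrapping at the end."""
        idx = bisect.bisect_left(self._positions, ring_hash(key))
        if idx == len(self._positions):
            idx = 0
        return self._owner[self._positions[idx]]


keys = [f"key-{i}" for i in range(10_000)]
ring = HashRing(["node-a", "node-b", "node-c"])
before = {k: ring.node_for(k) for k in keys}
ring.add_node("node-d")
after = {k: ring.node_for(k) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
print(f"moved {len(moved) / len(keys):.1%} of keys, all to node-d")
```

With four nodes the moved fraction should land near 1/4, and every moved key ends up on the new node; keys never shuffle between the existing nodes.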
How does a compound shard key (Cassandra-style) combine hash and range sharding?
?
- Compound key: (partition_key, clustering_key)
- Partition key: Hashed to determine which shard; guarantees uniform distribution
- Cannot do range queries across different partition keys
- Clustering key: Sorted within the shard; enables range queries within one shard
- Example: PRIMARY KEY (user_id, created_at)
- All events for user_id=alice land on one shard (hashed)
- Within that shard, events sorted by created_at
- Query: WHERE user_id='alice' AND created_at BETWEEN t1 AND t2 → single shard, efficient range scan
- Cannot query all users’ events in a time range across shards (that requires scatter/gather)
- Limitation: Range queries only work within one partition key’s shard; cross-partition range = scatter
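A toy in-memory model of the compound key, assuming integer timestamps, four shards, and MD5 as a stand-in hash. The function names are hypothetical; the point is that the partition key picks the shard and the clustering key keeps rows sorted inside it.

```python
import bisect
import hashlib
from collections import defaultdict

NUM_SHARDS = 4


def shard_for(partition_key: str) -> int:
    """Hash the partition key to choose a shard."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_SHARDS


# shards[shard_id][user_id] -> list of (created_at, event), kept sorted by created_at
shards = [defaultdict(list) for _ in range(NUM_SHARDS)]


def insert(user_id: str, created_at: int, event: str) -> None:
    rows = shards[shard_for(user_id)][user_id]
    bisect.insort(rows, (created_at, event))  # clustering key keeps rows ordered


def range_query(user_id: str, t1: int, t2: int) -> list:
    """Events with t1 <= created_at <= t2 for one user; touches exactly one shard."""
    rows = shards[shard_for(user_id)].get(user_id, [])
    lo = bisect.bisect_left(rows, (t1,))
    hi = bisect.bisect_left(rows, (t2 + 1,))  # valid because timestamps are integers
    return rows[lo:hi]


insert("alice", 30, "c")
insert("alice", 10, "a")
insert("alice", 20, "b")
insert("bob", 15, "x")
print(range_query("alice", 10, 20))  # [(10, 'a'), (20, 'b')]
```

A query for all users between two timestamps has no single `shard_for` answer, which is the scatter/gather limitation noted above.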
Multitenancy Sharding
What are the three tenancy isolation models for SaaS platforms and their trade-offs?
?
Row-level isolation:
- All tenants share tables; tenant_id column filters rows
- Shard key includes tenant_id to co-locate tenant data
- ✅ Lowest overhead; ✅ Simple schema management
- ❌ Noisy neighbor: one heavy tenant degrades others on same shard
- ❌ Bugs can expose cross-tenant data
- Used by: high-volume shared SaaS platforms
Schema-level isolation:
- Each tenant gets a dedicated schema in the shared database
- ✅ Moderate isolation; ✅ Per-tenant backup/restore possible
- ❌ Schema count limits (PostgreSQL: hundreds practical, not thousands)
- ❌ Cross-tenant aggregation harder
- Used by: Shopify, mid-tier SaaS
Database-level isolation:
- Each tenant gets a dedicated database (or cluster)
- ✅ Full isolation; ✅ Compliance-friendly (GDPR deletion = drop database)
- ❌ High operational overhead; ❌ Hard to aggregate cross-tenant analytics
- Used by: Enterprise SaaS with large, regulated customers
Hybrid: Small tenants share row-level shards; large/enterprise tenants get dedicated shards
What is the “noisy neighbor” problem in multitenant sharding and how is it mitigated?
?
- Problem: One tenant with heavy workload consumes resources on a shared shard, degrading performance for all co-located tenants
- Why it happens: Row-level or schema-level isolation shares the same underlying shard; a bursty tenant monopolizes CPU/disk I/O
- Mitigation strategies:
- Rate limiting per tenant at the application layer — cap tenant QPS before hitting DB
- Tenant-aware load routing — detect hot tenants, route to less-loaded nodes dynamically
- Tier-based isolation — top N tenants (by usage) get dedicated shards; rest share pool
- Auto-scaling — serverless databases (DynamoDB, Aurora Serverless) absorb burst per tenant
- Credit/quota system — burst credits allow temporary spikes without sustained impact on neighbors
- Key insight: The tenant isolation model (row vs schema vs DB) directly determines how much blast radius a noisy tenant can create
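Per-tenant rate limiting with burst credits is often built as a token bucket. The sketch below is one possible shape, with a hypothetical `TenantRateLimiter` class and an injectable clock so it can be tested without sleeping.

```python
import time


class TenantRateLimiter:
    """Token bucket per tenant: refills at `rate` tokens/second, bursts up to `burst`."""

    def __init__(self, rate: float, burst: float, clock=time.monotonic):
        self.rate = rate
        self.burst = burst
        self.clock = clock
        self._buckets = {}  # tenant_id -> (tokens, last_refill_time)

    def allow(self, tenant_id: str, cost: float = 1.0) -> bool:
        now = self.clock()
        # A tenant seen for the first time starts with a full bucket.
        tokens, last = self._buckets.get(tenant_id, (self.burst, now))
        tokens = min(self.burst, tokens + (now - last) * self.rate)
        allowed = tokens >= cost
        if allowed:
            tokens -= cost
        self._buckets[tenant_id] = (tokens, now)
        return allowed


fake_now = [0.0]
limiter = TenantRateLimiter(rate=1.0, burst=2.0, clock=lambda: fake_now[0])
print([limiter.allow("tenant-a") for _ in range(3)])  # [True, True, False]
print(limiter.allow("tenant-b"))                      # True: buckets are independent
```

The application would call `allow(tenant_id)` before issuing a query and reject or queue the request when it returns `False`, so a bursty tenant exhausts its own bucket rather than the shared shard.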
Secondary Indexes
What is a local (document-partitioned) secondary index and what are its trade-offs?
?
- How it works: Each shard maintains its own secondary index for its local data only
- Write: Updates only the local shard’s index → single-shard write, fast
- Read: Must query ALL shards and merge results (scatter/gather)
- Tail latency = slowest responding shard (P99 degradation amplifies with shard count)
- Consistency: Immediately consistent with the data on that shard
- Concrete example:
- Shard 0: has cars [red_01, blue_02]; local index: {red: [red_01], blue: [blue_02]}
- Shard 1: has cars [red_07, blue_08]; local index: {red: [red_07], blue: [blue_08]}
- Query “color=red”: scatter to Shard 0 AND Shard 1, gather [red_01, red_07]
- Used by: MongoDB, Elasticsearch (search = scatter to all shards), Cassandra (native secondary indexes), VoltDB
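The car example can be modeled directly. This sketch assumes a hypothetical `Shard` class holding both the rows and a per-shard color index; a write touches one shard, a read by color visits all of them.

```python
from collections import defaultdict


class Shard:
    def __init__(self):
        self.rows = {}                       # primary key -> record
        self.color_index = defaultdict(set)  # local index: color -> primary keys

    def put(self, car_id: str, color: str) -> None:
        """Single-shard write: the row and its index entry live together."""
        old = self.rows.get(car_id)
        if old is not None:
            self.color_index[old["color"]].discard(car_id)
        self.rows[car_id] = {"id": car_id, "color": color}
        self.color_index[color].add(car_id)


shards = [Shard(), Shard()]
shards[0].put("red_01", "red")
shards[0].put("blue_02", "blue")
shards[1].put("red_07", "red")
shards[1].put("blue_08", "blue")


def find_by_color(color: str) -> list[str]:
    """Scatter to every shard, then gather and merge the matching keys."""
    results = []
    for shard in shards:
        results.extend(shard.color_index.get(color, ()))
    return sorted(results)


print(find_by_color("red"))  # ['red_01', 'red_07']
```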
What is a global (term-partitioned) secondary index and what are its trade-offs?
?
- How it works: One global index, itself sharded by the index term (not by primary key)
- Term “red” → all pointers to red items across all data shards stored in one index shard
- Write: Must write to BOTH the data shard (primary key) AND the index shard for the indexed term → cross-shard write; index updates often async
- Read: Query ONE index shard → get pointers → fetch data (fast secondary lookup)
- Consistency: Often eventually consistent — async index updates mean index may be slightly stale
- Concrete example:
- Index shard for “red”: {red → [(shard0, car_01), (shard1, car_07), (shard2, car_16)]}
- Query “color=red”: hit one index shard → get 3 pointers → fetch those 3 records
- Used by: DynamoDB Global Secondary Indexes (async, eventually consistent), Riak Search
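A matching sketch for the term-partitioned case, under simplifying assumptions: inserts only (no index cleanup on update), MD5 as a stand-in hash, and the index write done synchronously, whereas real systems often apply it asynchronously.

```python
import hashlib
from collections import defaultdict

NUM_DATA_SHARDS = 3
NUM_INDEX_SHARDS = 2


def stable_hash(value: str) -> int:
    return int.from_bytes(hashlib.md5(value.encode("utf-8")).digest()[:8], "big")


data_shards = [dict() for _ in range(NUM_DATA_SHARDS)]
index_shards = [defaultdict(set) for _ in range(NUM_INDEX_SHARDS)]


def put(car_id: str, color: str) -> None:
    """Cross-shard write: one data shard (by primary key) plus one index shard (by term)."""
    data_shard = stable_hash(car_id) % NUM_DATA_SHARDS
    data_shards[data_shard][car_id] = {"id": car_id, "color": color}
    index_shard = stable_hash(color) % NUM_INDEX_SHARDS
    index_shards[index_shard][color].add((data_shard, car_id))


def find_by_color(color: str) -> list[str]:
    """Read one index shard for the term, then follow pointers to the data shards."""
    index_shard = stable_hash(color) % NUM_INDEX_SHARDS
    pointers = index_shards[index_shard].get(color, set())
    return sorted(data_shards[shard][car_id]["id"] for shard, car_id in pointers)


for car_id, color in [("car_01", "red"), ("car_07", "red"), ("car_16", "red"), ("car_02", "blue")]:
    put(car_id, color)
print(find_by_color("red"))  # ['car_01', 'car_07', 'car_16']
```

All pointers for one term sit in a single index shard, which is what makes the read cheap and also what makes a very popular term a potential hot spot.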
What is scatter/gather and why does it amplify tail latency?
?
- Scatter: Send the query to ALL shards simultaneously (fan-out)
- Gather: Wait for ALL shards to respond, then merge results
- Tail latency amplification: Total query time = max(all shard response times)
- If each shard independently exceeds 50 ms on 1% of requests (P99 = 50 ms), a 50-shard query has a 1 − 0.99^50 ≈ 39.5% chance that at least one shard is that slow; at 100 shards the chance is about 63%
- In practice, a 50-shard scatter/gather query has a much higher P99 than a single-shard query
- Why it happens: Must wait for the slowest shard (stragglers), cannot return partial results for most queries
- Failure sensitivity: A single slow or unavailable shard blocks the entire query
- Mitigation:
- Use global secondary indexes to avoid scatter/gather
- Speculative execution (send to all, return when k-of-N respond, cancel rest)
- Limit scatter/gather queries to background jobs, not interactive paths
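The amplification is simple arithmetic if shard latencies are assumed independent (real shards often share causes of slowness, so treat this as a rough model). The helper name `p_any_slow` is hypothetical.

```python
def p_any_slow(num_shards: int, per_shard_slow_prob: float = 0.01) -> float:
    """Probability that at least one of `num_shards` responses is slower than
    the per-shard percentile threshold (0.01 corresponds to P99)."""
    return 1.0 - (1.0 - per_shard_slow_prob) ** num_shards


for n in (1, 10, 50, 100, 500):
    print(f"{n:>4} shards: {p_any_slow(n):.1%}")
# roughly 1.0%, 9.6%, 39.5%, 63.4%, 99.3%
```

A latency that one shard shows on 1 request in 100 is therefore seen on more than a third of 50-shard queries, and on nearly all 500-shard queries.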
Rebalancing
What are the three rebalancing strategies and when would you choose each?
?
1. Fixed shard count (Elasticsearch, Riak, Couchbase):
- Create many more shards than nodes at creation (e.g., 1000 for 10 nodes)
- Add node: move whole shards (no splitting); remove node: redistribute its shards
- ✅ Simple logic; ✅ No shard-splitting complexity
- ❌ Shard count fixed at creation; too few = can’t scale; too many = per-shard overhead
- Choose when: Shard count is predictable at creation time; cluster size changes are infrequent
2. Dynamic splitting (HBase, MongoDB, RethinkDB):
- Split when shard > threshold; merge when shard < threshold
- ✅ Auto-adapts to data volume; no pre-planning needed
- ❌ Cold-start hotspot (single shard until first split); split storms possible
- Choose when: Data growth is unpredictable; initial data volume is unknown
3. Proportional to nodes / vnodes (Cassandra):
- Each node owns N virtual nodes; add node → steal random vnodes from existing nodes
- ✅ Scales naturally; finer granularity with more nodes
- ❌ Gossip overhead grows; random stealing can cause temporarily uneven distribution
- Choose when: Cluster nodes are added/removed frequently; need gradual rebalancing
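For the fixed-shard-count strategy, adding a node only reassigns whole shards. The sketch below assumes a plain shard→node dictionary and a hypothetical `add_node` helper that takes shards from the most loaded nodes until the newcomer has its fair share.

```python
def add_node(assignment: dict[int, str], new_node: str) -> dict[int, str]:
    """Return a new shard->node map; assumes `new_node` owns nothing yet."""
    result = dict(assignment)
    shards_of: dict[str, list[int]] = {}
    for shard, owner in sorted(result.items()):
        shards_of.setdefault(owner, []).append(shard)
    target = len(result) // (len(shards_of) + 1)  # fair share for the new node
    for _ in range(target):
        # Take one shard from whichever existing node currently holds the most.
        donor = max(shards_of, key=lambda n: (len(shards_of[n]), n))
        result[shards_of[donor].pop()] = new_node
    return result


assignment = {s: f"node-{'abc'[s % 3]}" for s in range(12)}  # 12 shards on 3 nodes
after = add_node(assignment, "node-d")
moved = [s for s in assignment if after[s] != assignment[s]]
print(moved)  # [9, 10, 11]: only 3 of 12 shards move, all to node-d
```

Keys never change shard here; only the shard-to-node mapping changes, which is why the shard count has to be chosen generously up front.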
Why is automatic rebalancing dangerous during a failure event?
?
- Scenario:
- Node A slows down (due to GC pause, network issue, or disk problem)
- Auto-rebalancer interprets slow response as node failure
- Rebalancer triggers: moves A’s shards to nodes B, C, D
- Moving shards = heavy disk reads on A + heavy network + disk writes on B, C, D
- B, C, D are now handling both normal traffic AND rebalancing I/O → they slow down
- Rebalancer now detects B, C as potentially down → triggers more moves → cascading failure
- Real risk: Rebalancing a single node’s shards generates network traffic on the order of that node’s entire data size, and more when several replicas are rebuilt at once
- Best practices:
- Add delay (e.g., wait 30 minutes) before triggering rebalancing on a suspected failure
- Require quorum agreement that a node is truly down
- Rate-limit rebalancing bandwidth
- Require human operator approval for large rebalancing operations in production
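Three of these practices (delay, quorum, operator approval) can be combined into one guard. The sketch is illustrative only: `Suspicion` and `should_rebalance` are hypothetical names, and bandwidth rate-limiting is left out.

```python
from dataclasses import dataclass


@dataclass
class Suspicion:
    node: str
    first_suspected_at: float  # seconds on the caller's clock
    votes_down: int            # peers that currently consider the node down
    cluster_size: int


def should_rebalance(s: Suspicion, now: float, grace_seconds: float = 30 * 60,
                     operator_approved: bool = False,
                     needs_approval: bool = True) -> bool:
    """Move shards only after a grace period, a majority vote, and (optionally) sign-off."""
    waited = (now - s.first_suspected_at) >= grace_seconds
    quorum = s.votes_down > s.cluster_size // 2
    approved = operator_approved or not needs_approval
    return waited and quorum and approved


slow_node = Suspicion("node-a", first_suspected_at=0.0, votes_down=3, cluster_size=5)
print(should_rebalance(slow_node, now=60.0, operator_approved=True))    # False: a GC pause may still clear
print(should_rebalance(slow_node, now=1800.0, operator_approved=True))  # True
```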
Request Routing
What are the three request routing approaches and what coordination service do they use?
?
1. Any-node forwarding (gossip):
- Client contacts any node; that node forwards to the correct shard owner if needed (1 extra hop in data path)
- Nodes learn shard assignments via gossip protocol
- Used by: Cassandra
2. Routing tier:
- Clients contact a dedicated routing service (proxy) that knows current shard→node mapping
- Router forwards to correct node (1 extra hop before data path)
- Routing tier subscribes to ZooKeeper/etcd for assignment updates
- Used by: MongoDB (mongos routers; shard metadata lives on config servers rather than ZooKeeper)
3. Partition-aware clients:
- Client library maintains current shard map (subscribes to ZooKeeper/etcd)
- Client contacts correct node directly (no extra hops)
- Most efficient; requires client-side complexity
- Used by: Kafka producer/consumer API, Cassandra smart drivers
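Coordination service: ZooKeeper, etcd, or Kafka KRaft stores the authoritative shard→node mapping. Routing tiers and partition-aware clients depend on a consistent, fault-tolerant metadata store; gossip-based systems such as Cassandra spread the mapping between the nodes themselves. In Kafka, KRaft (Raft-based metadata stored in Kafka itself) became production-ready in 3.3 and fully replaced ZooKeeper in 4.0.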
Comparison and Trade-offs
Compare key-range sharding vs hash sharding on 6 dimensions.
?
| Dimension | Key-Range | Hash |
|---|---|---|
| Range queries | Efficient (single shard) | Inefficient (scatter/gather) |
| Distribution | Uneven (some ranges have more data) | Uniform (hash evenly distributes) |
| Sequential hot spots | Yes (e.g., timestamp → today’s shard) | No (hash breaks sequential patterns) |
| Access-frequency hot spots | Yes (popular key range) | Yes (popular key, regardless of hash) |
| Rebalancing | Range splitting (HBase, Spanner style) | Consistent hashing ring |
| Best for | Time-series, co-located range access | Random access, high-write uniformity |
Compare local vs global secondary indexes on 6 dimensions.
?
| Dimension | Local (Document-Partitioned) | Global (Term-Partitioned) |
|---|---|---|
| Write cost | Single shard | Cross-shard (2+ shards) |
| Write latency | Low | Higher |
| Read cost | Scatter/gather (all shards) | Single index shard |
| Read latency | High (tail latency problem) | Low |
| Consistency | Immediate (local shard) | Eventually consistent (async) |
| Best for | Write-heavy, infrequent secondary reads | Read-heavy secondary access |
Modern Context
How do NewSQL databases (CockroachDB, Spanner) handle sharding differently from traditional NoSQL?
?
Traditional NoSQL (Cassandra, DynamoDB): Application must explicitly choose shard key; partitioning is the application’s responsibility; cross-shard transactions require application-level coordination
NewSQL approach:
- CockroachDB: Data auto-split into ranges (default maximum 512 MiB; 64 MiB before v20.1); each range replicated via Raft and rebalanced automatically; load-based splitting (hot ranges split by QPS, not just size); application sees logical table
- Google Spanner: Auto range splits + TrueTime API + Paxos groups per range; globally consistent; 2PC across shard groups for cross-shard transactions
- YugabyteDB: PostgreSQL-compatible; Raft-based sharding; automatic rebalancing
Key differences:
- Fewer shard key design decisions (ranges split automatically, although primary key order still determines which rows share a range)
- Cross-shard ACID transactions are first-class (not bolted on)
- Rebalancing is continuous and automatic (with safety mechanisms)
- Higher latency for cross-shard ops vs single-shard NoSQL
Trade-off: Convenience and correctness at the cost of higher baseline latency; suitable for latency-tolerant, correctness-critical workloads
What sharding strategies does Kafka use for topics, and why does the partition key matter?
?
- Kafka model: A topic is divided into partitions (shards); each partition is an ordered, append-only log
- Assignment: Message key hashed → determines partition; no key = records spread across partitions (round-robin in older clients; sticky batching by default since Kafka 2.4)
- Why partition key matters:
- Ordering: Messages with the same key always go to the same partition → ordering guaranteed per key; use user_id as key to order all user events
- Co-location: All events for same entity in one partition → simpler stateful stream processing
- Hot spots: Popular key (celebrity user_id) → hot partition → same problem as DB hot spots
- Consumer parallelism: Each partition consumed by at most one consumer in a group; max parallelism = partition count
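- Routing metadata: KRaft (internal Raft log) stores partition→broker assignments instead of ZooKeeper; production-ready since Kafka 3.3, and the only mode from Kafka 4.0
- Rebalancing: Adding partitions is operationally simple, but keyed messages may then hash to a different partition, so per-key ordering is not preserved across the change; reducing the partition count is not supported (it requires a new topic and data migration); common pattern is to over-provision partitions at creation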
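Key-based assignment boils down to a stable hash modulo the partition count. The sketch uses MD5 as a stand-in (Kafka's Java client hashes key bytes with murmur2), and `partition_for` is a hypothetical helper, but the consequence for changing the partition count is the same.

```python
import hashlib


def partition_for(key: str, num_partitions: int) -> int:
    """Stand-in keyed partitioner: stable hash of the key modulo the partition count."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


keys = [f"user-{i}" for i in range(1000)]

# Same key, same partition count -> same partition, hence per-key ordering.
stable = all(partition_for(k, 6) == partition_for(k, 6) for k in keys)

# Growing the topic from 6 to 8 partitions remaps most keys.
remapped = sum(partition_for(k, 6) != partition_for(k, 8) for k in keys)
print(stable, f"{remapped / len(keys):.0%} of keys change partition")
```

For a uniform hash about three quarters of keys move when going from 6 to 8 partitions, which is why events for one key can appear out of order across a partition-count change.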
Application Scenarios
A time-series IoT database stores readings keyed by (device_id, timestamp). The team wants to use hash sharding on device_id. What problem might arise and how would you fix it?
?
Problem: If a small number of devices generate the majority of traffic (a common IoT pattern — some devices report every second, others rarely), hash sharding on device_id creates access-frequency hot spots even though key distribution is even.
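Additionally, hashing device_id alone does not make time-range queries efficient: “all readings for device X between t1 and t2” stays on one shard, but becomes a full scan of that device’s data unless rows are kept sorted by timestamp within the shard. If the whole (device_id, timestamp) key were hashed instead, the same query would need scatter/gather across all shards.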
Fixes:
- Compound shard key: (device_id) as partition key (hashed), timestamp as clustering key (sorted within shard); enables efficient time-range queries per device
- Hot device detection: Identify top-N high-frequency devices; use a random suffix (device_X_0 through device_X_9) to split their writes across 10 shards; aggregate on read
- Time-bucketed sharding: Shard key = (device_id, time_bucket) where time_bucket groups readings into e.g., hourly buckets; distributes writes but requires knowing the bucket to query
- Tiered storage: Recent data in hot shards; older data archived to cold shards or object storage (common in InfluxDB, TimescaleDB)
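The suffix and time-bucket fixes amount to small key-construction helpers. In this sketch `HOT_DEVICES` stands for the hypothetical output of hot-device detection, and ten suffixes and hourly buckets are assumed.

```python
import random

NUM_SALTS = 10
HOT_DEVICES = {"device_X"}  # hypothetical result of hot-device detection


def write_key(device_id: str, rng=random) -> str:
    """Hot devices get a random suffix so their writes spread over NUM_SALTS keys."""
    if device_id in HOT_DEVICES:
        return f"{device_id}_{rng.randrange(NUM_SALTS)}"
    return device_id


def read_keys(device_id: str) -> list[str]:
    """Reads for a hot device must fan out over every suffix and merge the results."""
    if device_id in HOT_DEVICES:
        return [f"{device_id}_{i}" for i in range(NUM_SALTS)]
    return [device_id]


def bucketed_key(device_id: str, epoch_seconds: int, bucket_seconds: int = 3600) -> tuple[str, int]:
    """Time-bucketed shard key: one bucket per device per hour by default."""
    return (device_id, epoch_seconds // bucket_seconds)


print(write_key("device_Y"))         # device_Y: cold devices keep a single key
print(read_keys("device_X"))         # device_X_0 ... device_X_9
print(bucketed_key("device_X", 7200))  # ('device_X', 2)
```

Both helpers trade read cost for write spread: salted keys multiply reads by the number of suffixes, and bucketed keys require one lookup per bucket in the queried time window.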
A secondary index on model_type in a product catalog contains only 5 distinct values (basic, standard, premium, enterprise, legacy). Is this a good global secondary index candidate? Why or why not?
?
Problem: A global secondary index with only 5 distinct values (low cardinality) creates unbalanced index shards — the “standard” bucket might contain 80% of products while “legacy” has 1%.
- The “standard” index shard becomes a hot spot: most queries and writes touch it
- Hash-partitioning the index by term doesn’t help — there are only 5 terms
Better approaches:
- Local secondary index for low-cardinality attributes: scatter/gather across N data shards is likely cheaper than a hot global index shard
- Filter at query layer: If the catalog is small enough, fetch all products from relevant shards and filter in application memory
- Materialized views: Pre-compute separate tables/collections per model_type; this is denormalization, but it eliminates the index problem entirely
- Composite index key for better distribution: (model_type, product_id_hash_bucket); combine the low-cardinality field with a high-cardinality bucket to spread the index
Key insight: Global secondary indexes work best for high-cardinality indexed attributes where each index entry points to a small number of rows
Quick Facts
What is the default number of shards in Elasticsearch vs virtual nodes in Cassandra?
?
Elasticsearch:
- Default: 1 primary shard per index (reduced from 5 in Elasticsearch 7.0)
- Configurable at index creation; changing the primary shard count later requires reindexing or the split/shrink APIs
- Rule of thumb: 10–50 GB per shard is optimal
- Too many shards: each shard is a separate Lucene index with its own heap and file-handle overhead; memory pressure
Cassandra vnodes:
- Default: 256 virtual nodes (vnodes) per physical node through Cassandra 3.x; lowered to 16 in Cassandra 4.0
- With 256 vnodes, a 10-node cluster = 10 × 256 = 2,560 ring positions
- More vnodes = smoother rebalancing when adding/removing nodes
- More vnodes = more gossip and repair overhead; pick the count for the Cassandra version and cluster size in use
DynamoDB:
- Partitions (shards) are fully managed; application cannot see or control shard count
- Automatic partition splits based on size (~10 GB) and throughput (~3,000 RCU / 1,000 WCU per partition)
- On-demand capacity: shards scale transparently with traffic
Total Cards: 20
Review Time: ~25 minutes
Priority: HIGH
Last Updated: 2026-05-29