Chapter 7 Flashcards — Sharding

flashcards ddia-2e chapter7 sharding


Foundations

What is sharding and why does the 2nd edition use “sharding” instead of “partitioning”?
?

  • Sharding: Splitting a large dataset across multiple nodes so each node holds a subset of the data
  • Why rename: The 2nd edition adopts industry terminology — Elasticsearch calls them shards, MongoDB calls them shards, Redis Cluster calls them slots backed by shards. “Partitioning” was academic; “sharding” is what practitioners say
  • Why shard: Dataset too large for one node; read/write throughput too high for one node; latency requires geographic distribution; regulatory data residency requirements
  • Key orthogonality: Sharding and replication are independent — each shard is typically replicated to multiple nodes for availability

What is a hot spot in a sharded database and what are its two root causes?
?

  • Hot spot: One shard receives disproportionately high read or write load compared to others
  • Cause 1 — Sequential keys (under key-range sharding): Timestamp keys → all new writes go to “today’s” shard; auto-increment IDs → always hit the highest shard
    • Fix: Prefix timestamp with entity ID (sensor_id, user_id) to spread across shards
  • Cause 2 — Access frequency skew: Celebrity/viral content — a few keys are read by millions simultaneously; hash sharding doesn’t help because it’s the access frequency, not key distribution, that’s skewed
    • Fix: Caching layer (Redis/CDN) + optional random suffix for write spreading
  • Key insight: Hash sharding solves distribution imbalance but not access-frequency imbalance
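The random-suffix (key salting) fix for a hot key can be sketched in a few lines of Python. This is a minimal illustration, not any database's built-in feature: `SALT_BUCKETS = 10` and the `key#n` naming scheme are hypothetical. Each sub-key hashes independently, so the sub-keys usually land on different shards; the price is that every read must fetch and merge all of them.

```python
import random

# Assumed number of sub-keys each hot key is split into (tune per workload).
SALT_BUCKETS = 10

def salted_write_key(key: str) -> str:
    """Append a random suffix so writes to one hot key spread over several sub-keys."""
    return f"{key}#{random.randrange(SALT_BUCKETS)}"

def salted_read_keys(key: str) -> list[str]:
    """A read must fetch every sub-key, because any of them may hold data."""
    return [f"{key}#{i}" for i in range(SALT_BUCKETS)]

# Simulate a burst of counter increments against one celebrity post.
store: dict[str, int] = {}
for _ in range(1000):
    sub_key = salted_write_key("post:celebrity")
    store[sub_key] = store.get(sub_key, 0) + 1

# Aggregate on read: sum the partial counters held under each sub-key.
total = sum(store.get(k, 0) for k in salted_read_keys("post:celebrity"))
print(total, len(store))  # total is always 1000; len(store) is at most 10
```

Because reads pay SALT_BUCKETS lookups instead of one, salting is usually reserved for keys already known to be hot.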

Sharding Strategies

What are the two main sharding strategies and their fundamental trade-off?
?
Key-range sharding:

  • Assign continuous key ranges to shards (like encyclopedia volumes A-D, E-H, etc.)
  • Keys stored sorted within shards → range queries scan one shard
  • ✅ Range scans efficient; ✅ Co-locate related keys (same user’s events)
  • ❌ Sequential key hot spots (timestamps → all writes to latest shard)
  • Used by: HBase, Bigtable, CockroachDB, Spanner

Hash sharding:

  • Hash the key; hash value determines shard
  • Even distribution regardless of key patterns
  • ✅ Uniform distribution; ✅ No sequential hot spots
  • ❌ Range queries require scatter/gather across all shards
  • Used by: Cassandra, DynamoDB, MongoDB (hash), Redis Cluster

Fundamental trade-off: Range query efficiency vs uniform distribution; a single-column shard key cannot provide both, and compound keys only partially reconcile them (range scans within one partition key)
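To make the trade-off concrete, here is a minimal Python sketch of both lookups. The split points in `RANGE_BOUNDS`, the shard count, and the use of MD5 are illustrative assumptions, not any particular database's scheme.

```python
import bisect
import hashlib

# Key-range sharding: hypothetical split points define 6 shards:
# shard 0 = keys < "e", shard 1 = ["e", "i"), ..., shard 5 = keys >= "u".
RANGE_BOUNDS = ["e", "i", "m", "q", "u"]

def range_shard(key: str) -> int:
    # bisect_right counts how many split points are <= key.
    return bisect.bisect_right(RANGE_BOUNDS, key)

def range_shards_for_scan(start: str, end: str) -> range:
    # A scan over [start, end] touches only a contiguous run of shards.
    return range(range_shard(start), range_shard(end) + 1)

# Hash sharding: a hash of the key picks the shard, so neighbouring keys scatter.
# (mod N keeps the example short; changing N later would remap most keys.)
NUM_HASH_SHARDS = 6

def hash_shard(key: str) -> int:
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_HASH_SHARDS

# Sequential timestamp keys: one range shard takes every write, hashing spreads them.
keys = [f"2024-06-01T12:00:{s:02d}" for s in range(60)]
print({range_shard(k) for k in keys})          # {0}: all keys sort before "e"
print(len({hash_shard(k) for k in keys}))      # several distinct hash shards
print(list(range_shards_for_scan("f", "h")))   # [1]: the scan stays on one shard
```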

What is consistent hashing and how does it minimize rebalancing overhead?
?

  • Concept: Map the hash space to a ring (0 to 2^64 − 1, wrapping around); place nodes at pseudo-random positions on the ring, e.g., by hashing each node’s ID
  • Key routing: A key hashes to a point on the ring → goes to the first node clockwise from that point
  • Adding a node:
    • New node placed at a random ring position
    • Only keys between the new node and its predecessor on the ring need to move
    • All other keys remain unchanged
    • Moves only ~1/N of the total data on average (N = node count after the addition)
  • Removing a node: Only that node’s keys move to its successor
  • Virtual nodes (vnodes): Each physical node owns multiple ring positions → smoother distribution and finer-grained rebalancing
  • Used by: Amazon Dynamo, Cassandra (vnodes), Ketama/libketama
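A compact consistent-hashing ring with virtual nodes, sketched in Python under stated assumptions: MD5 truncated to 64 bits stands in for the ring hash, node names are arbitrary strings, and `vnodes=64` is chosen for illustration only. The demo checks the properties listed above: when a node joins, every key that moves goes to the new node and only about 1/N of keys move; removing it restores the old mapping.

```python
import bisect
import hashlib

def ring_hash(s: str) -> int:
    # Position on a 2^64 ring: first 8 bytes of an MD5 digest (an illustrative choice).
    return int.from_bytes(hashlib.md5(s.encode()).digest()[:8], "big")

class ConsistentHashRing:
    def __init__(self, vnodes: int = 64):
        self.vnodes = vnodes
        self._positions: list[int] = []    # sorted vnode positions on the ring
        self._owner: dict[int, str] = {}   # vnode position -> physical node

    def add_node(self, node: str) -> None:
        # Each physical node claims `vnodes` pseudo-random positions.
        for i in range(self.vnodes):
            pos = ring_hash(f"{node}#{i}")
            bisect.insort(self._positions, pos)
            self._owner[pos] = node

    def remove_node(self, node: str) -> None:
        # Dropping the node's positions hands its keys to the next vnode clockwise.
        self._positions = [p for p in self._positions if self._owner[p] != node]
        self._owner = {p: n for p, n in self._owner.items() if n != node}

    def node_for(self, key: str) -> str:
        # First vnode clockwise from the key's position, wrapping past the top.
        i = bisect.bisect_right(self._positions, ring_hash(key)) % len(self._positions)
        return self._owner[self._positions[i]]

ring = ConsistentHashRing(vnodes=64)
for node in ["A", "B", "C", "D"]:
    ring.add_node(node)
keys = [f"key{i}" for i in range(10_000)]
before = {k: ring.node_for(k) for k in keys}

ring.add_node("E")  # N goes from 4 to 5
moved = [k for k in keys if ring.node_for(k) != before[k]]
print(len(moved) / len(keys))                       # close to 1/5
print(all(ring.node_for(k) == "E" for k in moved))  # True: keys only move to E
```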

How does a compound shard key (Cassandra-style) combine hash and range sharding?
?

  • Compound key: (partition_key, clustering_key)
  • Partition key: Hashed to determine which shard; guarantees uniform distribution
    • Cannot do range queries across different partition keys
  • Clustering key: Sorted within the shard; enables range queries within one shard
  • Example: PRIMARY KEY (user_id, created_at)
    • All events for user_id=alice land on one shard (hashed)
    • Within that shard, events sorted by created_at
    • Query: WHERE user_id='alice' AND created_at BETWEEN t1 AND t2 → single shard, efficient range scan
    • Cannot query all users’ events in a time range across shards (that requires scatter/gather)
  • Limitation: Range queries only work within one partition key’s shard; cross-partition range = scatter

Multitenancy Sharding

What are the three tenancy isolation models for SaaS platforms and their trade-offs?
?
Row-level isolation:

  • All tenants share tables; tenant_id column filters rows
  • Shard key includes tenant_id to co-locate tenant data
  • ✅ Lowest overhead; ✅ Simple schema management
  • ❌ Noisy neighbor: one heavy tenant degrades others on same shard
  • ❌ Bugs can expose cross-tenant data
  • Used by: high-volume shared SaaS platforms

Schema-level isolation:

  • Each tenant gets a dedicated schema in the shared database
  • ✅ Moderate isolation; ✅ Per-tenant backup/restore possible
  • ❌ Schema count limits (PostgreSQL: hundreds practical, not thousands)
  • ❌ Cross-tenant aggregation harder
  • Used by: mid-tier SaaS platforms

Database-level isolation:

  • Each tenant gets a dedicated database (or cluster)
  • ✅ Full isolation; ✅ Compliance-friendly (GDPR deletion = drop database)
  • ❌ High operational overhead; ❌ Hard to aggregate cross-tenant analytics
  • Used by: Enterprise SaaS with large, regulated customers

Hybrid: Small tenants share row-level shards; large/enterprise tenants get dedicated shards
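The hybrid model is often implemented as a directory lookup with a hash fallback. A minimal sketch, assuming a hypothetical in-memory directory (`DEDICATED_SHARDS`) and a four-shard shared pool; a production system would keep the directory in a coordination service and copy the tenant's rows before flipping the entry.

```python
import hashlib

# Hypothetical directory: large tenants pinned to dedicated shards.
DEDICATED_SHARDS = {"acme-corp": "dedicated-acme", "globex": "dedicated-globex"}
# Hypothetical shared pool for everyone else (row-level isolation).
SHARED_POOL = ["shared-0", "shared-1", "shared-2", "shared-3"]

def shard_for_tenant(tenant_id: str) -> str:
    # 1. Directory lookup: enterprise tenants get their own shard.
    if tenant_id in DEDICATED_SHARDS:
        return DEDICATED_SHARDS[tenant_id]
    # 2. Everyone else is hashed onto the shared pool by tenant_id,
    #    so all rows of one small tenant stay co-located.
    h = int.from_bytes(hashlib.sha256(tenant_id.encode()).digest()[:8], "big")
    return SHARED_POOL[h % len(SHARED_POOL)]

def promote(tenant_id: str, dedicated_shard: str) -> None:
    # Moving a tenant out of the pool is a directory update plus a data copy
    # (the copy itself is out of scope for this sketch).
    DEDICATED_SHARDS[tenant_id] = dedicated_shard

print(shard_for_tenant("acme-corp"))   # dedicated-acme
print(shard_for_tenant("tiny-shop"))   # one of shared-0 .. shared-3
promote("tiny-shop", "dedicated-tiny")
print(shard_for_tenant("tiny-shop"))   # dedicated-tiny
```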

What is the “noisy neighbor” problem in multitenant sharding and how is it mitigated?
?

  • Problem: One tenant with heavy workload consumes resources on a shared shard, degrading performance for all co-located tenants
  • Why it happens: Row-level or schema-level isolation shares the same underlying shard; a bursty tenant monopolizes CPU/disk I/O
  • Mitigation strategies:
    1. Rate limiting per tenant at the application layer — cap tenant QPS before hitting DB
    2. Tenant-aware load routing — detect hot tenants, route to less-loaded nodes dynamically
    3. Tier-based isolation — top N tenants (by usage) get dedicated shards; rest share pool
    4. Auto-scaling — serverless databases (DynamoDB, Aurora Serverless) absorb burst per tenant
    5. Credit/quota system — burst credits allow temporary spikes without sustained impact on neighbors
  • Key insight: The tenant isolation model (row vs schema vs DB) directly determines how large a blast radius a noisy tenant can create
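Mitigations 1 and 5 (per-tenant rate limits and burst credits) are commonly built on a token bucket per tenant. A minimal sketch, assuming one in-process limiter with illustrative `rate` and `capacity` values; time is passed in explicitly so the behaviour is deterministic. `capacity` plays the role of burst credits and `rate` is the sustained QPS cap.

```python
from dataclasses import dataclass

@dataclass
class TokenBucket:
    rate: float        # tokens refilled per second = sustained QPS allowed
    capacity: float    # bucket size = largest burst allowed
    tokens: float
    last: float        # time of the previous refill, in seconds

    def allow(self, now: float) -> bool:
        # Refill in proportion to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

class TenantRateLimiter:
    def __init__(self, rate: float, capacity: float):
        self.rate, self.capacity = rate, capacity
        self.buckets: dict[str, TokenBucket] = {}

    def allow(self, tenant_id: str, now: float) -> bool:
        # Each tenant gets its own bucket, so one tenant cannot drain another's.
        bucket = self.buckets.get(tenant_id)
        if bucket is None:
            bucket = TokenBucket(self.rate, self.capacity, self.capacity, now)
            self.buckets[tenant_id] = bucket
        return bucket.allow(now)

limiter = TenantRateLimiter(rate=5, capacity=10)
noisy = sum(limiter.allow("noisy", now=0.0) for _ in range(100))
quiet = limiter.allow("quiet", now=0.0)
later = sum(limiter.allow("noisy", now=1.0) for _ in range(100))
print(noisy, quiet, later)  # 10 True 5
```

A distributed deployment would need shared state (or per-node budgets) for the buckets; that part is out of scope here.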

Secondary Indexes

What is a local (document-partitioned) secondary index and what are its trade-offs?
?

  • How it works: Each shard maintains its own secondary index for its local data only
  • Write: Updates only the local shard’s index → single-shard write, fast
  • Read: Must query ALL shards and merge results (scatter/gather)
    • Tail latency = slowest responding shard (P99 degradation amplifies with shard count)
  • Consistency: Immediately consistent with the data on that shard
  • Concrete example:
    • Shard 0: has cars [red_01, blue_02]; local index: {red: [red_01], blue: [blue_02]}
    • Shard 1: has cars [red_07, blue_08]; local index: {red: [red_07], blue: [blue_08]}
    • Query “color=red”: scatter to Shard 0 AND Shard 1, gather [red_01, red_07]
  • Used by: MongoDB, Elasticsearch (search = scatter to all shards), Cassandra (native secondary indexes), VoltDB
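The car example above as a small Python model of a document-partitioned index: each `Shard` (a hypothetical class) indexes only its own documents, so a write touches one shard while a query must fan out to every shard and merge. A thread pool stands in for parallel network requests.

```python
from concurrent.futures import ThreadPoolExecutor

class Shard:
    def __init__(self) -> None:
        self.docs: dict[str, dict] = {}
        self.index: dict[str, set[str]] = {}  # local index: color -> ids on THIS shard only

    def put(self, doc_id: str, doc: dict) -> None:
        # Single-shard write: the document and its index entry change together.
        old = self.docs.get(doc_id)
        if old is not None:
            self.index[old["color"]].discard(doc_id)
        self.docs[doc_id] = doc
        self.index.setdefault(doc["color"], set()).add(doc_id)

    def find_by_color(self, color: str) -> set[str]:
        return set(self.index.get(color, set()))

def scatter_gather(shards: list[Shard], color: str) -> set[str]:
    # Scatter: query every shard in parallel. Gather: wait for all, then merge.
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        results = list(pool.map(lambda s: s.find_by_color(color), shards))
    return set().union(*results)

shard0, shard1 = Shard(), Shard()
shard0.put("red_01", {"color": "red"})
shard0.put("blue_02", {"color": "blue"})
shard1.put("red_07", {"color": "red"})
shard1.put("blue_08", {"color": "blue"})
print(sorted(scatter_gather([shard0, shard1], "red")))  # ['red_01', 'red_07']
```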

What is a global (term-partitioned) secondary index and what are its trade-offs?
?

  • How it works: One global index, itself sharded by the index term (not by primary key)
    • Term “red” → all pointers to red items across all data shards stored in one index shard
  • Write: Must write to BOTH the data shard (primary key) AND the index shard for the indexed term → cross-shard write; index updates often async
  • Read: Query ONE index shard → get pointers → fetch data (fast secondary lookup)
  • Consistency: Often eventually consistent — async index updates mean index may be slightly stale
  • Concrete example:
    • Index shard for “red”: {red → [(shard0, car_01), (shard1, car_07), (shard2, car_16)]}
    • Query “color=red”: hit one index shard → get 3 pointers → fetch those 3 records
  • Used by: DynamoDB Global Secondary Indexes (async, eventually consistent), Riak Search
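The same car example with a term-partitioned index, as a toy Python sketch. Shard counts and the SHA-1 placement function `pick` are assumptions. Writes touch a data shard and the index shard that owns the term; a read by color consults exactly one index shard and follows the pointers. The index write is done inline here, whereas systems such as DynamoDB GSIs apply it asynchronously.

```python
import hashlib

NUM_DATA_SHARDS = 3   # data partitioned by primary key (car id)
NUM_INDEX_SHARDS = 2  # index partitioned by term (color)

def pick(s: str, n: int) -> int:
    return int.from_bytes(hashlib.sha1(s.encode()).digest()[:8], "big") % n

data_shards: list[dict[str, dict]] = [{} for _ in range(NUM_DATA_SHARDS)]
index_shards: list[dict[str, set]] = [{} for _ in range(NUM_INDEX_SHARDS)]

def put(car_id: str, color: str) -> None:
    ds = pick(car_id, NUM_DATA_SHARDS)
    data_shards[ds][car_id] = {"color": color}               # write 1: data shard
    term_shard = index_shards[pick(color, NUM_INDEX_SHARDS)]
    term_shard.setdefault(color, set()).add((ds, car_id))    # write 2: index shard
    # For brevity, stale entries are not removed when a car changes color.

def find_by_color(color: str) -> list[str]:
    term_shard = index_shards[pick(color, NUM_INDEX_SHARDS)]  # exactly ONE index shard
    pointers = sorted(term_shard.get(color, set()))
    # Follow each pointer straight to the data shard that holds the record.
    return [cid for ds, cid in pointers if cid in data_shards[ds]]

for car, color in [("car_01", "red"), ("car_07", "red"), ("car_16", "red"), ("car_02", "blue")]:
    put(car, color)
print(sorted(find_by_color("red")))  # ['car_01', 'car_07', 'car_16']
```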

What is scatter/gather and why does it amplify tail latency?
?

  • Scatter: Send the query to ALL shards simultaneously (fan-out)
  • Gather: Wait for ALL shards to respond, then merge results
  • Tail latency amplification: Total query time = max(all shard response times)
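    • If each shard independently exceeds its P99 (50ms) on 1% of requests, a 50-shard query hits at least one such straggler with probability 1 − 0.99^50 ≈ 39%, versus 1% for a single-shard query (≈63% at 100 shards)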
    • In practice, a 50-shard scatter/gather query has P99 much higher than a single-shard query
  • Why it happens: Must wait for the slowest shard (stragglers), cannot return partial results for most queries
  • Failure sensitivity: A single slow or unavailable shard blocks the entire query
  • Mitigation:
    • Use global secondary indexes to avoid scatter/gather
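    • Hedged requests: if a shard is slow to respond, send a duplicate request to another replica of that shard and use whichever answer arrives first; where the query tolerates it, return partial results once k of N shards have responded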
    • Limit scatter/gather queries to background jobs, not interactive paths
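The straggler arithmetic, plus a quick simulation of its effect on the query's P99. The latency model (10 ms normally, 50 ms on 1% of requests, shards independent) is a deliberately simple assumption.

```python
import random

def p_any_straggler(n_shards: int, p_slow: float = 0.01) -> float:
    # P(at least one of n independent shards exceeds its own P99) = 1 - 0.99^n.
    return 1 - (1 - p_slow) ** n_shards

for n in (1, 10, 50, 100):
    print(n, round(p_any_straggler(n), 3))
# prints one pair per line: 1 0.01, 10 0.096, 50 0.395, 100 0.634

def scatter_gather_p99(n_shards: int, trials: int = 10_000, seed: int = 1) -> float:
    # Query latency = slowest shard; hypothetical shard latency 10 ms, or 50 ms on 1% of requests.
    rng = random.Random(seed)
    latencies = sorted(
        max(50.0 if rng.random() < 0.01 else 10.0 for _ in range(n_shards))
        for _ in range(trials)
    )
    return latencies[int(0.99 * trials)]

print(scatter_gather_p99(50))  # 50.0: the straggler latency sets the query's P99
```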

Rebalancing

What are the three rebalancing strategies and when would you choose each?
?
1. Fixed shard count (Elasticsearch, Riak, Couchbase):

  • Create many more shards than nodes at creation (e.g., 1000 for 10 nodes)
  • Add node: move whole shards (no splitting); remove node: redistribute its shards
  • ✅ Simple logic; ✅ No shard-splitting complexity
  • ❌ Shard count fixed at creation; too few = can’t scale; too many = per-shard overhead
  • Choose when: Shard count is predictable at creation time; cluster size changes are infrequent
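Why pre-creating many shards keeps rebalancing cheap, compared with the naive hash(key) mod (node count) it avoids. A Python sketch with hypothetical sizes (1,000 shards, 10 → 11 nodes) and MD5 standing in for the hash; the assignment policy (the new node takes whole shards from nodes above their fair share) is one simple choice, not any specific system's algorithm.

```python
import hashlib

NUM_SHARDS = 1000  # fixed at creation time: many more shards than nodes

def h(key: str) -> int:
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

def shard_of(key: str) -> int:
    # The key -> shard mapping never changes because NUM_SHARDS never changes.
    return h(key) % NUM_SHARDS

def initial_assignment(nodes: list[str]) -> dict[int, str]:
    return {s: nodes[s % len(nodes)] for s in range(NUM_SHARDS)}

def add_node(assignment: dict[int, str], new_node: str) -> dict[int, str]:
    # Only the shard -> node map changes: the new node takes whole shards
    # from every node that holds more than its fair share.
    counts: dict[str, int] = {}
    for owner in assignment.values():
        counts[owner] = counts.get(owner, 0) + 1
    fair_share = NUM_SHARDS // (len(counts) + 1)
    new = dict(assignment)
    taken = 0
    for s in sorted(new):
        if taken == fair_share:
            break
        owner = new[s]
        if counts[owner] > fair_share:
            counts[owner] -= 1
            new[s] = new_node
            taken += 1
    return new

nodes = [f"node{i}" for i in range(10)]
before = initial_assignment(nodes)
after = add_node(before, "node10")
keys = [f"user:{i}" for i in range(10_000)]

# Fixed shard count: a key moves only if its whole shard was reassigned.
moved_fixed = sum(before[shard_of(k)] != after[shard_of(k)] for k in keys) / len(keys)
# Naive hash mod N over nodes: changing N from 10 to 11 remaps most keys.
moved_mod_n = sum(h(k) % 10 != h(k) % 11 for k in keys) / len(keys)
print(f"fixed shards: {moved_fixed:.1%} moved, mod N: {moved_mod_n:.1%} moved")
```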

2. Dynamic splitting (HBase, MongoDB, RethinkDB):

  • Split a shard when its size exceeds a threshold; merge adjacent shards when they shrink below a lower threshold
  • ✅ Auto-adapts to data volume; no pre-planning needed
  • ❌ Cold-start hotspot (single shard until first split); split storms possible
  • Choose when: Data growth is unpredictable; initial data volume is unknown
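A toy model of dynamic splitting: one shard covering the whole key space that splits at its median key once it exceeds a threshold. `MAX_KEYS_PER_SHARD = 4` is an illustrative stand-in for a size threshold in bytes, keys are assumed unique, and merging is omitted for brevity. Every early insert lands on shard 0, which is the cold-start hot spot.

```python
import bisect

MAX_KEYS_PER_SHARD = 4  # hypothetical split threshold (real systems use bytes)

class RangeShardedStore:
    def __init__(self) -> None:
        # Starts with ONE shard covering the whole key space.
        self.lower_bounds: list[str] = [""]   # shard i covers [lower_bounds[i], lower_bounds[i+1])
        self.shards: list[list[str]] = [[]]   # sorted keys per shard

    def _shard_index(self, key: str) -> int:
        return bisect.bisect_right(self.lower_bounds, key) - 1

    def insert(self, key: str) -> None:
        i = self._shard_index(key)
        bisect.insort(self.shards[i], key)
        if len(self.shards[i]) > MAX_KEYS_PER_SHARD:
            self._split(i)

    def _split(self, i: int) -> None:
        # Split at the median key: the upper half becomes a new shard.
        keys = self.shards[i]
        mid = len(keys) // 2
        self.shards[i:i + 1] = [keys[:mid], keys[mid:]]
        self.lower_bounds.insert(i + 1, keys[mid])

store = RangeShardedStore()
for k in ["apple", "kiwi", "fig", "banana", "pear", "cherry", "mango", "grape", "lemon"]:
    store.insert(k)
print(store.lower_bounds)  # ['', 'fig', 'kiwi']
print(store.shards)
```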

3. Proportional to nodes / vnodes (Cassandra):

  • Each node owns N virtual nodes; add node → steal random vnodes from existing nodes
  • ✅ Scales naturally; finer granularity with more nodes
  • ❌ Gossip overhead grows; random stealing can cause temporarily uneven distribution
  • Choose when: Cluster nodes are added/removed frequently; need gradual rebalancing

Why is automatic rebalancing dangerous during a failure event?
?

  • Scenario:
    1. Node A slows down (due to GC pause, network issue, or disk problem)
    2. Auto-rebalancer interprets slow response as node failure
    3. Rebalancer triggers: moves A’s shards to nodes B, C, D
    4. Moving shards = heavy disk reads on A + heavy network + disk writes on B, C, D
    5. B, C, D are now handling both normal traffic AND rebalancing I/O → they slow down
    6. Rebalancer now detects B, C as potentially down → triggers more moves → cascading failure
  • Real risk: Re-replicating a node’s data sends roughly that node’s entire data volume over the network, multiplied by the number of replicas being rebuilt at once
  • Best practices:
    • Add delay (e.g., wait 30 minutes) before triggering rebalancing on a suspected failure
    • Require quorum agreement that a node is truly down
    • Rate-limit rebalancing bandwidth
    • Require human operator approval for large rebalancing operations in production
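The first two best practices (a grace period and quorum agreement) can be combined into a small gatekeeper in front of the rebalancer. A sketch with a hypothetical `RebalanceGuard` class; the 30-minute delay and majority rule are illustrative, and bandwidth limits and operator approval would sit outside this check.

```python
from dataclasses import dataclass, field

@dataclass
class RebalanceGuard:
    """Hypothetical policy object: may a suspected-dead node's shards be moved?"""
    observers: int                        # number of nodes monitoring heartbeats
    grace_period_s: float = 30 * 60       # delay before acting on a suspicion
    suspected_since: dict[str, float] = field(default_factory=dict)
    votes: dict[str, set[str]] = field(default_factory=dict)

    def report_unresponsive(self, node: str, observer: str, now: float) -> None:
        self.suspected_since.setdefault(node, now)
        self.votes.setdefault(node, set()).add(observer)

    def report_healthy(self, node: str) -> None:
        # Any successful heartbeat clears the suspicion (e.g. the GC pause ended).
        self.suspected_since.pop(node, None)
        self.votes.pop(node, None)

    def may_rebalance(self, node: str, now: float) -> bool:
        if node not in self.suspected_since:
            return False
        waited = now - self.suspected_since[node]
        quorum = len(self.votes[node]) > self.observers // 2
        return waited >= self.grace_period_s and quorum

guard = RebalanceGuard(observers=5)
guard.report_unresponsive("A", observer="B", now=0)
print(guard.may_rebalance("A", now=3600))   # False: only 1 of 5 observers agree
for obs in ("C", "D"):
    guard.report_unresponsive("A", observer=obs, now=60)
print(guard.may_rebalance("A", now=120))    # False: grace period not yet elapsed
print(guard.may_rebalance("A", now=1800))   # True: 30 min elapsed and 3 of 5 agree
guard.report_healthy("A")
print(guard.may_rebalance("A", now=7200))   # False: node recovered
```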

Request Routing

What are the three request routing approaches and what coordination service do they use?
?
1. Any-node forwarding (gossip):

  • Client contacts any node; that node forwards to the correct shard owner if needed (1 extra hop in data path)
  • Nodes learn shard assignments via gossip protocol
  • Used by: Cassandra

2. Routing tier:

  • Clients contact a dedicated routing service (proxy) that knows current shard→node mapping
  • Router forwards to correct node (1 extra hop before data path)
  • The routing tier learns assignment changes from a coordination service (e.g., ZooKeeper/etcd; MongoDB uses its own config servers)
  • Used by: MongoDB (mongos routers), LinkedIn Espresso (Helix on ZooKeeper)

3. Partition-aware clients:

  • Client library maintains the current shard map (from a coordination service such as ZooKeeper/etcd or, as Kafka clients do, by fetching metadata from any broker)
  • Client contacts correct node directly (no extra hops)
  • Most efficient; requires client-side complexity
  • Used by: Kafka producer/consumer API, Cassandra smart drivers

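Coordination service: ZooKeeper, etcd, or Kafka’s KRaft quorum stores the authoritative shard→node mapping. The routing-tier and partition-aware approaches depend on this consistent, fault-tolerant metadata store; gossip-based systems such as Cassandra avoid an external coordinator at the cost of more complexity in the database nodes. Kafka replaced ZooKeeper with KRaft (Raft-based metadata stored in Kafka itself): KRaft became production-ready in 3.3, and Kafka 4.0 removed ZooKeeper mode.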
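A minimal sketch of a partition-aware client that caches the shard map and refreshes it when a node rejects a request. `ShardMap`, `StaleShardMap`, and the `fetch_map` callable (standing in for ZooKeeper/etcd or a broker metadata request) are hypothetical; the `epoch` field shows how a client can tell which version of the map it holds.

```python
import hashlib
from dataclasses import dataclass

NUM_SHARDS = 3  # hypothetical; owners[i] serves hash shard i

def shard_index(key: str) -> int:
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big") % NUM_SHARDS

@dataclass(frozen=True)
class ShardMap:
    epoch: int                # bumped by the metadata store on every reassignment
    owners: tuple[str, ...]   # owners[i] = node currently serving shard i

class StaleShardMap(Exception):
    """Raised by a node asked for a shard it no longer owns."""

class PartitionAwareClient:
    def __init__(self, fetch_map):
        self._fetch_map = fetch_map   # hypothetical callable returning the latest ShardMap
        self.map = fetch_map()

    def route(self, key: str) -> str:
        return self.map.owners[shard_index(key)]

    def get(self, key: str, send):
        # Go straight to the owner (no routing hop); on rejection refresh once and retry.
        try:
            return send(self.route(key), key)
        except StaleShardMap:
            self.map = self._fetch_map()
            return send(self.route(key), key)

# Simulated metadata store and nodes.
metadata = {"map": ShardMap(epoch=1, owners=("n1", "n2", "n3"))}

def send(node: str, key: str) -> str:
    if metadata["map"].owners[shard_index(key)] != node:
        raise StaleShardMap(node)
    return f"{key}@{node}"

client = PartitionAwareClient(lambda: metadata["map"])
print(client.get("alice", send))                    # served under epoch 1

owners = list(metadata["map"].owners)
owners[shard_index("alice")] = "n4"                 # rebalancer moves alice's shard
metadata["map"] = ShardMap(epoch=2, owners=tuple(owners))
print(client.get("alice", send), client.map.epoch)  # alice@n4 2
```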


Comparison and Trade-offs

Compare key-range sharding vs hash sharding on 5 dimensions.
?

Dimension | Key-Range | Hash
--- | --- | ---
Range queries | Efficient (single shard) | Inefficient (scatter/gather)
Distribution | Uneven (some ranges have more data) | Uniform (hash evenly distributes)
Sequential hot spots | Yes (e.g., timestamp → today’s shard) | No (hash breaks sequential patterns)
Access-frequency hot spots | Yes (popular key range) | Yes (popular key, regardless of hash)
Rebalancing | Range splitting (HBase, Spanner style) | Consistent hashing ring
Best for | Time-series, co-located range access | Random access, high-write uniformity

Compare local vs global secondary indexes on 5 dimensions.
?

Dimension | Local (Document-Partitioned) | Global (Term-Partitioned)
--- | --- | ---
Write cost | Single shard | Cross-shard (2+ shards)
Write latency | Low | Higher
Read cost | Scatter/gather (all shards) | Single index shard
Read latency | High (tail latency problem) | Low
Consistency | Immediate (local shard) | Eventually consistent (async)
Best for | Write-heavy, infrequent secondary reads | Read-heavy secondary access

Modern Context

How do NewSQL databases (CockroachDB, Spanner) handle sharding differently from traditional NoSQL?
?
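Traditional NoSQL (Cassandra, DynamoDB): Application must explicitly choose the shard key; partitioning is the application’s responsibility; cross-shard transactions are absent or limited (Cassandra’s lightweight transactions apply within a single partition; DynamoDB transactions cover a bounded number of items), so broader atomicity needs application-level coordination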

NewSQL approach:

  • CockroachDB: Data auto-split into ranges (default maximum 512 MiB since v20.1; 64 MiB before); each range is replicated via Raft and ranges are rebalanced automatically; load-based splitting (hot ranges split by QPS, not just size); application sees a logical table
  • Google Spanner: Auto range splits + TrueTime API + Paxos groups per range; globally consistent; 2PC across shard groups for cross-shard transactions
  • YugabyteDB: PostgreSQL-compatible; Raft-based sharding; automatic rebalancing

Key differences:

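  • No explicit shard key to design (ranges split automatically), although primary-key choice still matters: monotonically increasing keys can still concentrate writes on one range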
  • Cross-shard ACID transactions are first-class (not bolted on)
  • Rebalancing is continuous and automatic (with safety mechanisms)
  • Higher latency for cross-shard ops vs single-shard NoSQL

Trade-off: Convenience and correctness at the cost of higher baseline latency; suitable for latency-tolerant, correctness-critical workloads

What sharding strategies does Kafka use for topics, and why does the partition key matter?
?

  • Kafka model: A topic is divided into partitions (shards); each partition is an ordered, append-only log
  • Assignment: Message key hashed → determines partition; no key = spread across partitions (round-robin in older clients; the default producer uses sticky batching since Kafka 2.4)
  • Why partition key matters:
    • Ordering: Messages with the same key always go to the same partition → ordering guaranteed per key; use user_id as key to order all user events
    • Co-location: All events for same entity in one partition → simpler stateful stream processing
    • Hot spots: Popular key (celebrity user_id) → hot partition → same problem as DB hot spots
  • Consumer parallelism: Each partition consumed by at most one consumer in a group; max parallelism = partition count
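  • Routing metadata: Kafka can store partition→broker assignments in KRaft (an internal Raft log) instead of ZooKeeper; KRaft is production-ready since 3.3, and Kafka 4.0 drops ZooKeeper entirely
  • Rebalancing: Adding partitions is easy, but with the default hash(key) mod partition-count assignment many existing keys map to a different partition afterwards, breaking per-key ordering across the change; Kafka cannot reduce a topic’s partition count (create a new topic and migrate instead), so the common pattern is to over-provision partitions at creation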
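A sketch of key-based partitioning and of what adding partitions does to existing keys. Kafka's default partitioner hashes keyed records with murmur2; MD5 stands in here only because it is in Python's standard library, and `partition_for`/`produce` are hypothetical helpers.

```python
import hashlib
from collections import defaultdict

def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in for hash(key) mod partitions (Kafka itself uses murmur2).
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:4], "big") % num_partitions

def produce(events, num_partitions: int) -> dict[int, list]:
    log = defaultdict(list)  # partition -> append-only list of (key, event)
    for key, event in events:
        log[partition_for(key, num_partitions)].append((key, event))
    return log

events = [("alice", "login"), ("bob", "login"), ("alice", "click"), ("alice", "logout")]
log = produce(events, num_partitions=6)
alice_p = partition_for("alice", 6)
print([e for k, e in log[alice_p] if k == "alice"])  # ['login', 'click', 'logout']

# Growing the topic from 6 to 8 partitions remaps many existing keys.
users = [f"user{i}" for i in range(1000)]
remapped = sum(partition_for(u, 6) != partition_for(u, 8) for u in users)
print(f"{remapped / len(users):.0%} of keys now map to a different partition")
```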

Application Scenarios

A time-series IoT database stores readings keyed by (device_id, timestamp). The team wants to use hash sharding on device_id. What problem might arise and how would you fix it?
?
Problem: If a small number of devices generate the majority of traffic (a common IoT pattern — some devices report every second, others rarely), hash sharding on device_id creates access-frequency hot spots even though key distribution is even.

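Additionally, hashing device_id puts all of one device’s readings on the same shard, so “all readings for device X between t1 and t2” hits one shard but is only efficient if readings are stored sorted by timestamp there; cross-device queries such as “all readings between t1 and t2” require scatter/gather across all shards.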

Fixes:

  1. Compound shard key: (device_id) as partition key (hashed), timestamp as clustering key (sorted within shard) — enables efficient time-range queries per device
  2. Hot device detection: Identify top-N high-frequency devices; use a random suffix (device_X_0 through device_X_9) to split their writes across up to 10 shards; aggregate on read
  3. Time-bucketed sharding: Shard key = (device_id, time_bucket) where time_bucket groups readings into, e.g., hourly buckets; this spreads one device’s writes over time, but a time-range query must enumerate every bucket it spans
  4. Tiered storage: Recent data in hot shards; older data archived to cold shards or object storage (common in InfluxDB, TimescaleDB)
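Fix 3 (time-bucketed keys) in code: the write path computes the bucket, and the read path has to enumerate every bucket in the queried range. A minimal sketch assuming integer Unix-style timestamps, hourly buckets, and a hypothetical MD5-based `shard_for`.

```python
import hashlib

BUCKET_SECONDS = 3600  # hypothetical: hourly buckets
NUM_SHARDS = 8         # hypothetical cluster size

def partition_key(device_id: str, ts: int) -> tuple[str, int]:
    # (device_id, start of the hour containing ts): a device's writes move to a
    # new partition, and usually a new shard, every hour.
    return (device_id, ts - ts % BUCKET_SECONDS)

def shard_for(pk: tuple[str, int]) -> int:
    digest = hashlib.md5(f"{pk[0]}|{pk[1]}".encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_SHARDS

def partitions_for_range(device_id: str, t1: int, t2: int) -> list[tuple[str, int]]:
    # Reads must enumerate every bucket overlapping [t1, t2] and query each one.
    first = t1 - t1 % BUCKET_SECONDS
    return [(device_id, b) for b in range(first, t2 + 1, BUCKET_SECONDS)]

# Readings from 02:30 to 04:30 (seconds since an arbitrary epoch) span 3 buckets.
pks = partitions_for_range("sensor-42", 9000, 16200)
print(pks)                            # [('sensor-42', 7200), ('sensor-42', 10800), ('sensor-42', 14400)]
print([shard_for(pk) for pk in pks])  # one shard per bucket (hash-dependent)
```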

A secondary index on model_type in a product catalog contains only 5 distinct values (basic, standard, premium, enterprise, legacy). Is this a good global secondary index candidate? Why or why not?
?
Problem: A global secondary index with only 5 distinct values (low cardinality) creates unbalanced index shards — the “standard” bucket might contain 80% of products while “legacy” has 1%.

  • The “standard” index shard becomes a hot spot: most queries and writes touch it
  • Hash-partitioning the index by term doesn’t help — there are only 5 terms

Better approaches:

  1. Local secondary index for low-cardinality attributes: scatter/gather across N data shards is likely cheaper than a hot global index shard
  2. Filter at query layer: If the catalog is small enough, fetch all products from relevant shards and filter in application memory
  3. Materialized views: Pre-compute separate tables/collections per model_type — denormalization, but eliminates the index problem entirely
  4. Composite index key for better distribution: (model_type, product_id_hash_bucket) — combine low-cardinality field with a high-cardinality bucket to spread the index

Key insight: Global secondary indexes work best for high-cardinality indexed attributes where each index entry points to a small number of rows
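Approach 4 (composite index key) on a skewed toy catalog: with the index keyed by `model_type` alone, every "standard" entry lands on one index shard; adding a bucket derived from `product_id` spreads them out, at the cost of reading `BUCKETS_PER_VALUE` buckets per lookup. Shard counts, bucket counts, and the catalog are all hypothetical.

```python
import hashlib
from collections import Counter

NUM_INDEX_SHARDS = 8
BUCKETS_PER_VALUE = 8   # hypothetical fan-out per model_type value

def h(s: str) -> int:
    return int.from_bytes(hashlib.sha1(s.encode()).digest()[:8], "big")

def index_shard_plain(model_type: str, product_id: str) -> int:
    # Term-partitioned index keyed by model_type alone (product_id is ignored).
    return h(model_type) % NUM_INDEX_SHARDS

def index_shard_composite(model_type: str, product_id: str) -> int:
    # Index key = (model_type, hash(product_id) mod BUCKETS_PER_VALUE).
    bucket = h(product_id) % BUCKETS_PER_VALUE
    return h(f"{model_type}:{bucket}") % NUM_INDEX_SHARDS

def shards_for_lookup(model_type: str) -> set[int]:
    # A read for one value visits each of its buckets: a bounded scatter.
    return {h(f"{model_type}:{b}") % NUM_INDEX_SHARDS for b in range(BUCKETS_PER_VALUE)}

# Skewed catalog: 80% "standard", the rest split between two other tiers.
catalog = [
    (f"p{i}", "standard" if i % 10 < 8 else ("basic" if i % 10 == 8 else "premium"))
    for i in range(10_000)
]
plain = Counter(index_shard_plain(t, p) for p, t in catalog)
composite = Counter(index_shard_composite(t, p) for p, t in catalog)
print(max(plain.values()), max(composite.values()))  # hottest index shard: plain vs composite
```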


Quick Facts

What is the default number of shards in Elasticsearch vs virtual nodes in Cassandra?
?
Elasticsearch:

  • Default: 1 primary shard per index (changed from 5 in ES 7.0)
  • Set at index creation; changing it later requires reindexing or the _split/_shrink APIs, which have constraints (e.g., the source index must first be made read-only)
  • Rule of thumb: 10–50 GB per shard is the commonly recommended target
  • Too many shards: each shard carries Lucene and heap overhead on its node; memory pressure

Cassandra vnodes:

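  • Default: num_tokens = 256 vnodes per physical node before Cassandra 4.0; 4.0 lowered the default to 16 and pairs it with a token-allocation algorithm to keep ownership balanced
  • 10-node cluster = 10 × 256 = 2,560 ring positions (10 × 16 = 160 with the 4.0 default)
  • More vnodes = smoother rebalancing when adding/removing nodes
  • More vnodes = more token metadata, gossip overhead, and ranges to repair and stream, which is why 4.0 moved to a lower default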

DynamoDB:

  • Partitions (shards) are fully managed; application cannot see or control shard count
  • Automatic splits based on size (~10 GB per partition) and throughput (up to ~3,000 RCU / 1,000 WCU per partition)
  • On-demand capacity: shards scale transparently with traffic

Total Cards: 20
Review Time: ~25 minutes
Priority: HIGH
Last Updated: 2026-05-29