Chapter 4: Design a Distributed Message Queue
volume2 message-queue kafka distributed-systems streaming
Status: 🟩 Interview ready
Difficulty: Very Hard
Time to complete: 60 min read + practice
Overview
A distributed message queue decouples producers from consumers, enabling asynchronous processing, buffering, and replay of events at massive scale. Systems like Apache Kafka, RabbitMQ, and AWS SQS are foundational infrastructure in modern distributed architectures.
Why this matters:
- High-frequency interview question at Amazon, LinkedIn, Uber, Confluent, Stripe
- Tests knowledge of distributed storage, replication, consistency, and throughput optimization
- Kafka concepts appear in nearly every large-scale system design (news feed, notifications, logging, analytics)
Problem Statement
Design a distributed message queue that:
- Allows producers to publish messages to named topics
- Allows consumers to subscribe and read messages in order
- Persists messages durably (survives crashes, consumer downtime)
- Scales to 1 million messages per second throughput
- Supports both traditional MQ semantics (delete after consume) and streaming semantics (keep and replay)
Step 1: Requirements & Scope (5 min)
Functional Requirements
Clarifying questions:
- Traditional queue (point-to-point) or pub/sub? → Pub/sub (Kafka model)
- Message ordering? → Yes, within a partition (not global across topic)
- Message persistence? → Yes, configurable retention (default 7 days, like Kafka)
- Replay? → Yes, consumers can re-read from any offset
- Multiple consumers? → Yes, consumer groups for parallel processing
- Message size? → Up to 1 MB per message (Kafka default)
- Delivery semantics? → At-least-once by default (discuss exactly-once as advanced topic)
Scope:
- Producers publish to topics
- Topics divided into partitions for parallelism
- Consumers in consumer groups read from partitions
- Messages retained for configurable duration
- At-least-once delivery with offset tracking
Non-Functional Requirements
- Throughput: 1 million messages/sec (write) + comparable read throughput
- Durability: No data loss even if a broker crashes (replication)
- Scalability: Add brokers to scale horizontally
- Low latency: End-to-end < 10 ms (producer to consumer)
- Fault tolerance: System continues with broker failures
- Ordering: Guaranteed within partition
- Message size: Up to 1 MB
Scale Estimates
Target throughput: 1M messages/sec
Average message size: 1 KB
Write bandwidth: 1M messages/sec × 1 KB = 1 GB/sec (write)
With replication 3x: 3 GB/sec disk writes
Daily storage: 1 GB/sec × 86,400 sec ≈ 86 TB/day
With 7-day retention: 86 TB × 7 ≈ 600 TB (before replication; ≈ 1.8 PB with 3x replication)
I/O consideration:
Sequential disk write speed: 500 MB/s (HDD) / 3000 MB/s (SSD)
Random disk write speed: ~1 MB/s (HDD, 500x slower!)
→ Sequential I/O is critical for throughput
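The estimate above can be reproduced with a few lines of arithmetic. The sketch below is a back-of-envelope calculator only; the function name `estimate_capacity` and the use of decimal units (1 KB = 1,000 bytes) are assumptions chosen for round numbers, not part of any Kafka tooling.

```python
def estimate_capacity(msgs_per_sec, msg_bytes, replication_factor, retention_days):
    """Return rough bandwidth and storage needs in bytes (decimal units)."""
    seconds_per_day = 86_400
    write_bytes_per_sec = msgs_per_sec * msg_bytes
    daily_bytes = write_bytes_per_sec * seconds_per_day
    retained_bytes = daily_bytes * retention_days
    return {
        "write_bytes_per_sec": write_bytes_per_sec,
        "replicated_bytes_per_sec": write_bytes_per_sec * replication_factor,
        "daily_bytes": daily_bytes,
        "retained_bytes": retained_bytes,
        "retained_bytes_replicated": retained_bytes * replication_factor,
    }


# Figures from the text: 1M msg/sec, 1 KB each, 3x replication, 7-day retention.
estimate = estimate_capacity(1_000_000, 1_000, 3, 7)
for name, value in estimate.items():
    print(f"{name}: {value / 1e12:.4f} TB")
```

Keeping the replicated and unreplicated figures separate makes it easy to state in an interview which number refers to logical data and which to raw disk.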
Step 2: High-Level Design (10 min)
Core Concepts
Topic: Named channel (e.g., "orders", "user-events")
Partition: Ordered, immutable log within a topic
Broker: Server that stores and serves partitions
Producer: Writes messages to topics
Consumer: Reads messages from topics
Consumer Group: Set of consumers sharing work across partitions
Offset: Position of a message within a partition (monotonically increasing integer)
Architecture Overview
              ┌───────────────────────────────────────────────────┐
              │                   Kafka Cluster                   │
              │                                                   │
              │  ┌──────────┐    ┌──────────┐    ┌──────────┐     │
Producer ─────┼─►│ Broker 1 │    │ Broker 2 │    │ Broker 3 │     │
              │  │          │    │          │    │          │     │
              │  │ P0 (L)   │    │ P1 (L)   │    │ P2 (L)   │     │
              │  │ P1 (F)   │    │ P2 (F)   │    │ P0 (F)   │     │
              │  │ P2 (F)   │    │ P0 (F)   │    │ P1 (F)   │     │
              │  └────┬─────┘    └────┬─────┘    └────┬─────┘     │
              │       │               │               │           │
              └───────┼───────────────┼───────────────┼───────────┘
                      │               │               │
                 ┌────▼─────┐    ┌────▼─────┐    ┌────▼─────┐
                 │ Consumer │    │ Consumer │    │ Consumer │
                 │    A     │    │    B     │    │    C     │
                 │(reads P0)│    │(reads P1)│    │(reads P2)│
                 └──────────┘    └──────────┘    └──────────┘
                 └──────────── Consumer Group 1 ────────────┘
L = Leader partition, F = Follower (replica)
Topics and Partitions
Topic: "user-events"
├── Partition 0: [msg0] [msg1] [msg2] [msg3] ...
├── Partition 1: [msg0] [msg1] [msg2] ...
└── Partition 2: [msg0] [msg1] ...
Each partition:
- Ordered, immutable, append-only log
- Messages never deleted (until retention expires)
- Each message has an offset (0, 1, 2, ...)
- Stored on one broker (with replicas on others)
Consumer Group Model
Consumer Group "payment-service":
  Consumer 1 → reads Partition 0
  Consumer 2 → reads Partition 1
  Consumer 3 → reads Partition 2
Consumer Group "analytics-service":
  Consumer 1 → reads ALL partitions (different group, independent offset)
Key rules:
- Each partition assigned to exactly ONE consumer per group
- Multiple groups can read same partition independently
- If consumers > partitions: some consumers are idle
- If partitions > consumers: some consumers read multiple partitions
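The key rules above can be illustrated with a small assignment function. This is a minimal sketch of a round-robin strategy under assumed names (`assign_partitions`, consumer ids like `"c1"`); Kafka's real assignors (range, round-robin, sticky) are more involved.

```python
def assign_partitions(partitions, consumers):
    """Round-robin assignment: each partition goes to exactly one consumer."""
    assignment = {consumer: [] for consumer in consumers}
    if not consumers:
        return assignment
    ordered = sorted(consumers)
    for i, partition in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(partition)
    return assignment


# Same number of consumers and partitions: one partition each.
print(assign_partitions([0, 1, 2], ["c1", "c2", "c3"]))
# Fewer consumers than partitions: c1 reads two partitions.
print(assign_partitions([0, 1, 2], ["c1", "c2"]))
# More consumers than partitions: c4 is idle.
print(assign_partitions([0, 1, 2], ["c1", "c2", "c3", "c4"]))
```

Each consumer group would run this assignment independently, which is why a second group can read the same partitions without affecting the first.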
API Design:
// Producer API
produce(topic="orders", key="user_123", value="{...}", partition=HASH)
// Consumer API
subscribe(topics=["orders", "payments"])
poll(timeout_ms=1000) → List<Message>
commit(offsets) // Mark messages as processed
// Admin API
create_topic(name, num_partitions, replication_factor)
delete_topic(name)
list_topics()
Step 3: Deep Dive (30 min)
3.1 Message Storage: Write-Ahead Log
Core insight: Sequential disk I/O is dramatically faster than random I/O
Random disk I/O: ~1 MB/s (HDD, seek time dominates)
Sequential disk I/O: ~500 MB/s (HDD) / 3,000 MB/s (SSD)
→ 500x faster!
Kafka writes are always sequential (append-only log)
This is why Kafka can sustain 1 GB/s+ on commodity hardware
Commit log structure:
Partition 0 on Broker 1:
/data/kafka/user-events-0/
├── 00000000000000000000.log (segment 1: offsets 0–999)
├── 00000000000000000000.index (offset → byte position index)
├── 00000000000000001000.log (segment 2: offsets 1000–1999)
├── 00000000000000001000.index
└── 00000000000000002000.log (active segment: appended to)
Segment file: fixed size (default 1 GB) or fixed time (default 1 week)
When segment full → close it, open new active segment
Old segments deleted when retention expires
Message format on disk:
┌────────────────────────┬──────────────────────────────┐
│ Offset (8 bytes)       │ Size (4 bytes)               │
│ CRC32 (4 bytes)        │ Magic (1 byte: version)      │
│ Attributes (1 byte)    │ Timestamp (8 bytes)          │
│ Key length (4 bytes)   │ Key (variable)               │
│ Value length (4 bytes) │ Value (variable)             │
└────────────────────────┴──────────────────────────────┘
CRC32: Data integrity check (detect corruption)
Key: Used for partition routing (Hash(key) % num_partitions)
Value: Actual payload (bytes, no schema enforcement at broker level)
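To make the layout concrete, here is a hedged sketch that packs and unpacks one record using the field order and sizes listed above. The function names and the choice to compute the CRC over everything after the CRC field are assumptions for illustration; Kafka's actual on-disk formats differ (the current record-batch format uses variable-length integers and CRC-32C).

```python
import struct
import zlib


def encode_record(offset, timestamp_ms, key, value, magic=1, attributes=0):
    """Serialize one record: offset, size, CRC32, magic, attributes, timestamp, key, value."""
    body = struct.pack(">bbq", magic, attributes, timestamp_ms)
    body += struct.pack(">i", len(key)) + key
    body += struct.pack(">i", len(value)) + value
    payload = struct.pack(">I", zlib.crc32(body)) + body
    return struct.pack(">qi", offset, len(payload)) + payload


def decode_record(buf):
    """Parse one record and verify its CRC; raises ValueError on corruption."""
    offset, size = struct.unpack_from(">qi", buf, 0)
    payload = buf[12:12 + size]
    (crc,) = struct.unpack_from(">I", payload, 0)
    body = payload[4:]
    if zlib.crc32(body) != crc:
        raise ValueError("CRC mismatch: record is corrupted")
    magic, attributes, timestamp_ms = struct.unpack_from(">bbq", body, 0)
    pos = 10
    (key_len,) = struct.unpack_from(">i", body, pos)
    pos += 4
    key = bytes(body[pos:pos + key_len])
    pos += key_len
    (value_len,) = struct.unpack_from(">i", body, pos)
    pos += 4
    value = bytes(body[pos:pos + value_len])
    return {"offset": offset, "magic": magic, "attributes": attributes,
            "timestamp_ms": timestamp_ms, "key": key, "value": value}


record = encode_record(42, 1_700_000_000_000, b"user_123", b'{"event":"login"}')
decoded = decode_record(record)
print(decoded["offset"], decoded["key"], decoded["value"])
```

Flipping any byte of the key or value changes the checksum, so `decode_record` rejects the record instead of returning corrupted data.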
Log index for fast seek:
.index file maps: offset → byte_position_in_.log_file
Binary search on index: O(log N) to find any offset
Then sequential read from that byte position
Example: Find offset 1500 in segment 1000–1999:
1. Binary search .index for offset 1500
2. Get byte position 45678
3. Seek .log file to byte 45678
4. Read sequentially from there
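The lookup steps above can be sketched with Python's `bisect` module. The class name `OffsetIndex` and the sample entries are hypothetical; the sketch also assumes a sparse index (not every offset has an entry), so a lookup returns the nearest entry at or below the target and the reader scans forward from there.

```python
import bisect


class OffsetIndex:
    """In-memory stand-in for a segment's .index file (offset -> byte position)."""

    def __init__(self):
        self._offsets = []
        self._positions = []

    def add(self, offset, position):
        if self._offsets and offset <= self._offsets[-1]:
            raise ValueError("index entries must be added in increasing offset order")
        self._offsets.append(offset)
        self._positions.append(position)

    def lookup(self, target_offset):
        """Return (indexed_offset, byte_position) for the largest entry <= target."""
        i = bisect.bisect_right(self._offsets, target_offset) - 1
        if i < 0:
            return None  # target precedes the first indexed offset in this segment
        return self._offsets[i], self._positions[i]


index = OffsetIndex()
for offset, position in [(1000, 0), (1250, 22100), (1500, 45678), (1750, 68900)]:
    index.add(offset, position)

print(index.lookup(1500))  # exact hit on an indexed offset
print(index.lookup(1600))  # falls back to the entry for 1500, then scan forward
```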
3.2 Partition Strategy
How producers decide which partition:
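import itertools
import zlib

_round_robin = itertools.count()  # shared counter for keyless messages

def select_partition(topic, key, value, num_partitions):
    if key is not None:
        # Hash-based: same key always goes to same partition,
        # which ensures ordering for all messages with the same key.
        # zlib.crc32 is stable across processes; Python's built-in hash()
        # is salted per process for strings, so it is unsuitable here.
        return zlib.crc32(key.encode("utf-8")) % num_partitions
    # Round-robin: distribute evenly for load balancing
    return next(_round_robin) % num_partitions
When to use key-based partitioning: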
Use case: All events for user_id="123" must be ordered
key = "user_123"
→ All messages with this key go to same partition
→ Consumer reads them in order (guaranteed within partition)
Use case: Order events for order_id="456"
key = "order_456"
→ All state changes for this order stay in sequence
Partition count considerations:
More partitions = more parallelism = higher throughput
But:
- More file handles open on brokers
- Longer leader election time during failover
- More ZooKeeper/KRaft metadata
Rule of thumb:
Target throughput / throughput per partition
Example: 1 GB/s target, 100 MB/s per partition → 10 partitions minimum
Add buffer: 20–30 partitions for a busy topic
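The rule of thumb is simple enough to express as a helper. This is a sketch only; `min_partitions` and its `headroom` multiplier are illustrative names, and the per-partition throughput must come from measuring your own workload.

```python
import math


def min_partitions(target_mb_per_sec, per_partition_mb_per_sec, headroom=1.0):
    """Partitions needed to reach a target throughput, scaled by a headroom factor."""
    if per_partition_mb_per_sec <= 0:
        raise ValueError("per-partition throughput must be positive")
    return math.ceil(target_mb_per_sec / per_partition_mb_per_sec * headroom)


print(min_partitions(1000, 100))                # bare minimum from the example
print(min_partitions(1000, 100, headroom=2.5))  # with buffer for growth and skew
```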
3.3 Consumer Group and Rebalancing
Consumer offset tracking:
Consumer group "payment-service" reads Partition 0:
Partition 0 messages: [A][B][C][D][E][F][G]
Offsets: 0 1 2 3 4 5 6
Consumer reads A, B, C → commits offset 3 (next to read)
Offset stored in: __consumer_offsets internal topic (Kafka)
If consumer crashes and restarts:
Reads committed offset 3 → starts from D (at-least-once)
Nothing is lost, but D may be processed twice if the crash happened after processing D and before committing offset 4
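The crash-and-restart behaviour can be simulated in a few lines. This is a toy model under assumed names (`PartitionConsumer`, `crash_before_commit_at`): the log is a Python list and the committed offset is an attribute rather than an entry in `__consumer_offsets`.

```python
class PartitionConsumer:
    """Toy consumer that commits the next offset to read after each message."""

    def __init__(self, log, committed_offset=0):
        self.log = log
        self.committed_offset = committed_offset

    def run(self, handler, crash_before_commit_at=None):
        offset = self.committed_offset
        while offset < len(self.log):
            handler(self.log[offset])              # process first
            if offset == crash_before_commit_at:
                raise RuntimeError("simulated crash before commit")
            offset += 1
            self.committed_offset = offset         # then commit


log = ["A", "B", "C", "D", "E", "F", "G"]
processed = []

first = PartitionConsumer(log)
try:
    first.run(processed.append, crash_before_commit_at=3)  # crash while handling D
except RuntimeError:
    pass

# Restart from the last committed offset: D is delivered a second time.
second = PartitionConsumer(log, committed_offset=first.committed_offset)
second.run(processed.append)
print(first.committed_offset, processed)
```

Committing before processing instead would flip the failure mode: D would be skipped rather than repeated, which is at-most-once delivery.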
Rebalancing (when consumers join or leave):
Before rebalance (3 consumers, 3 partitions):
Consumer 1 → P0
Consumer 2 → P1
Consumer 3 → P2
Consumer 3 crashes:
Group coordinator detects via heartbeat timeout
Triggers rebalance:
Consumer 1 → P0, P2 (takes over P2)
Consumer 2 → P1
Consumer 4 joins:
Rebalance again:
Consumer 1 → P0
Consumer 2 → P1
Consumer 4 → P2
(Consumer 1 releases P2 to Consumer 4)
Rebalancing protocols:
| Protocol | Description | Pros | Cons |
|---|---|---|---|
| Eager (stop-the-world) | All consumers stop, revoke all, reassign | Simple | All consumers pause during rebalance |
| Cooperative (incremental) | Only move partitions that need to change | Low disruption | More complex coordination |
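Modern Kafka (2.4+): Supports cooperative rebalancing (CooperativeStickyAssignor), but it is opt-in via partition.assignment.strategy; the default RangeAssignor still uses the eager protocol.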
3.4 Replication for Durability
Leader-Follower replication per partition:
Topic: "orders", Partition 0, Replication factor: 3
Broker 1 (Leader P0) ◄── Producer writes here
 │
 │ Replicate
 ├──────────────────► Broker 2 (Follower P0 replica)
 │
 └──────────────────► Broker 3 (Follower P0 replica)
Consumer reads from Leader (or from a follower since Kafka 2.4, KIP-392)
ISR (In-Sync Replicas):
ISR = set of replicas that are caught up with the leader
(within replica.lag.time.max.ms; default 30 sec since Kafka 2.5, 10 sec before)
Example:
ISR = {Broker1(leader), Broker2, Broker3} (all caught up)
Broker 3 falls behind (network issue):
ISR = {Broker1(leader), Broker2} (Broker3 removed from ISR)
Broker 3 catches up:
ISR = {Broker1(leader), Broker2, Broker3} (re-added to ISR)
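The ISR example can be modelled with a small simulation. This is a simplified sketch, not Kafka's implementation: the class `PartitionLeader`, its method names, and the explicit `now_ms` clock are assumptions. It also tracks the offset up to which every in-sync replica has the data, which Kafka calls the high watermark.

```python
class PartitionLeader:
    """Toy leader that tracks follower progress and derives the ISR."""

    def __init__(self, leader_id, replica_ids, max_lag_ms=30_000):
        self.leader_id = leader_id
        self.max_lag_ms = max_lag_ms
        self.log_end_offset = {r: 0 for r in replica_ids}
        self.last_caught_up_ms = {r: 0 for r in replica_ids}

    def append(self, count, now_ms):
        self.log_end_offset[self.leader_id] += count
        self.last_caught_up_ms[self.leader_id] = now_ms

    def follower_fetch(self, replica_id, fetched_up_to, now_ms):
        self.log_end_offset[replica_id] = fetched_up_to
        if fetched_up_to >= self.log_end_offset[self.leader_id]:
            self.last_caught_up_ms[replica_id] = now_ms

    def isr(self, now_ms):
        """Leader plus every follower that was fully caught up within max_lag_ms."""
        return {r for r, t in self.last_caught_up_ms.items()
                if r == self.leader_id or now_ms - t <= self.max_lag_ms}

    def high_watermark(self, now_ms):
        """Highest offset that all in-sync replicas have replicated."""
        return min(self.log_end_offset[r] for r in self.isr(now_ms))


p0 = PartitionLeader("b1", ["b1", "b2", "b3"])
p0.append(10, now_ms=0)
p0.follower_fetch("b2", 10, now_ms=100)
p0.follower_fetch("b3", 10, now_ms=100)
isr_all = p0.isr(now_ms=200)

p0.append(5, now_ms=1_000)              # leader now at offset 15
p0.follower_fetch("b2", 15, now_ms=1_100)
hw_while_b3_lags = p0.high_watermark(now_ms=1_200)

p0.follower_fetch("b2", 15, now_ms=39_900)  # b2 keeps fetching, b3 is silent
isr_shrunk = p0.isr(now_ms=40_000)
hw_after_shrink = p0.high_watermark(now_ms=40_000)

p0.follower_fetch("b3", 15, now_ms=41_000)  # b3 catches up and rejoins
isr_restored = p0.isr(now_ms=41_100)
print(isr_all, isr_shrunk, isr_restored, hw_while_b3_lags, hw_after_shrink)
```

While the lagging replica is still in the ISR it holds the high watermark back; once it is removed, the remaining replicas alone decide what counts as committed.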
Ack modes (producer durability guarantee):
| Acks | Behavior | Durability | Throughput | Use Case |
|---|---|---|---|---|
| acks=0 | No acknowledgment | None (fire and forget) | Highest | Metrics, logs (loss OK) |
| acks=1 | Leader appends to its local log | Low (leader crash before replication = loss) | High | Producer default before Kafka 3.0; moderate durability |
| acks=all | All ISR replicas acknowledge | Highest (survives leader crash) | Lower | Producer default since Kafka 3.0; financial transactions, critical data |
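# Producer configuration examples (kafka-python; acks is set per producer, not per send).
# The broker address below is a placeholder.
from kafka import KafkaProducer
# Fire and forget (metrics):
metrics_producer = KafkaProducer(bootstrap_servers="localhost:9092", acks=0)
# Leader-only acknowledgment:
default_producer = KafkaProducer(bootstrap_servers="localhost:9092", acks=1)
# Strongest durability (payments):
payments_producer = KafkaProducer(bootstrap_servers="localhost:9092", acks="all")
# min.insync.replicas=2 is a topic/broker setting, not a producer option.
# With acks=all, a write succeeds only once at least 2 in-sync replicas have it,
# so the message survives the loss of 1 broker.
Leader election on broker failure: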
Broker 1 (leader for P0) crashes:
1. ZooKeeper/KRaft detects broker failure (session timeout)
2. Controller selects new leader from ISR
(prefer most up-to-date replica)
3. Brokers in ISR update their metadata
4. Producers/consumers redirect to new leader
Typical failover time: ~5–30 seconds
3.5 Push vs Pull Consumption
Kafka uses PULL (consumer polls broker):
| Push model | Pull model (Kafka) |
|---|---|
| Broker pushes to consumer | Consumer polls broker |
| Broker must track consumer state | Consumer tracks own offset |
| Broker can overwhelm a slow consumer | Consumer pulls at own pace |
| Hard to handle varied speeds | Varied speeds handled naturally |
| Lower latency | Slightly higher latency |
| | Can batch large reads |
Why Pull is better for varied consumer speeds:
Consumer A: Fast analytics (processes 10K msg/sec)
Consumer B: Slow payment processor (processes 100 msg/sec)
Push: Broker must throttle/buffer per consumer → complex
Pull: Each consumer calls poll() at own rate → simple
Kafka pull with long polling:
consumer.poll(timeout_ms=500) // Wait up to 500ms for messages
→ No busy-wait; blocks until messages arrive or timeout
3.6 Zero-Copy Optimization
Without zero-copy (traditional approach):
Read file from disk:
Disk → Kernel buffer (1st copy, DMA)
Kernel buffer → Application memory (2nd copy, CPU)
Application memory → Socket buffer (3rd copy, CPU)
Socket buffer → NIC (4th copy, DMA)
Total: 4 copies, 4 context switches (read() and write() each cross the user/kernel boundary twice), high CPU usage
With zero-copy (Kafka's approach):
Kafka uses sendfile() syscall (Linux):
Disk → Kernel buffer (1st copy, DMA)
Kernel buffer → NIC directly (2nd copy, via DMA)
Total: 2 copies (both DMA, no CPU copies), 2 context switches (one sendfile() call), minimal CPU usage
Result: 2–4x better throughput for the same hardware
Kafka code (simplified):
FileChannel.transferTo(position, count, socketChannel)
// This triggers sendfile() syscall under the hood
// OS handles the copy entirely in kernel space
When zero-copy applies: Reading unmodified stored messages to consumers. Does not apply when messages need transformation (encryption, compression at broker level).
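The same idea is available from Python through `socket.sendfile()`, which uses `os.sendfile()` where the platform supports it and otherwise falls back to ordinary `send()` calls. The sketch below is illustrative only: `send_segment` is a hypothetical helper, and a local socket pair plus a temporary file stand in for a consumer connection and a log segment.

```python
import os
import socket
import tempfile


def send_segment(sock, path, offset=0, count=None):
    """Send `count` bytes of a file, starting at `offset`, without reading it into user space."""
    with open(path, "rb") as segment:
        return sock.sendfile(segment, offset=offset, count=count)


# Stand-ins for a broker-to-consumer connection and a stored segment file.
reader, writer = socket.socketpair()
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"msg0msg1msg2msg3")
    segment_path = tmp.name

try:
    sent = send_segment(writer, segment_path, offset=4, count=8)
    received = reader.recv(1024)
finally:
    reader.close()
    writer.close()
    os.remove(segment_path)

print(sent, received)
```

Note that the application never touches the message bytes, which is exactly why the technique stops working once the broker has to decrypt, re-compress, or otherwise transform the data.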
3.7 Batch Processing for Throughput
Producers batch messages:
# Producer config for batching:
producer_config = {
    "batch.size": 16384,        # Batch up to 16 KB before sending
    "linger.ms": 5,             # Wait up to 5 ms to fill batch
    "compression.type": "lz4",  # Compress batch (2–4x size reduction)
}
# Effect:
# Without batching: 1000 messages = 1000 network round-trips
# With batching: 1000 messages = ~10 network round-trips
# Plus compression: reduces bandwidth by 50–75%
Consumers batch reads:
while True:
    records = consumer.poll(max_records=500, timeout_ms=100)
    # Reads up to 500 messages in one poll
    # Process batch together (bulk DB insert, etc.)
    process_batch(records)
    consumer.commit()  # Commit entire batch's offsets at once
3.8 Delivery Semantics
At-most-once (producer acks=0, auto-commit before processing):
Messages may be lost (no retry) but never duplicated
Use case: Sensor data, metrics where occasional loss is OK
At-least-once (default Kafka):
Producer: retries on failure → possible duplicate sends
Consumer: commit offset AFTER processing → if crash mid-processing,
message re-read and re-processed on restart
Use case: Most systems (idempotent consumers handle duplicates)
Exactly-once semantics (EOS) (Kafka 0.11+):
Producer side (Idempotent Producer):
Each message has sequence number
Broker deduplicates retried messages
PID (Producer ID) + sequence → dedup key
Consume-process-produce loop (transactional producer):
Read → process → write output atomically
Either all committed or none
Config:
enable.idempotence = true
transactional.id = "unique-id"
producer.initTransactions() // Once, at startup
producer.beginTransaction()
producer.send(output_topic, result)
producer.sendOffsetsToTransaction(offsets, group_id)
producer.commitTransaction() // Atomic: output + offset committed together
Trade-offs:
At-most-once: Highest throughput, data loss possible
At-least-once: High throughput, duplicates possible (handle in consumer)
Exactly-once: Lowest throughput (~20% overhead), no loss or duplication
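For the at-least-once row, "handle in consumer" usually means making the handler idempotent. The sketch below shows one common approach, deduplicating on a message id; the class name, the message fields (`id`, `account`, `amount`), and the in-memory set are all assumptions for illustration. In a real service the seen-id record and the state change would need to be persisted together, for example in one database transaction.

```python
class IdempotentHandler:
    """Applies each message at most once, keyed by a unique message id."""

    def __init__(self):
        self.balance = {}
        self.seen_ids = set()

    def handle(self, message):
        if message["id"] in self.seen_ids:
            return False  # duplicate delivery: skip side effects
        account = message["account"]
        self.balance[account] = self.balance.get(account, 0) + message["amount"]
        self.seen_ids.add(message["id"])
        return True


handler = IdempotentHandler()
deliveries = [
    {"id": "m1", "account": "acct-1", "amount": 10},
    {"id": "m2", "account": "acct-1", "amount": 20},
    {"id": "m2", "account": "acct-1", "amount": 20},  # redelivered after a retry
    {"id": "m3", "account": "acct-1", "amount": 5},
]
results = [handler.handle(m) for m in deliveries]
print(results, handler.balance)
```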
3.9 ZooKeeper vs KRaft Metadata Management
Legacy: ZooKeeper (required before KRaft became production-ready in Kafka 3.3; removed in Kafka 4.0):
ZooKeeper stores:
- Broker list and health
- Topic/partition metadata
- ISR lists
- Consumer group offsets (early Kafka)
- Controller election
Problems:
- Separate cluster to operate
- ZooKeeper becomes bottleneck at scale (10K+ partitions)
- Metadata propagation lag
- Harder to operate (two systems: Kafka + ZooKeeper)
Modern: KRaft (Kafka Raft Metadata; early access in Kafka 2.8, production-ready in 3.3, the only mode since 4.0):
KRaft stores metadata internally using Raft consensus:
- Metadata stored in internal topic: __cluster_metadata
- Controller nodes form a Raft quorum (3 or 5 nodes)
- All brokers are followers of the controller quorum
- No external dependency
Benefits:
- Simpler operations (one system)
- Faster metadata propagation (sub-millisecond vs seconds)
- Scales to millions of partitions
- Faster startup and failover
Architecture comparison:
ZooKeeper mode:              KRaft mode:
[ZooKeeper Ensemble]         [Kafka Controller Quorum]
[ZK1] [ZK2] [ZK3]            [Ctrl1] [Ctrl2] [Ctrl3]
        ↓ (metadata)                 ↓ (metadata via Raft)
[Kafka Brokers]              [Kafka Brokers]
[B1] [B2] [B3] ...           [B1] [B2] [B3] ...
3.10 Message Deduplication
Problem: Producer retries can cause duplicate messages
Producer sends message M:
→ Broker receives M, writes to log, sends ACK
→ ACK lost in network!
→ Producer retries, sends M again
→ Broker receives duplicate M
→ Consumer sees M twice!
Solution: Idempotent Producer:
Each producer assigned:
PID (Producer ID): unique ID assigned by broker
Sequence Number: monotonically increasing per partition
Message envelope: (PID, Partition, SequenceNumber) → unique key
Broker maintains last seen sequence per (PID, Partition)
If incoming sequence == last_seen + 1: accept
If incoming sequence <= last_seen: reject as duplicate (idempotent)
If incoming sequence > last_seen + 1: out-of-order error
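The three sequence checks translate directly into code. This is a minimal sketch for a single partition, so the (PID, Partition) key reduces to the PID; `PartitionLog`, `OutOfOrderSequence`, and the string return values are hypothetical names, not Kafka's API.

```python
class OutOfOrderSequence(Exception):
    """Raised when a producer skips ahead of the expected sequence number."""


class PartitionLog:
    """Single-partition log that deduplicates producer retries by sequence number."""

    def __init__(self):
        self.records = []
        self.last_sequence = {}  # producer_id -> last accepted sequence

    def append(self, producer_id, sequence, value):
        last_seen = self.last_sequence.get(producer_id, -1)
        if sequence <= last_seen:
            return "duplicate"  # already written: acknowledge without appending
        if sequence != last_seen + 1:
            raise OutOfOrderSequence(
                f"expected {last_seen + 1}, got {sequence} from producer {producer_id}")
        self.records.append(value)
        self.last_sequence[producer_id] = sequence
        return "appended"


log = PartitionLog()
first_try = log.append(producer_id=7, sequence=0, value="M")
retry = log.append(producer_id=7, sequence=0, value="M")  # ACK was lost, producer resends
print(first_try, retry, log.records)
```

Because the duplicate is acknowledged rather than rejected with an error, the producer's retry succeeds from its point of view while the log still contains M only once.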
Design Summary
Final Architecture
┌─────────────────────────────────────────────────────────────────┐
│                            Producers                            │
│      [Service A]   [Service B]   [Service C]   [Service D]      │
└──────────────────────┬──────────────────────────────────────────┘
                       │ Produce (batch + compress)
                       ▼
┌─────────────────────────────────────────────────────────────────┐
│                          Kafka Cluster                          │
│                                                                 │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐    │
│  │   Broker 1    │    │   Broker 2    │    │   Broker 3    │    │
│  │               │    │               │    │               │    │
│  │ Topic-A-P0(L) │    │ Topic-A-P1(L) │    │ Topic-A-P2(L) │    │
│  │ Topic-A-P1(F) │    │ Topic-A-P2(F) │    │ Topic-A-P0(F) │    │
│  │               │    │               │    │               │    │
│  │ Segment logs  │    │ Segment logs  │    │ Segment logs  │    │
│  │ .log + .index │    │ .log + .index │    │ .log + .index │    │
│  └───────────────┘    └───────────────┘    └───────────────┘    │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ KRaft Controller Quorum (3 nodes)                         │  │
│  │ Leader election, partition metadata, ISR tracking         │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────┬───────────────────────────┘
                                      │ Poll (pull)
                    ┌─────────────────┼─────────────────┐
                    ▼                 ▼                 ▼
              [Consumer A]      [Consumer B]      [Consumer C]
               (reads P0)        (reads P1)        (reads P2)
              └──────────────────────────────────────────────┘
                         Consumer Group "service-X"
Key Decisions Summary
| Decision | Choice | Reasoning |
|---|---|---|
| Storage | Sequential disk (WAL) | 500x faster than random I/O |
| Replication | Leader-follower + ISR | Durability without full sync overhead |
| Default delivery | At-least-once | Balance of durability and throughput |
| Ack default | acks=1 | Good durability, high throughput |
| Critical data | acks=all, min.insync.replicas=2 | No data loss |
| Consumption model | Pull | Consumer controls rate, handles varied speeds |
| Throughput optimization | Batching + compression + zero-copy | Multiplicative gains |
| Metadata | KRaft (Raft consensus) | No external dependency, scales to millions of partitions |
| Ordering | Per-partition only | Global ordering would require 1 partition = no parallelism |
| Exactly-once | Idempotent producer + transactions | When required; ~20% throughput cost |
Interview Questions & Answers
Q: How does Kafka guarantee message ordering?
A: Kafka guarantees ordering only within a partition. All messages with the same key are routed to the same partition via Hash(key) % num_partitions, so you get ordering per key. Global ordering across a topic would require a single partition, eliminating all parallelism, which is a bad trade-off. The correct answer: model your data so that related messages share a key, and ordering within that key is guaranteed.
Q: What is the trade-off between acks=1 and acks=all?
A: acks=1 (leader only): The producer gets an ACK once the leader has appended the message to its local log. If the leader crashes before followers replicate it, that message is lost (the follower elected as new leader won't have it). acks=all: The producer waits for all ISR replicas to acknowledge. Even if the leader crashes immediately after the ACK, the message exists on all in-sync replicas. Cost: higher latency (wait for the slowest ISR member) and lower throughput. Use acks=all for financial data; acks=1 for most use cases.
Q: How does a consumer group achieve parallel processing?
A: Each partition is assigned to exactly one consumer within the group. With N partitions and N consumers, each consumer independently reads one partition, giving N-way parallelism. The group coordinator (a broker) tracks offsets per partition per consumer group in the internal __consumer_offsets topic. Adding consumers up to the partition count increases parallelism; beyond that, extra consumers are idle. To add more parallelism, increase partition count (can only increase, not decrease).
Q: Why is sequential disk I/O so important for Kafka's performance?
A: Random disk I/O on HDD requires a physical seek (~10ms per seek), limiting throughput to ~100 seeks/sec = ~1 MB/s. Sequential I/O reads/writes contiguous sectors, achieving 500 MB/s on HDD and 3,000+ MB/s on SSD. Kafka's append-only log ensures all writes are sequential. Combined with the OS page cache, Kafka can often serve reads entirely from RAM (warm cache) without disk access at all. This architecture enables 1 GB/s+ throughput on commodity hardware.
Q: What is the ISR and why does it matter?
A: ISR (In-Sync Replicas) is the set of replicas currently caught up with the leader (within replica.lag.time.max.ms). With acks=all, the producer only gets an ACK when all ISR replicas have written the message. A replica falling behind is removed from ISR, so it can't slow down producers. The new leader is always chosen from ISR (unless unclean leader election is enabled), guaranteeing no committed message is lost during failover. The tension: small ISR = faster writes but less durability; large ISR = slower writes but higher durability.
Q: What is the difference between at-least-once and exactly-once delivery?
A: At-least-once: Producer retries on failure, so duplicates are possible; consumer must be idempotent. Simple and high performance. Exactly-once: Idempotent producer (PID + sequence number for dedup at broker) + transactional API (atomically commit output message + input offset in one transaction). Guarantees no loss and no duplication. Cost: ~20% throughput reduction, more complex code. Use exactly-once for financial transactions; at-least-once with idempotent consumers for most other cases.
Key Takeaways
- Topics → Partitions → Brokers: Partition is the unit of parallelism and ordering; partition count determines max consumer parallelism
- Sequential disk I/O is the cornerstone of Kafka's throughput: 500x faster than random I/O; append-only WAL design exploits this
- ISR (In-Sync Replicas) ensures durability without full sync overhead: new leader always elected from ISR
- Ack modes are a durability/throughput dial: acks=0 (fire-and-forget) → acks=1 (leader) → acks=all (all ISR)
- Pull-based consumption lets consumers read at their own pace, with no risk of the broker overwhelming a slow consumer
- Zero-copy via sendfile() reduces CPU overhead for consumer reads by 2–4x: bytes flow directly from page cache to NIC
- Consumer group rebalancing enables elastic scaling: add consumers up to partition count for more parallelism
- KRaft replaces ZooKeeper: simpler operations, faster metadata propagation, scales to millions of partitions
Related Resources
- distributed-system-components - Kafka, event streaming patterns
- key-patterns - Pub/sub, event sourcing, CQRS
- ch03-google-maps - Kafka used in traffic data pipeline
- ch04-rate-limiter - Redis atomic operations (contrast with WAL approach)
Practice this design! Key areas to master:
- Draw topics → partitions → broker → consumer group architecture
- Walk through what happens when a broker crashes (ISR, leader election)
- Explain why sequential I/O matters and how Kafka exploits it
- Compare acks=0, acks=1, acks=all with concrete failure scenarios
- Explain exactly-once semantics and when to use it
Last Updated: 2026-04-13
Status: Very Hard. Master the replication and delivery semantics in depth.