Chapter 9: Design S3-like Object Storage
volume2 s3 object-storage distributed-storage blob
Status: Interview ready
Difficulty: Very Hard
Time to complete: 60 min read + practice
Overview
Object storage systems like AWS S3, Azure Blob Storage, and Google Cloud Storage (GCS) store arbitrary files (called objects) by a key inside named containers called buckets, at massive scale, from gigabytes to exabytes.
Why this matters:
- Deep distributed systems design question (Google, Amazon, Meta, Netflix)
- Teaches erasure coding, consistent hashing, metadata vs data separation, multipart upload
- Foundational to understanding storage infrastructure powering the entire cloud
Problem Statement
Design an object storage system that:
- Stores and retrieves arbitrary binary objects (files) by key
- Scales to 100 PB of total data
- Provides 99.9999% (six 9s) durability
- Supports objects up to 5 TB in size
- Supports multipart upload for large files
- Supports object versioning
Step 1: Requirements & Scope (5 min)
Functional Requirements
Clarifying questions:
- What operations are needed? → PUT, GET, DELETE object; CREATE, DELETE bucket; LIST objects in bucket
- What is the max object size? → 5 TB
- Do we need multipart upload? → Yes, for large files (> 100 MB)
- Do we need versioning? → Yes
- Read/write ratio? → Approximately 1:1 (both reads and writes are heavy)
- Do we need access control? → Basic IAM (identity-based) is assumed
Scale:
- Total data: 100 PB
- Throughput target (S3 reference): 3,500 PUT/COPY/POST/DELETE and 5,500 GET/HEAD requests/sec per key prefix (per partitioned prefix, not per bucket)
- Objects up to 5 TB; typical objects are KB to GB range
Scope:
- Object create, read, delete (objects are immutable; an "update" is a new PUT)
- Bucket management
- Multipart upload
- Object versioning
- Data durability (six 9s)
Non-Functional Requirements
- Durability: 99.9999% (six 9s), i.e., lose at most 1 object per million per year
- Availability: 99.99% (four 9s) read, 99.99% write
- Scalability: Horizontal scale from TB to PB to EB
- Low latency: < 200ms for small object reads
- Data integrity: Checksums on every object
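- Consistency: Strong consistency for metadata (a GET after a successful PUT sees the new object); eventual consistency is acceptable for background replication lag (e.g., cross-region copies)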
- Storage efficiency: Minimize storage overhead (erasure coding over full replication)
Step 2: High-Level Design (10 min)
Core Concepts
Object storage fundamentals:
- Bucket: Named container (like a directory, but flat namespace)
- Object: Binary data (any file) with a unique key inside a bucket
- Key: String identifier for an object (e.g., photos/2024/cat.jpg)
- Value: The raw binary content of the object
- Metadata: Attributes of the object (size, content-type, checksum, creation date)
Comparison with other storage types:
| Feature | Block Storage | File Storage (NFS) | Object Storage |
|---|---|---|---|
| Access | Block device | File path | HTTP API (key) |
| Hierarchy | No | Yes (tree) | Flat (bucket/key) |
| Mutability | Fully mutable | Fully mutable | Immutable (PUT/DELETE) |
| Scale | TB | TB | Exabytes |
| Use case | Databases, OS | File shares | Backups, media, logs |
| Examples | EBS, SAN | EFS, NFS | S3, GCS, Azure Blob |
Two Storage Planes
Object storage has a fundamental separation:
                   ┌──────────────────────────────────────┐
                   │         Object Storage System        │
                   │                                      │
┌──────────┐       │   ┌─────────────────┐                │
│  Client  │──────▶│   │   API Gateway   │                │
└──────────┘       │   └────────┬────────┘                │
                   │            │                         │
                   │   ┌────────▼───────┐                 │
                   │   │Identity Service│                 │
                   │   │ (AuthN/AuthZ)  │                 │
                   │   └────────┬───────┘                 │
                   │            │                         │
                   │    ┌───────▼──────┐                  │
                   │    │   Metadata   │◀── bucket/key    │
                   │    │   Service    │    lookup        │
                   │    └───────┬──────┘                  │
                   │            │ node location           │
                   │    ┌───────▼──────┐                  │
                   │    │  Data Store  │◀── actual bytes  │
                   │    │   Service    │                  │
                   │    └──────────────┘                  │
                   └──────────────────────────────────────┘
Plane 1 (Metadata Store):
- Stores: bucket name, object key, object size, content-type, checksum, storage node locations, version IDs
- Strong consistency required
- Reads on every GET, writes on every PUT
Plane 2 (Data Store):
- Stores: raw binary content of each object
- Distributed across many storage nodes
- Replicated or erasure-coded for durability
API Design
Bucket operations:
PUT /buckets/{bucket-name} Create bucket
DELETE /buckets/{bucket-name} Delete bucket
GET /buckets/{bucket-name}?prefix=&marker= List objects
Object operations:
PUT /buckets/{bucket-name}/{object-key} Upload object
GET /buckets/{bucket-name}/{object-key} Download object
DELETE /buckets/{bucket-name}/{object-key} Delete object
HEAD /buckets/{bucket-name}/{object-key} Get metadata only
Multipart upload:
POST /buckets/{bucket}/{key}?uploads Initiate → returns upload_id
PUT /buckets/{bucket}/{key}?partNumber=N&uploadId=X Upload part N → returns ETag
POST /buckets/{bucket}/{key}?uploadId=X Complete → combines all parts
DELETE /buckets/{bucket}/{key}?uploadId=X Abort multipart upload
Request Flow: PUT Object
Client
  │
  │ PUT /buckets/mybucket/photo.jpg
  ▼
API Gateway
  │ 1. Authenticate request (check API key / IAM)
  │ 2. Route to correct bucket shard
  ▼
Metadata Service
  │ 3. Validate object key, check bucket exists
  │ 4. Reserve object ID + version ID
  ▼
Data Routing Service
  │ 5. Determine which storage nodes to use (consistent hashing)
  │ 6. Write object bytes to N storage nodes
  ▼
Metadata Service
  │ 7. Record object metadata + node locations (commit)
  ▼
Client ← 200 OK (ETag = checksum of object)
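The ordering in steps 6 and 7 (bytes first, metadata commit last) is what keeps the two planes consistent. Below is a minimal Python sketch under simplifying assumptions: storage nodes are in-memory stand-ins (the hypothetical `FakeStorageNode` class), every selected node receives a full copy (replication rather than erasure coding), and writes are sequential rather than parallel. `put_object` is an illustrative name, not a real API.

```python
import hashlib
import uuid


class FakeStorageNode:
    """Hypothetical in-memory stand-in for a storage node's internal write API."""

    def __init__(self, name: str, healthy: bool = True):
        self.name = name
        self.healthy = healthy
        self.chunks: dict[str, bytes] = {}

    def write(self, chunk_id: str, data: bytes) -> None:
        if not self.healthy:
            raise IOError(f"{self.name} is unavailable")
        self.chunks[chunk_id] = data


def put_object(metadata: dict, nodes: list, bucket: str, key: str, data: bytes) -> str:
    """Write the bytes to every selected node first; commit metadata only if all succeed."""
    object_id = uuid.uuid4().hex                 # step 4: reserve an internal object ID
    checksum = hashlib.sha256(data).hexdigest()
    for node in nodes:                           # step 6: write bytes (sequential for clarity)
        node.write(object_id, data)              # an exception here skips the commit below
    metadata[(bucket, key)] = {                  # step 7: commit metadata last
        "object_id": object_id,
        "size_bytes": len(data),
        "checksum": checksum,
        "storage_nodes": [node.name for node in nodes],
    }
    return checksum                              # returned to the client as the ETag
```

If any node write raises, the function exits before the metadata commit, so the object never becomes visible; chunks that did land stay behind as orphans for the garbage collector.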
Request Flow: GET Object
Client
  │
  │ GET /buckets/mybucket/photo.jpg
  ▼
API Gateway → Metadata Service
  │ 1. Lookup bucket + key → get node locations + version ID
  ▼
Data Store (nearest healthy node)
  │ 2. Read object bytes
  │ 3. Verify checksum
  ▼
Client ← 200 OK + object bytes + metadata headers
Step 3: Deep Dive (20 min)
Data Store Design
The data store has three internal components:
┌──────────────────────────────────────────────────────────┐
│                        Data Store                        │
│                                                          │
│  ┌─────────────────────┐    ┌──────────────────────────┐ │
│  │    Data Routing     │    │    Placement Service     │ │
│  │      Service        │───▶│  (Which nodes are        │ │
│  │  (Where to store?)  │    │   available? Where is    │ │
│  └──────────┬──────────┘    │   each object stored?)   │ │
│             │               └──────────────────────────┘ │
│             │ write/read                                 │
│  ┌──────────▼────────────────────────────────────┐       │
│  │             Storage Node Cluster              │       │
│  │  [Node 1]  [Node 2]  [Node 3]  [Node 4] ...   │       │
│  │     ♥beat     ♥beat     ♥beat     ♥beat       │       │
│  └───────────────────────────────────────────────┘       │
└──────────────────────────────────────────────────────────┘
Data Routing Service:
- Stateless service that directs data to storage nodes
- Uses consistent hashing to pick nodes for each object
- Talks to Placement Service to know which nodes are healthy
Placement Service:
- Tracks the health and capacity of every storage node
- Maintains cluster topology (data center, rack, disk info)
- Receives heartbeats from storage nodes every few seconds
- Provides node list to Data Routing Service
Storage Node:
- Physical or virtual machine with one or more disks
- Stores raw object data (content-addressable by checksum)
- Sends heartbeat to Placement Service
- Exposes internal HTTP API for read/write operations
Heartbeat mechanism:
Storage Node ──heartbeat every 5s──▶ Placement Service
             └── heartbeat reports node status (OK/DEGRADED/FULL)
If no heartbeat for 30s → mark node as DEAD
Data Routing Service polls Placement Service → skips DEAD nodes
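A minimal sketch of the Placement Service's failure detector, assuming the 5 s heartbeat / 30 s timeout rule above and that only nodes reporting `OK` receive new writes (a design assumption; `PlacementService` and its methods are hypothetical names). Timestamps can be passed explicitly so the logic is testable without sleeping.

```python
import time
from typing import Optional

HEARTBEAT_TIMEOUT_S = 30.0  # no heartbeat for this long -> node is DEAD


class PlacementService:
    """Hypothetical sketch: remembers each node's last heartbeat and reported status."""

    def __init__(self):
        self.last_seen: dict[str, float] = {}
        self.status: dict[str, str] = {}

    def heartbeat(self, node_id: str, status: str, now: Optional[float] = None) -> None:
        """Called by a storage node every ~5 s with status OK, DEGRADED or FULL."""
        self.last_seen[node_id] = time.monotonic() if now is None else now
        self.status[node_id] = status

    def is_dead(self, node_id: str, now: float) -> bool:
        return now - self.last_seen[node_id] > HEARTBEAT_TIMEOUT_S

    def writable_nodes(self, now: Optional[float] = None) -> list[str]:
        """What the Data Routing Service polls: alive nodes that report OK."""
        now = time.monotonic() if now is None else now
        return sorted(
            node for node in self.last_seen
            if not self.is_dead(node, now) and self.status[node] == "OK"
        )
```

Using a monotonic clock avoids false DEAD verdicts when the wall clock jumps; a node that resumes heartbeating simply reappears in the writable set.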
Consistent Hashing for Data Distribution
Distribute objects across storage nodes so that adding or removing a node moves only a small fraction of objects:
                 Consistent Hash Ring
                   (0 to 2^32 - 1)
                      Node A (0°)
                          │
  Node D (300°) ──────────┼────────── Node B (120°)
                          │
                      Node C (240°)
Object key "photo.jpg" hashes to 150° → stored on Node C (240°)
  (walk clockwise to the first node at or after the key's position)
Add Node E at 180°:
- Only objects hashing between 120° and 180° (previously owned by Node C, including photo.jpg) migrate to Node E
- All other objects stay put
Advantages:
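- Adding/removing a node moves only the keys in that node's ring range (~1/N of all objects)
- Virtual nodes (vnodes): each physical node owns many small ring positions, which evens out the distribution
- Replication: store copies on the next R distinct nodes clockwise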
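A compact Python sketch of a consistent hash ring with virtual nodes. Assumptions: ring positions come from the first 4 bytes of an MD5 digest (used for spread, not security), each physical node gets 100 vnodes, and replicas are the next distinct physical nodes clockwise. `HashRing` and its methods are illustrative names, not a specific library's API.

```python
import bisect
import hashlib


def ring_position(value: str) -> int:
    """Map a string onto the ring [0, 2^32) using the first 4 bytes of its MD5 digest."""
    return int.from_bytes(hashlib.md5(value.encode()).digest()[:4], "big")


class HashRing:
    def __init__(self, nodes=(), vnodes: int = 100):
        self.vnodes = vnodes
        self._points: list[int] = []      # sorted vnode positions
        self._owner: dict[int, str] = {}  # vnode position -> physical node
        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str) -> None:
        for i in range(self.vnodes):
            pos = ring_position(f"{node}#vnode{i}")
            if pos in self._owner:        # skip the rare collision with an existing vnode
                continue
            self._owner[pos] = node
            bisect.insort(self._points, pos)

    def remove_node(self, node: str) -> None:
        self._points = [p for p in self._points if self._owner[p] != node]
        self._owner = {p: n for p, n in self._owner.items() if n != node}

    def nodes_for(self, key: str, replicas: int = 1) -> list[str]:
        """Walk clockwise from the key's position, collecting distinct physical nodes."""
        if not self._points:
            raise ValueError("ring is empty")
        start = bisect.bisect_left(self._points, ring_position(key))
        chosen: list[str] = []
        for step in range(len(self._points)):
            node = self._owner[self._points[(start + step) % len(self._points)]]
            if node not in chosen:
                chosen.append(node)
                if len(chosen) == replicas:
                    break
        return chosen
```

Adding a node only claims the keys whose positions fall just before its new vnodes, so every key that moves, moves to the new node; removing it restores the previous mapping exactly.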
Replication vs Erasure Coding
Two strategies to achieve durability. This is a critical tradeoff:
Strategy 1: Replication (3-way)
Object (1 GB)
  │
  ├──▶ Node 1: full copy (1 GB)
  ├──▶ Node 2: full copy (1 GB)
  └──▶ Node 3: full copy (1 GB)
Total storage: 3 GB for 1 GB object
Storage overhead: 200%
Fault tolerance: can lose 2 of 3 nodes
Strategy 2: Erasure Coding (8+4)
Object (1 GB)
  │
Split into 8 data chunks (128 MB each)
Compute 4 parity chunks (128 MB each)
  │
  ├──▶ Node 1: data chunk 1 (128 MB)
  ├──▶ Node 2: data chunk 2 (128 MB)
  ├──▶ ...
  ├──▶ Node 8: data chunk 8 (128 MB)
  ├──▶ Node 9: parity chunk 1 (128 MB)
  ├──▶ Node 10: parity chunk 2 (128 MB)
  ├──▶ Node 11: parity chunk 3 (128 MB)
  └──▶ Node 12: parity chunk 4 (128 MB)
Total storage: 12 × 128 MB = 1.5 GB for 1 GB object
Storage overhead: 50%
Fault tolerance: can lose any 4 of 12 nodes
Erasure Coding explained:
- k: number of data chunks (e.g., k=8)
- m: number of parity chunks (e.g., m=4)
- Written as (k, m) or k+m system
- Can reconstruct original from any k chunks out of (k+m)
- Based on Reed-Solomon or similar codes (similar to RAID-6)
- Can tolerate the loss of any m chunks (any m nodes, when each chunk sits on a different node)
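To make "reconstruct from any k chunks" concrete, here is a toy systematic Reed-Solomon code (evaluation form) in Python. It is a sketch, not what S3 or any production system ships: it works over the prime field GF(257) with plain Lagrange interpolation, so a parity symbol can take the value 256 and is kept as a Python int rather than a byte. Production codecs use GF(2^8) with optimized matrix arithmetic. Chunk indices 1..k hold data and k+1..k+m hold parity; `encode`, `decode` and `lagrange_eval` are hypothetical names.

```python
P = 257  # prime, so every byte value 0..255 is an element of GF(P)


def lagrange_eval(points: list[tuple[int, int]], x: int) -> int:
    """Evaluate at x the unique polynomial of degree < len(points) through `points` (mod P)."""
    total = 0
    for i, (xi, yi) in enumerate(points):
        num, den = 1, 1
        for j, (xj, _) in enumerate(points):
            if i != j:
                num = num * (x - xj) % P
                den = den * (xi - xj) % P
        total = (total + yi * num * pow(den, P - 2, P)) % P  # Fermat inverse of den
    return total


def encode(data: bytes, k: int, m: int) -> dict[int, list[int]]:
    """Return chunks 1..k (data) and k+1..k+m (parity) as lists of field symbols."""
    if k + m >= P:
        raise ValueError("chunk indices must be distinct field elements")
    size = -(-len(data) // k)                      # symbols per chunk (ceil division)
    padded = data + bytes(size * k - len(data))    # zero-pad the last data chunk
    chunks = {i + 1: list(padded[i * size:(i + 1) * size]) for i in range(k)}
    for x in range(k + 1, k + m + 1):              # parity = same polynomial at new points
        chunks[x] = [
            lagrange_eval([(i, chunks[i][pos]) for i in range(1, k + 1)], x)
            for pos in range(size)
        ]
    return chunks


def decode(available: dict[int, list[int]], k: int, length: int) -> bytes:
    """Rebuild the original bytes from ANY k surviving chunks."""
    if len(available) < k:
        raise ValueError("need at least k chunks to reconstruct")
    use = sorted(available)[:k]
    size = len(available[use[0]])
    out = bytearray()
    for i in range(1, k + 1):                      # re-evaluate each data chunk position
        for pos in range(size):
            out.append(lagrange_eval([(x, available[x][pos]) for x in use], i))
    return bytes(out[:length])
```

With k=8 and m=4, dropping any four chunks (all four parity chunks, four data chunks, or a mix) still leaves eight, which is enough to rebuild the object.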
Comparison:
| Aspect | 3-Way Replication | Erasure Coding (8+4) |
|---|---|---|
| Storage overhead | 200% | 50% |
| Fault tolerance | 2 node failures | 4 node failures |
| Read latency | Very low (read from any copy) | Higher (reconstruct if node down) |
| Write latency | Low (parallel 3 writes) | Higher (compute parity, 12 writes) |
| Complexity | Simple | Complex (parity computation) |
| Recovery cost | Low (copy from replica) | High (read 8 chunks to rebuild) |
| Best for | Hot data, small objects | Cold data, large objects |
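AWS has publicly described S3 as using erasure coding for its primary storage tier (e.g., the Standard storage class); the exact coding parameters are not public.
Replication is used for cross-region redundancy (Cross-Region Replication, CRR, an opt-in bucket feature).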
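Applying the overhead formula to the 100 PB requirement shows why the choice matters at this scale (3-way replication is the k = 1, m = 2 case, since any one of three copies suffices):

$$
\text{raw capacity} = \text{logical data} \times \frac{k+m}{k}, \qquad \text{overhead} = \frac{m}{k}
$$

$$
\text{3-way replication: } 100\ \text{PB} \times \tfrac{1+2}{1} = 300\ \text{PB} \quad (\text{overhead } 200\%)
$$

$$
\text{EC } (8+4):\ 100\ \text{PB} \times \tfrac{8+4}{8} = 150\ \text{PB} \quad (\text{overhead } 50\%)
$$

The difference is 150 PB of raw disk for the same logical data.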
Object Metadata Storage
What is stored:
Metadata record per object:
bucket_id VARCHAR - which bucket
object_key VARCHAR - key within bucket
object_id BIGINT - internal unique ID
version_id VARCHAR - versioning ID (UUID)
size_bytes BIGINT - object size
content_type VARCHAR - MIME type
checksum VARCHAR - SHA256 of object bytes
storage_nodes JSON - list of nodes and chunk positions
created_at TIMESTAMP
deleted_at TIMESTAMP - soft delete (NULL if not deleted)
Storage choice:
- RocksDB / LevelDB: Embedded LSM-tree key-value store on each node
- Fast writes (LSM-tree, sequential I/O)
- Range scans for LIST operations
- Distributed metadata cluster: Multiple nodes with leader election
- Leader handles all writes (strong consistency)
- Followers replicate and serve reads
Sharding metadata:
Shard key: hash(bucket_id)
Shard 1: hash(bucket_id) in 0–1000
Shard 2: hash(bucket_id) in 1001–2000
...
Shard N: hash(bucket_id) in ...
Within shard: sorted by object_key for range scans (LIST)
Why shard by bucket_id?
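- LIST scans objects within a single bucket, so all of them live in one shard (no cross-shard scatter-gather)
- Hot bucket problem: one huge or heavily used bucket can overload its shard; split extremely large buckets into separate partitions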
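A sketch of the two pieces this sharding scheme needs: routing a bucket to its shard by hash range, and a paginated LIST over one bucket's sorted keys using the `prefix` and `marker` parameters from the API above. `SHARD_BOUNDS`, `shard_for` and `list_objects` are hypothetical names; a plain sorted Python list stands in for the shard's sorted key range (an LSM store such as RocksDB would serve the same scan with an iterator seek).

```python
import bisect
import hashlib
from typing import Optional

# Hypothetical: 4 shards, each owning a contiguous range of a 64-bit hash space.
SHARD_BOUNDS = [2**62, 2**63, 3 * 2**62, 2**64]  # exclusive upper bound per shard


def shard_for(bucket_id: str) -> int:
    """hash(bucket_id) picks the shard, so all keys of one bucket land together."""
    h = int.from_bytes(hashlib.sha256(bucket_id.encode()).digest()[:8], "big")
    return bisect.bisect_right(SHARD_BOUNDS, h)


def list_objects(sorted_keys: list[str], prefix: str = "", marker: str = "",
                 max_keys: int = 1000) -> tuple[list[str], Optional[str]]:
    """Return one page of keys starting with `prefix` and sorting after `marker`,
    plus the marker for the next page (None when the listing is complete)."""
    if max_keys < 1:
        raise ValueError("max_keys must be positive")
    start = bisect.bisect_left(sorted_keys, prefix)          # first key >= prefix
    if marker:
        start = max(start, bisect.bisect_right(sorted_keys, marker))
    page: list[str] = []
    i = start
    while i < len(sorted_keys) and sorted_keys[i].startswith(prefix) and len(page) < max_keys:
        page.append(sorted_keys[i])
        i += 1
    more = i < len(sorted_keys) and sorted_keys[i].startswith(prefix)
    return page, (page[-1] if more else None)
```

Because keys sharing a prefix are contiguous in sorted order, the scan can stop at the first key that no longer has the prefix. Range-based shard bounds also let an overloaded range be split by adding one boundary.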
Multipart Upload
For objects > 100 MB (required for objects > 5 GB):
Step 1: Initiate
Client → POST /buckets/b/key?uploads
Server → { "upload_id": "abc123" }
Step 2: Upload Parts (in parallel!)
Client → PUT /buckets/b/key?partNumber=1&uploadId=abc123 (100 MB, offsets 0–100 MB)
Client → PUT /buckets/b/key?partNumber=2&uploadId=abc123 (100 MB, offsets 100–200 MB)
Client → PUT /buckets/b/key?partNumber=3&uploadId=abc123 (100 MB, offsets 200–300 MB)
Server → ETag for each part (checksum of that part)
Step 3: Complete (server reassembles)
Client → POST /buckets/b/key?uploadId=abc123
         Body: { parts: [ {partNumber:1, ETag:"e1"}, ... ] }
Server → 200 OK + final ETag
Step 4 (optional): Abort
Client → DELETE /buckets/b/key?uploadId=abc123
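Before step 1 the client has to pick a part size. The sketch below assumes the limits AWS documents for S3 multipart uploads (parts of 5 MiB to 5 GiB, every part but the last at least 5 MiB, at most 10,000 parts) and a preferred part size of 100 MiB; `plan_parts` is a hypothetical helper. For very large objects the part size has to grow, since 10,000 × 100 MiB is only about 977 GiB.

```python
MiB = 1024 ** 2
GiB = 1024 ** 3
MIN_PART = 5 * MiB       # S3's documented minimum (all parts except the last)
MAX_PART = 5 * GiB       # S3's documented maximum part size
MAX_PARTS = 10_000       # S3's documented maximum part count


def plan_parts(object_size: int, preferred_part: int = 100 * MiB) -> list[tuple[int, int, int]]:
    """Return (part_number, first_byte_offset, length) tuples covering the whole object."""
    if object_size <= 0:
        raise ValueError("object must be non-empty")
    # Grow the part size if the preferred size would need more than MAX_PARTS parts.
    part = max(preferred_part, MIN_PART, -(-object_size // MAX_PARTS))
    if part > MAX_PART:
        raise ValueError("object too large for multipart upload")
    return [
        (n + 1, offset, min(part, object_size - offset))
        for n, offset in enumerate(range(0, object_size, part))
    ]
```

Each tuple maps directly to one `PUT ...?partNumber=N&uploadId=X` request, so parts can be sent concurrently and any failed one retried on its own.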
Internal design of multipart upload:
┌───────────────────────────────────────────────────┐
│ Multipart Upload State                            │
│                                                   │
│ upload_id: abc123                                 │
│ bucket: mybucket, key: bigfile.zip                │
│ status: IN_PROGRESS                               │
│                                                   │
│ parts:                                            │
│   part 1: checksum=e1, size=100MB, nodes=[3,7]    │
│   part 2: checksum=e2, size=100MB, nodes=[4,8]    │
│   part 3: checksum=e3, size=100MB, nodes=[5,9]    │
│                                                   │
│ On Complete: concatenate parts → single object    │
│ On Abort: garbage collect part data               │
└───────────────────────────────────────────────────┘
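A minimal in-memory version of that state machine. Assumptions: the part ETag is the hex MD5 of the part bytes (chosen for illustration), and "complete" simply concatenates the parts in memory; a real system could instead record the ordered list of the parts' chunk locations in metadata rather than copying bytes. `MultipartUpload` is a hypothetical class.

```python
import hashlib
import uuid


class MultipartUpload:
    """Hypothetical in-memory model of one multipart upload's state."""

    def __init__(self, bucket: str, key: str):
        self.upload_id = uuid.uuid4().hex
        self.bucket, self.key = bucket, key
        self.status = "IN_PROGRESS"
        self.parts: dict[int, tuple[str, bytes]] = {}  # part_number -> (etag, data)

    def upload_part(self, part_number: int, data: bytes) -> str:
        if self.status != "IN_PROGRESS":
            raise RuntimeError("upload is not in progress")
        etag = hashlib.md5(data).hexdigest()
        self.parts[part_number] = (etag, data)  # retrying a part replaces the old copy
        return etag

    def complete(self, manifest: list[tuple[int, str]]) -> bytes:
        """Validate the client's (part_number, etag) list, then assemble the object."""
        numbers = [n for n, _ in manifest]
        if numbers != sorted(set(numbers)):
            raise ValueError("parts must be listed in ascending order without duplicates")
        for n, etag in manifest:
            if n not in self.parts or self.parts[n][0] != etag:
                raise ValueError(f"part {n} missing or ETag mismatch")
        self.status = "COMPLETED"
        return b"".join(self.parts[n][1] for n in numbers)

    def abort(self) -> None:
        self.status = "ABORTED"
        self.parts.clear()  # in a real system: hand the part chunks to the garbage collector
```

Validating the client's ETag list on completion catches a part that was silently replaced or never arrived before the object becomes visible.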
Why multipart upload?
- Resume failed uploads (retry only the failed part)
- Parallel uploads (much faster for large files)
- Required for objects > 5 GB in S3 (a single PUT is limited to 5 GB)
Object Versioning
Each PUT creates a new immutable version; existing version is not overwritten:
bucket: photos, key: cat.jpg
Version history:
  version_id: v3 (latest) → object_id: 999 (100 KB, uploaded 2024-03)
  version_id: v2          → object_id: 888 (95 KB, uploaded 2024-02)
  version_id: v1          → object_id: 777 (90 KB, uploaded 2024-01)
GET /buckets/photos/cat.jpg              → returns v3 (latest)
GET /buckets/photos/cat.jpg?versionId=v1 → returns v1
DELETE /buckets/photos/cat.jpg           → inserts DELETE marker (v4)
  - does not remove data
  - GET now returns 404 (sees DELETE marker)
  - GET ?versionId=v3 still works
DELETE /buckets/photos/cat.jpg?versionId=v1 → permanently deletes v1
Metadata schema for versioning:
(bucket_id, object_key, version_id) → metadata + object_id
version_id is a UUID or timestamp-based ID
Latest version pointer: (bucket_id, object_key) → latest version_id
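The versioning rules above fit in a small in-memory model: each key maps to an ordered list of `(version_id, object_id)` pairs, and a delete marker is a version whose `object_id` is `None`. This is a sketch with hypothetical names; `KeyError` stands in for an HTTP 404, and version IDs are random UUIDs (one of the two options mentioned above).

```python
import uuid
from typing import Optional


class VersionedBucket:
    """Hypothetical model: key -> ordered list of (version_id, object_id or None)."""

    def __init__(self):
        self.versions: dict[str, list[tuple[str, Optional[int]]]] = {}

    def put(self, key: str, object_id: int) -> str:
        version_id = uuid.uuid4().hex
        self.versions.setdefault(key, []).append((version_id, object_id))
        return version_id

    def get(self, key: str, version_id: Optional[str] = None) -> int:
        history = self.versions.get(key, [])
        if version_id is None:               # latest version = last entry
            candidates = history[-1:]
        else:
            candidates = [v for v in history if v[0] == version_id]
        if not candidates or candidates[0][1] is None:
            raise KeyError("404 Not Found")  # no such version, or a delete marker
        return candidates[0][1]

    def delete(self, key: str, version_id: Optional[str] = None) -> None:
        if version_id is None:               # plain DELETE: append a delete marker
            self.versions.setdefault(key, []).append((uuid.uuid4().hex, None))
        else:                                # DELETE ?versionId=: remove that version for good
            self.versions[key] = [v for v in self.versions.get(key, []) if v[0] != version_id]
```

A plain DELETE only appends, so every earlier version stays reachable by ID; only the versioned DELETE removes history, and the removed version's bytes become garbage for the collector.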
Data Integrity via Checksums
Every object has a checksum computed on upload:
Client → Upload photo.jpg
Client computes: SHA256(photo.jpg), base64-encoded
Sends checksum in header: x-amz-checksum-sha256: <base64 SHA-256>
OR (older): Content-MD5: <base64 MD5> (an MD5 digest, not SHA-256)
Server receives bytes:
1. Stream bytes to storage nodes
2. Compute SHA256 of received bytes as they stream in
3. Compare with client-provided checksum
4. If mismatch → reject upload and skip the metadata commit (data corrupted in transit; chunks already written become orphans for GC)
5. If match → store checksum in metadata
Server serves object:
1. Read bytes from storage node
2. Compute SHA256
3. Compare with stored checksum in metadata
4. If mismatch → data corrupted on disk! Try another replica or erasure decode
5. Return checksum in ETag header so client can verify
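A sketch of both checksum paths using Python's `hashlib` and `base64`. It assumes the client sends a base64-encoded SHA-256 digest (the encoding S3 uses for `x-amz-checksum-sha256`) and that replicas arrive as plain byte strings; `b64_sha256`, `receive_upload` and `read_verified` are hypothetical helpers.

```python
import base64
import hashlib
from typing import Iterable


def b64_sha256(data: bytes) -> str:
    """Base64-encoded SHA-256 digest, the encoding used by x-amz-checksum-sha256."""
    return base64.b64encode(hashlib.sha256(data).digest()).decode()


def receive_upload(stream: Iterable[bytes], client_checksum: str) -> tuple[bytes, str]:
    """Hash bytes while they stream in; reject before any metadata commit on mismatch."""
    hasher, body = hashlib.sha256(), bytearray()
    for chunk in stream:
        hasher.update(chunk)  # a real server would also forward each chunk to storage nodes
        body.extend(chunk)
    actual = base64.b64encode(hasher.digest()).decode()
    if actual != client_checksum:
        raise ValueError("checksum mismatch: data corrupted in transit")
    return bytes(body), actual  # caller stores `actual` in the metadata record


def read_verified(replicas: list[bytes], stored_checksum: str) -> bytes:
    """Return the first replica whose bytes still match the stored checksum."""
    for data in replicas:
        if b64_sha256(data) == stored_checksum:
            return data
    raise IOError("all replicas corrupted; fall back to erasure decoding / repair")
```

Hashing incrementally with `update` means the server never needs the whole object in memory to verify it, which matters for multi-GB parts.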
Where checksums protect:
| Where | Threat | Protection |
|---|---|---|
| Client → Server | Network corruption | Client-provided checksum verified on receive |
| Server storage | Bit rot on disk | Periodic scrubbing compares stored bytes to checksum |
| Server → Client | Network corruption | ETag header; client can verify |
| Erasure coding | Chunk corruption | Per-chunk checksum before reconstruction |
Periodic scrubbing: Background job reads every object, verifies checksum. If corrupted, repairs from other replicas/chunks. Runs ~weekly.
Garbage Collection
Deleted and orphaned objects are not immediately removed:
DELETE /buckets/b/photo.jpg
  → Metadata marks object as deleted (soft delete)
  → Actual bytes on storage nodes NOT yet removed
Garbage Collector (background job):
1. Scan metadata for objects with deleted_at < (now - 1 day)
2. Also scan for orphaned chunks: data on storage nodes
with no corresponding metadata entry
3. Remove actual bytes from storage nodes
4. Remove metadata record
5. Release disk space
Why soft delete?
- Versioning: DELETE creates a marker, previous versions still accessible
- Multipart abort: parts may be on different nodes, need coordinated cleanup
- Crash safety: GC is async, prevents partial deletes leaving inconsistency
Mark-and-sweep GC process:
Phase 1 (Mark):
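- Enumerate all live objects in metadata (not soft-deleted, or soft-deleted less than 1 day ago)
- Build set: { chunk_ids referenced by live objects }
Phase 2 (Sweep):
- Scan storage nodes for all stored chunk IDs
- Any chunk_id not in the live set AND older than the 1-day grace period → schedule for deletion (the age check spares in-flight PUTs, whose chunks are written before their metadata is committed)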
Phase 3 (Delete):
- Delete orphaned chunks from storage nodes
- Free disk space
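A sketch of the three phases, assuming each metadata record lists the chunk IDs it references, each stored chunk carries a creation timestamp, and a one-day grace period (matching the `now - 1 day` rule above). The grace period applies twice: soft-deleted records stay "live" for a day, and unreferenced chunks younger than a day are skipped. Function names are hypothetical.

```python
from typing import Optional

GRACE_PERIOD_S = 24 * 3600  # one day, as in the "deleted_at < now - 1 day" rule


def mark(records: list[dict], now: float) -> set[str]:
    """Phase 1: chunk IDs referenced by live records (not deleted, or deleted < 1 day ago)."""
    live: set[str] = set()
    for record in records:
        deleted_at: Optional[float] = record.get("deleted_at")
        if deleted_at is None or now - deleted_at <= GRACE_PERIOD_S:
            live.update(record["chunk_ids"])
    return live


def sweep(stored_chunks: dict[str, float], live: set[str], now: float) -> list[str]:
    """Phase 2: unreferenced chunks older than the grace period.

    stored_chunks maps chunk_id -> creation time. Young unreferenced chunks are
    skipped because an in-flight PUT writes chunks before committing metadata.
    """
    return sorted(
        chunk_id for chunk_id, created in stored_chunks.items()
        if chunk_id not in live and now - created > GRACE_PERIOD_S
    )


def delete_chunks(stored_chunks: dict[str, float], victims: list[str]) -> None:
    """Phase 3: remove the swept chunks and free their space."""
    for chunk_id in victims:
        del stored_chunks[chunk_id]
```

Without the age check, a sweep that runs between steps 6 and 7 of a PUT would delete the chunks of an upload that is about to commit.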
Full System Architecture
                        ┌──────────────────────────┐
                        │       API Gateway        │
                        │  (Load Balancing,        │
                        │   TLS Termination,       │
                        │   Request Routing)       │
                        └────────────┬─────────────┘
                                     │
                  ┌──────────────────┼──────────────────┐
                  │                  │                  │
         ┌────────▼───────┐ ┌────────▼───────┐ ┌────────▼───────┐
         │ Identity/Auth  │ │    Metadata    │ │  Data Routing  │
         │    Service     │ │    Service     │ │    Service     │
         └────────────────┘ │   (RocksDB     │ │  (Stateless)   │
                            │   cluster,     │ └────────┬───────┘
                            │   sharded by   │          │
                            │   bucket_id)   │          │
                            └────────────────┘          │
                                                        │
                            ┌────────────────┐          │
                            │   Placement    │◀─────────┘
                            │    Service     │  consistent hashing
                            │ (Node health,  │  node selection
                            │   capacity)    │
                            └────────┬───────┘
                                     │ heartbeat
                  ┌──────────────────┼──────────────────┐
                  │                  │                  │
         ┌────────▼───────┐ ┌────────▼───────┐ ┌────────▼───────┐
         │  Storage Node  │ │  Storage Node  │ │  Storage Node  │
         │       1        │ │       2        │ │     3 ...      │
         │ [data chunks]  │ │ [data chunks]  │ │ [data chunks]  │
         └────────────────┘ └────────────────┘ └────────────────┘
         ┌──────────────────────────────────────────────────────┐
         │                 Background Services                  │
         │  Garbage Collector | Scrubber | Replication Monitor  │
         └──────────────────────────────────────────────────────┘
Design Summary
| Concern | Choice | Reasoning |
|---|---|---|
| Data distribution | Consistent hashing | Minimal reshuffling when nodes added/removed |
| Durability | Erasure coding (8+4) | 50% overhead vs 200% for replication; tolerate 4 node failures |
| Hot data | 3-way replication | Low read latency; faster recovery |
| Metadata storage | RocksDB cluster sharded by bucket_id | LSM-tree fast writes; range scan for LIST |
| Multipart upload | Part-by-part with upload_id | Resume, parallel, required for >5 GB objects |
| Versioning | Immutable PUT + DELETE markers | No data loss, full history accessible |
| Data integrity | SHA256 checksum on write + periodic scrub | Detect corruption at every boundary |
| Garbage collection | Mark-and-sweep with soft delete | Safe async cleanup, no partial deletes |
| Node health | Heartbeat to Placement Service | Fast failure detection (30s timeout) |
| Consistency | Strong for metadata, eventual for replication | Correct reads; acceptable lag for durability |
Interview Questions & Answers
Q: Why use erasure coding instead of replication for durability?
A: Erasure coding (8+4) provides the same or better fault tolerance (tolerate 4 node failures vs 2 for 3-way replication) at dramatically lower storage cost (50% overhead vs 200%). The tradeoff is higher read/write CPU cost and reconstruction latency when nodes are down. For cold/warm data at exabyte scale, the storage savings far outweigh the complexity. Hot data (frequently accessed) may still use replication for lower read latency.
Q: How does multipart upload enable resumable uploads?
A: Each part has its own ETag (checksum). If a part upload fails, the client retries only that part using the same upload_id and part number. The server tracks which parts have been successfully received. On completion, the client sends the list of part numbers and their ETags; the server validates them and assembles the final object. So a 10 GB upload that fails at the 9 GB mark only re-sends the in-flight ~100 MB part and then continues with the remaining parts, instead of restarting from byte 0.
Q: How do you handle consistency between metadata and data stores?
A: Write to data nodes first, then commit metadata. If the metadata commit fails after data is written, the data chunks become orphaned and are reclaimed by the garbage collector during the sweep phase. If the data write fails, no metadata is written, so the object never becomes visible; any chunks that did land on some nodes are orphans that the garbage collector also reclaims. This "write data first, commit metadata second" pattern ensures we never have a metadata pointer to missing data.
Q: How would you scale the metadata service to handle billions of objects?
A: Shard metadata by bucket_id so each shard handles a subset of buckets. Each shard is a replicated RocksDB cluster with a leader for writes and followers for reads. For extremely large buckets (billions of objects in one bucket), further sub-shard by a hash of the object key prefix. LIST operations use range scans within a shard (or merge results from sub-shards for large buckets).
Q: How does consistent hashing help when adding storage nodes?
A: In a consistent hash ring, each node owns a range of the ring. When a new node is added, only objects whose key hashes fall between the new node and its predecessor need to be migrated. This is approximately 1/N of total objects. Without consistent hashing (simple modulo), adding one node would require re-distributing nearly all objects.
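The "nearly all objects" claim can be made exact. Assume keys hash uniformly and simple modulo placement `node = h mod N`. Because N and N + 1 are coprime, h mod N(N+1) is uniform, and h mod N = h mod (N+1) holds for exactly N of those N(N+1) residues (namely 0 to N - 1):

$$
\Pr[\text{key moves}]_{\text{modulo}} = 1 - \frac{N}{N(N+1)} = \frac{N}{N+1}
\qquad \text{vs.} \qquad
\Pr[\text{key moves}]_{\text{consistent}} \approx \frac{1}{N+1}
$$

Growing from N = 4 to 5 nodes, modulo placement moves 80% of objects, while consistent hashing moves about 20%.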
Key Takeaways
- Separate metadata from data: Metadata needs strong consistency and fast lookup; data needs throughput and durability; they have different requirements.
- Erasure coding beats replication at scale: 50% overhead vs 200% for 3-way replication; critical at PB/EB scale.
- Consistent hashing minimizes data movement: Adding/removing nodes reshuffles only ~1/N of objects.
- Multipart upload is essential for large objects: Enables parallel uploads, resumability, and objects larger than the 5 GB single-PUT limit.
- Checksums must be verified at every boundary: Transit corruption, bit rot on disk, and erasure chunk corruption are all real failure modes.
- Soft delete + garbage collection decouples deletion from cleanup: Ensures crash safety, versioning correctness, and coordinated multipart abort.
- Heartbeat-based failure detection + Placement Service: Enables the data routing layer to route around failed nodes within ~30 seconds.
Related Resources
- ch10-gaming-leaderboard - Redis sorted set for leaderboard (contrasting storage design)
- ch05-consistent-hashing - Consistent hashing deep dive
- distributed-system-components - Storage node architecture patterns
- key-patterns - Data sharding and replication patterns
Practice this design! Hard interview question. Be ready to:
- Explain erasure coding (k+m system, storage overhead) vs replication
- Draw the metadata vs data store separation
- Walk through multipart upload flow step by step
- Discuss consistent hashing for data placement
- Explain how checksums + GC ensure durability
Last Updated: 2026-04-13
Status: Very common at senior/staff levels. Must know erasure coding!