Chapter 6: Design an Ad Click Event Aggregation System
volume2 ad-tech click-aggregation streaming batch
Status: Interview ready
Difficulty: Hard
Time to complete: 50 min read + practice
Overview
An ad click aggregation system counts and analyzes ad click events in real-time and batch mode, powering billing (cost per click), analytics, and fraud detection for ad networks.
Real-world examples: Google Ads, Facebook Ads Manager, Amazon Advertising
Why this matters:
- Core data engineering question (appears at Meta, Google, Amazon, Stripe)
- Teaches Lambda architecture, stream processing, and approximate algorithms
- Tests trade-offs between exactness, latency, and fault tolerance
Problem Statement
Design an ad click aggregation system that:
- Counts ad clicks in near-real-time for billing and reporting
- Answers: "How many clicks did ad X get in the last M minutes?"
- Returns top 100 most-clicked ads in last 1 minute
- Supports filtering by region, IP, user agent
- Corrects historical data when upstream data is delayed or replayed
Step 1: Requirements & Scope (5 min)
Functional Requirements
Clarifying questions:
- Click volume? → 1 billion ad clicks/day total; peak 50,000 clicks/second
- Query latency? → Near real-time (few minutes acceptable)
- Query types? → Click count for ad X in last M minutes; Top-100 ads in last 1 minute
- Filtering? → By region, IP address, user agent
- Accuracy? → Billing requires high accuracy; analytics can be approximate
- Historical corrections? → Yes; raw event data can be replayed to correct errors
- Retention? → Raw clicks: 7 days; Aggregated data: 90 days
Scope:
- Ingest click events and aggregate them over time windows
- Answer count queries and top-K queries
- Support filtering on dimensions
- Reconcile streaming results with batch results
Non-Functional Requirements
- High availability: Billing system must not lose click data
- Exactly-once semantics: Each click counted exactly once for billing
- Fault tolerance: System must recover from failures without data loss
- Scalability: Handle 50K clicks/sec at peak; scale horizontally
- Low latency: Aggregated results available within ~1 minute
- Data correctness: Batch reprocessing can correct streaming errors
Scale Estimates
Click volume: 1 billion clicks/day
Peak rate: 50,000 clicks/second
Click event size: ~50 bytes
{ad_id, click_timestamp, user_id, ip, country, user_agent}
Raw event storage: 50 bytes × 1B/day = 50 GB/day = 350 GB/week (7-day retention)
Aggregated data: Much smaller, 1 number per ad per minute
~1M ads × 1440 min/day × 8 bytes = ~11 GB/day
90-day aggregated storage: ~1 TB
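The estimates above can be rechecked with a few lines of arithmetic. This is a minimal sketch that uses only the section's own inputs (1B clicks/day, 50K/s peak, ~50-byte events, ~1M ads, 8-byte counts) and decimal units (1 GB = 10^9 bytes); like the estimates, it ignores key, index, and replication overhead.

```python
# Back-of-envelope capacity check using this section's inputs.
CLICKS_PER_DAY = 1_000_000_000
PEAK_CLICKS_PER_SEC = 50_000
EVENT_BYTES = 50          # ~50-byte raw click event
NUM_ADS = 1_000_000       # ~1M ads
COUNT_BYTES = 8           # one 64-bit count per ad per minute
MINUTES_PER_DAY = 1_440
GB, TB = 10**9, 10**12    # decimal units

avg_clicks_per_sec = CLICKS_PER_DAY / 86_400
peak_to_avg = PEAK_CLICKS_PER_SEC / avg_clicks_per_sec

raw_gb_per_day = EVENT_BYTES * CLICKS_PER_DAY / GB
raw_gb_per_week = raw_gb_per_day * 7                  # 7-day raw retention

agg_bytes_per_day = NUM_ADS * MINUTES_PER_DAY * COUNT_BYTES
agg_gb_per_day = agg_bytes_per_day / GB
agg_tb_90_days = agg_bytes_per_day * 90 / TB          # 90-day aggregate retention

print(f"average rate: {avg_clicks_per_sec:,.0f} clicks/s (peak is {peak_to_avg:.1f}x average)")
print(f"raw events  : {raw_gb_per_day:.0f} GB/day, {raw_gb_per_week:.0f} GB per 7 days")
print(f"aggregates  : {agg_gb_per_day:.2f} GB/day, {agg_tb_90_days:.2f} TB per 90 days")
```

It reports an average of roughly 11.6K clicks/s, so the 50K/s peak is about 4.3x the daily average; the ingestion tier has to be sized for the peak, not the average.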
Step 2: High-Level Design (10 min)
Lambda Architecture Overview
The core pattern for this system is Lambda Architecture, which runs two parallel pipelines:
```text
                   Click Events
                         │
           ┌─────────────┴─────────────┐
           │                           │
           ▼                           ▼
┌──────────────────────┐    ┌──────────────────────┐
│     Speed Layer      │    │     Batch Layer      │
│     (Streaming)      │    │  (Spark batch job)   │
│     Flink/Spark      │    │ Runs every few hours │
│      Streaming       │    │  on raw click logs   │
└──────────┬───────────┘    └──────────┬───────────┘
           │ Fast, approximate         │ Slow, accurate
           │ (few minutes latency)     │ (hours latency)
           ▼                           ▼
┌──────────────────────┐    ┌──────────────────────┐
│      Speed View      │    │      Batch View      │
│   (recent results)   │    │ (historical results) │
└──────────┬───────────┘    └──────────┬───────────┘
           └─────────────┬─────────────┘
                         ▼
              ┌──────────────────────┐
              │    Serving Layer     │
              │    Merges views,     │
              │   answers queries    │
              └──────────────────────┘
```
Simplified Architecture
```text
Ad Click Aggregation

Click Events         Ingestion         Stream Processing
┌─────────────┐      ┌──────────┐      ┌─────────────────────┐
│ Ad Server   │─────▶│  Kafka   │─────▶│ Flink / Spark       │
│ Mobile App  │      │ (buffer) │      │ Streaming           │
│ Web Browser │      └────┬─────┘      │ (window agg)        │
└─────────────┘           │            └──────────┬──────────┘
                          │ raw logs              │
                          ▼                       ▼
                   ┌─────────────┐     ┌─────────────────────┐
                   │  Raw Click  │     │ Aggregated Results  │
                   │  Log Store  │     │ DB (Cassandra /     │
                   │  (S3/HDFS)  │     │ InfluxDB)           │
                   └──────┬──────┘     └──────────┬──────────┘
                          │                       ▼
                          │ batch      ┌─────────────────────┐
                          └───────────▶│ Batch Reprocessing  │
                                       │ (Spark batch job)   │
                                       │ corrects streaming  │
                                       └─────────────────────┘

Query API             Visualization
┌──────────────┐      ┌──────────────────────┐
│ REST API     │─────▶│ Ads Analytics        │
│ (count,      │      │ Dashboard            │
│  top-K)      │      └──────────────────────┘
└──────────────┘
```
Data Model
Raw click event:
```json
{
  "ad_id": "ad_12345",
  "click_timestamp": 1712995200,
  "user_id": "usr_78901",
  "ip": "192.168.1.100",
  "country": "US",
  "user_agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X)"
}
```
Aggregated result schema (stored in Cassandra/InfluxDB):
```sql
CREATE TABLE ad_click_aggregates (
    ad_id         TEXT,
    window_start  TIMESTAMP,  -- start of 1-minute window
    click_count   BIGINT,
    filter_region TEXT,       -- filter dimension; partition key parts cannot be null, so use a sentinel (e.g. 'ALL') for unfiltered totals
    PRIMARY KEY ((ad_id, filter_region), window_start)
) WITH CLUSTERING ORDER BY (window_start DESC);
```
Query API Design
```text
GET /v1/ads/{ad_id}/aggregated_count
    ?from=1712908800
    &to=1712995200
    &filter_region=US
    &filter_user_agent=mobile
```
Response:
```json
{
  "ad_id": "ad_12345",
  "from": 1712908800,
  "to": 1712995200,
  "click_count": 147823,
  "filter": {"region": "US", "user_agent": "mobile"}
}
```
```text
GET /v1/ads/top_k
    ?window_minutes=1
    &k=100
```
Response:
```json
{
  "window_start": 1712995200,
  "window_end": 1712995260,
  "top_ads": [
    {"ad_id": "ad_99999", "click_count": 45231},
    {"ad_id": "ad_88888", "click_count": 41099},
    ...
  ]
}
```
Step 3: Deep Dive (25 min)
Streaming Aggregation: Tumbling vs Sliding Windows
Tumbling window (non-overlapping, fixed duration):
```text
Time:   |--- 1 min ---|--- 1 min ---|--- 1 min ---|
Window: |     W1      |     W2      |     W3      |
```
Each click belongs to exactly one window.
Simple to implement. Low memory.
Use case: "Clicks per ad per minute" for billing.
Sliding window (overlapping, slides by step):
```text
Time:      0s        30s       60s       90s       120s
Window 1:  |─────── 60s ───────|
Window 2:            |─────── 60s ───────|
Window 3:                      |─────── 60s ───────|
```
Each click appears in multiple windows.
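Higher cost: each click updates size/slide windows (2 in the diagram above; 6 for a 1-minute window sliding every 10 s), and window functions that are not incremental must buffer every event in each window.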
Use case: "Top-K ads in last 1 minute" (continuously updated).
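The difference shows up in how a click's timestamp maps to windows. A minimal sketch, assuming window starts aligned to multiples of the slide with zero offset (the alignment Flink's built-in event-time assigners use by default); the function names are illustrative, not a library API.

```python
from typing import List


def tumbling_window_start(ts: int, size: int) -> int:
    # Each timestamp falls in exactly one window: [start, start + size).
    return ts - ts % size


def sliding_window_starts(ts: int, size: int, slide: int) -> List[int]:
    # Every window [start, start + size), with start aligned to `slide`, that contains ts.
    starts = []
    start = ts - ts % slide              # latest window start at or before ts
    while start > ts - size:             # window still covers ts
        starts.append(start)
        start -= slide
    return starts


click_ts = 1_712_995_235                 # 35 s into the minute starting at 1712995200
print(tumbling_window_start(click_ts, 60))           # 1712995200
print(sliding_window_starts(click_ts, 60, 30))       # [1712995230, 1712995200]
print(len(sliding_window_starts(click_ts, 60, 10)))  # 6
```

A 60 s window sliding every 30 s puts each click in 2 windows; sliding every 10 s, as in the top-K job, puts it in 6, which is where the extra state and work of sliding windows come from.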
Flink window aggregation code sketch:
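```java
// Sketch against the Flink 1.x DataStream API. ClickEvent, ClickEventSchema, ClickCountAggregator,
// AttachWindowInfo, WindowedCount, TopKProcessFunction, cassandraSink and redisSink are user-defined (hypothetical).
KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
    .setBootstrapServers("kafka:9092").setTopics("clicks").setGroupId("click-agg")
    .setValueOnlyDeserializer(new ClickEventSchema())
    .build();
// Event-time windows only fire when watermarks advance, so assign event timestamps (seconds -> ms).
WatermarkStrategy<ClickEvent> watermarks = WatermarkStrategy
    .<ClickEvent>forBoundedOutOfOrderness(Duration.ofMinutes(2))
    .withTimestampAssigner((click, recordTs) -> click.getClickTimestamp() * 1000L);
DataStream<ClickEvent> clickStream = env.fromSource(source, watermarks, "ad-clicks");

// Tumbling 1-minute window per ad_id
clickStream
    .keyBy(ClickEvent::getAdId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new ClickCountAggregator(), new AttachWindowInfo()) // -> WindowedCount(adId, windowStart, count)
    .addSink(cassandraSink);

// Top-K ads in last 1 minute, refreshed every 10 s (sliding window)
clickStream
    .keyBy(ClickEvent::getAdId)
    .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
    .aggregate(new ClickCountAggregator(), new AttachWindowInfo())
    .keyBy(WindowedCount::getWindowStart)   // group all ads in the same window
    .process(new TopKProcessFunction(100))  // emit top 100 per window
    .addSink(redisSink);
```
Event Time vs Processing Time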
Critical distinction:
Event time: When the click actually happened (in the user's browser/phone)
Processing time: When the stream processor handles the event, by its own clock (could be minutes later due to network delays or consumer lag)
Example problem:
User clicks ad at 12:00:00 on mobile with bad network
Event arrives in Kafka at 12:03:47 (3m 47s late!)
If we use processing time: counted in 12:03 window (wrong!)
If we use event time: counted in 12:00 window (correct for billing!)
Watermarks: Flink's mechanism to handle late-arriving events
Watermark definition: "I believe no events with timestamp < T will arrive anymore"
Watermark strategy (bounded out-of-orderness):
watermark = max_event_time_seen - max_out_of_orderness
Example with a 2-minute out-of-orderness bound:
Max event time seen: 12:05:00
Watermark: 12:03:00
At watermark 12:03:00, Flink fires and emits results for all windows ending ≤ 12:03:00
Events arriving after their window has fired → treated as "late data":
Option 1: Drop late events (Flink's default)
Option 2: Keep window state for an extra grace period (Flink's allowedLateness) and re-emit an updated result, or route late events to a side output (sideOutputLateData) for separate handling
Option 3: Accumulate in next window (approximate)
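The watermark rule and the late-data handling can be modeled in a few lines. This is a toy, single-process sketch (not Flink's implementation; the class and method names are hypothetical) using 1-minute tumbling windows, a 2-minute out-of-orderness bound, and timestamps in seconds after 12:00:00. It emits a window once the watermark reaches the window's end and diverts clicks for already-emitted windows to a side list, the side-output style of handling late data.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class EventTimeWindowCounter:
    """Toy event-time tumbling-window counter (timestamps in seconds).

    Watermark = max event time seen - max out-of-orderness. A window
    [start, start + size) is emitted once the watermark reaches its end;
    clicks for an already-emitted window go to a side list instead.
    """

    def __init__(self, window_size: int, max_out_of_orderness: int) -> None:
        self.size = window_size
        self.bound = max_out_of_orderness
        self.max_event_time = float("-inf")
        self.open_windows: Dict[Tuple[str, int], int] = defaultdict(int)
        self.emitted: Dict[Tuple[str, int], int] = {}
        self.late_events: List[Tuple[str, int]] = []

    @property
    def watermark(self) -> float:
        return self.max_event_time - self.bound

    def on_click(self, ad_id: str, event_time: int) -> None:
        start = event_time - event_time % self.size
        if start + self.size <= self.watermark:
            self.late_events.append((ad_id, event_time))  # its window already fired
            return
        self.open_windows[(ad_id, start)] += 1
        self.max_event_time = max(self.max_event_time, event_time)
        self._emit_ready_windows()

    def _emit_ready_windows(self) -> None:
        ready = [key for key in self.open_windows if key[1] + self.size <= self.watermark]
        for key in ready:
            self.emitted[key] = self.open_windows.pop(key)


# Seconds after 12:00:00; the 12:05:00 click moves the watermark to 12:03:00.
counter = EventTimeWindowCounter(window_size=60, max_out_of_orderness=120)
for ad_id, t in [("ad_1", 0), ("ad_1", 30), ("ad_2", 90), ("ad_1", 300), ("ad_1", 45)]:
    counter.on_click(ad_id, t)

print(counter.watermark)    # 180
print(counter.emitted)      # {('ad_1', 0): 2, ('ad_2', 60): 1}
print(counter.late_events)  # [('ad_1', 45)]
```

In a real pipeline the side list would be a separate stream; the batch layer described later recovers these clicks from the raw log either way.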
Top-K Ads Problem
Naive approach: Min-Heap
For each 1-minute window:
Maintain a min-heap of size K across all ads
For each ad's click count, push to heap if > min
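A minimal sketch of the naive min-heap selection using Python's heapq (the function name is illustrative). The root of a size-K min-heap is the smallest count still in the running, so each ad costs at most O(log K) and a whole window O(N log K).

```python
import heapq
from typing import Dict, List, Tuple


def top_k_min_heap(counts: Dict[str, int], k: int) -> List[Tuple[str, int]]:
    """Return the k ads with the highest counts, largest first."""
    heap: List[Tuple[int, str]] = []          # (count, ad_id), smallest at heap[0]
    for ad_id, count in counts.items():
        if len(heap) < k:
            heapq.heappush(heap, (count, ad_id))
        elif count > heap[0][0]:
            heapq.heapreplace(heap, (count, ad_id))  # pop the min and push in one step
    return [(ad_id, count) for count, ad_id in sorted(heap, reverse=True)]


window_counts = {"ad_1": 120, "ad_2": 45, "ad_3": 300, "ad_4": 7, "ad_5": 210}
print(top_k_min_heap(window_counts, 3))  # [('ad_3', 300), ('ad_5', 210), ('ad_1', 120)]
```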
Problem at scale:
1M ads × 8 bytes per count = 8 MB per node
Fine for a single node, but aggregating across 100 Flink nodes is expensive:
→ Need to merge 100 per-node count tables covering up to 1M ads each
→ Full shuffle of all ad counts across the network
Scalable approach: Two-level aggregation + Count-Min Sketch
Level 1 β Local aggregation (per Flink node):
Each node counts clicks for the ads it sees (subset of all ads)
Maintain local top-K heap (size K per node)
Emit local top-K every 10 seconds
Level 2 β Global merge (single aggregator node):
Receive local top-K lists from all nodes
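Merge and re-rank → emit global top-K
Why this works:
Clicks are partitioned by ad_id (keyBy), so each ad's full count lives on exactly one node
Global top-K must then be in at least one node's local top-K
(If an ad is not in its node's local top-K, that node alone has K ads with higher counts, so it cannot be in the global top-K)
Reduces data shuffled from O(all ads) to O(K × num_nodes)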
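A minimal simulation of the two-level scheme, assuming clicks are hash-partitioned by ad_id (the analogue of keyBy(ad_id)), which is the precondition for the correctness argument above. The partition function, node count, and Zipf-like synthetic clicks are illustrative. It checks that merging only K candidates per node gives the same top-K counts as an exact global ranking (ad IDs can differ only among exact ties).

```python
import heapq
import random
import zlib
from collections import Counter
from typing import Dict, List, Tuple


def partition_of(ad_id: str, num_nodes: int) -> int:
    # Stable hash partitioning: every click for an ad goes to the same node.
    return zlib.crc32(ad_id.encode()) % num_nodes


def local_top_k(counts: Dict[str, int], k: int) -> List[Tuple[str, int]]:
    # Level 1: each node ranks only the ads it owns.
    return heapq.nlargest(k, counts.items(), key=lambda item: item[1])


def global_top_k(local_lists: List[List[Tuple[str, int]]], k: int) -> List[Tuple[str, int]]:
    # Level 2: merge at most k * num_nodes candidates instead of every ad's count.
    candidates = [item for local in local_lists for item in local]
    return heapq.nlargest(k, candidates, key=lambda item: item[1])


# Synthetic 1-minute window: 50,000 clicks over 1,000 ads with a Zipf-like skew.
rng = random.Random(7)
ads = [f"ad_{i}" for i in range(1_000)]
clicks = rng.choices(ads, weights=[1 / (rank + 1) for rank in range(len(ads))], k=50_000)

num_nodes, k = 4, 10
per_node = [Counter() for _ in range(num_nodes)]
for ad_id in clicks:
    per_node[partition_of(ad_id, num_nodes)][ad_id] += 1

local_lists = [local_top_k(node_counts, k) for node_counts in per_node]
merged = global_top_k(local_lists, k)
exact = heapq.nlargest(k, Counter(clicks).items(), key=lambda item: item[1])

print(sum(len(local) for local in local_lists), "candidates shuffled instead of", len(Counter(clicks)))
print(sorted(c for _, c in merged) == sorted(c for _, c in exact))  # True
```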
Count-Min Sketch for approximate counts:
Use case: We have 1M ads; tracking exact count for every ad is memory-intensive.
For top-K we need a fast approximate count structure.
Count-Min Sketch structure:
d hash functions × w counters per row (d rows total)
```text
┌────────────────────────────────────────┐
│ Row 0 (hash_0): [12, 7, 0, 45, 3, ...] │
│ Row 1 (hash_1): [5, 31, 0, 12, 8, ...] │
│ Row 2 (hash_2): [9, 0, 44, 7, 21, ...] │
└────────────────────────────────────────┘
```
Update: for click on ad_id X:
Row 0: counters[hash_0(X) % w]++
Row 1: counters[hash_1(X) % w]++
Row 2: counters[hash_2(X) % w]++
Query: estimated count for ad_id X:
return min(row_0[hash_0(X) % w],
row_1[hash_1(X) % w],
row_2[hash_2(X) % w])
Properties:
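- Never under-estimates; may over-estimate because of hash collisions
- Error bound: estimate ≤ true_count + ε × total_inserts with probability ≥ 1 - δ, for w = ⌈e/ε⌉ and d = ⌈ln(1/δ)⌉
- Fixed memory: d × w integers, regardless of number of distinct ads
- Typical config: d=5, w=2000 → 5 × 2000 × 4-byte counters = 40 KB for any number of ads; error ≤ ~0.14% of total clicks (ε = e/2000) with probability ≈ 99.3% (δ = e^-5), comfortably within 1%. Because the error is relative to total clicks, heavy hitters are estimated well while rare ads can be inflated.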
Trade-off: approximate for analytics; use exact count from DB for billing
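A minimal Count-Min Sketch in Python, sized with the standard formulas w = ⌈e/ε⌉ and d = ⌈ln(1/δ)⌉. Salting one BLAKE2b hash with the row number stands in for d independent hash functions (a simplification; production sketches typically use pairwise-independent hash families). It uses ε = 0.001 and δ = 0.01 (w = 2719, d = 5) rather than the config listed above, and Python integers rather than 4-byte counters, so its memory footprint is not representative; the counts are synthetic.

```python
import hashlib
import math
from typing import List


class CountMinSketch:
    """d rows x w counters; w = ceil(e / epsilon), d = ceil(ln(1 / delta))."""

    def __init__(self, epsilon: float, delta: float) -> None:
        self.w = math.ceil(math.e / epsilon)
        self.d = math.ceil(math.log(1 / delta))
        self.rows: List[List[int]] = [[0] * self.w for _ in range(self.d)]
        self.total = 0

    def _index(self, row: int, item: str) -> int:
        # Salting one hash with the row number stands in for d independent hash functions.
        digest = hashlib.blake2b(f"{row}:{item}".encode(), digest_size=8).digest()
        return int.from_bytes(digest, "big") % self.w

    def add(self, item: str, count: int = 1) -> None:
        for row in range(self.d):
            self.rows[row][self._index(row, item)] += count
        self.total += count

    def estimate(self, item: str) -> int:
        # Collisions only ever add to a counter, so the smallest counter is the best estimate.
        return min(self.rows[row][self._index(row, item)] for row in range(self.d))


cms = CountMinSketch(epsilon=0.001, delta=0.01)
true_counts = {f"ad_{i}": i % 50 + 1 for i in range(2_000)}  # synthetic long tail
true_counts["ad_hot"] = 45_231                                 # one heavy hitter
for ad_id, n in true_counts.items():
    cms.add(ad_id, n)

errors = [cms.estimate(ad_id) - n for ad_id, n in true_counts.items()]
print(cms.d, cms.w)       # 5 2719
print(min(errors) >= 0)   # True: never under-estimates
share_within_bound = sum(e <= 0.001 * cms.total for e in errors) / len(errors)
print(f"{share_within_bound:.1%} of ads within epsilon * total of their true count")
```

For top-K, the sketch is typically paired with a small heap of candidate ads: each click updates the sketch, and the ad's estimate decides whether it enters the heap.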
Exactly-Once Delivery with Kafka
The problem:
Consumer crashes after processing 100 clicks but before committing offset
→ Consumer restarts, reads same 100 clicks again
→ 100 clicks counted twice in TSDB (duplicate billing!)
Solution: Kafka transactions + idempotent consumer
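Flink's exactly-once mode (two-phase commit, driven by checkpoints):
Phase 1: Pre-commit (when a checkpoint barrier reaches each operator):
  Kafka source stores its current offsets in the checkpoint
  Window operators snapshot their state; the sink flushes pending writes into an open, uncommitted transaction
  The checkpoint is persisted to durable storage (S3/HDFS)
Phase 2: Commit:
  If every operator acknowledges the checkpoint → the sink commits its transaction
  If anything fails → Flink aborts the open transaction, restores the last completed checkpoint, and replays Kafka from the saved offsets
The sink must be transactional (e.g., a Kafka sink using Kafka transactions) or idempotent. Cassandra has no multi-row transactions (lightweight transactions are single-partition compare-and-set), so use an idempotency key instead: (ad_id, window_start) as the upsert key.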
Idempotency approach (simpler):
Each aggregated write uses UPSERT with (ad_id, window_start) as key
If a window result is replayed, the UPSERT overwrites the row with the same value → no duplicate
Requires: deterministic processing (same input → same output), and each write must carry the full window total rather than an increment
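A minimal sketch of why the sink should write absolute window totals keyed by (ad_id, window_start). Plain Python dicts stand in for the results table and the function names are hypothetical: replaying the same window results leaves an upsert table unchanged, while increment-style writes double-count.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, int, int]            # (ad_id, window_start, full window total)
Table = Dict[Tuple[str, int], int]    # (ad_id, window_start) -> stored click_count


def write_upsert(table: Table, rows: List[Row]) -> None:
    # Idempotent: writing the same window total again leaves the row unchanged.
    for ad_id, window_start, total in rows:
        table[(ad_id, window_start)] = total


def write_increment(table: Table, rows: List[Row]) -> None:
    # Not idempotent: every delivery adds to the stored value (counter-style).
    for ad_id, window_start, total in rows:
        key = (ad_id, window_start)
        table[key] = table.get(key, 0) + total


window_results: List[Row] = [("ad_12345", 1712995200, 100), ("ad_99999", 1712995200, 42)]
upsert_table: Table = {}
increment_table: Table = {}
for _ in range(2):  # first delivery, then a replay after restoring a checkpoint
    write_upsert(upsert_table, window_results)
    write_increment(increment_table, window_results)

print(upsert_table[("ad_12345", 1712995200)])     # 100
print(increment_table[("ad_12345", 1712995200)])  # 200 (double-counted)
```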
Kafka producer idempotency:
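```java
// Enable the idempotent producer: the broker de-duplicates retried sends
// (per producer session and partition), so retries do not write duplicates to Kafka.
Properties props = new Properties();  // java.util.Properties
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.toString(Integer.MAX_VALUE));
props.put("max.in.flight.requests.per.connection", "5");  // idempotence requires <= 5
```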
Lambda Architecture: Data Reconciliation
Why batch reprocessing is needed:
Streaming has two failure modes:
1. Out-of-order events: Click at 12:00 arrives at 12:05.
Streaming counted it in 12:00 window (if watermark allows) or dropped it.
2. Processing bugs: Bug in Flink job produced wrong counts for 3 hours.
Batch layer fixes both:
Every 3-6 hours, Spark reads all raw clicks from S3 (immutable log)
Recomputes exact aggregates from scratch (no watermark approximations)
Writes "official" corrected counts to Batch View table
Serving layer: If batch result available, serve batch. Else serve streaming.
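A minimal sketch of the batch layer's core job, with Python standing in for the Spark job: recompute exact counts per (ad_id, window_start) from the immutable raw log, including clicks the streaming layer dropped as late. The synthetic data is constructed to reproduce the 14,823 vs 14,910 example that follows; which clicks were "late" is made up for illustration.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def batch_recompute(raw_clicks: Iterable[Tuple[str, int]], window: int) -> Dict[Tuple[str, int], int]:
    # Exact count per (ad_id, window_start) over the full log; no watermark, so nothing is late.
    return dict(Counter((ad_id, ts - ts % window) for ad_id, ts in raw_clicks))


HOUR = 3_600
t0 = 1_712_995_200                                            # start of an hourly window
raw_log = [("ad_12345", t0 + (i * 7) % HOUR) for i in range(14_910)]
late = set(range(0, len(raw_log), 172))                       # 87 clicks the stream dropped as late
streaming = Counter(
    (ad_id, ts - ts % HOUR) for i, (ad_id, ts) in enumerate(raw_log) if i not in late
)
batch = batch_recompute(raw_log, window=HOUR)

key = ("ad_12345", t0)
print(streaming[key], batch[key], batch[key] - streaming[key])  # 14823 14910 87
```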
Reconciliation flow:
Streaming result: ad_12345 had 14,823 clicks in 12:00-13:00 window
Batch result: ad_12345 had 14,910 clicks in 12:00-13:00 window (87 late clicks)
Serving layer logic:
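```python
def get_click_count(batch_view, speed_view, ad_id, window):
    # Serving layer: prefer the exact batch result, fall back to streaming for recent windows.
    batch_result = batch_view.get((ad_id, window))  # e.g. ("ad_12345", "12:00-13:00")
    if batch_result is not None:
        return batch_result                          # use accurate batch result
    return speed_view.get((ad_id, window))           # batch job has not covered this window yet
```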
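The per-window fallback above extends to range queries (the API takes from/to) if the serving layer knows how far the batch job has progressed. A minimal sketch, assuming a hypothetical batch_horizon (the first window the batch job has not yet recomputed), 1-minute windows, and dict-based views; the numbers are made up.

```python
from typing import Dict, Tuple

View = Dict[Tuple[str, int], int]  # (ad_id, window_start) -> click_count


def range_click_count(ad_id: str, start: int, end: int, batch_view: View, speed_view: View,
                      batch_horizon: int, window: int = 60) -> int:
    """Sum per-window counts over [start, end).

    Windows before batch_horizon come from the batch view (exact); newer
    windows come from the speed view (recent, possibly missing late clicks).
    """
    total = 0
    for window_start in range(start, end, window):
        view = batch_view if window_start < batch_horizon else speed_view
        total += view.get((ad_id, window_start), 0)
    return total


speed_counts = {("ad_1", 0): 10, ("ad_1", 60): 12, ("ad_1", 120): 9, ("ad_1", 180): 11, ("ad_1", 240): 8}
batch_counts = {("ad_1", 0): 11, ("ad_1", 60): 12, ("ad_1", 120): 10}  # includes 2 late clicks
print(range_click_count("ad_1", 0, 300, batch_counts, speed_counts, batch_horizon=180))  # 52
```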
Fault Tolerance: Checkpointing and Replay
Flink checkpointing:
Flink takes a consistent snapshot of all operator state every 1 minute:
├── Kafka consumer offsets (which messages were processed)
├── Window state (click counts accumulated in current window)
└── Output sink state (pre-committed transactions)
Checkpoint stored in distributed storage (S3 / HDFS)
Failure recovery:
Flink restores from last checkpoint (say, t-1 minute)
Kafka replays messages from checkpoint offset
Reprocesses last 1 minute of clicks
Exactly-once: idempotent UPSERT to TSDB prevents double counting
Trade-off: checkpoint frequency vs recovery time
Every 1 min checkpoint → at most ~1 min of replay on failure (fast recovery, more frequent checkpoint I/O)
Every 10 min checkpoint → at most ~10 min of replay (less checkpoint overhead, more re-work)
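A toy, single-process model of checkpoint-and-replay (the function name is hypothetical; this is not Flink's implementation). A list stands in for one Kafka partition. Because the source offset is snapshotted together with the counts, restoring a checkpoint and replaying from its offset gives exact results, and the checkpoint interval bounds how much work is replayed.

```python
import copy
from collections import Counter
from typing import List, Optional, Tuple


def run_with_checkpoints(log: List[str], checkpoint_every: int,
                         crash_at: Optional[int] = None) -> Tuple[Counter, int]:
    """Count clicks per ad; optionally crash once at offset crash_at.

    Returns the final counts and how many events had to be replayed.
    """
    state = {"offset": 0, "counts": Counter()}
    checkpoint = copy.deepcopy(state)                       # last completed checkpoint
    replayed, crashed = 0, False
    while state["offset"] < len(log):
        if not crashed and state["offset"] == crash_at:
            crashed = True
            replayed = state["offset"] - checkpoint["offset"]  # work lost since the checkpoint
            state = copy.deepcopy(checkpoint)                  # restore offset + window state together
            continue
        state["counts"][log[state["offset"]]] += 1
        state["offset"] += 1
        if state["offset"] % checkpoint_every == 0:
            checkpoint = copy.deepcopy(state)
    return state["counts"], replayed


log = [f"ad_{i % 3}" for i in range(1_000)]
counts, replayed = run_with_checkpoints(log, checkpoint_every=100, crash_at=750)
print(counts == Counter(log), replayed)  # True 50
```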
Kafka retention for replay:
Kafka retention: 7 days (matches raw click log retention)
If Flink is down for 2 hours:
Kafka has buffered all 2 hours of clicks
When Flink restarts, replays from last committed offset
No data lost as long as failure < 7 days
This is why Kafka is the source of truth for raw events, not the TSDB
Database Choice for Aggregated Results
Why Cassandra?
| Requirement | Cassandra | MySQL |
|---|---|---|
| Write throughput | Very high; scales with node count (millions/sec across a cluster) | Bounded by a single primary (~10K/sec) |
| Time-range queries | Yes (clustering key on timestamp) | Yes (slower at this volume) |
| Horizontal scale | Built-in (token ring) | Hard (manual sharding) |
| Availability | Writes succeed while replicas are down (AP, tunable consistency) | Needs primary failover |
| Aggregations | No (done in app layer) | Yes (SQL) |
| Use case fit | Excellent for pre-aggregated writes | Relational features unused; write path becomes the bottleneck |
Cassandra schema:
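```sql
-- Click count per ad per 1-minute window
CREATE TABLE ad_click_1min (
    ad_id        TEXT,
    window_start BIGINT,  -- Unix timestamp of window start
    region       TEXT,
    click_count  BIGINT,  -- full window total, written with an idempotent upsert
    PRIMARY KEY ((ad_id, region), window_start)
) WITH CLUSTERING ORDER BY (window_start DESC)
  AND default_time_to_live = 7776000;  -- 90 days TTL
```
A COUNTER column would give atomic increments, but increments are not idempotent (a replay after failure double-counts), and Cassandra does not allow a TTL on counter tables. Storing the full window total in a plain BIGINT keeps replays safe and lets the 90-day TTL apply.
Design Summary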
Full Lambda Architecture
```text
Ad Click Aggregation: Lambda Architecture

 Click Sources
 ┌────────────┐   ┌────────────┐   ┌─────────────┐
 │ Ad Server  │   │ Mobile App │   │ Web Browser │
 └─────┬──────┘   └─────┬──────┘   └──────┬──────┘
       └────────────────┼─────────────────┘
                        ▼
               ┌─────────────────┐
               │  Kafka Cluster  │
               │  "ad-clicks"    │
               │  retention: 7d  │
               └────────┬────────┘
              ┌─────────┴───────────┐
              ▼                     ▼
     ┌─────────────────┐   ┌─────────────────┐
     │ Raw click log   │   │ Flink Stream    │
     │ writer (S3/HDFS)│   │ Processing      │
     └────────┬────────┘   │ (speed layer)   │
              ▼            └────────┬────────┘
     ┌─────────────────┐            │
     │ Spark Batch Job │            │
     │ (every 3h,      │            │
     │  batch layer)   │            │
     └────────┬────────┘            │
              ▼                     ▼
     ┌─────────────────┐   ┌─────────────────┐
     │ Batch View      │   │ Speed View      │
     │ Cassandra       │   │ Cassandra       │
     │ (accurate,      │   │ (recent, fast)  │
     │  corrected)     │   │                 │
     └────────┬────────┘   └────────┬────────┘
              └──────────┬──────────┘
                         ▼
                ┌─────────────────┐
                │  Serving Layer  │
                │  (merge views,  │
                │   REST API)     │
                └─────────────────┘
```
Key Decisions Summary
| Decision | Choice | Reasoning |
|---|---|---|
| Architecture | Lambda (speed + batch layers) | Near-real-time results + accurate corrections |
| Stream processor | Flink | Native event-time windows, exactly-once, checkpointing |
| Message broker | Kafka (7-day retention) | Durable buffer, replay for batch and recovery |
| Top-K algorithm | Count-Min Sketch + 2-level aggregation | Fixed memory and O(d) work per click regardless of ad count; approximation is acceptable for analytics |
| Exactly-once | Flink checkpoints + idempotent UPSERT (Kafka transactions when the sink is Kafka) | Prevent double-counting in billing |
| Database | Cassandra | High write throughput, time-range queries, built-in TTL |
| Reconciliation | Batch overwrites streaming for historical windows | Batch is source of truth for billing |
Interview Questions & Answers
Q: What is Lambda architecture and when would you use it for this problem?
A: Lambda architecture runs two parallel data pipelines: a speed layer (streaming, low latency, approximate) and a batch layer (batch processing, high latency, accurate). The serving layer merges both views. Use it when you need both near-real-time results (to show advertisers current click counts) and accurate historical results (for billing correction). The batch layer corrects late-arriving events and bugs in the streaming layer.
Q: How does Count-Min Sketch work and what are its limitations?
A: Count-Min Sketch maintains a 2D array of counters (d rows × w columns). On update for item X: increment one counter in each row at position hash_i(X) % w. On query: return the minimum counter across all rows. The min is taken because hash collisions only inflate counts (never deflate), so the smallest counter is the tightest estimate. Limitation: it never under-estimates but can over-estimate; due to collisions, a rare ad might appear popular. Error is bounded: estimated_count ≤ true_count + ε × total_events with probability at least 1 - δ. Not suitable for billing; use it for approximate analytics (top-K ranking).
Q: What is the difference between event time and processing time, and why does it matter?
A: Event time is when the click physically occurred (e.g., 12:00:00 on the user's phone). Processing time is when the stream processor handles the event after reading it from Kafka (may be minutes later due to network delay or consumer lag). For billing, you must use event time: a click that happened at 12:00 should be billed in the 12:00 window regardless of when it arrived. Watermarks tell Flink when it's safe to close a window: a watermark at time T asserts "no more events with timestamp < T are expected", so once T passes the end of the 12:00 window, Flink closes it and emits the result.
Q: How do you achieve exactly-once semantics in a Kafka → Flink → Cassandra pipeline?
A: Three-part answer: (1) Flink checkpoints: Flink snapshots Kafka offsets + window state every minute to durable storage. On crash, it restores from the last checkpoint and replays from Kafka. (2) Two-phase commit: sinks only make output visible once a checkpoint completes; a Kafka sink does this with Kafka transactions. (3) Idempotent sink: Cassandra has no multi-row transactions, so writes use UPSERT with (ad_id, window_start) as the primary key. If the same batch is replayed after a failure, the UPSERT overwrites with identical data, so there are no duplicate counts.
Q: How does the batch layer correct errors from the streaming layer?
A: The batch layer reads the immutable raw click log from S3/HDFS every 3-6 hours and recomputes exact aggregates using Spark. Late-arriving events that the streaming layer missed (because they arrived after the watermark closed the window) are included in the batch results. The serving layer checks the batch view first; if a batch result exists for a requested time range, it serves the batch result (more accurate) rather than the streaming result. Batch results "overwrite" streaming results for past windows.
Key Takeaways
- Lambda architecture is the right pattern for systems that need both low-latency results and high accuracy: the speed layer serves recent queries, the batch layer corrects historical data.
- Event time vs processing time is critical for billing accuracy; always use event time with watermarks to handle late-arriving events.
- Count-Min Sketch (paired with a small top-K heap) tracks heavy hitters in fixed memory regardless of the number of distinct ads, at the cost of slight over-estimation.
- Exactly-once delivery requires three layers: Kafka offset checkpointing + Flink 2PC + idempotent UPSERT keys in the sink DB.
- Kafka's 7-day retention serves as the replay buffer for both batch reprocessing and failure recovery; it is the single source of truth for raw events.
- Two-level aggregation for top-K (local top-K per node → global merge, with clicks partitioned by ad_id) avoids a full shuffle of all ad counts across the network.
- Cassandra is the right choice for storing pre-aggregated results: idempotent upserts keyed by window, built-in TTL, and horizontal scale match the write-heavy, time-range query workload.
Related Resources
- distributed-system-components - Kafka, Flink, Cassandra, Spark
- key-patterns - Stream processing patterns, top-K, aggregation
- ch05-metrics-monitoring - Related: Kafka buffer, TSDB, streaming ingestion
- ch01-scale-from-zero-to-millions - Scaling stateful stream processors
Last Updated: 2026-04-13
Status: Interview ready (Hard question, common at Meta/Google/Amazon data infra rounds)