Chapter 11: Batch Processing
ddia-2e batch-processing spark etl ml-pipelines dataframes
Status: Notes complete
Overview
Batch processing operates on bounded (finite) datasets: a job starts, processes all the input data, produces output, and terminates. The 2nd edition significantly restructures the 1st edition’s Chapter 10, moving away from Hadoop MapReduce as the primary lens and centering instead on modern frameworks: Apache Spark, Apache Flink (batch mode), dbt, and Apache Beam. The chapter covers the mechanics of distributed batch execution—shuffling data, joining datasets, expressing transformations via DataFrame APIs or SQL—and then examines four dominant use cases: ETL, analytics, machine learning, and serving derived data.
Core principle: Input data is immutable; batch jobs produce derived outputs that can be regenerated by re-running the job. This immutability enables fault tolerance (re-run the job), historical reprocessing (apply new logic to old data), and debugging (inspect intermediate results at any step).
Why this chapter matters: Almost every production data system depends on batch jobs somewhere in its pipeline—loading a data warehouse, training an ML model, building a search index, or generating recommendation scores. Understanding the mechanics of batch execution determines how fast, fault-tolerant, and cost-effective those jobs are.
Key Concepts
What is Batch Processing?
Batch vs. request/response vs. stream:
| Paradigm | Input | Latency | Triggered by |
|---|---|---|---|
| Request/response (OLTP) | One request (small) | Milliseconds | User action |
| Batch processing | Bounded dataset (large) | Minutes to hours | Scheduler / trigger |
| Stream processing | Unbounded events | Milliseconds to seconds | Events as they arrive |
When batch is the right choice:
- Processing yesterday’s data (daily refresh)
- Training ML models on historical data
- Loading data warehouse from operational databases
- Generating derived views (search indexes, recommendation scores)
- Reprocessing historical data with new business logic
The philosophy remains constant across MapReduce, Spark, Flink, and dbt: immutable inputs → deterministic transformations → derived outputs. The frameworks differ in efficiency and API ergonomics, not in this fundamental design.
Shuffling Data
The shuffle is the most important—and most expensive—operation in distributed batch processing. Understanding it is essential for writing efficient jobs.
What the shuffle does:
- Each worker holds a portion of the data (its partitions)
- A grouping/join operation requires all records with the same key to be co-located on the same worker
- The shuffle redistributes records across the network so that records with the same key end up on the same worker
- After shuffling, each worker can process its key group independently
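The redistribution described above can be sketched in plain Python. This is a single-process simulation, not how any particular framework implements its shuffle; `partition_for` and `shuffle` are hypothetical helper names, and CRC32 stands in for whatever hash partitioner a real engine uses.

```python
import zlib
from collections import defaultdict


def partition_for(key: str, num_workers: int) -> int:
    """Stable hash partitioner: the same key always maps to the same worker."""
    return zlib.crc32(key.encode("utf-8")) % num_workers


def shuffle(worker_inputs, num_workers):
    """Redistribute (key, value) records so that equal keys share a worker."""
    worker_outputs = [defaultdict(list) for _ in range(num_workers)]
    for records in worker_inputs:              # one source worker's local partition
        for key, value in records:
            target = partition_for(key, num_workers)
            worker_outputs[target][key].append(value)   # stands in for a network transfer
    return worker_outputs


# Three workers, each starting with an arbitrary slice of the data.
inputs = [
    [("click", 1), ("view", 1)],
    [("view", 1), ("click", 1), ("purchase", 1)],
    [("click", 1)],
]
shuffled = shuffle(inputs, num_workers=3)

# After the shuffle, each worker aggregates its own key groups independently.
counts = {}
for groups in shuffled:
    for key, values in groups.items():
        counts[key] = sum(values)
```

Because every key lands on exactly one worker, the final loop never has to combine results across workers.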
Why shuffle is expensive:
- Network I/O: All data may cross the network (N workers × M workers connections)
- Disk spill: Intermediate data buffered to disk when memory exhausted
- Sort: Data typically sorted by key as part of the shuffle
- Blocking: The next stage cannot begin until all upstream stages complete (global barrier)
- Skew: If one key is much larger than others, one worker gets overwhelmed (“data skew”)
Minimizing shuffle cost:
| Strategy | Mechanism | When to apply |
|---|---|---|
| Broadcast join | Replicate small table to every worker | One side < few GB |
| Co-partitioning | Pre-partition both inputs by the same key | Persistent datasets joined repeatedly |
| Partial aggregation (combiner) | Aggregate locally before shuffle | Commutative/associative ops (sum, count) |
| Partition pruning | Only shuffle relevant partitions | Filtered joins |
| Salting | Add random prefix to skewed keys; aggregate in two phases | Highly skewed keys |
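As an illustration of the partial aggregation row, the sketch below counts events per worker before anything crosses the network and then merges the partial counts. The helper names (`local_combine`, `merge_partials`) and the sample data are assumptions made for this example.

```python
from collections import Counter


def local_combine(records):
    """Pre-aggregate one worker's (key, value) records before the shuffle."""
    partial = Counter()
    for key, value in records:
        partial[key] += value
    return partial


def merge_partials(partials):
    """Post-shuffle step: sum the per-worker partial results."""
    total = Counter()
    for partial in partials:
        total.update(partial)      # Counter.update adds counts together
    return total


worker_records = [
    [("click", 1)] * 1000 + [("view", 1)] * 500,
    [("click", 1)] * 2000,
]
partials = [local_combine(records) for records in worker_records]

records_without_combiner = sum(len(r) for r in worker_records)  # rows shuffled without a combiner
records_with_combiner = sum(len(p) for p in partials)           # rows shuffled with a combiner
totals = merge_partials(partials)
```

This only works because addition is commutative and associative; an operation such as a median cannot be combined this way.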
Data skew handling (salting example):
# Problem: user_id=12345 has 1 billion records; all go to one reducer
# Solution: prepend a random salt so the hot key spreads across several reducers
import random
NUM_SALTS = 10
user_id = 12345
skewed_key = f"{random.randrange(NUM_SALTS)}_{user_id}"  # salts 0..9 → up to 10x parallelism
# Then aggregate the per-salt partial results in a second pass
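The two passes can be simulated end to end as follows. This is a minimal sketch under stated assumptions: `salted_count` is a hypothetical helper, the salt count of 10 is illustrative, and a seeded random generator is used only to make the run repeatable.

```python
import random
from collections import Counter

NUM_SALTS = 10  # illustrative; in practice tuned to the degree of skew


def salted_count(records, num_salts=NUM_SALTS, seed=0):
    rng = random.Random(seed)
    # Phase 1: count per (salt, key). Each salted key can go to a different reducer.
    phase1 = Counter()
    for key in records:
        phase1[(rng.randrange(num_salts), key)] += 1
    # Phase 2: strip the salt and sum the partial counts per original key.
    phase2 = Counter()
    for (_salt, key), partial in phase1.items():
        phase2[key] += partial
    return phase1, phase2


records = ["user_12345"] * 100_000 + ["user_7"] * 10
phase1, phase2 = salted_count(records)
largest_group = max(phase1.values())   # biggest group any single reducer would see
```

The hot key's 100,000 records are split into roughly equal groups in phase 1, at the price of a second (much smaller) aggregation.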
Shuffle in different frameworks:
- MapReduce: Shuffle always happens; output written to disk first
- Spark: Shuffle triggered by wide transformations (groupBy, join, repartition); spills to disk only when needed
- Flink (batch): Supports both blocking and pipelined exchanges; a pipelined shuffle streams data between operators without waiting for the upstream stage to finish
- dbt: No shuffle (runs inside a SQL database that handles its own execution)
Joins and Grouping
Three fundamental join strategies in batch processing:
1. Reduce-Side Join (Sort-Merge Join)
Both datasets shuffled by the join key → records with same key arrive at the same reducer → merge.
Dataset A: (user_id=1, name="Alice")
Dataset B: (user_id=1, purchase_id=99, amount=50)
Mapper A: emit (user_id=1, {source: A, name: "Alice"})
Mapper B: emit (user_id=1, {source: B, purchase_id: 99, amount: 50})
Shuffle → all user_id=1 records → same reducer
Reducer: merge A record with all B records for user_id=1
Characteristics:
- Works for any size join (large × large)
- Both inputs shuffled → expensive
- Natural secondary sort: source tag in key ensures A records arrive before B records
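A single-process sketch of the reduce-side join, including the secondary sort on the source tag, might look like the following. The datasets and tag values (0 for A, 1 for B) are invented for illustration; a real framework performs the sort as part of its shuffle.

```python
from itertools import groupby

users = [(1, "Alice"), (2, "Bob")]                       # dataset A: (user_id, name)
purchases = [(1, 99, 50), (2, 100, 20), (1, 101, 75)]    # dataset B: (user_id, purchase_id, amount)

# "Map" phase: tag each record with its source. Tag 0 (A) sorts before tag 1 (B).
tagged = [((uid, 0), name) for uid, name in users]
tagged += [((uid, 1), (pid, amount)) for uid, pid, amount in purchases]

# "Shuffle" phase: sorting by (join key, source tag) is the secondary sort.
tagged.sort(key=lambda kv: kv[0])

# "Reduce" phase: for each key the user record arrives first, then its purchases.
joined = []
for uid, group in groupby(tagged, key=lambda kv: kv[0][0]):
    name = None
    for (_uid, tag), payload in group:
        if tag == 0:
            name = payload
        elif name is not None:          # inner join: skip purchases without a user
            pid, amount = payload
            joined.append((uid, name, pid, amount))
```

Because the A record always arrives first, the reducer holds only one user record in memory while it streams through that user's purchases.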
2. Map-Side Join (Broadcast Hash Join)
One dataset is small enough to fit in memory. Load it as a hash table into every worker; the large dataset is processed locally without any shuffle.
Small table (10MB): hash_table = {user_id → {country, plan, signup_date}}
Large table (1TB):
for each record in large_table:
user_info = hash_table.get(record.user_id)
emit(merged record)
No network transfer for large table!
Characteristics:
- No shuffle for the large side
- Small side broadcast = tiny network cost
- Requires small side to fit in RAM per worker (typically < 1–10 GB)
- Most common optimization in practice (dimension table joins)
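The same idea in runnable form: each worker builds a hash table from its copy of the small table and probes it while scanning its local partition of the large table. `broadcast_hash_join` and the sample rows are assumptions for this sketch, which uses inner-join semantics.

```python
def broadcast_hash_join(large_partition, small_table):
    """Join one partition of the large table against an in-memory copy of the small table."""
    lookup = {row["user_id"]: row for row in small_table}   # built once per worker
    for record in large_partition:
        match = lookup.get(record["user_id"])
        if match is not None:                               # inner join: drop unmatched rows
            yield {**record, **match}


small = [{"user_id": 1, "country": "DE"}, {"user_id": 2, "country": "US"}]
large_partitions = [
    [{"user_id": 1, "event": "click"}, {"user_id": 3, "event": "view"}],
    [{"user_id": 2, "event": "purchase"}],
]

# Each partition is joined locally; no large-table record moves between workers.
result = [row for part in large_partitions for row in broadcast_hash_join(part, small)]
```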
Spark broadcast join:
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), on="user_id")
3. Partitioned Hash Join (Bucketed Join)
Both datasets pre-partitioned (bucketed) by the join key. Each worker joins only its local partition with the corresponding partition of the other dataset. No full shuffle needed.
Dataset A: already bucketed by user_id into 100 buckets
Dataset B: already bucketed by user_id into 100 buckets
Worker 1: joins A_bucket_1 with B_bucket_1 (local, no shuffle)
Worker 2: joins A_bucket_2 with B_bucket_2 (local, no shuffle)
...
Worker 100: joins A_bucket_100 with B_bucket_100
Characteristics:
- Both datasets must be bucketed the same way (same key, same number of buckets)
- Setup cost: one-time bucketing; then all subsequent joins are shuffle-free
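- Used in Hive bucketed joins and Spark bucketed tables. Delta Lake Z-ordering and Spark dynamic partition pruning are related techniques, but they reduce the data that is read rather than remove the join shuffle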
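A small simulation shows why matching buckets make the join local. It assumes both datasets use the same bucketing function and bucket count; `bucket_of`, `bucketize`, and the choice of four buckets are hypothetical.

```python
import zlib

NUM_BUCKETS = 4  # both datasets must agree on this number


def bucket_of(user_id: int) -> int:
    """Same key and same bucket count give the same bucket in both datasets."""
    return zlib.crc32(str(user_id).encode("utf-8")) % NUM_BUCKETS


def bucketize(rows):
    """One-time setup cost: write each row into the bucket for its join key."""
    buckets = [[] for _ in range(NUM_BUCKETS)]
    for row in rows:
        buckets[bucket_of(row[0])].append(row)
    return buckets


users = bucketize([(1, "Alice"), (2, "Bob"), (3, "Carol")])
orders = bucketize([(1, 50), (3, 20), (1, 75), (2, 10)])

# Bucket i of users is joined only with bucket i of orders.
joined = []
for user_bucket, order_bucket in zip(users, orders):
    names = dict(user_bucket)
    joined += [(uid, names[uid], amount) for uid, amount in order_bucket if uid in names]
```

If the two sides used different bucket counts, matching keys could sit in different bucket indexes and the local join would silently miss them.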
Grouping follows the same mechanics as reduce-side join: the shuffle redistributes records by group key, then each worker aggregates its own key group.
Query Languages
Three ways to express batch transformations:
1. Low-level API (MapReduce / RDD API)
# Spark RDD API: explicit, verbose
# Assumes CSV text input with event_type in the third column
rdd = sc.textFile("s3://bucket/events/*.csv")
counts = (rdd
    .map(lambda line: (line.split(',')[2], 1))  # extract event_type
    .reduceByKey(lambda a, b: a + b)            # count per type
    .sortBy(lambda x: -x[1])                    # sort descending
)
When to use: Rarely. Only when the exact execution plan must be controlled or when applying custom operators not available in higher-level APIs.
Drawbacks: Verbose; no query optimizer; hard to read; no schema enforcement.
2. DataFrame / Dataset API
# Spark DataFrame API: schema-aware, optimized
from pyspark.sql.functions import col, count
df = spark.read.parquet("s3://bucket/events/")
result = (df
    .groupBy("event_type")
    .agg(count("*").alias("n"))
    .orderBy(col("n").desc())
)
DataFrames provide a schema-aware, optimizable abstraction. The Catalyst query optimizer rewrites the logical plan for efficiency (predicate pushdown, column pruning, join reordering).
Benefits over RDD API:
- Catalyst optimizer applies 50+ rules to improve performance
- Automatic code generation (Tungsten, Whole-Stage Code Generation)
- Type safety (Dataset API in Scala/Java)
- Interoperable with SQL
3. SQL (Hive SQL / Spark SQL / Flink SQL / dbt)
-- dbt model: user_event_counts.sql
SELECT
    event_type,
    DATE_TRUNC('day', event_time) AS event_date,
    COUNT(*) AS n
FROM {{ ref('events') }}
WHERE event_time >= '2026-01-01'
GROUP BY event_type, DATE_TRUNC('day', event_time)
ORDER BY n DESC
SQL is the highest-level abstraction and produces the most optimizer-friendly representation. Cost-based optimizers (CBOs) use table statistics to choose join order and join strategy and to prune partitions.
Comparison:
| API Level | Optimizer Access | Flexibility | Verbosity | Who uses it |
|---|---|---|---|---|
| MapReduce/RDD | None | Maximum | Very high | Legacy; advanced custom ops |
| DataFrame API | Full (Catalyst) | High | Medium | Data engineers, ML engineers |
| SQL | Full (CBO + Catalyst) | Medium | Low | Analysts, dbt users, BI |
DataFrames
Why DataFrames became the standard for batch processing:
- Columnar execution: Engines process a column (or a batch of column values) at a time, which improves CPU cache efficiency and is often an order of magnitude or more faster than row-by-row processing
- Vectorized operations: Batch operations on arrays using SIMD CPU instructions
- Lazy evaluation: Build logical plan → optimize → execute only on action
- Interop: The same DataFrame concepts carry over between distributed engines (Spark, Flink) and single-machine tools (Pandas, DuckDB, Polars), although each library has its own API
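Lazy evaluation in particular is easy to misread, so here is a toy model of it. `LazyFrame` below is a hypothetical class written for this note (it is not the Polars or Spark class): transformations only record steps in a plan, and nothing runs until `collect()` is called.

```python
def _project(rows, columns):
    """Generator that keeps only the requested columns of each row."""
    for row in rows:
        yield {c: row[c] for c in columns}


class LazyFrame:
    """Toy lazy table: transformations record steps; only collect() runs them."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self.plan = plan

    def filter(self, predicate):
        return LazyFrame(self._rows, self.plan + (("filter", predicate),))

    def select(self, *columns):
        return LazyFrame(self._rows, self.plan + (("select", columns),))

    def collect(self):
        """The action: walk the recorded plan and materialize the result."""
        rows = iter(self._rows)
        for op, arg in self.plan:
            rows = filter(arg, rows) if op == "filter" else _project(rows, arg)
        return list(rows)


events = [
    {"type": "click", "user": 1, "ms": 12},
    {"type": "view", "user": 2, "ms": 40},
]
query = LazyFrame(events).filter(lambda r: r["type"] == "click").select("user")
# Nothing has executed yet; query.plan only holds the two recorded steps.
result = query.collect()
```

A real engine would insert an optimization pass between building the plan and executing it, for example pushing the filter into the file scan. This sketch skips that step.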
Key DataFrame operations:
| Category | Operations | Shuffle? |
|---|---|---|
| Narrow (partition-local) | select, filter, withColumn, drop | No |
| Wide (require shuffle) | groupBy, join, distinct, repartition, sort | Yes |
| Action (trigger execution) | count, show, write, collect, toPandas | Triggers execution |
DataFrame libraries comparison (2026):
| Library | Scale | Where runs | Lazy? | Best for |
|---|---|---|---|---|
| Pandas | Single machine | Python process | No (eager) | <10GB, data exploration |
| Polars | Single machine | Rust-backed Python | Yes | 10–100GB, fast local processing |
| DuckDB | Single machine | In-process SQL engine | Via SQL | 1MB–500GB, analytical SQL |
| Spark DataFrame | Cluster | JVM + Python | Yes | 100GB–PB, distributed batch |
| Flink Table API | Cluster | JVM | Yes | Streaming + batch unified |
| dbt | Inside DB/warehouse | SQL on warehouse | Via SQL | ELT transformations in warehouse |
Pandas vs Spark anti-pattern: Calling .toPandas() on a large Spark DataFrame collects every row onto the driver and typically ends in an out-of-memory failure. Only call .toPandas() on small aggregated results.
Batch Use Cases
Extract–Transform–Load (ETL)
Traditional ETL (extract from source → transform in pipeline → load to warehouse):
Source DBs ETL Tool Data Warehouse
(PostgreSQL, ──► Informatica ──► Snowflake / BigQuery
MySQL, / Spark (star schema)
APIs) / Airflow
Modern ELT (extract and load first → transform inside the warehouse):
Source DBs Fivetran/Airbyte Data Warehouse dbt
(PostgreSQL, ──► (connectors) ──► Snowflake ──► (SQL transforms
MySQL, BigQuery in-warehouse)
APIs) Databricks
ETL vs ELT:
| Aspect | ETL | ELT |
|---|---|---|
| Transform location | External compute (Spark, Informatica) | Inside the warehouse (SQL) |
| When to transform | Before loading | After loading |
| Schema requirement | Schema-on-write (must know schema upfront) | Schema-on-read (load raw, transform later) |
| Compute | Separate cluster | Warehouse compute |
| Modern tooling | Apache Spark, Airflow | Fivetran/Airbyte + dbt |
| Best for | Complex transformations needing custom code | SQL-expressible transformations at warehouse scale |
dbt as the ELT transformation layer:
-- dbt model: orders_enriched.sql
-- Each .sql file = one table/view in the warehouse
-- dbt manages the DAG (dependency order)
{{ config(materialized='incremental', unique_key='order_id') }}
SELECT
o.order_id,
o.created_at,
c.customer_name,
c.tier,
p.product_name,
o.amount
FROM {{ ref('raw_orders') }} o
JOIN {{ ref('raw_customers') }} c ON o.customer_id = c.customer_id
JOIN {{ ref('raw_products') }} p ON o.product_id = p.product_id
{% if is_incremental() %}
WHERE o.created_at > (SELECT MAX(created_at) FROM {{ this }})
{% endif %}
Key dbt concepts:
- Models: SQL SELECT statements, materialized as tables/views
- Refs: {{ ref('model_name') }} creates a dependency → dbt builds the DAG automatically
- Incremental models: Only process new data (append or upsert) rather than full recompute
- Tests: Built-in assertions (not_null, unique, accepted_values, relationships)
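The incremental pattern (filter on a high-water mark, then upsert by `unique_key`) can be mimicked in Python to show what each run actually processes. This is a sketch of the logic only, not of dbt's implementation; `incremental_upsert` and the in-memory `target` dict are hypothetical stand-ins for the warehouse table.

```python
from datetime import datetime


def incremental_upsert(target, source, unique_key="order_id", ts="created_at"):
    """Merge rows newer than the target's high-water mark, keyed by unique_key."""
    high_water = max((row[ts] for row in target.values()), default=datetime.min)
    new_rows = [row for row in source if row[ts] > high_water]
    for row in new_rows:
        target[row[unique_key]] = row      # insert, or overwrite an existing key
    return len(new_rows)


target = {}   # stands in for the already-built model table
source = [
    {"order_id": 1, "created_at": datetime(2026, 1, 1), "amount": 10},
    {"order_id": 2, "created_at": datetime(2026, 1, 2), "amount": 20},
]
first_run = incremental_upsert(target, source)      # first build: processes both rows

source.append({"order_id": 3, "created_at": datetime(2026, 1, 3), "amount": 30})
second_run = incremental_upsert(target, source)     # later run: processes only order 3
third_run = incremental_upsert(target, source)      # nothing new: processes no rows
```

One limitation is visible in the filter: a late-arriving row whose timestamp is older than the high-water mark is never picked up, which is why such models are often paired with a lookback window or a periodic full refresh.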
Analytics
Batch analytics pipeline:
Raw events Transformation Aggregates BI Layer
(Parquet on ──► (Spark / dbt) ──► (Delta/Iceberg ──► Tableau
S3, date- tables) Looker
partitioned) Redash
Key analytics patterns:
- Fact tables (events, transactions) + dimension tables (users, products) → star schema
- Partition pruning: Events partitioned by date → query only the needed date range
- Columnar formats: Parquet/ORC → only read columns referenced in query (column pruning)
- Materialized aggregates: Pre-compute common aggregations (daily/weekly active users) as batch jobs
Aggregation granularity trade-off:
- Raw events: maximum flexibility, maximum storage cost
- Pre-aggregated (hourly/daily): fast queries, less flexible (can’t drill into individual events)
- Lakehouse approach: Store both raw (for reprocessing) and aggregated (for fast queries)
Machine Learning
Batch ML pipeline stages:
Feature Engineering → Model Training → Model Evaluation → Model Serving
Stage 1: Feature Engineering (Spark / dbt)
Transform raw data into numerical features suitable for ML models:
# Spark feature pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
# Encode categorical features
indexer = StringIndexer(inputCol="country", outputCol="country_idx")
# Assemble feature vector
assembler = VectorAssembler(
inputCols=["age", "tenure_days", "country_idx", "spend_30d"],
outputCol="features"
)
# Normalize
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
Temporal train/test split (critical for preventing data leakage):
Timeline: ──────────────────────────────────────────────►
[──────── TRAIN ────────────│──── TEST ────]
  Events before cutoff      │  Events after cutoff
                       cutoff date
For time-dependent data, avoid a random split: it allows future information to leak into training (e.g., a feature derived from events that happen after the prediction target).
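A minimal sketch of a cutoff-based split, assuming each row carries an `event_date` field (the field name, helper name, and sample rows are illustrative):

```python
from datetime import date


def temporal_split(rows, cutoff, ts_field="event_date"):
    """Rows strictly before the cutoff train the model; the rest evaluate it."""
    train = [r for r in rows if r[ts_field] < cutoff]
    test = [r for r in rows if r[ts_field] >= cutoff]
    return train, test


rows = [
    {"user": 1, "event_date": date(2026, 1, 10), "churned": 0},
    {"user": 2, "event_date": date(2026, 3, 5), "churned": 1},
    {"user": 3, "event_date": date(2026, 4, 20), "churned": 0},
]
train, test = temporal_split(rows, cutoff=date(2026, 4, 1))
```

The same cutoff should also bound feature computation: any feature for a training row must be derived only from events before that row's prediction time.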
Stage 2: Model Training
| Scale | Framework | Notes |
|---|---|---|
| Single machine | scikit-learn, XGBoost, LightGBM | <10GB feature matrix |
| Distributed tree | Spark MLlib (GBT), XGBoost-on-Spark | 10GB–1TB |
| Deep learning | PyTorch DDP, PyTorch Lightning | GPU cluster |
| Large language model fine-tuning | FSDP, DeepSpeed | Multi-GPU, 100GB+ |
Stage 3: Model Evaluation
- Offline metrics: AUC, precision/recall, RMSE
- Sliced evaluation: measure performance by user segment, geography, date range
- Temporal holdout: evaluate on most recent data (closest to production distribution)
Stage 4: Model Serving — Batch Scoring vs Online Inference
| | Batch Scoring | Online Inference |
|---|---|---|
| When | Nightly batch job | At request time (real-time) |
| Latency | Minutes–hours | <100ms |
| Staleness | Hours (stale until next batch) | Fresh (latest model + features) |
| Scale | Pre-compute all user/item pairs | Only compute for requesting user |
| Infrastructure | Spark + Redis/DynamoDB cache | Model server (TorchServe, BentoML) |
| Best for | Recommendations, content ranking | Fraud detection, real-time personalization |
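For the batch-scoring column, the key step is reducing a large stream of (user, item, score) triples to a short list per user before writing to the serving cache. The sketch below does this with a bounded min-heap per user; `top_n_per_user` and the sample scores are assumptions for illustration, and a distributed job would apply the same logic per partition.

```python
import heapq


def top_n_per_user(scored_pairs, n):
    """scored_pairs: iterable of (user_id, item_id, score). Keeps the n best per user."""
    heaps = {}
    for user_id, item_id, score in scored_pairs:
        heap = heaps.setdefault(user_id, [])
        if len(heap) < n:
            heapq.heappush(heap, (score, item_id))
        else:
            heapq.heappushpop(heap, (score, item_id))   # push, then drop the lowest score
    # Highest score first in each user's list.
    return {u: [item for _score, item in sorted(h, reverse=True)] for u, h in heaps.items()}


pairs = [
    ("u1", "a", 0.9), ("u1", "b", 0.1), ("u1", "c", 0.5),
    ("u2", "a", 0.2), ("u2", "b", 0.8),
]
recommendations = top_n_per_user(pairs, n=2)
```

Memory stays proportional to users × n rather than users × items, which matters because the full score matrix is usually far too large to keep.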
Batch scoring pipeline:
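# Score all users overnight; write to Redis for low-latency serving
from pyspark.sql import Window
from pyspark.sql.functions import col, collect_list, row_number, struct
model = load_model("s3://models/recommender/v42/")  # load_model: placeholder for your own model-loading helper
users = spark.read.parquet("s3://features/users/2026-05-29/")
items = spark.read.parquet("s3://features/items/latest/")
# Generate user×item score matrix (a full cross join; only feasible for small catalogs)
scores = model.transform(users.crossJoin(items))
# Keep only the N highest-scoring items per user before collecting them
N = 20  # illustrative
by_score = Window.partitionBy("user_id").orderBy(col("score").desc())
top_n = (scores
    .withColumn("rank", row_number().over(by_score))
    .filter(col("rank") <= N)
    .groupBy("user_id")
    .agg(collect_list(struct("item_id", "score")).alias("recommendations"))
)
# Write to Redis for online serving (format name and options depend on the Redis connector in use)
top_n.write.format("redis").option("host", "redis-cluster").save()
Serving Derived Data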
Batch processing produces derived data — outputs that can be regenerated from immutable sources. Key architectural patterns for serving this derived data:
Lambda Architecture:
Raw data ──┬──► Batch layer (Spark, daily jobs) ─┬──► Serving layer
│ (accurate, │ (merged view:
│ complete, │ queries hit
└──► Speed layer (Flink, real-time) ──┘ both layers)
(low-latency,
approximate,
recent only)
Lambda pros and cons:
- Pro: Batch layer provides accurate historical results; speed layer provides low-latency recent results
- Con: Two codebases to maintain (batch logic + stream logic); results may diverge; complex merge at query time
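The "complex merge at query time" can be made concrete with a toy serving layer for a page-view counter. The view contents and the `merged_view` helper are invented for this sketch; it assumes the speed view holds only events that arrived after the batch job's cutoff.

```python
def merged_view(batch_view, speed_view, key):
    """Serving-layer read: batch total up to the cutoff plus recent increments."""
    return batch_view.get(key, 0) + speed_view.get(key, 0)


batch_view = {"page_a": 10_000, "page_b": 250}   # computed by last night's batch job
speed_view = {"page_a": 42, "page_c": 7}          # events since the batch cutoff

page_a_total = merged_view(batch_view, speed_view, "page_a")
page_c_total = merged_view(batch_view, speed_view, "page_c")
```

The merge is only correct if the two layers agree exactly on the cutoff. An event counted by both layers, or by neither, produces the kind of divergence listed above.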
Kappa Architecture:
Raw events ──► Single stream processor ──► Derived views
(Flink, Kafka Streams) (Kafka topics, DB tables)
(handles both historical
reprocessing and real-time)
Reprocessing = restart stream job from beginning of Kafka log with new logic.
Kappa pros and cons:
- Pro: Single codebase; stream processing handles both historical and real-time
- Con: Reprocessing may be slow; stream processors less efficient than batch for historical data
- Con: Requires log retention long enough for full reprocessing (expensive storage)
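Reprocessing by replay can be sketched with a list standing in for the Kafka log. `run_job`, `count_v1`, and `count_v2` are hypothetical names; the point is that the new logic is applied by reading the same retained log again from offset 0.

```python
def run_job(log, logic, start_offset=0):
    """Fold every event from start_offset onward into a fresh derived view."""
    view = {}
    for event in log[start_offset:]:
        logic(view, event)
    return view


def count_v1(view, event):
    """Original logic: count every event per user."""
    view[event["user"]] = view.get(event["user"], 0) + 1


def count_v2(view, event):
    """New logic: same count, but ignore bot traffic."""
    if event["type"] != "bot":
        view[event["user"]] = view.get(event["user"], 0) + 1


log = [
    {"user": "u1", "type": "click"},
    {"user": "u1", "type": "bot"},
    {"user": "u2", "type": "click"},
]
view_v1 = run_job(log, count_v1)
view_v2 = run_job(log, count_v2)   # reprocess from offset 0 with the new logic
```

Once the replayed job catches up to the head of the log, readers can be switched from the old view to the new one.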
Lambda vs Kappa decision:
| Factor | Lambda | Kappa |
|---|---|---|
| Complexity | Higher (two systems) | Lower (one system) |
| Reprocessing cost | Low (efficient batch) | Can be high (stream replay) |
| Query latency | Low (batch + stream) | Depends on stream latency |
| Operational burden | Two pipelines to maintain | One pipeline |
| 2026 trend | Being replaced | Growing adoption |
Modern trend (2026): Streaming Lakehouse (Apache Iceberg + Flink) blurs the line — stream processing writes to Iceberg tables with ACID guarantees, which BI tools query directly. Lambda architecture is increasingly replaced by streaming lakehouse for new systems.
Comparison Tables
Batch Processing Frameworks (2026)
| Framework | Paradigm | API | Scale | State | Best for |
|---|---|---|---|---|---|
| Hadoop MapReduce | Batch only | Java API | Petabyte | Disk | Legacy; not recommended for new projects |
| Apache Spark | Batch + micro-batch streaming | Python, Scala, SQL | 100GB–PB | Memory + disk | General batch, ML pipelines, ETL |
| Apache Flink (batch) | Batch + native streaming | Java, Python, SQL | 100GB–PB | Memory + disk | Unified batch+stream, low-latency |
| dbt | SQL transforms | SQL | Warehouse-scale | N/A | Analytics ELT inside data warehouse |
| Apache Beam | Portable pipeline | Python, Java | Cloud-scale | Managed | Portability across runners |
| DuckDB | Analytical SQL | SQL | Single machine | In-process | <500GB; local/notebook analytics |
| Polars | DataFrame | Python | Single machine | In-process | <100GB; fast local processing |
ETL vs ELT Detailed Comparison
| Aspect | Traditional ETL | Modern ELT |
|---|---|---|
| Transform timing | Before loading | After loading |
| Transform location | External compute | Inside warehouse |
| Schema approach | Schema-on-write | Schema-on-read |
| Raw data retention | Discarded after transform | Retained in raw layer |
| Tooling | Informatica, SSIS, Talend, Spark | Fivetran/Airbyte + dbt |
| Debugging | Harder (transform happened outside) | Easier (SQL in warehouse, queryable) |
| Flexibility | Less flexible (fixed schema) | More flexible (re-transform from raw) |
| Cost model | Compute cluster cost | Warehouse compute cost |
Join Strategy Selection
| Join type | Input A size | Input B size | Strategy | Why |
|---|---|---|---|---|
| Large × Small | >10GB | <1GB | Broadcast hash join | No shuffle for large side |
| Large × Large (sorted) | Any | Same key sort | Sort-merge join | Merge in one pass |
| Large × Large (bucketed) | Pre-bucketed | Same buckets | Partitioned hash join | No cross-network shuffle |
| Large × Large (raw) | >10GB | >10GB | Sort-merge with shuffle | No alternative |
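The table can be read as a small decision function. The sketch below encodes it under stated assumptions: the 1 GB broadcast limit mirrors the table rather than any engine's default, and `choose_join_strategy` is a hypothetical helper, not a framework API.

```python
GB = 1024 ** 3


def choose_join_strategy(size_a, size_b, co_bucketed=False, broadcast_limit=1 * GB):
    """Pick a join strategy from input sizes (in bytes) and physical layout."""
    if min(size_a, size_b) < broadcast_limit:
        return "broadcast hash join"            # small side fits in each worker's memory
    if co_bucketed:
        return "partitioned hash join"          # same key, same bucket count on both sides
    return "sort-merge join with shuffle"       # general fallback for large × large


events_vs_profiles = choose_join_strategy(2048 * GB, 0.5 * GB)
bucketed_tables = choose_join_strategy(500 * GB, 300 * GB, co_bucketed=True)
raw_tables = choose_join_strategy(500 * GB, 300 * GB)
```

Query optimizers make a similar choice automatically when they have table statistics; an explicit hint is mainly needed when those statistics are missing or stale.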
Important Points Summary
- Shuffle is the bottleneck: Everything in distributed batch is designed to minimize shuffle. The broadcast join is the single most important optimization for common large-small joins.
- DataFrames replaced raw MapReduce: The DataFrame/SQL API gives the Catalyst optimizer visibility into the full computation, enabling automatic broadcast join detection, predicate pushdown, and column pruning.
- ETL is dying; ELT is the standard: Modern data stacks load raw data first (Fivetran/Airbyte), then transform with dbt inside a cloud warehouse. No separate Spark ETL cluster needed for SQL-expressible transforms.
- Temporal splits are mandatory for ML: Random splits leak future information; always split on time for time-series data.
- Batch scoring vs online inference: Batch scoring (pre-compute overnight, serve from cache) is appropriate when predictions are needed for all users; online inference is needed when predictions require request-time context.
- Lambda architecture is complexity debt: Maintaining parallel batch and stream pipelines adds operational burden. Kappa or streaming lakehouse is the 2026 preference.
- dbt models are versioned, tested batch jobs: SQL SELECT statements as first-class artifacts with lineage, tests, and documentation. This is the analytics engineering standard.
- Immutability enables reprocessing: The most powerful property of batch: change the business logic, re-run the job from the immutable source, produce new derived outputs. No data migration needed.
- Flink batch mode is competitive with Spark: Flink 1.18+ batch mode can deliver performance comparable to Spark for many workloads, with the advantage of a unified streaming+batch runtime.
- Skew is the silent killer: A join on a highly skewed key (e.g., a celebrity user with millions of followers) can leave one worker processing the vast majority of the data. Salting and two-phase aggregation are the solutions.
Modern Context (2026)
The Lakehouse has replaced the data warehouse + data lake duality:
- Delta Lake (Databricks), Apache Iceberg (Netflix, Apple), Apache Hudi (Uber)
- Parquet + transaction log on object storage (S3, GCS, Azure Blob)
- ACID writes, time travel, schema evolution — formerly warehouse-only features, now on cheap object storage
- Batch jobs write to Lakehouse tables; query engines (Trino, DuckDB) and BI tools (Tableau) query them directly
dbt is the analytics engineering standard:
- 50,000+ organizations using dbt in 2026
- dbt Core (open source) + dbt Cloud (managed) + dbt Semantic Layer
- Works with Snowflake, BigQuery, Databricks, DuckDB, Redshift, and 20+ adapters
- ELT pattern: Fivetran/Airbyte for ingestion → dbt for transformation → BI for presentation
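Spark 3.x/4.x improvements:
- Adaptive Query Execution (AQE): dynamically changes join strategy based on runtime statistics (introduced in Spark 3.0, enabled by default since 3.2)
- Photon engine (Databricks): C++-based vectorized execution available on Databricks rather than in open-source Spark; Databricks reports 2–12x speedups over JVM Spark
- Spark Connect: decoupled client/server architecture (introduced in Spark 3.4); clients such as notebooks talk to a remote Spark server instead of hosting the driver in their own process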
Polars is disrupting Pandas:
- Rust-based DataFrame library with lazy evaluation and query optimizer
- 5–20x faster than Pandas for most workloads
- Integrates with Arrow and Parquet for zero-copy data sharing with DuckDB and Spark
ML pipelines in 2026:
- Feature stores (Feast, Tecton, Hopsworks) separate feature engineering from model training
- Batch feature pipelines write features to feature store; models read from feature store
- MLflow / Weights & Biases for experiment tracking and model registry
- Batch scoring at PB scale: Spark + Ray (distributed Python) replacing custom MapReduce scoring
Apache Beam and cloud portability:
- Apache Beam provides a portable pipeline API that runs on Flink, Spark, Google Dataflow
- Enables write-once, run-anywhere batch (and stream) pipelines
- Useful when multi-cloud or vendor neutrality is required
Questions for Reflection
- A batch job joining a 2TB events table with a 500MB user profile table is running slowly. Walk through the optimization steps you would take, starting from the most impactful.
- Your company currently uses Lambda architecture (Spark batch + Flink streaming). What are the arguments for migrating to Kappa architecture, and what risks must you manage during the migration?
- You need to train a customer churn prediction model on 18 months of subscription events. How do you structure the feature engineering pipeline to prevent data leakage, and how do you set up train/test split?
- When would you choose dbt over Spark for a data transformation task? When would you choose Spark over dbt?
- Explain why the shuffle phase is a global barrier. What happens if one mapper is 10x slower than all others (“straggler”)? How do Spark and MapReduce handle stragglers differently?
- A data science team wants daily batch-scored recommendations for 50 million users, with each user getting top-20 items from a catalog of 2 million items. Describe the full batch scoring pipeline architecture.
Related Resources
- ch12-stream-processing — Stream processing: the unbounded counterpart to batch
- ch10-consistency-and-consensus — Fault tolerance foundations: how distributed systems agree on state
- ch10-batch-processing — 1st edition Ch10 (MapReduce-focused; good historical context)
- ch04-storage-and-retrieval — Columnar storage (Parquet/ORC) fundamentals
- ch06-replication — Replication feeds CDC pipelines that batch jobs consume
Last Updated: 2026-05-29