Module 06: Multi-Agent Systems — Exercises
These exercises progress from implementing core patterns to designing full production systems. Complete them in order. Each builds intuition that the next one requires.
Exercise 1: Implement DAG Task Execution
Objective: Build a DAG-based task executor that runs independent tasks in parallel and sequential tasks in dependency order.
Background: The `orchestrator_subagent.py` example runs subagents sequentially for clarity. In a real system, you want to detect which tasks are independent and run them in parallel, while correctly serializing tasks that have dependencies.
Tasks:
- Define a `TaskNode` dataclass:

      from dataclasses import dataclass
      from typing import Optional

      @dataclass
      class TaskNode:
          id: str
          instruction: str
          depends_on: list[str]  # task IDs this task depends on
          result: Optional[str] = None
          status: str = "pending"  # pending | running | done | failed | skipped

- Implement `topological_generations(nodes: list[TaskNode]) -> list[list[TaskNode]]`:
  - Returns a list of “generations”, where each generation is a list of tasks that can run in parallel
  - All tasks in generation N depend only on tasks in generations 0..N-1
  - Example: [FetchA, FetchB, FetchC] is generation 0; [SumA, SumB, SumC] is generation 1; [Synthesize] is generation 2

- Implement `execute_dag(nodes: list[TaskNode], client) -> dict[str, TaskNode]`:
  - For each generation, run all tasks in parallel using `asyncio.gather()`
  - After each generation completes, inject results into the dependent tasks’ context
  - If a task fails, mark all downstream tasks as “skipped”
  - Return a dict of task_id → TaskNode with final statuses

- Test with a 7-node research DAG:

      fetch_paper_1 ──► summarize_1 ──┐
      fetch_paper_2 ──► summarize_2 ──┤──► synthesize
      fetch_paper_3 ──► summarize_3 ──┘

  Verify that all fetch tasks run in parallel, all summarize tasks run in parallel (after the fetches), and synthesize runs last.
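As a starting point for the generation-grouping task, here is a minimal sketch that layers the graph with Kahn's algorithm. It is one possible approach, not a reference solution: the error handling for unknown dependencies and cycles is an assumption added for illustration, and the `instruction` strings in the usage lines are placeholders.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskNode:
    id: str
    instruction: str
    depends_on: list[str]  # task IDs this task depends on
    result: Optional[str] = None
    status: str = "pending"


def topological_generations(nodes: list[TaskNode]) -> list[list[TaskNode]]:
    """Group tasks into layers; every task depends only on earlier layers."""
    by_id = {n.id: n for n in nodes}
    # Count of not-yet-placed dependencies for each task.
    remaining = {n.id: len(set(n.depends_on)) for n in nodes}
    # Reverse edges: dependency ID -> IDs of the tasks waiting on it.
    dependents = {n.id: [] for n in nodes}
    for n in nodes:
        for dep in set(n.depends_on):
            if dep not in by_id:
                raise ValueError(f"{n.id} depends on unknown task {dep}")
            dependents[dep].append(n.id)

    generations = []
    placed = 0
    ready = [n.id for n in nodes if remaining[n.id] == 0]
    while ready:
        generations.append([by_id[i] for i in ready])
        placed += len(ready)
        next_ready = []
        for done_id in ready:
            for child in dependents[done_id]:
                remaining[child] -= 1
                if remaining[child] == 0:
                    next_ready.append(child)
        ready = next_ready
    if placed != len(nodes):
        # Some tasks never became ready, so the graph has a cycle.
        raise ValueError("cycle detected in task graph")
    return generations


# Usage: the research DAG from the test task (placeholder instructions).
nodes = [TaskNode(f"fetch_paper_{i}", "fetch", []) for i in (1, 2, 3)]
nodes += [TaskNode(f"summarize_{i}", "summarize", [f"fetch_paper_{i}"]) for i in (1, 2, 3)]
nodes.append(TaskNode("synthesize", "combine", [f"summarize_{i}" for i in (1, 2, 3)]))
layers = [[n.id for n in gen] for gen in topological_generations(nodes)]
```

Each inner list in `layers` is a batch that `execute_dag` could hand to `asyncio.gather()` in one go.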
Stretch goal: Add a retry mechanism at the DAG executor level. When a task fails, retry it up to 2 times before marking it as failed and propagating the failure downstream.
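For the retry stretch goal, one possible shape is a small wrapper that the executor calls instead of awaiting the task directly. This is a sketch under assumptions: `run_with_retry` and `make_flaky` are hypothetical names, and the exponential backoff between attempts is an illustrative choice that the exercise does not require.

```python
import asyncio
from typing import Awaitable, Callable


async def run_with_retry(
    task_fn: Callable[[], Awaitable[str]],
    max_retries: int = 2,
    base_delay: float = 0.1,
) -> tuple[str, str]:
    """Return ("done", result) or ("failed", error) after 1 + max_retries attempts."""
    last_error = ""
    for attempt in range(max_retries + 1):
        try:
            return "done", await task_fn()
        except Exception as exc:  # a real executor would catch narrower error types
            last_error = f"{type(exc).__name__}: {exc}"
            if attempt < max_retries:
                # Assumed policy: wait base_delay, then 2x, 4x, ...
                await asyncio.sleep(base_delay * (2 ** attempt))
    return "failed", last_error


def make_flaky(failures: int) -> Callable[[], Awaitable[str]]:
    """Hypothetical test double: fails `failures` times, then succeeds."""
    calls = {"n": 0}

    async def flaky() -> str:
        calls["n"] += 1
        if calls["n"] <= failures:
            raise RuntimeError("transient error")
        return f"ok after {calls['n']} attempts"

    return flaky


recovered = asyncio.run(run_with_retry(make_flaky(2), base_delay=0))
exhausted = asyncio.run(run_with_retry(make_flaky(3), base_delay=0))
```

Only the `"failed"` outcome should trigger the downstream “skipped” propagation; a task that recovers on a retry looks like a normal success to the rest of the DAG.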
Exercise 2: Typed Handoff Protocol with Pydantic Validation
Objective: Enforce strict inter-agent communication contracts using Pydantic. Learn to catch handoff failures at the boundary.
Background: In the examples, subagent results are returned as unvalidated dicts. In production, you want schema validation at every agent boundary. A subagent that returns malformed output should be caught immediately, not cause a cryptic failure 3 steps later.
Tasks:
- Define Pydantic models for the research pipeline handoff:

      from typing import Literal, Optional

      from pydantic import BaseModel

      class ResearchSubtaskResult(BaseModel):
          findings: str
          supporting_details: list[str]
          confidence: Literal["high", "medium", "low"]
          sources_referenced: int = 0

      class OrchestrationResult(BaseModel):
          task_id: str
          agent_id: str
          status: Literal["success", "partial", "failed"]
          data: Optional[ResearchSubtaskResult] = None
          errors: list[str] = []
          schema_version: str = "1.0"

- Modify `run_subagent()` to:
  - Validate the LLM’s JSON output against `ResearchSubtaskResult`
  - On `ValidationError`: return an `OrchestrationResult` with `status="failed"` and the validation errors in `errors`
  - On success: return an `OrchestrationResult` with `status="success"` and the validated data

- Write a test that:
  - Creates a mock LLM response with a missing required field
  - Verifies your validator catches it and returns a failed result
  - Creates a valid mock response and verifies it passes through correctly

- Extend the schema with a `schema_version` check: if the version field is missing or `"2.0"` (future schema), log a warning but attempt to process it anyway (graceful degradation).
Stretch goal: Implement a schema migration. Define `ResearchSubtaskResultV2` (adds a `methodology` field), and write a `migrate_v1_to_v2(v1: ResearchSubtaskResult) -> ResearchSubtaskResultV2` function that fills in sensible defaults for the new field.
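The boundary check at the heart of this exercise might look roughly like the sketch below. It assumes the `pydantic` package is installed, and `validate_handoff` is a hypothetical helper name; in your solution this logic would live inside `run_subagent()`. The handling of malformed JSON and non-object payloads is an added assumption, since the exercise only names `ValidationError`.

```python
import json
from typing import Literal, Optional

from pydantic import BaseModel, ValidationError


class ResearchSubtaskResult(BaseModel):
    findings: str
    supporting_details: list[str]
    confidence: Literal["high", "medium", "low"]
    sources_referenced: int = 0


class OrchestrationResult(BaseModel):
    task_id: str
    agent_id: str
    status: Literal["success", "partial", "failed"]
    data: Optional[ResearchSubtaskResult] = None
    errors: list[str] = []
    schema_version: str = "1.0"


def validate_handoff(raw_json: str, task_id: str, agent_id: str) -> OrchestrationResult:
    """Hypothetical helper: validate one subagent reply at the agent boundary."""

    def failed(errors: list[str]) -> OrchestrationResult:
        return OrchestrationResult(
            task_id=task_id, agent_id=agent_id, status="failed", errors=errors
        )

    try:
        payload = json.loads(raw_json)
    except json.JSONDecodeError as exc:
        return failed([f"invalid JSON: {exc.msg}"])
    if not isinstance(payload, dict):
        return failed(["expected a JSON object at the top level"])
    try:
        data = ResearchSubtaskResult(**payload)
    except ValidationError as exc:
        # One readable line per schema violation, e.g. "confidence: ...".
        return failed(
            [f"{'.'.join(str(p) for p in e['loc'])}: {e['msg']}" for e in exc.errors()]
        )
    return OrchestrationResult(
        task_id=task_id, agent_id=agent_id, status="success", data=data
    )


# Mock replies: the first omits the required `confidence` field.
bad = validate_handoff('{"findings": "x", "supporting_details": []}', "t1", "agent_1")
good = validate_handoff(
    '{"findings": "x", "supporting_details": ["a"], "confidence": "high"}',
    "t2",
    "agent_1",
)
```

Returning a failed `OrchestrationResult` instead of raising keeps the failure visible at the boundary while letting the orchestrator decide whether to retry, skip, or degrade.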
Exercise 3: Circuit Breaker for Agent Reliability
Objective: Implement a circuit breaker that stops routing to a failing agent, protecting the overall pipeline from a consistently failing subagent.
Background: Without a circuit breaker, a subagent that fails every call will cause the orchestrator to waste time and tokens on retries. A circuit breaker detects the failure pattern and short-circuits subsequent calls.
Tasks:
- Implement the `CircuitBreaker` class from the README with all three states (CLOSED, OPEN, HALF-OPEN).

- Add the following interface:

      class CircuitBreaker:
          def call_allowed(self) -> bool: ...
          def on_success(self) -> None: ...
          def on_failure(self) -> None: ...
          def status_report(self) -> dict: ...  # return state, failure_count, last_failure_time

- Write a `CircuitBreakerRegistry` that maintains one `CircuitBreaker` per agent ID:

      class CircuitBreakerRegistry:
          def get(self, agent_id: str) -> CircuitBreaker: ...
          def report(self) -> dict[str, dict]: ...  # show status of all breakers

- Integrate the registry into a modified `run_subagent()` function:
  - Before executing: check `registry.get(agent_id).call_allowed()`
  - If not allowed: return a `SubagentResult` with `status="failed"` and error “Circuit breaker OPEN”
  - On success: call `registry.get(agent_id).on_success()`
  - On failure: call `registry.get(agent_id).on_failure()`

- Write a test: simulate 6 consecutive failures from `agent_2` and verify the circuit breaker is OPEN; then simulate the timeout elapsing and verify it moves to HALF-OPEN; finally simulate one successful call and verify it closes.
Stretch goal: Add metrics logging to the circuit breaker: track total calls, total failures, total opens, and average time in OPEN state. Export as a dictionary for a monitoring system.
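The state machine described above can be sketched as follows. This is not the README's class: the `failure_threshold` and `recovery_timeout` defaults and the injectable `clock` parameter are assumptions made so the timeout can be simulated in a test without sleeping.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,      # assumed default
        recovery_timeout: float = 30.0,  # assumed default, in seconds
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self.state = "CLOSED"
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None

    def call_allowed(self) -> bool:
        if self.state == "OPEN":
            # Once the cool-down has elapsed, move to HALF-OPEN and allow trial calls.
            if self._clock() - self.last_failure_time >= self.recovery_timeout:
                self.state = "HALF-OPEN"
                return True
            return False
        return True  # CLOSED or HALF-OPEN

    def on_success(self) -> None:
        self.state = "CLOSED"
        self.failure_count = 0

    def on_failure(self) -> None:
        self.failure_count += 1
        self.last_failure_time = self._clock()
        # A failed trial call reopens at once; otherwise open at the threshold.
        if self.state == "HALF-OPEN" or self.failure_count >= self.failure_threshold:
            self.state = "OPEN"

    def status_report(self) -> dict:
        return {
            "state": self.state,
            "failure_count": self.failure_count,
            "last_failure_time": self.last_failure_time,
        }


class CircuitBreakerRegistry:
    def __init__(self, **breaker_kwargs) -> None:
        self._kwargs = breaker_kwargs
        self._breakers = {}

    def get(self, agent_id: str) -> CircuitBreaker:
        # Create each agent's breaker lazily on first use.
        if agent_id not in self._breakers:
            self._breakers[agent_id] = CircuitBreaker(**self._kwargs)
        return self._breakers[agent_id]

    def report(self) -> dict[str, dict]:
        return {aid: b.status_report() for aid, b in self._breakers.items()}


# Usage with a fake clock, so the timeout can be "elapsed" instantly.
now = [0.0]
registry = CircuitBreakerRegistry(clock=lambda: now[0])
breaker = registry.get("agent_2")
for _ in range(6):
    breaker.on_failure()
```

Injecting the clock is a design choice for testability; the stretch-goal metrics (total opens, time spent OPEN) can be recorded at the same state transitions.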
Exercise 4: Build a Fan-Out Aggregator with Disagreement Detection
Objective: Implement a fan-out pattern where multiple agents answer the same question, and a meta-agent detects when agents disagree.
Background: For high-stakes questions, you may want multiple independent agents to answer and then check if their answers are consistent. If agents disagree significantly, flag for human review rather than silently picking one.
Tasks:
- Implement `run_opinion_panel(question: str, n_agents: int = 3) -> list[AgentOutput]`:
  - Fan out the same question to N agents in parallel
  - Each agent is given slightly different framing to encourage independent reasoning:
    - Agent 1: “Answer from first principles”
    - Agent 2: “Consider the most common counterarguments first, then conclude”
    - Agent 3: “Focus on empirical evidence and concrete examples”

- Implement `detect_disagreement(outputs: list[AgentOutput], client) -> dict`:
  - Ask an LLM to compare the N outputs and identify:
    - Points of consensus (all agents agree)
    - Points of divergence (agents differ)
    - Overall verdict: “consistent”, “minor_differences”, “significant_disagreement”
  - Return a structured dict with these fields

- Implement the full pipeline:

      question → [Agent 1 || Agent 2 || Agent 3] → disagreement detector →
          if consistent: synthesize and return
          if significant: flag for human review, return with warning

- Test with:
  - A question with a clear factual answer (should be consistent)
  - A genuinely controversial question (should show divergence)
Stretch goal: Instead of a single disagreement detector, add a “debate round”: after initial disagreement is detected, share each agent’s answer with the other agents and ask them to either defend or revise. Run 2 rounds of debate, then synthesize the final view.
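The fan-out step and the verdict routing could be prototyped without any API calls, as in the sketch below. Several things here are assumptions: the `AgentOutput` fields, the extra `ask` parameter (an async callable standing in for the LLM call, which the exercise signature does not have), the `route` helper, and the choice to send “minor_differences” to synthesis, which the pipeline description leaves open.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

FRAMINGS = [
    "Answer from first principles",
    "Consider the most common counterarguments first, then conclude",
    "Focus on empirical evidence and concrete examples",
]


@dataclass
class AgentOutput:  # hypothetical shape; the module's own AgentOutput may differ
    agent_id: str
    framing: str
    answer: str


async def run_opinion_panel(
    question: str,
    ask: Callable[[str], Awaitable[str]],
    n_agents: int = 3,
) -> list[AgentOutput]:
    """Send the same question to n_agents concurrently, one framing each."""

    async def one(i: int) -> AgentOutput:
        framing = FRAMINGS[i % len(FRAMINGS)]
        answer = await ask(f"{framing}.\n\nQuestion: {question}")
        return AgentOutput(f"agent_{i + 1}", framing, answer)

    # gather() returns results in the order the coroutines were passed in.
    return list(await asyncio.gather(*(one(i) for i in range(n_agents))))


def route(verdict: str) -> str:
    """Map the detector's verdict to the next pipeline step."""
    if verdict == "significant_disagreement":
        return "human_review"
    if verdict in ("consistent", "minor_differences"):  # assumed grouping
        return "synthesize"
    raise ValueError(f"unknown verdict: {verdict}")


async def stub_ask(prompt: str) -> str:
    """Stand-in for a real LLM call; echoes the framing line."""
    await asyncio.sleep(0)
    return f"stub answer for: {prompt.splitlines()[0]}"


outputs = asyncio.run(run_opinion_panel("Is the sorting algorithm stable?", stub_ask))
```

Swapping `stub_ask` for a function that calls your LLM client turns this into the real panel; validating the detector's verdict in `route` guards against the LLM inventing a fourth label.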
Exercise 5: Interview Simulation — Weekly Engineering Report System
Objective: Design and prototype a multi-agent system that automates a weekly engineering report, pulling data from GitHub, Jira, and Slack.
Prompt:
Your engineering team spends 2 hours every Friday afternoon manually compiling a weekly engineering report that covers: PRs merged, bugs closed, incidents resolved, team blockers, and a narrative summary. You’re asked to automate this with a multi-agent system.
Data sources:
- GitHub: PRs merged, commits, code review stats (API available)
- Jira: tickets closed, bugs resolved, sprint velocity (API available)
- Slack: incidents channel messages, engineering-blockers channel (API available)
Output: A structured markdown report sent to the engineering-updates Slack channel every Friday at 5pm.
Part A: System Design (35 minutes)
Write a design document covering:
- Agent decomposition:
  - List each agent in your system
  - For each agent, specify: role, inputs, outputs, tools used
  - Draw the DAG (text or ASCII diagram)

- Orchestration strategy:
  - How does the orchestrator decompose the task?
  - Which agents can run in parallel? Which must be sequential?
  - How does the orchestrator assemble the final report?

- Handoff protocol:
  - Define the Pydantic model for the result each data-fetching agent returns
  - Define the Pydantic model for the synthesizer’s input and output

- Failure handling:
  - What happens if the GitHub API is down? (one data source missing)
  - What happens if the Jira agent times out?
  - How do you handle partial reports vs no report?
  - What’s your retry strategy?

- Production concerns:
  - How do you schedule the weekly run? (cron, event trigger, etc.)
  - How do you avoid re-processing the same data if the job runs twice?
  - How do you monitor that the job completed successfully?
  - How do you handle the report getting stale if it runs at 5pm but the data APIs return data from 4pm?
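One way to approach the run-twice question is an idempotency key derived from the reporting week. The sketch below is only an illustration of that idea: `report_run_key` and `RunLedger` are hypothetical names, the key format is made up, and the in-memory set stands in for whatever durable store your design uses.

```python
from datetime import date


def report_run_key(run_date: date) -> str:
    """Build one key per ISO week, so reruns in the same week collide."""
    iso_year, iso_week, _ = run_date.isocalendar()
    return f"weekly-report-{iso_year}-W{iso_week:02d}"


class RunLedger:
    """In-memory stand-in for a durable store (database row, marker file, ...)."""

    def __init__(self) -> None:
        self._claimed = set()

    def claim(self, key: str) -> bool:
        """Return True only for the first caller that claims this key."""
        if key in self._claimed:
            return False
        self._claimed.add(key)
        return True


ledger = RunLedger()
key = report_run_key(date(2024, 5, 17))  # a Friday in ISO week 20 of 2024
first_run = ledger.claim(key)    # the scheduled job proceeds
second_run = ledger.claim(key)   # an accidental rerun is skipped
```

In a real deployment the claim would need to be atomic in the backing store; a strong design document says which store provides that guarantee.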
Part B: Prototype (45 minutes)
Implement a minimal version using only the Anthropic SDK. Since you don’t have real GitHub/Jira/Slack APIs, simulate them with stub functions that return plausible data.
Required components:
- `fetch_github_data()` → returns stubbed PR/commit data
- `fetch_jira_data()` → returns stubbed ticket data
- `fetch_slack_data()` → returns stubbed message data
- Three parallel data agents that call the stubs and format the raw data
- One orchestrator that runs the three agents in parallel, then calls a report-writing agent with all three outputs
- Output: a formatted markdown engineering report
Your prototype should be runnable with `python weekly_report.py`.
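To make the partial-report strategy concrete, here is a rough skeleton of the parallel fetch step with one simulated outage. It deliberately leaves out the LLM calls, so it is not a complete prototype. The stub payloads are invented placeholder values, the stubs are written as coroutines for convenience, and `gather_sources` and `render_report` are hypothetical helper names.

```python
import asyncio


async def fetch_github_data() -> dict:
    return {"prs_merged": 12, "commits": 87}  # invented stub values


async def fetch_jira_data() -> dict:
    raise TimeoutError("Jira API timed out")  # simulated outage


async def fetch_slack_data() -> dict:
    return {"incidents": 1, "blockers": 2}  # invented stub values


async def gather_sources(fetchers: dict) -> tuple[dict, dict]:
    """Run all fetchers concurrently; split the outcome into data and errors."""
    results = await asyncio.gather(
        *(fetch() for fetch in fetchers.values()), return_exceptions=True
    )
    data, errors = {}, {}
    for name, result in zip(fetchers, results):
        if isinstance(result, Exception):
            errors[name] = f"{type(result).__name__}: {result}"
        else:
            data[name] = result
    return data, errors


def render_report(data: dict, errors: dict) -> str:
    """Render available sections and list missing sources explicitly."""
    lines = ["# Weekly Engineering Report", ""]
    for name, payload in data.items():
        lines.append(f"## {name}")
        lines += [f"- {key}: {value}" for key, value in payload.items()]
        lines.append("")
    if errors:
        lines.append("## Missing data")
        lines += [f"- {name}: {message}" for name, message in errors.items()]
    return "\n".join(lines)


fetchers = {
    "github": fetch_github_data,
    "jira": fetch_jira_data,
    "slack": fetch_slack_data,
}
data, errors = asyncio.run(gather_sources(fetchers))
report = render_report(data, errors)
```

With `return_exceptions=True`, one failing source no longer cancels the whole gather, which is what makes a partial report possible; in the full prototype the report-writing agent would receive `data` and `errors` instead of `render_report`.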
Evaluation rubric:
| Dimension | Strong Answer | Weak Answer |
|---|---|---|
| Agent decomposition | Clear roles, typed interfaces, DAG diagram | Vague descriptions, monolithic agent |
| Parallelism | All three data sources fetched in parallel | Sequential fetch — the main bottleneck |
| Failure handling | Partial report strategy, per-source retry, alerting | “If it fails, retry everything” |
| Handoff protocol | Pydantic models with version field | Freeform strings between agents |
| Production concerns | Idempotency, monitoring, scheduling | No discussion of production issues |
| Prototype quality | Runs cleanly, handles errors, structured output | Throws exceptions, hardcoded paths |
Deliverable:
- `weekly_report_design.md`: your design document
- `weekly_report.py`: your prototype