Parallelization and orchestrator-workers workflows with Pydantic AI
I’ve been re-implementing typical patterns for building agentic systems with Pydantic AI. In this post, I’ll explore how to build parallelization and orchestrator-workers workflows.
In previous TILs, I’ve explored:
You can download this notebook here.
What is parallelization?
This workflow is designed for tasks that can be easily divided into independent subtasks. The key trade-off is managing complexity and coordination overhead in exchange for significant speed improvements or diverse perspectives.
It looks like this:

flowchart LR
    In([In]) --> LLM1["LLM Call 1"]
    In --> LLM2["LLM Call 2"]
    In --> LLM3["LLM Call 3"]
    LLM1 --> Aggregator["Aggregator"]
    LLM2 --> Aggregator
    LLM3 --> Aggregator
    Aggregator --> Out([Out])
Examples:
- Evaluate multiple independent aspects of a text (safety, quality, relevance)
- Process a user query and apply guardrails in parallel
- Generate multiple response candidates for a query and compare them
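Stripped of the framework, the pattern is just a fan-out over asyncio.gather. Here’s a toy sketch, where llm_call is a hypothetical stand-in for a real model request (the Pydantic AI version comes later in this post):

import asyncio

async def llm_call(prompt: str) -> str:
    # Hypothetical stand-in for a real LLM request
    await asyncio.sleep(0.1)
    return f"result for {prompt!r}"

async def run_parallel(text: str) -> list[str]:
    # The subtasks are known up front, so they can all start at once
    aspects = ("safety", "quality", "relevance")
    tasks = [llm_call(f"Evaluate {aspect} of: {text}") for aspect in aspects]
    return await asyncio.gather(*tasks)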
What is orchestrator-worker?
This workflow works well for tasks where you don’t know the required subtasks beforehand. The subtasks are determined by the orchestrator.
Here’s a diagram:
flowchart LR
    In([In]) --> Orch[Orchestrator]
    Orch -.-> LLM1["LLM Call 1"]
    Orch -.-> LLM2["LLM Call 2"]
    Orch -.-> LLM3["LLM Call 3"]
    LLM1 -.-> Synth[Synthesizer]
    LLM2 -.-> Synth
    LLM3 -.-> Synth
    Synth --> Out([Out])
Examples:
- Coding tools making changes to multiple files at once
- Searching multiple sources and synthesizing the results
The difference between parallelization and orchestrator-worker is that in parallelization, the subtasks are known beforehand, while in orchestrator-worker, the subtasks are determined by the orchestrator.
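In code, that difference is a single extra step: the list of subtasks comes out of a model call instead of being hard-coded. A toy sketch, with plan_subtasks and llm_call as hypothetical stand-ins:

import asyncio

async def plan_subtasks(task: str) -> list[str]:
    # Hypothetical orchestrator call: the subtasks are only known at runtime
    await asyncio.sleep(0.1)
    return [f"{task}: subtask {i}" for i in range(3)]

async def llm_call(subtask: str) -> str:
    # Hypothetical worker call
    await asyncio.sleep(0.1)
    return f"result for {subtask!r}"

async def run_orchestrated(task: str) -> str:
    subtasks = await plan_subtasks(task)  # orchestrator decides the work
    results = await asyncio.gather(*(llm_call(s) for s in subtasks))
    return "\n".join(results)  # synthesizer combines the results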
Setup
Pydantic AI uses asyncio under the hood, so you’ll need to enable nest_asyncio to run this notebook:
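import nest_asyncio

nest_asyncio.apply()  # patch the running event loop so nested awaits work in a notebook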
Then, you need to import the required libraries. I’m using Logfire to monitor the workflow.
import asyncio
import os
from pprint import pprint
from typing import Literal, Optional
import logfire
import requests
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
load_dotenv()
True
Pydantic AI is compatible with OpenTelemetry (OTel), so it’s easy to use with Logfire or any other OTel-compatible observability tool (e.g., Langfuse).
To enable tracking, create a project in Logfire, generate a Write token, and add it to the .env file. Then, you just need to run something like this (a minimal sketch, assuming a recent Logfire release):
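logfire.configure()  # picks up the write token from the environment
logfire.instrument_pydantic_ai()  # requires a Logfire version that ships this helper; records agent runs and model requests as spans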
The first time you run this, it will ask you to create a project in Logfire and will generate a logfire_credentials.json file in your working directory. In subsequent runs, it will automatically use the credentials from that file.
Parallelization example
In this example, I’ll show you how to build a workflow that runs the same evaluator in parallel and then aggregates the results.
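First, define the output models for the evaluator and the aggregator. A minimal version, based on the fields used below (keeping the is_appropiate spelling that appears in the outputs), looks like this:

class Evaluation(BaseModel):
    explanation: str = Field(description="Explanation of the verdict")
    is_appropiate: bool = Field(description="Whether the text is appropriate for a general audience")

class AggregatedResults(BaseModel):
    summary: str = Field(description="Summary of the individual evaluations")
    is_appropiate: bool = Field(description="Final verdict across all evaluations")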
Then you can create the agents and encapsulate the logic in a function:
evaluator = Agent(
    "openai:gpt-4.1-mini",
    output_type=Evaluation,
    system_prompt=(
        "You are an expert evaluator. Provided with a text, you will evaluate if it's appropriate for a general audience."
    ),
)

aggregator = Agent(
    "openai:gpt-4.1-mini",
    output_type=AggregatedResults,
    system_prompt=(
        "You are an expert evaluator. Provided with a list of evaluations, you will summarize them and provide a final evaluation."
    ),
)
@logfire.instrument("Run workflow")
async def run_workflow(topic: str) -> AggregatedResults:
    # Run the same evaluator three times in parallel
    tasks = [evaluator.run(f"Evaluate the following text: {topic}") for _ in range(3)]
    evaluations = await asyncio.gather(*tasks)
    # Feed the individual verdicts to the aggregator
    results = [(e.output.explanation, e.output.is_appropiate) for e in evaluations]
    aggregated_results = await aggregator.run(f"Summarize the following evaluations:\n\n{results}")
    return aggregated_results.output
output = await run_workflow("Athletes should consume enhancing drugs to improve their performance.")
15:28:36.289 Run workflow
15:28:36.290 agent run
15:28:36.290 chat gpt-4.1-mini
15:28:36.291 agent run
15:28:36.291 chat gpt-4.1-mini
15:28:36.292 agent run
15:28:36.292 chat gpt-4.1-mini
Logfire project URL: https://logfire-us.pydantic.dev/dylanjcastillo/blog
15:28:39.380 aggregator run
15:28:39.380 chat gpt-4.1-mini
After running the workflow, you should get an output like this:
{'is_appropiate': False,
'summary': 'All evaluations agree that the text promotes the consumption of '
'performance-enhancing drugs by athletes, which is a sensitive and '
'controversial topic. The main concerns highlighted are health '
'risks, ethical issues, fairness in sports, and legality. The '
'evaluations consistently indicate that encouraging or normalizing '
'the use of such drugs is inappropriate for a general audience as '
'it may promote illegal, harmful, or unsafe behavior. There is '
'consensus that the subject should be handled with caution.'}
Orchestrator-workers example
In this example, I’ll show you how to build a workflow that, given a topic, generates a table of contents and then writes each section of the article with an individual LLM request.
First, you need to define the structures you’ll use in the outputs of the workflow.
class Section(BaseModel):
    name: str = Field(description="The name of the section")
    description: str = Field(description="The description of the section")

class CompletedSection(BaseModel):
    name: str = Field(description="The name of the section")
    content: str = Field(description="The content of the section")

class Sections(BaseModel):
    sections: list[Section] = Field(description="The sections of the article")
Then, define the agents you’ll use in the workflow.
orchestrator = Agent(
    "openai:gpt-4.1-mini",
    output_type=Sections,
    system_prompt=(
        "You are an expert writer specialized in SEO. Provided with a topic, you will generate the sections for a short article."
    ),
)

worker = Agent(
    "openai:gpt-4.1-mini",
    output_type=CompletedSection,
    system_prompt=(
        "You are an expert writer specialized in SEO. Provided with a topic and a section, you will generate the content of the section."
    ),
)
def synthesizer(sections: list[CompletedSection]) -> str:
    # Concatenate the completed sections into the final article
    return "\n\n".join(section.content for section in sections)
Then, you can define a function that orchestrates the workflow:
@logfire.instrument("Run workflow")
async def run_workflow(topic: str) -> str:
    # The orchestrator decides which sections the article needs
    orchestrator_output = await orchestrator.run(f"Generate the sections for a short article about {topic}")
    # One worker request per section, run concurrently
    tasks = [
        worker.run(f"Write the section {section.name} about {topic} with the following description: {section.description}")
        for section in orchestrator_output.output.sections
    ]
    completed_sections = await asyncio.gather(*tasks)
    # Combine the completed sections into the final article
    full_article = synthesizer([c.output for c in completed_sections])
    return full_article
output = await run_workflow("Artificial Intelligence")
15:32:13.934 Run workflow
15:32:13.936 orchestrator run
15:32:13.937 chat gpt-4.1-mini
15:32:18.567 agent run
15:32:18.568 chat gpt-4.1-mini
15:32:18.569 agent run
15:32:18.569 chat gpt-4.1-mini
15:32:18.570 agent run
15:32:18.571 chat gpt-4.1-mini
15:32:18.572 agent run
15:32:18.572 chat gpt-4.1-mini
15:32:18.573 agent run
15:32:18.573 chat gpt-4.1-mini
That’s all!
If you want to see the full code, you can download the notebook here.
Citation
@online{castillo2025,
author = {Castillo, Dylan},
title = {Parallelization and Orchestrator-Workers Workflows with
{Pydantic} {AI}},
date = {2025-07-09},
url = {https://dylancastillo.co/til/parallelization-orchestrator-workers-pydantic-ai.html},
langid = {en}
}