Parallelization and orchestrator-workers workflows with Pydantic AI

til
llm
pydantic-ai
workflows
Published

July 9, 2025

Modified

July 11, 2025

I’ve been re-implementing typical patterns for building agentic systems with Pydantic AI. In this post, I’ll explore how to build a parallelization and orchestrator-worker workflow.

In previous TILs, I've explored other common patterns for building agentic systems.

You can download this notebook here.

What is parallelization?

This workflow is designed for tasks that can be easily divided into independent subtasks. The key trade-off is managing complexity and coordination overhead in exchange for significant speed improvements or diverse perspectives.

It looks like this:

flowchart LR
    In([In]) --> LLM1["LLM Call 1"]
    In --> LLM2["LLM Call 2"]
    In --> LLM3["LLM Call 3"]
    LLM1 --> Aggregator["Aggregator"]
    LLM2 --> Aggregator
    LLM3 --> Aggregator
    Aggregator --> Out([Out])

Examples:

  • Evaluate multiple independent aspects of a text (safety, quality, relevance)
  • Process user query and apply guardrails in parallel
  • Generate multiple response candidates given a query for comparison
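Before any LLMs enter the picture, the shape of this pattern is just a fan-out with asyncio.gather followed by an aggregation step. Here's a minimal sketch of that shape, using hypothetical plain-Python stand-ins for the LLM calls:

import asyncio

# Hypothetical stand-ins for three independent LLM calls
async def check_safety(text: str) -> bool:
    return "attack" not in text  # placeholder logic

async def check_quality(text: str) -> bool:
    return len(text) > 10  # placeholder logic

async def check_relevance(text: str) -> bool:
    return True  # placeholder logic

async def evaluate(text: str) -> bool:
    # Fan out: the three checks run concurrently...
    results = await asyncio.gather(
        check_safety(text), check_quality(text), check_relevance(text)
    )
    # ...and the aggregation step combines their verdicts
    return all(results)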

What is orchestrator-worker?

This workflow works well for tasks where you don't know the required subtasks beforehand: an orchestrator determines the subtasks at runtime, delegates them to workers, and a synthesizer combines the results.

Here’s a diagram:

flowchart LR
    In([In]) --> Orch[Orchestrator]

    Orch -.-> LLM1["LLM Call 1"]
    Orch -.-> LLM2["LLM Call 2"]
    Orch -.-> LLM3["LLM Call 3"]

    LLM1 -.-> Synth[Synthesizer]
    LLM2 -.-> Synth
    LLM3 -.-> Synth

    Synth --> Out([Out])

Examples:

  • Coding tools making changes to multiple files at once
  • Searching multiple sources and synthesizing the results

The difference between parallelization and orchestrator-worker: in parallelization, the subtasks are known beforehand, while in orchestrator-worker, the orchestrator decides them at runtime.
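In code, that distinction is one extra step: the list of subtasks is itself the output of an LLM call, so the fan-out size is only known at runtime. Here's a minimal sketch, with hypothetical plan and execute placeholders standing in for the orchestrator and worker calls:

import asyncio

# Hypothetical placeholders: in practice, both would be LLM calls
async def plan(task: str) -> list[str]:
    # The orchestrator decides the subtasks at runtime
    return [f"{task}: step {i}" for i in range(3)]

async def execute(subtask: str) -> str:
    # A worker handles a single subtask
    return f"done: {subtask}"

async def orchestrate(task: str) -> str:
    subtasks = await plan(task)
    results = await asyncio.gather(*(execute(s) for s in subtasks))
    return "\n".join(results)  # synthesizer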

Setup

Pydantic AI uses asyncio under the hood, and notebooks already run their own event loop, so you'll need to enable nest_asyncio to run this notebook:

import nest_asyncio

nest_asyncio.apply()
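If you run this code as a regular script instead of a notebook, nest_asyncio isn't needed; wrap the top-level awaits in asyncio.run instead:

import asyncio

async def main() -> None:
    # run_workflow is defined later in this post
    output = await run_workflow("some topic")
    print(output)

asyncio.run(main())  # only outside an already-running event loop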

Then, you need to import the required libraries. I’m using Logfire to monitor the workflow.

import asyncio
import os
from pprint import pprint

import logfire
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from pydantic_ai import Agent

load_dotenv()
True

Pydantic AI is compatible with OpenTelemetry (OTel), so it's easy to use with Logfire or any other OTel-compatible observability tool (e.g., Langfuse).
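For example, to send the spans to a different OTel backend instead of Logfire, you can point the OTLP exporter at it through the standard OTel environment variables, something like this (the endpoint is a placeholder for your own collector):

import os

import logfire

# Placeholder: point this at your own OTel collector or backend
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318"

logfire.configure(send_to_logfire=False)  # export via OTLP only
logfire.instrument_pydantic_ai()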

To enable tracking, create a project in Logfire, generate a Write token and add it to the .env file. Then, you just need to run:

logfire.configure(
    token=os.getenv("LOGFIRE_TOKEN"),
)
logfire.instrument_pydantic_ai()

The first time you run this, Logfire will ask you to create a project and will generate a logfire_credentials.json file in your working directory. In subsequent runs, it will automatically use the credentials from that file.

Parallelization example

In this example, I’ll show you how to build a workflow that runs the same evaluator in parallel and then aggregates the results.

class Evaluation(BaseModel):
    explanation: str
    is_appropriate: bool


class AggregatedResults(BaseModel):
    summary: str
    is_appropriate: bool

Then you can create the agents and encapsulate the logic in a function:

evaluator = Agent(
    "openai:gpt-4.1-mini",
    output_type=Evaluation,
    system_prompt=(
        "You are an expert evaluator. Provided with a text, you will evaluate if it's appropriate for a general audience."
    ),
)

aggregator = Agent(
    "openai:gpt-4.1-mini",
    output_type=AggregatedResults,
    system_prompt=(
        "You are an expert evaluator. Provided with a list of evaluations, you will summarize them and provide a final evaluation."
    ),
)

@logfire.instrument("Run workflow")
async def run_workflow(topic: str) -> AggregatedResults:
    # Fan out: run the same evaluator three times in parallel
    tasks = [evaluator.run(f"Evaluate the following text: {topic}") for _ in range(3)]
    evaluations = await asyncio.gather(*tasks)
    # Aggregate the individual verdicts with the aggregator agent
    aggregated_results = await aggregator.run(
        "Summarize the following evaluations:\n\n"
        f"{[(e.output.explanation, e.output.is_appropriate) for e in evaluations]}"
    )
    return aggregated_results.output

output = await run_workflow("Athletes should consume enhancing drugs to improve their performance.")
15:28:36.289 Run workflow
15:28:36.290   agent run
15:28:36.290     chat gpt-4.1-mini
15:28:36.291   agent run
15:28:36.291     chat gpt-4.1-mini
15:28:36.292   agent run
15:28:36.292     chat gpt-4.1-mini
15:28:39.380   aggregator run
15:28:39.380     chat gpt-4.1-mini

Finally, you can inspect the aggregated output. You should get something like this:

pprint(output.model_dump())
{'is_appropriate': False,
 'summary': 'All evaluations agree that the text promotes the consumption of '
            'performance-enhancing drugs by athletes, which is a sensitive and '
            'controversial topic. The main concerns highlighted are health '
            'risks, ethical issues, fairness in sports, and legality. The '
            'evaluations consistently indicate that encouraging or normalizing '
            'the use of such drugs is inappropriate for a general audience as '
            'it may promote illegal, harmful, or unsafe behavior. There is '
            'consensus that the subject should be handled with caution.'}
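One caveat: asyncio.gather fails fast, so if a single evaluator call raises, the whole workflow fails. If you'd rather aggregate whatever succeeded, a variant of the workflow (a sketch, not part of the original) can pass return_exceptions=True and filter out the failures:

@logfire.instrument("Run workflow (fault-tolerant)")
async def run_workflow_safe(topic: str) -> AggregatedResults:
    tasks = [evaluator.run(f"Evaluate the following text: {topic}") for _ in range(3)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Keep only the evaluations that succeeded
    evaluations = [r for r in results if not isinstance(r, BaseException)]
    if not evaluations:
        raise RuntimeError("All evaluator calls failed")
    aggregated = await aggregator.run(
        "Summarize the following evaluations:\n\n"
        f"{[(e.output.explanation, e.output.is_appropriate) for e in evaluations]}"
    )
    return aggregated.output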

Orchestrator-workers example

In this example, I'll show you how to build a workflow that, given a topic, generates a table of contents and then writes each section of the article with an individual LLM request.

First, define the structures used for the workflow's outputs:

class Section(BaseModel):
    name: str = Field(description="The name of the section")
    description: str = Field(description="The description of the section")


class CompletedSection(BaseModel):
    name: str = Field(description="The name of the section")
    content: str = Field(description="The content of the section")


class Sections(BaseModel):
    sections: list[Section] = Field(description="The sections of the article")

Then, define the agents you'll use in the workflow:

orchestrator = Agent(
    "openai:gpt-4.1-mini",
    output_type=Sections,
    system_prompt=(
        "You are an expert writer specialized in SEO. Provided with a topic, you will generate the sections for a short article."
    ),
)

worker = Agent(
    "openai:gpt-4.1-mini",
    output_type=CompletedSection,
    system_prompt=(
        "You are an expert writer specialized in SEO. Provided with a topic and a section, you will generate the content of the section."
    ),
)

def synthesizer(sections: list[CompletedSection]) -> str:
    # Join the completed sections into a single article
    return "\n\n".join(section.content for section in sections)

Then, you can define a function that orchestrates the workflow:

@logfire.instrument("Run workflow")
async def run_workflow(topic: str) -> str:
    # The orchestrator decides which sections the article needs
    orchestrator_output = await orchestrator.run(
        f"Generate the sections for a short article about {topic}"
    )
    # Fan out: one worker call per section, run in parallel
    tasks = [
        worker.run(f"Write the section {section.name} about {topic} with the following description: {section.description}")
        for section in orchestrator_output.output.sections
    ]
    completed_sections = await asyncio.gather(*tasks)
    # Synthesize the completed sections into the full article
    return synthesizer([c.output for c in completed_sections])

output = await run_workflow("Artificial Intelligence")
15:32:13.934 Run workflow
15:32:13.936   orchestrator run
15:32:13.937     chat gpt-4.1-mini
15:32:18.567   agent run
15:32:18.568     chat gpt-4.1-mini
15:32:18.569   agent run
15:32:18.569     chat gpt-4.1-mini
15:32:18.570   agent run
15:32:18.571     chat gpt-4.1-mini
15:32:18.572   agent run
15:32:18.572     chat gpt-4.1-mini
15:32:18.573   agent run
15:32:18.573     chat gpt-4.1-mini
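If the orchestrator returns many sections, you may want to cap how many worker calls run at once. One way to do that (a sketch on top of the workflow above, not part of the original) is an asyncio.Semaphore:

semaphore = asyncio.Semaphore(3)  # at most 3 worker calls in flight

async def run_worker(topic: str, section: Section) -> CompletedSection:
    async with semaphore:
        result = await worker.run(
            f"Write the section {section.name} about {topic} "
            f"with the following description: {section.description}"
        )
    return result.output

@logfire.instrument("Run workflow (bounded)")
async def run_workflow_bounded(topic: str) -> str:
    orchestrator_output = await orchestrator.run(
        f"Generate the sections for a short article about {topic}"
    )
    sections = orchestrator_output.output.sections
    completed = await asyncio.gather(*(run_worker(topic, s) for s in sections))
    return synthesizer(completed)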

That’s all!

If you want to see the full code, you can download the notebook here.

Citation

BibTeX citation:
@online{castillo2025,
  author = {Castillo, Dylan},
  title = {Parallelization and Orchestrator-Workers Workflows with
    {Pydantic} {AI}},
  date = {2025-07-09},
  url = {https://dylancastillo.co/til/parallelization-orchestrator-workers-pydantic-ai.html},
  langid = {en}
}
For attribution, please cite this work as:
Castillo, Dylan. 2025. “Parallelization and Orchestrator-Workers Workflows with Pydantic AI.” July 9, 2025. https://dylancastillo.co/til/parallelization-orchestrator-workers-pydantic-ai.html.