Orchestration Strategies
How do you coordinate multiple agents? The orchestration strategy determines who decides what happens next and how work flows through the system.
Three Main Approaches
1. Hierarchical (Manager-Workers)
A central manager agent assigns tasks and coordinates workers.
```python
# Hierarchical orchestration with CrewAI
# Assumes the agents and tasks below are already defined.
from crewai import Crew, Process

crew = Crew(
    agents=[researcher, writer, reviewer],  # workers only; the manager is passed separately
    tasks=[research_task, write_task, review_task],
    process=Process.hierarchical,  # Manager controls flow
    manager_agent=manager,
)

# The manager decides:
# - Which agent handles each task
# - When to move to next task
# - How to handle failures
```
Pros: Clear accountability, predictable flow, easy to debug
Cons: Manager bottleneck, single point of failure
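To see what the manager role amounts to without any framework, here is a minimal sketch in plain `asyncio`. The `Manager` class, its `max_retries` setting, and the three toy workers are hypothetical names made up for illustration; this is not how CrewAI implements its manager.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable

# A worker is any async callable that takes the previous output and returns new output.
Worker = Callable[[str], Awaitable[str]]


@dataclass
class Manager:
    """Hypothetical manager: owns task order, routing, and retry policy."""
    workers: dict[str, Worker]
    max_retries: int = 1
    log: list[str] = field(default_factory=list)

    async def run(self, plan: list[str], payload: str) -> str:
        for role in plan:  # the manager decides the order of work
            for attempt in range(self.max_retries + 1):
                try:
                    payload = await self.workers[role](payload)
                    self.log.append(f"{role}: ok")
                    break
                except Exception as exc:  # the manager decides how failures are handled
                    self.log.append(f"{role}: failed ({exc})")
                    if attempt == self.max_retries:
                        raise
        return payload


async def research(text: str) -> str:
    return text + " -> researched"


async def write(text: str) -> str:
    return text + " -> written"


async def review(text: str) -> str:
    return text + " -> reviewed"


def demo_hierarchical() -> tuple[str, list[str]]:
    manager = Manager({"researcher": research, "writer": write, "reviewer": review})
    result = asyncio.run(manager.run(["researcher", "writer", "reviewer"], "topic"))
    return result, manager.log
```

Every decision lives in `Manager.run`, which is why this style is easy to debug: the log is a complete record of the flow. It is also why the manager becomes the bottleneck.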
2. Peer-to-Peer (Collaborative)
Agents communicate directly without a central coordinator.
```python
# Peer-to-peer with direct handoffs
class PeerAgent:
    def __init__(self, name: str, peers: dict):
        self.name = name
        self.peers = peers  # name -> agent instance

    async def do_work(self, task: dict) -> dict:
        # Subclasses implement the agent's actual work here
        raise NotImplementedError

    async def process(self, task: dict) -> dict:
        result = await self.do_work(task)

        # Decide next peer based on result; .get() treats a missing flag as False
        if result.get("needs_review"):
            return await self.peers["reviewer"].process(result)
        elif result.get("needs_more_data"):
            return await self.peers["researcher"].process(result)
        else:
            return result
```
Pros: No bottleneck, resilient, flexible
Cons: Complex coordination, harder to trace, potential loops
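The "potential loops" and "harder to trace" problems can be softened by carrying a handoff trace with each task and capping its length. The sketch below is one possible guard, not part of any library: `TracedPeer`, `MAX_HOPS`, `HandoffLimitExceeded`, and `build_peers` are hypothetical names, and the fixed `next_peer` routing stands in for real decision logic.

```python
import asyncio
from typing import Optional

MAX_HOPS = 5  # assumed limit; tune it to the longest legitimate handoff chain


class HandoffLimitExceeded(RuntimeError):
    """Raised when a task has been passed between peers too many times."""


class TracedPeer:
    def __init__(self, name: str, peers: dict, next_peer: Optional[str] = None):
        self.name = name
        self.peers = peers          # shared name -> agent mapping
        self.next_peer = next_peer  # fixed route, standing in for real routing logic

    async def process(self, task: dict) -> dict:
        # Record this hop so the full path is visible in the result
        trace = task.get("trace", []) + [self.name]
        if len(trace) > MAX_HOPS:
            raise HandoffLimitExceeded(" -> ".join(trace))

        result = {**task, "trace": trace}
        if self.next_peer is not None:
            return await self.peers[self.next_peer].process(result)
        return result


def build_peers(routes: dict) -> dict:
    """Create one TracedPeer per route entry, all sharing the same registry."""
    peers: dict = {}
    for name, next_peer in routes.items():
        peers[name] = TracedPeer(name, peers, next_peer)
    return peers
```

With routes `{"writer": "reviewer", "reviewer": None}` the task comes back with its path in `result["trace"]`; with a cycle such as `{"a": "b", "b": "a"}` the guard raises instead of recursing forever.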
3. Swarm (Emergent Behavior)
Agents act independently following simple rules; complex behavior emerges.
```python
# Swarm-like orchestration
# Assumes TaskBoard is defined elsewhere and process() is supplied by a subclass.
from __future__ import annotations

import asyncio


class SwarmAgent:
    def __init__(self, role: str, shared_board: TaskBoard):
        self.role = role
        self.board = shared_board

    async def run(self):
        while True:
            # Each agent picks tasks matching its role
            task = await self.board.claim_task(
                filter=lambda t: t.type == self.role
            )
            if not task:
                # Nothing to claim; back off before polling again
                await asyncio.sleep(1)
                continue

            result = await self.process(task)

            # Post results or new tasks to board
            if result.creates_new_tasks:
                for new_task in result.new_tasks:
                    await self.board.post_task(new_task)
            await self.board.complete_task(task.id, result)
```
Pros: Highly scalable, self-organizing, fault tolerant
Cons: Unpredictable, hard to reason about, debugging nightmare
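The swarm example leans on a `TaskBoard` that is never defined. As a rough idea of what it could look like, here is a minimal in-memory sketch. The `Task` dataclass and all internals are assumptions for illustration; a production board would more likely sit on a queue or database so that agents in different processes can share it.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Task:
    id: int
    type: str
    payload: Any = None


class TaskBoard:
    """Hypothetical in-memory board shared by agents on one event loop."""

    def __init__(self) -> None:
        self._open: list[Task] = []
        self._claimed: dict[int, Task] = {}
        self.results: dict[int, Any] = {}

    async def post_task(self, task: Task) -> None:
        self._open.append(task)

    async def claim_task(self, filter: Callable[[Task], bool]) -> Optional[Task]:
        # No await between the check and the pop, so two agents on the
        # same event loop cannot claim the same task.
        for index, task in enumerate(self._open):
            if filter(task):
                self._claimed[task.id] = self._open.pop(index)
                return task
        return None

    async def complete_task(self, task_id: int, result: Any) -> None:
        self._claimed.pop(task_id)
        self.results[task_id] = result

    def all_complete(self) -> bool:
        return not self._open and not self._claimed


async def _demo() -> tuple:
    board = TaskBoard()
    await board.post_task(Task(1, "research"))
    await board.post_task(Task(2, "write"))
    first = await board.claim_task(filter=lambda t: t.type == "write")
    second = await board.claim_task(filter=lambda t: t.type == "write")
    await board.complete_task(first.id, "done")
    return first.id, second, board.all_complete()


def demo_board() -> tuple:
    return asyncio.run(_demo())
```

In the demo the only "write" task is claimed once, the second claim returns `None`, and the board is not complete because the "research" task is still open.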
Choosing Your Strategy
| Scenario | Best Strategy |
|---|---|
| Well-defined workflow | Hierarchical |
| Dynamic, changing tasks | Peer-to-Peer |
| Massive parallelism | Swarm |
| Need audit trail | Hierarchical |
| Real-time adaptation | Peer-to-Peer |
| Research/exploration | Swarm |
Hybrid Example: Supervised Swarm
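```python
# Supervisor + swarm workers
# Assumes Agent, SwarmAgent, and TaskBoard are defined elsewhere.
from __future__ import annotations

import asyncio


class SupervisedSwarm:
    def __init__(self, supervisor: Agent, workers: list[SwarmAgent]):
        self.supervisor = supervisor
        self.workers = workers
        self.board = TaskBoard()
        # Every worker must watch the same board the supervisor posts to
        for worker in self.workers:
            worker.board = self.board

    async def execute(self, goal: str):
        # Supervisor breaks down goal into tasks
        tasks = await self.supervisor.decompose(goal)
        for task in tasks:
            await self.board.post_task(task)

        # Workers swarm on tasks; create_task actually schedules each run() loop
        worker_tasks = [asyncio.create_task(w.run()) for w in self.workers]

        try:
            # Supervisor monitors and intervenes if needed
            while not self.board.all_complete():
                status = self.board.get_status()
                if status.stuck_tasks:
                    await self.supervisor.unstick(status.stuck_tasks)
                await asyncio.sleep(5)
        finally:
            # run() loops forever, so cancel the workers once the board is done
            for worker_task in worker_tasks:
                worker_task.cancel()
            await asyncio.gather(*worker_tasks, return_exceptions=True)
```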
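The supervisor loop depends on `get_status()` reporting `stuck_tasks`, which the example leaves undefined. One simple way to define "stuck" is a claim that has been held longer than a timeout. The sketch below assumes that definition; `ClaimTracker`, `BoardStatus`, and the injectable `clock` are hypothetical names introduced for illustration.

```python
import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class BoardStatus:
    stuck_tasks: list = field(default_factory=list)


class ClaimTracker:
    """Hypothetical helper: flags tasks claimed longer than timeout_s seconds."""

    def __init__(self, timeout_s: float, clock: Callable[[], float] = time.monotonic):
        self.timeout_s = timeout_s
        self._clock = clock  # injectable so the behavior can be tested without sleeping
        self._claimed_at: dict = {}

    def claim(self, task_id: str) -> None:
        self._claimed_at[task_id] = self._clock()

    def complete(self, task_id: str) -> None:
        self._claimed_at.pop(task_id, None)

    def get_status(self) -> BoardStatus:
        now = self._clock()
        stuck = [
            task_id
            for task_id, claimed_at in self._claimed_at.items()
            if now - claimed_at > self.timeout_s
        ]
        return BoardStatus(stuck_tasks=stuck)
```

A board could call `claim()` and `complete()` from its own `claim_task` and `complete_task` methods and delegate `get_status()` to the tracker. A timeout is a blunt signal, since a slow task looks the same as a dead worker, so the supervisor's `unstick` step should be safe to run on a task that is still in progress.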
Nerd Note: Start hierarchical, then evolve to peer-to-peer as you understand your domain. Reserve swarm for truly parallelizable problems.
Next: Microsoft AutoGen's unique approach to multi-agent conversations.