Single-prompt AI interactions have a ceiling. Asking a model to summarize a document is straightforward. Asking it to research a competitor, extract key data, compare it against your product specs, draft a report, and email it to your sales team is not: that chain of steps requires something fundamentally different from a single LLM call.
AI agents bridge that gap. An agent is an LLM that can decide which actions to take, execute them using tools, observe the results, and decide what to do next, looping until it completes a goal or determines it needs human input. The jump from single-prompt to agentic systems is substantial in capability and equally substantial in complexity. Systems that work in demos break in production when tool calls fail, loops run indefinitely, or the model loses track of what it has already done.
I’ve built agent systems for research automation, customer onboarding pipelines, data extraction workflows, and competitive intelligence. The architectural decisions you make early (how to structure memory, how to handle failures, when to split work across multiple agents) determine whether these systems perform reliably or erratically. This guide covers those decisions in detail.
Single-Agent vs. Multi-Agent Systems
The first architectural decision is whether to use one agent or many.
A single agent receives a task, has access to a set of tools, and works toward completion by calling tools iteratively. Simple, observable, and sufficient for many use cases. A customer support agent that can query a knowledge base, check order status, and draft responses is a single agent. No coordination overhead, no handoff logic, easier to debug.
A multi-agent system decomposes work across specialized agents coordinated by an orchestrator. The orchestrator receives a high-level goal, breaks it into subtasks, delegates each subtask to a specialized agent, collects results, and synthesizes a final output. A research pipeline that uses one agent to search the web, another to read and extract key points from each page, and a third to write a synthesis is a multi-agent system.
Use a single agent when:
- The task is conceptually sequential and fits naturally in one context window
- Tool use is limited to 3-6 well-defined operations
- Latency matters and coordination overhead is expensive
- You want maximum observability and minimal failure surface
Use multiple agents when:
- Subtasks are genuinely independent and can run in parallel
- Different subtasks require different tool access or prompting strategies
- Single-agent context windows fill up with intermediate state
- Specialized agents produce meaningfully better output than a generalist
The temptation to decompose everything into multiple specialized agents is strong but often counterproductive. Each handoff between agents is a potential failure point. I default to single-agent and only add agents when the single-agent approach demonstrably fails.
Tool Design
Tools are functions an agent can call. The quality of your tool design determines how well the agent can complete tasks. Poorly designed tools are the most common reason agent systems fail.
Tool anatomy
Every tool needs three things the model can reason about:
- A precise name: search_web is better than search; get_customer_order_history is better than get_data
- A clear description: Describe what the tool does, what parameters it expects, and what it returns. The model uses this description to decide when to call the tool.
- Typed, validated inputs: Use Pydantic models to enforce input schemas. Catch malformed calls before they hit external systems.
import os
from typing import Optional

import httpx
from langchain_core.tools import tool
from pydantic import BaseModel, Field

# Read the key from the environment; the variable name is an example
SEARCH_API_KEY = os.environ.get("SEARCH_API_KEY", "")


class WebSearchInput(BaseModel):
    query: str = Field(description="The search query. Be specific and use relevant keywords.")
    max_results: int = Field(default=5, ge=1, le=20, description="Number of results to return (1-20)")


@tool("search_web", args_schema=WebSearchInput)
def search_web(query: str, max_results: int = 5) -> str:
    """
    Search the web for current information. Use this when you need facts,
    recent news, or data that may not be in your training data.
    Returns a list of search results with titles, URLs, and snippets.
    """
    # Implementation using a search API (Brave, Serper, Tavily, etc.).
    # The URL and response shape below are placeholders for your provider.
    try:
        response = httpx.get(
            "https://api.search-provider.com/search",
            params={"q": query, "count": max_results},
            headers={"Authorization": f"Bearer {SEARCH_API_KEY}"},
            timeout=10.0,
        )
        response.raise_for_status()
        results = response.json()["results"]
    except (httpx.HTTPError, KeyError, ValueError) as exc:
        # Return the failure as text so the agent can decide how to recover
        return f"Error: web search failed: {exc}"
    formatted = []
    for r in results:
        formatted.append(f"Title: {r['title']}\nURL: {r['url']}\nSnippet: {r['snippet']}")
    return "\n\n".join(formatted) if formatted else "No results found."
Tool design principles
Return human-readable strings. Agents process tool output as text in their context window. JSON blobs work but add tokens and make the model work harder to extract meaning. Format results as readable summaries.
Return error messages, not exceptions. When a tool fails, catch the exception and return a descriptive string like "Error: Could not retrieve data. The requested resource returned 404." The agent can then decide whether to retry, try a different approach, or ask for help. Unhandled exceptions crash the agent loop entirely.
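One way to apply this consistently is a small decorator that wraps any tool function. The sketch below uses only the standard library; the decorator name and the read_config_value tool are hypothetical and exist only for illustration.

```python
import functools
from typing import Callable


def returns_errors_as_text(func: Callable[..., str]) -> Callable[..., str]:
    """Wrap a tool so failures come back as readable strings instead of raising."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> str:
        try:
            return func(*args, **kwargs)
        except Exception as exc:  # deliberately broad: the agent loop must survive
            return f"Error: {func.__name__} failed with {type(exc).__name__}: {exc}"

    return wrapper


@returns_errors_as_text
def read_config_value(key: str) -> str:
    """Hypothetical tool: look up a value in a tiny in-memory config."""
    config = {"region": "eu-west-1"}
    return config[key]  # raises KeyError for unknown keys


print(read_config_value("region"))
print(read_config_value("missing"))
```

The second call returns a string starting with "Error:" rather than raising, so the agent sees the failure as an ordinary observation.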
Make tools idempotent where possible. Agents retry tools when they’re unsure the call succeeded. Idempotent tools (where calling twice produces the same result as calling once) are safe to retry. Non-idempotent tools (sending emails, creating database records, charging payments) need deduplication logic.
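A minimal sketch of deduplication for a non-idempotent tool, assuming an idempotency key derived from the call arguments. DeduplicatingSender and fake_send_email are hypothetical names, and the in-memory dictionary stands in for whatever persistent store a real system would use.

```python
import hashlib
import json


class DeduplicatingSender:
    """Wraps a side-effecting send function so repeated identical calls run once."""

    def __init__(self, send_func):
        self._send = send_func
        self._results: dict[str, str] = {}  # idempotency key -> first result

    @staticmethod
    def _key(payload: dict) -> str:
        # Stable hash of the arguments; sort_keys makes argument order irrelevant
        canonical = json.dumps(payload, sort_keys=True)
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def send(self, **payload) -> str:
        key = self._key(payload)
        if key in self._results:
            return f"Already sent (duplicate call ignored). Original result: {self._results[key]}"
        result = self._send(**payload)
        self._results[key] = result
        return result


sent_log = []


def fake_send_email(to: str, subject: str) -> str:
    """Stand-in for a real email API; records each actual send."""
    sent_log.append((to, subject))
    return f"Email queued for {to}"


sender = DeduplicatingSender(fake_send_email)
first = sender.send(to="sales@example.com", subject="Report")
second = sender.send(subject="Report", to="sales@example.com")  # retry by the agent
print(first)
print(second)
print(len(sent_log))
```

Only one email is recorded even though the tool was called twice, and the retry still gets an informative answer.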
Keep tools atomic. A tool that does one thing is easier to reason about than one that does several things conditionally. If an agent needs to read a file and then process its contents, give it two tools rather than one combined tool with branching logic.
Memory Management
Memory determines what information an agent has access to across steps. Without deliberate memory design, agents either repeat work they’ve already done, lose track of earlier results, or fill their context window with irrelevant history.
There are three memory types:
Short-term memory (in-context)
Everything in the active context window is short-term memory. The model can directly reference anything that appears in the conversation history. For simple, focused tasks, this is the only memory you need.
The practical limit is context length. For gpt-4o with a 128K token window, you can fit a substantial amount of agent state in context. For longer-running workflows, you need to manage what stays in context and what gets summarized or stored externally.
I use a rolling window with summarization for long-running agents:
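from langchain_openai import ChatOpenAI
from langchain.memory import ConversationSummaryBufferMemory

# Note: recent LangChain releases mark the langchain.memory classes as legacy,
# so check that this import is available in the version you have installed.
memory = ConversationSummaryBufferMemory(
    llm=ChatOpenAI(model="gpt-4o-mini"),
    max_token_limit=4000,  # Keep recent turns verbatim
    return_messages=True,
)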
When the conversation history exceeds max_token_limit, older turns get summarized into a compressed representation that preserves key facts without every individual message.
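To make the mechanism concrete, here is a framework-free sketch of the same idea. It is not how LangChain implements it: count_tokens is a crude word-count estimate, and naive_summarize is a placeholder where a real system would call a small LLM.

```python
from typing import Callable


def count_tokens(text: str) -> int:
    """Crude token estimate (whitespace-separated words); swap in a real tokenizer."""
    return len(text.split())


def compact_history(
    messages: list[str],
    max_tokens: int,
    summarize: Callable[[list[str]], str],
) -> list[str]:
    """Keep the newest messages verbatim within max_tokens; summarize the rest."""
    kept: list[str] = []
    used = 0
    # Walk backwards so the most recent turns are kept first
    for message in reversed(messages):
        cost = count_tokens(message)
        if used + cost > max_tokens:
            break
        kept.append(message)
        used += cost
    kept.reverse()
    older = messages[: len(messages) - len(kept)]
    if not older:
        return kept
    return [f"Summary of earlier turns: {summarize(older)}"] + kept


def naive_summarize(older: list[str]) -> str:
    """Placeholder summarizer; a real one would call a small LLM."""
    return f"{len(older)} earlier messages omitted"


history = ["one two three", "four five", "six seven eight nine", "ten"]
compacted = compact_history(history, max_tokens=5, summarize=naive_summarize)
print(compacted)
```

Note that the summary line itself consumes tokens that this sketch does not count against the limit.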
Long-term memory (external store)
For agents that need to remember information across separate sessions (user preferences, facts learned in previous runs, knowledge accumulated over time), that information must be persisted to an external store and retrieved selectively.
from datetime import datetime, timezone
from typing import Optional


class AgentMemoryStore:
    """Simple persistent memory on top of a pluggable storage backend.

    The backend is assumed to expose upsert(), semantic_search(), and delete().
    """

    def __init__(self, agent_id: str, storage_backend):
        self.agent_id = agent_id
        self.db = storage_backend

    def remember(self, key: str, value: str, category: str = "general"):
        """Store a fact for future retrieval."""
        self.db.upsert({
            "agent_id": self.agent_id,
            "key": key,
            "value": value,
            "category": category,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "stored_at": datetime.now(timezone.utc).isoformat(),
        })

    def recall(self, query: str, category: Optional[str] = None, limit: int = 5) -> list:
        """Retrieve relevant memories using semantic search."""
        filters = {"agent_id": self.agent_id}
        if category:
            filters["category"] = category
        return self.db.semantic_search(query, filters=filters, limit=limit)

    def forget(self, key: str):
        """Remove a stored memory."""
        self.db.delete({"agent_id": self.agent_id, "key": key})
For production systems, I store agent memory in a vector database (for semantic retrieval) or a key-value store (for exact lookups), depending on whether the agent needs to find “memories related to this topic” or “the specific value stored under this key.”
Episodic memory (task state)
For multi-step tasks, you need to track what has been attempted, what succeeded, and what failed within the current run. I maintain explicit task state rather than relying on the model to reconstruct it from conversation history:
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional


class StepStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class TaskStep:
    name: str
    description: str
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: Optional[str] = None
    attempts: int = 0


@dataclass
class TaskState:
    goal: str
    steps: list[TaskStep] = field(default_factory=list)
    context: dict = field(default_factory=dict)

    def pending_steps(self) -> list[TaskStep]:
        return [s for s in self.steps if s.status == StepStatus.PENDING]

    def completed_steps(self) -> list[TaskStep]:
        return [s for s in self.steps if s.status == StepStatus.COMPLETED]

    def summary(self) -> str:
        lines = [f"Goal: {self.goal}", "Steps:"]
        for step in self.steps:
            lines.append(f"  [{step.status.value}] {step.name}")
            # Compare against None so falsy results such as 0 or "" still show up
            if step.result is not None:
                lines.append(f"    Result: {str(step.result)[:200]}")
        return "\n".join(lines)
Injecting a concise task state summary into the agent’s context at each step prevents the model from losing track of where it is in a complex workflow.
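A minimal sketch of that injection step, assuming messages are plain role/content dictionaries and that state_summary is the string produced by something like TaskState.summary(). The function name build_step_messages is hypothetical.

```python
def build_step_messages(system_prompt: str, state_summary: str, recent: list[dict]) -> list[dict]:
    """Assemble the messages for one step with the task state placed up front."""
    state_message = {
        "role": "system",
        "content": f"Current task state (authoritative):\n{state_summary}",
    }
    # The state message is rebuilt every step, so it never goes stale the way
    # facts buried in older conversation turns can
    return [{"role": "system", "content": system_prompt}, state_message, *recent]


summary_text = "Goal: Compare vendors\nSteps:\n  [completed] search\n  [pending] write report"
recent_turns = [{"role": "user", "content": "Continue with the next step."}]
step_messages = build_step_messages("You are a research agent.", summary_text, recent_turns)
for message in step_messages:
    print(message["role"], "->", message["content"][:40])
```

Rebuilding the state message on every step, rather than appending it to history, keeps exactly one current copy in the context window.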
Building Agents with LangGraph
LangGraph is the framework I reach for when building production agent systems. It models agent logic as a directed graph where nodes are processing steps and edges define control flow. This graph structure makes it possible to implement loops, conditional branching, parallel execution, and recovery flows that are difficult to express in linear chain architectures.
Basic ReAct agent
The ReAct (Reasoning + Acting) pattern is the foundation of most practical agents: reason about what to do, take an action, observe the result, reason about the next step.
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

# Define tools (read_url, extract_data, and write_file are assumed to be
# defined elsewhere in the same way as search_web)
tools = [search_web, read_url, extract_data, write_file]

# Create agent
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = create_react_agent(llm, tools)

# Run agent
result = agent.invoke({
    "messages": [HumanMessage(content="Research the top 5 cloud storage providers and create a comparison table of their pricing")]
})
print(result["messages"][-1].content)
For more control over the execution graph:
from typing import Annotated, TypedDict

from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode

MAX_ITERATIONS = 15

# The model must be bound to the tools before it can emit tool calls
llm_with_tools = llm.bind_tools(tools)


class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    task_state: TaskState
    iteration_count: int


def should_continue(state: AgentState) -> str:
    """Determine whether to continue the agent loop or finish."""
    last_message = state["messages"][-1]
    # If the last AI message has no tool calls, we're done
    if not getattr(last_message, "tool_calls", None):
        return "end"
    # Prevent infinite loops
    if state.get("iteration_count", 0) >= MAX_ITERATIONS:
        return "end"
    return "continue"


def call_model(state: AgentState) -> dict:
    """Call the LLM with current state and return a partial state update."""
    response = llm_with_tools.invoke(state["messages"])
    return {
        "messages": [response],
        "iteration_count": state.get("iteration_count", 0) + 1,
    }


# Build graph
graph = StateGraph(AgentState)
graph.add_node("agent", call_model)
# ToolNode runs the tool calls on the last AI message and returns ToolMessages
graph.add_node("tools", ToolNode(tools))
graph.set_entry_point("agent")
graph.add_conditional_edges("agent", should_continue, {"continue": "tools", "end": END})
graph.add_edge("tools", "agent")
runnable = graph.compile()
The should_continue function is where you implement loop termination logic. Hard iteration limits are essential; without them, a confused agent can loop indefinitely on a failing tool call.
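An iteration cap catches every runaway loop eventually, but a cheaper signal is often available earlier: the agent repeating the exact same tool call. The sketch below is one possible check under that assumption; is_stuck is a hypothetical helper, and tool calls are assumed to be dictionaries with "name" and "args" keys.

```python
import json
from collections import Counter


def is_stuck(tool_calls: list[dict], max_repeats: int = 3) -> bool:
    """Return True if any identical (name, args) call occurred max_repeats times."""
    counts = Counter(
        # Serialize args with sorted keys so argument order does not matter
        (call["name"], json.dumps(call["args"], sort_keys=True))
        for call in tool_calls
    )
    return any(n >= max_repeats for n in counts.values())


call_history = [
    {"name": "read_url", "args": {"url": "https://example.com/a"}},
    {"name": "read_url", "args": {"url": "https://example.com/a"}},
    {"name": "search_web", "args": {"query": "pricing", "max_results": 5}},
]
print(is_stuck(call_history))

call_history.append({"name": "read_url", "args": {"url": "https://example.com/a"}})
print(is_stuck(call_history))
```

A check like this could run alongside the iteration limit, ending the loop or prompting the model to change approach once it trips.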
Parallel tool execution
When an agent needs to call multiple independent tools, executing them in parallel rather than sequentially reduces latency significantly:
import asyncio
from langchain_core.tools import BaseTool


async def execute_tools_parallel(tool_calls: list[dict], tools: dict[str, BaseTool]) -> list:
    """Execute multiple tool calls concurrently."""
    tasks = []
    for call in tool_calls:
        tool = tools[call["name"]]
        tasks.append(tool.ainvoke(call["args"]))
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Convert exceptions to error strings so the agent can recover
    processed = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            processed.append(f"Error in {tool_calls[i]['name']}: {str(result)}")
        else:
            processed.append(result)
    return processed
A research agent that needs to fetch five URLs can run all five requests concurrently rather than one after another. If each fetch takes about 3 seconds, total latency drops from roughly 15 seconds to roughly the duration of the slowest request.
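The latency difference is easy to observe with simulated fetches. This sketch uses asyncio.sleep in place of real network calls; fake_fetch and the example URLs are illustrative only, and the delay is kept short so the script finishes quickly.

```python
import asyncio
import time


async def fake_fetch(url: str, delay: float) -> str:
    """Simulate a network fetch that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return f"fetched {url}"


async def fetch_sequential(urls: list[str], delay: float) -> list[str]:
    # Each fetch waits for the previous one to finish
    return [await fake_fetch(u, delay) for u in urls]


async def fetch_concurrent(urls: list[str], delay: float) -> list[str]:
    # All fetches are in flight at the same time; gather preserves input order
    return list(await asyncio.gather(*(fake_fetch(u, delay) for u in urls)))


def timed(coro_factory) -> tuple[list[str], float]:
    """Run a coroutine to completion and return its result with elapsed seconds."""
    start = time.perf_counter()
    results = asyncio.run(coro_factory())
    return results, time.perf_counter() - start


urls = [f"https://example.com/page{i}" for i in range(5)]
seq_results, seq_time = timed(lambda: fetch_sequential(urls, 0.05))
con_results, con_time = timed(lambda: fetch_concurrent(urls, 0.05))
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

Both versions return the same results in the same order; only the elapsed time differs.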
Multi-Agent Patterns with CrewAI
CrewAI provides a higher-level abstraction for multi-agent systems organized around roles and crews. Each agent has a defined role, goal, and backstory that shapes its behavior, and a crew coordinates their collaborative work.
Crew structure for a competitive intelligence pipeline
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool, WebsiteSearchTool

search_tool = SerperDevTool()
web_tool = WebsiteSearchTool()

# Define specialized agents
researcher = Agent(
    role="Market Research Specialist",
    goal="Find comprehensive and accurate information about competitors",
    backstory="""You are an expert market researcher with years of experience
    analyzing SaaS companies. You know how to find pricing pages, feature lists,
    customer reviews, and recent news efficiently.""",
    tools=[search_tool, web_tool],
    verbose=True,
    max_iter=5,
)

analyst = Agent(
    role="Competitive Intelligence Analyst",
    goal="Extract structured data and identify strategic insights from research",
    backstory="""You specialize in distilling raw research into actionable competitive
    intelligence. You identify differentiation opportunities and market gaps.""",
    tools=[],  # Pure reasoning, no external tools needed
    verbose=True,
)

writer = Agent(
    role="Technical Writer",
    goal="Produce clear, well-structured reports from analysis",
    backstory="""You write executive-level reports that communicate complex
    competitive landscapes clearly and concisely.""",
    tools=[],
    verbose=True,
)

# Define tasks with explicit dependencies
research_task = Task(
    description="""Research {competitor_name}. Find:
    1. Current pricing (all tiers)
    2. Key features and capabilities
    3. Target customer segments
    4. Recent product announcements (last 6 months)
    5. Customer reviews highlighting strengths and complaints""",
    expected_output="Comprehensive research notes with sources cited",
    agent=researcher,
)

analysis_task = Task(
    description="""Using the research on {competitor_name}, produce:
    1. Feature comparison table vs our product
    2. Pricing position analysis
    3. Three key differentiation opportunities for us
    4. Three areas where they outperform us""",
    expected_output="Structured competitive analysis with actionable insights",
    agent=analyst,
    context=[research_task],  # Depends on research completion
)

report_task = Task(
    description="Write a one-page executive summary of the competitive analysis",
    expected_output="Professional report in markdown format, under 600 words",
    agent=writer,
    context=[research_task, analysis_task],
)

# Assemble crew
crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, report_task],
    process=Process.sequential,
    verbose=True,
)

result = crew.kickoff(inputs={"competitor_name": "Competitor Corp"})
The context parameter on each task passes completed task output to subsequent tasks, creating explicit data flow dependencies.
Handoff Protocols
When work moves between agents, the quality of the handoff determines whether context is preserved or lost.
Structured handoff format
Never pass raw LLM text between agents. Structure handoff data:
from typing import Literal

from pydantic import BaseModel


class ResearchHandoff(BaseModel):
    """Structured output from research agent to analysis agent."""
    subject: str
    sources_consulted: list[str]
    key_findings: list[str]
    raw_data: dict
    # Literal makes validation reject any value other than these three
    confidence_level: Literal["high", "medium", "low"]
    gaps_identified: list[str]  # What could not be found


class AnalysisHandoff(BaseModel):
    """Structured output from analysis agent to writing agent."""
    subject: str
    key_insights: list[str]
    data_tables: list[dict]
    recommended_actions: list[str]
    supporting_evidence: dict[str, str]
Structured handoffs prevent the receiving agent from misinterpreting unstructured text, allow the orchestrator to validate completeness before proceeding, and create a clear audit trail.
Orchestrator handoff logic
from pydantic import ValidationError


def orchestrate_research_pipeline(subject: str) -> str:
    """Orchestrate a multi-agent research and analysis pipeline.

    researcher_agent, analyst_agent, writer_agent, and request_human_review are
    assumed to be defined elsewhere; each agent is assumed to return a JSON string.
    """
    # Step 1: Research
    research_result = researcher_agent.invoke({
        "task": f"Research {subject}",
        "output_format": ResearchHandoff.model_json_schema(),
    })
    # Validate handoff data
    try:
        research_data = ResearchHandoff.model_validate_json(research_result)
    except ValidationError as e:
        return f"Research agent produced invalid output: {e}"
    if research_data.confidence_level == "low":
        # Request human review before proceeding
        return request_human_review(research_data, reason="Low confidence research")
    # Step 2: Analysis
    analysis_result = analyst_agent.invoke({
        "research": research_data.model_dump(),
        "task": "Analyze the research and identify key insights",
    })
    # Validate the second handoff the same way as the first
    try:
        analysis_data = AnalysisHandoff.model_validate_json(analysis_result)
    except ValidationError as e:
        return f"Analysis agent produced invalid output: {e}"
    # Step 3: Write report
    report = writer_agent.invoke({
        "analysis": analysis_data.model_dump(),
        "task": "Write an executive summary report",
    })
    return report
Failure Recovery
Production agent systems fail. Tools time out, APIs return unexpected responses, models produce malformed output. Failure recovery design determines whether those failures cascade into complete system failure or get handled gracefully.
Retry with backoff
import time
from functools import wraps

import httpx


def retry_with_backoff(max_retries: int = 3, base_delay: float = 1.0):
    """Decorator for automatic retry with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        # Delay doubles each attempt: base, 2x base, 4x base, ...
                        delay = base_delay * (2 ** attempt)
                        print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                        time.sleep(delay)
            # All attempts failed; surface the last error to the caller
            raise last_exception
        return wrapper
    return decorator


@retry_with_backoff(max_retries=3)
def call_external_api(url: str) -> dict:
    response = httpx.get(url, timeout=10.0)
    response.raise_for_status()
    return response.json()
Fallback tools
For critical capabilities, define fallback tools the agent can use when the primary tool fails:
from langchain_core.tools import BaseTool

TOOL_FALLBACKS = {
    "search_web_primary": "search_web_fallback",
    "read_url_primary": "extract_url_text_cached",
}


def get_tool_with_fallback(tool_name: str, tools: dict) -> BaseTool:
    """Resolve a tool by name, using its fallback if the primary is not registered."""
    if tool_name in tools:
        return tools[tool_name]
    fallback_name = TOOL_FALLBACKS.get(tool_name)
    if fallback_name and fallback_name in tools:
        print(f"Primary tool '{tool_name}' unavailable, using fallback '{fallback_name}'")
        return tools[fallback_name]
    raise ValueError(f"No tool available for: {tool_name}")
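The lookup above covers a primary tool that is not registered. Falling back when the primary raises at call time needs a wrapper around the call itself. The following is a minimal sketch with plain callables standing in for tools; call_with_fallback, flaky_primary, and cached_fallback are hypothetical names.

```python
from typing import Callable


def call_with_fallback(
    primary: Callable[[str], str],
    fallback: Callable[[str], str],
    arg: str,
) -> str:
    """Try the primary tool; if it raises, try the fallback; never raise."""
    try:
        return primary(arg)
    except Exception as primary_error:
        try:
            result = fallback(arg)
        except Exception as fallback_error:
            return f"Error: primary failed ({primary_error}); fallback failed ({fallback_error})"
        # Label the result so the agent knows it may be lower quality or stale
        return f"[served by fallback] {result}"


def flaky_primary(url: str) -> str:
    raise TimeoutError("primary timed out")


def cached_fallback(url: str) -> str:
    return f"cached copy of {url}"


def broken_fallback(url: str) -> str:
    raise KeyError("not in cache")


print(call_with_fallback(flaky_primary, cached_fallback, "https://example.com"))
print(call_with_fallback(flaky_primary, broken_fallback, "https://example.com"))
```

Marking fallback output explicitly lets the model weigh it accordingly, for example by noting that cached content may be out of date.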
Human-in-the-loop checkpoints
Not every decision should be automated. High-stakes actions (sending emails, making purchases, deleting data) require explicit human approval before execution:
from langchain_core.messages import ToolMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt


def execute_high_stakes_action(state: AgentState) -> dict:
    """Node that requires human approval before proceeding."""
    # Assumes AgentState also declares proposed_action, action_approved,
    # and action_result keys
    proposed_action = state["proposed_action"]
    # Interrupt the graph and wait for human input
    human_response = interrupt({
        "question": f"Agent wants to: {proposed_action['description']}\n\nApprove? (yes/no)",
        "proposed_action": proposed_action,
    })
    if str(human_response).strip().lower() != "yes":
        return {
            "messages": [ToolMessage(
                content="Action cancelled by human reviewer.",
                tool_call_id="human_review",
            )],
            "action_approved": False,
        }
    # Proceed with action (execute_action is your own side-effecting function)
    result = execute_action(proposed_action)
    return {"action_approved": True, "action_result": result}


# Checkpointing is required for interrupt support. interrupt() already pauses
# inside the node, so an additional interrupt_before entry is not needed.
# Resume by invoking the graph with Command(resume="yes") on the same thread_id.
memory = MemorySaver()
app = graph.compile(checkpointer=memory)
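LangGraph’s interrupt mechanism pauses graph execution and persists state through the checkpointer, so no context is lost while waiting for human input. On resume, the interrupted node runs again from its start and the interrupt call returns the human’s response, so keep side effects after the interrupt call rather than before it.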
Production Deployment Considerations
Observability
Agent systems are notoriously difficult to debug without comprehensive tracing. Every tool call, model invocation, and decision point should be logged with enough context to reconstruct what happened:
import os

# LangSmith traces all LangChain/LangGraph calls automatically when configured.
# An API key must also be set in the environment (LANGCHAIN_API_KEY).
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production-research-agent"
LangSmith is the standard tracing tool for LangChain/LangGraph systems. It captures full execution traces including model inputs/outputs, tool calls, latency per step, and token usage.
Cost and latency budgets
Set explicit budgets per agent run:
import time


class BudgetExceededError(Exception):
    """Raised when an agent run exceeds its token budget."""


class AgentBudget:
    def __init__(self, max_tokens: int = 100_000, max_time_seconds: int = 120):
        self.max_tokens = max_tokens
        self.max_time_seconds = max_time_seconds
        self.tokens_used = 0
        self.start_time = time.time()

    def record_tokens(self, count: int) -> None:
        """Add the tokens consumed by a model call to the running total."""
        self.tokens_used += count

    def check(self) -> bool:
        """Returns True if budget allows continued execution; raises otherwise."""
        elapsed = time.time() - self.start_time
        if elapsed > self.max_time_seconds:
            raise TimeoutError(f"Agent exceeded {self.max_time_seconds}s time budget")
        if self.tokens_used > self.max_tokens:
            raise BudgetExceededError(f"Agent exceeded {self.max_tokens} token budget")
        return True
Without hard limits, an agent that loops on a failing tool will keep burning tokens indefinitely.
Idempotent runs
Design agent pipelines so that re-running a partially completed job produces the same final result as a successful first run. Cache tool results keyed by input, skip completed steps, and use transaction IDs to prevent duplicate side effects.
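One simple way to skip completed steps is to checkpoint each step's result under its name and consult the checkpoint on every run. The sketch below assumes JSON-serializable results and a local file as the checkpoint; run_resumable and the two example steps are hypothetical, and a production system would more likely use a database keyed by a run or transaction ID.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


def run_resumable(steps: list[tuple[str, Callable[[dict], str]]], checkpoint: Path) -> dict:
    """Run named steps in order, skipping any whose result is already checkpointed."""
    done: dict = json.loads(checkpoint.read_text()) if checkpoint.exists() else {}
    for name, func in steps:
        if name in done:
            continue  # completed in an earlier run; do not repeat side effects
        done[name] = func(done)
        # Persist after every step so a crash loses at most the step in progress
        checkpoint.write_text(json.dumps(done))
    return done


calls = {"fetch": 0, "summarize": 0}
fail_once = {"armed": True}


def fetch(_results: dict) -> str:
    calls["fetch"] += 1
    return "raw data"


def summarize(results: dict) -> str:
    calls["summarize"] += 1
    if fail_once["armed"]:
        fail_once["armed"] = False
        raise RuntimeError("transient failure")
    return f"summary of {results['fetch']}"


checkpoint_path = Path(tempfile.mkdtemp()) / "run-001.json"
pipeline = [("fetch", fetch), ("summarize", summarize)]

try:
    run_resumable(pipeline, checkpoint_path)  # first run fails at summarize
except RuntimeError:
    pass

final = run_resumable(pipeline, checkpoint_path)  # rerun resumes after fetch
print(final)
print(calls)
```

On the rerun, fetch is not called again; only the failed step executes, and the final result matches what a clean first run would have produced.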
Agent orchestration is the infrastructure layer that makes AI capabilities compound. A well-designed orchestration layer turns individual model calls into reliable business processes. The key is treating agent systems with the same engineering discipline you’d apply to any distributed system: explicit state management, graceful failure handling, comprehensive observability, and hard limits on autonomous execution.