1为什么 Agent 基础设施比模型更重要
2025 年图灵奖得主在公开演讲中提出了一个振聋发聩的观点:"AI Agent 的能力上限最终都是数据库问题。" 这句话揭示了一个被严重低估的事实——决定 Agent 在生产环境中表现的不是 LLM 的参数量,而是底层基础设施的完备程度。
Agent 基础设施指的是支撑 AI Agent 运行时所需的全部技术组件,包括可观测性系统、数据管理层、内存架构、状态管理器、安全网关和部署编排层。这些组件共同决定了 Agent 能否在生产环境中可靠运行、持续学习和安全交互。
Agent 与纯 LLM 的本质区别
纯 LLM 应用是一个无状态的函数调用:输入 prompt,输出文本。而 AI Agent是一个有状态的系统,它需要感知环境、维护记忆、执行工具调用、跟踪任务状态、处理异常情况。这些能力无法仅靠模型本身实现,必须依赖专门的基础设施层。
关键差异对比:
- LLM 应用:无状态、单次调用、无需持久化、无需运行时监控
- AI Agent:有状态、多轮交互、需要持久化记忆、需要实时监控和治理
Agent 基础设施的六大支柱
Agent 基础设施由六大核心组件构成,每个组件解决不同的运行时问题:
- 可观测性层:回答"Agent 在做什么?为什么这么做?"
- 数据管理层:管理 Agent 访问的外部数据源和内部存储
- 内存架构:短期记忆、长期记忆、工作记忆的结构化组织
- 状态管理器:跟踪 Agent 的任务状态、工具状态和会话状态
- 安全与治理层:权限控制、行为审计、输出验证
- 部署与编排层:多 Agent 协作、负载均衡、故障恢复
核心观点:如果你看不到 Agent 在做什么,你就无法治理它。可观测性是 Agent 基础设施的第一要务,没有可观测性,其他所有组件都形同虚设。
设计 Agent 系统的第一步不是选择模型,而是设计可观测性方案。在写第一个 Agent 之前,先决定:你要记录什么?如何追踪一个 Agent 决策的完整链路?出问题后如何复盘?
不要假设 LLM 的「聪明」能弥补基础设施的「简陋」。在生产环境中,一个缺乏状态管理的 Agent 会在网络抖动时丢失全部上下文,一个没有审计日志的 Agent 出问题后无法追溯根因。基础设施的短板决定了 Agent 能力的上限。
2Agent 可观测性:从 Tracing 到行为审计
可观测性(Observability) 是 Agent 基础设施中最关键也最容易被忽视的组件。传统的应用监控(日志、指标、追踪)在 Agent 场景下远远不够,因为 Agent 的行为是非确定性的——同一个输入,不同时间可能产生完全不同的执行路径。
Agent 可观测性的三层架构
Agent 可观测性需要从三个层面构建完整的可见性管线:
第一层:执行追踪(Execution Tracing)。这是最基础的层面,记录 Agent 每次工具调用的输入输出、LLM 调用的 prompt 和 response、以及状态转换的时间戳。通过 OpenTelemetry 标准的 Trace 和 Span,可以将一次 Agent 运行的完整链路可视化。
第二层:决策解释(Decision Explanation)。这层解决的核心问题是:Agent 为什么做出这个决定? 这需要记录 Agent 的推理过程(如果是 ReAct 模式,就是 Thought-Action-Observation 的完整序列)、上下文快照(Agent 在做决策时看到了什么信息)、以及备选方案(Agent 考虑了哪些其他选项但被否决了)。
第三层:行为审计(Behavior Auditing)。这是最高层级的可观测性,关注的是 Agent 行为是否符合预期策略。例如:Agent 是否访问了未授权的数据源?是否执行了超出权限的操作?输出内容是否包含敏感信息?行为审计需要策略引擎来实时评估 Agent 的行为合规性。
Agent 可观测性的核心指标
与传统的 APM 指标不同,Agent 可观测性需要关注一组全新的指标:
- 工具调用成功率:Agent 调用外部工具的成功率,低于 95% 说明工具集成有问题
- 上下文命中率:Agent 从记忆中检索到的信息中有多少是相关且正确的
- 决策一致性:相同或相似输入下,Agent 的决策是否保持稳定
- 异常恢复率:Agent 遇到错误后自动恢复并继续执行任务的比例
- 延迟分布:不仅要看平均延迟,更要看 P99 延迟,因为 Agent 的多步推理可能产生极端的长尾延迟
实现 Agent 可观测性的技术栈
主流的可观测性方案包括 LangSmith(LangChain 生态)、Phoenix(Arize AI)、Langfuse(开源方案)以及基于 OpenTelemetry 的自建方案。选择方案时需要考虑三个关键因素:是否支持 Agent 特定的追踪语义、是否提供决策可视化工具、是否支持行为审计的实时告警。
from opentelemetry import trace, context
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
import time
import json
# 初始化 Agent 专用的 Tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("agent-runtime")
exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(exporter))
class AgentTracer:
"""Agent 执行追踪器:记录完整的 Thought-Action-Observation 链路"""
def __init__(self, agent_id: str, session_id: str):
self.agent_id = agent_id
self.session_id = session_id
self.step_count = 0
def trace_tool_call(self, tool_name: str, tool_input: dict, tool_output: dict):
"""追踪一次工具调用"""
self.step_count += 1
with tracer.start_as_current_span(
f"agent.{tool_name}",
attributes={
"agent.id": self.agent_id,
"agent.session_id": self.session_id,
"agent.step": self.step_count,
"agent.tool.name": tool_name,
"agent.tool.input_size": len(json.dumps(tool_input)),
"agent.tool.output_size": len(json.dumps(tool_output)),
"agent.type": "tool_call",
}
) as span:
# 记录决策上下文
span.set_attribute("agent.thought", "检索相关文档后决定调用搜索工具")
span.set_attribute("agent.alternatives",
json.dumps(["直接回答", "调用知识库", "请求用户澄清"]))
return tool_output
def trace_llm_call(self, prompt: str, response: str, model: str,
latency_ms: float, tokens: dict):
"""追踪一次 LLM 调用"""
with tracer.start_as_current_span(
f"agent.llm.{model}",
attributes={
"agent.id": self.agent_id,
"agent.llm.model": model,
"agent.llm.latency_ms": latency_ms,
"agent.llm.prompt_tokens": tokens.get("prompt", 0),
"agent.llm.completion_tokens": tokens.get("completion", 0),
"agent.llm.total_cost": tokens.get("cost", 0),
"agent.type": "llm_call",
}
) as span:
return response
# 使用示例
tr = AgentTracer(agent_id="research-agent-01", session_id="session-abc123")
tool_output = tr.trace_tool_call(
tool_name="web_search",
tool_input={"query": "2026 AI Agent 基础设施趋势"},
tool_output={"results": 15, "top_sources": ["arxiv", "github", "blog"]}
)
tr.trace_llm_call(
prompt="根据搜索结果分析...",
response="基础设施的核心趋势是...",
model="gpt-4o",
latency_ms=1230,
tokens={"prompt": 4500, "completion": 800, "cost": 0.012}
)| 可观测性层级 | 核心问题 | 记录内容 | 存储开销 | 实时性要求 |
|---|---|---|---|---|
执行追踪 (Tracing) | Agent 执行了什么? | 工具调用、LLM 调用、状态转换 | 中 (每次调用 ~1KB) | 高 (实时) |
决策解释 (Explanation) | Agent 为什么这么做? | 推理链、上下文快照、备选方案 | 高 (每步 ~5KB) | 中 (近实时) |
行为审计 (Auditing) | Agent 行为是否合规? | 策略检查结果、违规记录、告警 | 低 (仅异常时记录) | 高 (实时告警) |
在 Agent 可观测性中,最重要的一条规则是:记录决策上下文(Agent 看到了什么信息才做出的决定),而不是只记录最终输出。这是调试 Agent 行为异常的起点。
不要在 Trace 中记录敏感的 LLM prompt 和 response 原文。Agent 的 prompt 可能包含用户隐私数据、API 密钥或业务机密。应记录 prompt 的 hash、token 数和结构信息,而非原文。
3Agent 数据管理层:Vector DB、缓存与外部数据源
数据管理是 Agent 基础设施中最直接影响 Agent 能力上限的组件。图灵奖得主说"Agent 最后全是数据库问题",核心指向的就是这一层——Agent 能访问的数据质量、数据新鲜度和数据检索效率,直接决定了 Agent 回答问题的准确性和时效性。
Agent 数据管理的三层架构
Agent 需要访问的数据可以分为三个层次,每个层次有不同的访问模式和性能要求:
热数据层(Hot Layer):Agent 当前任务直接相关的数据,需要在毫秒级被检索到。这包括当前会话的上下文、活跃的工具响应缓存、以及最近访问的文档片段。热数据层通常存储在内存缓存(如 Redis)中,采用 LRU 淘汰策略。
温数据层(Warm Layer):Agent 可能用到但不一定立即需要的数据。这包括知识库检索结果、工具的定义和 schema、Agent 的长期记忆索引。温数据层通常存储在向量数据库(如 Milvus、Qdrant、Pinecone)中,通过语义相似度检索。
冷数据层(Cold Layer):Agent 偶尔需要的历史数据。这包括历史会话记录、完整的行为审计日志、长期归档的知识库。冷数据层通常存储在对象存储(如 S3)中,通过批处理查询访问。
Vector DB 在 Agent 中的核心角色
向量数据库是 Agent 数据管理层的核心组件,它解决了传统数据库无法解决的语义检索问题。当 Agent 需要回答"与这个问题最相关的文档是什么"时,关键词匹配是不够的——需要基于语义相似度找到相关内容。
Vector DB 在 Agent 中的三个关键应用场景:
- RAG 检索:从大规模文档库中检索与用户查询最相关的段落,作为 LLM 的上下文输入
- 记忆检索:从 Agent 的长期记忆中检索与当前对话语义相关的历史片段
- 工具匹配:当 Agent 需要选择使用哪个工具时,通过语义匹配找到最合适的工具
数据一致性挑战
Agent 数据管理面临的一个独特挑战是数据一致性问题:当 Agent 在多步推理过程中,底层数据源(如文档库、数据库)可能已经更新。如果 Agent 基于旧数据做出决策,可能导致错误的结果。
解决数据一致性问题的三种策略:
- TTL 过期策略:为缓存的数据设置生存时间,过期后强制重新检索
- 版本追踪:为每次检索结果记录数据版本号,决策时验证版本是否过期
- 主动失效:当底层数据源更新时,主动使相关缓存失效,触发 Agent 重新检索
import redis
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition
import hashlib
import json
import time
from typing import Optional
class AgentDataManager:
"""Agent 数据管理器:三层数据架构(热/温/冷)"""
def __init__(self, redis_url: str, qdrant_url: str, embedding_model):
# 热数据层:Redis 缓存
self.cache = redis.from_url(redis_url, decode_responses=True)
# 温数据层:Qdrant 向量数据库
self.vector_db = QdrantClient(url=qdrant_url)
# 嵌入模型
self.embedding_model = embedding_model
self.default_ttl = 3600 # 缓存默认 TTL: 1 小时
def store_context(self, session_id: str, context: dict) -> str:
"""存储会话上下文到热数据层"""
key = f"agent:context:{session_id}"
version = int(time.time())
payload = {
"data": context,
"version": version,
"timestamp": time.time(),
}
self.cache.setex(key, self.default_ttl, json.dumps(payload))
return f"v{version}"
def retrieve_context(self, session_id: str) -> Optional[dict]:
"""从热数据层检索会话上下文"""
key = f"agent:context:{session_id}"
raw = self.cache.get(key)
if raw is None:
return None
payload = json.loads(raw)
# 检查 TTL 是否接近过期
age = time.time() - payload["timestamp"]
if age > self.default_ttl * 0.8:
print(f"Warning: context for {session_id} is {age:.0f}s old, "
f"consider refreshing")
return payload["data"]
def search_knowledge(self, query: str, collection: str = "knowledge",
top_k: int = 5, min_score: float = 0.7) -> list:
"""从向量数据库检索相关知识"""
# 生成查询向量
query_vector = self.embedding_model.encode(query).tolist()
# 语义搜索
results = self.vector_db.search(
collection_name=collection,
query_vector=query_vector,
limit=top_k,
)
# 过滤低分结果
relevant = [
{
"content": hit.payload.get("content", ""),
"source": hit.payload.get("source", ""),
"score": hit.score,
"version": hit.payload.get("version", "unknown"),
}
for hit in results if hit.score >= min_score
]
return relevant
def cache_tool_response(self, tool_name: str, input_hash: str,
response: dict, ttl: int = 1800):
"""缓存工具响应(避免重复调用相同工具)"""
key = f"agent:tool_cache:{tool_name}:{input_hash}"
self.cache.setex(key, ttl, json.dumps({
"response": response,
"cached_at": time.time(),
}))
def get_cached_tool_response(self, tool_name: str,
input_hash: str) -> Optional[dict]:
"""获取缓存的工具响应"""
key = f"agent:tool_cache:{tool_name}:{input_hash}"
raw = self.cache.get(key)
return json.loads(raw) if raw else None
# 使用示例
data_mgr = AgentDataManager(
redis_url="redis://localhost:6379",
qdrant_url="http://localhost:6334",
embedding_model=None # 实际使用 sentence-transformers
)
# 缓存会话上下文
version = data_mgr.store_context("session-001", {
"user_goal": "分析 2026 年 AI Agent 基础设施趋势",
"constraints": ["只用中文回复", "包含代码示例"],
})
# 检索知识库
results = data_mgr.search_knowledge(
query="Agent 可观测性最佳实践",
top_k=3,
min_score=0.75,
)
print(f"找到 {len(results)} 条相关知识")| 数据层 | 存储引擎 | 访问延迟 | 数据量级 | 更新频率 | 典型用途 |
|---|---|---|---|---|---|
热数据 (Hot) | Redis / Memcached | < 5ms | MB 级 | 秒级 | 会话上下文、工具缓存 |
温数据 (Warm) | Qdrant / Milvus / Pinecone | 50-200ms | GB 级 | 分钟级 | 知识库、记忆索引 |
冷数据 (Cold) | S3 / PostgreSQL | 100-5000ms | TB 级 | 小时级 | 审计日志、归档 |
为 Agent 的向量检索设置 min_score 阈值(建议 0.7+)。不要盲目取 top_k 个结果——如果相似度低于阈值,返回「无相关知识」比返回「不相关知识」更好,因为不相关知识会误导 LLM 产生幻觉。
不要在向量数据库中存储未经清洗的原始数据。Agent 检索到的内容会直接作为 LLM 的上下文,如果向量库中包含敏感信息、过期数据或错误内容,Agent 会直接传播这些错误。建立数据入库前的清洗和验证流程。
4Agent 内存架构:短期记忆、长期记忆与工作记忆
Agent 内存架构是 Agent 基础设施中最接近人类认知模型的组件。与 LLM 的固定上下文窗口不同,Agent 需要动态管理不同时间尺度的记忆,这需要一个精心设计的内存架构。
三层记忆模型
Agent 内存架构借鉴了人类认知心理学中的三层记忆模型:
短期记忆(Short-Term Memory):对应 LLM 的上下文窗口,存储当前对话的最近消息和最近的操作结果。短期记忆的特点是容量有限(受模型上下文窗口大小限制)、访问速度最快(直接在 prompt 中)、内容会随对话推进被截断。管理短期记忆的核心挑战是上下文窗口有限时,如何选择保留哪些信息、丢弃哪些信息。
工作记忆(Working Memory):这是 Agent 特有的记忆层,存储当前任务的中间状态——比如"正在执行第 3 步,前两步的结果是 X 和 Y"。工作记忆不在 LLM 的上下文窗口中,而是由 Agent 框架显式管理。工作记忆的大小不受上下文窗口限制,可以随时扩展。关键设计决策是:工作记忆的结构化程度——是用自由文本还是结构化数据(如 JSON schema)来存储。
长期记忆(Long-Term Memory):存储 Agent 跨会话持久化的知识,包括用户偏好、历史经验、习得的技能和知识库索引。长期记忆通过向量数据库实现语义检索,Agent 在需要时按需检索相关记忆注入到当前上下文中。长期记忆管理的核心挑战是记忆冲突——当新旧记忆矛盾时,如何决定保留哪个版本。
记忆管理的关键设计决策
设计 Agent 内存架构时,需要做出以下关键决策:
- 上下文窗口分配:多少比例用于系统指令、多少用于对话历史、多少用于检索增强的上下文?
- 记忆压缩策略:当短期记忆接近容量上限时,使用摘要压缩(将多轮对话压缩为要点)还是选择性保留(只保留与当前任务相关的片段)?
- 记忆检索触发:是每次 LLM 调用前都检索长期记忆,还是按需检索(Agent 主动发出检索请求时)?
- 记忆生命周期:记忆有过期机制吗?多久不使用就自动归档到冷存储?
记忆冲突与记忆衰减
长期记忆系统面临两个独特的问题:
记忆冲突:Agent 在不同时间学到了相互矛盾的信息。例如,用户之前说"我喜欢 Python",后来又说"我现在主要用 Rust"。解决记忆冲突的策略包括时间戳优先级(新记忆覆盖旧记忆)、置信度评分(高置信度记忆优先)和来源追溯(记录记忆的来源以判断可信度)。
记忆衰减:随着时间推移,记忆的相关性会自然下降。实现记忆衰减的两种常见方法:在向量检索时将时间衰减因子加入相似度计算(如 score × e^(-λ × age)),或在存储时为记忆设置过期时间,到期后自动归档。
import json
import time
import math
from dataclasses import dataclass, field
from typing import Optional
from collections import deque
@dataclass
class MemoryItem:
"""单条记忆的标准化结构"""
content: str
source: str # "conversation" | "tool_result" | "user_preference"
timestamp: float
importance: float # 0.0 ~ 1.0,重要性评分
access_count: int = 0
last_accessed: float = 0.0
tags: list = field(default_factory=list)
def relevance_score(self, current_time: float, decay_rate: float = 0.0001) -> float:
"""计算当前时间点的相关性分数(含时间衰减)"""
age = current_time - self.timestamp
# 时间衰减 + 使用频率加成
time_decay = math.exp(-decay_rate * age)
usage_bonus = min(self.access_count * 0.05, 0.3)
return self.importance * time_decay + usage_bonus
class AgentMemoryManager:
"""Agent 三层记忆管理器"""
def __init__(self, context_window_limit: int = 128000,
short_term_tokens: int = 8000,
working_memory_limit: int = 50):
self.context_limit = context_window_limit
self.st_limit = short_term_tokens
# 短期记忆:双端队列,FIFO 淘汰
self.short_term = deque(maxlen=20)
# 工作记忆:结构化存储
self.working_memory: dict[str, MemoryItem] = {}
self.wm_limit = working_memory_limit
# 长期记忆索引(简化版,实际对接向量数据库)
self.long_term_index: list[MemoryItem] = []
def add_to_short_term(self, message: str, token_count: int):
"""添加消息到短期记忆(对话历史)"""
self.short_term.append({
"content": message,
"tokens": token_count,
})
def compress_short_term(self) -> str:
"""压缩短期记忆:将最近对话摘要化"""
if len(self.short_term) <= 3:
# 对话不多时保留原文
return "
".join(m["content"] for m in self.short_term)
# 超过 3 条时:保留最后 2 条原文,中间部分摘要
recent = list(self.short_term)[-2:]
middle = list(self.short_term)[1:-2]
summary = f"[{len(middle)} 轮对话摘要:讨论{middle[0]['content'][:50]}...]"
return summary + "
" + "
".join(m["content"] for m in recent)
def set_working_memory(self, key: str, content: str, importance: float = 0.8):
"""设置工作记忆条目"""
if len(self.working_memory) >= self.wm_limit:
# 淘汰最久未访问的条目
oldest = min(self.working_memory.items(),
key=lambda x: x[1].last_accessed)
del self.working_memory[oldest[0]]
self.working_memory[key] = MemoryItem(
content=content,
source="agent_state",
timestamp=time.time(),
importance=importance,
)
def retrieve_long_term(self, query: str, top_k: int = 3) -> list[str]:
"""从长期记忆检索(简化版——实际应使用向量数据库)"""
current_time = time.time()
# 按相关性排序
scored = [
(item, item.relevance_score(current_time))
for item in self.long_term_index
if query.lower() in item.content.lower() or
any(query.lower() in tag for tag in item.tags)
]
scored.sort(key=lambda x: x[1], reverse=True)
# 更新访问计数
for item, _ in scored[:top_k]:
item.access_count += 1
item.last_accessed = current_time
return [item.content for item, _ in scored[:top_k]]
def get_context_prompt(self) -> str:
"""构建 LLM 的完整上下文 prompt"""
parts = []
# 工作记忆(结构化状态)
if self.working_memory:
wm_parts = []
for key, item in self.working_memory.items():
wm_parts.append(f"【{key}】{item.content}")
parts.append("## 当前任务状态\n" + "\n".join(wm_parts))
# 短期记忆(压缩后的对话历史)
st = self.compress_short_term()
if st:
parts.append("## 对话历史\n" + st)
return "\n\n".join(parts)
# 使用示例
mem = AgentMemoryManager()
mem.add_to_short_term("用户:帮我查一下 2026 年 AI Agent 的趋势", 20)
mem.add_to_short_term("Agent:好的,我来搜索相关信息", 15)
mem.add_to_short_term("Agent:根据搜索结果,主要趋势有...", 150)
mem.set_working_memory("current_task", "研究 2026 AI Agent 基础设施", 0.9)
mem.set_working_memory("step", "已完成信息检索,正在整理结果", 0.7)
print(mem.get_context_prompt())| 记忆类型 | 存储位置 | 容量限制 | 访问方式 | 生命周期 | 管理策略 |
|---|---|---|---|---|---|
短期记忆 | LLM 上下文窗口 | 受模型限制 | 直接作为 prompt | 对话级别 | 压缩 / 截断 |
工作记忆 | Agent 运行时内存 | 可配置 | Agent 显式读写 | 任务级别 | LRU 淘汰 |
长期记忆 | 向量数据库 | 近无限 | 语义检索 | 跨会话 | 衰减 + 冲突解决 |
工作记忆应该用结构化数据(JSON / 键值对)存储,而不是自由文本。结构化工作记忆可以被 Agent 的程序逻辑直接读取和修改,不需要经过 LLM 解析,这大幅降低了出错概率。
不要把所有历史对话都塞进长期记忆。未过滤的对话记录包含大量噪声(寒暄、纠错、试错过程),直接存入向量库会污染检索结果。应该在存入长期记忆前进行摘要化和结构化处理。
5Agent 状态管理:任务状态、工具状态与异常恢复
状态管理是 Agent 基础设施中确保可靠性的核心组件。没有良好的状态管理,Agent 在遇到网络错误、工具超时或模型超时等异常情况时,会丢失全部进度,需要用户从头开始——这在生产环境中是不可接受的。
Agent 状态的三个维度
Agent 运行时需要维护三个维度的状态:
任务状态(Task State):描述 Agent 当前正在做什么、已完成什么、还需要做什么。任务状态通常用有限状态机(FSM) 或有向无环图(DAG) 来建模。一个典型的 Agent 任务状态包括:等待输入 → 规划中 → 执行中(含子步骤) → 验证中 → 完成 / 失败。
工具状态(Tool State):记录 Agent 调用的每个工具的当前状态——是否正在执行、是否超时、返回值是什么、是否需要重试。工具状态管理的关键是超时处理和重试策略:当一个工具调用超过预定时间仍未返回时,Agent 应该如何处理?
会话状态(Session State):维护整个用户会话的元数据——会话 ID、开始时间、累计 token 消耗、成本、已执行步骤数。会话状态用于计费、审计和会话恢复。
异常恢复策略
Agent 在生产环境中必然会遇到各种异常,设计优雅的恢复策略比避免异常更重要:
- 工具调用超时:设置合理的超时阈值(如 30 秒),超时后重试一次,再次失败则记录错误并切换到备选方案
- LLM 输出不完整:检测输出是否被截断(如 JSON 不完整、代码缺少闭合括号),如果是,发送续传请求让模型补全
- 上下文窗口溢出:当累积的上下文超过模型限制时,触发记忆压缩流程,保留关键信息,丢弃次要内容
- 网络中断:将 Agent 的中间状态持久化到数据库,网络恢复后从最近的检查点(checkpoint) 恢复执行
状态持久化设计
状态持久化的核心原则是检查点机制(Checkpointing):在 Agent 执行的每个关键节点保存状态快照,使得 Agent 可以从任意检查点恢复。检查点的粒度需要权衡:太粗(每步检查点)导致存储开销大,太细(仅在任务完成时检查)导致故障恢复时丢失大量进度。
推荐的检查点策略:在每次工具调用前后、每次 LLM 调用前后、以及任务状态转换时保存检查点。检查点应包含:任务状态、工作记忆、工具调用历史、以及当前执行的步骤编号。
import json
import time
import enum
from dataclasses import dataclass, asdict
from typing import Optional, Any
import sqlite3
class TaskStatus(enum.Enum):
WAITING = "waiting"
PLANNING = "planning"
EXECUTING = "executing"
VERIFYING = "verifying"
COMPLETED = "completed"
FAILED = "failed"
RECOVERING = "recovering"
@dataclass
class Checkpoint:
"""Agent 执行检查点"""
checkpoint_id: str
timestamp: float
task_status: str
step_number: int
tool_history: list[dict]
working_memory: dict
error: Optional[str] = None
def save_to_db(self, db_path: str):
"""持久化检查点到 SQLite"""
conn = sqlite3.connect(db_path)
conn.execute("""
INSERT OR REPLACE INTO checkpoints
(checkpoint_id, timestamp, task_status, step_number,
tool_history, working_memory, error)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
self.checkpoint_id, self.timestamp, self.task_status,
self.step_number,
json.dumps(self.tool_history),
json.dumps(self.working_memory),
self.error,
))
conn.commit()
conn.close()
@classmethod
def load_latest(cls, db_path: str) -> Optional['Checkpoint']:
"""加载最近的检查点用于恢复"""
conn = sqlite3.connect(db_path)
row = conn.execute(
"SELECT * FROM checkpoints ORDER BY timestamp DESC LIMIT 1"
).fetchone()
conn.close()
if row is None:
return None
return cls(
checkpoint_id=row[0], timestamp=row[1],
task_status=row[2], step_number=row[3],
tool_history=json.loads(row[4]),
working_memory=json.loads(row[5]),
error=row[6],
)
class AgentStateManager:
"""Agent 状态管理器:支持检查点和异常恢复"""
def __init__(self, agent_id: str, db_path: str = "agent_state.db"):
self.agent_id = agent_id
self.db_path = db_path
self.current_status = TaskStatus.WAITING
self.step_number = 0
self.tool_history: list[dict] = []
self.working_memory: dict = {}
self._init_db()
def _init_db(self):
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS checkpoints (
checkpoint_id TEXT PRIMARY KEY,
timestamp REAL,
task_status TEXT,
step_number INTEGER,
tool_history TEXT,
working_memory TEXT,
error TEXT
)
""")
conn.commit()
conn.close()
def create_checkpoint(self, error: Optional[str] = None) -> Checkpoint:
"""创建检查点"""
cp = Checkpoint(
checkpoint_id=f"{self.agent_id}-{self.step_number}-{int(time.time())}",
timestamp=time.time(),
task_status=self.current_status.value,
step_number=self.step_number,
tool_history=self.tool_history[-20:], # 保留最近 20 条
working_memory=dict(self.working_memory),
error=error,
)
cp.save_to_db(self.db_path)
return cp
def recover(self) -> bool:
"""从最近检查点恢复"""
latest = Checkpoint.load_latest(self.db_path)
if latest is None:
return False
self.current_status = TaskStatus.RECOVERING
self.step_number = latest.step_number
self.tool_history = latest.tool_history
self.working_memory = latest.working_memory
print(f"Recovered from checkpoint {latest.checkpoint_id} "
f"(step {latest.step_number}, status: {latest.task_status})")
if latest.error:
print(f"Last error: {latest.error}")
self.current_status = TaskStatus.EXECUTING
return True
# 使用示例
sm = AgentStateManager("agent-041-demo")
sm.current_status = TaskStatus.EXECUTING
sm.step_number = 3
sm.tool_history.append({
"tool": "web_search",
"input": "AI Agent 基础设施 2026",
"output": "...",
"status": "success",
})
sm.create_checkpoint()
print("Checkpoint created")| 异常类型 | 检测方式 | 恢复策略 | 重试次数 | 超时阈值 |
|---|---|---|---|---|
工具调用超时 | 等待时间 > 阈值 | 重试 → 备选方案 | 最多 2 次 | 30s |
LLM 输出截断 | JSON / 代码不完整 | 发送续传请求 | 最多 1 次 | 60s |
上下文溢出 | token 计数 > 限制 | 记忆压缩 | N/A | N/A |
网络中断 | 连接异常 | 从检查点恢复 | 最多 3 次 | 10s |
工具返回错误 | HTTP 状态码 ≥ 400 | 记录错误 → 降级 | 最多 1 次 | N/A |
检查点的存储不要用单点数据库。Agent 的状态是实时变化的,如果检查点数据库宕机,Agent 就失去了恢复能力。推荐使用 Redis + SQLite 的双写策略:热检查点存 Redis,定期刷盘到 SQLite。
不要在检查点中存储完整的 LLM response 原文。检查点的目的是恢复执行状态,不是存档对话内容。存储 prompt/response 的 hash 和 token 数即可,完整内容应通过可观测性层单独记录。
6Agent 安全与治理:权限控制、行为审计与输出验证
安全与治理层是 Agent 基础设施中保护系统免受 Agent 自身行为伤害的组件。Agent 拥有执行工具的能力——可以查询数据库、调用 API、读写文件——这些能力如果不受控制,可能造成严重的安全事故。
Agent 安全的三层防线
Agent 安全需要建立三层防线,层层把关:
第一层:权限控制(Authorization)。在 Agent 执行操作之前,验证 Agent 是否有权限执行该操作。这包括工具级别的权限(Agent 能否调用某个工具)、数据级别的权限(Agent 能否访问某条数据)、以及操作级别的权限(Agent 能否执行删除、修改等破坏性操作)。权限控制应该采用最小权限原则——Agent 只拥有完成任务所需的最少权限。
第二层:行为审计(Behavior Auditing)。记录 Agent 的所有操作,包括成功的和失败的。行为审计的核心价值在于事后追溯——当 Agent 做出了错误决策或执行了危险操作时,审计日志可以提供完整的证据链。审计日志应该包含:操作时间、操作类型、输入参数、输出结果、以及决策上下文。
第三层:输出验证(Output Validation)。在 Agent 的最终输出返回给用户之前,进行内容验证。这包括敏感信息过滤(确保不泄露密码、密钥、PII 数据)、格式验证(确保输出符合预期的 schema)、以及安全扫描(确保输出中不包含恶意代码或注入攻击载荷)。
Agent 治理的关键策略
在生产环境中运行 Agent,需要制定以下治理策略:
- 成本上限:为每个 Agent 会话设置 token 预算上限,防止 Agent 进入无限循环导致费用爆炸
- 速率限制:限制 Agent 调用外部工具的频率,防止 Agent 的行为触发外部服务的限流或封禁
- 人工审批:对于高风险操作(如删除数据、修改配置、发送外部消息),要求人工审批后才能执行
- 行为基线:建立 Agent 行为的正常基线,当 Agent 的行为显著偏离基线时(如工具调用频率突然升高 10 倍),触发告警并暂停 Agent
import time
import re
import enum
from dataclasses import dataclass
from typing import Optional
class RiskLevel(enum.Enum):
LOW = "low" # 只读操作
MEDIUM = "medium" # 写入但可恢复
HIGH = "high" # 破坏性操作
CRITICAL = "critical" # 不可逆操作
@dataclass
class ToolPermission:
"""工具权限定义"""
tool_name: str
allowed: bool
risk_level: RiskLevel
requires_approval: bool
rate_limit_per_min: int = 60
data_scope: str = "all" # "all" | "own" | "none"
@dataclass
class AuditLog:
"""行为审计日志条目"""
timestamp: float
agent_id: str
session_id: str
action: str
tool_name: str
input_params: dict
output: str
risk_level: str
approved_by: Optional[str] = None
blocked: bool = False
block_reason: str = ""
class AgentGovernanceEngine:
"""Agent 治理引擎:权限控制 + 审计 + 输出验证"""
def __init__(self, agent_id: str, session_id: str):
self.agent_id = agent_id
self.session_id = session_id
self.permissions: dict[str, ToolPermission] = {}
self.audit_log: list[AuditLog] = []
self.token_budget = 100000
self.tokens_used = 0
self.cost_budget = 10.0 # 美元
self.cost_used = 0.0
def set_permission(self, perm: ToolPermission):
"""设置工具权限"""
self.permissions[perm.tool_name] = perm
def check_tool_permission(self, tool_name: str) -> tuple[bool, str]:
"""检查 Agent 是否有权限调用某个工具"""
if tool_name not in self.permissions:
return False, f"工具 {tool_name} 未授权"
perm = self.permissions[tool_name]
if not perm.allowed:
return False, f"工具 {tool_name} 被禁用"
if perm.requires_approval:
return False, f"工具 {tool_name} 需要人工审批"
return True, "已授权"
def audit_tool_call(self, tool_name: str, input_params: dict,
output: str, risk: RiskLevel,
blocked: bool = False, block_reason: str = ""):
"""记录工具调用的审计日志"""
log = AuditLog(
timestamp=time.time(),
agent_id=self.agent_id,
session_id=self.session_id,
action="tool_call",
tool_name=tool_name,
input_params=input_params,
output=output[:500], # 截断避免日志过大
risk_level=risk.value,
blocked=blocked,
block_reason=block_reason,
)
self.audit_log.append(log)
def check_budget(self, tokens: int, cost: float) -> tuple[bool, str]:
"""检查 token 和成本预算"""
if self.tokens_used + tokens > self.token_budget:
return False, f"Token 预算不足: 已用 {self.tokens_used}/{self.token_budget}"
if self.cost_used + cost > self.cost_budget:
return False, f"成本预算不足: 已用 ${self.cost_used:.2f}/${self.cost_budget:.2f}"
return True, "预算充足"
def validate_output(self, output: str) -> tuple[bool, list[str]]:
"""验证输出内容的合规性"""
issues = []
# 敏感信息扫描
patterns = {
"api_key": r"sk-[a-zA-Z0-9]{20,}",
"password": r"password['\"]?\s*[:=]\s*['\"][^'\"]+['\"]",
"email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
}
for name, pattern in patterns.items():
matches = re.findall(pattern, output)
if matches:
issues.append(f"检测到疑似{name}泄露: {len(matches)} 处")
# 长度检查
if len(output) > 50000:
issues.append(f"输出过长: {len(output)} 字符,建议截断")
return len(issues) == 0, issues
# 使用示例
governance = AgentGovernanceEngine("agent-041", "session-xyz")
governance.set_permission(ToolPermission(
tool_name="web_search", allowed=True,
risk_level=RiskLevel.LOW, requires_approval=False,
rate_limit_per_min=30,
))
governance.set_permission(ToolPermission(
tool_name="delete_record", allowed=True,
risk_level=RiskLevel.CRITICAL, requires_approval=True,
rate_limit_per_min=5,
))
allowed, reason = governance.check_tool_permission("web_search")
print(f"web_search: {allowed} - {reason}")
allowed, reason = governance.check_tool_permission("delete_record")
print(f"delete_record: {allowed} - {reason}")
valid, issues = governance.validate_output(
"这是正常输出内容,没有任何敏感信息。"
)
print(f"输出合规: {valid}")
if issues:
for issue in issues:
print(f" ⚠ {issue}")| 治理策略 | 防护目标 | 实施方式 | 触发条件 |
|---|---|---|---|
权限控制 | 防止未授权操作 | 工具级 ACL + 最小权限 | 每次工具调用前 |
成本上限 | 防止费用爆炸 | Token 预算 + 成本预算 | 每次 LLM 调用前 |
速率限制 | 防止触发外部限流 | 令牌桶算法 | 每次工具调用前 |
人工审批 | 防止高危误操作 | 审批工作流集成 | CRITICAL 级别操作 |
输出验证 | 防止信息泄露 | 正则扫描 + 规则引擎 | 每次输出返回前 |
行为基线 | 检测异常行为 | 统计建模 + 阈值告警 | 行为显著偏离基线时 |
为 Agent 设置成本预算时,建议按「每会话」而非「每天」设置。一个失控的 Agent 在几分钟内就可能消耗掉全天的预算。按会话设置可以更精细地控制每个任务的成本上限。
权限控制不能完全依赖代码层面的检查。Agent 可能通过 prompt injection 绕过代码检查,直接指示 LLM「假设你有所有权限」。必须在系统层面(如操作系统权限、网络策略)设置硬性边界,代码层面的权限检查只是第一道防线。
7Agent 部署与编排:多 Agent 协作、负载均衡与故障恢复
部署与编排层是 Agent 基础设施中将单个 Agent 扩展为生产级系统的组件。单个 Agent 在开发环境中运行良好,不代表它在生产环境的高并发、高可用要求下也能正常工作。部署与编排层解决的核心问题是:如何让 Agent 系统在真实负载下稳定运行。
单 Agent 部署模式
最简单的 Agent 部署是单实例模式:一个 Agent 进程处理所有请求。这种模式适用于低流量场景,但存在明显的单点故障风险。改进的单 Agent 部署包括:
- 进程守护:使用 systemd 或 supervisor 管理 Agent 进程,崩溃后自动重启
- 健康检查:定期发送探测请求,确认 Agent 是否正常响应
- 优雅关闭:收到关闭信号后,完成当前任务再退出,而不是强制终止
多 Agent 编排模式
当流量增长或任务复杂度增加时,需要多 Agent 协作:
水平扩展模式:多个相同的 Agent 实例并行处理独立的请求。这种模式适合问答型 Agent——每个用户请求是独立的,不需要 Agent 之间协调。负载均衡器(如 Nginx 或 Envoy)将请求分发到空闲的 Agent 实例。
垂直分工模式:多个不同角色的 Agent协作完成一个复杂任务。例如:一个 Researcher Agent 负责信息收集,一个 Writer Agent 负责撰写报告,一个 Reviewer Agent 负责质量审核。这种模式需要任务编排引擎(如 LangGraph 或 Temporal)来协调各 Agent 的执行顺序和数据流。
混合模式:结合水平扩展和垂直分工——每种角色的 Agent 都有多个实例,编排引擎负责任务路由和结果聚合。
多 Agent 协作的挑战
多 Agent 系统引入了新的复杂性:
- 状态一致性:多个 Agent 操作同一份数据时,如何保证状态一致性?需要使用分布式锁或乐观并发控制。
- 通信开销:Agent 之间的消息传递增加了延迟和失败概率。需要设计可靠的通信协议(如消息队列 + 重试机制)。
- 调试困难:多 Agent 系统的错误链路很长,一个 Agent 的错误可能源于另一个 Agent 的错误输入。需要分布式追踪来定位根因。
import asyncio
import time
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class AgentRole(Enum):
RESEARCHER = "researcher"
WRITER = "writer"
REVIEWER = "reviewer"
@dataclass
class TaskMessage:
"""Agent 间的任务消息"""
task_id: str
from_role: AgentRole
to_role: AgentRole
payload: dict
timestamp: float = 0.0
retry_count: int = 0
max_retries: int = 3
def __post_init__(self):
if self.timestamp == 0:
self.timestamp = time.time()
class MessageQueue:
"""简单的 Agent 间消息队列(生产环境应使用 RabbitMQ / Kafka)"""
def __init__(self):
self.queues: dict[AgentRole, list[TaskMessage]] = {
role: [] for role in AgentRole
}
self.processed: list[TaskMessage] = []
self.dead_letter: list[TaskMessage] = []
async def publish(self, message: TaskMessage):
"""发布消息到目标 Agent 的队列"""
self.queues[message.to_role].append(message)
async def consume(self, role: AgentRole,
timeout: float = 5.0) -> Optional[TaskMessage]:
"""从队列消费消息"""
start = time.time()
while time.time() - start < timeout:
if self.queues[role]:
return self.queues[role].pop(0)
await asyncio.sleep(0.1)
return None
async def nack(self, message: TaskMessage):
"""消息处理失败,重新入队或进入死信队列"""
message.retry_count += 1
if message.retry_count >= message.max_retries:
self.dead_letter.append(message)
print(f"Message {message.task_id} moved to dead letter queue "
f"after {message.retry_count} retries")
else:
self.queues[message.to_role].insert(0, message)
print(f"Message {message.task_id} requeued "
f"(retry {message.retry_count}/{message.max_retries})")
async def ack(self, message: TaskMessage):
"""消息处理成功,标记为已处理"""
self.processed.append(message)
class AgentOrchestrator:
"""Agent 编排器:协调多 Agent 协作流程"""
def __init__(self):
self.message_queue = MessageQueue()
self.task_results: dict[str, dict] = {}
async def run_pipeline(self, user_query: str) -> dict:
"""执行完整的多 Agent 协作流程"""
task_id = f"task-{int(time.time())}"
# 阶段 1: Researcher 收集信息
research_msg = TaskMessage(
task_id=task_id,
from_role=AgentRole.RESEARCHER,
to_role=AgentRole.RESEARCHER,
payload={"query": user_query, "phase": "research"},
)
await self.message_queue.publish(research_msg)
# 模拟 Researcher 工作
research_result = {
"findings": ["发现 15 篇相关文献", "3 个关键技术趋势"],
"sources": ["arxiv", "github", "tech blog"],
}
self.task_results[task_id] = {"research": research_result}
# 阶段 2: Writer 撰写报告
writer_msg = TaskMessage(
task_id=task_id,
from_role=AgentRole.RESEARCHER,
to_role=AgentRole.WRITER,
payload={
"query": user_query,
"research_findings": research_result,
"phase": "writing",
},
)
await self.message_queue.publish(writer_msg)
# 模拟 Writer 工作
draft = "## 2026 AI Agent 基础设施趋势分析\n\n根据研究..."
self.task_results[task_id]["draft"] = draft
# 阶段 3: Reviewer 质量审核
review_msg = TaskMessage(
task_id=task_id,
from_role=AgentRole.WRITER,
to_role=AgentRole.REVIEWER,
payload={"draft": draft, "phase": "review"},
)
await self.message_queue.publish(review_msg)
# 模拟 Reviewer 工作
review_result = {
"approved": True,
"suggestions": ["补充代码示例", "增加对比表格"],
}
self.task_results[task_id]["review"] = review_result
return self.task_results[task_id]
# 使用示例
async def main():
orchestrator = AgentOrchestrator()
result = await orchestrator.run_pipeline(
"分析 2026 年 AI Agent 基础设施的技术趋势"
)
print(f"Task completed: {list(result.keys())}")
asyncio.run(main())| 部署模式 | 适用场景 | 优势 | 劣势 | 典型并发 |
|---|---|---|---|---|
单实例 | 低流量、简单任务 | 架构简单、易于调试 | 单点故障、无法扩展 | < 10 RPM |
水平扩展 | 高流量、独立请求 | 线性扩展、高可用 | 不支持跨实例协作 | 100+ RPM |
垂直分工 | 复杂任务、多步骤 | 专业化、质量更高 | 编排复杂、延迟增加 | < 50 RPM |
混合模式 | 大规模生产环境 | 兼顾扩展与协作 | 运维复杂度高 | 1000+ RPM |
多 Agent 编排不要自己从零造轮子。成熟的编排框架(如 LangGraph、Temporal、Dagster)已经处理了分布式追踪、重试、超时、状态持久化等复杂问题。自己实现这些功能的工作量远超你的预期。
多 Agent 系统中,消息队列是单点故障的最大风险源。如果消息队列宕机,所有 Agent 之间的通信都会中断。生产环境必须使用支持集群的消息队列(如 RabbitMQ 集群、Kafka 集群),而不是简单的内存队列。
8总结:Agent 基础设施的完整知识图谱
Agent 基础设施是一个系统工程问题,不是简单的"接一个 API、写几行代码"就能解决的。本文系统梳理了 Agent 基础设施的完整技术栈,从可观测性到数据管理、从内存架构到状态管理、从安全治理到部署编排。
核心知识点回顾
- 可观测性是 Agent 基础设施的第一要务——看不到 Agent 在做什么,就无法治理它。执行追踪、决策解释、行为审计三个层级缺一不可。
- 数据管理层决定了 Agent 的能力上限——三层数据架构(热/温/冷)分别对应毫秒级缓存、语义检索和长期归档。
- 内存架构是 Agent 的认知基础——短期记忆、工作记忆、长期记忆的协同管理,决定了 Agent 的上下文理解能力。
- 状态管理确保 Agent 的可靠性——检查点机制和异常恢复策略,让 Agent 在出错后能从最近的稳定状态继续执行。
- 安全治理保护系统免受 Agent 自身行为伤害——权限控制、成本预算、输出验证三道防线,防止 Agent 造成不可逆的损害。
- 部署编排将单 Agent 扩展为生产级系统——水平扩展、垂直分工、混合模式,根据不同场景选择合适的部署拓扑。
未来趋势
Agent 基础设施正在经历快速演进,几个值得关注的趋势:
- Agent 原生可观测性标准:OpenTelemetry 正在制定 Agent 专用的语义约定,标准化 Agent 的追踪和指标采集
- 边缘 Agent 部署:随着端侧模型(如 Llama 3 8B、Phi-3)性能提升,在设备本地运行 Agent 将成为可能,这要求基础设施支持离线运行和端云协同
- Agent 运行时安全沙箱:类似 WebAssembly 的沙箱技术将被用于 Agent 执行环境,提供细粒度的资源隔离和权限控制
- 多 Agent 编排即服务:云厂商将推出托管的 Agent 编排平台,降低多 Agent 系统的部署和运维门槛
学习路线建议
对于想要深入学习 Agent 基础设施的开发者,建议按以下顺序学习:
- 先掌握单 Agent 架构的可观测性和状态管理
- 再学习数据管理层,特别是向量数据库的集成
- 然后深入安全治理,理解权限控制和审计
- 最后学习多 Agent 编排,掌握分布式系统的协调模式
最后一句话:AI Agent 的能力上限不由模型决定,而由基础设施决定。把基础设施建好,Agent 才能在生产环境中可靠地运行。
如果你想快速搭建一个可用的 Agent 基础设施,推荐从 LangChain + LangSmith + Qdrant + Redis 开始。这套组合覆盖了可观测性、向量检索、缓存三大核心需求,且有成熟的社区支持。
不要在生产环境中使用「演示级」的 Agent 实现。很多教程和示例代码只展示了 Agent 的「快乐路径」(Happy Path),没有处理超时、错误、重试、状态恢复等生产必需的逻辑。直接将这些代码用于生产,几乎必然会遇到可靠性问题。