首页/知识库/AI Agent 基础设施:可观测性、数据管理与运行时治理

AI Agent 基础设施:可观测性、数据管理与运行时治理

✍️ AI Master📅 创建 2026-04-30📖 28 min 阅读
💡

文章摘要

AI Agent 的能力上限不由模型决定,而由底层基础设施决定。本文系统梳理 Agent 基础设施的完整技术栈:从可观测性管线到数据管理层,从内存架构到状态治理,从运行时安全到生产部署模式,帮助你构建可靠、可控、可审计的 Agent 系统。

1为什么 Agent 基础设施比模型更重要

2025 年图灵奖得主在公开演讲中提出了一个振聋发聩的观点:"AI Agent 的能力上限最终都是数据库问题。" 这句话揭示了一个被严重低估的事实——决定 Agent 在生产环境中表现的不是 LLM 的参数量,而是底层基础设施的完备程度。

Agent 基础设施指的是支撑 AI Agent 运行时所需的全部技术组件,包括可观测性系统、数据管理层、内存架构、状态管理器、安全网关和部署编排层。这些组件共同决定了 Agent 能否在生产环境中可靠运行、持续学习和安全交互。

Agent 与纯 LLM 的本质区别

纯 LLM 应用是一个无状态的函数调用:输入 prompt,输出文本。而 AI Agent是一个有状态的系统,它需要感知环境、维护记忆、执行工具调用、跟踪任务状态、处理异常情况。这些能力无法仅靠模型本身实现,必须依赖专门的基础设施层。

关键差异对比

  • LLM 应用:无状态、单次调用、无需持久化、无需运行时监控
  • AI Agent:有状态、多轮交互、需要持久化记忆、需要实时监控和治理

Agent 基础设施的六大支柱

Agent 基础设施由六大核心组件构成,每个组件解决不同的运行时问题:

  • 可观测性层:回答"Agent 在做什么?为什么这么做?"
  • 数据管理层:管理 Agent 访问的外部数据源和内部存储
  • 内存架构:短期记忆、长期记忆、工作记忆的结构化组织
  • 状态管理器:跟踪 Agent 的任务状态、工具状态和会话状态
  • 安全与治理层:权限控制、行为审计、输出验证
  • 部署与编排层:多 Agent 协作、负载均衡、故障恢复

核心观点:如果你看不到 Agent 在做什么,你就无法治理它。可观测性是 Agent 基础设施的第一要务,没有可观测性,其他所有组件都形同虚设。

设计 Agent 系统的第一步不是选择模型,而是设计可观测性方案。在写第一个 Agent 之前,先决定:你要记录什么?如何追踪一个 Agent 决策的完整链路?出问题后如何复盘?

不要假设 LLM 的「聪明」能弥补基础设施的「简陋」。在生产环境中,一个缺乏状态管理的 Agent 会在网络抖动时丢失全部上下文,一个没有审计日志的 Agent 出问题后无法追溯根因。基础设施的短板决定了 Agent 能力的上限。

2Agent 可观测性:从 Tracing 到行为审计

可观测性(Observability) 是 Agent 基础设施中最关键也最容易被忽视的组件。传统的应用监控(日志、指标、追踪)在 Agent 场景下远远不够,因为 Agent 的行为是非确定性的——同一个输入,不同时间可能产生完全不同的执行路径。

Agent 可观测性的三层架构

Agent 可观测性需要从三个层面构建完整的可见性管线:

第一层:执行追踪(Execution Tracing)。这是最基础的层面,记录 Agent 每次工具调用的输入输出、LLM 调用的 prompt 和 response、以及状态转换的时间戳。通过 OpenTelemetry 标准的 Trace 和 Span,可以将一次 Agent 运行的完整链路可视化。

第二层:决策解释(Decision Explanation)。这层解决的核心问题是:Agent 为什么做出这个决定? 这需要记录 Agent 的推理过程(如果是 ReAct 模式,就是 Thought-Action-Observation 的完整序列)、上下文快照(Agent 在做决策时看到了什么信息)、以及备选方案(Agent 考虑了哪些其他选项但被否决了)。

第三层:行为审计(Behavior Auditing)。这是最高层级的可观测性,关注的是 Agent 行为是否符合预期策略。例如:Agent 是否访问了未授权的数据源?是否执行了超出权限的操作?输出内容是否包含敏感信息?行为审计需要策略引擎来实时评估 Agent 的行为合规性。

Agent 可观测性的核心指标

与传统的 APM 指标不同,Agent 可观测性需要关注一组全新的指标:

  • 工具调用成功率:Agent 调用外部工具的成功率,低于 95% 说明工具集成有问题
  • 上下文命中率:Agent 从记忆中检索到的信息中有多少是相关且正确的
  • 决策一致性:相同或相似输入下,Agent 的决策是否保持稳定
  • 异常恢复率:Agent 遇到错误后自动恢复并继续执行任务的比例
  • 延迟分布:不仅要看平均延迟,更要看 P99 延迟,因为 Agent 的多步推理可能产生极端的长尾延迟

实现 Agent 可观测性的技术栈

主流的可观测性方案包括 LangSmith(LangChain 生态)、Phoenix(Arize AI)、Langfuse(开源方案)以及基于 OpenTelemetry 的自建方案。选择方案时需要考虑三个关键因素:是否支持 Agent 特定的追踪语义、是否提供决策可视化工具、是否支持行为审计的实时告警。

python
from opentelemetry import trace, context
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
import time
import json

# 初始化 Agent 专用的 Tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("agent-runtime")
exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(exporter))

class AgentTracer:
    """Agent 执行追踪器:记录完整的 Thought-Action-Observation 链路"""
    
    def __init__(self, agent_id: str, session_id: str):
        self.agent_id = agent_id
        self.session_id = session_id
        self.step_count = 0
    
    def trace_tool_call(self, tool_name: str, tool_input: dict, tool_output: dict):
        """追踪一次工具调用"""
        self.step_count += 1
        with tracer.start_as_current_span(
            f"agent.{tool_name}",
            attributes={
                "agent.id": self.agent_id,
                "agent.session_id": self.session_id,
                "agent.step": self.step_count,
                "agent.tool.name": tool_name,
                "agent.tool.input_size": len(json.dumps(tool_input)),
                "agent.tool.output_size": len(json.dumps(tool_output)),
                "agent.type": "tool_call",
            }
        ) as span:
            # 记录决策上下文
            span.set_attribute("agent.thought", "检索相关文档后决定调用搜索工具")
            span.set_attribute("agent.alternatives", 
                             json.dumps(["直接回答", "调用知识库", "请求用户澄清"]))
            return tool_output
    
    def trace_llm_call(self, prompt: str, response: str, model: str, 
                       latency_ms: float, tokens: dict):
        """追踪一次 LLM 调用"""
        with tracer.start_as_current_span(
            f"agent.llm.{model}",
            attributes={
                "agent.id": self.agent_id,
                "agent.llm.model": model,
                "agent.llm.latency_ms": latency_ms,
                "agent.llm.prompt_tokens": tokens.get("prompt", 0),
                "agent.llm.completion_tokens": tokens.get("completion", 0),
                "agent.llm.total_cost": tokens.get("cost", 0),
                "agent.type": "llm_call",
            }
        ) as span:
            return response

# 使用示例
tr = AgentTracer(agent_id="research-agent-01", session_id="session-abc123")
tool_output = tr.trace_tool_call(
    tool_name="web_search",
    tool_input={"query": "2026 AI Agent 基础设施趋势"},
    tool_output={"results": 15, "top_sources": ["arxiv", "github", "blog"]}
)
tr.trace_llm_call(
    prompt="根据搜索结果分析...",
    response="基础设施的核心趋势是...",
    model="gpt-4o",
    latency_ms=1230,
    tokens={"prompt": 4500, "completion": 800, "cost": 0.012}
)
可观测性层级核心问题记录内容存储开销实时性要求

执行追踪 (Tracing)

Agent 执行了什么?

工具调用、LLM 调用、状态转换

中 (每次调用 ~1KB)

高 (实时)

决策解释 (Explanation)

Agent 为什么这么做?

推理链、上下文快照、备选方案

高 (每步 ~5KB)

中 (近实时)

行为审计 (Auditing)

Agent 行为是否合规?

策略检查结果、违规记录、告警

低 (仅异常时记录)

高 (实时告警)

在 Agent 可观测性中,最重要的一条规则是:记录决策上下文(Agent 看到了什么信息才做出的决定),而不是只记录最终输出。这是调试 Agent 行为异常的起点。

不要在 Trace 中记录敏感的 LLM prompt 和 response 原文。Agent 的 prompt 可能包含用户隐私数据、API 密钥或业务机密。应记录 prompt 的 hash、token 数和结构信息,而非原文。

3Agent 数据管理层:Vector DB、缓存与外部数据源

数据管理是 Agent 基础设施中最直接影响 Agent 能力上限的组件。图灵奖得主说"Agent 最后全是数据库问题",核心指向的就是这一层——Agent 能访问的数据质量、数据新鲜度和数据检索效率,直接决定了 Agent 回答问题的准确性和时效性。

Agent 数据管理的三层架构

Agent 需要访问的数据可以分为三个层次,每个层次有不同的访问模式和性能要求:

热数据层(Hot Layer):Agent 当前任务直接相关的数据,需要在毫秒级被检索到。这包括当前会话的上下文、活跃的工具响应缓存、以及最近访问的文档片段。热数据层通常存储在内存缓存(如 Redis)中,采用 LRU 淘汰策略。

温数据层(Warm Layer):Agent 可能用到但不一定立即需要的数据。这包括知识库检索结果、工具的定义和 schema、Agent 的长期记忆索引。温数据层通常存储在向量数据库(如 Milvus、Qdrant、Pinecone)中,通过语义相似度检索。

冷数据层(Cold Layer):Agent 偶尔需要的历史数据。这包括历史会话记录、完整的行为审计日志、长期归档的知识库。冷数据层通常存储在对象存储(如 S3)中,通过批处理查询访问。

Vector DB 在 Agent 中的核心角色

向量数据库是 Agent 数据管理层的核心组件,它解决了传统数据库无法解决的语义检索问题。当 Agent 需要回答"与这个问题最相关的文档是什么"时,关键词匹配是不够的——需要基于语义相似度找到相关内容。

Vector DB 在 Agent 中的三个关键应用场景:

  • RAG 检索:从大规模文档库中检索与用户查询最相关的段落,作为 LLM 的上下文输入
  • 记忆检索:从 Agent 的长期记忆中检索与当前对话语义相关的历史片段
  • 工具匹配:当 Agent 需要选择使用哪个工具时,通过语义匹配找到最合适的工具

数据一致性挑战

Agent 数据管理面临的一个独特挑战是数据一致性问题:当 Agent 在多步推理过程中,底层数据源(如文档库、数据库)可能已经更新。如果 Agent 基于旧数据做出决策,可能导致错误的结果。

解决数据一致性问题的三种策略:

  • TTL 过期策略:为缓存的数据设置生存时间,过期后强制重新检索
  • 版本追踪:为每次检索结果记录数据版本号,决策时验证版本是否过期
  • 主动失效:当底层数据源更新时,主动使相关缓存失效,触发 Agent 重新检索
python
import redis
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition
import hashlib
import json
import time
from typing import Optional

class AgentDataManager:
    """Agent 数据管理器:三层数据架构(热/温/冷)"""
    
    def __init__(self, redis_url: str, qdrant_url: str, embedding_model):
        # 热数据层:Redis 缓存
        self.cache = redis.from_url(redis_url, decode_responses=True)
        # 温数据层:Qdrant 向量数据库
        self.vector_db = QdrantClient(url=qdrant_url)
        # 嵌入模型
        self.embedding_model = embedding_model
        self.default_ttl = 3600  # 缓存默认 TTL: 1 小时
    
    def store_context(self, session_id: str, context: dict) -> str:
        """存储会话上下文到热数据层"""
        key = f"agent:context:{session_id}"
        version = int(time.time())
        payload = {
            "data": context,
            "version": version,
            "timestamp": time.time(),
        }
        self.cache.setex(key, self.default_ttl, json.dumps(payload))
        return f"v{version}"
    
    def retrieve_context(self, session_id: str) -> Optional[dict]:
        """从热数据层检索会话上下文"""
        key = f"agent:context:{session_id}"
        raw = self.cache.get(key)
        if raw is None:
            return None
        payload = json.loads(raw)
        # 检查 TTL 是否接近过期
        age = time.time() - payload["timestamp"]
        if age > self.default_ttl * 0.8:
            print(f"Warning: context for {session_id} is {age:.0f}s old, "
                  f"consider refreshing")
        return payload["data"]
    
    def search_knowledge(self, query: str, collection: str = "knowledge",
                        top_k: int = 5, min_score: float = 0.7) -> list:
        """从向量数据库检索相关知识"""
        # 生成查询向量
        query_vector = self.embedding_model.encode(query).tolist()
        # 语义搜索
        results = self.vector_db.search(
            collection_name=collection,
            query_vector=query_vector,
            limit=top_k,
        )
        # 过滤低分结果
        relevant = [
            {
                "content": hit.payload.get("content", ""),
                "source": hit.payload.get("source", ""),
                "score": hit.score,
                "version": hit.payload.get("version", "unknown"),
            }
            for hit in results if hit.score >= min_score
        ]
        return relevant
    
    def cache_tool_response(self, tool_name: str, input_hash: str,
                           response: dict, ttl: int = 1800):
        """缓存工具响应(避免重复调用相同工具)"""
        key = f"agent:tool_cache:{tool_name}:{input_hash}"
        self.cache.setex(key, ttl, json.dumps({
            "response": response,
            "cached_at": time.time(),
        }))
    
    def get_cached_tool_response(self, tool_name: str, 
                                 input_hash: str) -> Optional[dict]:
        """获取缓存的工具响应"""
        key = f"agent:tool_cache:{tool_name}:{input_hash}"
        raw = self.cache.get(key)
        return json.loads(raw) if raw else None

# 使用示例
data_mgr = AgentDataManager(
    redis_url="redis://localhost:6379",
    qdrant_url="http://localhost:6334",
    embedding_model=None  # 实际使用 sentence-transformers
)

# 缓存会话上下文
version = data_mgr.store_context("session-001", {
    "user_goal": "分析 2026 年 AI Agent 基础设施趋势",
    "constraints": ["只用中文回复", "包含代码示例"],
})

# 检索知识库
results = data_mgr.search_knowledge(
    query="Agent 可观测性最佳实践",
    top_k=3,
    min_score=0.75,
)
print(f"找到 {len(results)} 条相关知识")
数据层存储引擎访问延迟数据量级更新频率典型用途

热数据 (Hot)

Redis / Memcached

< 5ms

MB 级

秒级

会话上下文、工具缓存

温数据 (Warm)

Qdrant / Milvus / Pinecone

50-200ms

GB 级

分钟级

知识库、记忆索引

冷数据 (Cold)

S3 / PostgreSQL

100-5000ms

TB 级

小时级

审计日志、归档

为 Agent 的向量检索设置 min_score 阈值(建议 0.7+)。不要盲目取 top_k 个结果——如果相似度低于阈值,返回「无相关知识」比返回「不相关知识」更好,因为不相关知识会误导 LLM 产生幻觉。

不要在向量数据库中存储未经清洗的原始数据。Agent 检索到的内容会直接作为 LLM 的上下文,如果向量库中包含敏感信息、过期数据或错误内容,Agent 会直接传播这些错误。建立数据入库前的清洗和验证流程。

4Agent 内存架构:短期记忆、长期记忆与工作记忆

Agent 内存架构是 Agent 基础设施中最接近人类认知模型的组件。与 LLM 的固定上下文窗口不同,Agent 需要动态管理不同时间尺度的记忆,这需要一个精心设计的内存架构。

三层记忆模型

Agent 内存架构借鉴了人类认知心理学中的三层记忆模型:

短期记忆(Short-Term Memory):对应 LLM 的上下文窗口,存储当前对话的最近消息和最近的操作结果。短期记忆的特点是容量有限(受模型上下文窗口大小限制)、访问速度最快(直接在 prompt 中)、内容会随对话推进被截断。管理短期记忆的核心挑战是上下文窗口有限时,如何选择保留哪些信息、丢弃哪些信息。

工作记忆(Working Memory):这是 Agent 特有的记忆层,存储当前任务的中间状态——比如"正在执行第 3 步,前两步的结果是 X 和 Y"。工作记忆不在 LLM 的上下文窗口中,而是由 Agent 框架显式管理。工作记忆的大小不受上下文窗口限制,可以随时扩展。关键设计决策是:工作记忆的结构化程度——是用自由文本还是结构化数据(如 JSON schema)来存储。

长期记忆(Long-Term Memory):存储 Agent 跨会话持久化的知识,包括用户偏好、历史经验、习得的技能和知识库索引。长期记忆通过向量数据库实现语义检索,Agent 在需要时按需检索相关记忆注入到当前上下文中。长期记忆管理的核心挑战是记忆冲突——当新旧记忆矛盾时,如何决定保留哪个版本。

记忆管理的关键设计决策

设计 Agent 内存架构时,需要做出以下关键决策:

  • 上下文窗口分配:多少比例用于系统指令、多少用于对话历史、多少用于检索增强的上下文?
  • 记忆压缩策略:当短期记忆接近容量上限时,使用摘要压缩(将多轮对话压缩为要点)还是选择性保留(只保留与当前任务相关的片段)?
  • 记忆检索触发:是每次 LLM 调用前都检索长期记忆,还是按需检索(Agent 主动发出检索请求时)?
  • 记忆生命周期:记忆有过期机制吗?多久不使用就自动归档到冷存储?

记忆冲突与记忆衰减

长期记忆系统面临两个独特的问题:

记忆冲突:Agent 在不同时间学到了相互矛盾的信息。例如,用户之前说"我喜欢 Python",后来又说"我现在主要用 Rust"。解决记忆冲突的策略包括时间戳优先级(新记忆覆盖旧记忆)、置信度评分(高置信度记忆优先)和来源追溯(记录记忆的来源以判断可信度)。

记忆衰减:随着时间推移,记忆的相关性会自然下降。实现记忆衰减的两种常见方法:在向量检索时将时间衰减因子加入相似度计算(如 score × e^(-λ × age)),或在存储时为记忆设置过期时间,到期后自动归档。

python
import json
import time
import math
from dataclasses import dataclass, field
from typing import Optional
from collections import deque

@dataclass
class MemoryItem:
    """单条记忆的标准化结构"""
    content: str
    source: str           # "conversation" | "tool_result" | "user_preference"
    timestamp: float
    importance: float     # 0.0 ~ 1.0,重要性评分
    access_count: int = 0
    last_accessed: float = 0.0
    tags: list = field(default_factory=list)
    
    def relevance_score(self, current_time: float, decay_rate: float = 0.0001) -> float:
        """计算当前时间点的相关性分数(含时间衰减)"""
        age = current_time - self.timestamp
        # 时间衰减 + 使用频率加成
        time_decay = math.exp(-decay_rate * age)
        usage_bonus = min(self.access_count * 0.05, 0.3)
        return self.importance * time_decay + usage_bonus


class AgentMemoryManager:
    """Agent 三层记忆管理器"""
    
    def __init__(self, context_window_limit: int = 128000,
                 short_term_tokens: int = 8000,
                 working_memory_limit: int = 50):
        self.context_limit = context_window_limit
        self.st_limit = short_term_tokens
        # 短期记忆:双端队列,FIFO 淘汰
        self.short_term = deque(maxlen=20)
        # 工作记忆:结构化存储
        self.working_memory: dict[str, MemoryItem] = {}
        self.wm_limit = working_memory_limit
        # 长期记忆索引(简化版,实际对接向量数据库)
        self.long_term_index: list[MemoryItem] = []
    
    def add_to_short_term(self, message: str, token_count: int):
        """添加消息到短期记忆(对话历史)"""
        self.short_term.append({
            "content": message,
            "tokens": token_count,
        })
    
    def compress_short_term(self) -> str:
        """压缩短期记忆:将最近对话摘要化"""
        if len(self.short_term) <= 3:
            # 对话不多时保留原文
            return "
".join(m["content"] for m in self.short_term)
        
        # 超过 3 条时:保留最后 2 条原文,中间部分摘要
        recent = list(self.short_term)[-2:]
        middle = list(self.short_term)[1:-2]
        summary = f"[{len(middle)} 轮对话摘要:讨论{middle[0]['content'][:50]}...]"
        return summary + "
" + "
".join(m["content"] for m in recent)
    
    def set_working_memory(self, key: str, content: str, importance: float = 0.8):
        """设置工作记忆条目"""
        if len(self.working_memory) >= self.wm_limit:
            # 淘汰最久未访问的条目
            oldest = min(self.working_memory.items(), 
                        key=lambda x: x[1].last_accessed)
            del self.working_memory[oldest[0]]
        
        self.working_memory[key] = MemoryItem(
            content=content,
            source="agent_state",
            timestamp=time.time(),
            importance=importance,
        )
    
    def retrieve_long_term(self, query: str, top_k: int = 3) -> list[str]:
        """从长期记忆检索(简化版——实际应使用向量数据库)"""
        current_time = time.time()
        # 按相关性排序
        scored = [
            (item, item.relevance_score(current_time))
            for item in self.long_term_index
            if query.lower() in item.content.lower() or 
               any(query.lower() in tag for tag in item.tags)
        ]
        scored.sort(key=lambda x: x[1], reverse=True)
        
        # 更新访问计数
        for item, _ in scored[:top_k]:
            item.access_count += 1
            item.last_accessed = current_time
        
        return [item.content for item, _ in scored[:top_k]]
    
    def get_context_prompt(self) -> str:
        """构建 LLM 的完整上下文 prompt"""
        parts = []
        
        # 工作记忆(结构化状态)
        if self.working_memory:
            wm_parts = []
            for key, item in self.working_memory.items():
                wm_parts.append(f"【{key}】{item.content}")
            parts.append("## 当前任务状态\n" + "\n".join(wm_parts))
        
        # 短期记忆(压缩后的对话历史)
        st = self.compress_short_term()
        if st:
            parts.append("## 对话历史\n" + st)
        
        return "\n\n".join(parts)

# 使用示例
mem = AgentMemoryManager()
mem.add_to_short_term("用户:帮我查一下 2026 年 AI Agent 的趋势", 20)
mem.add_to_short_term("Agent:好的,我来搜索相关信息", 15)
mem.add_to_short_term("Agent:根据搜索结果,主要趋势有...", 150)

mem.set_working_memory("current_task", "研究 2026 AI Agent 基础设施", 0.9)
mem.set_working_memory("step", "已完成信息检索,正在整理结果", 0.7)

print(mem.get_context_prompt())
记忆类型存储位置容量限制访问方式生命周期管理策略

短期记忆

LLM 上下文窗口

受模型限制

直接作为 prompt

对话级别

压缩 / 截断

工作记忆

Agent 运行时内存

可配置

Agent 显式读写

任务级别

LRU 淘汰

长期记忆

向量数据库

近无限

语义检索

跨会话

衰减 + 冲突解决

工作记忆应该用结构化数据(JSON / 键值对)存储,而不是自由文本。结构化工作记忆可以被 Agent 的程序逻辑直接读取和修改,不需要经过 LLM 解析,这大幅降低了出错概率。

不要把所有历史对话都塞进长期记忆。未过滤的对话记录包含大量噪声(寒暄、纠错、试错过程),直接存入向量库会污染检索结果。应该在存入长期记忆前进行摘要化和结构化处理。

5Agent 状态管理:任务状态、工具状态与异常恢复

状态管理是 Agent 基础设施中确保可靠性的核心组件。没有良好的状态管理,Agent 在遇到网络错误、工具超时或模型超时等异常情况时,会丢失全部进度,需要用户从头开始——这在生产环境中是不可接受的。

Agent 状态的三个维度

Agent 运行时需要维护三个维度的状态:

任务状态(Task State):描述 Agent 当前正在做什么、已完成什么、还需要做什么。任务状态通常用有限状态机(FSM) 或有向无环图(DAG) 来建模。一个典型的 Agent 任务状态包括:等待输入 → 规划中 → 执行中(含子步骤) → 验证中 → 完成 / 失败。

工具状态(Tool State):记录 Agent 调用的每个工具的当前状态——是否正在执行、是否超时、返回值是什么、是否需要重试。工具状态管理的关键是超时处理和重试策略:当一个工具调用超过预定时间仍未返回时,Agent 应该如何处理?

会话状态(Session State):维护整个用户会话的元数据——会话 ID、开始时间、累计 token 消耗、成本、已执行步骤数。会话状态用于计费、审计和会话恢复。

异常恢复策略

Agent 在生产环境中必然会遇到各种异常,设计优雅的恢复策略比避免异常更重要:

  • 工具调用超时:设置合理的超时阈值(如 30 秒),超时后重试一次,再次失败则记录错误并切换到备选方案
  • LLM 输出不完整:检测输出是否被截断(如 JSON 不完整、代码缺少闭合括号),如果是,发送续传请求让模型补全
  • 上下文窗口溢出:当累积的上下文超过模型限制时,触发记忆压缩流程,保留关键信息,丢弃次要内容
  • 网络中断:将 Agent 的中间状态持久化到数据库,网络恢复后从最近的检查点(checkpoint) 恢复执行

状态持久化设计

状态持久化的核心原则是检查点机制(Checkpointing):在 Agent 执行的每个关键节点保存状态快照,使得 Agent 可以从任意检查点恢复。检查点的粒度需要权衡:太粗(每步检查点)导致存储开销大,太细(仅在任务完成时检查)导致故障恢复时丢失大量进度。

推荐的检查点策略:在每次工具调用前后、每次 LLM 调用前后、以及任务状态转换时保存检查点。检查点应包含:任务状态、工作记忆、工具调用历史、以及当前执行的步骤编号。

python
import json
import time
import enum
from dataclasses import dataclass, asdict
from typing import Optional, Any
import sqlite3

class TaskStatus(enum.Enum):
    WAITING = "waiting"
    PLANNING = "planning"
    EXECUTING = "executing"
    VERIFYING = "verifying"
    COMPLETED = "completed"
    FAILED = "failed"
    RECOVERING = "recovering"

@dataclass
class Checkpoint:
    """Agent 执行检查点"""
    checkpoint_id: str
    timestamp: float
    task_status: str
    step_number: int
    tool_history: list[dict]
    working_memory: dict
    error: Optional[str] = None
    
    def save_to_db(self, db_path: str):
        """持久化检查点到 SQLite"""
        conn = sqlite3.connect(db_path)
        conn.execute("""
            INSERT OR REPLACE INTO checkpoints 
            (checkpoint_id, timestamp, task_status, step_number, 
             tool_history, working_memory, error)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            self.checkpoint_id, self.timestamp, self.task_status,
            self.step_number,
            json.dumps(self.tool_history),
            json.dumps(self.working_memory),
            self.error,
        ))
        conn.commit()
        conn.close()
    
    @classmethod
    def load_latest(cls, db_path: str) -> Optional['Checkpoint']:
        """加载最近的检查点用于恢复"""
        conn = sqlite3.connect(db_path)
        row = conn.execute(
            "SELECT * FROM checkpoints ORDER BY timestamp DESC LIMIT 1"
        ).fetchone()
        conn.close()
        if row is None:
            return None
        return cls(
            checkpoint_id=row[0], timestamp=row[1],
            task_status=row[2], step_number=row[3],
            tool_history=json.loads(row[4]),
            working_memory=json.loads(row[5]),
            error=row[6],
        )


class AgentStateManager:
    """Agent 状态管理器:支持检查点和异常恢复"""
    
    def __init__(self, agent_id: str, db_path: str = "agent_state.db"):
        self.agent_id = agent_id
        self.db_path = db_path
        self.current_status = TaskStatus.WAITING
        self.step_number = 0
        self.tool_history: list[dict] = []
        self.working_memory: dict = {}
        self._init_db()
    
    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS checkpoints (
                checkpoint_id TEXT PRIMARY KEY,
                timestamp REAL,
                task_status TEXT,
                step_number INTEGER,
                tool_history TEXT,
                working_memory TEXT,
                error TEXT
            )
        """)
        conn.commit()
        conn.close()
    
    def create_checkpoint(self, error: Optional[str] = None) -> Checkpoint:
        """创建检查点"""
        cp = Checkpoint(
            checkpoint_id=f"{self.agent_id}-{self.step_number}-{int(time.time())}",
            timestamp=time.time(),
            task_status=self.current_status.value,
            step_number=self.step_number,
            tool_history=self.tool_history[-20:],  # 保留最近 20 条
            working_memory=dict(self.working_memory),
            error=error,
        )
        cp.save_to_db(self.db_path)
        return cp
    
    def recover(self) -> bool:
        """从最近检查点恢复"""
        latest = Checkpoint.load_latest(self.db_path)
        if latest is None:
            return False
        
        self.current_status = TaskStatus.RECOVERING
        self.step_number = latest.step_number
        self.tool_history = latest.tool_history
        self.working_memory = latest.working_memory
        
        print(f"Recovered from checkpoint {latest.checkpoint_id} "
              f"(step {latest.step_number}, status: {latest.task_status})")
        if latest.error:
            print(f"Last error: {latest.error}")
        
        self.current_status = TaskStatus.EXECUTING
        return True

# 使用示例
sm = AgentStateManager("agent-041-demo")
sm.current_status = TaskStatus.EXECUTING
sm.step_number = 3
sm.tool_history.append({
    "tool": "web_search",
    "input": "AI Agent 基础设施 2026",
    "output": "...",
    "status": "success",
})
sm.create_checkpoint()
print("Checkpoint created")
异常类型检测方式恢复策略重试次数超时阈值

工具调用超时

等待时间 > 阈值

重试 → 备选方案

最多 2 次

30s

LLM 输出截断

JSON / 代码不完整

发送续传请求

最多 1 次

60s

上下文溢出

token 计数 > 限制

记忆压缩

N/A

N/A

网络中断

连接异常

从检查点恢复

最多 3 次

10s

工具返回错误

HTTP 状态码 ≥ 400

记录错误 → 降级

最多 1 次

N/A

检查点的存储不要用单点数据库。Agent 的状态是实时变化的,如果检查点数据库宕机,Agent 就失去了恢复能力。推荐使用 Redis + SQLite 的双写策略:热检查点存 Redis,定期刷盘到 SQLite。

不要在检查点中存储完整的 LLM response 原文。检查点的目的是恢复执行状态,不是存档对话内容。存储 prompt/response 的 hash 和 token 数即可,完整内容应通过可观测性层单独记录。

6Agent 安全与治理:权限控制、行为审计与输出验证

安全与治理层是 Agent 基础设施中保护系统免受 Agent 自身行为伤害的组件。Agent 拥有执行工具的能力——可以查询数据库、调用 API、读写文件——这些能力如果不受控制,可能造成严重的安全事故。

Agent 安全的三层防线

Agent 安全需要建立三层防线,层层把关:

第一层:权限控制(Authorization)。在 Agent 执行操作之前,验证 Agent 是否有权限执行该操作。这包括工具级别的权限(Agent 能否调用某个工具)、数据级别的权限(Agent 能否访问某条数据)、以及操作级别的权限(Agent 能否执行删除、修改等破坏性操作)。权限控制应该采用最小权限原则——Agent 只拥有完成任务所需的最少权限。

第二层:行为审计(Behavior Auditing)。记录 Agent 的所有操作,包括成功的和失败的。行为审计的核心价值在于事后追溯——当 Agent 做出了错误决策或执行了危险操作时,审计日志可以提供完整的证据链。审计日志应该包含:操作时间、操作类型、输入参数、输出结果、以及决策上下文。

第三层:输出验证(Output Validation)。在 Agent 的最终输出返回给用户之前,进行内容验证。这包括敏感信息过滤(确保不泄露密码、密钥、PII 数据)、格式验证(确保输出符合预期的 schema)、以及安全扫描(确保输出中不包含恶意代码或注入攻击载荷)。

Agent 治理的关键策略

在生产环境中运行 Agent,需要制定以下治理策略:

  • 成本上限:为每个 Agent 会话设置 token 预算上限,防止 Agent 进入无限循环导致费用爆炸
  • 速率限制:限制 Agent 调用外部工具的频率,防止 Agent 的行为触发外部服务的限流或封禁
  • 人工审批:对于高风险操作(如删除数据、修改配置、发送外部消息),要求人工审批后才能执行
  • 行为基线:建立 Agent 行为的正常基线,当 Agent 的行为显著偏离基线时(如工具调用频率突然升高 10 倍),触发告警并暂停 Agent
python
import time
import re
import enum
from dataclasses import dataclass
from typing import Optional

class RiskLevel(enum.Enum):
    LOW = "low"           # 只读操作
    MEDIUM = "medium"     # 写入但可恢复
    HIGH = "high"         # 破坏性操作
    CRITICAL = "critical" # 不可逆操作

@dataclass
class ToolPermission:
    """工具权限定义"""
    tool_name: str
    allowed: bool
    risk_level: RiskLevel
    requires_approval: bool
    rate_limit_per_min: int = 60
    data_scope: str = "all"  # "all" | "own" | "none"

@dataclass
class AuditLog:
    """行为审计日志条目"""
    timestamp: float
    agent_id: str
    session_id: str
    action: str
    tool_name: str
    input_params: dict
    output: str
    risk_level: str
    approved_by: Optional[str] = None
    blocked: bool = False
    block_reason: str = ""


class AgentGovernanceEngine:
    """Agent 治理引擎:权限控制 + 审计 + 输出验证"""
    
    def __init__(self, agent_id: str, session_id: str):
        self.agent_id = agent_id
        self.session_id = session_id
        self.permissions: dict[str, ToolPermission] = {}
        self.audit_log: list[AuditLog] = []
        self.token_budget = 100000
        self.tokens_used = 0
        self.cost_budget = 10.0  # 美元
        self.cost_used = 0.0
    
    def set_permission(self, perm: ToolPermission):
        """设置工具权限"""
        self.permissions[perm.tool_name] = perm
    
    def check_tool_permission(self, tool_name: str) -> tuple[bool, str]:
        """检查 Agent 是否有权限调用某个工具"""
        if tool_name not in self.permissions:
            return False, f"工具 {tool_name} 未授权"
        
        perm = self.permissions[tool_name]
        if not perm.allowed:
            return False, f"工具 {tool_name} 被禁用"
        
        if perm.requires_approval:
            return False, f"工具 {tool_name} 需要人工审批"
        
        return True, "已授权"
    
    def audit_tool_call(self, tool_name: str, input_params: dict, 
                       output: str, risk: RiskLevel, 
                       blocked: bool = False, block_reason: str = ""):
        """记录工具调用的审计日志"""
        log = AuditLog(
            timestamp=time.time(),
            agent_id=self.agent_id,
            session_id=self.session_id,
            action="tool_call",
            tool_name=tool_name,
            input_params=input_params,
            output=output[:500],  # 截断避免日志过大
            risk_level=risk.value,
            blocked=blocked,
            block_reason=block_reason,
        )
        self.audit_log.append(log)
    
    def check_budget(self, tokens: int, cost: float) -> tuple[bool, str]:
        """检查 token 和成本预算"""
        if self.tokens_used + tokens > self.token_budget:
            return False, f"Token 预算不足: 已用 {self.tokens_used}/{self.token_budget}"
        if self.cost_used + cost > self.cost_budget:
            return False, f"成本预算不足: 已用 ${self.cost_used:.2f}/${self.cost_budget:.2f}"
        return True, "预算充足"
    
    def validate_output(self, output: str) -> tuple[bool, list[str]]:
        """验证输出内容的合规性"""
        issues = []
        # 敏感信息扫描
        patterns = {
            "api_key": r"sk-[a-zA-Z0-9]{20,}",
            "password": r"password['\"]?\s*[:=]\s*['\"][^'\"]+['\"]",
            "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
        }
        for name, pattern in patterns.items():
            matches = re.findall(pattern, output)
            if matches:
                issues.append(f"检测到疑似{name}泄露: {len(matches)} 处")
        # 长度检查
        if len(output) > 50000:
            issues.append(f"输出过长: {len(output)} 字符,建议截断")
        return len(issues) == 0, issues

# 使用示例
governance = AgentGovernanceEngine("agent-041", "session-xyz")
governance.set_permission(ToolPermission(
    tool_name="web_search", allowed=True,
    risk_level=RiskLevel.LOW, requires_approval=False,
    rate_limit_per_min=30,
))
governance.set_permission(ToolPermission(
    tool_name="delete_record", allowed=True,
    risk_level=RiskLevel.CRITICAL, requires_approval=True,
    rate_limit_per_min=5,
))

allowed, reason = governance.check_tool_permission("web_search")
print(f"web_search: {allowed} - {reason}")

allowed, reason = governance.check_tool_permission("delete_record")
print(f"delete_record: {allowed} - {reason}")

valid, issues = governance.validate_output(
    "这是正常输出内容,没有任何敏感信息。"
)
print(f"输出合规: {valid}")
if issues:
    for issue in issues:
        print(f"  ⚠ {issue}")
治理策略防护目标实施方式触发条件

权限控制

防止未授权操作

工具级 ACL + 最小权限

每次工具调用前

成本上限

防止费用爆炸

Token 预算 + 成本预算

每次 LLM 调用前

速率限制

防止触发外部限流

令牌桶算法

每次工具调用前

人工审批

防止高危误操作

审批工作流集成

CRITICAL 级别操作

输出验证

防止信息泄露

正则扫描 + 规则引擎

每次输出返回前

行为基线

检测异常行为

统计建模 + 阈值告警

行为显著偏离基线时

为 Agent 设置成本预算时,建议按「每会话」而非「每天」设置。一个失控的 Agent 在几分钟内就可能消耗掉全天的预算。按会话设置可以更精细地控制每个任务的成本上限。

权限控制不能完全依赖代码层面的检查。Agent 可能通过 prompt injection 绕过代码检查,直接指示 LLM「假设你有所有权限」。必须在系统层面(如操作系统权限、网络策略)设置硬性边界,代码层面的权限检查只是第一道防线。

7Agent 部署与编排:多 Agent 协作、负载均衡与故障恢复

部署与编排层是 Agent 基础设施中将单个 Agent 扩展为生产级系统的组件。单个 Agent 在开发环境中运行良好,不代表它在生产环境的高并发、高可用要求下也能正常工作。部署与编排层解决的核心问题是:如何让 Agent 系统在真实负载下稳定运行。

单 Agent 部署模式

最简单的 Agent 部署是单实例模式:一个 Agent 进程处理所有请求。这种模式适用于低流量场景,但存在明显的单点故障风险。改进的单 Agent 部署包括:

  • 进程守护:使用 systemd 或 supervisor 管理 Agent 进程,崩溃后自动重启
  • 健康检查:定期发送探测请求,确认 Agent 是否正常响应
  • 优雅关闭:收到关闭信号后,完成当前任务再退出,而不是强制终止

多 Agent 编排模式

当流量增长或任务复杂度增加时,需要多 Agent 协作:

水平扩展模式:多个相同的 Agent 实例并行处理独立的请求。这种模式适合问答型 Agent——每个用户请求是独立的,不需要 Agent 之间协调。负载均衡器(如 Nginx 或 Envoy)将请求分发到空闲的 Agent 实例。

垂直分工模式:多个不同角色的 Agent协作完成一个复杂任务。例如:一个 Researcher Agent 负责信息收集,一个 Writer Agent 负责撰写报告,一个 Reviewer Agent 负责质量审核。这种模式需要任务编排引擎(如 LangGraph 或 Temporal)来协调各 Agent 的执行顺序和数据流。

混合模式:结合水平扩展和垂直分工——每种角色的 Agent 都有多个实例,编排引擎负责任务路由和结果聚合。

多 Agent 协作的挑战

多 Agent 系统引入了新的复杂性:

  • 状态一致性:多个 Agent 操作同一份数据时,如何保证状态一致性?需要使用分布式锁或乐观并发控制。
  • 通信开销:Agent 之间的消息传递增加了延迟和失败概率。需要设计可靠的通信协议(如消息队列 + 重试机制)。
  • 调试困难:多 Agent 系统的错误链路很长,一个 Agent 的错误可能源于另一个 Agent 的错误输入。需要分布式追踪来定位根因。
python
import asyncio
import time
from dataclasses import dataclass
from typing import Optional
from enum import Enum

class AgentRole(Enum):
    RESEARCHER = "researcher"
    WRITER = "writer"
    REVIEWER = "reviewer"

@dataclass
class TaskMessage:
    """Agent 间的任务消息"""
    task_id: str
    from_role: AgentRole
    to_role: AgentRole
    payload: dict
    timestamp: float = 0.0
    retry_count: int = 0
    max_retries: int = 3
    
    def __post_init__(self):
        if self.timestamp == 0:
            self.timestamp = time.time()


class MessageQueue:
    """简单的 Agent 间消息队列(生产环境应使用 RabbitMQ / Kafka)"""
    
    def __init__(self):
        self.queues: dict[AgentRole, list[TaskMessage]] = {
            role: [] for role in AgentRole
        }
        self.processed: list[TaskMessage] = []
        self.dead_letter: list[TaskMessage] = []
    
    async def publish(self, message: TaskMessage):
        """发布消息到目标 Agent 的队列"""
        self.queues[message.to_role].append(message)
    
    async def consume(self, role: AgentRole, 
                     timeout: float = 5.0) -> Optional[TaskMessage]:
        """从队列消费消息"""
        start = time.time()
        while time.time() - start < timeout:
            if self.queues[role]:
                return self.queues[role].pop(0)
            await asyncio.sleep(0.1)
        return None
    
    async def nack(self, message: TaskMessage):
        """消息处理失败,重新入队或进入死信队列"""
        message.retry_count += 1
        if message.retry_count >= message.max_retries:
            self.dead_letter.append(message)
            print(f"Message {message.task_id} moved to dead letter queue "
                  f"after {message.retry_count} retries")
        else:
            self.queues[message.to_role].insert(0, message)
            print(f"Message {message.task_id} requeued "
                  f"(retry {message.retry_count}/{message.max_retries})")
    
    async def ack(self, message: TaskMessage):
        """消息处理成功,标记为已处理"""
        self.processed.append(message)


class AgentOrchestrator:
    """Agent 编排器:协调多 Agent 协作流程"""
    
    def __init__(self):
        self.message_queue = MessageQueue()
        self.task_results: dict[str, dict] = {}
    
    async def run_pipeline(self, user_query: str) -> dict:
        """执行完整的多 Agent 协作流程"""
        task_id = f"task-{int(time.time())}"
        
        # 阶段 1: Researcher 收集信息
        research_msg = TaskMessage(
            task_id=task_id,
            from_role=AgentRole.RESEARCHER,
            to_role=AgentRole.RESEARCHER,
            payload={"query": user_query, "phase": "research"},
        )
        await self.message_queue.publish(research_msg)
        
        # 模拟 Researcher 工作
        research_result = {
            "findings": ["发现 15 篇相关文献", "3 个关键技术趋势"],
            "sources": ["arxiv", "github", "tech blog"],
        }
        self.task_results[task_id] = {"research": research_result}
        
        # 阶段 2: Writer 撰写报告
        writer_msg = TaskMessage(
            task_id=task_id,
            from_role=AgentRole.RESEARCHER,
            to_role=AgentRole.WRITER,
            payload={
                "query": user_query,
                "research_findings": research_result,
                "phase": "writing",
            },
        )
        await self.message_queue.publish(writer_msg)
        
        # 模拟 Writer 工作
        draft = "## 2026 AI Agent 基础设施趋势分析\n\n根据研究..."
        self.task_results[task_id]["draft"] = draft
        
        # 阶段 3: Reviewer 质量审核
        review_msg = TaskMessage(
            task_id=task_id,
            from_role=AgentRole.WRITER,
            to_role=AgentRole.REVIEWER,
            payload={"draft": draft, "phase": "review"},
        )
        await self.message_queue.publish(review_msg)
        
        # 模拟 Reviewer 工作
        review_result = {
            "approved": True,
            "suggestions": ["补充代码示例", "增加对比表格"],
        }
        self.task_results[task_id]["review"] = review_result
        
        return self.task_results[task_id]

# 使用示例
async def main():
    orchestrator = AgentOrchestrator()
    result = await orchestrator.run_pipeline(
        "分析 2026 年 AI Agent 基础设施的技术趋势"
    )
    print(f"Task completed: {list(result.keys())}")

asyncio.run(main())
部署模式适用场景优势劣势典型并发

单实例

低流量、简单任务

架构简单、易于调试

单点故障、无法扩展

< 10 RPM

水平扩展

高流量、独立请求

线性扩展、高可用

不支持跨实例协作

100+ RPM

垂直分工

复杂任务、多步骤

专业化、质量更高

编排复杂、延迟增加

< 50 RPM

混合模式

大规模生产环境

兼顾扩展与协作

运维复杂度高

1000+ RPM

多 Agent 编排不要自己从零造轮子。成熟的编排框架(如 LangGraph、Temporal、Dagster)已经处理了分布式追踪、重试、超时、状态持久化等复杂问题。自己实现这些功能的工作量远超你的预期。

多 Agent 系统中,消息队列是单点故障的最大风险源。如果消息队列宕机,所有 Agent 之间的通信都会中断。生产环境必须使用支持集群的消息队列(如 RabbitMQ 集群、Kafka 集群),而不是简单的内存队列。

8总结:Agent 基础设施的完整知识图谱

Agent 基础设施是一个系统工程问题,不是简单的"接一个 API、写几行代码"就能解决的。本文系统梳理了 Agent 基础设施的完整技术栈,从可观测性到数据管理、从内存架构到状态管理、从安全治理到部署编排。

核心知识点回顾

  • 可观测性是 Agent 基础设施的第一要务——看不到 Agent 在做什么,就无法治理它。执行追踪、决策解释、行为审计三个层级缺一不可。
  • 数据管理层决定了 Agent 的能力上限——三层数据架构(热/温/冷)分别对应毫秒级缓存、语义检索和长期归档。
  • 内存架构是 Agent 的认知基础——短期记忆、工作记忆、长期记忆的协同管理,决定了 Agent 的上下文理解能力。
  • 状态管理确保 Agent 的可靠性——检查点机制和异常恢复策略,让 Agent 在出错后能从最近的稳定状态继续执行。
  • 安全治理保护系统免受 Agent 自身行为伤害——权限控制、成本预算、输出验证三道防线,防止 Agent 造成不可逆的损害。
  • 部署编排将单 Agent 扩展为生产级系统——水平扩展、垂直分工、混合模式,根据不同场景选择合适的部署拓扑。

未来趋势

Agent 基础设施正在经历快速演进,几个值得关注的趋势:

  • Agent 原生可观测性标准:OpenTelemetry 正在制定 Agent 专用的语义约定,标准化 Agent 的追踪和指标采集
  • 边缘 Agent 部署:随着端侧模型(如 Llama 3 8B、Phi-3)性能提升,在设备本地运行 Agent 将成为可能,这要求基础设施支持离线运行和端云协同
  • Agent 运行时安全沙箱:类似 WebAssembly 的沙箱技术将被用于 Agent 执行环境,提供细粒度的资源隔离和权限控制
  • 多 Agent 编排即服务:云厂商将推出托管的 Agent 编排平台,降低多 Agent 系统的部署和运维门槛

学习路线建议

对于想要深入学习 Agent 基础设施的开发者,建议按以下顺序学习:

  1. 先掌握单 Agent 架构的可观测性和状态管理
  2. 再学习数据管理层,特别是向量数据库的集成
  3. 然后深入安全治理,理解权限控制和审计
  4. 最后学习多 Agent 编排,掌握分布式系统的协调模式

最后一句话:AI Agent 的能力上限不由模型决定,而由基础设施决定。把基础设施建好,Agent 才能在生产环境中可靠地运行。

如果你想快速搭建一个可用的 Agent 基础设施,推荐从 LangChain + LangSmith + Qdrant + Redis 开始。这套组合覆盖了可观测性、向量检索、缓存三大核心需求,且有成熟的社区支持。

不要在生产环境中使用「演示级」的 Agent 实现。很多教程和示例代码只展示了 Agent 的「快乐路径」(Happy Path),没有处理超时、错误、重试、状态恢复等生产必需的逻辑。直接将这些代码用于生产,几乎必然会遇到可靠性问题。

继续你的 AI 学习之旅

浏览更多 AI 知识库文章,或者探索 GitHub 上的优质 AI 项目