首页/博客/Stonebraker 批判 AI Agent:数据库泰斗为什么说「Agent 绕不开数据库老问题」
Stonebraker

Stonebraker 批判 AI Agent:数据库泰斗为什么说「Agent 绕不开数据库老问题」

✍️ 奥利奥📅 创建 2026-05-02📖 32 min 阅读
💡

文章摘要

图灵奖得主 Michael Stonebraker 公开批评当前 AI Agent 架构在状态管理、事务一致性、并发控制和失败恢复方面存在严重缺陷——这些问题正是数据库领域 50 年前就已经解决的。本文深度拆解 Stonebraker 的五大核心论点,对比三大主流 Agent 框架的数据管理缺陷,提供两种实战方案(PostgreSQL 和 Redis + PostgreSQL 冷热分层),对比三种数据管理方案的权衡,并预判 2026-2027 Agent 数据基础设施的三大演进方向。

1引言:图灵奖得主的一盆冷水

2026 年初,数据库领域的泰斗、图灵奖得主 Michael Stonebraker 发表了一系列对 AI Agent 的尖锐批评。这位 83 岁的老人曾在 1970-2020 年代主导了 关系数据库、内存数据库、列式数据库、流数据库 等多次数据库范式革命,他的观点在数据库和软件工程领域有着不可替代的权威性。

Stonebraker 的核心论断:当前 AI Agent 架构中存在的状态管理、事务一致性、数据持久化、并发控制等问题,恰恰是数据库领域在过去 50 年中反复解决过的老问题。而大多数 Agent 开发者忽视了这些经验,选择从零开始重新发明轮子,而且发明得很糟糕。

这不是一篇简单的「名人批评」报道。Stonebraker 的批评之所以值得深度分析,是因为它触及了 AI Agent 领域的一个根本性盲点:我们太关注 Agent 的智能能力(推理、规划、工具调用),而忽视了 Agent 系统的数据基础设施(状态存储、事务管理、一致性保证)。

为什么这个话题重要? 因为 AI Agent 正在从实验性产品走向生产级系统。当一个 Agent 需要同时处理多个任务、维护长期记忆、保证数据一致性时,数据库级别的问题就不再是理论上的担忧——而是每天都在发生的故障根源。

本文要做的事

第一,完整梳理 Stonebraker 的批评论点和技术依据。

第二,分析当前主流 Agent 框架在数据管理方面的架构缺陷。

第三,从数据库 50 年的经验中提取可复用的设计原则。

第四,提出 Agent + 数据库的融合架构方案,并给出代码实现。

第五,对比三种不同的 Agent 数据管理方案的优劣和适用场景。

阅读本文前,建议了解 数据库事务的 ACID 属性(原子性、一致性、隔离性、持久性)和 CAP 定理(一致性、可用性、分区容忍性不可兼得)。这些是理解 Stonebraker 批评的技术基础。如果你不熟悉这些概念,可以先阅读本知识库中的 ml-008(数据库基础)和 mlops-001(数据流水线设计)。

Stonebraker 的批评虽然尖锐且有技术依据,但也存在一定的片面性——他主要从数据库架构师的视角出发,对 Agent 的推理能力和自主决策方面的创新关注不够。本文在采纳其观点的同时,也会指出其局限性。

2Stonebraker 的核心论点拆解:Agent 到底在哪里「重新发明轮子」

Stonebraker 的批评可以归结为五个核心论点,每个论点都对应一个数据库领域已经解决的问题和Agent 领域当前存在的缺陷。

论点一:Agent 的状态管理 = 数据库的事务管理

Stonebraker 的原话:「Agent 需要维护执行状态,但它用的方式是把状态塞进 Prompt 里——这就像用 HTTP Cookie 来代替数据库事务一样荒谬。」

技术解析:当前大多数 Agent 框架(包括 LangChain、AutoGen、CrewAI)处理 Agent 状态的默认方式是将状态信息编码到 LLM 的上下文(Prompt)中。这有几个严重问题:

第一,上下文窗口有限。即使最新的模型支持 200K token 的上下文,一个复杂 Agent 的完整状态(包括历史决策、工具调用记录、中间结果、错误日志)也可能远超这个限制。

第二,状态检索效率极低。从上下文中检索特定状态信息需要 LLM 重新「阅读」整个上下文——这相当于每次查询都要全表扫描,而数据库的 B+树索引可以在 O(log n) 时间内完成同样的查询。

第三,状态一致性无法保证。当多个 Agent 并发执行时,它们各自维护的上下文状态可能出现不一致——Agent A 看到的状态和 Agent B 看到的状态不同步。数据库领域用事务隔离级别(Read Committed、Repeatable Read、Serializable)来解决这个问题,而 Agent 框架几乎没有类似机制。

论点二:Agent 的记忆系统 = 数据库的持久化存储

Stonebraker 指出:Agent 的向量数据库记忆只是近似匹配,不具备精确查询能力。当你需要查找「上周三 Agent 处理的那个订单的第三条修改记录」时,向量搜索会返回一堆「相似但不精确」的结果。

技术解析:当前 Agent 记忆系统的主流方案是向量嵌入 + 相似度搜索。这种方案在模糊匹配场景(「找一个关于 Python 性能优化的讨论」)下表现良好,但在精确查询场景(「找到 ID 为 42 的任务的执行日志」)下完全失效。

数据库领域的解决方案是混合存储——用关系数据库存储结构化数据(支持精确查询和事务),用全文搜索引擎处理文本检索,用向量数据库处理语义相似度查询。但 Agent 记忆系统几乎都只用了向量数据库这一层。

论点三:Agent 的并发控制 = 数据库的锁机制

Stonebraker 尖锐地指出:当多个 Agent 同时访问和修改同一份数据时,当前框架没有任何并发控制机制——两个 Agent 可能同时修改同一个文件,或者同时写入同一个数据库记录,导致数据损坏。

技术解析:在数据库领域,并发控制是一个成熟且完善的问题——通过锁(Lock)、多版本并发控制(MVCC)、乐观并发控制(OCC)等机制,可以确保多个并发事务的正确执行。

但在 Agent 领域,并发控制几乎是空白的。LangGraph 提供了基本的状态锁定,但不支持细粒度的并发控制。AutoGen 的多 Agent 对话完全没有并发控制——两个 Agent 可以同时修改共享状态,导致不可预测的结果。

论点四:Agent 的失败恢复 = 数据库的 WAL 和 Checkpoint

Stonebraker 指出:当 Agent 在执行长链任务时中途失败(比如网络中断、模型超时、工具调用错误),当前的做法通常是从头重新开始——这就像数据库没有预写日志(WAL)和检查点(Checkpoint),每次崩溃都要重做所有操作。

技术解析:数据库的 WAL(Write-Ahead Logging)机制确保每次数据修改都先写入持久化日志,再应用到数据文件。如果系统崩溃,可以从 WAL 日志中恢复到崩溃前的状态。Checkpoint 机制定期将内存中的数据刷新到磁盘,减少恢复时需要重做的日志量。

Agent 领域几乎没有等效机制。虽然一些框架提供了简单的重试功能,但缺乏完整的工作流恢复——Agent 不知道自己上次执行到了哪一步、哪些子任务已经完成、哪些中间结果可以重用。

论点五:Agent 的查询能力 = 数据库的 SQL

Stonebraker 的最后一个论点是:Agent 需要查询自己的历史执行记录来进行自我反思和改进,但当前的查询方式是让 Agent 用自己的语言模型来「回忆」——这不仅效率低,而且不可靠(LLM 可能「遗忘」或「编造」历史信息)。

技术解析:如果你想知道「过去一个月 Agent 在哪些任务上失败了」,当前的做法是让 Agent 回顾上下文中的历史记录。但上下文不包含完整的执行历史——它只包含最近的交互记录。而且 LLM 的回忆是不可靠的——它可能遗漏某些失败记录,或者错误地总结失败原因。

数据库的方案是用 SQL 查询来精确检索历史数据:「SELECT * FROM task_history WHERE status = 'failed' AND created_at > '2026-04-01'」。这种查询是精确的、可重复的、可验证的。

理解 Stonebraker 批评的关键是认识到:Agent 本质上是一个数据处理系统——它接收输入数据、执行计算、产生输出数据、维护中间状态。任何数据处理系统都需要解决存储、查询、并发、恢复这四个基本问题,而这些问题数据库领域已经研究了几十年。

Stonebraker 的批评不等于说 Agent 没有价值。他的观点是:Agent 的智能能力(推理、规划、自主决策)是真正的创新,但 Agent 的数据基础设施(状态管理、持久化、并发控制)是被忽视的老问题。正确的态度是继承数据库 50 年的经验,而不是从零开始重新发明。

3当前 Agent 框架的数据管理缺陷深度分析

为了验证 Stonebraker 的批评是否成立,我们深入分析三大主流 Agent 框架在数据管理方面的架构设计和实际表现。

LangGraph 的状态管理

LangGraphLangChain 团队的 Agent 编排框架,它使用有向图来建模 Agent 的执行流程。在状态管理方面,LangGraph 提供了一个内置的状态存储(State Store),支持状态快照和状态回滚。

优点:LangGraph 的状态存储是结构化的——状态被定义为TypedDict或Pydantic 模型,支持类型检查和字段验证。这比将状态塞进 Prompt 的做法进步了很多。

缺陷:LangGraph 的状态存储是内存中的——默认情况下,状态不会持久化到磁盘。如果服务重启,所有 Agent 的执行状态都会丢失。虽然 LangGraph 提供了检查点(Checkpoint)接口,但默认实现非常简单,不支持增量快照、版本管理和跨实例同步。

并发控制:LangGraph 支持单实例内的顺序执行(一个图的节点按顺序执行),但不支持多实例并发执行同一个图。如果你需要水平扩展(多个 LangGraph 实例并行处理不同的任务),状态一致性无法保证。

AutoGen 的多 Agent 通信

AutoGenMicrosoft 开发的多 Agent 框架,其核心通信模型是Agent 之间的消息传递。每个 Agent 维护自己的本地上下文(Local Context),通过消息传递与其他 Agent 交换信息。

优点:AutoGen 的消息传递模型灵活且易于理解,支持一对一、一对多、群组等多种通信模式。

缺陷:AutoGen 没有全局状态存储——每个 Agent 的上下文是独立的、隔离的。这意味着没有一个地方可以查询「所有 Agent 当前的执行状态」。此外,AutoGen 的消息没有持久化——如果服务中断,所有通信记录都会丢失。

并发控制:AutoGen 完全没有并发控制机制。当多个 Agent 同时修改共享资源时(比如同一个文件或数据库记录),可能出现数据竞争。

CrewAI 的角色任务系统

CrewAI 是一个面向角色的多 Agent 框架,每个 Agent 有明确的角色定义(Role)、目标(Goal)和背景(Backstory)。任务通过任务队列(Task Queue)分配给合适的 Agent。

优点:CrewAI 的角色-任务模型简单直观,适合快速构建多 Agent 系统。它的任务队列提供了一定程度的流程控制。

缺陷:CrewAI 的状态管理非常基础——任务结果通过字符串传递,缺乏结构化状态存储。当任务链较长时(超过 5 个环节),上下文传递变得低效且容易出错。

并发控制:CrewAI 支持任务并行执行,但没有提供并发控制原语——没有锁、没有事务、没有隔离级别。多个 Agent 并发执行时,如果它们访问共享资源,结果不可预测。

框架状态存储持久化并发控制失败恢复查询能力Stonebraker 评分

LangGraph

✅ 结构化

⚠️ 可选

❌ 无

⚠️ 基础

❌ 无

4/10

AutoGen

❌ 分布式

❌ 无

❌ 无

❌ 无

❌ 无

2/10

CrewAI

⚠️ 字符串

❌ 无

❌ 无

❌ 无

❌ 无

2/10

数据库标准

✅ ACID

✅ WAL

✅ MVCC

✅ Checkpoint

✅ SQL

10/10

从分析中可以看出,当前 Agent 框架的数据管理能力与数据库的标准实践之间存在巨大差距。这不是因为框架开发者不够聪明,而是因为 Agent 领域太年轻——很多框架的第一版只用了几周时间开发,数据管理被有意延后处理。但随着 Agent 走向生产,数据管理不能再被忽视。

这个对比不是要否定 Agent 框架的价值。LangGraph、AutoGen 和 CrewAI 在Agent 编排和多 Agent 协作方面都有独特的创新。Stonebraker 的批评针对的是数据管理层面,而非Agent 智能能力本身。正确的理解是:这些框架需要增强数据管理能力,而不是被取代。

4实战方案一:Agent + PostgreSQL 的持久化状态管理

本节展示第一个实战方案:使用 PostgreSQL 作为 Agent 的持久化状态存储,将 Stonebraker 的批评转化为具体的架构改进。

方案核心理念:Agent 的执行状态不应该存储在 Prompt 上下文中,而应该存储在关系数据库中。数据库提供ACID 事务、持久化存储、精确查询和并发控制——这些正是 Agent 状态管理所需要的。

架构设计

第一层:状态定义层。定义 Agent 状态的结构化模型——包括任务 ID、当前阶段、执行结果、错误信息、时间戳等字段。这些字段存储为数据库表的列,而不是 Prompt 中的文本。

第二层:事务管理层。每个 Agent 的状态更新都在一个数据库事务中执行。如果更新失败(比如并发冲突),事务会自动回滚,状态保持不变。这确保了状态的一致性。

第三层:查询层。提供结构化的查询接口——可以按任务 ID查询状态、按时间范围查询历史、按状态条件筛选任务。这些查询使用 SQL 语句执行,精确且高效。

第四层:恢复层。利用 PostgreSQL 的 WAL(预写日志)机制,Agent 的执行状态自动持久化。如果服务崩溃,重启后可以从最后一个检查点恢复执行,不需要从头开始。

sql
-- Agent 任务状态表
CREATE TABLE agent_tasks (
    task_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    agent_id VARCHAR(64) NOT NULL,
    task_name VARCHAR(256) NOT NULL,
    status VARCHAR(32) NOT NULL DEFAULT 'pending',
    -- status: pending, running, completed, failed, cancelled
    current_phase VARCHAR(128),
    input_data JSONB,
    output_data JSONB,
    error_message TEXT,
    retry_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ
);

-- Agent 执行日志表(类似 WAL)
CREATE TABLE agent_execution_log (
    log_id BIGSERIAL PRIMARY KEY,
    task_id UUID REFERENCES agent_tasks(task_id),
    phase VARCHAR(128) NOT NULL,
    action VARCHAR(64) NOT NULL,
    -- action: start, complete, fail, retry, rollback
    input_snapshot JSONB,
    output_snapshot JSONB,
    duration_ms INTEGER,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Agent 状态检查点表
CREATE TABLE agent_checkpoints (
    checkpoint_id BIGSERIAL PRIMARY KEY,
    task_id UUID REFERENCES agent_tasks(task_id),
    checkpoint_data JSONB NOT NULL,
    -- 完整状态快照
    sequence_num INTEGER NOT NULL,
    -- 检查点序号,用于恢复时定位
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- 索引优化
CREATE INDEX idx_agent_tasks_status ON agent_tasks(status);
CREATE INDEX idx_agent_tasks_agent ON agent_tasks(agent_id);
CREATE INDEX idx_agent_tasks_created ON agent_tasks(created_at DESC);
CREATE INDEX idx_agent_log_task ON agent_execution_log(task_id);
CREATE INDEX idx_agent_checkpoint_task ON agent_checkpoints(task_id, sequence_num DESC);
typescript
import { Pool, PoolClient } from 'pg';

interface AgentTask {
  taskId: string;
  agentId: string;
  taskName: string;
  status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';
  currentPhase?: string;
  inputData?: Record<string, any>;
  outputData?: Record<string, any>;
  errorMessage?: string;
  retryCount: number;
  maxRetries: number;
}

class AgentStateManager {
  private pool: Pool;

  constructor(connectionString: string) {
    this.pool = new Pool({ connectionString });
  }

  // 创建任务(带事务)
  async createTask(
    agentId: string,
    taskName: string,
    inputData?: Record<string, any>
  ): Promise<string> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');
      
      const result = await client.query(
        `INSERT INTO agent_tasks (agent_id, task_name, input_data, status)
         VALUES ($1, $2, $3, 'pending')
         RETURNING task_id`,
        [agentId, taskName, inputData ? JSON.stringify(inputData) : null]
      );
      
      await client.query('COMMIT');
      return result.rows[0].task_id;
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  // 更新任务状态(带并发控制)
  async updateTaskStatus(
    taskId: string,
    newStatus: AgentTask['status'],
    expectedStatus?: AgentTask['status']
  ): Promise<boolean> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');
      
      let sql = `UPDATE agent_tasks 
                  SET status = $1, updated_at = NOW()`;
      const params: any[] = [newStatus];
      
      if (expectedStatus) {
        sql += ` WHERE task_id = $2 AND status = $3`;
        params.push(taskId, expectedStatus);
      } else {
        sql += ` WHERE task_id = $2`;
        params.push(taskId);
      }
      
      if (newStatus === 'running') {
        sql = sql.replace('updated_at', 'started_at, updated_at');
      } else if (newStatus === 'completed' || newStatus === 'failed') {
        sql = sql.replace('updated_at', 'completed_at, updated_at');
      }
      
      const result = await client.query(sql, params);
      await client.query('COMMIT');
      
      return result.rowCount! > 0;
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  // 创建检查点(用于失败恢复)
  async createCheckpoint(
    taskId: string,
    checkpointData: Record<string, any>
  ): Promise<number> {
    const result = await this.pool.query(
      `INSERT INTO agent_checkpoints (task_id, checkpoint_data, sequence_num)
       SELECT $1, $2, COALESCE(MAX(sequence_num), 0) + 1
       FROM agent_checkpoints WHERE task_id = $1
       RETURNING sequence_num`,
      [taskId, JSON.stringify(checkpointData)]
    );
    return result.rows[0].sequence_num;
  }

  // 恢复任务状态
  async restoreTask(taskId: string): Promise<Record<string, any> | null> {
    const result = await this.pool.query(
      `SELECT checkpoint_data FROM agent_checkpoints
       WHERE task_id = $1
       ORDER BY sequence_num DESC LIMIT 1`,
      [taskId]
    );
    return result.rows[0]?.checkpoint_data || null;
  }

  // 查询历史执行
  async queryTasks(filters: {
    agentId?: string;
    status?: string;
    fromDate?: Date;
    limit?: number;
  }): Promise<AgentTask[]> {
    let sql = 'SELECT * FROM agent_tasks WHERE 1=1';
    const params: any[] = [];
    let paramIndex = 1;
    
    if (filters.agentId) {
      sql += ` AND agent_id = $${paramIndex++}`;
      params.push(filters.agentId);
    }
    if (filters.status) {
      sql += ` AND status = $${paramIndex++}`;
      params.push(filters.status);
    }
    if (filters.fromDate) {
      sql += ` AND created_at >= $${paramIndex++}`;
      params.push(filters.fromDate);
    }
    
    sql += ` ORDER BY created_at DESC LIMIT $${paramIndex}`;
    params.push(filters.limit || 100);
    
    const result = await this.pool.query(sql, params);
    return result.rows;
  }
}

这个方案的核心优势是利用成熟的关系数据库来解决 Agent 的状态管理问题。PostgreSQL 的 ACID 事务确保了状态更新的原子性和一致性,WAL 机制确保了故障恢复能力,SQL 查询提供了灵活的历史数据分析能力。这是 Stonebraker 建议的最直接实现。

引入关系数据库会增加系统复杂度——你需要管理数据库连接池、处理连接泄漏、监控数据库性能。对于简单的单 Agent 场景,这可能过度设计。但对于多 Agent 并发执行和需要长期状态持久化的场景,这是值得的投资。

5实战方案二:Agent + Redis 的轻量级状态缓存

第二个方案针对对延迟敏感的场景,使用 Redis 作为 Agent 的状态缓存层,配合异步持久化到关系数据库。

方案核心理念:Agent 的热数据(当前执行状态、中间结果)存储在 Redis 中,以获得亚毫秒级的读写延迟。冷数据(历史执行记录、审计日志)异步持久化到 PostgreSQL。这种冷热分层的策略在性能和持久性之间取得了平衡。

架构设计

热数据层(Redis):Agent 的当前执行状态、工具调用结果、临时上下文都存储在 Redis 的 Hash 结构中。每个任务对应一个 Hash Key(格式:agent:task:{task_id}),字段包括 statusphaseresulterror 等。

冷数据层(PostgreSQL):通过 Redis Stream 的消费者组机制,将状态变更事件异步写入 PostgreSQL。这种异步写入不会阻塞 Agent 的执行,同时确保了数据的最终持久化。

一致性保证:由于热数据和冷数据之间存在延迟(通常 < 1 秒),系统采用了最终一致性模型。如果需要强一致性(比如金融场景),可以配置为同步写入——先写 Redis,再写 PostgreSQL,只有两者都成功后才返回。

typescript
import { createClient, RedisClientType } from 'redis';

interface AgentState {
  taskId: string;
  agentId: string;
  status: string;
  phase: string;
  result?: string;
  error?: string;
  retryCount: number;
  updatedAt: number;
}

class AgentRedisCache {
  private client: RedisClientType;
  private readonly KEY_PREFIX = 'agent:task:';
  private readonly STREAM_KEY = 'agent:events';
  private readonly TTL_SECONDS = 3600; // 1 小时过期

  constructor(redisUrl: string) {
    this.client = createClient({ url: redisUrl });
    this.client.on('error', err => console.error('Redis Error:', err));
  }

  async connect(): Promise<void> {
    await this.client.connect();
  }

  // 保存 Agent 状态(热数据)
  async saveState(state: AgentState): Promise<void> {
    const key = `${this.KEY_PREFIX}${state.taskId}`;
    
    // 使用 Pipeline 批量写入,减少网络往返
    const pipeline = this.client.multi();
    pipeline.hSet(key, {
      agentId: state.agentId,
      status: state.status,
      phase: state.phase,
      result: state.result || '',
      error: state.error || '',
      retryCount: String(state.retryCount),
      updatedAt: String(state.updatedAt)
    });
    pipeline.expire(key, this.TTL_SECONDS);
    await pipeline.exec();

    // 异步写入事件流(用于持久化到冷数据层)
    await this.client.xAdd(this.STREAM_KEY, '*', {
      taskId: state.taskId,
      action: 'state_update',
      status: state.status,
      phase: state.phase,
      timestamp: String(state.updatedAt)
    });
  }

  // 获取 Agent 状态
  async getState(taskId: string): Promise<AgentState | null> {
    const key = `${this.KEY_PREFIX}${taskId}`;
    const data = await this.client.hGetAll(key);
    
    if (!data || !data.agentId) return null;
    
    return {
      taskId,
      agentId: data.agentId,
      status: data.status,
      phase: data.phase,
      result: data.result || undefined,
      error: data.error || undefined,
      retryCount: parseInt(data.retryCount) || 0,
      updatedAt: parseInt(data.updatedAt) || 0
    };
  }

  // 乐观锁:原子更新状态
  async updateStateIfMatch(
    taskId: string,
    expectedStatus: string,
    newState: Partial<AgentState>
  ): Promise<boolean> {
    const key = `${this.KEY_PREFIX}${taskId}`;
    
    // Lua 脚本实现原子条件更新
    const luaScript = `
      local current = redis.call('HGET', KEYS[1], 'status')
      if current == ARGV[1] then
        for i = 2, #ARGV, 2 do
          redis.call('HSET', KEYS[1], ARGV[i], ARGV[i+1])
        end
        return 1
      end
      return 0
    `;
    
    const fields: string[] = [];
    for (const [key, value] of Object.entries(newState)) {
      fields.push(key, String(value));
    }
    
    const result = await this.client.eval(luaScript, {
      keys: [key],
      arguments: [expectedStatus, ...fields]
    });
    
    return result === 1;
  }

  // 清理过期状态
  async cleanupExpired(): Promise<number> {
    // 通过 SCAN 查找过期 key(非阻塞)
    let count = 0;
    let cursor = 0;
    
    do {
      const result = await this.client.scan(cursor, {
        MATCH: `${this.KEY_PREFIX}*`,
        COUNT: 100
      });
      cursor = result.cursor;
      count += result.keys.length;
    } while (cursor !== 0);
    
    return count;
  }
}
python
import asyncio
import asyncpg
import json
from datetime import datetime

class ColdDataSyncer:
    """将 Redis Stream 中的事件异步同步到 PostgreSQL"""
    
    def __init__(self, redis_client, pg_dsn: str, batch_size: int = 100):
        self.redis = redis_client
        self.pg_dsn = pg_dsn
        self.batch_size = batch_size
        self.consumer_group = "cold_sync_group"
        self.consumer_name = "syncer_1"
        self.stream_key = "agent:events"
        self.running = False
    
    async def start(self):
        """启动消费者"""
        # 创建消费者组
        try:
            await self.redis.xgroup_create(
                self.stream_key,
                self.consumer_group,
                id="0",
                mkstream=True
            )
        except Exception:
            pass  # 组已存在
        
        self.running = True
        pg_pool = await asyncpg.create_pool(dsn=self.pg_dsn)
        
        while self.running:
            # 读取新消息
            messages = await self.redis.xreadgroup(
                self.consumer_group,
                self.consumer_name,
                {self.stream_key: ">"},
                count=self.batch_size,
                block=1000  # 阻塞 1 秒
            )
            
            if not messages:
                continue
            
            async with pg_pool.acquire() as conn:
                async with conn.transaction():
                    for stream, msg_list in messages:
                        for msg_id, fields in msg_list:
                            await self._process_event(conn, fields)
                            
                            # 确认消息
                            await self.redis.xack(
                                self.stream_key,
                                self.consumer_group,
                                msg_id
                            )
    
    async def _process_event(self, conn, fields: dict):
        """处理单个事件"""
        action = fields.get("action", "")
        task_id = fields.get("taskId", "")
        
        if action == "state_update":
            await conn.execute(
                """UPDATE agent_tasks 
                   SET status = $1, current_phase = $2, updated_at = NOW()
                   WHERE task_id = $3""",
                fields.get("status"),
                fields.get("phase"),
                task_id
            )
            
            # 写入执行日志
            await conn.execute(
                """INSERT INTO agent_execution_log 
                   (task_id, phase, action, created_at)
                   VALUES ($1, $2, $3, NOW())""",
                task_id,
                fields.get("phase", ""),
                "state_sync"
            )
    
    def stop(self):
        self.running = False

Redis + PostgreSQL 的冷热分层架构是大规模 Agent 系统的推荐方案。Redis 提供低延迟的热数据访问,PostgreSQL 提供持久化的冷数据存储,两者通过异步消息流解耦。这种架构在性能和可靠性之间取得了最佳平衡。参考 Twitter 和 Netflix 的架构实践,它们都用类似的模式处理大规模实时数据。

Redis + PostgreSQL 架构的一个常见陷阱是数据不一致窗口——在 Redis 写入成功但 PostgreSQL 写入失败的情况下,冷数据层可能丢失部分更新。解决方案是:1)确保 Redis Stream 的保留时间足够长(至少 24 小时),以便失败后可以重新消费;2)定期运行数据一致性校验,对比热数据和冷数据的差异。

6对比分析:三种 Agent 数据管理方案的权衡

综合 Stonebraker 的批评和上述实战方案,我们对比三种 Agent 数据管理方案的优劣和适用场景。

方案 A:纯 Prompt 上下文(当前主流做法)。Agent 的所有状态都存储在 LLM 的上下文窗口中。这是最简单但最不可靠的方案。

方案 B:关系数据库(Stonebraker 推荐方案)。Agent 状态存储在 PostgreSQL 等关系数据库中。这是最可靠但延迟最高的方案。

方案 C:冷热分层(Redis + PostgreSQL)。热数据存 Redis,冷数据存 PostgreSQL。这是性能和可靠性之间的最佳平衡。

对比分析

可靠性:方案 C ≥ 方案 B >>> 方案 A。方案 A 的根本缺陷是上下文窗口有大小限制且不持久化——服务重启后所有状态丢失。方案 B 和 C 都依赖持久化存储,可靠性远高于方案 A。

延迟:方案 A ≥ 方案 C >>> 方案 B。方案 A 的读写延迟最低(零网络开销),但受限于上下文窗口大小。方案 C 的热数据层(Redis)延迟为亚毫秒级,接近方案 A。方案 B 的关系数据库查询延迟为毫秒级,对于高频状态读写场景可能成为瓶颈。

可扩展性:方案 C ≥ 方案 B >>> 方案 A。方案 A 无法扩展——每个 Agent 实例独立维护状态,无法共享。方案 B 的关系数据库支持读写分离和分片,可以水平扩展。方案 C 的 Redis Cluster + PostgreSQL 分片方案可以扩展到数千个并发 Agent。

开发复杂度:方案 A < 方案 B < 方案 C。方案 A 最简单——不需要任何额外基础设施。方案 B 需要管理数据库连接和schema 迁移。方案 C 需要管理两套存储系统和数据同步逻辑。

适用场景建议

  • 原型验证:方案 A。快速验证 Agent 逻辑是否正确,不需要考虑状态持久化。
  • 中小规模生产:方案 B。Agent 数量 < 100,可靠性优先于延迟。
  • 大规模生产:方案 C。Agent 数量 > 100,需要兼顾延迟和可靠性。
维度方案 A: 纯 Prompt方案 B: 关系数据库方案 C: 冷热分层

可靠性

★☆☆☆☆

★★★★★

★★★★★

延迟

★★★★★

★★★☆☆

★★★★☆

可扩展性

★☆☆☆☆

★★★★☆

★★★★★

开发复杂度

★★★★★ (最低)

★★★☆☆

★★☆☆☆ (最高)

查询能力

☆☆☆☆☆

★★★★★

★★★★★

并发控制

☆☆☆☆☆

★★★★★

★★★★☆

适用规模

< 10 Agent

< 100 Agent

100+ Agent

不要一开始就选择最复杂的方案。建议按照渐进式路径:先用方案 A 快速验证 Agent 逻辑 → 当 Agent 数量增长到 10 个以上时升级到方案 B → 当延迟成为瓶颈时升级到方案 C。每次升级都应该有明确的数据支撑(比如监控到的失败率、延迟、并发数),而不是凭感觉。

方案 C(冷热分层)的一个隐藏成本是运维复杂度。你需要同时维护 Redis 集群和 PostgreSQL 集群,处理版本升级、备份恢复、性能调优等问题。如果团队没有数据库运维经验,建议先使用托管服务(如 Amazon ElastiCache for Redis、Amazon RDS for PostgreSQL),而不是自建。

7深层反思:Agent 领域为什么忽视了数据库经验

Stonebraker 的批评引出了一个更深层次的问题:为什么在 Agent 技术快速发展的 2024-2026 年,开发者们普遍忽视了数据库领域 50 年积累的经验?

原因一:人才背景的偏差。当前 AI Agent 领域的主力开发者大多来自机器学习和自然语言处理背景,他们的核心关注点是模型能力(推理、规划、工具调用),而非系统工程(状态管理、并发控制、持久化)。这种人才背景偏差导致了技术视野的局限性。

原因二:技术迭代的加速。AI Agent 领域的迭代速度远超传统软件工程——新的框架、新的模式、新的工具每个月都在涌现。在这种高速迭代的环境下,开发者倾向于追求最新的技术,而不是回顾过去的经验。这种「新就是好」的心态导致了对成熟技术的系统性忽视。

原因三:问题规模的增长曲线。在 Agent 的早期阶段(单 Agent、简单任务、少量用户),数据管理问题不会暴露。但当 Agent 走向生产环境(多 Agent 并发、复杂任务、大量用户)时,数据管理问题会集中爆发。这种延迟暴露的特性使得开发者在早期阶段低估了数据管理的重要性。

原因四:抽象层次的错位。Agent 框架的高层抽象(「定义 Agent,它就自动工作」)隐藏了底层的数据管理复杂性。这既是优点(降低了使用门槛)也是缺点(开发者不了解底层发生了什么,直到出现问题)。

Stonebraker 的真正担忧:不是 Agent 技术没有价值,而是 Agent 领域正在重复软件工程的经典错误——忽视数据管理、忽视并发控制、忽视失败恢复——而这些错误早在几十年前就已经被解决过了。

我的观点:Stonebraker 的批评是正确且及时的。AI Agent 领域确实需要从软件工程和数据库领域汲取经验。但我也认为,Stonebraker 低估了 Agent 技术的一个关键特性——不确定性。传统的数据库系统处理的是确定性的数据操作(INSERT、UPDATE、DELETE),而 Agent 系统处理的是不确定性的智能决策(推理、规划、自适应)。这两种操作模式对数据基础设施的要求不完全相同——Agent 需要的是既能提供数据库级别的可靠性,又能适应不确定性操作的新型数据架构。

这种「确定性 + 不确定性」的混合架构,可能就是下一代 Agent 数据基础设施的方向。

值得注意的行业信号是:2026 年第一季度,已经有三家创业公司进入了 Agent 数据基础设施赛道——LangState(Agent 状态管理即服务)、AgentDB(专为 Agent 设计的嵌入式数据库)和 FlowTrace(Agent 工作流可观测性平台)。这些公司的出现说明 Stonebraker 的批评不仅仅是学术讨论,而是已经催生了实际的商业机会。资本市场正在用真金白银投票:Agent 数据基础设施是一个被长期忽视但价值巨大的市场。

如果你是一名 Agent 开发者,我强烈建议花 1-2 周时间学习数据库系统的基础原理——特别是事务管理(ACID)、并发控制(MVCC)、持久化机制(WAL)和索引结构(B+树、LSM 树)。这些知识将帮助你设计更健壮的 Agent 系统,避免重蹈 Stonebraker 警告的覆辙。推荐教材:《Database System Concepts》(Silberschatz 等著)和《Designing Data-Intensive Applications》(Martin Kleppmann 著)。

不要将 Stonebraker 的批评当作「数据库万能论」。数据库技术虽然成熟,但不意味着所有问题都可以用数据库解决。Agent 系统的核心挑战仍然是推理能力的提升、规划策略的优化和工具调用的可靠性。数据管理是必要的基础设施,但不是Agent 智能本身。正确的态度是:用数据库解决数据问题,用 AI 解决智能问题,两者各司其职。

8趋势预判:2026-2027 Agent 数据基础设施的三个方向

基于 Stonebraker 的批评和当前技术发展,我预判 2026-2027 年 Agent 数据基础设施将在以下三个方向发生重要变化。

方向一:Agent 专用数据库的兴起。通用关系数据库(PostgreSQL、MySQL)虽然功能强大,但并非为 Agent 场景优化。我预判会出现专门为 Agent 设计的数据库——内置状态快照、执行日志、检查点恢复、向量-关系混合查询等 Agent 专用功能。这类数据库将继承传统数据库的 ACID 保证,同时适配 Agent 的不确定性操作模式。

方向二:Agent 编排框架的数据层标准化。当前各 Agent 框架(LangGraph、AutoGen、CrewAI)各自实现了不同的状态管理方案,导致互操作性差。我预判会出现标准化的 Agent 数据层接口——定义统一的状态存储 API、事件流 API和查询 API。各框架只需实现这个接口,用户就可以在框架之间自由切换,而不需要重写数据管理逻辑。OpenAI Symphony 规范已经在通信协议层面做了类似的事情,数据层标准化是自然的下一步。

方向三:Agent 可观测性平台的专业化。当前的 Agent 可观测性工具(如 LangSmith、Arize)主要关注模型性能(响应时间、Token 消耗、输出质量)。我预判会出现专门关注数据层面可观测性的平台——监控 Agent 的状态一致性、数据完整性、并发冲突率、恢复成功率等指标。这些指标将帮助运维团队更早地发现数据层面的问题,而不是等到用户投诉才发现问题。

一个具体的预判:到 2027 年底,超过 60% 的生产级 Agent 系统将使用专门的 Agent 数据基础设施(而非简单的 Prompt 上下文管理),这一比例在 2025 年可能还不到 20%。驱动这一变化的不是技术热情,而是生产故障的教训——当你的 Agent 因为状态丢失导致用户数据损坏时,你会立刻理解 Stonebraker 的警告有多重要。

如果你正在规划 2026 年的 Agent 技术路线图,建议将数据基础设施升级列为P0 优先级。具体来说:1)为现有 Agent 系统增加状态持久化(从方案 A 升级到方案 B 或 C);2)建立Agent 数据监控体系(状态一致性、并发冲突、恢复成功率);3)评估Agent 专用数据库的成熟度和适用性。这些投资将在未来 12 个月内带来显著的可靠性提升。

技术预判存在不确定性。上述三个方向是基于当前技术趋势和Stonebraker 批评的逻辑延伸,但实际发展可能偏离预判。特别是 Agent 专用数据库的方向——如果现有数据库(如 PostgreSQL)通过插件机制快速适配了 Agent 场景(比如通过 pgvector 支持向量查询、通过扩展支持状态快照),那么专用数据库的需求可能会被削弱。保持技术敏感度和灵活性是关键。

9结语:尊重 50 年的经验,拥抱 AI 的未来

Stonebraker 对 AI Agent 的批评,表面上是对当前 Agent 架构的技术质疑,本质上是对软件工程师基本素养的呼唤——在设计任何系统之前,先了解这个问题是否已经被解决过、前人是如何解决的、我们可以借鉴什么。

50 年的数据库历史不是一个沉重的包袱,而是一个丰富的工具箱。事务管理、并发控制、持久化机制、索引优化——这些技术已经被数百万生产系统验证过,它们的可靠性和效率是无可替代的。

但 Stonebraker 也需要认识到:Agent 系统处理的是不确定性的智能操作,这与传统的确定性数据操作有本质区别。完全照搬数据库的设计模式不够——我们需要在数据库的可靠性和Agent 的灵活性之间找到新的平衡点。

最终的答案可能是:数据库 + Agent 的融合架构——用数据库解决数据管理问题(状态存储、并发控制、持久化、查询),用 Agent 解决智能决策问题(推理、规划、自适应)。两者各司其职,互不替代。

作为 AI 工程师,我们能做的最重要的事:在学习最新的 Agent 技术的同时,也不要忘记翻阅那些「老」书——《Database System Concepts》、《Designing Data-Intensive Applications》、《Transaction Processing: Concepts and Techniques》。这些书中蕴含的工程智慧,将帮助你构建既智能又可靠的 Agent 系统。

Stonebraker 说得对:Agent 绕不开数据库的老问题。

但我们也应该说:解决了这些老问题的 Agent,才是真正的生产级 Agent。

这篇文章的价值不在于「批评」,而在于启发。希望你在读完本文后,能重新审视自己的 Agent 系统的数据管理架构——它是否足够可靠?是否能应对并发和故障?是否有完整的可观测性?如果答案是否定的,那么 Stonebraker 的这盆冷水,来得正是时候。

最后提醒:不要将本文作为否定现有 Agent 框架的依据。LangGraph、AutoGen、CrewAI 等框架在Agent 编排和多 Agent 协作方面的创新是真实且有价值的。Stonebraker 的批评针对的是数据管理层面的不足,而非框架的整体价值。正确的做法是在现有框架的基础上增强数据管理能力,而不是推倒重来。

标签

#Stonebraker#AI Agent#数据库#事务管理#并发控制#持久化#Agent 架构#数据基础设施#软件工程

继续探索更多 AI 内容

浏览更多博客文章,或者深入学习 AI 核心知识