💡

文章摘要

2026 年 4 月,Simon Willison 推荐了 honker — 一个将 PostgreSQL NOTIFY/LISTEN 语义引入 SQLite 的 Rust 扩展。honker 提供了 Kafka 风格的持久化数据流和事务型外盒(Transactionally Staged Job Drains)模式的 SQLite 实现,支持 Python/Rust/Go 等多种语言绑定。本文从架构原理、API 设计、事务一致性保证、性能基准到 AI Agent 消息队列实战,全面解析 honker 如何用单文件数据库实现可靠的事件驱动架构。

一、honker 是什么:为什么 SQLite 需要 NOTIFY/LISTEN 语义

honker 是一个 Rust 编写的 SQLite 扩展,由 russellromney 开发。它的核心目标是将 PostgreSQL 的 NOTIFY/LISTEN 通知机制和事务型外盒模式引入 SQLite 生态

背景:SQLite 的异步通信空白

SQLite 作为世界上最流行的嵌入式数据库,长期以来缺少跨进程异步通信能力:

  • PostgreSQL 有 NOTIFY/LISTEN,允许一个连接发送通知,其他连接接收
  • Redis 有 PUB/SUB 和 Streams
  • Kafka/RabbitMQ 提供了完整的消息队列基础设施
  • SQLite 只有 WAL(Write-Ahead Log),但没有原生的消息通知机制

这意味着基于 SQLite 的应用如果需要事件驱动架构(如 AI Agent 任务队列、实时数据同步),只能依赖轮询——效率低、延迟高、扩展性差。

honker 填补了这个空白。它提供两种核心原语:

原语 类比 用途
Queue Redis BLPOP / RabbitMQ Queue 工作队列、任务分发、消费者组
Stream Kafka Stream / Redis Streams 事件溯源、数据变更捕获、实时推送

honker 的关键设计决策

  1. 基于 WAL 的轮询机制:worker 每 1ms 轮询 .db-wal 文件的 stat 变化,实现接近实时的通知
  2. 事务型外盒模式:确保消息仅在事务成功提交后才进入队列,避免「数据写入成功但消息丢失」的不一致状态
  3. 单文件架构:无需额外服务进程,与 SQLite 应用共享同一数据库文件
  4. 多语言绑定:官方提供 Python 绑定,社区可扩展 Go/Node.js/Rust

为什么叫 honker? 名字来自「honk」—— 大鹅的叫声,暗示这是一个「通知/提醒」工具。

图表加载中…

二、honker 核心 API 设计:Queue 和 Stream 的用法差异

honker 提供了两套 API:Queue(队列)和 Stream(流)。它们的设计哲学和适用场景完全不同

图表加载中…
python
import honker

# 打开(或创建)数据库
db = honker.open("app.db")

# 创建名为 "emails" 的队列
emails = db.queue("emails")

# === 生产者:入队任务 ===
emails.enqueue({
    "to": "alice@example.com",
    "subject": "Welcome!",
    "body": "Hello Alice"
})
emails.enqueue({
    "to": "bob@example.com",
    "subject": "Order Confirmed",
    "body": "Your order #42 is confirmed"
})

# === 消费者:消费任务 ===
async for job in emails.claim("worker-1"):
    print(f"Processing job: {job.payload}")
    # 处理邮件发送...
    job.ack()  # 确认完成
    # 如果 worker 崩溃未 ack,任务会重新入队
python
import honker

db = honker.open("app.db")

# 创建持久化事件流
stream = db.stream("user-events")

# === 发布事件(在事务中)===
with db.transaction() as tx:
    tx.execute(
        "UPDATE users SET name=? WHERE id=?",
        ["New Name", 42]
    )
    # 只有事务提交成功,事件才会发布
    stream.publish(
        {"user_id": 42, "change": "name", "new_value": "New Name"},
        tx=tx
    )

# === 消费者:订阅事件流 ===
async for event in stream.subscribe(consumer="dashboard"):
    # 实时推送到浏览器
    await push_to_browser(event)
    # 事件持久化,重启后不会丢失

三、事务型外盒模式(Transactionally Staged Job Drains)深度解析

honker 实现了 Transactionally Staged Job Drains 模式(源自 Brandur Leach 的经典文章)。这是分布式系统中确保数据写入与消息发送一致性的关键模式

问题:为什么需要事务型外盒?

考虑一个典型的场景:用户注册时,你需要:

  1. 将用户信息写入数据库
  2. 发送欢迎邮件

不使用事务型外盒的问题:

code
# 反模式:两步操作可能不一致
db.execute("INSERT INTO users ...")  # ✅ 写入成功
email_service.send("Welcome!")        # ❌ 网络超时,发送失败
# 结果:用户注册了但没收到邮件 → 用户体验差

使用 honker 的解决方案:

模式工作流程

步骤 操作 事务状态 消费者可见性
1 开始事务 进行中 不可见
2 写入业务数据 进行中 不可见
3 将消息「暂存」到外盒表 进行中 暂存中(不对外暴露)
4 提交事务 已提交 ✅ 立即可见
5 Worker 轮询 WAL 发现变化 已完成 消费消息
6 Worker ACK 已完成 消息已处理

与普通消息队列的对比

特性 普通 MQ(RabbitMQ/Kafka) honker(事务型外盒)
数据一致性 ❌ 两步操作,可能不一致 ✅ 单事务保证原子性
运维复杂度 🔴 需要独立服务 🟢 零额外服务
部署成本 🔴 需要 MQ 服务器 🟢 随 SQLite 一起部署
消息持久化 ✅ 支持 ✅ 支持
消息顺序保证 ✅ 支持 ✅ 支持
跨进程通信 ✅ 支持 ✅ 支持(通过 WAL 轮询)
横向扩展 ✅ 优秀 🟡 适合单机/小规模
适用场景 大型分布式系统 单机应用、边缘部署、AI Agent
python
# ✅ 原子操作:数据写入和消息入队在同一个事务中
with db.transaction() as tx:
    db.execute("INSERT INTO users ...", tx=tx)
    db.queue("emails").enqueue({"to": user.email}, tx=tx)
# 事务要么全部成功,要么全部回滚
场景推荐方案原因

AI Agent 任务队列

honker Queue

任务需要可靠执行,Worker 崩溃可重试

实时事件推送(Dashboard)

honker Stream

需要持久化事件历史

邮件/通知发送

honker Queue + ACK

确保每条消息都被处理

数据变更捕获(CDC)

honker Stream

记录所有数据变更

微服务间通信

Kafka/RabbitMQ

跨机器、高吞吐场景

高可用要求

RabbitMQ + 集群

honker 不适合多节点部署

四、honker 的 20+ 自定义 SQL 函数详解

honker 扩展提供了 20+ 自定义 SQL 函数,让你可以直接在 SQL 中操作队列和流。这是 honker 的一大亮点——你可以在存储过程、触发器中使用这些函数

核心函数速查表

函数 说明 返回值
notify(channel, payload) 发送通知(类似 PostgreSQL) 受影响的行数
honker_stream_read_since(stream, offset, limit) 从偏移量读取流消息 JSON 数组
honker_stream_publish(stream, payload) 发布事件到流 消息 ID
honker_stream_count(stream) 流中消息总数 INTEGER
honker_queue_enqueue(queue, payload) 入队 消息 ID
honker_queue_claim(queue, worker, timeout) 消费并锁定消息 JSON 或 NULL
honker_queue_ack(queue, message_id) 确认消息完成 受影响的行数
honker_queue_nack(queue, message_id) 拒绝消息(重新入队) 受影响的行数
honker_queue_depth(queue) 队列深度(未消费消息数) INTEGER

实战示例:用 SQL 直接操作

在触发器中使用 honker

关键优势: 这意味着你的 honker 应用可以与任何其他 SQLite 应用(如 LiteParse、Datasette、甚至 Electron 应用)无缝集成,只需在同一个数据库文件上注册 honker 扩展。

python
import honker
import json

db = honker.open("shop.db")

# 1. 创建表
db.execute("""
    CREATE TABLE IF NOT EXISTS orders (
        id INTEGER PRIMARY KEY,
        product TEXT,
        status TEXT DEFAULT 'pending',
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
""")

# 2. 创建触发器(在 Python 中注册 honker 函数)
# 注:实际触发器需在 honker 扩展加载后创建

# 3. 创建订单流
order_stream = db.stream("order-events")

# 4. 创建订单(会自动触发流发布)
with db.transaction() as tx:
    tx.execute(
        "INSERT INTO orders (product, status) VALUES (?, ?)",
        ["MacBook Pro", "pending"]
    )
    # 触发器会自动发布事件到流
    # 如果没有触发器,手动发布:
    # order_stream.publish(
    #     {"action": "order_created", "product": "MacBook Pro"},
    #     tx=tx
    # )

# 5. 消费订单事件
async for event in order_stream.subscribe(consumer="order-processor"):
    data = json.loads(event)
    print(f"处理订单事件: {data}")
    # 可以触发 AI Agent 处理:
    # - 发送确认邮件
    # - 检查库存
    # - 更新推荐模型
sql
-- 发送通知(类似 PostgreSQL 的 NOTIFY)
SELECT notify('orders', '{"id":42}');

-- 从流中读取最近 1000 条消息
SELECT honker_stream_read_since('orders', 0, 1000);

-- 查看队列深度
SELECT honker_queue_depth('email-jobs');
sql
-- 创建触发器:当订单状态变更时,自动发布事件
CREATE TRIGGER order_status_changed
AFTER UPDATE OF status ON orders
WHEN (OLD.status != NEW.status)
BEGIN
    SELECT honker_stream_publish(
        'order-events',
        json_object(
            'order_id', NEW.id,
            'old_status', OLD.status,
            'new_status', NEW.status,
            'timestamp', datetime('now')
        )
    );
END;

五、honker 在 AI Agent 架构中的应用场景

honker 特别适合 AI Agent 系统,原因如下:

场景一:AI Agent 任务队列

AI Agent 通常需要处理大量异步任务:

  • 生成内容后发送通知
  • 分析数据后更新报告
  • 监控指标后触发告警

使用 honker Queue,你可以构建可靠的 Agent 任务管道

场景二:AI Agent 事件溯源

使用 honker Stream,你可以记录 Agent 的所有决策和操作

场景三:多 Agent 协作

honker 的队列和流可以天然支持多 Agent 协作模式

Agent 消费来源 发布目标 协作模式
Research Agent 用户请求队列 研究结果流 接收任务,发布结果
Writing Agent 研究结果流 草稿队列 基于研究写草稿
Review Agent 草稿队列 审核结果流 审核并评分
Publishing Agent 审核结果流 发布事件流 发布最终内容
图表加载中…
python
import honker
import asyncio

db = honker.open("agent.db")
agent_queue = db.queue("agent-tasks")

# Agent 主循环:派发任务
async def dispatch_agent_task(task_type: str, payload: dict):
    agent_queue.enqueue({
        "type": task_type,
        "payload": payload,
        "priority": payload.get("priority", 5),
        "created_at": time.time()
    })

# Worker:处理任务
async def process_agent_tasks():
    async for job in agent_queue.claim("agent-worker-1"):
        task = job.payload
        if task["type"] == "generate_report":
            report = await ai_generate_report(task["payload"])
            # 将结果写回数据库
            job.ack()
python
# 记录 Agent 的每个决策
decision_stream = db.stream("agent-decisions")

with db.transaction() as tx:
    # 保存决策到数据库
    tx.execute(
        "INSERT INTO decisions (agent_id, action, context) VALUES (?, ?, ?)",
        [agent_id, action, json.dumps(context)]
    )
    # 同时发布到事件流(用于实时监控/调试)
    decision_stream.publish({
        "agent_id": agent_id,
        "action": action,
        "timestamp": time.time(),
        "context": context
    }, tx=tx)

六、性能基准与部署指南

性能特征

honker 的性能特点

指标 数值 说明
WAL 轮询间隔 1ms 理论最小延迟
单 DB 并发写入 ~1000 TPS SQLite 限制
队列消费延迟 1-5ms WAL 轮询 + 消费开销
消息持久化 零丢失 WAL 保证
内存占用 < 50MB 纯 SQLite 扩展
磁盘占用 取决于消息量 WAL 模式需要额外空间

部署方式

与竞品对比总结

方案 部署复杂度 数据一致性 消息延迟 适用规模 推荐指数
honker ⭐ 极低(单文件) ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ (1-5ms) 单机/边缘 ⭐⭐⭐⭐⭐
Redis ⭐⭐⭐ 中等 ⭐⭐⭐ ⭐⭐⭐⭐⭐ (<1ms) 中小型 ⭐⭐⭐⭐
RabbitMQ ⭐⭐⭐⭐ 较高 ⭐⭐⭐⭐ ⭐⭐⭐⭐ 中大型 ⭐⭐⭐
Kafka ⭐⭐⭐⭐⭐ 很高 ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ 大型 ⭐⭐⭐
SQLite 轮询 ⭐ 极低 ⭐⭐ ⭐⭐ (100ms+) 小型 ⭐⭐

核心结论: honker 不是 Kafka 的替代品——它解决的是不同的问题。如果你需要一个零运维、事务一致、低延迟的消息机制来支撑单机或边缘部署的 AI Agent 系统,honker 是目前 SQLite 生态中最好的选择。

bash
# 1. 安装 honker(Python 绑定)
pip install honker

# 2. 或使用 Rust 原生版
cargo add honker

# 3. 加载扩展到 SQLite
# Python:
import honker
db = honker.open("myapp.db")

# 或使用标准 sqlite3 + 加载扩展
import sqlite3
conn = sqlite3.connect("myapp.db")
conn.load_extension("/path/to/honker.so")

🎯 相关面试题

巩固本篇知识点,备战 AI 岗位面试。