文章摘要
ACP(Agent Communication Protocol)由 IBM Research 于 2025 年 3 月发起,2025 年 8 月正式合并入 Google A2A 协议(Linux Foundation 管理)。ACP 为多 Agent 系统提供了轻量级、HTTP 原生、SDK 可选的通信标准,其 REST 设计、异步优先架构和多模态消息支持使其成为企业级 Agent 互操作的关键基础设施。本文从协议原理、四层架构、与 MCP/A2A 的关系、到 Python/TypeScript 实战代码,完整覆盖 ACP 的技术全景。
1为什么需要 ACP:Agent 通信的最后一块拼图
2025 年到 2026 年,AI Agent 协议生态经历了从碎片化到统一的快速演进。 在这个进程中,三个协议构成了核心支柱:Anthropic 的 MCP 解决了 Agent 与工具的连接问题,Google 的 A2A 解决了 Agent 与 Agent 的任务委托问题,而 IBM 的 ACP 则填补了一个关键空白——轻量级、框架无关的 Agent 间消息传递标准。
ACP 的独特定位在于它的设计哲学:极简主义。与 A2A 的完整企业级方案不同,ACP 追求的是「用 curl 就能测试 Agent 通信」的简洁性。这意味着:
- 无需 SDK 即可使用:任何能发 HTTP 请求的语言都能接入 ACP
- 异步优先:天然支持长时间运行的 Agent 任务,通过 Server-Sent Events (SSE) 实现流式响应
- 离线发现:Agent 不需要在线就能被其他 Agent 发现其能力
- 多模态消息:消息体可以包含文本、图片、嵌入向量、结构化数据
ACP 的诞生时间线:
2025 年 3 月,IBM Research 发布 BeeAI 开源平台,ACP 作为其内部通信协议首次亮相。2025 年 5 月,ACP 独立开源,提供独立的协议规范和参考实现。2025 年 8 月 29 日,IBM 宣布将 ACP 合并入 Linux Foundation 旗下的 A2A 项目——这不是竞争失败,而是协议融合。ACP 的轻量级消息传递层被保留为 A2A 的可选子协议,专门服务于需要极简通信的场景。
ACP 解决的核心痛点是什么?
在 ACP 出现之前,如果你有两个用不同框架构建的 Agent(比如一个 LangGraph Agent 和一个 CrewAI Agent),要让它们通信,你需要:
- 为每个 Agent 编写 HTTP 包装器
- 定义消息格式(JSON?Protobuf?自定义?)
- 处理异步回调(任务完成了怎么通知?)
- 管理 Agent 生命周期(启动、停止、健康检查)
ACP 把这些全部标准化了。
💡 一句话理解
ACP 已合并入 A2A(Linux Foundation),但其协议规范和参考实现仍独立维护。在需要轻量级 Agent 通信的场景中,直接使用 ACP 比使用完整 A2A 更合适。
⚠️ 常见踩坑
ACP 的「轻量级」不意味着「功能弱」——它的异步消息传递和离线发现能力是 A2A 原生不具备的。选择协议时应根据场景需求决定,而非简单认为 A2A 替代了 ACP。
2ACP 四层架构:从传输到语义的完整设计
ACP 的架构设计遵循经典的分层模型,将通信关注点解耦为四个独立的层次。这种设计确保了传输层的升级(比如从 HTTP/1.1 升级到 HTTP/3)不会影响上层的语义理解,反之亦然。
第一层:传输层(Transport Layer)
传输层定义了消息如何在 Agent 之间物理传递。ACP 选择了最务实的方案——HTTP/REST。这意味着:
- 每个 Agent 暴露标准的 HTTP 端点
- 消息格式为 JSON(人类可读,调试友好)
- 支持标准 HTTP 方法:POST 发送消息、GET 查询状态、DELETE 终止任务
- 认证使用标准 OAuth 2.0 或 API Key
第二层:消息层(Message Layer)
消息层定义了消息的结构。ACP 的消息模型比 A2A 更简单,但足够强大:
- Message:包含 sender(发送者 ID)、content(消息内容)、metadata(元数据)
- Content:支持多种类型——text/plain、image/png、application/json、embedding/vector
- Thread:消息按线程组织,支持多轮对话
第三层:发现层(Discovery Layer)
这是 ACP 最有特色的设计——离线发现。Agent 不需要在线就能注册自己的能力:
- 每个 Agent 发布一个 Agent Card(JSON 格式),描述自己的能力、支持的消息类型、认证要求
- Agent Card 存储在注册中心(可以是本地文件或分布式注册表)
- 其他 Agent 通过查询注册中心发现可用 Agent
第四层:治理层(Governance Layer)
治理层处理安全和合规问题:
- 可验证凭证(Verifiable Credentials):Agent 身份通过加密凭证验证
- 权限策略:定义 Agent 能访问哪些资源、执行哪些操作
- 审计日志:所有消息交换都有审计追踪
💡 一句话理解
ACP 的发现层设计参考了 W3C 的 WebFinger 协议——Agent Card 的格式与 ActivityPub 的 Actor 对象高度相似。如果你熟悉 ActivityPub(Mastodon 的协议),会发现 ACP 的设计非常直觉。
⚠️ 常见踩坑
ACP 的治理层在 2026 年仍处于草案阶段。生产环境中建议先用 OAuth 2.0 + 自定义权限策略替代完整的 Verifiable Credentials 方案,等规范稳定后再迁移。
3ACP 核心 API:5 个端点覆盖所有场景
ACP 的 API 设计遵循 REST 最佳实践,整个协议只有 5 个核心端点。这种极简设计是 ACP 最大的竞争力——一个下午就能实现完整的 ACP 兼容 Agent。
端点一览:
| 端点 | 方法 | 功能 | 响应 |
|---|---|---|---|
/agents |
GET | 列出本地 Agent | Agent 列表 |
/agents/:id/card |
GET | 获取 Agent Card | Agent 能力描述 |
/messages |
POST | 发送消息 | 消息 ID + 状态 |
/messages/:id |
GET | 查询消息状态 | 消息内容 + 状态 |
/messages/:id/stream |
GET | SSE 流式接收 | 实时事件流 |
消息格式示例:
一个典型的 ACP 消息请求包含以下结构:发送者 ID、接收者 ID、消息内容(支持多模态)、元数据(超时、优先级等)。
Agent Card 格式示例:
Agent Card 是 ACP 发现机制的核心,它描述了 Agent 的能力、支持的消息类型、认证要求。一个典型的 Agent Card 包含:Agent ID、名称、描述、能力标签、支持的内容类型、认证方式。
异步消息的生命周期:
ACP 中每条消息都有明确的状态机:pending → processing → completed / failed。Agent 在处理消息时,可以通过 SSE 推送中间状态更新,让调用方实时了解进度。
"""
ACP Server 完整实现
依赖: pip install fastapi uvicorn sse-starlette
"""
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel, Field
from typing import Optional, Literal
from datetime import datetime
import asyncio
import uuid
import json
app = FastAPI(title="ACP Server", version="1.0.0")
# ─── 数据模型 ───
class MessageContent(BaseModel):
type: Literal["text", "image", "json", "embedding"]
data: str # 文本内容 / Base64 图片 / JSON 字符串 / 向量 JSON
metadata: Optional[dict] = None
class MessageRequest(BaseModel):
sender_id: str
receiver_id: str
content: MessageContent
thread_id: Optional[str] = None
priority: Literal["low", "normal", "high"] = "normal"
timeout_seconds: int = 300
class AgentCard(BaseModel):
agent_id: str
name: str
description: str
capabilities: list[str]
supported_content_types: list[str]
auth_required: bool = False
max_concurrent_messages: int = 10
# ─── 内存存储(生产环境替换为 Redis/DB) ───
messages_db: dict[str, dict] = {}
agents_db: dict[str, AgentCard] = {}
message_queues: dict[str, asyncio.Queue] = {}
# ─── 注册示例 Agent ───
@app.on_event("startup")
async def register_default_agents():
agents_db["analyst-001"] = AgentCard(
agent_id="analyst-001",
name="数据分析 Agent",
description="擅长 SQL 查询和数据可视化",
capabilities=["sql_query", "data_visualization", "statistical_analysis"],
supported_content_types=["text", "json"],
auth_required=True,
)
agents_db["writer-001"] = AgentCard(
agent_id="writer-001",
name="写作 Agent",
description="擅长技术文档和报告撰写",
capabilities=["technical_writing", "report_generation", "summarization"],
supported_content_types=["text"],
auth_required=False,
)
# ─── ACP 核心端点 ───
@app.get("/agents")
async def list_agents():
"""列出所有已注册的 Agent"""
return {"agents": list(agents_db.values())}
@app.get("/agents/{agent_id}/card")
async def get_agent_card(agent_id: str):
"""获取指定 Agent 的能力描述"""
if agent_id not in agents_db:
raise HTTPException(status_code=404, detail=f"Agent {agent_id} not found")
return agents_db[agent_id]
@app.post("/messages")
async def send_message(req: MessageRequest):
"""发送消息给指定 Agent(异步)"""
if req.receiver_id not in agents_db:
raise HTTPException(status_code=404, detail=f"Receiver {req.receiver_id} not found")
msg_id = str(uuid.uuid4())
thread_id = req.thread_id or str(uuid.uuid4())
message = {
"message_id": msg_id,
"thread_id": thread_id,
"sender_id": req.sender_id,
"receiver_id": req.receiver_id,
"content": req.content.model_dump(),
"status": "pending",
"priority": req.priority,
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat(),
}
messages_db[msg_id] = message
# 创建消息队列用于 SSE 推送
message_queues[msg_id] = asyncio.Queue()
# 异步处理消息(模拟 Agent 处理)
asyncio.create_task(process_message(msg_id, message))
return {
"message_id": msg_id,
"thread_id": thread_id,
"status": "pending",
"stream_url": f"/messages/{msg_id}/stream",
}
@app.get("/messages/{message_id}")
async def get_message_status(message_id: str):
"""查询消息状态"""
if message_id not in messages_db:
raise HTTPException(status_code=404, detail="Message not found")
return messages_db[message_id]
@app.get("/messages/{message_id}/stream")
async def stream_message(message_id: str):
"""SSE 流式接收消息处理进度"""
if message_id not in message_queues:
raise HTTPException(status_code=404, detail="No stream for this message")
async def event_generator():
queue = message_queues[message_id]
while True:
event = await queue.get()
yield {"event": event["type"], "data": json.dumps(event["data"])}
if event["type"] in ("completed", "failed"):
break
return EventSourceResponse(event_generator())
# ─── 消息处理模拟 ───
async def process_message(msg_id: str, message: dict):
"""模拟 Agent 处理消息的异步流程"""
queue = message_queues[msg_id]
# 状态: processing
messages_db[msg_id]["status"] = "processing"
await queue.put({"type": "status", "data": {"status": "processing", "progress": 0}})
# 模拟处理步骤
for step in range(1, 4):
await asyncio.sleep(1)
progress = step * 33
await queue.put({
"type": "progress",
"data": {"status": "processing", "progress": progress, "step": step}
})
# 状态: completed
result_content = {
"type": "text",
"data": f"任务完成。处理了来自 {message['sender_id']} 的请求。",
}
messages_db[msg_id]["status"] = "completed"
messages_db[msg_id]["result"] = result_content
messages_db[msg_id]["updated_at"] = datetime.utcnow().isoformat()
await queue.put({
"type": "completed",
"data": {
"status": "completed",
"result": result_content,
"completed_at": datetime.utcnow().isoformat(),
}
})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8100)# 1. 启动 ACP Server
python acp_server.py &
# 2. 列出所有 Agent
curl -s http://localhost:8100/agents | python -m json.tool
# 3. 获取指定 Agent 的能力
curl -s http://localhost:8100/agents/analyst-001/card | python -m json.tool
# 4. 发送消息(异步)
curl -s -X POST http://localhost:8100/messages \
-H "Content-Type: application/json" \
-d '{
"sender_id": "coordinator-001",
"receiver_id": "analyst-001",
"content": {
"type": "text",
"data": "分析上季度销售数据,找出表现最差的产品线"
},
"priority": "high",
"timeout_seconds": 120
}' | python -m json.tool
# 5. 查询消息状态
curl -s http://localhost:8100/messages/{message_id} | python -m json.tool
# 6. SSE 流式接收处理进度
curl -N http://localhost:8100/messages/{message_id}/stream💡 一句话理解
ACP 的 API 设计哲学是「5 分钟上手」——如果你熟悉 REST API,阅读上面的端点列表后就能立即开始集成。这也是 IBM 在设计时刻意追求的目标。
⚠️ 常见踩坑
生产环境中不要使用内存存储。示例中的 messages_db 和 agents_db 应替换为 Redis(消息队列)+ PostgreSQL(Agent 注册表)的组合。
4ACP vs A2A vs MCP:三大协议的精确边界
到 2026 年 6 月,MCP、A2A、ACP 三大协议已经形成了清晰的分工。 它们不是竞争关系,而是互补的分层基础设施。理解每个协议的精确边界,是做出正确架构决策的前提。
MCP(Model Context Protocol)——Agent 与工具的连接层
MCP 解决的是「Agent 如何使用外部工具」的问题。它定义了 Agent(客户端)如何发现、调用、管理外部工具(服务端)。类比:MCP 是 Agent 的「USB 接口」——让 Agent 能插入各种工具。
A2A(Agent-to-Agent Protocol)——Agent 间的任务委托层
A2A 解决的是「Agent 如何把复杂任务委托给其他 Agent」的问题。它定义了 Agent 如何发现彼此的能力、协商任务参数、交换中间结果。类比:A2A 是 Agent 的「项目管理协议」——让 Agent 能分工协作。
ACP(Agent Communication Protocol)——Agent 间的消息传递层
ACP 解决的是「Agent 之间如何简单高效地交换消息」的问题。它比 A2A 更轻量,不处理复杂的任务委托,只关注消息的发送、接收、状态追踪。类比:ACP 是 Agent 的「即时通讯协议」——让 Agent 能快速交换信息。
选型决策矩阵:
| 场景 | 推荐协议 | 原因 |
|---|---|---|
| Agent 需要查询数据库 | MCP | 工具连接是 MCP 的核心 |
| Agent 需要委托分析任务给另一个 Agent | A2A | 任务委托是 A2A 的核心 |
| 两个 Agent 需要快速交换状态信息 | ACP | 轻量级消息传递是 ACP 的核心 |
| 构建多 Agent 编排系统 | A2A + MCP | A2A 管编排,MCP 管工具 |
| 构建 Agent 消息队列 | ACP | 异步消息传递是 ACP 的强项 |
| 完整企业 Agent 平台 | MCP + A2A + ACP | 三层协议各司其职 |
ACP 合并入 A2A 后的变化:
2025 年 8 月 ACP 合并入 A2A(Linux Foundation)后,ACP 作为 A2A 的「轻量级消息子协议」继续存在。具体来说:
- A2A 的完整任务委托流程(Task 对象、Artifact 交换)保持不变
- ACP 的简单消息传递(Message 对象、SSE 流式响应)作为 A2A 的可选模块
- 开发者可以选择只使用 A2A 的任务委托功能,或同时启用 ACP 的消息传递功能
- ACP 的 Agent Card 格式被 A2A 采纳为标准的能力描述格式
💡 一句话理解
在实际项目中,最常见的组合是 MCP + A2A(覆盖 90% 的场景)。只有当你需要 Agent 间的轻量级消息队列或 SSE 流式推送时,才需要引入 ACP。
⚠️ 常见踩坑
不要为了「协议完整性」而同时使用三个协议。过度设计的协议栈会增加系统复杂度和维护成本。从 MCP + A2A 开始,按需引入 ACP。
5实战:构建 ACP + MCP + A2A 三协议融合系统
本节将构建一个完整的三协议融合系统,展示 MCP、A2A、ACP 如何在实际项目中协同工作。
场景描述:
一个企业智能助手系统包含三个 Agent:
- 协调 Agent:接收用户请求,分解任务,委托给专业 Agent
- 数据分析 Agent:通过 MCP 连接数据库,执行 SQL 查询
- 报告生成 Agent:接收分析结果,生成 Markdown 报告
协议分工:
- MCP:数据分析 Agent 通过 MCP 连接 PostgreSQL 数据库
- A2A:协调 Agent 通过 A2A 将分析任务委托给数据分析 Agent
- ACP:数据分析 Agent 通过 ACP 将中间结果流式推送给报告生成 Agent
为什么需要三种协议?
- 只用 A2A:可以完成任务委托,但无法连接数据库(缺少 MCP)
- 只用 MCP + A2A:可以委托任务并连接数据库,但中间结果只能通过任务完成后的 Artifact 获取,无法实时推送(缺少 ACP 的 SSE 流式能力)
- 三者结合:任务委托 + 工具连接 + 实时消息推送 = 完整的企业级方案
系统架构图:
用户 → 协调 Agent → (A2A) → 数据分析 Agent → (MCP) → PostgreSQL
↓ (ACP SSE)
报告生成 Agent → 最终报告
"""
三协议融合协调 Agent
- A2A: 委托任务给专业 Agent
- MCP: 不直接使用(由专业 Agent 使用)
- ACP: 接收中间结果的 SSE 流
"""
import httpx
import asyncio
import json
from typing import Optional
class CoordinatorAgent:
"""协调 Agent:接收用户请求,分解并委托给专业 Agent"""
def __init__(self):
self.a2a_base = "http://localhost:8200" # A2A 协调端点
self.acp_base = "http://localhost:8100" # ACP 消息端点
async def handle_user_request(self, user_query: str) -> dict:
"""处理用户请求的完整流程"""
# Step 1: 通过 A2A 委托数据分析任务
analysis_task = await self._delegate_via_a2a(
agent_id="analyst-001",
task={
"type": "data_analysis",
"query": user_query,
"output_format": "json",
}
)
print(f"[A2A] 数据分析任务已委托: {analysis_task['task_id']}")
# Step 2: 通过 ACP 发送消息通知报告 Agent 准备接收数据
acp_msg = await self._notify_via_acp(
receiver_id="writer-001",
content=f"即将接收数据分析结果,请准备生成报告。用户查询: {user_query}"
)
print(f"[ACP] 报告 Agent 已通知: {acp_msg['message_id']}")
# Step 3: 通过 ACP SSE 监听分析 Agent 的中间结果
results = await self._stream_results_via_acp(
message_id=analysis_task.get("acp_message_id", acp_msg["message_id"])
)
# Step 4: 将最终结果通过 A2A 委托报告 Agent
report_task = await self._delegate_via_a2a(
agent_id="writer-001",
task={
"type": "report_generation",
"analysis_data": results,
"format": "markdown",
}
)
return {
"analysis_task_id": analysis_task["task_id"],
"report_task_id": report_task["task_id"],
"status": "delegated",
}
async def _delegate_via_a2a(self, agent_id: str, task: dict) -> dict:
"""通过 A2A 协议委托任务"""
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{self.a2a_base}/tasks",
json={
"assignee": agent_id,
"task": task,
"priority": "high",
}
)
return resp.json()
async def _notify_via_acp(self, receiver_id: str, content: str) -> dict:
"""通过 ACP 协议发送通知消息"""
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{self.acp_base}/messages",
json={
"sender_id": "coordinator-001",
"receiver_id": receiver_id,
"content": {"type": "text", "data": content},
"priority": "normal",
}
)
return resp.json()
async def _stream_results_via_acp(self, message_id: str) -> list:
"""通过 ACP SSE 流式接收中间结果"""
results = []
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream(
"GET",
f"{self.acp_base}/messages/{message_id}/stream"
) as response:
async for line in response.aiter_lines():
if line.startswith("data:"):
event = json.loads(line[5:].strip())
if event.get("type") == "progress":
print(f" [ACP] 进度: {event['data']['progress']}%")
elif event.get("type") == "completed":
results.append(event["data"].get("result"))
break
return results
# ─── 运行示例 ───
async def main():
coordinator = CoordinatorAgent()
result = await coordinator.handle_user_request(
"分析上季度华东区销售数据,找出下滑最明显的产品线"
)
print(f"\n最终结果: {json.dumps(result, indent=2)}")
if __name__ == "__main__":
asyncio.run(main())/**
* ACP TypeScript 客户端
* 依赖: npm install eventsource-parser
*/
import EventSource from 'eventsource';
interface ACPMessage {
message_id: string;
thread_id: string;
sender_id: string;
receiver_id: string;
content: {
type: 'text' | 'image' | 'json' | 'embedding';
data: string;
metadata?: Record<string, unknown>;
};
status: 'pending' | 'processing' | 'completed' | 'failed';
created_at: string;
}
interface AgentCard {
agent_id: string;
name: string;
description: string;
capabilities: string[];
supported_content_types: string[];
auth_required: boolean;
}
export class ACPClient {
private baseUrl: string;
constructor(baseUrl: string = 'http://localhost:8100') {
this.baseUrl = baseUrl;
}
/** 列出所有可用 Agent */
async listAgents(): Promise<AgentCard[]> {
const res = await fetch(`${this.baseUrl}/agents`);
const data = await res.json();
return data.agents;
}
/** 获取指定 Agent 的能力描述 */
async getAgentCard(agentId: string): Promise<AgentCard> {
const res = await fetch(`${this.baseUrl}/agents/${agentId}/card`);
if (!res.ok) throw new Error(`Agent ${agentId} not found`);
return res.json();
}
/** 发送消息 */
async sendMessage(params: {
senderId: string;
receiverId: string;
content: { type: string; data: string };
priority?: 'low' | 'normal' | 'high';
}): Promise<{ message_id: string; thread_id: string; status: string }> {
const res = await fetch(`${this.baseUrl}/messages`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
sender_id: params.senderId,
receiver_id: params.receiverId,
content: params.content,
priority: params.priority ?? 'normal',
}),
});
return res.json();
}
/** SSE 流式接收消息进度 */
streamProgress(
messageId: string,
onEvent: (type: string, data: unknown) => void
): EventSource {
const es = new EventSource(
`${this.baseUrl}/messages/${messageId}/stream`
);
es.addEventListener('status', (e) => {
onEvent('status', JSON.parse(e.data));
});
es.addEventListener('progress', (e) => {
onEvent('progress', JSON.parse(e.data));
});
es.addEventListener('completed', (e) => {
onEvent('completed', JSON.parse(e.data));
es.close();
});
es.addEventListener('failed', (e) => {
onEvent('failed', JSON.parse(e.data));
es.close();
});
return es;
}
/** 完整流程:发送消息并等待结果 */
async sendAndWait(params: {
senderId: string;
receiverId: string;
content: { type: string; data: string };
}): Promise<unknown> {
// 发送消息
const { message_id } = await this.sendMessage(params);
console.log(`[ACP] 消息已发送: ${message_id}`);
// 等待结果
return new Promise((resolve, reject) => {
this.streamProgress(message_id, (type, data) => {
console.log(`[ACP] 事件: ${type}`, data);
if (type === 'completed') resolve(data);
if (type === 'failed') reject(new Error('Message processing failed'));
});
});
}
}
// ─── 使用示例 ───
async function main() {
const acp = new ACPClient('http://localhost:8100');
// 列出 Agent
const agents = await acp.listAgents();
console.log('可用 Agent:', agents.map(a => a.name));
// 发送消息并等待结果
const result = await acp.sendAndWait({
senderId: 'coordinator-001',
receiverId: 'analyst-001',
content: {
type: 'text',
data: '分析上季度华东区销售数据',
},
});
console.log('分析结果:', result);
}
main().catch(console.error);💡 一句话理解
三协议融合的关键是「各司其职」:MCP 管工具、A2A 管任务、ACP 管消息。不要在 ACP 里实现任务委托逻辑,也不要在 A2A 里实现 SSE 流式推送。
⚠️ 常见踩坑
示例中的 httpx 和 fetch 调用没有包含认证逻辑。生产环境中,A2A 和 ACP 端点都需要 OAuth 2.0 或 API Key 认证。
6ACP 的离线发现机制:Agent Card 深度解析
ACP 的离线发现机制是其最具创新性的设计之一。 传统的 Agent 发现方案(如 A2A 的 Agent Card via HTTP)要求 Agent 在线才能被发现——你必须知道 Agent 的 URL 才能获取其能力描述。ACP 的离线发现打破了这个限制。
离线发现的工作原理:
Agent 注册:Agent 在启动时将自己的 Agent Card 发布到注册中心。注册中心可以是本地文件系统、Redis、PostgreSQL,或者分布式的 DHT 网络。
能力索引:注册中心为每个 Agent Card 建立索引,支持按能力标签(capabilities)搜索。
离线查询:其他 Agent 可以查询注册中心,发现当前不在线的 Agent 的能力。当目标 Agent 上线后,消息会自动路由。
Agent Card 的完整结构:
Agent Card 遵循 JSON-LD 格式,包含以下核心字段:
agent_id:全局唯一标识符(推荐 UUID)name:人类可读的名称description:能力描述(用于语义搜索)capabilities:能力标签数组(如 ["sql_query", "visualization"])supported_content_types:支持的消息内容类型auth_required:是否需要认证max_concurrent_messages:最大并发消息数availability:可用性状态(online/offline/scheduled)endpoints:服务端点列表
离线发现 vs 在线发现的权衡:
| 维度 | 离线发现(ACP) | 在线发现(A2A) |
|---|---|---|
| 发现时机 | Agent 不需要在线 | Agent 必须在线 |
| 信息新鲜度 | 可能过期 | 实时准确 |
| 实现复杂度 | 需要注册中心 | 只需 HTTP |
| 适用场景 | 大规模 Agent 注册表 | 小规模已知集群 |
| 容错性 | 高(注册中心可缓存) | 低(依赖 Agent 在线) |
"""
ACP Agent 注册中心
支持离线发现、能力搜索、状态追踪
"""
from datetime import datetime
from typing import Optional
import json
import os
class ACPRegistry:
"""ACP Agent 注册中心(文件系统实现)"""
def __init__(self, registry_dir: str = "./acp_registry"):
self.registry_dir = registry_dir
os.makedirs(registry_dir, exist_ok=True)
def register(self, agent_card: dict) -> str:
"""注册 Agent Card 到注册中心"""
agent_id = agent_card["agent_id"]
card_path = os.path.join(self.registry_dir, f"{agent_id}.json")
# 添加注册时间戳
agent_card["registered_at"] = datetime.utcnow().isoformat()
agent_card["last_seen"] = datetime.utcnow().isoformat()
agent_card["availability"] = "online"
with open(card_path, "w") as f:
json.dump(agent_card, f, indent=2, ensure_ascii=False)
return agent_id
def discover(self, capabilities: list[str]) -> list[dict]:
"""按能力标签发现 Agent(支持离线 Agent)"""
results = []
for filename in os.listdir(self.registry_dir):
if not filename.endswith(".json"):
continue
card_path = os.path.join(self.registry_dir, filename)
with open(card_path) as f:
card = json.load(f)
# 检查能力匹配
agent_caps = set(card.get("capabilities", []))
required_caps = set(capabilities)
if required_caps.issubset(agent_caps):
results.append(card)
return sorted(results, key=lambda c: c.get("registered_at", ""))
def heartbeat(self, agent_id: str) -> bool:
"""更新 Agent 心跳"""
card_path = os.path.join(self.registry_dir, f"{agent_id}.json")
if not os.path.exists(card_path):
return False
with open(card_path) as f:
card = json.load(f)
card["last_seen"] = datetime.utcnow().isoformat()
card["availability"] = "online"
with open(card_path, "w") as f:
json.dump(card, f, indent=2, ensure_ascii=False)
return True
def mark_offline(self, agent_id: str):
"""标记 Agent 为离线(Card 保留用于离线发现)"""
card_path = os.path.join(self.registry_dir, f"{agent_id}.json")
if not os.path.exists(card_path):
return
with open(card_path) as f:
card = json.load(f)
card["availability"] = "offline"
with open(card_path, "w") as f:
json.dump(card, f, indent=2, ensure_ascii=False)
# ─── 使用示例 ───
registry = ACPRegistry()
# 注册 Agent
registry.register({
"agent_id": "analyst-001",
"name": "数据分析 Agent",
"description": "擅长 SQL 查询和数据可视化",
"capabilities": ["sql_query", "data_visualization", "statistical_analysis"],
"supported_content_types": ["text", "json"],
"auth_required": True,
"max_concurrent_messages": 10,
})
# 离线发现:查找具有 sql_query 能力的 Agent
results = registry.discover(["sql_query"])
print(f"找到 {len(results)} 个匹配的 Agent")
for agent in results:
print(f" - {agent['name']} (状态: {agent['availability']})")💡 一句话理解
ACP 的离线发现能力在大规模 Agent 系统中非常有价值——你可以预先注册所有 Agent 的能力,即使某些 Agent 暂时离线,系统也能规划任务路径,等 Agent 上线后自动执行。
⚠️ 常见踩坑
离线发现的注册中心需要定期清理过期的 Agent Card。建议设置 TTL(如 7 天),超过 TTL 未更新的 Card 自动标记为 stale。
7ACP 安全模型:从 OAuth 2.0 到可验证凭证
ACP 的安全模型分为两个阶段:当前生产可用的 OAuth 2.0 方案,和未来规划中的可验证凭证(Verifiable Credentials)方案。
当前方案:OAuth 2.0 + API Key
在 2026 年的生产环境中,大多数 ACP 部署使用标准的 OAuth 2.0 认证:
- Agent 身份认证:每个 Agent 在注册中心注册时获得一个 client_id 和 client_secret
- 消息签名:每条消息包含一个 HMAC 签名,接收方可以验证消息来源
- 传输加密:所有 ACP 通信必须使用 HTTPS(TLS 1.3)
- 速率限制:每个 Agent 有独立的速率限制,防止消息风暴
未来方案:可验证凭证(VC)
ACP 规范中定义了一个更强大的安全模型,基于 W3C 的可验证凭证标准:
- 去中心化身份(DID):每个 Agent 有一个 DID,不依赖中心化注册中心
- 能力凭证(VC):Agent 的能力由可信机构签发凭证证明
- 选择性披露:Agent 可以只披露完成任务所需的最小能力
- 零知识证明:某些场景下,Agent 可以证明「我有某能力」而不暴露具体细节
安全最佳实践:
- 最小权限原则:Agent 只获得完成任务所需的最小权限
- 消息审计:所有 Agent 间通信都记录审计日志
- 超时控制:每条消息设置合理的超时时间
- 输入验证:对接收的消息内容进行严格验证
- 沙箱执行:Agent 在隔离环境中执行任务
💡 一句话理解
如果你的 Agent 系统只在企业内部运行,OAuth 2.0 + API Key 已经足够安全。可验证凭证方案更适合跨组织、跨信任域的 Agent 协作场景。
⚠️ 常见踩坑
永远不要在消息内容中传递敏感信息(密码、Token、个人数据)。ACP 消息在传输过程中虽然加密,但在 Agent Card 注册中心和审计日志中可能以明文存储。
8ACP 生产部署指南与 2026 生态现状
ACP 在 2026 年 6 月的生态现状:
自 2025 年 8 月合并入 A2A(Linux Foundation)以来,ACP 的发展轨迹可以总结为:
- 规范状态:ACP 1.0 规范已稳定,作为 A2A 规范的子模块维护
- 参考实现:Python(FastAPI)和 TypeScript(Express)两套参考实现
- SDK 支持:BeeAI SDK(Python)内置 ACP 客户端和服务端
- 企业采用:IBM 内部 Agent 平台、部分金融机构的 Agent 互操作层
- 社区规模:GitHub 约 3,000+ stars(ACP 独立仓库)
生产部署清单:
| 组件 | 推荐方案 | 替代方案 |
|---|---|---|
| HTTP 框架 | FastAPI (Python) / Express (Node.js) | Flask / Koa |
| 消息存储 | Redis Streams | PostgreSQL / Kafka |
| Agent 注册 | etcd | Consul / Redis |
| 认证 | OAuth 2.0 (Keycloak) | JWT + API Key |
| 监控 | Prometheus + Grafana | Datadog |
| 日志 | ELK Stack | Loki |
| 容器编排 | Kubernetes | Docker Compose (小规模) |
性能基准(2026 年 6 月测试):
在 4 核 16GB 的云服务器上,使用 Redis 作为消息存储:
ACP 的局限性与替代方案:
- 不适合大规模任务编排:如果你需要复杂的任务图、条件分支、并行执行,应该使用 A2A 的完整任务模型
- 没有内置的消息持久化:ACP 规范不要求消息持久化,如果需要保证消息不丢失,需要自行实现
- 缺少消息路由规则:ACP 的消息路由是点对点的,不支持基于内容的路由(如果需要,可以结合 Kafka)
ACP 的路线图(2026 H2):
- ACP 1.1:增加消息优先级队列和死信队列支持
- ACP 1.2:标准化 Agent Card 的语义描述(引入 Schema.org 词汇)
- ACP 2.0:完整的可验证凭证集成,支持去中心化身份
💡 一句话理解
对于大多数企业项目,建议直接使用 A2A 的完整方案(内含 ACP 子模块),而不是独立部署 ACP。这样可以获得更好的生态支持和工具链集成。
⚠️ 常见踩坑
ACP 的性能基准是在理想条件下测试的。生产环境中,消息序列化/反序列化、网络延迟、认证开销等因素会导致实际性能降低 30-50%。建议在自己的环境中做基准测试。