基于MCP构建智能体间通信:完整实现指南

本文详细介绍了如何在MCP协议上构建智能体间通信系统,包括流式传输、可恢复性、持久性和多轮交互等关键技术特性,并提供了完整的Python实现代码和架构设计。

基于MCP构建智能体间通信:完整实现指南

MCP已经显著超越了其最初"为LLM提供上下文"的目标。通过最近的增强功能,包括可恢复流、启发式交互、采样和通知(进度和资源),MCP现在为构建复杂的智能体间通信系统提供了强大的基础。

智能体/工具误解

随着更多开发者探索具有智能体行为的工具(长时间运行、执行过程中可能需要额外输入等),一个常见的误解是MCP不适用,主要是因为其早期工具原语示例侧重于简单的请求-响应模式。

这种观念已经过时。MCP规范在过去几个月中得到了显著增强,这些能力缩小了构建长时间运行智能体行为的差距:

  • 流式传输和部分结果:执行期间的实时进度更新
  • 可恢复性:客户端可以在断开连接后重新连接并继续
  • 持久性:结果在服务器重启后仍然存在
  • 多轮交互:通过启发式交互和采样在执行过程中进行交互式输入

什么使MCP工具具有"智能体"特性?

我们将智能体定义为一个可以在较长时间内自主运行的实体,处理可能需要基于实时反馈进行多次交互或调整的复杂任务。

1. 流式传输和部分结果

智能体需要实时进度更新机制,以保持用户知情(可观察性、调试、信任建立)和流式传输中间结果,使系统能够基于部分输出做出反应并调整执行。

服务器实现(智能体发送进度通知):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# 来自server/server.py - 旅行智能体发送进度更新
for i, step in enumerate(steps):
    await ctx.session.send_progress_notification(
        progress_token=ctx.request_id,
        progress=i * 25,
        total=100,
        message=step, 
        related_request_id=str(ctx.request_id)   
    )
    await anyio.sleep(2)  # 模拟工作

2. 可恢复性

长时间运行的智能体受益于跨网络中断的任务连续性,允许客户端重新连接并从离开的地方恢复,而不是丢失进度或重新启动复杂操作。

事件存储实现(服务器保存会话状态):

1
2
3
4
5
# 来自server/event_store.py - 简单的内存事件存储
class SimpleEventStore(EventStore):
    def __init__(self):
        self._events: list[tuple[StreamId, EventId, JSONRPCMessage]] = []
        self._event_id_counter = 0

3. 持久性

长时间运行的智能体需要持久状态,在服务器重启后仍然存在,并支持带外状态/结果检查,允许跨会话的进度跟踪。

4. 多轮交互

智能体可能需要在执行过程中获得额外输入——人类对关键决策的澄清或批准,以及复杂子任务的AI生成内容或完成。

服务器实现(智能体请求确认):

1
2
3
4
5
6
# 来自server/server.py - 旅行智能体请求价格确认
elicit_result = await ctx.session.elicit(
    message=f"请确认您前往{destination}旅行的预估价格$1200",
    requestedSchema=PriceConfirmationSchema.model_json_schema(),
    related_request_id=ctx.request_id,
)

在MCP上实现长时间运行的智能体 - 代码概述

我们的代码示例演示了主机应用程序如何连接到具有多个工具的服务器:一个旅行智能体,通过启发式交互模拟旅行预订服务并进行价格确认;一个研究智能体,通过采样执行研究任务并生成AI辅助摘要。

具体来说,我们实现了一个具有两个主要智能体工具的服务器:

  • 旅行智能体 - 通过启发式交互模拟旅行预订服务并进行价格确认
  • 研究智能体 - 通过采样执行研究任务并生成AI辅助摘要

两个智能体都演示了实时进度更新、交互式确认和完整的会话恢复能力。

项目结构:

  • server/server.py - 具有旅行和研究智能体的可恢复MCP服务器,演示启发式交互、采样和进度更新
  • client/client.py - 具有恢复支持的交互式主机应用程序、回调处理程序和令牌管理
  • server/event_store.py - 实现会话恢复和消息重新传递的事件存储

关键实现概念

流式传输和进度更新 - 实时任务状态

流式传输使智能体能够在长时间运行的任务期间提供实时进度更新,让用户了解任务状态和中间结果。

客户端实现(主机接收进度更新):

1
2
3
4
5
6
7
8
# 来自client/client.py - 客户端处理实时通知
async def message_handler(message) -> None:
    if isinstance(message, types.ServerNotification):
        if isinstance(message.root, types.LoggingMessageNotification):
            console.print(f" [dim]{message.root.params.data}[/dim]")
        elif isinstance(message.root, types.ProgressNotification):
            progress = message.root.params
            console.print(f" [yellow]{progress.message} ({progress.progress}/{progress.total})[/yellow]")

启发式交互 - 请求用户输入

启发式交互使智能体能够在执行过程中请求用户输入。这对于长时间运行任务期间的确认、澄清或批准至关重要。

客户端实现(主机提供启发式交互回调):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# 来自client/client.py - 客户端处理启发式交互请求
async def elicitation_callback(context, params):
    console.print(f" 服务器正在请求确认:")
    console.print(f" {params.message}")
    
    response = console.input("是否接受?(y/n): ").strip().lower()
    
    if response in ['y', 'yes']:
        return types.ElicitResult(
            action="accept",
            content={"confirm": True, "notes": "用户确认"}
        )
    else:
        return types.ElicitResult(
            action="decline", 
            content={"confirm": False, "notes": "用户拒绝"}
        )

采样 - 请求AI协助

采样允许智能体在执行过程中请求LLM协助进行复杂决策或内容生成。这实现了混合人机工作流程。

服务器实现(智能体请求AI协助):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 来自server/server.py - 研究智能体请求AI摘要
sampling_result = await ctx.session.create_message(
    messages=[
        SamplingMessage(
            role="user",
            content=TextContent(type="text", text=f"请总结关于{topic}研究的关键发现")
        )
    ],
    max_tokens=100,
    related_request_id=ctx.request_id,
)

可恢复性 - 跨断开连接的会话连续性

可恢复性确保长时间运行的智能体任务能够在客户端断开连接后继续存在,并在重新连接时无缝继续。这是通过事件存储和恢复令牌(最后事件ID)实现的。

客户端元数据与恢复令牌(客户端使用存储状态重新连接):

1
2
3
4
5
6
# 来自client/client.py - 带有元数据的客户端恢复
if existing_tokens and existing_tokens.get("resumption_token"):
    # 使用现有的恢复令牌继续之前的工作
    metadata = ClientMessageMetadata(
        resumption_token=existing_tokens["resumption_token"],
    )

扩展到MCP上的多智能体通信

架构说明:我们的实现已经演示了智能体间通信。主机应用程序充当"编排器智能体",与用户交互并将请求路由到MCP服务器上的专业智能体(旅行智能体、研究智能体)。

虽然我们的示例为简单起见连接到单个服务器,但同一个编排器智能体可以跨多个MCP服务器协调任务,每个服务器公开不同的专业智能体。

要将主机智能体(编排器)扩展到多个服务器的远程MCP智能体模式,主机智能体可以增强以下功能:

  • 智能任务分解:分析复杂的用户请求并将其分解为不同专业智能体的子任务
  • 多服务器协调:维护与多个MCP服务器的连接,每个服务器公开不同的智能体能力
  • 任务状态管理:跟踪多个并发智能体任务的进度,处理依赖关系并管理进行中的请求
  • 用户上下文保存:在协调专业智能体之间时维护交互上下文
  • 弹性和重试:处理故障,实现重试逻辑,并在智能体不可用时重新路由任务
  • 结果合成:将多个智能体的输出合并为连贯的响应

结论

MCP的增强能力——资源通知、启发式交互/采样、可恢复流和持久资源——在保持协议简单性的同时实现了复杂的智能体间交互。

总体而言,MCP协议规范正在快速发展;鼓励读者查看官方文档网站以获取最新更新——https://modelcontextprotocol.io/introduction

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计