云计算百科
云计算领域专业知识百科平台

MCP的SSE的底层通信原理(给出了不使用SDK实现mcp服务器的示例)

MCP通信原理

模型上下文协议( MCP)定义了两种标准的通信传输协议:STDIO和SSE,用于连接大型语言模型(LLM)与外部工具或数据源。

特性标准输入/输出(Stdio)HTTP + 服务器发送事件(SSE)
通信方式 通过标准输入(stdin)和标准输出(stdout)进行通信 客户端通过 HTTP POST 发送请求,服务器使用 SSE 推送响应
典型场景 本地开发、插件集成、命令行工具等 分布式部署、远程服务调用、需要实时数据更新的应用等
优势 无需网络连接,通信延迟低;部署简单;数据在本地传输,有助于保障数据隐私 支持跨网络通信;服务器可主动推送数据;兼容现有的 HTTP 基础设施,易于集成和扩展
限制 仅适用于同一台机器上的进程间通信,无法支持分布式部署 需要网络连接,可能涉及更复杂的安全配置和网络管理
认证机制 不适用 支持 JWT 和 API 密钥等认证机制
可扩展性 适用于单一进程通信 支持多客户端连接,适合分布式系统
配置复杂度 配置简单,适合快速开发和调试 配置灵活,可自定义端口、端点、认证等参数
适用场景 构建命令行工具、实现本地集成、需要简单的进程通信、使用 shell 脚本 构建 Web 应用程序、需要网络通信、需要认证、支持多个客户端、需要水平扩展

SSE介绍

Server-Sent Events(SSE)是一种基于 HTTP 协议的服务器推送技术,允许服务器通过单向的方式向客户端发送实时更新的数据流。SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息。

总体来说,WebSocket 更强大和灵活。因为它是全双工通道,可以双向通信;SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。

stdio中可以使用stdin来进行输入,使用stdout来进行输出。

但是SSE是单向通道,MCP要如何实现双向通信呢?是建立两根SSE通道吗?带着这个疑问,我们来进行动手实践。

MCP的SSE通信流程

我们可以构建一个简单的MCP Server,然后利用MCP官方提供的工具npx @modelcontextprotocol/inspector 可以比较方便地拉起一个验证MCP的管理页。针对这个管理页抓包就能发现一些SSE的通信端倪。

初始化项目

uv init mcp-server-demo
cd mcp-server-demo
uv add "mcp[cli]"

构建一个简单的MCP server

# server.py
from mcp.server.fastmcp import FastMCP

# Create an MCP server
mcp = FastMCP("Demo", port=3001)

# Add an addition tool
@mcp.tool()
def add(a: int, b: int) > int:
"""Add two numbers"""
return a + b

# Add a dynamic greeting resource
@mcp.resource("greeting://{name}")
def get_greeting(name: str) > str:
"""Get a personalized greeting"""
return f"Hello, {name}!"

if __name__ == "__main__":
mcp.run(transport="sse")

启动服务器以及启动mcp inspector

python server.py
npx @modelcontextprotocol/inspector

打开http://127.0.0.1:6274进入管理页,打开开发者模式,针对这个管理页抓包就能发现一些SSE的通信端倪。

  • 当连接到mcp server时,会建立一条SSE长连接,它只负责推送消息 在这里插入图片描述

    可以看到Client连接上/sse这个地址的第一个Event就是告诉Client发送信息需要去哪个URL发,这个URL通常会带上唯一的会话ID。

  • 当我调用工具时计算2+7时,client会发送一个post请求,结果会通过一开始的SSE长连接返回

    在这里插入图片描述 在这里插入图片描述

  • 总结双向通信的实现方式为

  • 只有一根SSE长连接,用来Server向Client推送数据,另外一个Client向Server发送请求的通道是使用普通的HTTP POST请求。

  • Client向Server发送的HTTP POST请求中只使用2xx反馈是否收到指令,所有的数据返回是通过一开始的SSE长连接来推送。

  • 简单实现MCP,致敬这个优秀的设计

    from fastapi import FastAPI, Request
    from sse_starlette.sse import EventSourceResponse
    from pydantic import BaseModel
    from typing import Optional, Callable, Awaitable, Any, List, Dict
    import asyncio
    import uuid
    import json
    import inspect
    import uvicorn

    app = FastAPI()
    mcpHub: Dict[str, "MCPServer"] = {}

    # 请求模型
    class McpRequest(BaseModel):
    id: Optional[int] = None
    jsonrpc: str
    method: str
    params: Optional[dict] = None

    # MCP 服务核心类
    class MCPServer:
    def __init__(
    self, name: str, message_path: str, tools: List[Callable[..., Awaitable[Any]]]
    ):
    """
    初始化函数,用于设置实例变量和启动必要的异步任务。

    参数:
    – name (str): 服务器名称。
    – message_path (str): 消息路径,用于指定消息的路由。
    – tools (List[Callable[…, Awaitable[Any]]]): 工具函数列表,这些函数是异步的,并可接受不定数量的参数。
    """
    # 初始化一个异步队列,用于处理消息
    self.queue: asyncio.Queue = asyncio.Queue()
    # 生成一个唯一的客户端ID
    self.client_id: str = str(uuid.uuid4())
    # 存储消息路径
    self.message_path: str = message_path
    # 存储提供的工具函数列表
    self.tools = tools
    # 初始化信息字典,包含协议版本、功能和服务器信息
    self.info = {
    "protocolVersion": "2024-11-05",
    "capabilities": {"experimental": {}, "tools": {"listChanged": False}},
    "serverInfo": {"name": name, "version": "1.6.0"},
    }

    def list_tool(self) > List[Dict[str, Any]]:
    """
    生成工具列表的函数。

    此函数遍历self.tools中的所有工具,并为每个工具生成一个包含名称、描述和输入模式的字典。
    这些字典最终被收集到一个列表中并返回。该函数主要用于整理和提供工具的相关信息,以便于后续的处理或展示。

    Returns:
    List[Dict[str, Any]]: 包含所有工具信息的列表,每个工具的信息包括名称、描述和输入模式。
    """
    # 初始化一个空列表,用于存储所有工具的信息
    tool_list = []

    # 遍历self.tools中的每个工具
    for tool in self.tools:
    # 使用inspect模块获取工具的签名信息,以便后续获取参数信息
    sig = inspect.signature(tool)

    # 将工具的名称、描述和输入模式添加到tool_list中
    tool_list.append(
    {
    "name": tool.__name__, # 工具的名称
    "description": tool.__doc__, # 工具的描述信息
    "inputSchema": {
    "type": "object",
    "properties": {
    # 根据工具的签名信息,生成输入模式的属性
    name: {"title": name, "type": "string"}
    for name in sig.parameters
    },
    },
    }
    )

    # 返回包含所有工具信息的列表
    return tool_list

    # 异步生成器函数:reader
    # 目的:持续从队列中读取事件并返回
    async def reader(self):
    # 循环无限期地尝试从队列中获取下一个事件
    while True:
    # 异步地从队列中获取事件
    event = await self.queue.get()
    # 返回事件
    yield event

    @staticmethod
    def response(result: Any, id: Optional[int]) > str:
    """
    生成一个JSON-RPC格式的响应字符串。

    参数:
    – result: Any 类型,代表JSON-RPC请求的结果,可以是任意类型。
    – id: Optional[int] 类型,代表JSON-RPC请求的ID,可能为None,用于标识请求。

    返回值:
    – str 类型,代表JSON-RPC格式的响应字符串,包括结果和可选的请求ID。
    """
    # 初始化JSON-RPC响应消息的基本结构,包含结果。
    message = {"jsonrpc": "2.0", "result": result}

    # 如果请求ID不为空,则将其添加到响应消息中。
    if id is not None:
    message["id"] = id

    # 将响应消息序列化为JSON字符串并返回。
    return json.dumps(message)

    async def handle_request(self, req: McpRequest):
    """
    处理请求函数,根据不同的请求方法执行相应的逻辑。

    参数:
    – req (McpRequest): 请求对象,包含请求的方法、参数等信息。

    此函数根据请求方法的不同,执行初始化、工具列表查询或工具调用等操作,并将结果通过队列返回。
    """
    if req.method == "initialize":
    # 当请求方法为initialize时,将包含info信息的响应放入队列
    await self.queue.put(
    {"event": "message", "data": self.response(self.info, req.id)}
    )

    elif req.method == "tools/list":
    # 当请求方法为tools/list时,列出所有工具信息并放入队列
    tools_info = self.list_tool()
    await self.queue.put(
    {
    "event": "message",
    "data": self.response({"tools": tools_info}, req.id),
    }
    )

    elif req.method == "tools/call":
    # 当请求方法为tools/call时,根据名称调用工具,并将结果放入队列
    tool_name = req.params.get("name")
    args = req.params.get("arguments", {})

    for tool in self.tools:
    if tool.__name__ == tool_name:
    try:
    # 尝试调用工具并处理结果
    result = await tool(**args)
    await self.queue.put(
    {
    "event": "message",
    "data": self.response(
    {"content": result, "isError": False}, req.id
    ),
    }
    )
    except Exception as e:
    # 如果工具调用出错,将错误信息放入队列
    await self.queue.put(
    {
    "event": "message",
    "data": self.response(
    {"content": str(e), "isError": True}, req.id
    ),
    }
    )
    break

    # 工具函数
    async def test(state: Optional[str] = None) > str:
    """Returns a simple greeting message"""
    await asyncio.sleep(1)
    return f"hi {state}!"

    # SSE 接收端:创建 MCPServer 并建立连接
    @app.get("/sse")
    async def receive_test():
    """
    创建并初始化一个MCPServer实例,将其添加到mcpHub中,并向其队列放入事件信息。

    该函数主要完成以下任务:
    1. 实例化MCPServer对象,指定名称、消息路径和工具函数。
    2. 将新创建的MCPServer实例根据其client_id存储在mcpHub中。
    3. 向MCPServer的队列放入包含endpoint信息的数据。
    4. 返回一个EventSourceResponse对象,用于SSE连接的响应。
    """
    # 实例化MCPServer对象,参数包括名称、消息路径和工具函数列表
    mcp = MCPServer(name="mcp-test", message_path="/message", tools=[test])

    # 将MCPServer实例存储在mcpHub中,键为client_id
    mcpHub[mcp.client_id] = mcp

    # 向MCPServer的队列放入事件信息,包含endpoint的路径和client_id
    await mcp.queue.put(
    {"event": "endpoint", "data": f"{mcp.message_path}?client_id={mcp.client_id}"}
    )

    # 返回EventSourceResponse对象,用于SSE连接的响应
    return EventSourceResponse(mcp.reader())

    # SSE 发送端:接收 JSON-RPC 请求
    @app.post("/message")
    async def send_test(request: Request, payload: McpRequest):
    """
    处理来自客户端的JSON-RPC请求。

    该函数首先从请求的查询参数中获取client_id,检查其是否存在且有效。
    如果client_id无效或不在mcpHub中,则返回错误信息。
    否则,将payload转发给对应的mcpHub客户端处理。

    参数:
    – request: Request对象,包含请求的相关信息。
    – payload: McpRequest对象,承载着JSON-RPC请求的数据。

    返回:
    – 如果client_id无效或不在mcpHub中,返回包含错误信息的字典。
    – 如果请求被成功处理,返回包含状态信息的字典。
    """
    # 从请求的查询参数中获取client_id
    client_id = request.query_params.get("client_id")
    # 检查client_id是否存在且有效
    if not client_id or client_id not in mcpHub:
    # 如果client_id无效或不在mcpHub中,返回错误信息
    return {"error": "Invalid client_id"}

    # 转发payload给对应的mcpHub客户端处理
    await mcpHub[client_id].handle_request(payload)
    # 请求成功处理,返回状态信息
    return {"status": "ok"}

    # 本地运行入口
    if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8001)

    启动 在这里插入图片描述

    我这里使用APIFOX来发送请求 在这里插入图片描述

    把这个client_id复制下来去请求message接口 在这里插入图片描述 在这里插入图片描述

    sse长连接成功返回信息

    从我们的简单实现中可以看到,我们完全可以不依赖 /sse /message 这些默认路由地址,MCP的URL可以完全自定义。

    参考链接:https://mp.weixin.qq.com/s/UM6PwoBGhRGvJbvUYggObw

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » MCP的SSE的底层通信原理(给出了不使用SDK实现mcp服务器的示例)
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!