云计算百科
云计算领域专业知识百科平台

理论+代码讲解Streamable HTTP MCP服务器原理,拒绝调包从0到1手撕流式 HTTP MCP服务器!

前言

2025年人工智能大模型领域最值得关注的热点非MCP莫属。能调用外部工具是人工智能大模型从对话机器人进化为多功能助手AI Agent的关键。MCP技术借助大模型Function Calling的基础能力,凭借高效的开发规范,被大家广泛关注。

MCP提供的统一调用规范,涵盖接口定义、异常处理、方法参数、返回值等方面,结合Anthropic公司开发的标准化SDK,开发者们可以快速开发MCP Server并分享出来,其它任何想使用同样功能的开发者都可以通过大模型客户端访问,减少额外的开发工作。

目前MCP Server的通信方式有Stdio和Http SSE两种,对相关知识不了解的大家可阅读我以前的文章:

  • 理论+代码一文带你深入浅出MCP:人工智能大模型与外部世界交互的革命性突破
  • 基于 MCP Http SSE模式的天气助手智能体开发实战(一文带你了解MCP两种开发模式)

然而这两种方式存在一些问题无法成为企业级应用的开发范式。2025年5月9日,MCP(Model Context Protocol)迎来重磅升级——Streamable HTTP正式发布,取代了HTTP SSE, 成为AI模型通信的新标准!

Streamable HTTP强大的性能引起广泛热议,很多博主都撰写文章分享Streamable HTTP的相关知识,但总感觉浅尝辄止。笔者也关注到了Streamable HTTP的特性,特意花费较长时间研究Streamable HTTP的核心原理,并编写python代码在不调用官方sdk前提下手搓MCP Server和MCP Client,带你从底层全面了解Streamable HTTP为什么会成为MCP 企业级开发的唯一通信方式。

一、Streamable HTTP 协议理论详解

1.1 Stdio与SSE通信方式的弊端

在Streamable HTTP出现之前,MCP服务器通信方式有Stdio方式和Http SSE两种。

Stdio方式的工作原理是将MCP Server作为MCP Client的子进程,双方通过约定的管道进行通信。这种方式也是目前大家接入MCP Server最常用的方式(可阅读我的文章 不写一行代码! VsCode+Cline+高德地图MCP Server 帮你搞定和女友的出行规划(附原理解析))。然而Stdio通信方式也注定了MCP Server只能局限于本地环境,只适合一些简单的网络请求(例如查询添加),简单运算(加减乘除)等场景,因为它的性能与本地算力息息相关,是不能作为企业级分布式应用的。

在这里插入图片描述 HTTP SSE方式工作原理是基于HTTP协议的事件传输机制。它允许服务器通过HTTP单向推送事件到客户端,这种通信方式看似解决了Stdio不能分机部署的弊端,但这种方式的设计还存在其它严重问题。

在这里插入图片描述 MCP Client和MCP Server 在HTTP SSE方式下通过两个主要通道通信:

  • HTTP请求/响应:客户端通过标准HTTP请求发送消息到服务端
  • 服务器推送事件(SSE):通过专门的/sse端点向客户端推送消息

乍一看这种设计方式简单直观,但它存在如下关键问题:

  • 不支持断线重连/恢复:SSE连接断开所有会话状态丢失,客户端必须重新建立连接并初始化整个会话。
  • 服务器需维护长连接:服务器必须为每个客户端维护一个长时间的SSE连接,大量并发用户会导致资源消耗剧增,当服务器重启或扩容时所有连接会中断影响用户体验和系统可靠性。
  • 服务器消息只能通过SSE传递:即使是简单的请求-响应交互、服务器也必须通过SSE通道返回信息,这就需要服务器一直保持SSE连接,造成不必要的复杂性和开销。
  • 基础设施兼容性限制:目前很多Web基础设施如CDN、负载均衡器、API网关等对长时间SSE连接支持性不够,企业防火墙有可能强制关闭超时SSE连接,造成连接不可用。
  • 在这里插入图片描述 以上关键问题如果不解决,MCP 终将只是个人的玩具,很难被企业广泛使用,这样MCP的生态不会得到很多支持和扩展。好在 Streamable HTTP 的出现解决了这些关键问题~

    1.2 Streamable HTTP 设计与原理

    早在2025年3月26日,MCP官方github就出现HTTP流式传输服务器通信标准的提议用来代替现在 HTTP SSE的通信方式。该提议详细说明了Streamable HTTP MCP 服务器与客户端之间的通信流程,以及外部工具调用信息同步格式与流程,如下图所示:

    在这里插入图片描述 结合图片按照客户端首次启动->成功连接服务器->等待用户提问的完整过程分享Streamable HTTP MCP详细的请求响应顺序 ,让大家完全掌握每一步在干什么。

  • 首先是客户端启动与服务端的连接,需要3步握手,这时用户还没有输入信息
  • 时刻HTTPJSON-RPC method作用服务器典型响应
    POST /mcp initialize 协商协议版本 & 能力 result.protocolVersion = 协议版本号 result.capabilities.tools.listChanged = true
    POST /mcp notifications/initialized 客户端告诉服务器“我已就绪”(通知服务器只回 204 No Content) HTTP 204 无包体
    POST /mcp tools/list 向服务器请求工具清单 result.tools 数组 + nextCursor(下一流式点)
  • 当用户第一次提问时,模型判断要使用工具,客户端向服务端发起工具调用请求,执行如下步骤
  • 时刻HTTPJSON-RPC method内容要点
    POST /mcp tools/call params.name = get_weather``params.arguments.city 或 location
    流式响应 stream / result 服务器逐行推送:• 进度 stream• 成功 result.content[]
  • 客户端在收到⑤的result.content后,会把文本回填到大模型对话记录中,大模型再输出给终端—你就可以看到MCP服务器执行的结果啦!
  • 将上述流程按顺序简写后如下:

  • POST /mcp → 200 initialize
  • POST /mcp → 204 notifications/initialized
  • POST /mcp → 200 tools/list
  • ——等待用户——
  • POST /mcp → 200/stream tools/call (服务器保持连接,逐行推流)
    • JSON 一行 {"stream": "正在查询…"}…
    • JSON 一行 {"result": { "content":[…] }} → 服务器随后关闭流
  • 如果有多次工具调用,步骤4, 5会重复,每次id都会改变。

    服务器和客户端通信的协议格式建议采用JSON-RPC,是一种用JSON编写的、结构化的远程调用协议,其基本格式结构如图所示:

    类型字段说明
    请求 jsonrpc 固定为 "2.0"
    id 请求编号,用于对应请求与响应
    method 要调用的方法名(比如 "tools/call")
    params 方法参数(可以是对象或数组)
    响应 jsonrpc 也要写 "2.0"
    id 与请求的 ID 一致
    result 成功返回值(只需 result)
    error 如果出错则返回 error 对象

    以上就是Streamable HTTP的设计原理~

    1.3 HTTP Streamable 与 SSE对比

    了解Streamable HTTP详细的设计原理后,再来看Http SSE的四个关键问题,搞懂这些问题是如何被解决的将进一步加深我们对Streamable HTTP协议的理解。

  • 问:Streamable HTTP如何解决SSE不支持断线重连的问题? 答:Streamable HTTP在每次通信时会记录id编号对应请求与响应,将这里请求与响应存储可断线重连进行恢复。

  • 问:Streamable HTTP如何解决SSE服务器需要维持长连接的问题?答:在需要发送响应过程中会保持连接,但一旦流式响应结束,服务器随后便会关闭流。

  • 问:Streamable HTTP如何解决SSE服务器消息只能通过SSE传输的问题?答:Streamable HTTP服务器可灵活选择是返回普通HTTP响应还是升级为SSE流,对于简单请求直接使用普通HTTP响应,对于内容复杂等需要流式传输的请求场景自动升级为SSE。

  • 问:Streamable HTTP如何解决SSE服务器基础设施兼容性限制?答:Streamable HTTP各基础设施的兼容性很完备。

  • 解决了SSE的关键问题,Streamable HTTP才能真正成为企业级MCP应用的大杀器~

    二、手搓 Streamable MCP Server

    2.1 环境搭建

  • 本项目使用anaconda创建虚拟环境,同时mcp官方建议使用uv管理python环境, 执行如下命令安装环境:
  • conda create -n mcp python=3.12
    conda activate mcp
    pip install uv

  • 执行uv init streamble-mcp-server初始化项目,使用cd streamble-mcp-server命令进入项目目录,执行uv venv创建虚拟环境。要激活虚拟环境,windows下执行.venv\\Scripts\\activate, Linux下执行source .venv/bin/activate,我这里使用windows系统演示:
  • 在这里插入图片描述

  • 编写Streamable MCP Server还需要执行uv add openai fastapi requests在环境中安装如下三个依赖库:
    • openai: 调用OpenAI请求格式访问大模型
    • fastapi: python高性能Web框架,快速发起请求与响应, 本项目中用于模拟流式请求与响应
    • requests: 快速发起请求的库,本项目用于调用天气api
  • 在这里插入图片描述

    2.2 编写MCP Server

    在streamable-mcp-server项目中新建server.py文件编写MCP服务端代码,根据上面讲述的流式 HTTP的请求原理, 编写一个可以查询天气的Streamable MCP Server。这里查询天气功能需要使用心知天气免费api key, 注册流程可见:从0到1开发DeepSeek天气助手智能体——你以为大模型只会聊天?Function Calling让它“上天入地”, 完整代码在: https://github.com/TangBaron/streamable_http_mcp_server

  • 导入所需的pyhton库并定义全局变量:
  • import argparse
    import asyncio
    import json
    from typing import Any, AsyncIterator

    import requests
    from fastapi import FastAPI, Request, Response, status
    from fastapi.responses import StreamingResponse
    import uvicorn

    SERVER_NAME = "WeatherServer" # 定义服务器名称
    SERVER_VERSION = "1.0.0" #定义服务器版本
    PROTOCOL_VERSION = "2025-05-16" #定义协议版本号

  • 编写天气请求工具函数fetch_weather,编写stream_weather函数使用生成器将天气请求改写为流传输的形式,在传输jsonrpc协议中记录了req_id标识请求和响应。
  • # 编写请求天气函数
    async def fetch_weather(city: str):
    try:
    url="https://api.seniverse.com/v3/weather/now.json"
    params={
    "key": "你注册的心知天气api",
    "location": city,
    "language": "zh-Hans",
    "unit": "c"
    }
    response = requests.get(url, params=params)
    temperature = response.json()['results'][0]['now']
    except Exception:
    return "error"
    return json.dumps(temperature)

    #
    async def stream_weather(city: str, req_id: int | str):
    yield json.dumps({"jsonrpc": "2.0", "id": req_id, "stream": f"查询 {city} 天气中…"}).encode() + b"\\n"

    await asyncio.sleep(0.3)
    data = await fetch_weather(city)

    if data == "error":
    yield json.dumps({"jsonrpc": "2.0", "id": req_id, "error": {"code": 32000, "message": data["error"]}}).encode() + b"\\n"
    return

    yield json.dumps({
    "jsonrpc": "2.0", "id": req_id,
    "result": {
    "content": [
    {"type": "text", "text": data}
    ],
    "isError": False
    }
    }).encode() + b"\\n"

  • 定义服务器的工具列表,这里只有一个get_weather工具用来获取天气情况。因为是从底层编写,需要详细的JSON Schema格式让天气函数被大模型Function Calling功能识别,比我们使用mcp官方sdk繁琐。
  • TOOLS_REGISTRY = {
    "tools": [
    {
    "name": "get_weather",
    "description": "用于进行天气信息查询的函数,输入城市英文名称,即可获得当前城市天气信息。",
    "inputSchema": {
    "type": "object",
    "properties": {
    "city": {
    "type": "string",
    "description": "City name, e.g. 'Hangzhou'"
    }
    },
    "required": ["city"]
    }
    }
    ],
    "nextCursor": None
    }

  • 使用FastAPI模拟Streamable HTTP请求与响应的流程,阅读代码时建议仔细阅读注释将代码与1.2节 Streamable HTTP 设计与原理中的步骤对应,更清晰直观的了解请求与响应的全过程:
  • app = FastAPI(title="Weather HTTP-Streamble MCP SERVER")

    @app.get("/mcp")
    async def mcp_initialize_via_get():
    # GET 请求也执行了 initialize 方法, 对应步骤1
    return {
    "jsonrpc": "2.0",
    "id": 0,
    "result": {
    "protocolVersion": PROTOCOL_VERSION,
    "capabilities": {
    "streaming": True,
    "tools": {"listChanged": True}
    },
    "serverInfo": {
    "name": SERVER_NAME,
    "version": SERVER_VERSION
    },
    "instructions": "Use the get_weather tool to fetch weather by city name."
    }
    }

    @app.post("/mcp")
    async def mcp_endpoint(request: Request):
    try:
    body = await request.json()
    # 打印客户端的请求内容
    print("收到请求:", json.dumps(body, ensure_ascii=False, indent=2))
    except Exception:
    return {"jsonrpc": "2.0", "id": None, "error": {"code": 32700, "message": "Parse error"}}
    req_id = body.get("id", 1)
    method = body.get("method")

    #打印当前方法类型
    print(f"方法: {method}")

    if method == "notifications/initialized": #对应步骤2,连接建立初始化
    return Response(status_code=status.HTTP_204_NO_CONTENT)

    if method is None:
    return {"jsonrpc": "2.0", "id": req_id, "result": {"status": "MCP server online."}}

    if method == "initialize": # 对应步骤1,请求建立连接
    return {
    "jsonrpc": "2.0",
    "id": req_id,
    "result": {
    "protocolVersion": PROTOCOL_VERSION,
    "capabilities": {
    "streaming": True,
    "tools": {"listChanged": True}
    },
    "serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},
    "instructions": "Use the get_weather tool to fetch weather by city name."
    }
    }

    if method == "tools/list": # 对应步骤3,向服务器请求函数工具清单
    print(json.dumps(TOOLS_REGISTRY, indent=2, ensure_ascii=False))
    return {"jsonrpc": "2.0", "id": req_id, "result": TOOLS_REGISTRY}

    if method == "tools/call": # 对应步骤4和步骤5,客户端发送工具调用请求
    params = body.get("params", {})
    tool_name = params.get("name")
    args = params.get("arguments", {})

    if tool_name != "get_weather":
    return {"jsonrpc": "2.0", "id": req_id, "error": {"code": 32602, "message": "Unknown tool"}}

    city = args.get("city")
    if not city:
    return {"jsonrpc": "2.0", "id": req_id, "error": {"code": 32602, "message": "Missing city"}}

    return StreamingResponse(stream_weather(city, req_id), media_type="application/json")

    return {"jsonrpc": "2.0", "id": req_id, "error": {"code": 32601, "message": "Method not found"}}

  • 解析命令行参数获得服务端要运行的IP和端口,启动服务端
  • def main() > None:
    parser = argparse.ArgumentParser(description="Weather MCP HTTP-Stream")
    parser.add_argument("–host", default="127.0.0.1")
    parser.add_argument("–port", type=int, default=8000)
    args = parser.parse_args()

    uvicorn.run(app, host=args.host, port=args.port, log_level="info")****

    if __name__ == "__main__":
    main()

    2.3 Streamable Server开启与测试

    编写完server.py的代码后,我们可以开启服务并进行测试。注意我们在 基于 MCP Http SSE模式的天气助手智能体开发实战(一文带你了解MCP两种开发模式) 文章中给大家推荐过的MCP Inspector工具目前还并不支持Streamable MCP服务器测试,我们基于HTTP流式传输协议的流程,利用接口调试神器Postman发送响应请求来模拟MCP客户端与流式服务端的通信。

  • 利用uv开启HTTP流式传输服务器, 在streamble-mcp-server项目目录下执行:uv run server.py命令开启Streamable HTTP MCP Server: 在这里插入图片描述
  • 接下来通过4个请求模拟MCP客户端与服务器的标准通信流程, 注意将Postman中Headers请求头中加入Content-Type: application/json 在这里插入图片描述
    • (1)initialize能力协商请求,对应步骤1。请求和返回结果如下图,我们可以看到成功返回服务器支持的协议版本: 在这里插入图片描述
    • (2) notifications/initialized 通知,确认客户端成功连接服务器,对应步骤2。请求和返回结果如下图,因为只是通知类型,服务器会返回状态码为204的空包: 在这里插入图片描述
    • (3) tools/list 请求,获取工具注册表,对应步骤3。请求和返回结果如下图, 我们期望可以获得get_weather工具结构体和json schema的工具注册表: 在这里插入图片描述
    • (4) tools/call请求,调用get_weather工具获得北京天气,对应步骤4,5。请求和返回结果如下图,我们期望可以逐行输出响应: 在这里插入图片描述 可以看到上述的请求均可按照期望状态正常响应,我们Streamable HTTP MCP Server就开发完成了,是不是非常硬核!
  • 三、手搓MCP Client

    现在Streamable HTTP MCP Server的基本功能就测试完了,相信大家看到这里根本不会满足:“就这?就通过几个伪造请求简单测试一下就想蒙我说开发出Streamable HTTP MCP Server了?”。

    那当然不会,MCP脱离了大模型客户端还能叫大模型嘛?这篇分享我们一路硬核到底,向大家介绍从零编写MCP客户端,并按照标准流程接入我们编写的Streamable HTTP MCP Server

    3.1 注册DeepSeek API Key

    本次代码编写客户端的大模型依赖是DeepSeek-V3-0324,大家首先要去DeepSeek官网注册API Key 在这里插入图片描述

    3.2 编写 MCP Client

    在streamble-mcp-server项目中新建client.py文件编写MCP客户端代码:

  • 导入相关包并定义客户端依赖大模型:
  • import asyncio
    import json
    import logging
    import os

    from contextlib import AsyncExitStack
    from typing import Any, Dict, List, Optional

    import httpx
    from openai import OpenAI

    class Configuration:
    def __init__(self) > None:
    self.api_key = "你注册的deepseek api key"
    self.base_url = "https://api.deepseek.com"
    self.model = "deepseek-chat"

    # 添加mcp server 配置文件
    @staticmethod
    def load_config(path: str) > Dict[str, Any]:
    with open(path, "r", encoding="utf-8") as f:
    return json.load(f)

  • 编写与单个 Streambale HTTP MCP Server交互的类,模拟服务器通信流程, 支持四个核心操作, 包括发送连接请求、初始化连接请求、获取工具列表和工具调用流式读取。
  • class HTTPMCPServer:
    """与单个 MCP Streamable HTTP 服务器通信"""

    def __init__(self, name: str, endpoint: str) > None:
    self.name = name
    self.endpoint = endpoint.rstrip("/") # e.g. http://localhost:8000/mcp
    self.session: Optional[httpx.AsyncClient] = None
    self.protocol_version: str = "2025-05-16" # 与server.py中定义的协议版本一致

    # 发送Post请求的方法
    async def _post_json(self, payload: Dict[str, Any]) > Dict[str, Any]:
    assert self.session is not None
    r = await self.session.post(self.endpoint, json=payload, headers={"Accept": "application/json"})
    if r.status_code == 204 or not r.content:
    return {} # ← 通知无响应体
    r.raise_for_status()
    return r.json()

    async def initialize(self) > None: #客户端发起
    self.session = httpx.AsyncClient(timeout=httpx.Timeout(30.0))
    # 1) 步骤1发送连接请求
    init_req = {
    "jsonrpc": "2.0",
    "id": 0,
    "method": "initialize",
    "params": {
    "protocolVersion": self.protocol_version,
    "capabilities": {},
    "clientInfo": {"name": "Streamable HTTP Client Demo", "version": "0.1"},
    },
    }
    r = await self._post_json(init_req)
    if "error" in r:
    raise RuntimeError(f"Initialize error: {r['error']}")
    # 2) 步骤二,发送请求初始化包,通知服务器已连接
    await self._post_json({"jsonrpc": "2.0", "method": "notifications/initialized"})

    # 步骤三 请求服务端 tools列表
    async def list_tools(self) > List[Dict[str, Any]]:
    req = {"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}
    res = await self._post_json(req)
    return res["result"]["tools"]

    # 步骤四 发起工具调用并将流式结果拼接为完整文本
    async def call_tool_stream(self, tool_name: str, arguments: Dict[str, Any]) > str:
    """调用工具并将流式结果拼接为完整文本"""
    req = {
    "jsonrpc": "2.0",
    "id": 3,
    "method": "tools/call",
    "params": {"name": tool_name, "arguments": arguments},
    }
    assert self.session is not None
    async with self.session.stream(
    "POST", self.endpoint, json=req, headers={"Accept": "application/json"}
    ) as resp:
    if resp.status_code != 200:
    raise RuntimeError(f"HTTP {resp.status_code}")
    collected_text: List[str] = []
    async for line in resp.aiter_lines():
    if not line:
    continue
    chunk = json.loads(line)
    if "stream" in chunk:
    continue # 中间进度
    if "error" in chunk:
    raise RuntimeError(chunk["error"]["message"])
    if "result" in chunk:
    # 根据协议,文本在 result.content[0].text
    for item in chunk["result"]["content"]:
    if item["type"] == "text":
    collected_text.append(item["text"])
    return "\\n".join(collected_text)

    async def close(self) > None:
    if self.session:
    await self.session.aclose()
    self.session = None

  • 编写类封装大模型对话的请求响应
  • class LLMClient:
    def __init__(self, api_key: str, base_url: Optional[str], model: str) > None:
    self.client = OpenAI(api_key=api_key, base_url=base_url)
    self.model = model

    def chat(self, messages: List[Dict[str, Any]], tools: Optional[List[Dict[str, Any]]]):
    return self.client.chat.completions.create(model=self.model, messages=messages, tools=tools)

  • 利用单服务器类与大模型对话请求类编写多服务MCP客户端,让Client更通用:
  • class MultiHTTPMCPClient:
    def __init__(self, servers_conf: Dict[str, Any], api_key: str, base_url: Optional[str], model: str) > None:
    self.servers: Dict[str, HTTPMCPServer] = {
    name: HTTPMCPServer(name, cfg["endpoint"]) for name, cfg in servers_conf.items()
    }
    self.llm = LLMClient(api_key, base_url, model)
    self.all_tools: List[Dict[str, Any]] = []

    async def start(self):
    for srv in self.servers.values():
    await srv.initialize()
    tools = await srv.list_tools()
    for t in tools:
    # 重命名以区分不同服务器
    full_name = f"{srv.name}_{t['name']}"
    self.all_tools.append({
    "type": "function",
    "function": {
    "name": full_name,
    "description": t["description"],
    "parameters": t["inputSchema"],
    },
    })
    logging.info("已连接服务器并汇总工具:%s", [t["function"]["name"] for t in self.all_tools])

    async def call_local_tool(self, full_name: str, args: Dict[str, Any]) > str:
    srv_name, tool_name = full_name.split("_", 1)
    srv = self.servers[srv_name]
    city = args.get("city")
    if not city:
    raise ValueError("Missing city/location")
    return await srv.call_tool_stream(tool_name, {"city": city})

    async def chat_loop(self):
    print("🤖 HTTP MCP + Function Calling 客户端已启动,输入 quit 退出")
    messages: List[Dict[str, Any]] = []
    while True:
    user = input("你: ").strip()
    if user.lower() == "quit":
    break
    messages.append({"role": "user", "content": user})
    # 1st LLM call
    resp = self.llm.chat(messages, self.all_tools)
    choice = resp.choices[0]
    if choice.finish_reason == "tool_calls":
    tc = choice.message.tool_calls[0]
    tool_name = tc.function.name
    tool_args = json.loads(tc.function.arguments)
    print(f"[调用工具] {tool_name}{tool_args}")
    tool_resp = await self.call_local_tool(tool_name, tool_args)
    messages.append(choice.message.model_dump())
    messages.append({"role": "tool", "content": tool_resp, "tool_call_id": tc.id})
    resp2 = self.llm.chat(messages, self.all_tools)
    print("AI:", resp2.choices[0].message.content)
    messages.append(resp2.choices[0].message.model_dump())
    else:
    print("AI:", choice.message.content)
    messages.append(choice.message.model_dump())

    async def close(self):
    for s in self.servers.values():
    await s.close()

  • 编写main函数,读取MCP Server配置文件,运行main文件:
  • async def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s – %(levelname)s – %(message)s")
    conf = Configuration()
    servers_conf = conf.load_config("./servers_config.json").get("mcpServers", {})
    client = MultiHTTPMCPClient(servers_conf, conf.api_key, conf.base_url, conf.model)
    try:
    await client.start()
    await client.chat_loop()
    finally:
    await client.close()

    if __name__ == "__main__":
    asyncio.run(main())

    我们编写的Client功能支持多轮对话,能自动识别是否需要调用工具,自动处理工具参数解析与调用逻辑,通过读取配置文件方式支持多个服务器并包含了容错处理,不但可以作为本次测试需求,更可以作为我们日后MCP服务器接入的通用客户端模板!

    3.3 Client 接入 Streamable Server

  • 在streamble-mcp-server项目中新建servers_config.json文件用于写入我们Streamable HTTP MCP Server的传输服务器地址和名称:
  • {
    "mcpServers": {
    "weather": {
    "endpoint": "http://127.0.0.1:8000/mcp"
    }
    }
    }

  • 执行uv run server.py命令开启Streamable HTTP MCP Server:
  • 在这里插入图片描述 3. 执行uv run client.py命令开启 MCP Client, 并尝试进行问答“你好,好久不见?”,“请问北京今天天气如何”,并观察Streamable HTTP MCP Server运行效果:

    在这里插入图片描述 显然客户端成功调用服务端的get_weather工具函数,这进一步验证了Streamable HTTP MCP Server编写的正确性。

    四、总结

    本篇文章详细介绍了2025年人工智能领域的关键技术——MCP(Model Context Protocol)的重大升级Streamable HTTP协议。Streamable HTTP通过绝妙的通信协议设计解决了HTTP SSE方式断线无法恢复、服务器资源消耗大的缺陷,成为MCP企业级应用的通信新标准。

    本篇文章后半部分通过硬核的代码实战,编写python代码从0到1手动开发了Streamable MCP Server和Client:Server基于FastAPI框架实现流式天气查询功能,支持协议协商与工具调用;Client则集成大模型,通过异步通信自动解析工具参数并处理多轮对话。测试验证了从连接建立到工具调用的全流程,证明Streamable HTTP的高效与可靠性。

    本篇文章笔者倾注心血,力图让大家完全明白Streamable HTTP MCP Server的核心原理、实现流程以及它是如何推动MCP从AI助手迈向企业级应用。当然随着Streamable HTTP Server的发展,相关的SDK开发也在逐步完善,截至2025年5月19日,Anthropic已经开发出简化Streamable HTTP MCP Server编写的SDK, 接下来笔者会也分享借助MCP SDK快速开发Streamable HTTP MCP Server的更多内容,让大家都能快速上手MCP, 开发属于自己的AI Agent智能体!

    感兴趣大家可关注我的CSDN账号,更推荐关注我同名微信公众号:大模型真好玩, 免费分享工作生活中遇到的大模型相关知识和教程,带你体系化从0到1学习大模型,在AI时代先人一步~

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » 理论+代码讲解Streamable HTTP MCP服务器原理,拒绝调包从0到1手撕流式 HTTP MCP服务器!
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!