前言
2025年人工智能大模型领域最值得关注的热点非MCP莫属。能调用外部工具是人工智能大模型从对话机器人进化为多功能助手AI Agent的关键。MCP技术借助大模型Function Calling的基础能力,凭借高效的开发规范,被大家广泛关注。
MCP提供的统一调用规范,涵盖接口定义、异常处理、方法参数、返回值等方面,结合Anthropic公司开发的标准化SDK,开发者们可以快速开发MCP Server并分享出来,其它任何想使用同样功能的开发者都可以通过大模型客户端访问,减少额外的开发工作。
目前MCP Server的通信方式有Stdio和Http SSE两种,对相关知识不了解的大家可阅读我以前的文章:
- 理论+代码一文带你深入浅出MCP:人工智能大模型与外部世界交互的革命性突破
- 基于 MCP Http SSE模式的天气助手智能体开发实战(一文带你了解MCP两种开发模式)
然而这两种方式存在一些问题无法成为企业级应用的开发范式。2025年5月9日,MCP(Model Context Protocol)迎来重磅升级——Streamable HTTP正式发布,取代了HTTP SSE, 成为AI模型通信的新标准!
Streamable HTTP强大的性能引起广泛热议,很多博主都撰写文章分享Streamable HTTP的相关知识,但总感觉浅尝辄止。笔者也关注到了Streamable HTTP的特性,特意花费较长时间研究Streamable HTTP的核心原理,并编写python代码在不调用官方sdk前提下手搓MCP Server和MCP Client,带你从底层全面了解Streamable HTTP为什么会成为MCP 企业级开发的唯一通信方式。
一、Streamable HTTP 协议理论详解
1.1 Stdio与SSE通信方式的弊端
在Streamable HTTP出现之前,MCP服务器通信方式有Stdio方式和Http SSE两种。
Stdio方式的工作原理是将MCP Server作为MCP Client的子进程,双方通过约定的管道进行通信。这种方式也是目前大家接入MCP Server最常用的方式(可阅读我的文章 不写一行代码! VsCode+Cline+高德地图MCP Server 帮你搞定和女友的出行规划(附原理解析))。然而Stdio通信方式也注定了MCP Server只能局限于本地环境,只适合一些简单的网络请求(例如查询添加),简单运算(加减乘除)等场景,因为它的性能与本地算力息息相关,是不能作为企业级分布式应用的。
HTTP SSE方式工作原理是基于HTTP协议的事件传输机制。它允许服务器通过HTTP单向推送事件到客户端,这种通信方式看似解决了Stdio不能分机部署的弊端,但这种方式的设计还存在其它严重问题。
MCP Client和MCP Server 在HTTP SSE方式下通过两个主要通道通信:
- HTTP请求/响应:客户端通过标准HTTP请求发送消息到服务端
- 服务器推送事件(SSE):通过专门的/sse端点向客户端推送消息
乍一看这种设计方式简单直观,但它存在如下关键问题:
以上关键问题如果不解决,MCP 终将只是个人的玩具,很难被企业广泛使用,这样MCP的生态不会得到很多支持和扩展。好在 Streamable HTTP 的出现解决了这些关键问题~
1.2 Streamable HTTP 设计与原理
早在2025年3月26日,MCP官方github就出现HTTP流式传输服务器通信标准的提议用来代替现在 HTTP SSE的通信方式。该提议详细说明了Streamable HTTP MCP 服务器与客户端之间的通信流程,以及外部工具调用信息同步格式与流程,如下图所示:
结合图片按照客户端首次启动->成功连接服务器->等待用户提问的完整过程分享Streamable HTTP MCP详细的请求响应顺序 ,让大家完全掌握每一步在干什么。
① | POST /mcp | initialize | 协商协议版本 & 能力 | result.protocolVersion = 协议版本号 result.capabilities.tools.listChanged = true |
② | POST /mcp | notifications/initialized | 客户端告诉服务器“我已就绪”(通知服务器只回 204 No Content) | HTTP 204 无包体 |
③ | POST /mcp | tools/list | 向服务器请求工具清单 | result.tools 数组 + nextCursor(下一流式点) |
④ | POST /mcp | tools/call | params.name = get_weather``params.arguments.city 或 location |
⑤ | 流式响应 | stream / result | 服务器逐行推送:• 进度 stream• 成功 result.content[] |
将上述流程按顺序简写后如下:
- JSON 一行 {"stream": "正在查询…"}…
- JSON 一行 {"result": { "content":[…] }} → 服务器随后关闭流
如果有多次工具调用,步骤4, 5会重复,每次id都会改变。
服务器和客户端通信的协议格式建议采用JSON-RPC,是一种用JSON编写的、结构化的远程调用协议,其基本格式结构如图所示:
请求 | jsonrpc | 固定为 "2.0" |
id | 请求编号,用于对应请求与响应 | |
method | 要调用的方法名(比如 "tools/call") | |
params | 方法参数(可以是对象或数组) | |
响应 | jsonrpc | 也要写 "2.0" |
id | 与请求的 ID 一致 | |
result | 成功返回值(只需 result) | |
error | 如果出错则返回 error 对象 |
以上就是Streamable HTTP的设计原理~
1.3 HTTP Streamable 与 SSE对比
了解Streamable HTTP详细的设计原理后,再来看Http SSE的四个关键问题,搞懂这些问题是如何被解决的将进一步加深我们对Streamable HTTP协议的理解。
问:Streamable HTTP如何解决SSE不支持断线重连的问题? 答:Streamable HTTP在每次通信时会记录id编号对应请求与响应,将这里请求与响应存储可断线重连进行恢复。
问:Streamable HTTP如何解决SSE服务器需要维持长连接的问题?答:在需要发送响应过程中会保持连接,但一旦流式响应结束,服务器随后便会关闭流。
问:Streamable HTTP如何解决SSE服务器消息只能通过SSE传输的问题?答:Streamable HTTP服务器可灵活选择是返回普通HTTP响应还是升级为SSE流,对于简单请求直接使用普通HTTP响应,对于内容复杂等需要流式传输的请求场景自动升级为SSE。
问:Streamable HTTP如何解决SSE服务器基础设施兼容性限制?答:Streamable HTTP各基础设施的兼容性很完备。
解决了SSE的关键问题,Streamable HTTP才能真正成为企业级MCP应用的大杀器~
二、手搓 Streamable MCP Server
2.1 环境搭建
conda create -n mcp python=3.12
conda activate mcp
pip install uv
- openai: 调用OpenAI请求格式访问大模型
- fastapi: python高性能Web框架,快速发起请求与响应, 本项目中用于模拟流式请求与响应
- requests: 快速发起请求的库,本项目用于调用天气api
2.2 编写MCP Server
在streamable-mcp-server项目中新建server.py文件编写MCP服务端代码,根据上面讲述的流式 HTTP的请求原理, 编写一个可以查询天气的Streamable MCP Server。这里查询天气功能需要使用心知天气免费api key, 注册流程可见:从0到1开发DeepSeek天气助手智能体——你以为大模型只会聊天?Function Calling让它“上天入地”, 完整代码在: https://github.com/TangBaron/streamable_http_mcp_server
import argparse
import asyncio
import json
from typing import Any, AsyncIterator
import requests
from fastapi import FastAPI, Request, Response, status
from fastapi.responses import StreamingResponse
import uvicorn
SERVER_NAME = "WeatherServer" # 定义服务器名称
SERVER_VERSION = "1.0.0" #定义服务器版本
PROTOCOL_VERSION = "2025-05-16" #定义协议版本号
# 编写请求天气函数
async def fetch_weather(city: str):
try:
url="https://api.seniverse.com/v3/weather/now.json"
params={
"key": "你注册的心知天气api",
"location": city,
"language": "zh-Hans",
"unit": "c"
}
response = requests.get(url, params=params)
temperature = response.json()['results'][0]['now']
except Exception:
return "error"
return json.dumps(temperature)
#
async def stream_weather(city: str, req_id: int | str):
yield json.dumps({"jsonrpc": "2.0", "id": req_id, "stream": f"查询 {city} 天气中…"}).encode() + b"\\n"
await asyncio.sleep(0.3)
data = await fetch_weather(city)
if data == "error":
yield json.dumps({"jsonrpc": "2.0", "id": req_id, "error": {"code": –32000, "message": data["error"]}}).encode() + b"\\n"
return
yield json.dumps({
"jsonrpc": "2.0", "id": req_id,
"result": {
"content": [
{"type": "text", "text": data}
],
"isError": False
}
}).encode() + b"\\n"
TOOLS_REGISTRY = {
"tools": [
{
"name": "get_weather",
"description": "用于进行天气信息查询的函数,输入城市英文名称,即可获得当前城市天气信息。",
"inputSchema": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "City name, e.g. 'Hangzhou'"
}
},
"required": ["city"]
}
}
],
"nextCursor": None
}
app = FastAPI(title="Weather HTTP-Streamble MCP SERVER")
@app.get("/mcp")
async def mcp_initialize_via_get():
# GET 请求也执行了 initialize 方法, 对应步骤1
return {
"jsonrpc": "2.0",
"id": 0,
"result": {
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {
"streaming": True,
"tools": {"listChanged": True}
},
"serverInfo": {
"name": SERVER_NAME,
"version": SERVER_VERSION
},
"instructions": "Use the get_weather tool to fetch weather by city name."
}
}
@app.post("/mcp")
async def mcp_endpoint(request: Request):
try:
body = await request.json()
# 打印客户端的请求内容
print("收到请求:", json.dumps(body, ensure_ascii=False, indent=2))
except Exception:
return {"jsonrpc": "2.0", "id": None, "error": {"code": –32700, "message": "Parse error"}}
req_id = body.get("id", 1)
method = body.get("method")
#打印当前方法类型
print(f"方法: {method}")
if method == "notifications/initialized": #对应步骤2,连接建立初始化
return Response(status_code=status.HTTP_204_NO_CONTENT)
if method is None:
return {"jsonrpc": "2.0", "id": req_id, "result": {"status": "MCP server online."}}
if method == "initialize": # 对应步骤1,请求建立连接
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {
"streaming": True,
"tools": {"listChanged": True}
},
"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},
"instructions": "Use the get_weather tool to fetch weather by city name."
}
}
if method == "tools/list": # 对应步骤3,向服务器请求函数工具清单
print(json.dumps(TOOLS_REGISTRY, indent=2, ensure_ascii=False))
return {"jsonrpc": "2.0", "id": req_id, "result": TOOLS_REGISTRY}
if method == "tools/call": # 对应步骤4和步骤5,客户端发送工具调用请求
params = body.get("params", {})
tool_name = params.get("name")
args = params.get("arguments", {})
if tool_name != "get_weather":
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": –32602, "message": "Unknown tool"}}
city = args.get("city")
if not city:
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": –32602, "message": "Missing city"}}
return StreamingResponse(stream_weather(city, req_id), media_type="application/json")
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": –32601, "message": "Method not found"}}
def main() –> None:
parser = argparse.ArgumentParser(description="Weather MCP HTTP-Stream")
parser.add_argument("–host", default="127.0.0.1")
parser.add_argument("–port", type=int, default=8000)
args = parser.parse_args()
uvicorn.run(app, host=args.host, port=args.port, log_level="info")****
if __name__ == "__main__":
main()
2.3 Streamable Server开启与测试
编写完server.py的代码后,我们可以开启服务并进行测试。注意我们在 基于 MCP Http SSE模式的天气助手智能体开发实战(一文带你了解MCP两种开发模式) 文章中给大家推荐过的MCP Inspector工具目前还并不支持Streamable MCP服务器测试,我们基于HTTP流式传输协议的流程,利用接口调试神器Postman发送响应请求来模拟MCP客户端与流式服务端的通信。


- (1)initialize能力协商请求,对应步骤1。请求和返回结果如下图,我们可以看到成功返回服务器支持的协议版本:
- (2) notifications/initialized 通知,确认客户端成功连接服务器,对应步骤2。请求和返回结果如下图,因为只是通知类型,服务器会返回状态码为204的空包:
- (3) tools/list 请求,获取工具注册表,对应步骤3。请求和返回结果如下图, 我们期望可以获得get_weather工具结构体和json schema的工具注册表:
- (4) tools/call请求,调用get_weather工具获得北京天气,对应步骤4,5。请求和返回结果如下图,我们期望可以逐行输出响应:
可以看到上述的请求均可按照期望状态正常响应,我们Streamable HTTP MCP Server就开发完成了,是不是非常硬核!
三、手搓MCP Client
现在Streamable HTTP MCP Server的基本功能就测试完了,相信大家看到这里根本不会满足:“就这?就通过几个伪造请求简单测试一下就想蒙我说开发出Streamable HTTP MCP Server了?”。
那当然不会,MCP脱离了大模型客户端还能叫大模型嘛?这篇分享我们一路硬核到底,向大家介绍从零编写MCP客户端,并按照标准流程接入我们编写的Streamable HTTP MCP Server
3.1 注册DeepSeek API Key
本次代码编写客户端的大模型依赖是DeepSeek-V3-0324,大家首先要去DeepSeek官网注册API Key
3.2 编写 MCP Client
在streamble-mcp-server项目中新建client.py文件编写MCP客户端代码:
import asyncio
import json
import logging
import os
from contextlib import AsyncExitStack
from typing import Any, Dict, List, Optional
import httpx
from openai import OpenAI
class Configuration:
def __init__(self) –> None:
self.api_key = "你注册的deepseek api key"
self.base_url = "https://api.deepseek.com"
self.model = "deepseek-chat"
# 添加mcp server 配置文件
@staticmethod
def load_config(path: str) –> Dict[str, Any]:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
class HTTPMCPServer:
"""与单个 MCP Streamable HTTP 服务器通信"""
def __init__(self, name: str, endpoint: str) –> None:
self.name = name
self.endpoint = endpoint.rstrip("/") # e.g. http://localhost:8000/mcp
self.session: Optional[httpx.AsyncClient] = None
self.protocol_version: str = "2025-05-16" # 与server.py中定义的协议版本一致
# 发送Post请求的方法
async def _post_json(self, payload: Dict[str, Any]) –> Dict[str, Any]:
assert self.session is not None
r = await self.session.post(self.endpoint, json=payload, headers={"Accept": "application/json"})
if r.status_code == 204 or not r.content:
return {} # ← 通知无响应体
r.raise_for_status()
return r.json()
async def initialize(self) –> None: #客户端发起
self.session = httpx.AsyncClient(timeout=httpx.Timeout(30.0))
# 1) 步骤1发送连接请求
init_req = {
"jsonrpc": "2.0",
"id": 0,
"method": "initialize",
"params": {
"protocolVersion": self.protocol_version,
"capabilities": {},
"clientInfo": {"name": "Streamable HTTP Client Demo", "version": "0.1"},
},
}
r = await self._post_json(init_req)
if "error" in r:
raise RuntimeError(f"Initialize error: {r['error']}")
# 2) 步骤二,发送请求初始化包,通知服务器已连接
await self._post_json({"jsonrpc": "2.0", "method": "notifications/initialized"})
# 步骤三 请求服务端 tools列表
async def list_tools(self) –> List[Dict[str, Any]]:
req = {"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}
res = await self._post_json(req)
return res["result"]["tools"]
# 步骤四 发起工具调用并将流式结果拼接为完整文本
async def call_tool_stream(self, tool_name: str, arguments: Dict[str, Any]) –> str:
"""调用工具并将流式结果拼接为完整文本"""
req = {
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {"name": tool_name, "arguments": arguments},
}
assert self.session is not None
async with self.session.stream(
"POST", self.endpoint, json=req, headers={"Accept": "application/json"}
) as resp:
if resp.status_code != 200:
raise RuntimeError(f"HTTP {resp.status_code}")
collected_text: List[str] = []
async for line in resp.aiter_lines():
if not line:
continue
chunk = json.loads(line)
if "stream" in chunk:
continue # 中间进度
if "error" in chunk:
raise RuntimeError(chunk["error"]["message"])
if "result" in chunk:
# 根据协议,文本在 result.content[0].text
for item in chunk["result"]["content"]:
if item["type"] == "text":
collected_text.append(item["text"])
return "\\n".join(collected_text)
async def close(self) –> None:
if self.session:
await self.session.aclose()
self.session = None
class LLMClient:
def __init__(self, api_key: str, base_url: Optional[str], model: str) –> None:
self.client = OpenAI(api_key=api_key, base_url=base_url)
self.model = model
def chat(self, messages: List[Dict[str, Any]], tools: Optional[List[Dict[str, Any]]]):
return self.client.chat.completions.create(model=self.model, messages=messages, tools=tools)
class MultiHTTPMCPClient:
def __init__(self, servers_conf: Dict[str, Any], api_key: str, base_url: Optional[str], model: str) –> None:
self.servers: Dict[str, HTTPMCPServer] = {
name: HTTPMCPServer(name, cfg["endpoint"]) for name, cfg in servers_conf.items()
}
self.llm = LLMClient(api_key, base_url, model)
self.all_tools: List[Dict[str, Any]] = []
async def start(self):
for srv in self.servers.values():
await srv.initialize()
tools = await srv.list_tools()
for t in tools:
# 重命名以区分不同服务器
full_name = f"{srv.name}_{t['name']}"
self.all_tools.append({
"type": "function",
"function": {
"name": full_name,
"description": t["description"],
"parameters": t["inputSchema"],
},
})
logging.info("已连接服务器并汇总工具:%s", [t["function"]["name"] for t in self.all_tools])
async def call_local_tool(self, full_name: str, args: Dict[str, Any]) –> str:
srv_name, tool_name = full_name.split("_", 1)
srv = self.servers[srv_name]
city = args.get("city")
if not city:
raise ValueError("Missing city/location")
return await srv.call_tool_stream(tool_name, {"city": city})
async def chat_loop(self):
print("🤖 HTTP MCP + Function Calling 客户端已启动,输入 quit 退出")
messages: List[Dict[str, Any]] = []
while True:
user = input("你: ").strip()
if user.lower() == "quit":
break
messages.append({"role": "user", "content": user})
# 1st LLM call
resp = self.llm.chat(messages, self.all_tools)
choice = resp.choices[0]
if choice.finish_reason == "tool_calls":
tc = choice.message.tool_calls[0]
tool_name = tc.function.name
tool_args = json.loads(tc.function.arguments)
print(f"[调用工具] {tool_name} → {tool_args}")
tool_resp = await self.call_local_tool(tool_name, tool_args)
messages.append(choice.message.model_dump())
messages.append({"role": "tool", "content": tool_resp, "tool_call_id": tc.id})
resp2 = self.llm.chat(messages, self.all_tools)
print("AI:", resp2.choices[0].message.content)
messages.append(resp2.choices[0].message.model_dump())
else:
print("AI:", choice.message.content)
messages.append(choice.message.model_dump())
async def close(self):
for s in self.servers.values():
await s.close()
async def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s – %(levelname)s – %(message)s")
conf = Configuration()
servers_conf = conf.load_config("./servers_config.json").get("mcpServers", {})
client = MultiHTTPMCPClient(servers_conf, conf.api_key, conf.base_url, conf.model)
try:
await client.start()
await client.chat_loop()
finally:
await client.close()
if __name__ == "__main__":
asyncio.run(main())
我们编写的Client功能支持多轮对话,能自动识别是否需要调用工具,自动处理工具参数解析与调用逻辑,通过读取配置文件方式支持多个服务器并包含了容错处理,不但可以作为本次测试需求,更可以作为我们日后MCP服务器接入的通用客户端模板!
3.3 Client 接入 Streamable Server
{
"mcpServers": {
"weather": {
"endpoint": "http://127.0.0.1:8000/mcp"
}
}
}
3. 执行uv run client.py命令开启 MCP Client, 并尝试进行问答“你好,好久不见?”,“请问北京今天天气如何”,并观察Streamable HTTP MCP Server运行效果:
显然客户端成功调用服务端的get_weather工具函数,这进一步验证了Streamable HTTP MCP Server编写的正确性。
四、总结
本篇文章详细介绍了2025年人工智能领域的关键技术——MCP(Model Context Protocol)的重大升级Streamable HTTP协议。Streamable HTTP通过绝妙的通信协议设计解决了HTTP SSE方式断线无法恢复、服务器资源消耗大的缺陷,成为MCP企业级应用的通信新标准。
本篇文章后半部分通过硬核的代码实战,编写python代码从0到1手动开发了Streamable MCP Server和Client:Server基于FastAPI框架实现流式天气查询功能,支持协议协商与工具调用;Client则集成大模型,通过异步通信自动解析工具参数并处理多轮对话。测试验证了从连接建立到工具调用的全流程,证明Streamable HTTP的高效与可靠性。
本篇文章笔者倾注心血,力图让大家完全明白Streamable HTTP MCP Server的核心原理、实现流程以及它是如何推动MCP从AI助手迈向企业级应用。当然随着Streamable HTTP Server的发展,相关的SDK开发也在逐步完善,截至2025年5月19日,Anthropic已经开发出简化Streamable HTTP MCP Server编写的SDK, 接下来笔者会也分享借助MCP SDK快速开发Streamable HTTP MCP Server的更多内容,让大家都能快速上手MCP, 开发属于自己的AI Agent智能体!
感兴趣大家可关注我的CSDN账号,更推荐关注我同名微信公众号:大模型真好玩, 免费分享工作生活中遇到的大模型相关知识和教程,带你体系化从0到1学习大模型,在AI时代先人一步~
评论前必须登录!
注册