
fastmcp MCPConfig multi-server usage example; using sse, stdio, and streamable-http

1. Using sse, stdio, and streamable-http

Reference: https://gofastmcp.com/deployment/running-server#the-run-method

stdio is for local use; sse and streamable-http are for remote calls (Streamable HTTP is new in version 2.3.0). A sketch of starting a server with each transport is shown below.

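The original post illustrated server startup with screenshots. As a stand-in, here is a minimal sketch based on the run() method documented at the link above; the server name ("demo"), the add tool, and the host/port values are illustrative assumptions.

from fastmcp import FastMCP

mcp = FastMCP("demo")  # hypothetical server name

@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b

if __name__ == "__main__":
    # Pick exactly one transport:
    mcp.run()                                                          # stdio (default), for local use
    # mcp.run(transport="sse", host="0.0.0.0", port=8000)              # SSE, served at /sse
    # mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)  # Streamable HTTP, served at /mcp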

Calling the server: the original screenshots covered stdio and sse, and streamable-http. A minimal client-side sketch is given below.
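A sketch of connecting with the FastMCP Client over each transport. The Client infers the transport from the target string; the script path and URLs here are illustrative assumptions that follow the conventions used later in this post (an /sse URL for SSE, an /mcp URL for Streamable HTTP).

import asyncio
from fastmcp import Client

async def demo():
    # stdio: spawn the local server script as a subprocess
    async with Client("./api_mcp_server_math.py") as client:
        print(await client.list_tools())

    # SSE: connect to a running server's /sse endpoint
    async with Client("http://localhost:8000/sse") as client:
        print(await client.list_tools())

    # Streamable HTTP: connect to a running server's /mcp endpoint
    async with Client("http://localhost:8000/mcp") as client:
        print(await client.list_tools())

asyncio.run(demo())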

2. MCPConfig multi-server usage example

Reference: https://gofastmcp.com/clients/client

Code

If the config contains only one server, call_tool takes the bare tool name with no prefix; with multiple servers, the tool name must be prefixed with the server name (e.g. trip_check_hotel, math_add).

from fastmcp import Client

# Standard MCP configuration with multiple servers
config = {
    "mcpServers": {
        "trip": {"url": "http://localhost:8000/sse"},
        "math": {
            "command": "python",
            "args": ["./api_mcp_server_math.py"],
            "env": {"DEBUG": "true"}
        }
    }
}

async def main():
    async with Client(config) as client:
        tools = await client.list_tools()
        print("Available tools:")
        for tool in tools:  # the listed names are automatically prefixed with the server name
            print(f" - {tool.name}: {tool.description}")

        hotel_response = await client.call_tool("trip_check_hotel", {"cityname": "广州", "date": "后天"})  # Guangzhou, the day after tomorrow
        math_response = await client.call_tool("math_add", {"a": 1, "b": 2})
        print("Math response:", math_response)
        print("Hotel response:", hotel_response)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
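As noted above, when the config contains only one server no prefix is used. A minimal sketch of that case, assuming the math server exposes a bare add tool:

import asyncio
from fastmcp import Client

config = {"mcpServers": {"math": {"command": "python", "args": ["./api_mcp_server_math.py"]}}}

async def single_server_demo():
    async with Client(config) as client:
        # Only one server is configured, so the bare tool name is used (no "math_" prefix)
        result = await client.call_tool("add", {"a": 1, "b": 2})
        print(result)

asyncio.run(single_server_demo())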

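The config spawns ./api_mcp_server_math.py over stdio, but that file is not shown in the post. A minimal sketch of what such a server might look like, with add and multiply as assumed tool names:

# api_mcp_server_math.py (hypothetical sketch)
from fastmcp import FastMCP

mcp = FastMCP("math")

@mcp.tool()
def add(a: int, b: int) -> int:
    """Return the sum of a and b."""
    return a + b

@mcp.tool()
def multiply(a: int, b: int) -> int:
    """Return the product of a and b."""
    return a * b

if __name__ == "__main__":
    mcp.run()  # stdio transport, suitable for being spawned via the config's "command"/"args" entry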

Automatic tool execution by the LLM

The model invokes tools automatically: the LLM selects the appropriate tool based on the user's input. The example uses the Bailian (DashScope) qwen3 thinking model with streaming output.

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import json
import asyncio
from typing import List, Dict, Optional

from openai import OpenAI
from fastmcp import Client

class IntegratedMCPClient:
    def __init__(self, config: Dict, model="qwen3-235b-a22b"):
        """
        Initialize the integrated MCP client.

        Args:
            config: MCP server configuration dict
            model: name of the model to use
        """
        self.config = config
        self.model = model

        # Initialize the OpenAI-compatible client (DashScope endpoint)
        self.client = OpenAI(
            api_key="sk-424971",  # placeholder key, replace with your own
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
        )

        # Initialize the MCP client
        self.session = Client(config)
        self.tools = []
        self.server_tool_mapping = {}  # records which server each tool comes from

    async def prepare_tools(self):
        """Prepare all available tools."""
        try:
            # Fetch every tool exposed by the configured servers
            tools = await self.session.list_tools()
            print(f"Found {len(tools)} available tools:")

            self.tools = []
            self.server_tool_mapping = {}

            for tool in tools:
                print(f" - {tool.name}: {tool.description}")

                # Build the OpenAI function-calling tool definition
                tool_def = {
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.inputSchema,
                    }
                }
                self.tools.append(tool_def)

                # Record the tool-to-server mapping (if the source needs to be distinguished)
                # Here the originating server is inferred from the tool name prefix
                if tool.name.startswith("trip_"):
                    self.server_tool_mapping[tool.name] = "trip"
                elif tool.name.startswith("math_"):
                    self.server_tool_mapping[tool.name] = "math"
                else:
                    self.server_tool_mapping[tool.name] = "unknown"

        except Exception as e:
            print(f"Error while preparing tools: {e}")
            self.tools = []

    async def chat(self, messages: List[Dict]) -> Optional[object]:
        """Handle a chat turn, with support for tool calls."""
        if not self.tools:
            await self.prepare_tools()

        try:
            # Streaming response with the thinking process enabled
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.tools if self.tools else None,
                temperature=0,
                max_tokens=16000,
                stream=True,
                logprobs=False,
                stream_options={"include_usage": True},
                extra_body={
                    "enable_thinking": True  # enable the thinking process
                }
            )

            # Collect the streamed response
            collected_content = ""
            collected_tool_calls = []
            finish_reason = None

            print("AI: ", end="", flush=True)

            for chunk in response:
                # Guard against empty choices (e.g. the final usage-only chunk)
                if not chunk.choices:
                    continue

                delta = chunk.choices[0].delta
                finish_reason = chunk.choices[0].finish_reason

                # Print the thinking process (Qwen-specific field)
                reasoning = getattr(delta, "reasoning_content", None)
                if reasoning:
                    print(f"{reasoning}", end="", flush=True)

                # Print regular content
                if delta.content:
                    print(delta.content, end="", flush=True)
                    collected_content += delta.content

                # Collect tool calls, merging streamed deltas by their index
                if hasattr(delta, 'tool_calls') and delta.tool_calls:
                    for tool_call_delta in delta.tool_calls:
                        i = tool_call_delta.index
                        while len(collected_tool_calls) <= i:
                            collected_tool_calls.append(
                                {"id": "", "function": {"name": "", "arguments": ""}}
                            )
                        if tool_call_delta.id:
                            collected_tool_calls[i]["id"] = tool_call_delta.id
                        if tool_call_delta.function:
                            if tool_call_delta.function.name:
                                collected_tool_calls[i]["function"]["name"] = tool_call_delta.function.name
                            if tool_call_delta.function.arguments:
                                collected_tool_calls[i]["function"]["arguments"] += tool_call_delta.function.arguments

            print()  # newline

            # If there was no tool call, return the collected content
            if finish_reason != 'tool_calls' or not collected_tool_calls:
                class SimpleMessage:
                    def __init__(self, content):
                        self.content = content
                return SimpleMessage(collected_content)

            # Handle tool calls
            print("\n[Calling tools...]")
            assistant_message = {
                'role': 'assistant',
                'content': collected_content,
                'tool_calls': [
                    {
                        "id": tc["id"],
                        "type": "function",
                        "function": {
                            "name": tc["function"]["name"],
                            "arguments": tc["function"]["arguments"]
                        }
                    }
                    for tc in collected_tool_calls
                ]
            }

            print(f"[Assistant message]: {len(collected_tool_calls)} tool call(s) issued")
            messages.append(assistant_message)

            # Execute each tool call
            for tool_call in collected_tool_calls:
                try:
                    tool_name = tool_call['function']['name']
                    tool_args = json.loads(tool_call['function']['arguments'])
                    server_name = self.server_tool_mapping.get(tool_name, "unknown")

                    print(f"\n[Executing tool]: {tool_name} (from server: {server_name})")
                    print(f"[Tool arguments]: {tool_args}")

                    # Call the tool through the MCP session
                    result = await self.session.call_tool(tool_name, tool_args)

                    # Normalize the result into plain text
                    result_content = ""
                    if result:
                        if isinstance(result, list) and len(result) > 0:
                            if hasattr(result[0], 'text'):
                                result_content = result[0].text
                            else:
                                result_content = str(result[0])
                        else:
                            result_content = str(result)
                    else:
                        result_content = "No result"

                    # Append the tool result to the conversation
                    messages.append({
                        'role': 'tool',
                        'tool_call_id': tool_call['id'],
                        'content': result_content
                    })

                    print(f"[Tool result]: {result_content}")

                except Exception as e:
                    print(f"[Tool call error]: {e}")
                    messages.append({
                        'role': 'tool',
                        'tool_call_id': tool_call['id'],
                        'content': f"Error executing tool: {str(e)}"
                    })

            # Get the final response using the tool results
            print("\n[Generating final answer...]")
            return await self.chat(messages)

        except Exception as e:
            print(f"Error during chat: {e}")
            class ErrorMessage:
                def __init__(self, content):
                    self.content = content
            return ErrorMessage(f"Sorry, an error occurred while handling your request: {str(e)}")

    async def loop(self):
        """Interactive chat loop."""
        print("=== Integrated MCP client started ===")
        print("Supported commands:")
        print(" - quit/exit/bye: exit the program")
        print(" - tools: show available tools")
        print(" - clear: clear the conversation history")
        print("=====================================\n")

        conversation_history = []

        while True:
            try:
                async with self.session:
                    # On first entry, prepare the tools
                    if not self.tools:
                        await self.prepare_tools()

                    question = input("\nUser: ").strip()

                    # Handle special commands
                    if question.lower() in ['quit', 'exit', 'bye']:
                        print("Goodbye!")
                        break
                    elif question.lower() == 'tools':
                        print(f"\nAvailable tools ({len(self.tools)}):")
                        for tool in self.tools:
                            func = tool['function']
                            server = self.server_tool_mapping.get(func['name'], 'unknown')
                            print(f" - {func['name']} (server: {server})")
                            print(f"   Description: {func['description']}")
                        continue
                    elif question.lower() == 'clear':
                        conversation_history = []
                        print("Conversation history cleared")
                        continue
                    elif not question:
                        continue

                    # Add the user message to the history
                    user_message = {"role": "user", "content": question}
                    current_messages = conversation_history + [user_message]

                    # Get the response
                    response = await self.chat(current_messages.copy())

                    if response and hasattr(response, 'content'):
                        print(f"\nAnswer: {response.content}")

                        # Update the conversation history
                        conversation_history.append(user_message)
                        conversation_history.append({"role": "assistant", "content": response.content})

                        # Cap the history length to avoid an overly long context
                        if len(conversation_history) > 20:  # keep roughly the last 10 turns
                            conversation_history = conversation_history[-20:]

            except KeyboardInterrupt:
                print("\n\nInterrupted by user")
                break
            except Exception as e:
                print(f"Error in loop: {e}")
                continue

    async def single_query(self, query: str) -> str:
        """Single-query interface for non-interactive use."""
        try:
            async with self.session:
                if not self.tools:
                    await self.prepare_tools()

                messages = [{"role": "user", "content": query}]
                response = await self.chat(messages)

                return response.content if response and hasattr(response, 'content') else "No response"

        except Exception as e:
            return f"Query failed: {str(e)}"

async def main():
    """Main entry point."""
    # MCP configuration
    config = {
        "mcpServers": {
            "trip": {"url": "http://localhost:8000/sse"},
            "math": {
                "command": "python",
                "args": ["./api_mcp_server_math.py"],
                "env": {"DEBUG": "true"}
            }
        }
    }

    print("Initializing the integrated MCP client...")
    mcp_client = IntegratedMCPClient(config)

    # Start the interactive loop
    await mcp_client.loop()

async def test_single_queries():
    """Example of running single, non-interactive queries."""
    config = {
        "mcpServers": {
            "trip": {"url": "http://localhost:8000/sse"},
            "math": {
                "command": "python",
                "args": ["./api_mcp_server_math.py"],
                "env": {"DEBUG": "true"}
            }
        }
    }

    mcp_client = IntegratedMCPClient(config)

    # Test queries
    test_queries = [
        "帮我查询广州后天的酒店信息",          # hotel info in Guangzhou for the day after tomorrow
        "计算 15 加 27 等于多少",              # what is 15 plus 27
        "我想知道北京明天有什么好的酒店推荐",  # good hotel recommendations in Beijing tomorrow
        "计算 100 乘以 50",                    # 100 times 50
        "你好,请介绍一下你的功能"             # hello, please introduce your capabilities
    ]

    for query in test_queries:
        print(f"\n{'='*50}")
        print(f"Test query: {query}")
        print(f"{'='*50}")

        result = await mcp_client.single_query(query)
        print(f"Result: {result}")

if __name__ == '__main__':
    # Run the interactive mode
    asyncio.run(main())

    # Or run the test mode instead
    # asyncio.run(test_single_queries())

