引言
MCP协议的核心价值在于标准化AI模型与外部工具的连接方式。作为开发者,我们需要掌握如何在客户端正确集成和使用MCP Server。
本文将涵盖:
- MCP Client SDK 使用详解
- 与 LangChain、LlamaIndex 等框架集成
- 连接管理与错误处理
- 实战项目:构建智能文档助手
MCP Client 架构
架构概览
┌─────────────────────────────────────────────────────────────┐
│ MCP Client 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ AI Model │◄──►│ Client │◄──►│ Servers │ │
│ │ (大模型) │ │ (客户端) │ │ (服务端) │ │
│ └──────────────┘ └──────┬───────┘ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ Transport│ │
│ │ (传输层) │ │
│ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
核心职责
| 职责 | 说明 |
|---|---|
| 连接管理 | 建立和维护与Server的连接 |
| 能力发现 | 获取Server提供的工具和资源 |
| 请求路由 | 将AI模型的调用请求路由到正确的Server |
| 结果处理 | 处理Server返回的结果并格式化 |
| 错误恢复 | 处理连接中断和调用失败 |
Python SDK 实战
1. 环境准备
# 创建虚拟环境
python -m venv mcp-client-env
source mcp-client-env/bin/activate # Linux/Mac
# 或 mcp-client-env\Scripts\activate # Windows
# 安装依赖
pip install mcp anthropic langchain langchain-anthropic langchain-mcp-adapters langgraph llama-index llama-index-llms-anthropic
2. 基础 Client 实现
# client.py
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack
class MCPClient:
    """Wrapper around a single MCP server session reached over stdio.

    The transport and the session are both entered on one ``AsyncExitStack``
    so that ``cleanup()`` closes everything that was opened.
    """

    def __init__(self):
        self.session: ClientSession | None = None
        self.exit_stack = AsyncExitStack()
        self.tools: list = []
        # Streams of the stdio transport; assigned by connect_to_server() and
        # declared here so the attributes exist before a connection is made.
        self.stdio = None
        self.write = None

    async def connect_to_server(self, command: str, args: list[str]):
        """Launch an MCP server and open an initialized session to it.

        Args:
            command: Executable used to launch the server.
            args: Arguments passed to ``command``.
        """
        server_params = StdioServerParameters(
            command=command,
            args=args,
            env=None,
        )

        # Open the stdio transport; the exit stack owns it from here on.
        self.stdio, self.write = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )

        # Open the session on top of the transport streams.
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )

        # Initialize the session before issuing any request.
        await self.session.initialize()

        # Remember the tools the server reports.
        response = await self.session.list_tools()
        self.tools = response.tools
        print(f"已连接到服务器,可用工具: {[tool.name for tool in self.tools]}")

    async def call_tool(self, tool_name: str, arguments: dict):
        """Call a tool on the connected server and return its raw result.

        Raises:
            RuntimeError: If no session is currently open.
        """
        if not self.session:
            raise RuntimeError("未连接到服务器")
        return await self.session.call_tool(tool_name, arguments)

    async def read_resource(self, uri: str):
        """Read a resource from the connected server and return the raw result.

        Raises:
            RuntimeError: If no session is currently open.
        """
        if not self.session:
            raise RuntimeError("未连接到服务器")
        return await self.session.read_resource(uri)

    async def cleanup(self):
        """Close the session and transport and reset the client state."""
        try:
            await self.exit_stack.aclose()
        finally:
            # Drop references to the closed session so that later calls hit
            # the "not connected" guard instead of using a dead session.
            self.session = None
            self.tools = []
            self.stdio = None
            self.write = None
# 使用示例
async def main():
    """Example: connect to the filesystem server, read one file, clean up."""
    server_command = "python"
    server_args = ["filesystem_server.py"]
    tool_arguments = {"path": "/tmp/test.txt"}

    client = MCPClient()
    try:
        # Connect to the filesystem server.
        await client.connect_to_server(command=server_command, args=server_args)

        # Call a tool and show what came back.
        result = await client.call_tool("read_file", tool_arguments)
        print(f"结果: {result}")
    finally:
        # Always release the connection, even if the calls above failed.
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
3. 多 Server 管理
# multi_server_client.py
import asyncio
from typing import Dict, List
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack
class MultiServerMCPClient:
    """MCP client that keeps one session per named server.

    Every transport and session is entered on a single ``AsyncExitStack``;
    ``cleanup()`` closes them all.
    """

    def __init__(self):
        self.sessions: Dict[str, ClientSession] = {}
        self.server_tools: Dict[str, List] = {}
        self.exit_stack = AsyncExitStack()

    async def add_server(self, name: str, command: str, args: list[str]):
        """Launch an MCP server, connect to it and register it under ``name``.

        Args:
            name: Key under which the session and its tools are stored.
            command: Executable used to launch the server.
            args: Arguments passed to ``command``.

        Raises:
            ValueError: If a server is already registered under ``name``.
        """
        # Registering the same name twice would overwrite the bookkeeping
        # while the earlier session stays open and unreachable.
        if name in self.sessions:
            raise ValueError(f"服务器已存在: {name}")

        server_params = StdioServerParameters(
            command=command,
            args=args,
            env=None,
        )

        # Open the transport, then the session on top of it.
        stdio, write = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        session = await self.exit_stack.enter_async_context(
            ClientSession(stdio, write)
        )

        await session.initialize()

        # Record the session only after the tool list was fetched.
        response = await session.list_tools()
        self.sessions[name] = session
        self.server_tools[name] = response.tools
        print(f"服务器 '{name}' 已连接,工具: {[t.name for t in response.tools]}")

    def get_all_tools(self) -> List[dict]:
        """Return one dict per tool across all servers.

        Each dict has the keys ``server``, ``name``, ``description`` and
        ``schema`` (the tool's ``inputSchema``).
        """
        return [
            {
                "server": server_name,
                "name": tool.name,
                "description": tool.description,
                "schema": tool.inputSchema,
            }
            for server_name, tools in self.server_tools.items()
            for tool in tools
        ]

    async def call_tool(self, server_name: str, tool_name: str, arguments: dict):
        """Call ``tool_name`` on the server registered as ``server_name``.

        Raises:
            ValueError: If ``server_name`` is not registered.
        """
        if server_name not in self.sessions:
            raise ValueError(f"未知服务器: {server_name}")
        return await self.sessions[server_name].call_tool(tool_name, arguments)

    async def find_and_call_tool(self, tool_name: str, arguments: dict):
        """Call ``tool_name`` on the first registered server that offers it.

        Servers are searched in registration order.

        Raises:
            ValueError: If no registered server offers ``tool_name``.
        """
        for server_name, tools in self.server_tools.items():
            if any(t.name == tool_name for t in tools):
                return await self.call_tool(server_name, tool_name, arguments)
        raise ValueError(f"未找到工具: {tool_name}")

    async def cleanup(self):
        """Close every connection and forget all sessions and tools."""
        try:
            await self.exit_stack.aclose()
        finally:
            # Clear the registries even if closing raised, so no caller can
            # reach a session that may already be closed.
            self.sessions.clear()
            self.server_tools.clear()
# 使用示例
async def main():
    """Example: connect three servers, list their tools and call two of them."""
    # (name, command, args) for each server, in connection order.
    server_specs = [
        ("filesystem", "python", ["servers/filesystem.py"]),
        ("github", "node", ["servers/github/dist/index.js"]),
        ("database", "python", ["servers/postgres.py"]),
    ]

    client = MultiServerMCPClient()
    try:
        # Connect the servers one after another.
        for server_name, command, args in server_specs:
            await client.add_server(server_name, command, args)

        # Show how many tools are available in total.
        tools = client.get_all_tools()
        print(f"\n总共 {len(tools)} 个工具可用")

        # Call a tool on an explicitly named server.
        file_arguments = {"path": "/tmp/test.txt"}
        result = await client.call_tool("filesystem", "read_file", file_arguments)
        print(f"文件内容: {result}")

        # Let the client work out which server offers the tool.
        issue_arguments = {"query": "bug", "repo": "owner/repo"}
        result = await client.find_and_call_tool("search_issues", issue_arguments)
    finally:
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
TypeScript SDK 实战
1. 基础 Client
// client.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
/** Connection settings for one MCP server. */
interface ServerConfig {
  // Key under which the connected client and its tools are stored.
  name: string;
  // Which transport to build: a spawned process ("stdio") or an HTTP endpoint ("http").
  transport: "stdio" | "http";
  // Executable to launch; read when transport is "stdio".
  command?: string;
  // Arguments for `command`; an empty list is used when omitted.
  args?: string[];
  // Endpoint URL; read when transport is not "stdio".
  url?: string;
}
/** Manages several MCP client connections, keyed by server name. */
export class MCPClientManager {
  private clients: Map<string, Client> = new Map();
  private tools: Map<string, any[]> = new Map();

  /**
   * Connect to the server described by `config` and cache its tool list.
   *
   * @throws Error if the name is already registered, or if the field required
   *         by the chosen transport (`command` or `url`) is missing.
   */
  async connectServer(config: ServerConfig): Promise<void> {
    // A second registration would orphan the first client without closing it.
    if (this.clients.has(config.name)) {
      throw new Error(`服务器已连接: ${config.name}`);
    }

    let transport: StdioClientTransport | StreamableHTTPClientTransport;
    if (config.transport === "stdio") {
      if (!config.command) {
        throw new Error(`服务器 '${config.name}' 缺少 command 配置`);
      }
      transport = new StdioClientTransport({
        command: config.command,
        args: config.args || [],
      });
    } else {
      if (!config.url) {
        throw new Error(`服务器 '${config.name}' 缺少 url 配置`);
      }
      transport = new StreamableHTTPClientTransport(new URL(config.url));
    }

    const client = new Client(
      { name: "mcp-client", version: "1.0.0" },
      { capabilities: { tools: {}, resources: {} } }
    );

    try {
      await client.connect(transport);

      // Fetch the tool list; register the client only once this succeeded.
      const toolsResponse = await client.listTools();
      this.clients.set(config.name, client);
      this.tools.set(config.name, toolsResponse.tools);
    } catch (err) {
      // Do not leak a half-open connection; the original error is what matters.
      await client.close().catch(() => undefined);
      throw err;
    }

    console.log(`服务器 '${config.name}' 已连接`);
  }

  /**
   * Call `toolName` on the server registered as `serverName`.
   *
   * @throws Error if no client is registered under `serverName`.
   */
  async callTool(serverName: string, toolName: string, args: any) {
    const client = this.clients.get(serverName);
    if (!client) {
      throw new Error(`服务器未连接: ${serverName}`);
    }
    return await client.callTool({
      name: toolName,
      arguments: args,
    });
  }

  /** Return every cached tool, each with a `server` field naming its origin. */
  getAllTools() {
    const result: any[] = [];
    for (const [serverName, tools] of this.tools) {
      for (const tool of tools) {
        result.push({
          server: serverName,
          ...tool,
        });
      }
    }
    return result;
  }

  /**
   * Close every client and clear the registries.
   *
   * All clients are attempted even if one fails; the first failure is
   * rethrown after the registries have been cleared.
   */
  async disconnectAll(): Promise<void> {
    let firstError: unknown;
    let failed = false;

    for (const [name, client] of this.clients) {
      try {
        await client.close();
        console.log(`服务器 '${name}' 已断开`);
      } catch (err) {
        if (!failed) {
          firstError = err;
          failed = true;
        }
        console.error(`服务器 '${name}' 断开失败:`, err);
      }
    }

    this.clients.clear();
    this.tools.clear();

    if (failed) {
      throw firstError;
    }
  }
}
// 使用示例
/** Example: connect a stdio server and an HTTP server, then call one tool. */
async function main() {
  // Servers to connect, in order.
  const servers: ServerConfig[] = [
    {
      name: "filesystem",
      transport: "stdio",
      command: "node",
      args: ["dist/filesystem.js"],
    },
    {
      name: "github",
      transport: "http",
      url: "http://localhost:3000/mcp",
    },
  ];

  const manager = new MCPClientManager();
  try {
    for (const server of servers) {
      await manager.connectServer(server);
    }

    // Call a tool on the filesystem server.
    const toolArgs = { path: "/tmp/test.txt" };
    const result = await manager.callTool("filesystem", "read_file", toolArgs);
    console.log("结果:", result);
  } finally {
    // Disconnect regardless of whether the calls above succeeded.
    await manager.disconnectAll();
  }
}

main().catch(console.error);
与 LangChain 集成
1. 工具转换
# langchain_integration.py
from langchain_core.tools import BaseTool, Tool
from mcp import ClientSession
import json
async def convert_mcp_tools(session: ClientSession) -> list[BaseTool]:
    """Wrap every tool offered by an MCP session as a LangChain tool.

    The returned tools are async-only: invoke them with ``ainvoke`` from the
    event loop that owns ``session``. A synchronous wrapper built on
    ``asyncio.run`` cannot work there, because ``asyncio.run`` refuses to
    start inside a running loop.

    Args:
        session: An initialized MCP client session.

    Returns:
        One tool per MCP tool; each returns the text parts of the MCP result
        joined with newlines.
    """
    # Imported here because the module header only imports BaseTool and Tool.
    from langchain_core.tools import StructuredTool

    def _make_caller(tool_name: str):
        # Bind the name per tool. A closure over the loop variable would make
        # every wrapper call whichever tool the loop visited last.
        async def _call(**kwargs):
            result = await session.call_tool(tool_name, kwargs)
            # Keep only the text content items.
            return "\n".join(
                item.text for item in result.content if item.type == "text"
            )

        return _call

    response = await session.list_tools()
    return [
        StructuredTool(
            name=mcp_tool.name,
            description=mcp_tool.description or "",
            # The MCP input schema (a JSON-schema dict) describes the kwargs.
            args_schema=mcp_tool.inputSchema,
            coroutine=_make_caller(mcp_tool.name),
        )
        for mcp_tool in response.tools
    ]
2. 完整 Agent 示例
# mcp_agent.py
import asyncio
from langchain_anthropic import ChatAnthropic
from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack
class MCPAgent:
    """ReAct agent whose tools are loaded from one or more MCP servers."""

    def __init__(self, api_key: str, model: str = "claude-3-5-sonnet-20241022"):
        """Create the chat model and empty server/tool registries.

        Args:
            api_key: Anthropic API key passed to ``ChatAnthropic``.
            model: Model identifier passed to ``ChatAnthropic``; the default
                is the value that used to be hard-coded.
        """
        self.llm = ChatAnthropic(
            model=model,
            anthropic_api_key=api_key,
        )
        self.exit_stack = AsyncExitStack()
        self.sessions: dict[str, ClientSession] = {}
        self.tools: list = []
        # Built lazily by run() and reused until the tool set changes.
        self._agent = None

    async def add_mcp_server(self, name: str, command: str, args: list):
        """Launch an MCP server over stdio and load its tools.

        Args:
            name: Key under which the session is stored.
            command: Executable used to launch the server.
            args: Arguments passed to ``command``.

        Raises:
            ValueError: If a server is already registered under ``name``.
        """
        # Adding the same server twice would append its tools a second time.
        if name in self.sessions:
            raise ValueError(f"服务器已存在: {name}")

        server_params = StdioServerParameters(
            command=command,
            args=args,
            env=None,
        )
        stdio, write = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        session = await self.exit_stack.enter_async_context(
            ClientSession(stdio, write)
        )
        await session.initialize()
        self.sessions[name] = session

        # Convert the server's MCP tools into LangChain tools.
        from langchain_mcp_adapters.tools import load_mcp_tools

        mcp_tools = await load_mcp_tools(session)
        self.tools.extend(mcp_tools)
        # The tool set changed, so a previously built agent is out of date.
        self._agent = None
        print(f"已加载 {len(mcp_tools)} 个工具来自 '{name}'")

    async def run(self, query: str) -> str:
        """Run one query through the agent and return the last message's content.

        Args:
            query: The user's request.
        """
        if self._agent is None:
            # This local import takes precedence over any module-level
            # ``create_react_agent`` for the rest of this method.
            from langgraph.prebuilt import create_react_agent

            self._agent = create_react_agent(self.llm, self.tools)

        response = await self._agent.ainvoke({
            "messages": [HumanMessage(content=query)]
        })

        # The final reply is the last message in the returned list.
        final_message = response["messages"][-1]
        return final_message.content

    async def cleanup(self):
        """Close all sessions and reset the registries."""
        try:
            await self.exit_stack.aclose()
        finally:
            # The tools wrap sessions that are now closed; drop them as well.
            self.sessions.clear()
            self.tools.clear()
            self._agent = None
# 使用示例
async def main():
    """Example: attach two MCP servers to the agent and run three queries."""
    # (name, command, args) for each server, in connection order.
    server_specs = (
        ("filesystem", "python", ["servers/filesystem.py"]),
        ("database", "python", ["servers/postgres.py"]),
    )
    queries = [
        "读取 /tmp/data.txt 文件内容并分析",
        "查询数据库中用户表的总记录数",
        "帮我整理 /tmp/docs 目录下的所有 markdown 文件"
    ]

    agent = MCPAgent(api_key="your-api-key")
    try:
        # Attach the filesystem server, then the database server.
        for server_name, command, args in server_specs:
            await agent.add_mcp_server(server_name, command, args)

        # Run each query in turn and print the exchange.
        for query in queries:
            print(f"\n用户: {query}")
            answer = await agent.run(query)
            print(f"Agent: {answer}")
    finally:
        await agent.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
与 LlamaIndex 集成
# llamaindex_integration.py
from llama_index.core.tools import FunctionTool
from llama_index.core.agent import ReActAgent
from llama_index.llms.anthropic import Anthropic
from mcp import ClientSession
import asyncio
async def create_llamaindex_tools(session: ClientSession) -> list[FunctionTool]:
    """Wrap every tool offered by an MCP session as a LlamaIndex tool.

    Each tool gets both an async and a sync entry point. The sync entry
    point uses ``asyncio.run`` and therefore cannot be called from inside a
    running event loop; async callers should use the async entry point.

    Args:
        session: An initialized MCP client session.

    Returns:
        One ``FunctionTool`` per MCP tool; each returns the raw MCP result.
    """

    def _build_tool(tool_name: str, description) -> FunctionTool:
        # A dedicated scope per tool pins ``tool_name``. Closing over the loop
        # variable would make every wrapper call the last tool in the list.
        async def async_wrapper(**kwargs):
            return await session.call_tool(tool_name, kwargs)

        def sync_wrapper(**kwargs):
            return asyncio.run(async_wrapper(**kwargs))

        return FunctionTool.from_defaults(
            fn=sync_wrapper,
            async_fn=async_wrapper,
            name=tool_name,
            description=description,
        )

    response = await session.list_tools()
    return [
        _build_tool(mcp_tool.name, mcp_tool.description)
        for mcp_tool in response.tools
    ]
# 创建 LlamaIndex Agent
async def create_mcp_agent(
    session: ClientSession,
    model: str = "claude-3-sonnet-20240229",
    verbose: bool = True,
):
    """Build a LlamaIndex ReAct agent that uses the session's MCP tools.

    Args:
        session: An initialized MCP client session.
        model: Model identifier passed to ``Anthropic``; the default is the
            value that used to be hard-coded.
        verbose: Forwarded to ``ReActAgent.from_tools``.

    Returns:
        The agent created by ``ReActAgent.from_tools``.
    """
    tools = await create_llamaindex_tools(session)
    llm = Anthropic(model=model)
    return ReActAgent.from_tools(
        tools=tools,
        llm=llm,
        verbose=verbose,
    )
错误处理与重试机制
1. 错误分类处理
# error_handling.py
from mcp import McpError, ErrorCode
import asyncio
from typing import Callable, Any
import logging
logger = logging.getLogger(__name__)
class MCPErrorHandler:
    """Helpers for classifying MCP errors and retrying failed calls."""

    @staticmethod
    def is_retryable_error(error: Exception) -> bool:
        """Return True if ``error`` is worth retrying.

        ``McpError`` is retryable when its code is ``InternalError`` or
        ``ConnectionClosed``; otherwise connection and timeout errors are.
        """
        if isinstance(error, McpError):
            # NOTE(review): confirm against the installed SDK that ``ErrorCode``
            # is importable from ``mcp`` and where the code lives on McpError.
            # Both ``error.code`` and ``error.error.code`` are tried here.
            code = getattr(error, "code", None)
            if code is None:
                code = getattr(getattr(error, "error", None), "code", None)
            return code in (
                ErrorCode.InternalError,
                ErrorCode.ConnectionClosed,
            )
        # asyncio.TimeoutError is a separate class before Python 3.11.
        return isinstance(
            error, (ConnectionError, TimeoutError, asyncio.TimeoutError)
        )

    @staticmethod
    async def with_retry(
        func: Callable,
        max_retries: int = 3,
        delay: float = 1.0,
        *args,
        **kwargs
    ) -> Any:
        """Await ``func(*args, **kwargs)``, retrying retryable failures.

        Because ``*args`` follows two defaulted parameters, positional
        arguments for ``func`` can only be given after ``max_retries`` and
        ``delay`` have been passed positionally as well.

        Args:
            func: Async callable to invoke.
            max_retries: Total number of attempts; must be at least 1.
            delay: Base wait in seconds; doubled after each failed attempt.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns on the first successful attempt.

        Raises:
            ValueError: If ``max_retries`` is smaller than 1.
            Exception: The error from ``func`` if it is not retryable, or the
                last error once all attempts are used up.
        """
        # With zero attempts there would be no result and no error to raise.
        if max_retries < 1:
            raise ValueError("max_retries 必须至少为 1")

        for attempt in range(max_retries):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.warning(f"尝试 {attempt + 1}/{max_retries} 失败: {e}")
                is_last_attempt = attempt == max_retries - 1
                if is_last_attempt or not MCPErrorHandler.is_retryable_error(e):
                    raise
                wait_time = delay * (2 ** attempt)  # exponential backoff
                logger.info(f"等待 {wait_time}s 后重试...")
                await asyncio.sleep(wait_time)
# 使用示例
async def safe_call_tool(session, tool_name: str, arguments: dict):
    """Call a tool through ``MCPErrorHandler.with_retry``.

    Args:
        session: Object providing an async ``call_tool(tool_name, arguments)``.
        tool_name: Name of the tool to call.
        arguments: Arguments passed to the tool.

    Returns:
        The result of ``session.call_tool``.
    """
    # max_retries and delay are passed positionally: a positional argument
    # after a keyword argument is a SyntaxError, and with_retry only forwards
    # to the callee what follows those two parameters.
    return await MCPErrorHandler.with_retry(
        session.call_tool,
        3,
        1.0,
        tool_name,
        arguments,
    )
2. 连接健康检查
# health_check.py
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
class ConnectionHealthMonitor:
    """Periodically pings sessions and records whether each one answered."""

    # Logger used to report failed health checks.
    _log = logging.getLogger(__name__)

    def __init__(self, check_interval: int = 30):
        """
        Args:
            check_interval: Seconds to wait between two rounds of pings.
        """
        self.check_interval = check_interval
        self.last_ping: Dict[str, datetime] = {}
        self.is_healthy: Dict[str, bool] = {}
        self._stop_event = asyncio.Event()

    async def start_monitoring(self, sessions: Dict[str, Any]):
        """Ping every session in a loop until ``stop()`` is called.

        Args:
            sessions: Mapping of server name to an object with an async
                ``send_ping()`` method.
        """
        while not self._stop_event.is_set():
            # Iterate over a snapshot: the caller may add or remove sessions
            # while this coroutine is suspended in an await.
            for name, session in list(sessions.items()):
                try:
                    await session.send_ping()
                except Exception as e:
                    self.is_healthy[name] = False
                    self._log.error(f"服务器 '{name}' 健康检查失败: {e}")
                else:
                    self.last_ping[name] = datetime.now()
                    self.is_healthy[name] = True

            # Sleep for one interval, waking early if stop() is called.
            # wait_for raises on timeout; here that just means "next round".
            try:
                await asyncio.wait_for(
                    self._stop_event.wait(),
                    timeout=self.check_interval,
                )
            except asyncio.TimeoutError:
                continue

    def stop(self):
        """Ask the monitoring loop to finish."""
        self._stop_event.set()

    def get_status(self) -> Dict[str, dict]:
        """Return ``{name: {"healthy": bool, "last_ping": iso-string or None}}``."""
        status = {}
        for name, healthy in self.is_healthy.items():
            last_ping = self.last_ping.get(name)
            status[name] = {
                "healthy": healthy,
                "last_ping": last_ping.isoformat() if last_ping else None,
            }
        return status
实战项目:智能文档助手
项目架构
doc-assistant/
├── src/
│ ├── __init__.py
│ ├── client.py # MCP 客户端
│ ├── agent.py # AI Agent
│ ├── servers/
│ │ ├── filesystem.py # 文件系统服务器
│ │ └── search.py # 搜索服务器
│ └── ui/
│ └── cli.py # 命令行界面
├── config/
│ └── servers.json # 服务器配置
├── tests/
└── main.py
完整实现
# main.py
import asyncio
import json
import os
from pathlib import Path
from src.agent import MCPAgent
async def main():
    """Run the interactive document assistant.

    Loads ``config/servers.json``, connects every server listed under its
    ``"servers"`` key, then answers questions read from stdin until the user
    quits, interrupts, or stdin is closed.
    """
    # Load the server configuration.
    config_path = Path("config/servers.json")
    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)

    # Create the agent.
    agent = MCPAgent(api_key=os.getenv("ANTHROPIC_API_KEY"))
    try:
        # Connect every configured server.
        for server_config in config["servers"]:
            await agent.add_mcp_server(
                name=server_config["name"],
                command=server_config["command"],
                args=server_config["args"]
            )

        print("\n🤖 智能文档助手已启动")
        print("支持的命令:")
        print(" - 读取文件: '读取 docs/readme.md'")
        print(" - 搜索内容: '搜索包含 API 的文档'")
        print(" - 分析文档: '分析 src/main.py 的代码结构'")
        print(" - 退出: 'quit' 或 'exit'")
        print()

        # Interactive loop.
        while True:
            try:
                query = input("\n💬 请输入问题: ").strip()
                if query.lower() in ["quit", "exit", "q"]:
                    break
                if not query:
                    continue
                print("🤔 思考中...")
                result = await agent.run(query)
                print(f"\n✅ {result}")
            except (KeyboardInterrupt, EOFError):
                # EOFError means stdin was closed. If the generic handler
                # below caught it, input() would fail again on every
                # iteration and the loop would never end.
                break
            except Exception as e:
                print(f"\n❌ 错误: {e}")
    finally:
        await agent.cleanup()
        print("\n👋 再见!")


if __name__ == "__main__":
    asyncio.run(main())
性能优化建议
| 优化策略 | 实现方式 | 效果 |
|---|---|---|
| 连接池 | 复用已建立的连接 | 减少连接开销 |
| 并行调用 | 使用 asyncio.gather | 提高吞吐量 |
| 结果缓存 | 缓存频繁访问的资源 | 减少重复调用 |
| 超时控制 | 设置合理的超时时间 | 防止长时间阻塞 |
| 工具筛选 | 只加载需要的工具 | 减少上下文长度 |
总结
本文全面讲解了 MCP 客户端的开发和集成:
✅ SDK 使用 - Python/TypeScript SDK 详解
✅ 框架集成 - LangChain、LlamaIndex 集成
✅ 错误处理 - 重试机制和连接管理
✅ 实战项目 - 智能文档助手完整实现
下一步建议:
- 学习 MCP服务端开发实战
- 了解 MCP安全最佳实践
- 探索更多 MCP生态工具
本文最后更新于 2024-02-15,如有问题欢迎在社区讨论。