MCP协议 进阶 MCP 客户端 Python TypeScript

MCP客户端集成指南:连接AI与工具的桥梁

AIEng Hub
阅读约 30 分钟

引言

MCP协议的核心价值在于标准化AI模型与外部工具的连接方式。作为开发者,我们需要掌握如何在客户端正确集成和使用MCP Server。

本文将涵盖:

  • MCP Client SDK 使用详解
  • 与 LangChain、LlamaIndex 等框架集成
  • 连接管理与错误处理
  • 实战项目:构建智能文档助手

MCP Client 架构

架构概览

┌─────────────────────────────────────────────────────────────┐
│                    MCP Client 架构                           │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   ┌──────────────┐    ┌──────────────┐    ┌──────────────┐ │
│   │   AI Model   │◄──►│    Client    │◄──►│   Servers    │ │
│   │   (大模型)    │    │   (客户端)    │    │   (服务端)    │ │
│   └──────────────┘    └──────┬───────┘    └──────────────┘ │
│                              │                              │
│                              ▼                              │
│                        ┌──────────┐                        │
│                        │ Transport│                        │
│                        │ (传输层)  │                        │
│                        └──────────┘                        │
│                                                              │
└─────────────────────────────────────────────────────────────┘

核心职责

| 职责 | 说明 |
| --- | --- |
| 连接管理 | 建立和维护与 Server 的连接 |
| 能力发现 | 获取 Server 提供的工具和资源 |
| 请求路由 | 将 AI 模型的调用请求路由到正确的 Server |
| 结果处理 | 处理 Server 返回的结果并格式化 |
| 错误恢复 | 处理连接中断和调用失败 |

Python SDK 实战

1. 环境准备

# 创建虚拟环境
python -m venv mcp-client-env
source mcp-client-env/bin/activate  # Linux/Mac
# 或 mcp-client-env\Scripts\activate  # Windows

# 安装依赖
pip install mcp anthropic langchain langchain-anthropic

2. 基础 Client 实现

# client.py
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack


class MCPClient:
    """Wrapper around a single stdio-based MCP ``ClientSession``.

    The instance owns an ``AsyncExitStack`` on which the stdio transport and
    the session are registered; both stay open until :meth:`cleanup` is
    awaited.
    """

    def __init__(self):
        self.session: ClientSession | None = None
        self.exit_stack = AsyncExitStack()
        self.tools: list = []
        # Transport streams; populated by connect_to_server().
        self.stdio = None
        self.write = None

    async def connect_to_server(self, command: str, args: list[str]):
        """Launch an MCP server over stdio and open an initialized session.

        Args:
            command: Executable used to start the server process.
            args: Command-line arguments passed to ``command``.
        """
        server_params = StdioServerParameters(
            command=command,
            args=args,
            env=None,
        )

        # Both context managers go on the exit stack so that cleanup() can
        # unwind them in reverse order.
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport

        session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )
        await session.initialize()

        # Publish the session only after initialize() succeeded, so that
        # call_tool()/read_resource() never see a half-initialized session.
        self.session = session

        response = await self.session.list_tools()
        self.tools = response.tools

        print(f"已连接到服务器,可用工具: {[tool.name for tool in self.tools]}")

    async def call_tool(self, tool_name: str, arguments: dict):
        """Call ``tool_name`` on the connected server and return its result.

        Raises:
            RuntimeError: If no session is currently open.
        """
        if not self.session:
            raise RuntimeError("未连接到服务器")

        return await self.session.call_tool(tool_name, arguments)

    async def read_resource(self, uri: str):
        """Read the resource identified by ``uri`` from the connected server.

        Raises:
            RuntimeError: If no session is currently open.
        """
        if not self.session:
            raise RuntimeError("未连接到服务器")

        return await self.session.read_resource(uri)

    async def cleanup(self):
        """Close the session and transport and reset the client state.

        The state is reset even when closing raises, so later calls fail with
        the "not connected" error instead of reaching a closed session.
        """
        try:
            await self.exit_stack.aclose()
        finally:
            self.session = None
            self.tools = []
            self.stdio = None
            self.write = None


# 使用示例
async def main():
    """Connect to the filesystem server, read one file and always clean up."""
    client = MCPClient()
    server_command = "python"
    server_args = ["filesystem_server.py"]
    read_request = {"path": "/tmp/test.txt"}

    try:
        await client.connect_to_server(command=server_command, args=server_args)
        result = await client.call_tool("read_file", read_request)
        print(f"结果: {result}")
    finally:
        # Runs on success and on failure so the server process is not orphaned.
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

3. 多 Server 管理

# multi_server_client.py
import asyncio
from typing import Dict, List
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack


class MultiServerMCPClient:
    """Keep several named MCP server sessions behind one object.

    ``sessions`` maps a server name to its session and ``server_tools`` maps
    the same name to the tool list returned by that server. All transports and
    sessions are registered on one ``AsyncExitStack``.
    """

    def __init__(self):
        self.exit_stack = AsyncExitStack()
        self.sessions: Dict[str, ClientSession] = {}
        self.server_tools: Dict[str, List] = {}

    async def add_server(self, name: str, command: str, args: list[str]):
        """Start a server over stdio, initialize it and register it as ``name``."""
        params = StdioServerParameters(command=command, args=args, env=None)

        # Transport first, then the session that talks over it.
        read_stream, write_stream = await self.exit_stack.enter_async_context(
            stdio_client(params)
        )
        session = await self.exit_stack.enter_async_context(
            ClientSession(read_stream, write_stream)
        )
        await session.initialize()

        # Register the server only once its tool list has been fetched.
        response = await session.list_tools()
        self.sessions[name] = session
        self.server_tools[name] = response.tools

        print(f"服务器 '{name}' 已连接,工具: {[t.name for t in response.tools]}")

    def get_all_tools(self) -> List[dict]:
        """Return one dict per tool, tagged with the server that provides it."""
        return [
            {
                "server": server_name,
                "name": tool.name,
                "description": tool.description,
                "schema": tool.inputSchema,
            }
            for server_name, tools in self.server_tools.items()
            for tool in tools
        ]

    async def call_tool(self, server_name: str, tool_name: str, arguments: dict):
        """Call ``tool_name`` on the server registered as ``server_name``.

        Raises:
            ValueError: If ``server_name`` has not been registered.
        """
        if server_name in self.sessions:
            return await self.sessions[server_name].call_tool(tool_name, arguments)

        raise ValueError(f"未知服务器: {server_name}")

    async def find_and_call_tool(self, tool_name: str, arguments: dict):
        """Call ``tool_name`` on the first registered server that lists it.

        Raises:
            ValueError: If no registered server lists ``tool_name``.
        """
        owner = next(
            (
                server_name
                for server_name, tools in self.server_tools.items()
                if any(t.name == tool_name for t in tools)
            ),
            None,
        )
        if owner is None:
            raise ValueError(f"未找到工具: {tool_name}")

        return await self.call_tool(owner, tool_name, arguments)

    async def cleanup(self):
        """Close every connection, then empty both registries."""
        await self.exit_stack.aclose()
        for registry in (self.sessions, self.server_tools):
            registry.clear()


# 使用示例
async def main():
    """Connect three example servers, list their tools and call two of them."""
    client = MultiServerMCPClient()

    # (name, command, args) triples, connected one after another in this order.
    server_specs = (
        ("filesystem", "python", ["servers/filesystem.py"]),
        ("github", "node", ["servers/github/dist/index.js"]),
        ("database", "python", ["servers/postgres.py"]),
    )

    try:
        for server_name, command, args in server_specs:
            await client.add_server(server_name, command, args)

        tools = client.get_all_tools()
        print(f"\n总共 {len(tools)} 个工具可用")

        # Explicit routing: the caller names the server.
        read_request = {"path": "/tmp/test.txt"}
        result = await client.call_tool("filesystem", "read_file", read_request)
        print(f"文件内容: {result}")

        # Implicit routing: the client looks up which server owns the tool.
        search_request = {"query": "bug", "repo": "owner/repo"}
        result = await client.find_and_call_tool("search_issues", search_request)

    finally:
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

TypeScript SDK 实战

1. 基础 Client

// client.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";

/** Connection settings for one MCP server, consumed by connectServer(). */
interface ServerConfig {
  // Key under which the manager stores this server's client and tool list.
  name: string;
  // Selects which transport connectServer() constructs.
  transport: "stdio" | "http";
  // Executable to launch; read only when transport is "stdio".
  command?: string;
  // Arguments for `command`; connectServer() falls back to [] when omitted.
  args?: string[];
  // Server endpoint; read only when transport is not "stdio".
  url?: string;
}

/**
 * Keeps one MCP `Client` per configured server, together with the tool list
 * each server reported when it was connected.
 */
export class MCPClientManager {
  private clients: Map<string, Client> = new Map();
  private tools: Map<string, any[]> = new Map();

  /**
   * Connect to one server and cache its tool list under `config.name`.
   *
   * @throws Error if the field required by the chosen transport
   *         (`command` for stdio, `url` otherwise) is missing.
   */
  async connectServer(config: ServerConfig): Promise<void> {
    let transport;

    if (config.transport === "stdio") {
      // Fail with a clear message instead of passing `undefined` to the transport.
      if (!config.command) {
        throw new Error(`服务器 '${config.name}' 缺少 command 配置`);
      }
      transport = new StdioClientTransport({
        command: config.command,
        args: config.args || [],
      });
    } else {
      if (!config.url) {
        throw new Error(`服务器 '${config.name}' 缺少 url 配置`);
      }
      transport = new StreamableHTTPClientTransport(new URL(config.url));
    }

    const client = new Client(
      { name: "mcp-client", version: "1.0.0" },
      { capabilities: { tools: {}, resources: {} } }
    );

    await client.connect(transport);

    try {
      const toolsResponse = await client.listTools();
      this.clients.set(config.name, client);
      this.tools.set(config.name, toolsResponse.tools);
    } catch (error) {
      // The client is not registered yet, so disconnectAll() would never
      // close it. Close it here (best effort) and report the original error.
      await client.close().catch(() => undefined);
      throw error;
    }

    console.log(`服务器 '${config.name}' 已连接`);
  }

  /**
   * Call `toolName` on the server registered as `serverName`.
   *
   * @throws Error if no client is registered under `serverName`.
   */
  async callTool(serverName: string, toolName: string, args: any) {
    const client = this.clients.get(serverName);
    if (!client) {
      throw new Error(`服务器未连接: ${serverName}`);
    }

    return await client.callTool({
      name: toolName,
      arguments: args,
    });
  }

  /** Return every cached tool, each tagged with the `server` that provides it. */
  getAllTools() {
    const result: any[] = [];
    for (const [serverName, tools] of this.tools) {
      for (const tool of tools) {
        result.push({
          server: serverName,
          ...tool,
        });
      }
    }
    return result;
  }

  /**
   * Close every client and forget all cached state.
   *
   * A failure to close one client is logged and does not stop the remaining
   * clients from being closed.
   */
  async disconnectAll(): Promise<void> {
    for (const [name, client] of this.clients) {
      try {
        await client.close();
        console.log(`服务器 '${name}' 已断开`);
      } catch (error) {
        console.error(`服务器 '${name}' 断开失败:`, error);
      }
    }
    this.clients.clear();
    this.tools.clear();
  }
}

// 使用示例
/** Connect the example servers, call one tool, and always disconnect. */
async function main() {
  const manager = new MCPClientManager();

  // Connected one after another, in this order.
  const servers: ServerConfig[] = [
    {
      name: "filesystem",
      transport: "stdio",
      command: "node",
      args: ["dist/filesystem.js"],
    },
    {
      name: "github",
      transport: "http",
      url: "http://localhost:3000/mcp",
    },
  ];

  try {
    for (const server of servers) {
      await manager.connectServer(server);
    }

    const readArgs = { path: "/tmp/test.txt" };
    const result = await manager.callTool("filesystem", "read_file", readArgs);
    console.log("结果:", result);
  } finally {
    await manager.disconnectAll();
  }
}

main().catch(console.error);

与 LangChain 集成

1. 工具转换

# langchain_integration.py
from langchain_core.tools import BaseTool, Tool
from mcp import ClientSession
import json


async def convert_mcp_tools(session: ClientSession) -> list[BaseTool]:
    """Wrap every tool exposed by an MCP session as a LangChain tool.

    Each returned tool forwards its keyword arguments to
    ``session.call_tool`` and returns the text parts of the result joined
    with newlines. Only an async implementation is attached: the session is
    an async object, so the tools are meant to be awaited (``ainvoke``).

    Args:
        session: MCP session whose tools are listed and wrapped.

    Returns:
        One LangChain tool per MCP tool, in the order the server listed them.
    """
    # Imported here because the surrounding module only imports BaseTool/Tool.
    # StructuredTool is used instead of Tool because MCP tools take several
    # named arguments, not one string input.
    from langchain_core.tools import StructuredTool

    def make_caller(tool_name: str):
        """Return a coroutine function bound to exactly one MCP tool name."""
        # A factory is required: a closure created directly in the loop would
        # read the loop variable late and every wrapper would call the LAST tool.

        async def call_mcp_tool(**kwargs) -> str:
            result = await session.call_tool(tool_name, kwargs)
            # Keep only the textual content items.
            return "\n".join(
                item.text for item in result.content if item.type == "text"
            )

        return call_mcp_tool

    response = await session.list_tools()

    # NOTE(review): passing the JSON-schema dict as args_schema assumes a
    # langchain-core version that accepts dict schemas — confirm the pinned version.
    return [
        StructuredTool(
            name=mcp_tool.name,
            # The description may be absent on the MCP side; pass a string anyway.
            description=mcp_tool.description or "",
            args_schema=mcp_tool.inputSchema,
            coroutine=make_caller(mcp_tool.name),
        )
        for mcp_tool in response.tools
    ]

2. 完整 Agent 示例

# mcp_agent.py
import asyncio
from langchain_anthropic import ChatAnthropic
from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack


class MCPAgent:
    """ReAct agent whose tools are loaded from one or more MCP servers.

    Servers are attached with :meth:`add_mcp_server`; every attached server's
    tools are appended to ``self.tools`` and handed to the agent in :meth:`run`.
    """

    def __init__(self, api_key: str):
        self.tools: list = []
        self.sessions: dict[str, ClientSession] = {}
        self.exit_stack = AsyncExitStack()
        self.llm = ChatAnthropic(
            model="claude-3-5-sonnet-20241022",
            anthropic_api_key=api_key,
        )

    async def add_mcp_server(self, name: str, command: str, args: list):
        """Start an MCP server over stdio and load its tools into the agent."""
        params = StdioServerParameters(command=command, args=args, env=None)

        # Transport and session live on the exit stack until cleanup().
        read_stream, write_stream = await self.exit_stack.enter_async_context(
            stdio_client(params)
        )
        session = await self.exit_stack.enter_async_context(
            ClientSession(read_stream, write_stream)
        )
        await session.initialize()
        self.sessions[name] = session

        # Adapter that turns the session's MCP tools into LangChain tools.
        from langchain_mcp_adapters.tools import load_mcp_tools

        mcp_tools = await load_mcp_tools(session)
        self.tools.extend(mcp_tools)

        print(f"已加载 {len(mcp_tools)} 个工具来自 '{name}'")

    async def run(self, query: str) -> str:
        """Answer ``query`` with a freshly built ReAct agent.

        Returns the ``content`` of the last message in the agent's response.
        """
        from langgraph.prebuilt import create_react_agent

        graph = create_react_agent(self.llm, self.tools)
        request = {"messages": [HumanMessage(content=query)]}
        state = await graph.ainvoke(request)

        # The last message in the returned list is the agent's final reply.
        return state["messages"][-1].content

    async def cleanup(self):
        """Close every session and transport registered on the exit stack."""
        await self.exit_stack.aclose()


# 使用示例
async def main():
    """Attach two example servers and run three sample queries through the agent."""
    agent = MCPAgent(api_key="your-api-key")

    # (name, command, args) triples, attached in this order.
    servers = (
        ("filesystem", "python", ["servers/filesystem.py"]),
        ("database", "python", ["servers/postgres.py"]),
    )
    queries = [
        "读取 /tmp/data.txt 文件内容并分析",
        "查询数据库中用户表的总记录数",
        "帮我整理 /tmp/docs 目录下的所有 markdown 文件",
    ]

    try:
        for server_name, command, args in servers:
            await agent.add_mcp_server(server_name, command, args)

        # Queries run one at a time so the output stays in order.
        for query in queries:
            print(f"\n用户: {query}")
            result = await agent.run(query)
            print(f"Agent: {result}")

    finally:
        await agent.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

与 LlamaIndex 集成

# llamaindex_integration.py
from llama_index.core.tools import FunctionTool
from llama_index.core.agent import ReActAgent
from llama_index.llms.anthropic import Anthropic
from mcp import ClientSession
import asyncio


async def create_llamaindex_tools(session: ClientSession) -> list[FunctionTool]:
    """Wrap every tool exposed by an MCP session as a LlamaIndex ``FunctionTool``.

    Each tool gets an async implementation that awaits ``session.call_tool``
    and a sync implementation that submits the same coroutine to the event
    loop this function was awaited on.

    Args:
        session: MCP session whose tools are listed and wrapped.
            Assumes the session belongs to the event loop that runs this
            coroutine — TODO confirm against callers.

    Returns:
        One ``FunctionTool`` per MCP tool, in the order the server listed them.
    """
    owner_loop = asyncio.get_running_loop()

    def bind_tool(tool_name: str):
        """Return ``(sync_fn, async_fn)`` bound to exactly one MCP tool name."""
        # A factory is required: closures created directly in the loop would
        # read the loop variable late and every wrapper would call the LAST tool.

        async def call_async(**kwargs):
            return await session.call_tool(tool_name, kwargs)

        def call_sync(**kwargs):
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # No loop is running in this thread (e.g. a worker thread):
                # hand the coroutine to the owning loop and block for the result.
                future = asyncio.run_coroutine_threadsafe(
                    call_async(**kwargs), owner_loop
                )
                return future.result()
            # Blocking inside a running loop would deadlock, and asyncio.run()
            # is not allowed there either, so fail with a clear message.
            raise RuntimeError(
                f"MCP 工具 '{tool_name}' 不能在事件循环线程中同步调用,请使用异步接口"
            )

        return call_sync, call_async

    response = await session.list_tools()

    tools = []
    for mcp_tool in response.tools:
        sync_fn, async_fn = bind_tool(mcp_tool.name)
        tools.append(
            FunctionTool.from_defaults(
                fn=sync_fn,
                async_fn=async_fn,
                name=mcp_tool.name,
                description=mcp_tool.description,
            )
        )
    return tools


# 创建 LlamaIndex Agent
async def create_mcp_agent(session: ClientSession):
    """Build a verbose ReAct agent whose tools come from ``session``."""
    # Keyword arguments are evaluated left to right, so the tools are still
    # created before the LLM client is constructed.
    return ReActAgent.from_tools(
        tools=await create_llamaindex_tools(session),
        llm=Anthropic(model="claude-3-sonnet-20240229"),
        verbose=True,
    )

错误处理与重试机制

1. 错误分类处理

# error_handling.py
from mcp import McpError, ErrorCode
import asyncio
from typing import Callable, Any
import logging

logger = logging.getLogger(__name__)


class MCPErrorHandler:
    """Classify MCP failures and retry the transient ones."""

    @staticmethod
    def is_retryable_error(error: Exception) -> bool:
        """Return True when ``error`` is worth retrying.

        Connection and timeout errors are retryable, as are ``McpError``
        instances whose code is ``InternalError`` or ``ConnectionClosed``.
        """
        # asyncio.TimeoutError is listed explicitly because it is a separate
        # class from the builtin TimeoutError before Python 3.11.
        if isinstance(error, (ConnectionError, TimeoutError, asyncio.TimeoutError)):
            return True

        if isinstance(error, McpError):
            # NOTE(review): this relies on `error.code` and on `ErrorCode`
            # being provided by the `mcp` import at the top of this snippet —
            # confirm both exist in the installed Python SDK.
            return error.code in (
                ErrorCode.InternalError,
                ErrorCode.ConnectionClosed,
            )

        return False

    @staticmethod
    async def with_retry(
        func: Callable,
        max_retries: int = 3,
        delay: float = 1.0,
        *args,
        **kwargs
    ) -> Any:
        """Await ``func(*args, **kwargs)``, retrying retryable failures.

        ``max_retries`` and ``delay`` come before ``*args``, so a caller that
        forwards positional arguments to ``func`` must pass those two
        positionally as well.

        Args:
            func: Coroutine function to call.
            max_retries: Total number of attempts; must be at least 1.
            delay: Base wait in seconds; doubled after every failed attempt.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns on the first successful attempt.

        Raises:
            ValueError: If ``max_retries`` is smaller than 1.
            Exception: The error from ``func`` when it is not retryable or
                when the last attempt failed.
        """
        if max_retries < 1:
            # Without this guard the loop never runs and there is no error to re-raise.
            raise ValueError("max_retries 必须大于等于 1")

        for attempt in range(max_retries):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.warning(f"尝试 {attempt + 1}/{max_retries} 失败: {e}")

                is_last_attempt = attempt == max_retries - 1
                if is_last_attempt or not MCPErrorHandler.is_retryable_error(e):
                    raise

                wait_time = delay * (2 ** attempt)  # exponential backoff
                logger.info(f"等待 {wait_time}s 后重试...")
                await asyncio.sleep(wait_time)

        # Every iteration either returns or raises.
        raise AssertionError("unreachable")


# 使用示例
async def safe_call_tool(session, tool_name: str, arguments: dict):
    """Call an MCP tool through ``MCPErrorHandler.with_retry``.

    Args:
        session: Object providing an awaitable ``call_tool(tool_name, arguments)``.
        tool_name: Name of the tool to call.
        arguments: Arguments passed to the tool.

    Returns:
        The result of ``session.call_tool``.
    """
    # max_retries and delay are passed positionally: the tool name and
    # arguments must follow them as positional arguments, and Python rejects
    # positional arguments placed after keyword arguments.
    return await MCPErrorHandler.with_retry(
        session.call_tool,
        3,  # max_retries
        1.0,  # delay
        tool_name,
        arguments,
    )

2. 连接健康检查

# health_check.py
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, Optional


class ConnectionHealthMonitor:
    """Ping a set of sessions periodically and record whether each one answers."""

    def __init__(self, check_interval: int = 30):
        # Seconds to wait between two rounds of pings.
        self.check_interval = check_interval
        # Time of the last successful ping, per server name.
        self.last_ping: Dict[str, datetime] = {}
        # Outcome of the most recent ping, per server name.
        self.is_healthy: Dict[str, bool] = {}
        self._stop_event = asyncio.Event()

    async def start_monitoring(self, sessions: Dict[str, Any]):
        """Ping every session in rounds until :meth:`stop` is called.

        Args:
            sessions: Mapping of server name to an object with an awaitable
                ``send_ping()``.
        """
        log = logging.getLogger(__name__)

        while not self._stop_event.is_set():
            # Iterate over a snapshot: the caller may add or remove sessions
            # while this coroutine is suspended in an await.
            for name, session in list(sessions.items()):
                try:
                    await session.send_ping()
                except Exception as e:
                    self.is_healthy[name] = False
                    log.error(f"服务器 '{name}' 健康检查失败: {e}")
                else:
                    self.last_ping[name] = datetime.now()
                    self.is_healthy[name] = True

            try:
                # Sleep for one interval, but wake up at once when stop() is called.
                await asyncio.wait_for(
                    self._stop_event.wait(),
                    timeout=self.check_interval
                )
            except asyncio.TimeoutError:
                # The interval elapsed without a stop request: start the next round.
                continue

    def stop(self):
        """Ask the monitoring loop to finish."""
        self._stop_event.set()

    def get_status(self) -> Dict[str, dict]:
        """Return ``{name: {"healthy": bool, "last_ping": ISO string or None}}``."""
        status = {}
        for name in self.is_healthy:
            last_ping = self.last_ping.get(name)
            status[name] = {
                "healthy": self.is_healthy.get(name, False),
                "last_ping": last_ping.isoformat() if last_ping else None,
            }
        return status

实战项目:智能文档助手

项目架构

doc-assistant/
├── src/
│   ├── __init__.py
│   ├── client.py          # MCP 客户端
│   ├── agent.py           # AI Agent
│   ├── servers/
│   │   ├── filesystem.py  # 文件系统服务器
│   │   └── search.py      # 搜索服务器
│   └── ui/
│       └── cli.py         # 命令行界面
├── config/
│   └── servers.json       # 服务器配置
├── tests/
└── main.py

完整实现

# main.py
import asyncio
import json
import os
from pathlib import Path
from src.agent import MCPAgent


async def main():
    """Run the interactive document assistant.

    Reads the server list from ``config/servers.json``, attaches every server
    to an ``MCPAgent`` and then answers questions typed on stdin until the
    user quits, presses Ctrl-C, or stdin is closed.
    """
    # Read the config with an explicit encoding so the result does not depend
    # on the platform's default locale.
    config_path = Path("config/servers.json")
    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)

    agent = MCPAgent(api_key=os.getenv("ANTHROPIC_API_KEY"))

    try:
        # Attach every configured server.
        for server_config in config["servers"]:
            await agent.add_mcp_server(
                name=server_config["name"],
                command=server_config["command"],
                args=server_config["args"]
            )

        print("\n🤖 智能文档助手已启动")
        print("支持的命令:")
        print("  - 读取文件: '读取 docs/readme.md'")
        print("  - 搜索内容: '搜索包含 API 的文档'")
        print("  - 分析文档: '分析 src/main.py 的代码结构'")
        print("  - 退出: 'quit' 或 'exit'")
        print()

        # Interactive loop.
        while True:
            try:
                query = input("\n💬 请输入问题: ").strip()

                if query.lower() in ["quit", "exit", "q"]:
                    break

                if not query:
                    continue

                print("🤔 思考中...")
                result = await agent.run(query)
                print(f"\n{result}")

            except (KeyboardInterrupt, EOFError):
                # EOFError means stdin was closed (Ctrl-D or piped input ran
                # out). Without this branch the generic handler below would
                # print the error and loop forever.
                break
            except Exception as e:
                # Keep the session alive after a failed query.
                print(f"\n❌ 错误: {e}")

    finally:
        await agent.cleanup()
        print("\n👋 再见!")


if __name__ == "__main__":
    asyncio.run(main())

性能优化建议

| 优化策略 | 实现方式 | 效果 |
| --- | --- | --- |
| 连接池 | 复用已建立的连接 | 减少连接开销 |
| 并行调用 | 使用 asyncio.gather | 提高吞吐量 |
| 结果缓存 | 缓存频繁访问的资源 | 减少重复调用 |
| 超时控制 | 设置合理的超时时间 | 防止长时间阻塞 |
| 工具筛选 | 只加载需要的工具 | 减少上下文长度 |

总结

本文全面讲解了 MCP 客户端的开发和集成:

SDK 使用 - Python/TypeScript SDK 详解
框架集成 - LangChain、LlamaIndex 集成
错误处理 - 重试机制和连接管理
实战项目 - 智能文档助手完整实现

下一步建议:


本文最后更新于 2024-02-15,如有问题欢迎在社区讨论。