MCP协议 进阶 MCP Model Context Protocol Claude Desktop 工具集成

MCP 工具生态实战:从官方工具箱到自定义 Server 开发

AIEng Hub
阅读约 30 分钟

引言

MCP(Model Context Protocol)自 2024 年底发布以来,迅速成为 AI 工具集成的标准协议。截至 2025 年 1 月,官方和社区已经开发了数百个 MCP Server,覆盖文件系统、数据库、API、开发工具等各个领域。

本文将系统介绍 MCP 官方工具生态,并手把手教你如何使用和开发 MCP Server。

MCP 官方工具生态概览

┌─────────────────────────────────────────────────────────────────┐
│                     MCP 官方工具生态                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │   官方工具   │  │   社区工具   │  │   企业工具   │          │
│  │  (Official)  │  │  (Community) │  │  (Enterprise)│          │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘          │
│         │                 │                 │                  │
│         ▼                 ▼                 ▼                  │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │ filesystem   │  │ brave-search │  │ aws          │          │
│  │ github       │  │ fetch        │  │ gcp          │          │
│  │ git          │  │ puppeteer    │  │ azure        │          │
│  │ postgres     │  │ sqlite       │  │ notion       │          │
│  │ sqlite       │  │ redis        │  │ slack        │          │
│  │ sentry       │  │ kubernetes   │  │ jira         │          │
│  └──────────────┘  └──────────────┘  └──────────────┘          │
│                                                                 │
│  官方仓库:github.com/modelcontextprotocol/servers             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

官方核心工具详解

1. Filesystem MCP Server

最基础也是最常用的 MCP Server,让 Claude 可以安全地访问本地文件系统。

安装配置

// claude_desktop_config.json
{
  "mcpServers": {
    "filesystem": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-filesystem",
        "/Users/yourname/Documents",
        "/Users/yourname/Desktop/projects"
      ]
    }
  }
}

使用示例

用户:帮我读取 ~/Documents/project/README.md 文件内容

Claude: [调用 filesystem/read_file 工具]

文件内容如下:
# My Project

这是一个示例项目...

用户:帮我在 ~/Desktop/projects 目录下创建一个新的 Python 项目结构

Claude: [调用 filesystem/create_directory 和 filesystem/write_file]

已为您创建项目结构:
- my_project/
  - src/
    - __init__.py
    - main.py
  - tests/
    - __init__.py
    - test_main.py
  - requirements.txt
  - README.md

安全限制

Filesystem MCP Server 有严格的安全机制:

// 只允许配置的目录
const allowedDirectories = [
  "/Users/yourname/Documents",
  "/Users/yourname/Desktop/projects"
];

// 禁止访问的目录
const blockedPaths = [
  "/Users/yourname/.ssh",
  "/Users/yourname/.aws",
  "/etc/passwd"
];

// 所有路径操作都会验证是否在允许范围内
function validatePath(requestedPath: string): boolean {
  const resolvedPath = path.resolve(requestedPath);
  return allowedDirectories.some(dir => 
    resolvedPath.startsWith(path.resolve(dir))
  );
}

2. GitHub MCP Server

让 Claude 直接操作 GitHub 仓库,支持 Issues、PR、代码审查等功能。

安装配置

{
  "mcpServers": {
    "github": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-github"],
      "env": {
        "GITHUB_PERSONAL_ACCESS_TOKEN": "ghp_xxxxxxxxxxxx"
      }
    }
  }
}

实战:代码审查 Agent

# github_code_review_agent.py
"""
使用 MCP GitHub Server 实现自动化代码审查
"""

import json
from typing import List, Dict

class GitHubCodeReviewAgent:
    """GitHub 代码审查 Agent"""
    
    def __init__(self, mcp_client):
        self.mcp = mcp_client
    
    async def review_pull_request(
        self, 
        owner: str, 
        repo: str, 
        pr_number: int
    ) -> Dict:
        """
        审查 Pull Request
        
        流程:
        1. 获取 PR 基本信息
        2. 获取变更的文件列表
        3. 读取每个文件的 diff
        4. 分析代码质量
        5. 生成审查报告
        """
        # 1. 获取 PR 信息
        pr_info = await self.mcp.call_tool(
            "github",
            "get_pull_request",
            {
                "owner": owner,
                "repo": repo,
                "pull_number": pr_number
            }
        )
        
        # 2. 获取变更文件
        files = await self.mcp.call_tool(
            "github",
            "list_pull_request_files",
            {
                "owner": owner,
                "repo": repo,
                "pull_number": pr_number
            }
        )
        
        review_comments = []
        
        for file in files:
            filename = file["filename"]
            status = file["status"]  # added, removed, modified
            
            # 跳过非代码文件
            if not self._is_code_file(filename):
                continue
            
            # 3. 获取文件内容
            if status != "removed":
                content = await self.mcp.call_tool(
                    "github",
                    "get_file_contents",
                    {
                        "owner": owner,
                        "repo": repo,
                        "path": filename,
                        "ref": pr_info["head"]["sha"]
                    }
                )
                
                # 4. 代码分析
                issues = await self._analyze_code(filename, content, file["patch"])
                review_comments.extend(issues)
        
        # 5. 提交审查评论
        for comment in review_comments:
            await self.mcp.call_tool(
                "github",
                "create_pull_request_review_comment",
                {
                    "owner": owner,
                    "repo": repo,
                    "pull_number": pr_number,
                    "body": comment["message"],
                    "commit_id": pr_info["head"]["sha"],
                    "path": comment["path"],
                    "line": comment["line"]
                }
            )
        
        return {
            "pr_title": pr_info["title"],
            "files_reviewed": len(files),
            "issues_found": len(review_comments),
            "review_comments": review_comments
        }
    
    def _is_code_file(self, filename: str) -> bool:
        """检查是否为代码文件"""
        code_extensions = ['.py', '.js', '.ts', '.java', '.go', '.rs', '.cpp', '.c']
        return any(filename.endswith(ext) for ext in code_extensions)
    
    async def _analyze_code(
        self, 
        filename: str, 
        content: str, 
        patch: str
    ) -> List[Dict]:
        """
        分析代码质量问题
        
        这里可以集成:
        - 静态分析工具(pylint, eslint)
        - 安全扫描(bandit, semgrep)
        - 自定义规则
        """
        issues = []
        
        # 示例:检查常见的代码问题
        lines = content.split('\n')
        
        for i, line in enumerate(lines, 1):
            # 检查硬编码密钥
            if 'api_key' in line.lower() and '=' in line:
                if '"' in line or "'" in line:
                    issues.append({
                        "path": filename,
                        "line": i,
                        "message": "⚠️ 发现可能的硬编码 API Key,建议使用环境变量",
                        "severity": "warning"
                    })
            
            # 检查 TODO 注释
            if 'TODO' in line.upper():
                issues.append({
                    "path": filename,
                    "line": i,
                    "message": f"📋 发现 TODO: {line.strip()}",
                    "severity": "info"
                })
            
            # 检查过长的行
            if len(line) > 100:
                issues.append({
                    "path": filename,
                    "line": i,
                    "message": f"📏 行过长 ({len(line)} 字符),建议拆分为多行",
                    "severity": "suggestion"
                })
        
        return issues


# 使用示例
async def main():
    agent = GitHubCodeReviewAgent(mcp_client)
    
    result = await agent.review_pull_request(
        owner="myorg",
        repo="myproject",
        pr_number=42
    )
    
    print(f"审查完成:{result['pr_title']}")
    print(f"审查了 {result['files_reviewed']} 个文件")
    print(f"发现 {result['issues_found']} 个问题")

# asyncio.run(main())

3. PostgreSQL MCP Server

让 Claude 直接查询和分析数据库。

安装配置

{
  "mcpServers": {
    "postgres": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-postgres"],
      "env": {
        "DATABASE_URL": "postgresql://user:pass@localhost:5432/mydb"
      }
    }
  }
}

实战:数据分析 Agent

# data_analysis_agent.py
"""
使用 MCP PostgreSQL Server 实现自然语言数据分析
"""

from typing import Dict, List, Any
import json

class DataAnalysisAgent:
    """自然语言数据分析 Agent"""
    
    def __init__(self, mcp_client):
        self.mcp = mcp_client
        self.schema_cache = None
    
    async def analyze(self, question: str) -> Dict[str, Any]:
        """
        自然语言数据分析
        
        示例问题:
        - "过去30天销售额趋势如何?"
        - "哪个产品类别转化率最高?"
        - "找出购买频次最高的前10名用户"
        """
        # 1. 获取数据库 schema
        schema = await self._get_schema()
        
        # 2. 生成 SQL 查询(这里应该调用 LLM)
        sql = await self._generate_sql(question, schema)
        
        # 3. 执行查询
        result = await self.mcp.call_tool(
            "postgres",
            "query",
            {"sql": sql}
        )
        
        # 4. 分析结果
        analysis = await self._analyze_result(question, result)
        
        return {
            "question": question,
            "sql": sql,
            "result": result,
            "analysis": analysis
        }
    
    async def _get_schema(self) -> Dict:
        """获取数据库 schema 信息"""
        if self.schema_cache:
            return self.schema_cache
        
        # 查询所有表
        tables_result = await self.mcp.call_tool(
            "postgres",
            "query",
            {
                "sql": """
                    SELECT table_name, column_name, data_type
                    FROM information_schema.columns
                    WHERE table_schema = 'public'
                    ORDER BY table_name, ordinal_position
                """
            }
        )
        
        # 组织成结构化格式
        schema = {}
        for row in tables_result["rows"]:
            table = row["table_name"]
            if table not in schema:
                schema[table] = []
            schema[table].append({
                "column": row["column_name"],
                "type": row["data_type"]
            })
        
        self.schema_cache = schema
        return schema
    
    async def _generate_sql(self, question: str, schema: Dict) -> str:
        """
        根据自然语言问题生成 SQL
        
        实际项目中应该调用 LLM,这里简化示例
        """
        # 示例:简单的规则匹配
        question_lower = question.lower()
        
        if "销售额" in question or "sales" in question_lower:
            return """
                SELECT 
                    DATE_TRUNC('day', created_at) as date,
                    SUM(amount) as total_sales
                FROM orders
                WHERE created_at >= NOW() - INTERVAL '30 days'
                GROUP BY DATE_TRUNC('day', created_at)
                ORDER BY date
            """
        
        elif "用户" in question or "user" in question_lower:
            return """
                SELECT 
                    user_id,
                    COUNT(*) as purchase_count,
                    SUM(amount) as total_spent
                FROM orders
                GROUP BY user_id
                ORDER BY purchase_count DESC
                LIMIT 10
            """
        
        else:
            return "SELECT * FROM orders LIMIT 10"
    
    async def _analyze_result(self, question: str, result: Dict) -> str:
        """分析查询结果并生成洞察"""
        rows = result.get("rows", [])
        
        if not rows:
            return "未找到相关数据"
        
        # 简单的统计分析
        analysis_parts = []
        
        # 数据量
        analysis_parts.append(f"查询返回 {len(rows)} 条记录")
        
        # 数值列的统计
        for key in rows[0].keys():
            values = [r[key] for r in rows if r[key] is not None]
            
            # 如果是数值列
            if values and isinstance(values[0], (int, float)):
                avg_val = sum(values) / len(values)
                max_val = max(values)
                min_val = min(values)
                
                analysis_parts.append(
                    f"{key}: 平均值={avg_val:.2f}, 最大值={max_val}, 最小值={min_val}"
                )
        
        return "\n".join(analysis_parts)


# 高级功能:自动生成数据报告
class DataReportGenerator:
    """自动生成数据报告"""
    
    def __init__(self, agent: DataAnalysisAgent):
        self.agent = agent
    
    async def generate_daily_report(self) -> str:
        """生成每日数据报告"""
        
        questions = [
            "今日销售额是多少?",
            "今日新增用户数?",
            "今日订单转化率?",
            "销售额 Top 5 的产品是什么?"
        ]
        
        report_sections = []
        
        for question in questions:
            result = await self.agent.analyze(question)
            # 构建报告章节
            section = f"## {question}\n\n"
            section += f"**SQL 查询:**\n```sql\n{result['sql']}\n```\n\n"
            section += f"**分析:**\n{result['analysis']}"
            report_sections.append(section)
        
        return "\n---\n".join(report_sections)

自定义 MCP Server 开发

1. 基础 Server 结构

// custom-mcp-server/src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

// 定义工具
const TOOLS = [
  {
    name: "calculate",
    description: "执行数学计算",
    inputSchema: {
      type: "object",
      properties: {
        expression: {
          type: "string",
          description: "数学表达式,如 '1 + 2 * 3'"
        }
      },
      required: ["expression"]
    }
  },
  {
    name: "get_weather",
    description: "获取指定城市的天气",
    inputSchema: {
      type: "object",
      properties: {
        city: {
          type: "string",
          description: "城市名称"
        },
        units: {
          type: "string",
          enum: ["celsius", "fahrenheit"],
          default: "celsius"
        }
      },
      required: ["city"]
    }
  }
];

// 创建 Server
const server = new Server(
  {
    name: "custom-mcp-server",
    version: "1.0.0"
  },
  {
    capabilities: {
      tools: {}
    }
  }
);

// 处理工具列表请求
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return { tools: TOOLS };
});

// 处理工具调用
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;
  
  try {
    switch (name) {
      case "calculate":
        return await handleCalculate(args);
      
      case "get_weather":
        return await handleGetWeather(args);
      
      default:
        throw new Error(`Unknown tool: ${name}`);
    }
  } catch (error) {
    return {
      content: [
        {
          type: "text",
          text: `Error: ${error.message}`
        }
      ],
      isError: true
    };
  }
});

// 工具实现
async function handleCalculate(args: any) {
  // 注意:实际项目中应该使用安全的数学表达式解析器
  // 这里仅作示例,不要直接 eval 用户输入!
  const result = eval(args.expression); // ⚠️ 安全风险
  
  return {
    content: [
      {
        type: "text",
        text: `计算结果:${result}`
      }
    ]
  };
}

async function handleGetWeather(args: any) {
  // 调用天气 API
  const response = await fetch(
    `https://api.weather.com/v1/current?city=${args.city}&units=${args.units}`
  );
  const data = await response.json();
  
  return {
    content: [
      {
        type: "text",
        text: `${args.city} 当前天气:\n` +
              `温度:${data.temperature}°${args.units === 'celsius' ? 'C' : 'F'}\n` +
              `天气:${data.condition}\n` +
              `湿度:${data.humidity}%`
      }
    ]
  };
}

// 启动 Server
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("Custom MCP Server running on stdio");
}

main().catch(console.error);

2. Python 版 MCP Server

# custom_mcp_server/server.py
"""
使用 Python 实现的 MCP Server 示例
"""

import asyncio
import json
from typing import Any, Dict, List
import httpx

class MCPServer:
    """MCP Server 基类"""
    
    def __init__(self, name: str, version: str):
        self.name = name
        self.version = version
        self.tools: Dict[str, Dict] = {}
    
    def add_tool(
        self, 
        name: str, 
        description: str, 
        handler: callable,
        schema: Dict
    ):
        """注册工具"""
        self.tools[name] = {
            "name": name,
            "description": description,
            "inputSchema": schema,
            "handler": handler
        }
    
    async def handle_request(self, request: Dict) -> Dict:
        """处理 MCP 请求"""
        method = request.get("method")
        
        if method == "tools/list":
            return {
                "tools": [
                    {
                        "name": t["name"],
                        "description": t["description"],
                        "inputSchema": t["inputSchema"]
                    }
                    for t in self.tools.values()
                ]
            }
        
        elif method == "tools/call":
            tool_name = request["params"]["name"]
            arguments = request["params"]["arguments"]
            
            if tool_name not in self.tools:
                return {
                    "content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}],
                    "isError": True
                }
            
            try:
                result = await self.tools[tool_name]["handler"](arguments)
                return {"content": [{"type": "text", "text": json.dumps(result)}]}
            except Exception as e:
                return {
                    "content": [{"type": "text", "text": f"Error: {str(e)}"}],
                    "isError": True
                }
        
        return {"error": "Unknown method"}
    
    async def run(self):
        """运行 Server(stdio 模式)"""
        while True:
            try:
                line = await asyncio.get_event_loop().run_in_executor(
                    None, input
                )
                request = json.loads(line)
                response = await self.handle_request(request)
                print(json.dumps(response), flush=True)
            except EOFError:
                break
            except Exception as e:
                print(json.dumps({"error": str(e)}), flush=True)


# 创建具体的 Server
async def main():
    server = MCPServer("weather-mcp", "1.0.0")
    
    # 注册天气查询工具
    async def get_weather_handler(args: Dict) -> Dict:
        city = args["city"]
        units = args.get("units", "celsius")
        
        # 调用天气 API
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://api.openweathermap.org/data/2.5/weather",
                params={
                    "q": city,
                    "appid": "your-api-key",
                    "units": "metric" if units == "celsius" else "imperial"
                }
            )
            data = response.json()
        
        return {
            "city": city,
            "temperature": data["main"]["temp"],
            "humidity": data["main"]["humidity"],
            "condition": data["weather"][0]["description"],
            "units": units
        }
    
    server.add_tool(
        name="get_weather",
        description="获取指定城市的天气信息",
        handler=get_weather_handler,
        schema={
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "城市名称"},
                "units": {"type": "string", "enum": ["celsius", "fahrenheit"]}
            },
            "required": ["city"]
        }
    )
    
    # 注册计算器工具
    async def calculate_handler(args: Dict) -> Dict:
        import operator
        import ast
        
        expression = args["expression"]
        
        # 安全的表达式求值
        allowed_operators = {
            ast.Add: operator.add,
            ast.Sub: operator.sub,
            ast.Mult: operator.mul,
            ast.Div: operator.truediv,
            ast.Pow: operator.pow,
        }
        
        def eval_node(node):
            if isinstance(node, ast.Num):
                return node.n
            elif isinstance(node, ast.BinOp):
                op_type = type(node.op)
                if op_type not in allowed_operators:
                    raise ValueError(f"Operator not allowed: {op_type}")
                left = eval_node(node.left)
                right = eval_node(node.right)
                return allowed_operators[op_type](left, right)
            else:
                raise ValueError(f"Node type not allowed: {type(node)}")
        
        tree = ast.parse(expression, mode='eval')
        result = eval_node(tree.body)
        
        return {"expression": expression, "result": result}
    
    server.add_tool(
        name="calculate",
        description="执行安全的数学计算",
        handler=calculate_handler,
        schema={
            "type": "object",
            "properties": {
                "expression": {"type": "string", "description": "数学表达式"}
            },
            "required": ["expression"]
        }
    )
    
    await server.run()


if __name__ == "__main__":
    asyncio.run(main())

3. 配置文件

// 在 Claude Desktop 中使用自定义 Server
{
  "mcpServers": {
    "weather": {
      "command": "python",
      "args": ["/path/to/custom_mcp_server/server.py"]
    }
  }
}

MCP Server 开发最佳实践

1. 错误处理

// 良好的错误处理示例
async function handleTool(args: any) {
  try {
    // 参数验证
    if (!args.required_param) {
      return {
        content: [{ type: "text", text: "Missing required parameter: required_param" }],
        isError: true
      };
    }
    
    // 业务逻辑
    const result = await doSomething(args);
    
    return {
      content: [{ type: "text", text: JSON.stringify(result) }]
    };
    
  } catch (error) {
    // 分类错误
    if (error instanceof ValidationError) {
      return {
        content: [{ type: "text", text: `Validation Error: ${error.message}` }],
        isError: true
      };
    }
    
    if (error instanceof NetworkError) {
      return {
        content: [{ type: "text", text: `Network Error: ${error.message}` }],
        isError: true
      };
    }
    
    // 未知错误
    return {
      content: [{ type: "text", text: `Unexpected Error: ${error.message}` }],
      isError: true
    };
  }
}

2. 安全性

// 输入验证和清理
import { z } from "zod";

const ToolInputSchema = z.object({
  path: z.string()
    .min(1)
    .regex(/^[a-zA-Z0-9_\-\/\.]+$/, "Invalid path characters")
    .transform(path => path.replace(/\.\.\//g, "")), // 防止目录遍历
  content: z.string().max(10000, "Content too large")
});

function validateInput(args: unknown) {
  return ToolInputSchema.parse(args);
}

3. 性能优化

// 缓存常用数据
class CachedMCPServer {
  private cache = new Map<string, { data: any; expiry: number }>();
  
  async getCached(key: string, fetcher: () => Promise<any>, ttl: number = 60000) {
    const cached = this.cache.get(key);
    
    if (cached && cached.expiry > Date.now()) {
      return cached.data;
    }
    
    const data = await fetcher();
    this.cache.set(key, { data, expiry: Date.now() + ttl });
    return data;
  }
}

总结

MCP 生态正在快速发展,核心要点:

  1. 官方工具:filesystem、github、postgres 等覆盖常见需求
  2. 社区工具:fetch、puppeteer、sqlite 等扩展功能
  3. 自定义开发:基于 SDK 快速开发专用 Server
  4. 最佳实践:注重安全、错误处理和性能

推荐工具组合:

  • 开发场景:filesystem + github + git
  • 数据分析:postgres + sqlite + fetch
  • Web 自动化:fetch + puppeteer
  • 运维监控:sentry + kubernetes

相关资源: