引言
MCP(Model Context Protocol)自 2024 年底发布以来,迅速成为 AI 工具集成的标准协议。截至 2025 年 1 月,官方和社区已经开发了数百个 MCP Server,覆盖文件系统、数据库、API、开发工具等各个领域。
本文将系统介绍 MCP 官方工具生态,并手把手教你如何使用和开发 MCP Server。
MCP 官方工具生态概览
┌─────────────────────────────────────────────────────────────────┐
│ MCP 官方工具生态 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 官方工具 │ │ 社区工具 │ │ 企业工具 │ │
│ │ (Official) │ │ (Community) │ │ (Enterprise)│ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ filesystem │ │ brave-search │ │ aws │ │
│ │ github │ │ fetch │ │ gcp │ │
│ │ git │ │ puppeteer │ │ azure │ │
│ │ postgres │ │ sqlite │ │ notion │ │
│ │ sqlite │ │ redis │ │ slack │ │
│ │ sentry │ │ kubernetes │ │ jira │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ 官方仓库:github.com/modelcontextprotocol/servers │
│ │
└─────────────────────────────────────────────────────────────────┘
官方核心工具详解
1. Filesystem MCP Server
最基础也是最常用的 MCP Server,让 Claude 可以安全地访问本地文件系统。
安装配置
// claude_desktop_config.json
{
"mcpServers": {
"filesystem": {
"command": "npx",
"args": [
"-y",
"@modelcontextprotocol/server-filesystem",
"/Users/yourname/Documents",
"/Users/yourname/Desktop/projects"
]
}
}
}
使用示例
用户:帮我读取 ~/Documents/project/README.md 文件内容
Claude: [调用 filesystem/read_file 工具]
文件内容如下:
# My Project
这是一个示例项目...
用户:帮我在 ~/Desktop/projects 目录下创建一个新的 Python 项目结构
Claude: [调用 filesystem/create_directory 和 filesystem/write_file]
已为您创建项目结构:
- my_project/
- src/
- __init__.py
- main.py
- tests/
- __init__.py
- test_main.py
- requirements.txt
- README.md
安全限制
Filesystem MCP Server 有严格的安全机制:
// 只允许配置的目录
const allowedDirectories = [
"/Users/yourname/Documents",
"/Users/yourname/Desktop/projects"
];
// 禁止访问的目录
const blockedPaths = [
"/Users/yourname/.ssh",
"/Users/yourname/.aws",
"/etc/passwd"
];
// 所有路径操作都会验证是否在允许范围内
function validatePath(requestedPath: string): boolean {
const resolvedPath = path.resolve(requestedPath);
return allowedDirectories.some(dir =>
resolvedPath.startsWith(path.resolve(dir))
);
}
2. GitHub MCP Server
让 Claude 直接操作 GitHub 仓库,支持 Issues、PR、代码审查等功能。
安装配置
{
"mcpServers": {
"github": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-github"],
"env": {
"GITHUB_PERSONAL_ACCESS_TOKEN": "ghp_xxxxxxxxxxxx"
}
}
}
}
实战:代码审查 Agent
# github_code_review_agent.py
"""
使用 MCP GitHub Server 实现自动化代码审查
"""
import json
from typing import List, Dict
class GitHubCodeReviewAgent:
"""GitHub 代码审查 Agent"""
def __init__(self, mcp_client):
self.mcp = mcp_client
async def review_pull_request(
self,
owner: str,
repo: str,
pr_number: int
) -> Dict:
"""
审查 Pull Request
流程:
1. 获取 PR 基本信息
2. 获取变更的文件列表
3. 读取每个文件的 diff
4. 分析代码质量
5. 生成审查报告
"""
# 1. 获取 PR 信息
pr_info = await self.mcp.call_tool(
"github",
"get_pull_request",
{
"owner": owner,
"repo": repo,
"pull_number": pr_number
}
)
# 2. 获取变更文件
files = await self.mcp.call_tool(
"github",
"list_pull_request_files",
{
"owner": owner,
"repo": repo,
"pull_number": pr_number
}
)
review_comments = []
for file in files:
filename = file["filename"]
status = file["status"] # added, removed, modified
# 跳过非代码文件
if not self._is_code_file(filename):
continue
# 3. 获取文件内容
if status != "removed":
content = await self.mcp.call_tool(
"github",
"get_file_contents",
{
"owner": owner,
"repo": repo,
"path": filename,
"ref": pr_info["head"]["sha"]
}
)
# 4. 代码分析
issues = await self._analyze_code(filename, content, file["patch"])
review_comments.extend(issues)
# 5. 提交审查评论
for comment in review_comments:
await self.mcp.call_tool(
"github",
"create_pull_request_review_comment",
{
"owner": owner,
"repo": repo,
"pull_number": pr_number,
"body": comment["message"],
"commit_id": pr_info["head"]["sha"],
"path": comment["path"],
"line": comment["line"]
}
)
return {
"pr_title": pr_info["title"],
"files_reviewed": len(files),
"issues_found": len(review_comments),
"review_comments": review_comments
}
def _is_code_file(self, filename: str) -> bool:
"""检查是否为代码文件"""
code_extensions = ['.py', '.js', '.ts', '.java', '.go', '.rs', '.cpp', '.c']
return any(filename.endswith(ext) for ext in code_extensions)
async def _analyze_code(
self,
filename: str,
content: str,
patch: str
) -> List[Dict]:
"""
分析代码质量问题
这里可以集成:
- 静态分析工具(pylint, eslint)
- 安全扫描(bandit, semgrep)
- 自定义规则
"""
issues = []
# 示例:检查常见的代码问题
lines = content.split('\n')
for i, line in enumerate(lines, 1):
# 检查硬编码密钥
if 'api_key' in line.lower() and '=' in line:
if '"' in line or "'" in line:
issues.append({
"path": filename,
"line": i,
"message": "⚠️ 发现可能的硬编码 API Key,建议使用环境变量",
"severity": "warning"
})
# 检查 TODO 注释
if 'TODO' in line.upper():
issues.append({
"path": filename,
"line": i,
"message": f"📋 发现 TODO: {line.strip()}",
"severity": "info"
})
# 检查过长的行
if len(line) > 100:
issues.append({
"path": filename,
"line": i,
"message": f"📏 行过长 ({len(line)} 字符),建议拆分为多行",
"severity": "suggestion"
})
return issues
# 使用示例
async def main():
agent = GitHubCodeReviewAgent(mcp_client)
result = await agent.review_pull_request(
owner="myorg",
repo="myproject",
pr_number=42
)
print(f"审查完成:{result['pr_title']}")
print(f"审查了 {result['files_reviewed']} 个文件")
print(f"发现 {result['issues_found']} 个问题")
# asyncio.run(main())
3. PostgreSQL MCP Server
让 Claude 直接查询和分析数据库。
安装配置
{
"mcpServers": {
"postgres": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-postgres"],
"env": {
"DATABASE_URL": "postgresql://user:pass@localhost:5432/mydb"
}
}
}
}
实战:数据分析 Agent
# data_analysis_agent.py
"""
使用 MCP PostgreSQL Server 实现自然语言数据分析
"""
from typing import Dict, List, Any
import json
class DataAnalysisAgent:
"""自然语言数据分析 Agent"""
def __init__(self, mcp_client):
self.mcp = mcp_client
self.schema_cache = None
async def analyze(self, question: str) -> Dict[str, Any]:
"""
自然语言数据分析
示例问题:
- "过去30天销售额趋势如何?"
- "哪个产品类别转化率最高?"
- "找出购买频次最高的前10名用户"
"""
# 1. 获取数据库 schema
schema = await self._get_schema()
# 2. 生成 SQL 查询(这里应该调用 LLM)
sql = await self._generate_sql(question, schema)
# 3. 执行查询
result = await self.mcp.call_tool(
"postgres",
"query",
{"sql": sql}
)
# 4. 分析结果
analysis = await self._analyze_result(question, result)
return {
"question": question,
"sql": sql,
"result": result,
"analysis": analysis
}
async def _get_schema(self) -> Dict:
"""获取数据库 schema 信息"""
if self.schema_cache:
return self.schema_cache
# 查询所有表
tables_result = await self.mcp.call_tool(
"postgres",
"query",
{
"sql": """
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public'
ORDER BY table_name, ordinal_position
"""
}
)
# 组织成结构化格式
schema = {}
for row in tables_result["rows"]:
table = row["table_name"]
if table not in schema:
schema[table] = []
schema[table].append({
"column": row["column_name"],
"type": row["data_type"]
})
self.schema_cache = schema
return schema
async def _generate_sql(self, question: str, schema: Dict) -> str:
"""
根据自然语言问题生成 SQL
实际项目中应该调用 LLM,这里简化示例
"""
# 示例:简单的规则匹配
question_lower = question.lower()
if "销售额" in question or "sales" in question_lower:
return """
SELECT
DATE_TRUNC('day', created_at) as date,
SUM(amount) as total_sales
FROM orders
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY DATE_TRUNC('day', created_at)
ORDER BY date
"""
elif "用户" in question or "user" in question_lower:
return """
SELECT
user_id,
COUNT(*) as purchase_count,
SUM(amount) as total_spent
FROM orders
GROUP BY user_id
ORDER BY purchase_count DESC
LIMIT 10
"""
else:
return "SELECT * FROM orders LIMIT 10"
async def _analyze_result(self, question: str, result: Dict) -> str:
"""分析查询结果并生成洞察"""
rows = result.get("rows", [])
if not rows:
return "未找到相关数据"
# 简单的统计分析
analysis_parts = []
# 数据量
analysis_parts.append(f"查询返回 {len(rows)} 条记录")
# 数值列的统计
for key in rows[0].keys():
values = [r[key] for r in rows if r[key] is not None]
# 如果是数值列
if values and isinstance(values[0], (int, float)):
avg_val = sum(values) / len(values)
max_val = max(values)
min_val = min(values)
analysis_parts.append(
f"{key}: 平均值={avg_val:.2f}, 最大值={max_val}, 最小值={min_val}"
)
return "\n".join(analysis_parts)
# 高级功能:自动生成数据报告
class DataReportGenerator:
"""自动生成数据报告"""
def __init__(self, agent: DataAnalysisAgent):
self.agent = agent
async def generate_daily_report(self) -> str:
"""生成每日数据报告"""
questions = [
"今日销售额是多少?",
"今日新增用户数?",
"今日订单转化率?",
"销售额 Top 5 的产品是什么?"
]
report_sections = []
for question in questions:
result = await self.agent.analyze(question)
# 构建报告章节
section = f"## {question}\n\n"
section += f"**SQL 查询:**\n```sql\n{result['sql']}\n```\n\n"
section += f"**分析:**\n{result['analysis']}"
report_sections.append(section)
return "\n---\n".join(report_sections)
自定义 MCP Server 开发
1. 基础 Server 结构
// custom-mcp-server/src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
// 定义工具
const TOOLS = [
{
name: "calculate",
description: "执行数学计算",
inputSchema: {
type: "object",
properties: {
expression: {
type: "string",
description: "数学表达式,如 '1 + 2 * 3'"
}
},
required: ["expression"]
}
},
{
name: "get_weather",
description: "获取指定城市的天气",
inputSchema: {
type: "object",
properties: {
city: {
type: "string",
description: "城市名称"
},
units: {
type: "string",
enum: ["celsius", "fahrenheit"],
default: "celsius"
}
},
required: ["city"]
}
}
];
// 创建 Server
const server = new Server(
{
name: "custom-mcp-server",
version: "1.0.0"
},
{
capabilities: {
tools: {}
}
}
);
// 处理工具列表请求
server.setRequestHandler(ListToolsRequestSchema, async () => {
return { tools: TOOLS };
});
// 处理工具调用
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
switch (name) {
case "calculate":
return await handleCalculate(args);
case "get_weather":
return await handleGetWeather(args);
default:
throw new Error(`Unknown tool: ${name}`);
}
} catch (error) {
return {
content: [
{
type: "text",
text: `Error: ${error.message}`
}
],
isError: true
};
}
});
// 工具实现
async function handleCalculate(args: any) {
// 注意:实际项目中应该使用安全的数学表达式解析器
// 这里仅作示例,不要直接 eval 用户输入!
const result = eval(args.expression); // ⚠️ 安全风险
return {
content: [
{
type: "text",
text: `计算结果:${result}`
}
]
};
}
async function handleGetWeather(args: any) {
// 调用天气 API
const response = await fetch(
`https://api.weather.com/v1/current?city=${args.city}&units=${args.units}`
);
const data = await response.json();
return {
content: [
{
type: "text",
text: `${args.city} 当前天气:\n` +
`温度:${data.temperature}°${args.units === 'celsius' ? 'C' : 'F'}\n` +
`天气:${data.condition}\n` +
`湿度:${data.humidity}%`
}
]
};
}
// 启动 Server
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("Custom MCP Server running on stdio");
}
main().catch(console.error);
2. Python 版 MCP Server
# custom_mcp_server/server.py
"""
使用 Python 实现的 MCP Server 示例
"""
import asyncio
import json
from typing import Any, Dict, List
import httpx
class MCPServer:
"""MCP Server 基类"""
def __init__(self, name: str, version: str):
self.name = name
self.version = version
self.tools: Dict[str, Dict] = {}
def add_tool(
self,
name: str,
description: str,
handler: callable,
schema: Dict
):
"""注册工具"""
self.tools[name] = {
"name": name,
"description": description,
"inputSchema": schema,
"handler": handler
}
async def handle_request(self, request: Dict) -> Dict:
"""处理 MCP 请求"""
method = request.get("method")
if method == "tools/list":
return {
"tools": [
{
"name": t["name"],
"description": t["description"],
"inputSchema": t["inputSchema"]
}
for t in self.tools.values()
]
}
elif method == "tools/call":
tool_name = request["params"]["name"]
arguments = request["params"]["arguments"]
if tool_name not in self.tools:
return {
"content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}],
"isError": True
}
try:
result = await self.tools[tool_name]["handler"](arguments)
return {"content": [{"type": "text", "text": json.dumps(result)}]}
except Exception as e:
return {
"content": [{"type": "text", "text": f"Error: {str(e)}"}],
"isError": True
}
return {"error": "Unknown method"}
async def run(self):
"""运行 Server(stdio 模式)"""
while True:
try:
line = await asyncio.get_event_loop().run_in_executor(
None, input
)
request = json.loads(line)
response = await self.handle_request(request)
print(json.dumps(response), flush=True)
except EOFError:
break
except Exception as e:
print(json.dumps({"error": str(e)}), flush=True)
# 创建具体的 Server
async def main():
server = MCPServer("weather-mcp", "1.0.0")
# 注册天气查询工具
async def get_weather_handler(args: Dict) -> Dict:
city = args["city"]
units = args.get("units", "celsius")
# 调用天气 API
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.openweathermap.org/data/2.5/weather",
params={
"q": city,
"appid": "your-api-key",
"units": "metric" if units == "celsius" else "imperial"
}
)
data = response.json()
return {
"city": city,
"temperature": data["main"]["temp"],
"humidity": data["main"]["humidity"],
"condition": data["weather"][0]["description"],
"units": units
}
server.add_tool(
name="get_weather",
description="获取指定城市的天气信息",
handler=get_weather_handler,
schema={
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名称"},
"units": {"type": "string", "enum": ["celsius", "fahrenheit"]}
},
"required": ["city"]
}
)
# 注册计算器工具
async def calculate_handler(args: Dict) -> Dict:
import operator
import ast
expression = args["expression"]
# 安全的表达式求值
allowed_operators = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
}
def eval_node(node):
if isinstance(node, ast.Num):
return node.n
elif isinstance(node, ast.BinOp):
op_type = type(node.op)
if op_type not in allowed_operators:
raise ValueError(f"Operator not allowed: {op_type}")
left = eval_node(node.left)
right = eval_node(node.right)
return allowed_operators[op_type](left, right)
else:
raise ValueError(f"Node type not allowed: {type(node)}")
tree = ast.parse(expression, mode='eval')
result = eval_node(tree.body)
return {"expression": expression, "result": result}
server.add_tool(
name="calculate",
description="执行安全的数学计算",
handler=calculate_handler,
schema={
"type": "object",
"properties": {
"expression": {"type": "string", "description": "数学表达式"}
},
"required": ["expression"]
}
)
await server.run()
if __name__ == "__main__":
asyncio.run(main())
3. 配置文件
// 在 Claude Desktop 中使用自定义 Server
{
"mcpServers": {
"weather": {
"command": "python",
"args": ["/path/to/custom_mcp_server/server.py"]
}
}
}
MCP Server 开发最佳实践
1. 错误处理
// 良好的错误处理示例
async function handleTool(args: any) {
try {
// 参数验证
if (!args.required_param) {
return {
content: [{ type: "text", text: "Missing required parameter: required_param" }],
isError: true
};
}
// 业务逻辑
const result = await doSomething(args);
return {
content: [{ type: "text", text: JSON.stringify(result) }]
};
} catch (error) {
// 分类错误
if (error instanceof ValidationError) {
return {
content: [{ type: "text", text: `Validation Error: ${error.message}` }],
isError: true
};
}
if (error instanceof NetworkError) {
return {
content: [{ type: "text", text: `Network Error: ${error.message}` }],
isError: true
};
}
// 未知错误
return {
content: [{ type: "text", text: `Unexpected Error: ${error.message}` }],
isError: true
};
}
}
2. 安全性
// 输入验证和清理
import { z } from "zod";
const ToolInputSchema = z.object({
path: z.string()
.min(1)
.regex(/^[a-zA-Z0-9_\-\/\.]+$/, "Invalid path characters")
.transform(path => path.replace(/\.\.\//g, "")), // 防止目录遍历
content: z.string().max(10000, "Content too large")
});
function validateInput(args: unknown) {
return ToolInputSchema.parse(args);
}
3. 性能优化
// 缓存常用数据
class CachedMCPServer {
private cache = new Map<string, { data: any; expiry: number }>();
async getCached(key: string, fetcher: () => Promise<any>, ttl: number = 60000) {
const cached = this.cache.get(key);
if (cached && cached.expiry > Date.now()) {
return cached.data;
}
const data = await fetcher();
this.cache.set(key, { data, expiry: Date.now() + ttl });
return data;
}
}
总结
MCP 生态正在快速发展,核心要点:
- 官方工具:filesystem、github、postgres 等覆盖常见需求
- 社区工具:fetch、puppeteer、sqlite 等扩展功能
- 自定义开发:基于 SDK 快速开发专用 Server
- 最佳实践:注重安全、错误处理和性能
推荐工具组合:
- 开发场景:filesystem + github + git
- 数据分析:postgres + sqlite + fetch
- Web 自动化:fetch + puppeteer
- 运维监控:sentry + kubernetes
相关资源: