传输
模型上下文协议(MCP)中的传输为客户端和服务器之间的通信提供基础。传输处理消息发送和接收的底层机制。
消息格式
MCP使用JSON-RPC 2.0作为其线路格式。传输层负责将MCP协议消息转换为JSON-RPC格式进行传输,并将接收到的JSON-RPC消息转换回MCP协议消息。
使用三种类型的JSON-RPC消息:
请求
{
jsonrpc: "2.0",
id: number | string,
method: string,
params?: object
}
响应
{
jsonrpc: "2.0",
id: number | string,
result?: object,
error?: {
code: number,
message: string,
data?: unknown
}
}
通知
{
jsonrpc: "2.0",
method: string,
params?: object
}
内置传输类型
MCP包括两种标准传输实现:
标准输入/输出(stdio)
stdio传输通过标准输入和输出流实现通信。这对于本地集成和命令行工具特别有用。
在以下情况下使用stdio:
- 构建命令行工具
- 实现本地集成
- 需要简单的进程通信
- 使用shell脚本
- TypeScript (服务器)
- TypeScript (客户端)
- Python (服务器)
- Python (客户端)
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioServerTransport();
await server.connect(transport);
const client = new Client({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioClientTransport({
command: "./server",
args: ["--option", "value"]
});
await client.connect(transport);
app = Server("example-server")
async with stdio_server() as streams:
await app.run(
streams[0],
streams[1],
app.create_initialization_options()
)
params = StdioServerParameters(
command="./server",
args=["--option", "value"]
)
async with stdio_client(params) as streams:
async with ClientSession(streams[0], streams[1]) as session:
await session.initialize()
服务器发送事件(SSE)
SSE传输通过HTTP POST请求实现服务器到客户端的流式传输,用于客户端到服务器的通信。
在以下情况下使用SSE:
- 只 需要服务器到客户端的流式传输
- 在受限网络中工作
- 实现简单的更新
- TypeScript (服务器)
- TypeScript (客户端)
- Python (服务器)
- Python (客户端)
import express from "express";
const app = express();
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {}
});
let transport: SSEServerTransport | null = null;
app.get("/sse", (req, res) => {
transport = new SSEServerTransport("/messages", res);
server.connect(transport);
});
app.post("/messages", (req, res) => {
if (transport) {
transport.handlePostMessage(req, res);
}
});
app.listen(3000);
const client = new Client({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new SSEClientTransport(
new URL("http://localhost:3000/sse")
);
await client.connect(transport);
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route
app = Server("example-server")
sse = SseServerTransport("/messages")
async def handle_sse(scope, receive, send):
async with sse.connect_sse(scope, receive, send) as streams:
await app.run(streams[0], streams[1], app.create_initialization_options())
async def handle_messages(scope, receive, send):
await sse.handle_post_message(scope, receive, send)
starlette_app = Starlette(
routes=[
Route("/sse", endpoint=handle_sse),
Route("/messages", endpoint=handle_messages, methods=["POST"]),
]
)
async with sse_client("http://localhost:8000/sse") as streams:
async with ClientSession(streams[0], streams[1]) as session:
await session.initialize()
自定义传输
MCP使得为特定需求实现自定义传输变得容易。任何传输实现只需要符合传输接口:
你可以为以下情况实现自定义传输:
- 自定义网络协议
- 专门的通信通道
- 与现有系统集成
- 性能优化
- TypeScript
- Python
interface Transport {
// 开始处理消息
start(): Promise<void>;
// 发送JSON-RPC消息
send(message: JSONRPCMessage): Promise<void>;
// 关闭连接
close(): Promise<void>;
// 回调
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
}
注意,虽然MCP服务器通常使用asyncio实现,但我们建议使用anyio
实现低级接口(如传输)以获得更广泛的兼容性。
@contextmanager
async def create_transport(
read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
write_stream: MemoryObjectSendStream[JSONRPCMessage]
):
"""
MCP的传输接口。
参数:
read_stream: 用于读取传入消息的流
write_stream: 用于写入传出消息的流
"""
async with anyio.create_task_group() as tg:
# 实现传输逻辑
pass
最佳实践
在实现传输时:
- 确保可靠的消息传递
- 实现适当的错误处理
- 处理连接生命周期
- 考虑性能优化
- 实现重连机制
- 添加日志记录
- 处理超时
- 实现心跳机制
- 考虑安全性
- 测试边界情况
安全考虑
在实现传输时:
- 加密通信
- 验证消息完整性
- 实现访问控制
- 防止消息注入
- 处理敏感数据
- 监控异常活动
- 实现速率限制
- 审计传输日志