Python中如何实现异步MCP Filesystem?欢迎交流与优化建议
为什么还要做一个 MCP Filesystem 的 Python 实现?
目前 MCP 生态里,很多能力是 Node.js 实现的,Python 这边要么只是封装一层,要么偏同步阻塞。对那种“我就想写个 async 小工具”的场景,其实不太友好:
- 整条链路最好都是 async I/O,跟主流生态对齐
- 不想为了这点能力额外引一堆依赖
- 返回结构要规整,人能看懂还不够,AI 也得读得舒服
所以在自己的 MCP 工具里,基于官方协议搞了一个「精简版」实现,主要特点:
- 异步 I/O 实现:接口全部是
async,方便直接接到现有的 async Web 框架 / 任务系统 - 无第三方依赖(基础操作):只用标准库,适合受限环境或者简单脚本项目
- 结构体友好:返回体设计得比较规整,方便后面再封装成 dataclass / Pydantic 等
- 代码简单有注释:逻辑刻意写得直白,没有做重度优化,更适合 fork 下来按自己需求魔改
可以自行 TODO / 二次开发的方向包括:
- 更严格的读写保证与错误处理(重试、限流、幂等等)
-
目录边界策略调整:
- 目前通过
DATA_DIR做目录沙箱,防止越界访问,避免一上来就摸整个磁盘 - 也可以根据业务需要去掉或替换为自定义根目录、甚至开放整盘访问
- 目前通过
Github 项目地址:
技术含量不多,纯粹有相同需求的人可以不用再思考太多,直接用,也可以当作组件拼接到其他地方。
Python中如何实现异步MCP Filesystem?欢迎交流与优化建议
3 回复
同级有相同的包和模块文件,逆天。。。
哦,顶,你是对的,谢谢你的修正。我自己是从 mcp_gateway.py 或者 app_with_stray.py 进的 [捂脸]
在Python中实现异步MCP(Model Context Protocol)文件系统,核心是利用asyncio和aiofiles等异步库来构建非阻塞的I/O操作。下面是一个完整的异步MCP文件系统实现示例,包含基本文件操作和服务器端:
import asyncio
import aiofiles
import os
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from contextlib import asynccontextmanager
@dataclass
class FileInfo:
name: str
size: int
is_dir: bool
modified: float
class AsyncMCPFileSystem:
"""异步MCP文件系统实现"""
def __init__(self, base_path: str = "."):
self.base_path = os.path.abspath(base_path)
async def read_file(self, filepath: str) -> str:
"""异步读取文件内容"""
full_path = os.path.join(self.base_path, filepath)
async with aiofiles.open(full_path, 'r', encoding='utf-8') as f:
return await f.read()
async def write_file(self, filepath: str, content: str) -> None:
"""异步写入文件"""
full_path = os.path.join(self.base_path, filepath)
os.makedirs(os.path.dirname(full_path), exist_ok=True)
async with aiofiles.open(full_path, 'w', encoding='utf-8') as f:
await f.write(content)
async def list_directory(self, dirpath: str = "") -> List[FileInfo]:
"""异步列出目录内容"""
full_path = os.path.join(self.base_path, dirpath)
entries = []
async def process_entry(entry):
entry_path = os.path.join(full_path, entry)
stat = await asyncio.to_thread(os.stat, entry_path)
return FileInfo(
name=entry,
size=stat.st_size,
is_dir=os.path.isdir(entry_path),
modified=stat.st_mtime
)
tasks = [process_entry(entry) for entry in await asyncio.to_thread(os.listdir, full_path)]
return await asyncio.gather(*tasks)
async def file_exists(self, filepath: str) -> bool:
"""检查文件是否存在"""
full_path = os.path.join(self.base_path, filepath)
return await asyncio.to_thread(os.path.exists, full_path)
@asynccontextmanager
async def open_file(self, filepath: str, mode: str = 'r'):
"""异步上下文管理器打开文件"""
full_path = os.path.join(self.base_path, filepath)
if 'w' in mode or 'a' in mode:
os.makedirs(os.path.dirname(full_path), exist_ok=True)
async with aiofiles.open(full_path, mode, encoding='utf-8' if 't' in mode else None) as f:
yield f
# 示例:MCP文件系统服务器
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class MCPFileSystemServer:
def __init__(self, fs: AsyncMCPFileSystem):
self.fs = fs
async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""处理MCP请求"""
method = request.get("method")
if method == "read_file":
content = await self.fs.read_file(request["params"]["path"])
return {"result": content}
elif method == "list_directory":
entries = await self.fs.list_directory(request["params"].get("path", ""))
return {"result": [{"name": e.name, "type": "directory" if e.is_dir else "file"} for e in entries]}
elif method == "file_exists":
exists = await self.fs.file_exists(request["params"]["path"])
return {"result": exists}
return {"error": "Method not found"}
async def main():
# 初始化文件系统
fs = AsyncMCPFileSystem("./data")
# 创建示例文件
await fs.write_file("test.txt", "Hello, Async MCP Filesystem!")
# 读取文件
content = await fs.read_file("test.txt")
print(f"File content: {content}")
# 列出目录
entries = await fs.list_directory()
for entry in entries:
print(f"{'📁' if entry.is_dir else '📄'} {entry.name} ({entry.size} bytes)")
# 使用上下文管理器
async with fs.open_file("test.txt", "a") as f:
await f.write("\nAppended content!")
# 优化建议:
# 1. 添加文件缓存层(使用aiocache或自定义LRU缓存)
# 2. 实现文件变更监听(使用watchfiles库)
# 3. 添加连接池管理并发文件操作
# 4. 实现断点续传和大文件分块处理
# 5. 集成监控和性能指标(使用prometheus_client)
if __name__ == "__main__":
asyncio.run(main())
关键实现要点:
- 异步I/O基础:使用
aiofiles替代同步open(),asyncio.to_thread()处理阻塞的OS调用 - 路径安全:使用
os.path.join()确保路径安全,防止目录遍历攻击 - 错误处理:实际使用时应添加完整的异常处理(示例中省略以保持简洁)
- MCP集成:通过
mcp库实现标准协议接口,支持工具调用和资源管理
性能优化建议:
- 对大目录列表实现分页和延迟加载
- 对频繁读取的文件添加内存缓存
- 使用连接池限制并发文件描述符数量
- 对网络文件系统考虑添加本地缓存层
总结建议:核心是合理利用asyncio生态,注意I/O边界和资源管理。

