Python中如何实现异步MCP Filesystem?欢迎交流与优化建议

为什么还要做一个 MCP Filesystem 的 Python 实现?

目前 MCP 生态里,很多能力是 Node.js 实现的,Python 这边要么只是封装一层,要么偏同步阻塞。对那种“我就想写个 async 小工具”的场景,其实不太友好:

  • 整条链路最好都是 async I/O,跟主流生态对齐
  • 不想为了这点能力额外引一堆依赖
  • 返回结构要规整,人能看懂还不够,AI 也得读得舒服

所以在自己的 MCP 工具里,基于官方协议搞了一个「精简版」实现,主要特点:

  • 异步 I/O 实现:接口全部是 async,方便直接接到现有的 async Web 框架 / 任务系统
  • 无第三方依赖(基础操作):只用标准库,适合受限环境或者简单脚本项目
  • 结构体友好:返回体设计得比较规整,方便后面再封装成 dataclass / Pydantic 等
  • 代码简单有注释:逻辑刻意写得直白,没有做重度优化,更适合 fork 下来按自己需求魔改

可以自行 TODO / 二次开发的方向包括:

  • 更严格的读写保证与错误处理(重试、限流、幂等等)
  • 目录边界策略调整:
    • 目前通过 DATA_DIR 做目录沙箱,防止越界访问,避免一上来就摸整个磁盘
    • 也可以根据业务需要去掉或替换为自定义根目录、甚至开放整盘访问

Github 项目地址:

mcp-filesystem-python

技术含量不多,纯粹有相同需求的人可以不用再思考太多,直接用,也可以当作组件拼接到其他地方。


Python中如何实现异步MCP Filesystem?欢迎交流与优化建议

3 回复

同级有相同的包和模块文件,逆天。。。


哦,顶,你是对的,谢谢你的修正。我自己是从 mcp_gateway.py 或者 app_with_stray.py 进的 [捂脸]

在Python中实现异步MCP(Model Context Protocol)文件系统,核心是利用asyncioaiofiles等异步库来构建非阻塞的I/O操作。下面是一个完整的异步MCP文件系统实现示例,包含基本文件操作和服务器端:

import asyncio
import aiofiles
import os
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from contextlib import asynccontextmanager

@dataclass
class FileInfo:
    name: str
    size: int
    is_dir: bool
    modified: float

class AsyncMCPFileSystem:
    """异步MCP文件系统实现"""
    
    def __init__(self, base_path: str = "."):
        self.base_path = os.path.abspath(base_path)
    
    async def read_file(self, filepath: str) -> str:
        """异步读取文件内容"""
        full_path = os.path.join(self.base_path, filepath)
        async with aiofiles.open(full_path, 'r', encoding='utf-8') as f:
            return await f.read()
    
    async def write_file(self, filepath: str, content: str) -> None:
        """异步写入文件"""
        full_path = os.path.join(self.base_path, filepath)
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        async with aiofiles.open(full_path, 'w', encoding='utf-8') as f:
            await f.write(content)
    
    async def list_directory(self, dirpath: str = "") -> List[FileInfo]:
        """异步列出目录内容"""
        full_path = os.path.join(self.base_path, dirpath)
        entries = []
        
        async def process_entry(entry):
            entry_path = os.path.join(full_path, entry)
            stat = await asyncio.to_thread(os.stat, entry_path)
            return FileInfo(
                name=entry,
                size=stat.st_size,
                is_dir=os.path.isdir(entry_path),
                modified=stat.st_mtime
            )
        
        tasks = [process_entry(entry) for entry in await asyncio.to_thread(os.listdir, full_path)]
        return await asyncio.gather(*tasks)
    
    async def file_exists(self, filepath: str) -> bool:
        """检查文件是否存在"""
        full_path = os.path.join(self.base_path, filepath)
        return await asyncio.to_thread(os.path.exists, full_path)
    
    @asynccontextmanager
    async def open_file(self, filepath: str, mode: str = 'r'):
        """异步上下文管理器打开文件"""
        full_path = os.path.join(self.base_path, filepath)
        if 'w' in mode or 'a' in mode:
            os.makedirs(os.path.dirname(full_path), exist_ok=True)
        
        async with aiofiles.open(full_path, mode, encoding='utf-8' if 't' in mode else None) as f:
            yield f

# 示例:MCP文件系统服务器
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

class MCPFileSystemServer:
    def __init__(self, fs: AsyncMCPFileSystem):
        self.fs = fs
    
    async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """处理MCP请求"""
        method = request.get("method")
        
        if method == "read_file":
            content = await self.fs.read_file(request["params"]["path"])
            return {"result": content}
        
        elif method == "list_directory":
            entries = await self.fs.list_directory(request["params"].get("path", ""))
            return {"result": [{"name": e.name, "type": "directory" if e.is_dir else "file"} for e in entries]}
        
        elif method == "file_exists":
            exists = await self.fs.file_exists(request["params"]["path"])
            return {"result": exists}
        
        return {"error": "Method not found"}

async def main():
    # 初始化文件系统
    fs = AsyncMCPFileSystem("./data")
    
    # 创建示例文件
    await fs.write_file("test.txt", "Hello, Async MCP Filesystem!")
    
    # 读取文件
    content = await fs.read_file("test.txt")
    print(f"File content: {content}")
    
    # 列出目录
    entries = await fs.list_directory()
    for entry in entries:
        print(f"{'📁' if entry.is_dir else '📄'} {entry.name} ({entry.size} bytes)")
    
    # 使用上下文管理器
    async with fs.open_file("test.txt", "a") as f:
        await f.write("\nAppended content!")

# 优化建议:
# 1. 添加文件缓存层(使用aiocache或自定义LRU缓存)
# 2. 实现文件变更监听(使用watchfiles库)
# 3. 添加连接池管理并发文件操作
# 4. 实现断点续传和大文件分块处理
# 5. 集成监控和性能指标(使用prometheus_client)

if __name__ == "__main__":
    asyncio.run(main())

关键实现要点:

  1. 异步I/O基础:使用aiofiles替代同步open()asyncio.to_thread()处理阻塞的OS调用
  2. 路径安全:使用os.path.join()确保路径安全,防止目录遍历攻击
  3. 错误处理:实际使用时应添加完整的异常处理(示例中省略以保持简洁)
  4. MCP集成:通过mcp库实现标准协议接口,支持工具调用和资源管理

性能优化建议:

  • 对大目录列表实现分页和延迟加载
  • 对频繁读取的文件添加内存缓存
  • 使用连接池限制并发文件描述符数量
  • 对网络文件系统考虑添加本地缓存层

总结建议:核心是合理利用asyncio生态,注意I/O边界和资源管理。

回到顶部