用Python的asyncio实现ZooKeeper客户端

zookeeper client 的状态处理是复杂的事情,对第三方的 client 实现细节不了解的情况下,不敢贸然使用。现成的 kazoo 代码量不少,也不支持异步 io,所以萌生出自己动手实现一个的想法:

https://github.com/roy2220/aiozk

代码量少,简单可控(谁能一起写些单元测试就太好了


用Python的asyncio实现ZooKeeper客户端

4 回复

要自己用asyncio实现一个完整的ZooKeeper客户端挺复杂的,不过我可以给你一个基础框架,用kazoo的异步适配器或者直接基于asyncio的socket来搞。

如果你想要现成的,直接用aiokazoo库最省事:

import asyncio
from aiokazoo.client import KazooClient

async def main():
    zk = KazooClient(hosts='127.0.0.1:2181')
    await zk.start()
    
    # 创建节点
    await zk.create("/my/node", b"data")
    
    # 获取数据
    data, stat = await zk.get("/my/node")
    print(f"Data: {data}, Version: {stat.version}")
    
    # 设置watch
    @zk.DataWatch("/my/node")
    def watch_node(data, stat):
        print(f"Node changed: {data}")
    
    await asyncio.sleep(60)
    await zk.stop()

asyncio.run(main())

如果想自己从头实现,得处理TCP连接、序列化、心跳这些。这里是个极简的例子:

import asyncio
import struct

class AsyncZooKeeper:
    def __init__(self, hosts):
        self.hosts = hosts
        self.reader = None
        self.writer = None
        
    async def connect(self):
        self.reader, self.writer = await asyncio.open_connection(
            self.hosts.split(',')[0]
        )
        # 发送connect请求
        await self._send_connect()
        
    async def _send_connect(self):
        # 简化版的connect协议
        protocol = struct.pack('!i', 0)  # 协议版本
        self.writer.write(protocol)
        await self.writer.drain()
        
    async def create(self, path, data):
        # 实现create操作
        pass
        
    async def close(self):
        self.writer.close()
        await self.writer.wait_closed()

# 使用示例
async def main():
    zk = AsyncZooKeeper('127.0.0.1:2181')
    await zk.connect()
    # ... 其他操作
    await zk.close()

asyncio.run(main())

自己实现的话得处理ZooKeeper的二进制协议、session管理、重连机制,挺麻烦的。建议直接用现成的库。

用aiokazoo吧,别重复造轮子。

这个库之前用过,当时 bug 很多,行为也莫名其妙,现在看起来有所改善,但是对一些机制的实现还是错的

nb,,不明觉厉

回到顶部