用Python的asyncio实现ZooKeeper客户端
zookeeper client 的状态处理是复杂的事情,对第三方的 client 实现细节不了解的情况下,不敢贸然使用。现成的 kazoo 代码量不少,也不支持异步 io,所以萌生出自己动手实现一个的想法:
https://github.com/roy2220/aiozk
代码量少,简单可控(谁能一起写些单元测试就太好了
用Python的asyncio实现ZooKeeper客户端
4 回复
要自己用asyncio实现一个完整的ZooKeeper客户端挺复杂的,不过我可以给你一个基础框架,用kazoo的异步适配器或者直接基于asyncio的socket来搞。
如果你想要现成的,直接用aiokazoo库最省事:
import asyncio
from aiokazoo.client import KazooClient
async def main():
zk = KazooClient(hosts='127.0.0.1:2181')
await zk.start()
# 创建节点
await zk.create("/my/node", b"data")
# 获取数据
data, stat = await zk.get("/my/node")
print(f"Data: {data}, Version: {stat.version}")
# 设置watch
@zk.DataWatch("/my/node")
def watch_node(data, stat):
print(f"Node changed: {data}")
await asyncio.sleep(60)
await zk.stop()
asyncio.run(main())
如果想自己从头实现,得处理TCP连接、序列化、心跳这些。这里是个极简的例子:
import asyncio
import struct
class AsyncZooKeeper:
def __init__(self, hosts):
self.hosts = hosts
self.reader = None
self.writer = None
async def connect(self):
self.reader, self.writer = await asyncio.open_connection(
self.hosts.split(',')[0]
)
# 发送connect请求
await self._send_connect()
async def _send_connect(self):
# 简化版的connect协议
protocol = struct.pack('!i', 0) # 协议版本
self.writer.write(protocol)
await self.writer.drain()
async def create(self, path, data):
# 实现create操作
pass
async def close(self):
self.writer.close()
await self.writer.wait_closed()
# 使用示例
async def main():
zk = AsyncZooKeeper('127.0.0.1:2181')
await zk.connect()
# ... 其他操作
await zk.close()
asyncio.run(main())
自己实现的话得处理ZooKeeper的二进制协议、session管理、重连机制,挺麻烦的。建议直接用现成的库。
用aiokazoo吧,别重复造轮子。
这个库之前用过,当时 bug 很多,行为也莫名其妙,现在看起来有所改善,但是对一些机制的实现还是错的
nb,,不明觉厉

