Python中如何统一管理几十个业务线程对MySQL的顺序读写操作,是否适合使用消息队列技术?
Python,目前是几十个业务线程都在操作 MySQL 读写,主要是怕写操作时加锁什么的等待时间长了会不会超时,返回失败,程序目前没有检测操作未成功重试的机制。
想写一个函数 /类,统一顺序化的操作数据库,获取到操作结果之后返回。业务线程统一通过这个函数 /类操作数据库。这个函数 /类操作数据库是单线程的,不是并发的,这样就要一个缓存队列的,消息队列是不是就是做这个事情的?不然直接调用函数的话还是直接并发了,等于没改。
做成同步和阻塞的没有关系,Python 异步编程还不太会呢。。
Python 下 Google 了一下找到了 zmq,好像 pip 安装完就可以使用。
受教。
Python中如何统一管理几十个业务线程对MySQL的顺序读写操作,是否适合使用消息队列技术?
对于管理几十个业务线程对MySQL的顺序读写操作,使用消息队列(如RabbitMQ、Kafka)是非常合适的方案。核心思路是把并发请求转为串行处理,避免直接竞争数据库连接和锁。
下面是一个使用threading和queue模块实现的简单示例,它模拟了多个业务线程通过一个共享队列进行顺序化数据库操作:
import threading
import queue
import time
import random
import pymysql
from datetime import datetime
# 模拟的数据库操作函数
def mock_db_operation(task_data):
"""模拟一个数据库写入或更新操作"""
print(f"[{datetime.now().strftime('%H:%M:%S')}] 线程 {threading.current_thread().name} 正在处理任务: {task_data}")
# 这里替换为真实的 pymysql 操作,例如:
# connection = pymysql.connect(...)
# with connection.cursor() as cursor:
# sql = "INSERT INTO your_table (data) VALUES (%s)"
# cursor.execute(sql, (task_data,))
# connection.commit()
# connection.close()
time.sleep(random.uniform(0.1, 0.5)) # 模拟操作耗时
print(f"[{datetime.now().strftime('%H:%M:%S')}] 线程 {threading.current_thread().name} 完成任务: {task_data}")
return f"Processed: {task_data}"
# 工作线程函数,从队列中取出任务执行
def worker(task_queue, result_queue):
while True:
task = task_queue.get()
if task is None: # 终止信号
task_queue.task_done()
break
try:
result = mock_db_operation(task)
result_queue.put(result)
except Exception as e:
result_queue.put(f"Error processing {task}: {e}")
finally:
task_queue.task_done()
def main():
# 创建任务队列和结果队列
task_queue = queue.Queue()
result_queue = queue.Queue()
# 创建并启动工作线程池 (这里启动3个线程作为消费者)
num_worker_threads = 3
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker, args=(task_queue, result_queue), name=f"Worker-{i+1}")
t.start()
threads.append(t)
# 模拟多个业务线程(生产者)生成任务
def business_thread_producer(thread_id):
for j in range(5): # 每个业务线程生成5个任务
task_data = f"业务线程{thread_id}-任务{j+1}"
task_queue.put(task_data)
time.sleep(random.uniform(0.05, 0.2)) # 模拟任务产生间隔
# 启动多个业务线程(生产者)
business_threads = []
for i in range(5): # 模拟5个并发的业务线程
bt = threading.Thread(target=business_thread_producer, args=(i+1,), name=f"Business-{i+1}")
bt.start()
business_threads.append(bt)
# 等待所有业务线程生产完毕
for bt in business_threads:
bt.join()
# 等待任务队列中的所有任务被处理完成
task_queue.join()
# 发送终止信号给工作线程
for _ in range(num_worker_threads):
task_queue.put(None)
for t in threads:
t.join()
# 输出结果
print("\n=== 处理结果 ===")
while not result_queue.empty():
print(result_queue.get())
if __name__ == "__main__":
main()
代码解释:
- 任务队列 (
task_queue):所有业务线程(生产者)将需要执行的数据库操作封装成“任务”放入此队列。 - 工作线程池:一组专用的工作线程(消费者)从
task_queue中顺序取出任务并执行。queue.Queue本身是线程安全的,并且默认是FIFO(先进先出),这保证了任务被处理的顺序性。 - 业务线程:你的几十个业务线程不再直接操作数据库,而是将操作请求放入队列。它们可以快速返回,继续处理其他业务逻辑,实现了生产与消费的解耦。
- 顺序性保证:由于任务由单个工作线程逐个处理(或者由有限数量的工作线程从同一个队列中取),对MySQL的访问自然就是串行的,彻底避免了并发写冲突、锁超时等问题。
为什么消息队列更合适? 虽然上面的例子用了内置队列,但在分布式系统或需要更高可靠性、持久化、扩展性的场景下,使用专业的消息队列中间件(如RabbitMQ、Kafka、RocketMQ)是更优解。它们能提供:
- 持久化:任务不会因为程序重启而丢失。
- 解耦:生产者和消费者完全独立,可独立部署和扩展。
- 削峰填谷:突发的大量请求会被队列缓冲,数据库不会被打垮。
- 可靠性:提供消息确认、重试、死信队列等机制。
- 扩展性:可以轻松增加消费者来提高处理能力。
总结建议:对于你的场景,引入消息队列是解决多线程顺序操作数据库的标准且优雅的方案。
可以这样做 ,但是没道理 , 没有压测数据对比的情况提前优化不可行

