Python中如何统一管理几十个业务线程对MySQL的顺序读写操作,是否适合使用消息队列技术?

Python,目前是几十个业务线程都在操作 MySQL 读写,主要是怕写操作时加锁什么的等待时间长了会不会超时,返回失败,程序目前没有检测操作未成功重试的机制。
想写一个函数 /类,统一顺序化的操作数据库,获取到操作结果之后返回。业务线程统一通过这个函数 /类操作数据库。这个函数 /类操作数据库是单线程的,不是并发的,这样就要一个缓存队列的,消息队列是不是就是做这个事情的?不然直接调用函数的话还是直接并发了,等于没改。
做成同步和阻塞的没有关系,Python 异步编程还不太会呢。。
Python 下 Google 了一下找到了 zmq,好像 pip 安装完就可以使用。
受教。
Python中如何统一管理几十个业务线程对MySQL的顺序读写操作,是否适合使用消息队列技术?


2 回复

对于管理几十个业务线程对MySQL的顺序读写操作,使用消息队列(如RabbitMQ、Kafka)是非常合适的方案。核心思路是把并发请求转为串行处理,避免直接竞争数据库连接和锁。

下面是一个使用threadingqueue模块实现的简单示例,它模拟了多个业务线程通过一个共享队列进行顺序化数据库操作:

import threading
import queue
import time
import random
import pymysql
from datetime import datetime

# 模拟的数据库操作函数
def mock_db_operation(task_data):
    """模拟一个数据库写入或更新操作"""
    print(f"[{datetime.now().strftime('%H:%M:%S')}] 线程 {threading.current_thread().name} 正在处理任务: {task_data}")
    # 这里替换为真实的 pymysql 操作,例如:
    # connection = pymysql.connect(...)
    # with connection.cursor() as cursor:
    #     sql = "INSERT INTO your_table (data) VALUES (%s)"
    #     cursor.execute(sql, (task_data,))
    # connection.commit()
    # connection.close()
    time.sleep(random.uniform(0.1, 0.5))  # 模拟操作耗时
    print(f"[{datetime.now().strftime('%H:%M:%S')}] 线程 {threading.current_thread().name} 完成任务: {task_data}")
    return f"Processed: {task_data}"

# 工作线程函数,从队列中取出任务执行
def worker(task_queue, result_queue):
    while True:
        task = task_queue.get()
        if task is None:  # 终止信号
            task_queue.task_done()
            break
        try:
            result = mock_db_operation(task)
            result_queue.put(result)
        except Exception as e:
            result_queue.put(f"Error processing {task}: {e}")
        finally:
            task_queue.task_done()

def main():
    # 创建任务队列和结果队列
    task_queue = queue.Queue()
    result_queue = queue.Queue()

    # 创建并启动工作线程池 (这里启动3个线程作为消费者)
    num_worker_threads = 3
    threads = []
    for i in range(num_worker_threads):
        t = threading.Thread(target=worker, args=(task_queue, result_queue), name=f"Worker-{i+1}")
        t.start()
        threads.append(t)

    # 模拟多个业务线程(生产者)生成任务
    def business_thread_producer(thread_id):
        for j in range(5):  # 每个业务线程生成5个任务
            task_data = f"业务线程{thread_id}-任务{j+1}"
            task_queue.put(task_data)
            time.sleep(random.uniform(0.05, 0.2))  # 模拟任务产生间隔

    # 启动多个业务线程(生产者)
    business_threads = []
    for i in range(5):  # 模拟5个并发的业务线程
        bt = threading.Thread(target=business_thread_producer, args=(i+1,), name=f"Business-{i+1}")
        bt.start()
        business_threads.append(bt)

    # 等待所有业务线程生产完毕
    for bt in business_threads:
        bt.join()

    # 等待任务队列中的所有任务被处理完成
    task_queue.join()

    # 发送终止信号给工作线程
    for _ in range(num_worker_threads):
        task_queue.put(None)
    for t in threads:
        t.join()

    # 输出结果
    print("\n=== 处理结果 ===")
    while not result_queue.empty():
        print(result_queue.get())

if __name__ == "__main__":
    main()

代码解释:

  1. 任务队列 (task_queue):所有业务线程(生产者)将需要执行的数据库操作封装成“任务”放入此队列。
  2. 工作线程池:一组专用的工作线程(消费者)从task_queue顺序取出任务并执行。queue.Queue本身是线程安全的,并且默认是FIFO(先进先出),这保证了任务被处理的顺序性。
  3. 业务线程:你的几十个业务线程不再直接操作数据库,而是将操作请求放入队列。它们可以快速返回,继续处理其他业务逻辑,实现了生产与消费的解耦。
  4. 顺序性保证:由于任务由单个工作线程逐个处理(或者由有限数量的工作线程从同一个队列中取),对MySQL的访问自然就是串行的,彻底避免了并发写冲突、锁超时等问题。

为什么消息队列更合适? 虽然上面的例子用了内置队列,但在分布式系统或需要更高可靠性、持久化、扩展性的场景下,使用专业的消息队列中间件(如RabbitMQ、Kafka、RocketMQ)是更优解。它们能提供:

  • 持久化:任务不会因为程序重启而丢失。
  • 解耦:生产者和消费者完全独立,可独立部署和扩展。
  • 削峰填谷:突发的大量请求会被队列缓冲,数据库不会被打垮。
  • 可靠性:提供消息确认、重试、死信队列等机制。
  • 扩展性:可以轻松增加消费者来提高处理能力。

总结建议:对于你的场景,引入消息队列是解决多线程顺序操作数据库的标准且优雅的方案。


可以这样做 ,但是没道理 , 没有压测数据对比的情况提前优化不可行

回到顶部