Python分布式任务队列Celery处理结果如何入库?

鄙人做了一个小项目,部署了 celery 的分布式,本来结果是直接在节点往主控节点的 mysql 里面存入的。

问题来了,有 leader 表示这个不太安全,直接互联网上暴露了 mysql 接口,降权神马的也欠考虑。

另外,leader 觉得这个东西,一旦节点部署多了以后,主控端唯一的 mysql 会负荷不了高速大量的数据写入。 所以他建议我找几台能缓冲的中间节点,借助上面的某种机制来存入 mysql,而不是将 mysql 接口直接暴露给 celery 的节点。 同时,这样有缓冲,数据库写入的稳定性也能得到保证。

鄙人水平有限,是野路子 coder,所以没有想到特别好的法子,求大家指点下迷津。

在线等,很急,leader 这两天在催我要方案。 感谢!!!

PS:感觉这个应该经验丰富些的大佬都能帮我解答的,这帖子不能跟以前似的沉了吧。。


Python分布式任务队列Celery处理结果如何入库?

19 回复

这里怕大家没看懂,故此补充下,我这边是跟大家求方案细节,我才好去做计划。
如果有大佬做过或者有过完善点的想法,劳烦多敲几个字指点下小弟,感谢感谢!


在Celery里处理任务结果入库,最直接的方式是用backend参数。你需要在创建Celery实例时配置一个结果后端,比如用RabbitMQ(RPC)、Redis或者数据库。这里给你一个用Redis做后端,并把结果存到MySQL的完整例子。

首先,装好依赖:

pip install celery redis pymysql

然后是你的tasks.py

from celery import Celery
import pymysql
from pymysql.cursors import DictCursor
import json
import time

# 配置Celery,用Redis做broker和backend
app = Celery('my_tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

# 一个简单的任务
@app.task
def process_data(data_id):
    # 模拟一些处理
    time.sleep(2)
    result = {'id': data_id, 'status': 'processed', 'value': data_id * 10}
    return result

# 另一个任务,专门用来把结果存到MySQL
@app.task
def save_to_mysql(task_result):
    # 连接MySQL
    connection = pymysql.connect(
        host='localhost',
        user='your_user',
        password='your_password',
        database='celery_results',
        charset='utf8mb4',
        cursorclass=DictCursor
    )
    
    try:
        with connection.cursor() as cursor:
            # 创建表(如果不存在)
            create_table_sql = """
            CREATE TABLE IF NOT EXISTS task_results (
                id INT AUTO_INCREMENT PRIMARY KEY,
                task_id VARCHAR(255) NOT NULL,
                result_data JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """
            cursor.execute(create_table_sql)
            
            # 插入结果
            # 注意:这里task_result是之前任务的返回值,我们假设它是个字典
            insert_sql = "INSERT INTO task_results (task_id, result_data) VALUES (%s, %s)"
            # 用当前任务的ID作为task_id,实际使用时你可能需要传递原始任务的ID
            cursor.execute(insert_sql, (save_to_mysql.request.id, json.dumps(task_result)))
        
        connection.commit()
    finally:
        connection.close()
    
    return f"Result saved to MySQL for task {save_to_mysql.request.id}"

# 链式调用示例:先处理数据,然后把结果存到MySQL
def process_and_save(data_id):
    # 使用chain:process_data执行完后,结果传给save_to_mysql
    from celery import chain
    result_chain = chain(process_data.s(data_id), save_to_mysql.s())
    return result_chain()

运行worker:

celery -A tasks.app worker --loglevel=info

调用任务:

from tasks import process_and_save

# 这会先执行process_data,然后把结果传给save_to_mysql
async_result = process_and_save(42)
print(f"Task chain started with ID: {async_result.id}")

这个方案的核心是用chain把两个任务串起来,第一个任务处理业务,第二个任务专门负责入库。Redis backend用来存中间状态,MySQL做持久化存储。注意处理好数据库连接和错误重试。

简单说就是用任务链把业务处理和入库分开。

小项目确定真的有“高速大量”的数据写入么?不如先看看 mysql 到底能不能承受吧。

首先有个小问题,为神马说“直接互联网上暴露了 mysql 接口” 。难道你的 web 应用框架,是直接把后台处理数据库部分代码直接暴露在 web 访问的表层了?我记得 Django 一类的 web 框架都有很好的分层处理的概念吧,譬如 mvc 等。

……数据写入 QPS 是多少…… 得根据具体业务量看是否需要优化~“提前优化是万恶之源”

小项目就要上 celery 分布式,这确定有必要吗?

celery 不是往 broker 发消息的吗?写个代理把 broker 里面的结果存到 MySQL 就好了吧

不知道具体业务如何。根据你的描述
1 分理一个写数据库的 task
2 这个 task 只允许在主节点跑

上面可以解决不暴露 mysql 给其他 celery node 这个目标。
另外 celery 是基于消息队列的,这本身就是你写 mysql 的缓冲。

直接互联网上暴露了 mysql 接口
=====================
这个问题有很多简单的方式可以解决

为什么会暴露 MySQL 接口啊求教

项目是小,但类似于分布式采集数据类的东西,每个任务都会有写入数据的操作,所以一旦节点多了以后,不做写入缓冲,确实可能存在数据写入过量。

leader 的意思是,既然是分布式的,那每个节点我都需要配置 mysql 连接信息,既然部署到公网,无论是传输途中,还是节点服务器被黑,都是有可能泄露主控端的数据的。
大佬,求教如何解决,随便甩给小弟几个方案就成^_^。
小弟看看怎么方便实施,先感谢下~

celery worker 作为 producer 丢结果到 queue 里,然后 mysql 那一头作为 consumer 处理结果入库。

既然有 leader 为什么不问他。。。。

部署到公网,这个题无解
MySQL 有暴露的风险,你的 celery broker 一样会暴露,而且 broker 的信息一样是全的

方案 1 服务器之间建立 VPN 连接,MySQL 服务配置只接受来自 VPN 内网 IP 的连接。用 VPN 的验证机制来保证安全。当然,VPN 可能网络稳定性会是个问题。 方案 2 配置 MySQL 主机的 iptables,使之对 3306 端口只放行来自特定 IP 的连接。
当然我说的两种方案可能都有弊端。但是我想表达的意思是,安全问题可以找一些成熟的工具来解决而不是因为顾虑安全问题而放弃高效的工具。

您的意思还是将 mysql 处理这块作为节点吧?
leader 不是做开发的。
不会啊,为了避免 broker 量过载,我那边计划的是跑完一批任务,就清空一次,不然肯定占用太多了。
大佬说的蛮对的,个人觉得即使做缓冲也无法解决安全问题,确实需要第三方进行加固。

考虑做成 RPC ?

可以考虑,我试试,本来是做的服务集群,谢谢指点~

回到顶部