Python分布式任务队列Celery结合RabbitMQ时如何处理文件传递问题?

需求:web 应用,有几个非常耗时任务(基于深度学习算法,时间复杂度高)需要通过分发请求任务到服务器结点 A、B、C 上,提交任务时需要文件(数据集),执行完后生成文件,需要将 A、B、C 结果文件统一存放在 A 上

框架:本人使用 Django+Celery+rabbitmq 进行任务分发,问题在于 django 接收的 request 中 FILE 如何传递给 rabbitmq,而执行完任务后结果文件如何写回到 A 中?

配置

A B C: broker: amqp://A  backend: amqp://A
A broker  B: worker   C: worker

相关代码:

celery: A demo/demo/celery.py

A demo/web/view.py

def handle_upload_como(request): 
    ...
    dataset = request.FILE.get("data", None)
    ...
	//这里传递文件 dataset 不能成功
    res = taskA.delay(dataset, args)
    res.ready()
	//获取结果文件 resf
	//写入 resf 到 A 结点的统一存储路径中
    ...

A demo/web/task.py

[@share_app](/user/share_app)
def taskA(data, args):
    ...
    run(data,args)
    ...
    //result 写在路径: /var/www/result/中,存储结点为当前执行任务结点
    //如何返回 result 结果
    return 'ok'

若上述配置有误,也请多多指正,上述问题应如何解决呢?


Python分布式任务队列Celery结合RabbitMQ时如何处理文件传递问题?

4 回复

A 提供一个 NFS server,传递文件路径过去


在Celery里直接传大文件不是个好主意,会把消息队列搞崩。我一般用这两种方式:

方案一:存路径,不传文件 把文件存到共享存储(比如NFS、S3),任务只传文件路径:

# tasks.py
@app.task
def process_file(file_path):
    with open(file_path, 'r') as f:
        # 处理文件
    return result

# 调用时
file_path = '/shared/storage/data.csv'
process_file.delay(file_path)

方案二:小文件用base64编码 如果文件不大(<几百KB),可以编码后传递:

import base64

@app.task
def process_file_data(file_data_b64):
    file_data = base64.b64decode(file_data_b64)
    # 处理二进制数据
    return result

# 调用时
with open('data.csv', 'rb') as f:
    encoded = base64.b64encode(f.read()).decode()
process_file_data.delay(encoded)

第一种方案更靠谱,特别是生产环境。RabbitMQ消息体别超过128MB,不然各种问题都来了。

总结:用共享存储传路径最稳妥。

我查了下 Stack Overflow,解决方法有以下几种:
1.rabbitmq 不支持文件传输,但支持 json 传输,所以可以用 json.dump({‘data’:encode(dataset)})传输,B 端进行反序列化解码
2.如 所说,传递 A:file/to/path,BC 远程复制读取
3.文件内容解析为 xml 或 json 传输
方法略有缺点,若有更好的方法或者更好的架构框架或者基于 django 的分布式框架也可以讨论一二

  1. json 序列化数据后传输,节点拿到后反序列化后执行。

    2. 提供 http 下载接口,celery 可以分发任务让节点通过 http 下载对应数据集。
回到顶部