Python分布式任务队列Celery如何启动多个线程?

RT =-=小白想要提升一下现在的一个任务的性能

假设我们有一个 api 多次调用的需求在一个 celery 的 task 里面

最普通的方法可以是这样:

[@task](/user/task)
def foo():
	for i in range(20):
    	call_api_here(i)
    do_something_else()

但是这个很耗时因为会卡在 api 调用上

然后如果我们用 multiprocessing (不是一个好的注意,但是粗暴), celery 有自己的 library 叫做 billiard.

然后我们可以这样做

[@task](/user/task)
def foo():
	with Pool(5) as p:
    	p.map(call_api_here, some_params)
    do_something_else()    

但是这样的问题是,每一个 task 都会开启一个 pool 然后结束后这个 pool 并没有被释放,最终导致内存被各种占用

所以我想了另外一个办法,用 multithreading

def foo()
    threads = []
    for i in range(20):
            t = threading.Thread(target=call_api_here, args=(i,))
            threads.append(t)
for t in threads:
    t.start()

for t in threads:
    t.join()

但是试了下发现,虽然线程会随着进程结束而被销毁,但是貌似在每个 task 里面只有前几个线程执行了。。。后面的都 gg 了。。。

想问问各位大佬有什么好的方法嘛😂


Python分布式任务队列Celery如何启动多个线程?

11 回复

from time import sleep
from concurrent.futures import ThreadPoolExecutor\ ProcessPoolExecutor
def child_1():
sleep(9)
print(1)


def child_2():
sleep(2)
print(2)



def child_3():
sleep(3)
print(3)



def child_4():
sleep(1)
print(4)


def child_5():
sleep(2)
print(5)



with ThreadPoolExecutor\ProcessPoolExecutor(max_workers=5) as executor:
executor.submit(child_1)
executor.submit(child_2)
executor.submit(child_3)
executor.submit(child_4)
executor.submit(child_5)


t1= executor.submit(child_1)
t2=executor.submit(child_2)
t3=executor.submit(child_3)
t4=executor.submit(child_4)
t5=executor.submit(child_5)

print(t1.result())


Celery本身不直接启动线程,它是通过启动多个工作进程(Worker)来并发处理任务的。每个工作进程内部会使用预派生(prefork)模型创建多个子进程(或者协程/线程,取决于并发模式)来执行任务。要增加并发度,你需要配置Worker的并发参数。

最常用的方式是使用celery worker命令的--concurrency(或-c)参数。例如,要启动一个Worker并让它使用4个子进程来处理任务,可以这样:

celery -A your_project worker --loglevel=info --concurrency=4

这里的your_project是你的Celery应用所在的模块。

更常见的生产环境配置:

通常我们会在启动Worker时进行更详细的配置。假设你的应用对象在proj/celery.py中,名为app

celery -A proj worker --loglevel=info --concurrency=10 --hostname=worker1@%h

通过代码配置:

你也可以在创建Celery应用时设置默认的并发数:

from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0',
             include=['proj.tasks'])

# 设置默认的并发工作进程数
app.conf.worker_concurrency = 8

关于“线程”的特别说明:

你问题中提到了“线程”。Celery的默认并发模型是预派生多进程(prefork),这更稳定,能利用多核CPU。但它也支持其他并发模式,其中就包括多线程(--pool=threads)。如果你的任务是I/O密集型且需要共享状态,可以考虑使用线程池。启动一个使用线程池的Worker:

celery -A your_project worker --pool=threads --concurrency=50

注意:使用线程池时,可以设置更高的并发数(比如50或100),因为线程比进程更轻量。但Python的GIL会导致CPU密集型任务在纯线程模式下性能不佳。

总结一下:

要增加Celery处理任务的能力,核心是调整Worker的--concurrency参数。根据任务类型(CPU密集型或I/O密集型)和是否需要共享内存,选择进程池(默认)或线程池。

简单说:用-c参数指定并发数,I/O密集型任务可考虑换用线程池。

这样的话每个任务都会创建一个 pool 还是占着内存呀

y worker -A app.tasks.celery -l INFO -Q default -c 20 (每个队列多搞几个 worker ) -n default_worker.%%i

import asyncio

async def slow_operation(n):
await asyncio.sleep(n)
print(‘Slow operation {} complete’.format(n))
return n


loop = asyncio.get_event_loop()
done, _ = loop.run_until_complete(
asyncio.wait([
slow_operation(1),
slow_operation(2),
slow_operation(9),
slow_operation(2),
slow_operation(1),
slow_operation(2),
slow_operation(3),
]))
for fut in done:
print(“return value is {}”.format(fut.result()))

然后用 uvloop

关键词 celery worker

不能拆成多个 celery 任务,让 celery 去管嘛?

你的意思是对 api 的调用也拆成 celery 任务嘛=-=?这个思路也有考虑
优点是 每个任务都可以追踪
缺点是 任务颗粒度太高,很吃存储

同意楼上的拆成很多任务

遇到同样的问题,一个 task 启动了,然后在 task 里面读取一个目录下面的所有文件,查找文件内容是否包含我要查找的字符串,如果不用 threading,就是一个文件一个文件的读取,效率真的很低,请问各位大佬该怎么办?


每个文件启动一个 worker 感觉也很 low 啊

用 threading

回到顶部