Python分布式任务队列Celery如何启动多个线程?
RT =-=小白想要提升一下现在的一个任务的性能
假设我们有一个 api 多次调用的需求在一个 celery 的 task 里面
最普通的方法可以是这样:
[@task](/user/task)
def foo():
for i in range(20):
call_api_here(i)
do_something_else()
但是这个很耗时因为会卡在 api 调用上
然后如果我们用 multiprocessing (不是一个好的注意,但是粗暴), celery 有自己的 library 叫做 billiard.
然后我们可以这样做
[@task](/user/task)
def foo():
with Pool(5) as p:
p.map(call_api_here, some_params)
do_something_else()
但是这样的问题是,每一个 task 都会开启一个 pool 然后结束后这个 pool 并没有被释放,最终导致内存被各种占用
所以我想了另外一个办法,用 multithreading
def foo()
threads = []
for i in range(20):
t = threading.Thread(target=call_api_here, args=(i,))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
但是试了下发现,虽然线程会随着进程结束而被销毁,但是貌似在每个 task 里面只有前几个线程执行了。。。后面的都 gg 了。。。
想问问各位大佬有什么好的方法嘛😂
Python分布式任务队列Celery如何启动多个线程?
from time import sleep
from concurrent.futures import ThreadPoolExecutor\ ProcessPoolExecutor
def child_1():
sleep(9)
print(1)
def child_2():
sleep(2)
print(2)
def child_3():
sleep(3)
print(3)
def child_4():
sleep(1)
print(4)
def child_5():
sleep(2)
print(5)
with ThreadPoolExecutor\ProcessPoolExecutor(max_workers=5) as executor:
executor.submit(child_1)
executor.submit(child_2)
executor.submit(child_3)
executor.submit(child_4)
executor.submit(child_5)
t1= executor.submit(child_1)
t2=executor.submit(child_2)
t3=executor.submit(child_3)
t4=executor.submit(child_4)
t5=executor.submit(child_5)
print(t1.result())
Celery本身不直接启动线程,它是通过启动多个工作进程(Worker)来并发处理任务的。每个工作进程内部会使用预派生(prefork)模型创建多个子进程(或者协程/线程,取决于并发模式)来执行任务。要增加并发度,你需要配置Worker的并发参数。
最常用的方式是使用celery worker命令的--concurrency(或-c)参数。例如,要启动一个Worker并让它使用4个子进程来处理任务,可以这样:
celery -A your_project worker --loglevel=info --concurrency=4
这里的your_project是你的Celery应用所在的模块。
更常见的生产环境配置:
通常我们会在启动Worker时进行更详细的配置。假设你的应用对象在proj/celery.py中,名为app:
celery -A proj worker --loglevel=info --concurrency=10 --hostname=worker1@%h
通过代码配置:
你也可以在创建Celery应用时设置默认的并发数:
from celery import Celery
app = Celery('proj',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0',
include=['proj.tasks'])
# 设置默认的并发工作进程数
app.conf.worker_concurrency = 8
关于“线程”的特别说明:
你问题中提到了“线程”。Celery的默认并发模型是预派生多进程(prefork),这更稳定,能利用多核CPU。但它也支持其他并发模式,其中就包括多线程(--pool=threads)。如果你的任务是I/O密集型且需要共享状态,可以考虑使用线程池。启动一个使用线程池的Worker:
celery -A your_project worker --pool=threads --concurrency=50
注意:使用线程池时,可以设置更高的并发数(比如50或100),因为线程比进程更轻量。但Python的GIL会导致CPU密集型任务在纯线程模式下性能不佳。
总结一下:
要增加Celery处理任务的能力,核心是调整Worker的--concurrency参数。根据任务类型(CPU密集型或I/O密集型)和是否需要共享内存,选择进程池(默认)或线程池。
简单说:用-c参数指定并发数,I/O密集型任务可考虑换用线程池。
这样的话每个任务都会创建一个 pool 还是占着内存呀
y worker -A app.tasks.celery -l INFO -Q default -c 20 (每个队列多搞几个 worker ) -n default_worker.%%i
import asyncio
async def slow_operation(n):
await asyncio.sleep(n)
print(‘Slow operation {} complete’.format(n))
return n
loop = asyncio.get_event_loop()
done, _ = loop.run_until_complete(
asyncio.wait([
slow_operation(1),
slow_operation(2),
slow_operation(9),
slow_operation(2),
slow_operation(1),
slow_operation(2),
slow_operation(3),
]))
for fut in done:
print(“return value is {}”.format(fut.result()))
然后用 uvloop
关键词 celery worker
不能拆成多个 celery 任务,让 celery 去管嘛?
你的意思是对 api 的调用也拆成 celery 任务嘛=-=?这个思路也有考虑
优点是 每个任务都可以追踪
缺点是 任务颗粒度太高,很吃存储
同意楼上的拆成很多任务
遇到同样的问题,一个 task 启动了,然后在 task 里面读取一个目录下面的所有文件,查找文件内容是否包含我要查找的字符串,如果不用 threading,就是一个文件一个文件的读取,效率真的很低,请问各位大佬该怎么办?
每个文件启动一个 worker 感觉也很 low 啊
用 threading

