Python中如何用Threading实现并发执行100个耗时几分钟的Class/函数?

Python 初学者求教。。
python 写的服务器端,每次收到请求命令的时候,需要执行 100 个 Class ( Class 就包含一个函数具体干活),因为请求来自不同客户,所以可能会同时有几十个请求。
最好还能实现客户能对没执行完的那些 Class 进行 Pause/Resume/Terminate 暂停执行 /恢复执行 /终止执行
多线程能完成这个项目需求吗?
class serv100:
def serv( parameter):
print (“abc”)

class serv199:
def serv( parameter):
print(“xyz”)


def handleClientRequest(options):
???

初学者求教,???这边大概的代码怎么写?一开始要 import 哪些 library? Thanks in advance.
Python中如何用Threading实现并发执行100个耗时几分钟的Class/函数?


8 回复

100 個 def 的函數都做到 1 個 Class 裡是不是要好一些?
Class server(option):
def serv100:
print()
def serv101:
print()

def serv199:
print()


import threading
import time
from queue import Queue

class TaskProcessor:
    def __init__(self, task_id):
        self.task_id = task_id
        
    def process(self):
        """模拟耗时几分钟的任务"""
        print(f"任务 {self.task_id} 开始执行")
        time.sleep(180)  # 模拟3分钟的任务,实际使用时替换为真实逻辑
        print(f"任务 {self.task_id} 执行完成")
        return f"任务 {self.task_id} 结果"

def worker(task_queue, results):
    """工作线程函数"""
    while not task_queue.empty():
        try:
            task = task_queue.get_nowait()
            result = task.process()
            results.append(result)
        except Queue.Empty:
            break
        finally:
            task_queue.task_done()

def main():
    # 1. 创建100个任务实例
    tasks = [TaskProcessor(i) for i in range(100)]
    
    # 2. 创建任务队列
    task_queue = Queue()
    for task in tasks:
        task_queue.put(task)
    
    # 3. 创建线程池(建议根据CPU核心数调整)
    max_workers = 10  # 控制并发数,避免资源耗尽
    threads = []
    results = []
    
    # 4. 启动工作线程
    for _ in range(max_workers):
        thread = threading.Thread(target=worker, args=(task_queue, results))
        thread.start()
        threads.append(thread)
    
    # 5. 等待所有任务完成
    task_queue.join()
    
    # 6. 等待所有线程结束
    for thread in threads:
        thread.join()
    
    print(f"所有任务完成,共处理 {len(results)} 个任务")

if __name__ == "__main__":
    main()

关键点说明:

  1. 任务封装:用TaskProcessor类封装每个耗时任务,process()方法包含具体业务逻辑
  2. 线程安全队列:使用Queue管理任务分发,避免线程竞争
  3. 工作线程模式:每个线程从队列获取任务执行,队列空时自动退出
  4. 并发控制:通过max_workers限制同时运行的线程数(建议设为CPU核心数的2-4倍)
  5. 结果收集:通过共享的results列表收集任务结果(注意:如果涉及复杂数据结构,可能需要加锁)

替代方案:

# 使用concurrent.futures(Python 3.2+)
from concurrent.futures import ThreadPoolExecutor

def run_tasks():
    with ThreadPoolExecutor(max_workers=10) as executor:
        tasks = [TaskProcessor(i) for i in range(100)]
        results = list(executor.map(lambda t: t.process(), tasks))
    return results

用线程池更简洁,自动管理线程生命周期。

一句话总结:用线程池+队列控制并发,避免直接创建100个线程。

第一次遇到 Pause/Resume/Terminate 这样的需求,话说服务器构造的返回不是都要尽量快吗

是 C/S 还是 B/S ?如果是 B/S, 那可以让 wsgi 来处理接收并发的客户请求。 取决于你选择什么样的 python wsgi 实现。 对应的实现会处理好不同客户不同时间发起的多个请求,每个请求相互间应该是线程 /进程独立的。(取决于你选择的 wsgi 实现)。 所以你只需要考虑再收到客户请求的时候( http get/post ) 你要如何执行那 100 个 class。 这里有两种方案:

1. 客户请求( http get/post )进入,你采用阻塞的方式运行 100 个 class,全部运行结束返回给客户运行结果。 这期间你可以用全局变量或者别的方式控制每个阻塞进程运行的情况。 但这样无法完全实现你想要的 pasue/resume/terminate 因为一旦客户端口链接,一般 wsgi 的实现会销毁掉你正在阻塞的处理线程 /进程。
2. 客户请求( http get/post ) 进入, 你开启非阻塞线程 /进程来跑 100 个 class, 然后立刻返回客户成功运行程序。 后台需要全局变量(或别的实现方式, 我常用类里面的静态变量和方法来代替全局变量)来控制。

总结而言, 用成熟的 python web 框架帮你处理用户请求,再自己去选择请求内创建自己可以控制的进程或线程完成任务,维护全局信息表来索引实现你的控制。 至于 是用 threading 还是 mulitprocess 则见仁见智了。 另外推荐用 APScheduler 这个包, 分装了 Pause/Resume/Terminate。

有不对的地方,还请指正~

因为每个函数都是要运行几分钟到几十分钟才能完成任务的,资源消耗都不大,但是运行时间比较长

多谢解答,C/S 的桌面应用,Server 端接收到 Client 端请求后,后台运行这 100 个 thread/process,然后直接返回的,函数 /Class 所执行任务的结果在另外的地方会有体现,所以 Server 端要做的就是把函数扔到后台运行。
目前是直接 Flask + FlaskRestful API,还没有用到其它的 Web 框架。

感觉应该用个消息队列吧。每次来个请求,把任务放进队列就可以返回了。

Flask ( wsgi ) + FalskRestful API 足够解决了, 配合 APScheduler。

回到顶部