Python中多线程和多进程如何同时共用?

下面这段代码在 windows 下运行就报错 NameError: name 'SEM' is not defined

在 windows7 windows10 测试 报错

在 Mac Linux 下测试 能正常运行

如果我在代码的最上面给 SEM 和 TIME_OUT 赋值后,就能正常运行

但是我需要用户加参数将值带进去,而不是事先定义好,所以只能放到 argparse 里面。我现在很疑惑,这个问题能解决吗?

时间过了两天了,依旧没有解决这个问题,希望小伙伴们能和我一起讨论一下怎么回事,谢谢!

# /usr/bin/env python
# _*_ coding:utf-8 _*_

import argparse, datetime, time import threading from multiprocessing import Pool

def test(i): with SEM: print(i) time.sleep(TIME_OUT)

def test1©: threads = [] for i in range(int(200)): t = threading.Thread(target=test, args=(i,)) t.start() threads.append(t) for t in threads: t.join()

def main(): start_time = time.time() print(datetime.datetime.now().strftime(‘Start Time: %m/%d/%Y %H:%M:%S’)) p = Pool(int(args.PROCESS)) for i in range(10): p.apply_async(test1, args=(i,)) p.close() p.join() print(datetime.datetime.now().strftime(‘Complete time: %m/%d/%Y %H:%M:%S’)) print(“Total time:%s” % (time.time() - start_time))

if name == ‘main’: parser = argparse.ArgumentParser(description=‘test’) parser.add_argument(’-m’, dest=‘PROCESS’, type=int, help=‘多进程数量,默认 5’, default=5) parser.add_argument(’-t’, dest=‘THREAD’, type=int, help=‘多线程数量,默认 20’, default=20) parser.add_argument(’-o’, dest=‘OVERTIME’, type=int, help=‘超时时间 默认 2’, default=2) args = parser.parse_args() SEM = threading.Semaphore(int(args.THREAD)) TIME_OUT = int(args.OVERTIME) main()


Python中多线程和多进程如何同时共用?

23 回复

Windows 不支持 fork 吧?没把变量拷贝到别的进程。


在Python里同时用多线程和多进程,核心思路是让进程负责CPU密集型任务,线程负责I/O密集型任务。最常见的方式是用multiprocessing创建进程池,每个进程内部再用concurrent.futures开线程池。

看个具体例子,假设我们要处理一堆文件(I/O多)并对内容做复杂计算(CPU多):

import concurrent.futures
import multiprocessing
from multiprocessing import Pool
import threading
import os

# 这是CPU密集型任务,放在进程里
def cpu_intensive_task(data_chunk):
    # 模拟复杂计算
    result = sum(i*i for i in range(10000))
    return f"Process {os.getpid()}: {result}"

# 这是I/O密集型任务,在线程池里处理
def io_intensive_task(file_path):
    # 模拟文件读取
    with threading.Lock():
        print(f"Thread {threading.get_ident()} reading {file_path}")
    # 这里可以实际读取文件
    return f"Processed {file_path}"

# 混合工作函数:每个进程执行这个函数
def hybrid_worker(file_paths):
    results = []
    # 在进程内部创建线程池处理I/O
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as thread_pool:
        thread_results = list(thread_pool.map(io_intensive_task, file_paths))
        results.extend(thread_results)
    
    # 对处理后的数据做CPU密集型计算
    cpu_result = cpu_intensive_task(thread_results)
    results.append(cpu_result)
    
    return results

def main():
    # 模拟一批文件路径
    all_files = [f"file_{i}.txt" for i in range(20)]
    
    # 将文件分成4批,给4个进程处理
    chunk_size = len(all_files) // 4
    file_chunks = [all_files[i:i+chunk_size] for i in range(0, len(all_files), chunk_size)]
    
    # 创建进程池
    with Pool(processes=4) as process_pool:
        # 每个进程处理一批文件,内部再用线程池
        all_results = process_pool.map(hybrid_worker, file_chunks)
    
    # 展平结果
    flat_results = [item for sublist in all_results for item in (sublist if isinstance(sublist, list) else [sublist])]
    
    for result in flat_results:
        print(result)

if __name__ == "__main__":
    main()

这里的关键点:

  1. multiprocessing.Pool创建了4个进程
  2. 每个进程执行hybrid_worker函数
  3. hybrid_worker内部,用ThreadPoolExecutor创建线程池处理I/O任务
  4. 线程任务完成后,在同一个进程内做CPU密集型计算

这样既利用了多进程绕过GIL做并行计算,又用多线程高效处理I/O。注意数据传递要用pickleable的对象,进程间通信可以用multiprocessing.Queue

简单说就是进程管计算,线程管I/O,各司其职。

请问这个还有办法解决吗

在 test1 中初始化 SEM。

咋个初始化呢,是这样么? 但是这样之后程序一秒运行完成,没有执行里面的操作
python<br><br> global SEM<br> SEM = threading.Semaphore(int(args.THREAD))<br>

那我也不知道了……是我不懂 Python。

你要把这两个变量放到全局变量中去

不好意思,感谢你的回复,感觉可能惹你生气了,我对 Python 也不是太熟练,听到你说了在 test1 里面初始化 SEM,我想了一下,想到了我可以先将:
sem = int(args.THREAD)
time_out = int(args.OVERTIME)
放到 main 函数里面,然后通过传参方式,从 mian-> test1->test 最后直接在 test 里面写入
with threading.Semaphore(sem): 然后可以成功的运行,感谢你的回复

放到全局变量里面 不管怎么样,test 函数就是不识别

我没生气啊,我真的不懂啊……
我从来没用过 multiprocess,前面只是随口说说以为可以解决问题,但解决不了我就真的不懂了。

一般来说,可以认为 Windows 不能直接使用 multiprocessing,也就是不支持多进程。

多进程好像可以设一个 value 作为公用的变量,类似锁吧。
多线程以前试过,但没什么印象了。反正最后用多进程了。

  1. 多线程实际上还是利用单核, 直接用多进程.
    2. Python 在共享全局变量这块做的不太好, 多线程场景可以查询下 thread local 相关的资料.
    多进程考虑 multiprocessing 的 value 机制, multiprocessing 支持一个 daemon 进程维护这组变量, 多进程通过 IPC

接上一跳评论,理解错题意了, win 下的机制比较诡异.

multi process 本身就不能假设同一个东西会 share between processes…
然后就是 message-passing or shared memory
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Value

把这两个变量放到全局变量中去

多进程是不能共享变量的,可以用共享内存试试

shared_mem = manager.dict() 这一句就是添加一个共享内存,可以在多进程之间使用
<br># /usr/bin/env python<br># _*_ coding:utf-8 _*_<br><br><br>import argparse, datetime, time<br>import threading<br>import multiprocessing<br><br><br>def test(i):<br> with SEM:<br> print(i)<br> time.sleep(TIME_OUT)<br><br><br>def test1(c, data):<br> global SEM<br> global TIME_OUT<br> SEM = data['SEM']<br> TIME_OUT = datda['TIME_OUT']<br><br> threads = []<br> for i in range(200):<br> t = threading.Thread(target=test, args=(i,))<br> t.start()<br> threads.append(t)<br> for t in threads:<br> t.join()<br><br><br>def main(args):<br> start_time = time.time()<br> print(datetime.datetime.now().strftime('Start Time: %m/%d/%Y %H:%M:%S'))<br> manager = multiprocessing.Manager()<br> p = multiprocessing.Pool(int(args.PROCESS))<br> <br> shared_mem = manager.dict()<br> shared_mem['SEM'] = threading.Semaphore(int(args.THREAD))<br> shared_mem['TIME_OUT'] = int(args.OVERTIME)<br> <br> for i in range(10):<br> p.apply_async(test1, args=(i, shared_mem))<br> p.close()<br> p.join()<br> print(datetime.datetime.now().strftime('Complete time: %m/%d/%Y %H:%M:%S'))<br> print("Total time:%s" % (time.time() - start_time))<br><br><br>if __name__ == '__main__':<br> parser = argparse.ArgumentParser(description='test')<br> parser.add_argument('-m', dest='PROCESS', type=int, help='多进程数量,默认 5', default=5)<br> parser.add_argument('-t', dest='THREAD', type=int, help='多线程数量,默认 20', default=20)<br> parser.add_argument('-o', dest='OVERTIME', type=int, help='超时时间 默认 2', default=2)<br> main(parser.parse_args())<br>

在函数里引用了
既不是参数,又不是本地定义
的变量
这是一种很典型的错误

显试的传递一下不行吗……

一般来说,可以认为 Windows 不能直接使用 multiprocessing,也就是不支持多进程。

多进程不共享全局变量, 把 SEM 存放在消息队列里,就应该好了…

回到顶部