Python中多线程和多进程如何同时共用?
下面这段代码在 windows 下运行就报错 NameError: name 'SEM' is not defined
在 windows7 windows10 测试 报错
在 Mac Linux 下测试 能正常运行
如果我在代码的最上面给 SEM 和 TIME_OUT 赋值后,就能正常运行
但是我需要用户加参数将值带进去,而不是事先定义好,所以只能放到 argparse 里面。我现在很疑惑,这个问题能解决吗?
时间过了两天了,依旧没有解决这个问题,希望小伙伴们能和我一起讨论一下怎么回事,谢谢!
# /usr/bin/env python
# _*_ coding:utf-8 _*_
import argparse, datetime, time
import threading
from multiprocessing import Pool
def test(i):
with SEM:
print(i)
time.sleep(TIME_OUT)
def test1©:
threads = []
for i in range(int(200)):
t = threading.Thread(target=test, args=(i,))
t.start()
threads.append(t)
for t in threads:
t.join()
def main():
start_time = time.time()
print(datetime.datetime.now().strftime(‘Start Time: %m/%d/%Y %H:%M:%S’))
p = Pool(int(args.PROCESS))
for i in range(10):
p.apply_async(test1, args=(i,))
p.close()
p.join()
print(datetime.datetime.now().strftime(‘Complete time: %m/%d/%Y %H:%M:%S’))
print(“Total time:%s” % (time.time() - start_time))
if name == ‘main’:
parser = argparse.ArgumentParser(description=‘test’)
parser.add_argument(’-m’, dest=‘PROCESS’, type=int, help=‘多进程数量,默认 5’, default=5)
parser.add_argument(’-t’, dest=‘THREAD’, type=int, help=‘多线程数量,默认 20’, default=20)
parser.add_argument(’-o’, dest=‘OVERTIME’, type=int, help=‘超时时间 默认 2’, default=2)
args = parser.parse_args()
SEM = threading.Semaphore(int(args.THREAD))
TIME_OUT = int(args.OVERTIME)
main()
Python中多线程和多进程如何同时共用?
Windows 不支持 fork 吧?没把变量拷贝到别的进程。
在Python里同时用多线程和多进程,核心思路是让进程负责CPU密集型任务,线程负责I/O密集型任务。最常见的方式是用multiprocessing创建进程池,每个进程内部再用concurrent.futures开线程池。
看个具体例子,假设我们要处理一堆文件(I/O多)并对内容做复杂计算(CPU多):
import concurrent.futures
import multiprocessing
from multiprocessing import Pool
import threading
import os
# 这是CPU密集型任务,放在进程里
def cpu_intensive_task(data_chunk):
# 模拟复杂计算
result = sum(i*i for i in range(10000))
return f"Process {os.getpid()}: {result}"
# 这是I/O密集型任务,在线程池里处理
def io_intensive_task(file_path):
# 模拟文件读取
with threading.Lock():
print(f"Thread {threading.get_ident()} reading {file_path}")
# 这里可以实际读取文件
return f"Processed {file_path}"
# 混合工作函数:每个进程执行这个函数
def hybrid_worker(file_paths):
results = []
# 在进程内部创建线程池处理I/O
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as thread_pool:
thread_results = list(thread_pool.map(io_intensive_task, file_paths))
results.extend(thread_results)
# 对处理后的数据做CPU密集型计算
cpu_result = cpu_intensive_task(thread_results)
results.append(cpu_result)
return results
def main():
# 模拟一批文件路径
all_files = [f"file_{i}.txt" for i in range(20)]
# 将文件分成4批,给4个进程处理
chunk_size = len(all_files) // 4
file_chunks = [all_files[i:i+chunk_size] for i in range(0, len(all_files), chunk_size)]
# 创建进程池
with Pool(processes=4) as process_pool:
# 每个进程处理一批文件,内部再用线程池
all_results = process_pool.map(hybrid_worker, file_chunks)
# 展平结果
flat_results = [item for sublist in all_results for item in (sublist if isinstance(sublist, list) else [sublist])]
for result in flat_results:
print(result)
if __name__ == "__main__":
main()
这里的关键点:
multiprocessing.Pool创建了4个进程- 每个进程执行
hybrid_worker函数 - 在
hybrid_worker内部,用ThreadPoolExecutor创建线程池处理I/O任务 - 线程任务完成后,在同一个进程内做CPU密集型计算
这样既利用了多进程绕过GIL做并行计算,又用多线程高效处理I/O。注意数据传递要用pickleable的对象,进程间通信可以用multiprocessing.Queue。
简单说就是进程管计算,线程管I/O,各司其职。
请问这个还有办法解决吗
在 test1 中初始化 SEM。
咋个初始化呢,是这样么? 但是这样之后程序一秒运行完成,没有执行里面的操作python<br><br> global SEM<br> SEM = threading.Semaphore(int(args.THREAD))<br>
那我也不知道了……是我不懂 Python。
你要把这两个变量放到全局变量中去
不好意思,感谢你的回复,感觉可能惹你生气了,我对 Python 也不是太熟练,听到你说了在 test1 里面初始化 SEM,我想了一下,想到了我可以先将:
sem = int(args.THREAD)
time_out = int(args.OVERTIME)
放到 main 函数里面,然后通过传参方式,从 mian-> test1->test 最后直接在 test 里面写入
with threading.Semaphore(sem): 然后可以成功的运行,感谢你的回复
放到全局变量里面 不管怎么样,test 函数就是不识别
我没生气啊,我真的不懂啊……
我从来没用过 multiprocess,前面只是随口说说以为可以解决问题,但解决不了我就真的不懂了。
一般来说,可以认为 Windows 不能直接使用 multiprocessing,也就是不支持多进程。
多进程好像可以设一个 value 作为公用的变量,类似锁吧。
多线程以前试过,但没什么印象了。反正最后用多进程了。
- 多线程实际上还是利用单核, 直接用多进程.
2. Python 在共享全局变量这块做的不太好, 多线程场景可以查询下 thread local 相关的资料.
多进程考虑 multiprocessing 的 value 机制, multiprocessing 支持一个 daemon 进程维护这组变量, 多进程通过 IPC
接上一跳评论,理解错题意了, win 下的机制比较诡异.
multi process 本身就不能假设同一个东西会 share between processes…
然后就是 message-passing or shared memory
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Value
把这两个变量放到全局变量中去
多进程是不能共享变量的,可以用共享内存试试
shared_mem = manager.dict() 这一句就是添加一个共享内存,可以在多进程之间使用<br># /usr/bin/env python<br># _*_ coding:utf-8 _*_<br><br><br>import argparse, datetime, time<br>import threading<br>import multiprocessing<br><br><br>def test(i):<br> with SEM:<br> print(i)<br> time.sleep(TIME_OUT)<br><br><br>def test1(c, data):<br> global SEM<br> global TIME_OUT<br> SEM = data['SEM']<br> TIME_OUT = datda['TIME_OUT']<br><br> threads = []<br> for i in range(200):<br> t = threading.Thread(target=test, args=(i,))<br> t.start()<br> threads.append(t)<br> for t in threads:<br> t.join()<br><br><br>def main(args):<br> start_time = time.time()<br> print(datetime.datetime.now().strftime('Start Time: %m/%d/%Y %H:%M:%S'))<br> manager = multiprocessing.Manager()<br> p = multiprocessing.Pool(int(args.PROCESS))<br> <br> shared_mem = manager.dict()<br> shared_mem['SEM'] = threading.Semaphore(int(args.THREAD))<br> shared_mem['TIME_OUT'] = int(args.OVERTIME)<br> <br> for i in range(10):<br> p.apply_async(test1, args=(i, shared_mem))<br> p.close()<br> p.join()<br> print(datetime.datetime.now().strftime('Complete time: %m/%d/%Y %H:%M:%S'))<br> print("Total time:%s" % (time.time() - start_time))<br><br><br>if __name__ == '__main__':<br> parser = argparse.ArgumentParser(description='test')<br> parser.add_argument('-m', dest='PROCESS', type=int, help='多进程数量,默认 5', default=5)<br> parser.add_argument('-t', dest='THREAD', type=int, help='多线程数量,默认 20', default=20)<br> parser.add_argument('-o', dest='OVERTIME', type=int, help='超时时间 默认 2', default=2)<br> main(parser.parse_args())<br>
在函数里引用了
既不是参数,又不是本地定义
的变量
这是一种很典型的错误
显试的传递一下不行吗……
一般来说,可以认为 Windows 不能直接使用 multiprocessing,也就是不支持多进程。
多进程不共享全局变量, 把 SEM 存放在消息队列里,就应该好了…

