Python中如何实现多线程(可控数量)遍历字典?
有一个字典,原本穷举历遍的,
ResultDict = {xxx,xxx,...}
def ProcessID(caseID):
doSomething(caseID)
for eachID in ResultDict:
doSomething(eachID)
pass
单线程运行太慢了,
想作改动,控制 doSomething(eachID),同时进行 5 个线程(以后可能不止 5 个)去历遍完 ResultDict,多线程的教学,和简单测试代码我简单学习,运行过,
这里有个我不懂的多线程解决算法是:如果严格控制同时只能 5 个,第 6 个开始介入,必须要等前面 1 个结束了才能开始开始,( doSomething(eachID)处理时间都是不可预计的,总而言之只能同时 5 个,逐个按顺序压入 doSomething 处理),是不是“栈”的概念?
这样的线程控制算法,我简单搜了一下网上,似乎没有现成例子,高手们能讲解一下如何做到吗?
感谢感谢~~
Python中如何实现多线程(可控数量)遍历字典?
开 5 个线程,每个线程循环从一个线程安全的队列抛个值,做处理就 ok 了
import threading
from queue import Queue
def threaded_dict_process(data_dict, max_threads=4):
"""
使用固定数量线程处理字典的键值对
Args:
data_dict: 要处理的字典
max_threads: 最大线程数,默认4个
"""
# 创建线程安全的队列
task_queue = Queue()
results = {}
results_lock = threading.Lock()
# 将字典项放入队列
for key, value in data_dict.items():
task_queue.put((key, value))
def worker():
"""工作线程函数"""
while True:
try:
# 从队列获取任务,timeout防止死锁
key, value = task_queue.get(timeout=1)
# 这里是你的处理逻辑,示例:计算字符串长度
processed_value = len(str(value))
# 线程安全地存储结果
with results_lock:
results[key] = processed_value
# 标记任务完成
task_queue.task_done()
except Exception as e:
# 队列为空时退出
break
# 创建并启动线程
threads = []
for i in range(min(max_threads, len(data_dict))):
thread = threading.Thread(target=worker, name=f"Worker-{i}")
thread.start()
threads.append(thread)
# 等待所有任务完成
task_queue.join()
# 等待所有线程结束
for thread in threads:
thread.join()
return results
# 使用示例
if __name__ == "__main__":
# 测试数据
sample_dict = {
"key1": "hello world",
"key2": [1, 2, 3, 4, 5],
"key3": {"nested": "dict"},
"key4": 12345,
"key5": "python",
"key6": "multithreading",
"key7": "example",
"key8": "test"
}
# 使用3个线程处理
processed_results = threaded_dict_process(sample_dict, max_threads=3)
print("处理结果:")
for key, value in processed_results.items():
print(f"{key}: {value}")
这个实现用了threading和Queue来搞可控数量的线程。Queue是线程安全的,能保证数据不会乱。max_threads参数控制最多用几个线程,实际会取字典大小和线程数的最小值。
每个线程从队列里拿字典项,处理完存到结果字典里,用Lock保证写操作安全。task_queue.join()等所有任务完成,最后等线程结束。
把处理逻辑放在worker函数里,想干啥都行。示例里是算字符串长度,你可以换成自己的业务代码。
注意Python的GIL限制,这个适合I/O密集型任务。要是CPU密集的活,考虑用multiprocessing。
总结:用Queue加线程池模式最稳。
用的手机,懒得查 api 了,可能某些单词拼的是错的…
你可以用 multiprocess.pool 里面的 threadpool,开一个最大数量为 5 的线程池,然后用 pool.map (或者 mapstar ),去作用在 resultdict.items ()上面
先说方法,multiprocessing.Pool.map 方法可以满足你的需求,不过需要解决好多线程之间的通信问题,并且 windows 上好像不能使用 multiprocessing 多线程,只能用协程,就没有意义了
doSomething 确定是 io 密集嘛?
感谢大伙热心的回复,先挑一些疑问简单回帖了先,手上还有其它事在忙着 ^_^
,是在 Win 的环境下作业的,那么多了线程,历遍的效率总会有提高的吧?
,磁盘读写么? 不多,doSomething 就是轮流去生成几个 10 几 kb 的文本,读几个 1MB,2MB 大小的文件,下载几个网页(这个就是要等爬虫的返回时间,不确定时间点就在这里)
doSomething 函数封装得很好,函数完全是独立的,不交叉到局外通信,while 到不完成不返回,超过重试次数就自己结束,有一个地方交叉,可能就是抛出异常的时候,会向"d:\error.log"写点东西,会担心同时多进程同时异常都往 error.log 写内容?
最简单的还是 multiprocessing.dummy 里面的线程池, 可以 map 也可以自己调度, 符合楼主说的只能 5 并发, 如果不是 IO 密集的, 把 dummy 去掉就是多进程…
from multiprocessing.dummy import Pool
def do(sth):
print(sth)
return sth
pool = Pool(5)
tasks = [i for i in range(20)]
result = pool.map(do, tasks)
print(result)
这个目前信息看起来不像是栈
windows 上的 multiprocessing 原理和 linux 不太一样,我搜了一下别人的文章,好像也可以用多进程,我刚接触 python 的时候被坑过后来没关注了,你可以自己了解一下怎么用。
不启动多个进程,也就无法绕过 GIL 锁,不能利用多个核心,只能加速 io 密集型操作,在 linux 上是肯定可以启动多个进程的,如果计算任务足够重的话,系统会将任务分配给多核计算,能够跑满 cpu。
既然楼上已经回答了,说过题外话。 遍历。。。
上亿数据我的做法 分段扔线程安全的队列 启动任意多个线程从队列取一段数据,消费完再取 这样各个线程不相互影响啊
一般多线程用生产消费模型,临界区用队列就行
假如非要用你说的字典这种,我推荐用 hash 分桶
$hash = @{
a = 1
b = 2
#省略
}
$hash.GetEnumerator() | ForEach-Object -ThrottleLimit 2 -Parallel {
#系统需求=powershell 7 preview4
#你的代码
#-ThrottleLimit 2 为并发数
}
,感谢解答~
,是的,刚刚琢磨到,字典历遍方式似乎不太适合我这样放到线程队列里,正在琢磨其它办法。。。
学习 ing…
你们可以去看看 Python 最好用的(非第三方)多线程类,这才是 Python 多线程的未来。就下面三行代码就可以实现你上面说到要求,就这么简单好用,而且这个多线程类是自带线程锁的,对于 I/O 密集型的线程操作,简直就是爽翻天。<br>from concurrent.futures import ThreadPoolExecutor<br><br>with ThreadPoolExecutor(max_workers=5) as pool:<br> pool.map(dosomething, ResultDict.keys())<br>
注意审题,强调要求“同时”的情况,楼上的所有回答都不及格吧~
我这两天自己学习了一下多线程,还不够深入,尽管上面高手们都提到了解决方法,其实一次取 5 个,每次 5 个这样压入函数进行独立线程处理,等 5 个处理完,再下一批,也是可以的,起码比起单线程处理要好多了。。。
细想了一下,保持 5 个,出 1 个进 1 个的 FIFO 队列算法,太难了,我这里的情况可能要连整个处理函数的设计都要改。。。
加上现在的集合来源,是字典,我本来在 for 的历遍里面是有一些 continue 拐弯的,现在也甚是头疼怎么改。。。
看楼主的需求是并行 doSomething 吧,可以开个 N(=5)个队列,然后遍历 map 往 N 个队列里 push,N 个线程从各自的队列里 pop 消费


