Python中多进程与多线程对文件操作的总结与对比

读写文件,多进程和多线程

# coding:utf-8
“”"
把大文件分块
big_file.txt 是一个 500M 的 5 位数的乱序文档
多线程并没有提升速度
“”"

import time
txtfile = ‘’
import threading
def txtmap(txtup):
with open(‘big_file.txt’,‘r’) as f:
i = 0
while i < 100000:
txt = f.read(1)
i += 1 if txt == ‘,’ else 0
txtup += txt
# txtmap(txtfile)
start = time.time()
for i in range(100):
txti = threading.Thread(target = txtmap(txtfile))
txti.start()
txti.join()
print(time.time() - start)
def txtmap2(txtup):
with open(‘big_file.txt’,‘r’) as f:
i = 0
while i < 1000000:
txt = f.read(1)
i += 1 if txt == ‘,’ else 0
txtup += txt
start = time.time()
for i in range(10):
txtmap2(txtfile)
print(time.time() - start)


多进程和队列初试
import time
from multiprocessing import Process
from multiprocessing import Queue
num = 0
qnum = Queue()
qnum.put(num)
def testnum(num):
num += 1
qnum.put(num)
for i in range(10):
p = Process(target = testnum,args = (qnum.get(),))
p.start()
p.join()
# testnum(num)
print(qnum.get(),qnum.empty())
在这里,qnum 属于实例化对象,不需要用 global 标记



多次测试,发现多进程加队列,必须把文件指针位置也放进去,不然下一个读取位置就会乱跳
with open(‘big_file.txt’,‘r’) as f:
q.put ((txtfile3,f.tell()))
def txtmap(qget):
txtup = qget[0]
i = 0
f.seek(qget[1])
while i < 10:
txt = f.read(1)
i += 1 if txt == ‘,’ else 0
txtup += txt
q.put((txtup,f.tell()))
start = time.time()
for i in range(10):
txtp2i = Process(target = txtmap,args =(q.get(),))
txtp2i.start()
txtp2i.join()
print(‘多进程加队列’,time.time() - start,’\n’,q.get())

以及这个 args 的赋值真是烦人,明明从 q.get()出来的就是元组,它非要你=后面必须是一个元组的形式才行


# coding:utf-8
“”"
把大文件分块
big_file.txt 是一个 500M 的 5 位数的乱序文档
多进加队列速度 < 多进程全局变量 < 多线程加队列 < 多线程加全局变量 < 普通全局变量
多进程加队列不稳定 多进程因为进程间通讯必须借助队列 Queue 或者管道 pipe 否则改变不了全局变量
“”"

import os
import time
from multiprocessing import Process
from multiprocessing import Queue
import threading
txtfile = ‘’
txtfile2 = ‘’
txtfile3 = ‘’
txtfile4 = ‘’
q = Queue()

#因为本子是 4 核的,所以我只创建了 4 个进程,python 的 GIL 的限制决定 4 核只能同时跑 4 个 python 进程,多了就要等待
with open(‘big_file.txt’,‘r’) as f:
def txtmap():
i = 0
global txtfile
while i < 25:
txt = f.read(1)
i += 1 if txt == ‘,’ else 0
txtfile += txt
print(txt)
print (os.getpid ())
print(txtfile)
start = time.time()
for i in range(4):
txtpi = Process(target = txtmap)
txtpi.start()
txtpi.join()
print(‘多进程全局变量’,time.time() - start,’\n’,txtfile)
if txtfile:
print(True)
else:
print(False)

with open(‘big_file.txt’,‘r’) as f:
def txtmap():
i = 0
global txtfile2
while i < 10:
txt = f.read(1)
i += 1 if txt == ‘,’ else 0
txtfile2 += txt
start = time.time()
for i in range(10):
txtti = threading.Thread(target = txtmap)
txtti.start()
txtti.join()
print(‘多线程全局变量’,time.time() - start,’\n’,txtfile2)


with open(‘big_file.txt’,‘r’) as f:
q.put ((txtfile3,f.tell()))
def txtmap(qget):
txtup = qget[0]
i = 0
f.seek(qget[1])
while i < 25:
txt = f.read(1)
i += 1 if txt == ‘,’ else 0
txtup += txt
print (os.getpid ())
q.put((txtup,f.tell()))
start = time.time()
for i in range(4):
txtp2i = Process(target = txtmap,args =(q.get(),))
txtp2i.start()
txtp2i.join()
print(‘多进程加队列’,time.time() - start,’\n’,q.get()[0])

#因为队列 q 内的消息已被取完,所以再放进去一次,不然会一直处于阻塞状态等待插入消息
q.put(txtfile3)
with open(‘big_file.txt’,‘r’) as f:
def txtmap(txtup):
i = 0
while i < 10:
txt = f.read(1)
i += 1 if txt == ‘,’ else 0
txtup += txt
q.put(txtup)
start = time.time()
for i in range(10):
txtt2i = threading.Thread(target = txtmap,args = (q.get(),))
txtt2i.start()
txtt2i.join()
print(‘多线程加队列’,time.time() - start,’\n’,q.get())

with open(‘big_file.txt’,‘r’) as f:
def txtmap2():
i = 0
global txtfile4
while i < 10:
txt = f.read(1)
i += 1 if txt == ‘,’ else 0
txtfile4+= txt
start = time.time()
for i in range(10):
txtmap2()
print(‘普通全局变量’,time.time() - start,’\n’,txtfile4)

#os.path.geisize 返回的文件大小就是 seek 指针的最后一个位置
print(os.path.getsize(‘big_file.txt’))

with open(‘big_file.txt’,‘r’) as f:
print(f.seek(0,2))



#终于看到了多进程同时进行,哦呼!甚是欢喜!而且和文件的指针并不会冲突
from multiprocessing import Process
import time
with open(‘test.txt’, ‘r’) as f:
def readseek(num):
f.seek(num)
print(time.time(),f.read(10))
time.sleep(10)
p1 = Process(target = readseek, args = (20,))
p2 = Process(target = readseek, args = (60,))
p3 = Process(target = readseek, args = (120,))
p4 = Process(target = readseek, args = (160,))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()
print(time.time())
Python中多进程与多线程对文件操作的总结与对比


17 回复

第一次发帖,改了大概 3 次后发现不能更改了…
想再多进程全局变量那里加一个:多进程并不能改变全局变量,只能通过队列 Queue,或者管道 Pipe


多进程和多线程处理文件的核心区别在于GIL和内存隔离。

多线程(threading):

  • 受GIL限制,I/O密集型操作(如读写大文件)能利用等待时间切换线程,有一定并发效果。
  • CPU密集型文件处理(如压缩/加密)用多线程没用,因为GIL会让线程串行执行。
  • 共享内存方便,但写文件需要加锁(threading.Lock)避免数据混乱。

多进程(multiprocessing):

  • 绕过GIL,CPU密集型文件处理能真正并行。
  • 内存不共享,进程间传文件数据要用QueuePipe,大文件建议用内存映射。
  • 每个进程独立打开文件时要注意文件指针位置。

简单示例:

# 多线程安全写文件
import threading
lock = threading.Lock()

def write_thread(filename, data):
    with lock:
        with open(filename, 'a') as f:
            f.write(data + '\n')

# 多进程处理文件
from multiprocessing import Pool
def process_file(chunk):
    # 处理文件块
    return processed_data

if __name__ == '__main__':
    with Pool() as pool:
        results = pool.map(process_file, file_chunks)

选型一句话:I/O多用线程,计算多用进程。

Python 多线程只适用 io 密集场景

是的,但是同一时刻也是只能运行一个线程,都是在等待,所以我觉得,要是文件真的超过 1G 的那种,仍然要用和核心数一样多的进程数,这样就可以同时运行和核心数一样多的线程

我建议你贴到外面的网站上,比如 github gist。你贴出来的代码实在影响大家的交流,缩进丢失,非常难以理解。
我觉得你说的多线程没有用大概是实现的一些地方不是那么好,有其他因素抵消了多线程的优势。IO 本身是会释放 GIL,跟是不是多进程没关系

Python 的 IO 操作以及部分在底层释放 GIL 锁的类库并不受 GIL 限制.

一般起 CPU 核数的进程针对的是 CPU 密集型操作, 并不是 IO 密集型操作.

本地的文件 IO 一般情况下搞个单独的消费者线 /进程就够了, 多了性能反而差.

一般爆协程、线程处理的 IO 密集型操作的 IO 主要是 socket、PIPE 这类, 并不是文件 IO.

感觉 python 的文本处理性能就是屎,特别是 utf8 这种变长编码的,既然不用管内容的话按二进制文件处理会快很多

像楼上们说的 Python 线程还是在 I/O 密集型应用上用的多,另外没有缩进看起来真的难受…0.0

依赖缩进的代码不要乱贴,如果不支持代码块就放到 github 之类,gist 在国内被 q 了。

另外,不要用随机读写来加速磁盘访问。

第一次发帖,改了几次发现还是改不了,我贴到 GitHub 看下

我设想的是把大文件分四个块分别给四个进程处理,这样不就可以缩短时间吗?

这个我没想过,我试试用 pickle 弄一下

第一贴 第一贴 第一贴,谅解一下下~我来发到 GitHub

随机读写?读倒不是随机,就是命名的时候用的随机

可以试试 Dpark

第一次知道这个框架,好像很好用,我仔细看看

回到顶部