Python多进程写入文件时遇到问题,如何解决?

https://gist.github.com/kingmo888/be4d2dfa033cd50c7bca01db70196ee2

上述是正常的代码, 如果将"f = open('test.txt', 'a')"转移到"if name == 'main':"下面后,

并且通过参数传入到函数中时,代码就运行不正常了。请问这是什么原因?

还有一个问题,为何多进程下"print('main...') "会执行了 N 次?

谢谢。


Python多进程写入文件时遇到问题,如何解决?

17 回复

第一个问题无法回答。你需要给出你将"f = open(‘test.txt’, ‘a’)"转移到"if name == ‘main’:"下面后,并且通过参数传入到函数中时”的具体代码,别人才能解答。仅凭几句描述不知道你都改了什么地方。

第二个问题:因为每个进程都要导入 main 模块,这就导致每个进程当中 global scope 的代码都会被执行一次。所以最好只在 global scope 放一些函数定义,其他代码比如 print 就不要放到 global scope 了。另外,在执行 global scope 的代码之前,python 解释器会设置一些特殊变量,比如 name。如果当前程序是做为主程序运行的,则将 name 设置为 ‘main’。如果当前程序是 multiprocessing 派生出的子进程,则 name 设置为 ‘mp_main’。你可以打印每个进程的 name 来验证这一说法。


多进程写文件最常见的问题是并发写入导致数据错乱或文件损坏。核心解决方案是使用进程锁(multiprocessing.Lock)或让子进程将数据返回给主进程,由主进程统一写入。

这里给你两个最实用的方案:

方案一:使用进程锁(适合简单场景)

import multiprocessing as mp

def worker(lock, data, filename):
    # 模拟一些处理
    result = f"Processed: {data}\n"
    
    with lock:  # 获取锁,确保同一时间只有一个进程写入
        with open(filename, 'a', encoding='utf-8') as f:
            f.write(result)

if __name__ == '__main__':
    lock = mp.Lock()
    filename = 'output.txt'
    
    # 清空或创建文件
    with open(filename, 'w', encoding='utf-8') as f:
        pass
    
    processes = []
    for i in range(5):
        p = mp.Process(target=worker, args=(lock, f'data_{i}', filename))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    print("写入完成")

方案二:使用队列+主进程写入(推荐,性能更好)

import multiprocessing as mp

def worker(queue, data):
    # 模拟处理
    result = f"Processed: {data}"
    queue.put(result)  # 将结果放入队列

def writer_process(queue, filename):
    """专门的写入进程"""
    with open(filename, 'w', encoding='utf-8') as f:
        while True:
            data = queue.get()
            if data == 'STOP':  # 停止信号
                break
            f.write(data + '\n')
            f.flush()  # 及时写入磁盘

if __name__ == '__main__':
    queue = mp.Queue()
    filename = 'output.txt'
    
    # 启动写入进程
    writer = mp.Process(target=writer_process, args=(queue, filename))
    writer.start()
    
    # 启动工作进程
    workers = []
    for i in range(5):
        p = mp.Process(target=worker, args=(queue, f'data_{i}'))
        workers.append(p)
        p.start()
    
    # 等待所有工作进程完成
    for p in workers:
        p.join()
    
    # 发送停止信号给写入进程
    queue.put('STOP')
    writer.join()
    
    print("写入完成")

简单总结:小文件用锁,大量数据用队列。

想起来一个问题。貌似传给 multiprocessing.Process 构造函数的参数必须是 pickable 的,而文件对象不是 pickable 的,你第一个问题有可能出在这儿了。

执行多次可能是多进程脚本被加载了多次吧,你验证一下。

在 print(‘main’)下面把 pid 打印出来,每次都不一样,每个进程会都加载一次脚本,启动进程数量为进程池数量+2。自身算一个,还有一个不知道是啥。

感谢,我表述有问题。直接上代码

https://gist.github.com/kingmo888/a7cdc36f77397f65ac19fb49f5795814

这种情况下,不会写入任何东西。

是的。我验证了这个问题。还有一个问题,请看这段代码:
<br>import pandas as pd<br>import os, time<br>import multiprocessing<br>import numpy as np<br><br><br>class Mutilprocess_Test:<br> def __init__(self):<br> print('init:',os.getpid())<br> <br> def run(self,i):<br> timesleep = np.random.rand()/3<br> #print()<br> time.sleep(timesleep)<br> print(i, 'run:', os.getpid(), timesleep)<br><br> <br> <br>if __name__ == '__main__':<br> pool = multiprocessing.Pool(processes=3)<br> for i in range(10):<br><br> pool.apply_async(Mutilprocess_Test().run, args=(i,))<br> pool.close()<br> pool.join()<br>
这段代码中,类初始化时候的 pid 和 run 时候的 pid 不一样。

貌似 init 时的 pid 是主进程的。请问这是什么原因或者说应该怎样去理解呢?比如多进程运行不同的类——可能基类是一个,派生类不同,这时候如何对一些数据库连接、相同部分的数据调用的代码进行存放呢,是放到 init 中,还是放到 run 里各自取?

二者的区别会否导致数据都到各自进程下?

比如说,我有一个字典 d1,实际上所有子类都有用到这个字典 d1,这时候我把他放 main 进程中好,还是在 init 中初始化好,还是在 run 中好呢?

貌似 init 时的 pid 是主进程的。请问这是什么原因或者说应该怎样去理解呢?
这是因为你调用 Mutilprocess_Test()是在主线程中调用的,所以 init 的 pid 是主线程。

你传入 Mutilprocess_Test().run 进去在其他进程里面执行的时候 self 参数是不是就丢掉了,你验证一下看看还能不能获取到 self。
如果你需要共享数据的话为啥不用多线程?多进程就不要考虑共享数据的事情了吧。

多线程是不是在一个进程之下了?我的需求是有大量的数据需要计算,同时数据还很多(几十 G ),需要迭代计算的。所以运算次数相当之多。

这种情况下,用多线程可以吗?


为了解决 I/O 以及计算耗时问题,已经买了一个带 SSD 的 24 核塔式服务器。


您看,我的需求是这样的:
数据已经按照类别放到了多个 HDF5 里,其中某个文件是 30G 左右( d1.h5 ),其中 d1 里面有 5000 多个 table。

计算逻辑:
一个基类,按照不同的需求分为 N 个子类。
之前是这样的:每一个子类读取 d1.h5,循环读取所有 table,然后计算。

哪种方式更合理呢?谢谢。

你看看现在瓶颈到底在 cpu 上还是 IO 上,如果是 cpu 就用线程或进程绑核跑,这样性能最高。

现在的瓶颈是计算应该是。请问线程或进程绑核怎么搞。。

感谢。打印 self.__name__失败,情形是子进程 while True 下,主进程都结束了。说明打印失败了。。原因是?

原因是 python 里面的面向对象实际上是通过语法糖实现的,你传入一个 obj.func 实际上传入的只是一个函数,其第一个参数是 self,python 里面调用 obj.func()实际上是类似于:
f = obj.func
f(obj)
你直接传方法进去,self 已经丢失了,因为在子进程中调用的时候不是通过对象实例调用的。

您好,您说的有道理。我做了这样一个测试:
在类 Test 中某个函数比如 mutil_test 内部实现多线程,如果直接在 Test().mutil_test()这种形式,多进程的子函数执行成功。如果把 mutil_test 放到类函数 run 下,执行 Test().run()就无法执行 mutil_test 中多进程调用的子函数。


另外,多进程中无法获取到__name__,但如果我在 init 阶段,复制 self.var1=123123,然后在子函数中打印的话,可以打印出来。

好奇葩,在 run 下放了一个 hdf5 的读取链接,然后读完就关闭,竟然也会导致子函数出现问题。

回到顶部