Golang读写同一*os.File时可能发生死锁的问题

Golang读写同一*os.File时可能发生死锁的问题 大家好,

背景:

我正在开发一个名为"standpipe"的程序,它从标准输入获取数据,写入临时文件,然后将这些数据输送到标准输出。这样设计的目的是让源程序(向stdout写入)能够以最快速度写入,即使目标程序(从stdin读取)无法跟上处理速度。我(某种程度上人为设计的)使用场景是:当使用像xz和/或7z中LZMA2这样的内存密集型压缩算法,并通过网络将数据管道传输到某个目标时,我希望xz/7z能尽快完成以释放内存,即使我的网络速度跟不上。当然,我也可以采取其他替代方案,但作为学习经验,我想弄清楚为什么我当前尝试的方法行不通。

详细信息:

我的代码库在这里:https://github.com/skillian/standpipe

克隆/下载后,切换到sp目录并执行go buildsp命令的用法如下:

usage: sp [ -f CACHEFILE ] [ --log-level LOGLEVEL ] [ -s PAGESIZE ]

standpipe to cache output from one command before piping it into a slower command.

optional arguments:
  -f, --cache-file
                Custom cache file name.  If not used, a temp file is created
                instead.
  --log-level   Specify a custom logging level (useful for debugging).
  -s, --page-size
                Page size within the standpipe file. Pages are updated in random
                locations within the standpipe file so to reduce the amount of
                seeking, this value should be as large as possible.  There are
                two pages always kept in memory at a time:  One for reading and
                one for writing, so this value is a balancing act between
                reduced seeks and memory usage

以下是我在编写测试用例之前用于alpha测试程序的一组命令(我是那种事后才写测试的人):

dd if=/dev/urandom bs=32768 count=8192 of=~/test.dat
cat ~/test.dat | sp -f ~/test.sp -s 1048576 | gzip -c -9 > ~/test.dat.gz

当我编译并运行程序时,standpipe文件头被生成,缓存文件快速加载数据(我尚未确定写入的数据是否有效,或者是否写入了相同的缓冲区等。这将是我接下来的测试步骤之一)。

问题:

我的问题是,似乎没有数据从缓存文件中读取并写入stdout。我以为我正确使用了sync.Cond,但现在猜测我在锁定/信号处理方面遗漏了关键点。我曾考虑使用chan来代替偏移量切片,但考虑到:

  1. 我不知道V1Pipe.offs的长度可能增长到多少(10, 100, 1000, 10,000, 100,000等)。
  2. 程序被中断时,我需要能够刷新管道中的内容,因此我需要确保V1Pipe.Close函数能获取V1Pipe.offs中的所有偏移量,并且另一个在概念上的V1Pipe.offs chan上监听的goroutine不会在我们关闭时窃取一个偏移量。

问题:

能否麻烦各位Gopher们 review 一下我的代码,看看我在goroutines/锁定/或其他方面是否做错了什么,需要尝试修复?尽管我承认我并一定需要这个程序能工作,但我想理解为什么我当前的代码不工作,以及需要做哪些修改才能让它工作,从而让我成为一个更好的程序员。

也欢迎提出改进我编程"风格"的建议!感谢您的考虑!


更多关于Golang读写同一*os.File时可能发生死锁的问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

我进行了一些重大的重构,现在不再出现死锁了。我的问题似乎与我自己的类似 bytes.Buffer 的实现有关,该实现从未增长以配合我在管道缓存文件中的固定大小块工作。当缓冲区已满时,我返回了一个 nil 错误和 0 字节。我重构了代码,现在在尝试写入缓冲区时返回“缓冲区已满”错误,在从空缓冲区读取时返回“缓冲区为空”错误。然后,当我分别清空和填充底层缓冲区时,我会在管道文件的读取和写入函数中处理缓冲区的切换。

虽然目前还没有完全正常工作,但至少我不再像之前那样卡住了。

更多关于Golang读写同一*os.File时可能发生死锁的问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Golang中同时读写同一个*os.File确实容易导致死锁,特别是在使用同步原语时。让我分析你的代码并给出具体示例。

问题分析

从你的描述看,主要问题可能出现在以下几个方面:

1. 文件指针竞争

当多个goroutine同时操作同一个文件描述符时,文件指针的位置会相互干扰:

// 错误示例 - 会导致死锁
func readWriteSameFile() {
    file, _ := os.OpenFile("test.dat", os.O_RDWR, 0644)
    
    go func() {
        for {
            buf := make([]byte, 1024)
            n, err := file.Read(buf)  // 移动文件指针
            if err != nil {
                break
            }
            // 处理数据...
        }
    }()
    
    go func() {
        for {
            data := []byte("some data")
            _, err := file.Write(data)  // 也移动文件指针
            if err != nil {
                break
            }
        }
    }()
    
    time.Sleep(time.Second * 10)
    file.Close()
}

2. sync.Cond 使用不当

在你的代码中,sync.Cond可能没有正确等待和通知:

// 正确的sync.Cond使用模式
type Pipe struct {
    cond   *sync.Cond
    buffer []byte
    closed bool
}

func (p *Pipe) Write(data []byte) (int, error) {
    p.cond.L.Lock()
    defer p.cond.L.Unlock()
    
    if p.closed {
        return 0, io.ErrClosedPipe
    }
    
    p.buffer = append(p.buffer, data...)
    p.cond.Signal()  // 通知等待的读取者
    return len(data), nil
}

func (p *Pipe) Read(b []byte) (int, error) {
    p.cond.L.Lock()
    defer p.cond.L.Unlock()
    
    for len(p.buffer) == 0 && !p.closed {
        p.cond.Wait()  // 等待数据可用
    }
    
    if len(p.buffer) == 0 && p.closed {
        return 0, io.EOF
    }
    
    n := copy(b, p.buffer)
    p.buffer = p.buffer[n:]
    return n, nil
}

3. 文件读写分离方案

对于你的用例,建议使用不同的文件句柄:

type Standpipe struct {
    file     *os.File
    readFile *os.File  // 单独的读取句柄
    cond     *sync.Cond
    offsets  []int64
    closed   bool
}

func NewStandpipe(filename string) (*Standpipe, error) {
    file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
    if err != nil {
        return nil, err
    }
    
    // 打开同一个文件的第二个句柄用于读取
    readFile, err := os.Open(filename)
    if err != nil {
        file.Close()
        return nil, err
    }
    
    sp := &Standpipe{
        file:     file,
        readFile: readFile,
        cond:     sync.NewCond(&sync.Mutex{}),
    }
    
    return sp, nil
}

func (sp *Standpipe) Write(data []byte) (int, error) {
    sp.cond.L.Lock()
    defer sp.cond.L.Unlock()
    
    if sp.closed {
        return 0, io.ErrClosedPipe
    }
    
    offset, err := sp.file.Seek(0, io.SeekEnd)
    if err != nil {
        return 0, err
    }
    
    n, err := sp.file.Write(data)
    if err != nil {
        return n, err
    }
    
    sp.offsets = append(sp.offsets, offset)
    sp.cond.Signal()
    return n, nil
}

func (sp *Standpipe) Read(b []byte) (int, error) {
    sp.cond.L.Lock()
    defer sp.cond.L.Unlock()
    
    for len(sp.offsets) == 0 && !sp.closed {
        sp.cond.Wait()
    }
    
    if len(sp.offsets) == 0 && sp.closed {
        return 0, io.EOF
    }
    
    offset := sp.offsets[0]
    sp.offsets = sp.offsets[1:]
    
    // 使用读取句柄,避免干扰写入位置
    _, err := sp.readFile.Seek(offset, io.SeekStart)
    if err != nil {
        return 0, err
    }
    
    return sp.readFile.Read(b)
}

4. 使用管道替代文件

对于内存缓冲的场景,考虑使用io.Pipe

func pipeExample() {
    pr, pw := io.Pipe()
    
    go func() {
        defer pw.Close()
        for i := 0; i < 10; i++ {
            data := []byte(fmt.Sprintf("Data chunk %d\n", i))
            if _, err := pw.Write(data); err != nil {
                break
            }
            time.Sleep(time.Millisecond * 100)
        }
    }()
    
    go func() {
        buf := make([]byte, 1024)
        for {
            n, err := pr.Read(buf)
            if err != nil {
                break
            }
            fmt.Printf("Read: %s", buf[:n])
        }
    }()
    
    time.Sleep(time.Second * 2)
}

关键修复点

  1. 使用独立的文件句柄进行读写操作
  2. 确保sync.Cond的等待和通知正确配对
  3. 正确处理goroutine间的同步和关闭信号
  4. 考虑使用更简单的并发原语如channel

检查你的代码中是否在等待条件变量时没有正确处理虚假唤醒,或者在关闭时没有正确通知所有等待的goroutine。

回到顶部