Golang中从文件读取数据并同时发送时的数据竞争问题

Golang中从文件读取数据并同时发送时的数据竞争问题 我正在尝试从文件中读取数据,并在读取到数据块后立即发送,而不是等待另一个goroutine完成文件读取。我有两个函数:

func ReadFile(stream chan []byte, stop chan bool) {
	file.Lock()
	defer file.Unlock()
	dir, _ := os.Getwd()
	file, _ := os.Open(dir + "/someFile.json")
	chunk := make([]byte, 512*4)
	for {
		bytesRead, err := file.Read(chunk)
		if err == io.EOF {
			break
		}
		if err != nil {
            panic(err)
			break
		}
		stream <- chunk[:bytesRead]
	}

	stop <- true
}

第一个函数负责读取文件并将数据块发送到从另一个函数接收到的“stream”通道。

func SteamFile() {
	stream := make(chan []byte)
	stop := make(chan bool)

	go ReadFile(stream, stop)

L:
	for {
		select {
		case data := <-stream:
			// 想在这里通过socket发送数据块
			fmt.Println(data)
		case <-stop:
			break L
		}
	}
}

第二个函数创建了2个通道,将它们发送给第一个函数,并开始监听这些通道。

问题在于,有时 data := <-stream 会丢失数据。例如,我可能没有收到文件的第一部分,但收到了所有其他部分。当我使用 -race 标志运行程序时,它告诉我存在数据竞争,两个goroutine同时向通道写入和从通道读取数据。说实话,我原本以为这是使用通道的正常方式,但现在我发现这会带来更多问题。

有人能帮我解决这个问题吗?


更多关于Golang中从文件读取数据并同时发送时的数据竞争问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

在ReadFile()函数中,你需要为每次读取分配一个新的“数据块”,以避免尝试覆盖通道“stream”中的切片。

for {
    chunk := make([]byte, 512*4)
    bytesRead, err := file.Read(chunk)
    ...
}

更多关于Golang中从文件读取数据并同时发送时的数据竞争问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


作为对Charles修复方案的扩展,你也可以添加第二个 chan []byte 并传递给 ReadFile,这样当 SteamFile 完成时,它可以将 data 发送回该通道,从而 ReadFile 就不需要持续分配新的字节切片了。

你的代码中存在数据竞争问题,主要是因为多个goroutine共享了同一个chunk缓冲区。在ReadFile函数中,你重复使用同一个chunk切片来读取文件数据,然后立即将其发送到通道。然而,接收方(SteamFile函数)可能还没来得及处理这个数据,发送方就可能已经用新的数据覆盖了chunk缓冲区。

以下是修复后的代码示例:

func ReadFile(stream chan []byte, stop chan bool) {
    dir, _ := os.Getwd()
    file, err := os.Open(dir + "/someFile.json")
    if err != nil {
        panic(err)
    }
    defer file.Close()
    
    chunk := make([]byte, 512*4)
    for {
        bytesRead, err := file.Read(chunk)
        if err == io.EOF {
            break
        }
        if err != nil {
            panic(err)
        }
        
        // 创建新的切片副本,避免数据竞争
        data := make([]byte, bytesRead)
        copy(data, chunk[:bytesRead])
        stream <- data
    }
    
    stop <- true
}

func SteamFile() {
    stream := make(chan []byte)
    stop := make(chan bool)
    
    go ReadFile(stream, stop)
    
    for {
        select {
        case data := <-stream:
            // 在这里通过socket发送数据块
            fmt.Println(data)
        case <-stop:
            return
        }
    }
}

关键修改:

  1. 移除了不必要的文件锁(file.Lock()/defer file.Unlock()),因为每个goroutine操作的是不同的文件句柄
  2. 在发送数据到通道前,创建了新的切片副本:data := make([]byte, bytesRead); copy(data, chunk[:bytesRead])
  3. 使用defer file.Close()确保文件正确关闭
  4. 简化了循环控制,直接使用return退出函数

这样修改后,每个发送到通道的数据块都是独立的副本,发送方和接收方不会共享内存,从而消除了数据竞争。使用go run -race命令测试时,应该不会再报告数据竞争问题。

回到顶部