Golang解析大型CSV时Goroutines的未定义行为

Golang解析大型CSV时Goroutines的未定义行为 我正在尝试使用Golang的goroutine加载一个大型CSV文件。该CSV的维度为(254882, 100)。但当我使用goroutine解析CSV并将其存储到二维列表时,得到的行数少于254882,并且每次运行的结果都不相同。我认为这是由于goroutine导致的,但无法确定具体原因。我是Golang新手,希望能得到帮助。以下是我的代码:

func loadCSV(csvFile string) (*[][]float64, error) {
    startTime := time.Now()
    var dataset [][]float64
    f, err := os.Open(csvFile)
    if err != nil {
        return &dataset, err
    }
    r := csv.NewReader(bufio.NewReader(f))
    counter := 0
    var wg sync.WaitGroup
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if counter != 0 {
            wg.Add(1)
            go func(r []string, dataset *[][]float64) {
                var temp []float64
                for _, each := range record {
                    f, err := strconv.ParseFloat(each, 64)
                    if err == nil {
                        temp = append(temp, f)
                    }
                }
                *dataset = append(*dataset, temp)
                wg.Done()
            }(record, &dataset)
        }
        counter++
    }
    wg.Wait()
    duration := time.Now().Sub(startTime)
    log.Printf("Loaded %d rows in %v seconds", counter, duration)
    return &dataset, nil
}

我的主函数如下所示:

func main() {
    // runtime.GOMAXPROCS(4)
    dataset, err := loadCSV("AvgW2V_train.csv")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(*dataset))
}

如果有人需要下载CSV文件(485 MB),请点击以下链接: https://drive.google.com/file/d/1G4Nw6JyeC-i0R1exWp5BtRtGM1Fwyelm/view?usp=sharing


更多关于Golang解析大型CSV时Goroutines的未定义行为的实战教程也可以访问 https://www.itying.com/category-94-b0.html

10 回复

我还没有进行基准测试,但我已经运行了足够多次来确定这一点。不过,我计划稍后进行。你能否评论一下嵌套goroutine是否可行?

更多关于Golang解析大型CSV时Goroutines的未定义行为的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


刚刚测试了先初始化临时变量,然后使用索引直接推入临时变量的方法,这并没有提升性能,甚至没有使性能变得更差

是的,我指的是由 goroutine 创建 goroutine……这对于当前任务可能不是必需的,但我需要将其用于其他任务。

了解这一点很有帮助。我在测试程序时使用过它,用来找出哪个程序运行最快、哪个程序分配内存最少。

你提到的"嵌套"是指goroutine创建goroutine吗?这当然可以做到。还是你指的是其他意思?

感谢您的回复……实际上使用协程将时间缩短了一半。在没有协程的情况下,大约需要6.2秒,而使用协程后,只需约3.1秒。不过我会尝试先初始化临时变量再进行基准测试,看看是否会有改善……

一个小的优化方法是:使用应有的元素数量来初始化临时变量,即记录的长度,然后通过索引而非追加方式设置项目。或者至少让临时变量具备你所需的容量。

你是否尝试过不使用协程运行程序?也许由于并行处理的工作量减少,速度会变慢,但锁操作本身也需要时间。

多个goroutine操作相同的变量数据集,因此它们可能会相互覆盖结果。你可以采取以下措施之一:

  1. 使用通道在goroutine之间传递数据
  2. 使用锁来同步对变量的写入操作
  3. 不要使用goroutine。由于这些goroutine是在每次读取CSV文件的一行后按顺序启动的,我不确定读取速度能提升多少。这取决于将字符串解析为浮点数所需的时间。

如果你使用基准测试的话,我认为应该能看到更少的内存分配。我认为空切片首次分配时容量为4,当容量不足时会重新分配一个容量为8的切片,之后可能会扩容到16。因此,如果你需要将20个元素存入切片,可能需要经历4次分配才能容纳所有元素,而不是从一开始就直接分配20个容量。

很高兴看到使用协程后耗时减少了 :)

你如何进行基准测试?是否研究过testing包的使用方法?https://dave.cheney.net/2013/06/30/how-to-write-benchmarks-in-go

多个协程操作同一个变量数据集,因此它们可能会相互覆盖结果。你可以采取以下措施之一:

感谢通过以下方式找到了解决方案

func loadCSV(csvFile string) [][]float64 {
    var dataset [][]float64

    f, _ := os.Open(csvFile)

    r := csv.NewReader(f)

    var wg sync.WaitGroup
    l := new(sync.Mutex) // 锁

    for record, err := r.Read(); err == nil; record, err = r.Read() {
        wg.Add(1)

        go func(record []string) {
            defer wg.Done()

            var temp []float64
            for _, each := range record {
                if f, err := strconv.ParseFloat(each, 64); err == nil {
                    temp = append(temp, f)
                }
            }
            l.Lock() // 写入前加锁
            dataset = append(dataset, temp) // 写入
            l.Unlock() // 解锁

        }(record)
    }

    wg.Wait()

    return dataset
}

我能否进一步改进并让代码运行得更快?

问题在于你的代码中存在数据竞争和goroutine闭包捕获问题。多个goroutine同时向同一个切片追加数据,导致未定义行为。

主要问题:

  1. 多个goroutine并发修改*dataset切片
  2. goroutine闭包捕获了循环变量record

以下是修复后的代码:

func loadCSV(csvFile string) (*[][]float64, error) {
    startTime := time.Now()
    var dataset [][]float64
    f, err := os.Open(csvFile)
    if err != nil {
        return &dataset, err
    }
    defer f.Close()
    
    r := csv.NewReader(bufio.NewReader(f))
    
    // 读取所有行到内存中
    records, err := r.ReadAll()
    if err != nil {
        return &dataset, err
    }
    
    dataset = make([][]float64, len(records)-1) // 排除标题行
    var wg sync.WaitGroup
    var mu sync.Mutex
    
    for i := 1; i < len(records); i++ { // 从第1行开始,跳过标题
        wg.Add(1)
        go func(idx int, record []string) {
            defer wg.Done()
            var temp []float64
            for _, each := range record {
                f, err := strconv.ParseFloat(each, 64)
                if err == nil {
                    temp = append(temp, f)
                }
            }
            mu.Lock()
            dataset[idx-1] = temp // 索引调整,因为从第1行开始
            mu.Unlock()
        }(i, records[i])
    }
    wg.Wait()
    
    duration := time.Since(startTime)
    log.Printf("Loaded %d rows in %v seconds", len(dataset), duration)
    return &dataset, nil
}

或者更高效的版本,使用通道避免锁竞争:

func loadCSV(csvFile string) (*[][]float64, error) {
    startTime := time.Now()
    f, err := os.Open(csvFile)
    if err != nil {
        return nil, err
    }
    defer f.Close()
    
    r := csv.NewReader(bufio.NewReader(f))
    records, err := r.ReadAll()
    if err != nil {
        return nil, err
    }
    
    dataset := make([][]float64, len(records)-1)
    resultChan := make(chan struct {
        idx int
        row []float64
    }, len(records)-1)
    
    var wg sync.WaitGroup
    
    for i := 1; i < len(records); i++ {
        wg.Add(1)
        go func(idx int, record []string) {
            defer wg.Done()
            var temp []float64
            for _, each := range record {
                f, err := strconv.ParseFloat(each, 64)
                if err == nil {
                    temp = append(temp, f)
                }
            }
            resultChan <- struct {
                idx int
                row []float64
            }{idx: idx - 1, row: temp}
        }(i, records[i])
    }
    
    go func() {
        wg.Wait()
        close(resultChan)
    }()
    
    for result := range resultChan {
        dataset[result.idx] = result.row
    }
    
    duration := time.Since(startTime)
    log.Printf("Loaded %d rows in %v seconds", len(dataset), duration)
    return &dataset, nil
}

主要修复点:

  1. 预先分配切片大小避免并发追加
  2. 使用互斥锁或通道安全地写入数据
  3. 将循环变量作为参数传递给goroutine
  4. 正确处理索引偏移(跳过标题行)

这样就能确保每次运行都得到完整的254881行数据。

回到顶部