Golang解析大型CSV时Goroutines的未定义行为
Golang解析大型CSV时Goroutines的未定义行为 我正在尝试使用Golang的goroutine加载一个大型CSV文件。该CSV的维度为(254882, 100)。但当我使用goroutine解析CSV并将其存储到二维列表时,得到的行数少于254882,并且每次运行的结果都不相同。我认为这是由于goroutine导致的,但无法确定具体原因。我是Golang新手,希望能得到帮助。以下是我的代码:
func loadCSV(csvFile string) (*[][]float64, error) {
startTime := time.Now()
var dataset [][]float64
f, err := os.Open(csvFile)
if err != nil {
return &dataset, err
}
r := csv.NewReader(bufio.NewReader(f))
counter := 0
var wg sync.WaitGroup
for {
record, err := r.Read()
if err == io.EOF {
break
}
if counter != 0 {
wg.Add(1)
go func(r []string, dataset *[][]float64) {
var temp []float64
for _, each := range record {
f, err := strconv.ParseFloat(each, 64)
if err == nil {
temp = append(temp, f)
}
}
*dataset = append(*dataset, temp)
wg.Done()
}(record, &dataset)
}
counter++
}
wg.Wait()
duration := time.Now().Sub(startTime)
log.Printf("Loaded %d rows in %v seconds", counter, duration)
return &dataset, nil
}
我的主函数如下所示:
func main() {
// runtime.GOMAXPROCS(4)
dataset, err := loadCSV("AvgW2V_train.csv")
if err != nil {
panic(err)
}
fmt.Println(len(*dataset))
}
如果有人需要下载CSV文件(485 MB),请点击以下链接: https://drive.google.com/file/d/1G4Nw6JyeC-i0R1exWp5BtRtGM1Fwyelm/view?usp=sharing
更多关于Golang解析大型CSV时Goroutines的未定义行为的实战教程也可以访问 https://www.itying.com/category-94-b0.html
我还没有进行基准测试,但我已经运行了足够多次来确定这一点。不过,我计划稍后进行。你能否评论一下嵌套goroutine是否可行?
更多关于Golang解析大型CSV时Goroutines的未定义行为的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
刚刚测试了先初始化临时变量,然后使用索引直接推入临时变量的方法,这并没有提升性能,甚至没有使性能变得更差
是的,我指的是由 goroutine 创建 goroutine……这对于当前任务可能不是必需的,但我需要将其用于其他任务。
了解这一点很有帮助。我在测试程序时使用过它,用来找出哪个程序运行最快、哪个程序分配内存最少。
你提到的"嵌套"是指goroutine创建goroutine吗?这当然可以做到。还是你指的是其他意思?
感谢您的回复……实际上使用协程将时间缩短了一半。在没有协程的情况下,大约需要6.2秒,而使用协程后,只需约3.1秒。不过我会尝试先初始化临时变量再进行基准测试,看看是否会有改善……
一个小的优化方法是:使用应有的元素数量来初始化临时变量,即记录的长度,然后通过索引而非追加方式设置项目。或者至少让临时变量具备你所需的容量。
你是否尝试过不使用协程运行程序?也许由于并行处理的工作量减少,速度会变慢,但锁操作本身也需要时间。
多个goroutine操作相同的变量数据集,因此它们可能会相互覆盖结果。你可以采取以下措施之一:
- 使用通道在goroutine之间传递数据
- 使用锁来同步对变量的写入操作
- 不要使用goroutine。由于这些goroutine是在每次读取CSV文件的一行后按顺序启动的,我不确定读取速度能提升多少。这取决于将字符串解析为浮点数所需的时间。
如果你使用基准测试的话,我认为应该能看到更少的内存分配。我认为空切片首次分配时容量为4,当容量不足时会重新分配一个容量为8的切片,之后可能会扩容到16。因此,如果你需要将20个元素存入切片,可能需要经历4次分配才能容纳所有元素,而不是从一开始就直接分配20个容量。
很高兴看到使用协程后耗时减少了 :)
你如何进行基准测试?是否研究过testing包的使用方法?https://dave.cheney.net/2013/06/30/how-to-write-benchmarks-in-go
多个协程操作同一个变量数据集,因此它们可能会相互覆盖结果。你可以采取以下措施之一:
感谢通过以下方式找到了解决方案
func loadCSV(csvFile string) [][]float64 {
var dataset [][]float64
f, _ := os.Open(csvFile)
r := csv.NewReader(f)
var wg sync.WaitGroup
l := new(sync.Mutex) // 锁
for record, err := r.Read(); err == nil; record, err = r.Read() {
wg.Add(1)
go func(record []string) {
defer wg.Done()
var temp []float64
for _, each := range record {
if f, err := strconv.ParseFloat(each, 64); err == nil {
temp = append(temp, f)
}
}
l.Lock() // 写入前加锁
dataset = append(dataset, temp) // 写入
l.Unlock() // 解锁
}(record)
}
wg.Wait()
return dataset
}
我能否进一步改进并让代码运行得更快?
问题在于你的代码中存在数据竞争和goroutine闭包捕获问题。多个goroutine同时向同一个切片追加数据,导致未定义行为。
主要问题:
- 多个goroutine并发修改
*dataset切片 - goroutine闭包捕获了循环变量
record
以下是修复后的代码:
func loadCSV(csvFile string) (*[][]float64, error) {
startTime := time.Now()
var dataset [][]float64
f, err := os.Open(csvFile)
if err != nil {
return &dataset, err
}
defer f.Close()
r := csv.NewReader(bufio.NewReader(f))
// 读取所有行到内存中
records, err := r.ReadAll()
if err != nil {
return &dataset, err
}
dataset = make([][]float64, len(records)-1) // 排除标题行
var wg sync.WaitGroup
var mu sync.Mutex
for i := 1; i < len(records); i++ { // 从第1行开始,跳过标题
wg.Add(1)
go func(idx int, record []string) {
defer wg.Done()
var temp []float64
for _, each := range record {
f, err := strconv.ParseFloat(each, 64)
if err == nil {
temp = append(temp, f)
}
}
mu.Lock()
dataset[idx-1] = temp // 索引调整,因为从第1行开始
mu.Unlock()
}(i, records[i])
}
wg.Wait()
duration := time.Since(startTime)
log.Printf("Loaded %d rows in %v seconds", len(dataset), duration)
return &dataset, nil
}
或者更高效的版本,使用通道避免锁竞争:
func loadCSV(csvFile string) (*[][]float64, error) {
startTime := time.Now()
f, err := os.Open(csvFile)
if err != nil {
return nil, err
}
defer f.Close()
r := csv.NewReader(bufio.NewReader(f))
records, err := r.ReadAll()
if err != nil {
return nil, err
}
dataset := make([][]float64, len(records)-1)
resultChan := make(chan struct {
idx int
row []float64
}, len(records)-1)
var wg sync.WaitGroup
for i := 1; i < len(records); i++ {
wg.Add(1)
go func(idx int, record []string) {
defer wg.Done()
var temp []float64
for _, each := range record {
f, err := strconv.ParseFloat(each, 64)
if err == nil {
temp = append(temp, f)
}
}
resultChan <- struct {
idx int
row []float64
}{idx: idx - 1, row: temp}
}(i, records[i])
}
go func() {
wg.Wait()
close(resultChan)
}()
for result := range resultChan {
dataset[result.idx] = result.row
}
duration := time.Since(startTime)
log.Printf("Loaded %d rows in %v seconds", len(dataset), duration)
return &dataset, nil
}
主要修复点:
- 预先分配切片大小避免并发追加
- 使用互斥锁或通道安全地写入数据
- 将循环变量作为参数传递给goroutine
- 正确处理索引偏移(跳过标题行)
这样就能确保每次运行都得到完整的254881行数据。

