Golang中如何解决这个规范问题

Golang中如何解决这个规范问题 我有一些 Golang 代码,如下所示:

package main

type MyStruct struct {
    field1 string
    field2 float64
    field3 int
    field4 bool
}

func main() {
    names := getNames()
    myStruct := getMyStruct(names)
    writeToCsv(myStruct)
}

func getNames() []string {
    // get list of names then return
}

func getMyStruct(names []string) []Mystruct {
    myStruct := []MyStruct{}
    for i := range names {
	    // do something then assign the computed values
	    myStruct = append(myStruct, MyStruct{
		    field1: value1,
		    field2: value2,
		    field3: value3,
		    field4: value4,
	    })
    }
    return myStruct
}

func writeToCsv(myStruct []MyStruct) {
    // prepare writer then write header
    for i := range myStruct {
	    // create the slice of string to be written to csv
    }
}

代码运行正常。但是,我希望能够将单个 CSV 文件中的行数限制为例如 50 万行,同时避免将相同 name(来自 names 切片)的行分开。

例如,name1 有 20 万行 MyStruct 数据,name2 有 28.9 万行 MyStruct 数据,name3 有 18 万行 MyStruct 数据。由于从 name1name3 的总行数已经超过 50 万,我希望将它们写入单个 CSV 文件。之后,我希望继续获取 name4name5 等的 MyStruct 数据,直到它们的总数再次超过 50 万。

使用闭包能否妥善解决上述问题?

另外,由于我使用的是 Golang,我认为如果我能了解如何并发执行以下操作会更好:

1. 获取 MyStruct 行数据
2. 将它们写入 CSV

感谢您的帮助。


更多关于Golang中如何解决这个规范问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

我会创建一个 map[string][]MyStruct,其中键是名称,这些名称将从 getNames() 返回。然后,通过简单地检查这个映射中 MyStructs 的 len(),我会决定将哪些 []MyStructs 传递给可变参数写入函数。或许可以这样做。换句话说:获取所有名称 >> 获取所有名称对应的结构体 >> 决定在单个 CSV 中包含哪些结构体。

func main() {
    fmt.Println("hello world")
}

更多关于Golang中如何解决这个规范问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Golang中,您可以使用分块处理和并发来优化这个场景。以下是一个解决方案,使用闭包来管理状态,并通过goroutine并发处理数据获取和CSV写入:

package main

import (
    "encoding/csv"
    "os"
    "sync"
)

type MyStruct struct {
    field1 string
    field2 float64
    field3 int
    field4 bool
}

const maxRowsPerFile = 500000

func main() {
    names := getNames()
    
    // 使用工作池并发处理
    var wg sync.WaitGroup
    nameChan := make(chan string, len(names))
    resultChan := make(chan []MyStruct, len(names))
    
    // 启动worker goroutines来获取MyStruct数据
    for i := 0; i < 3; i++ { // 3个并发worker
        wg.Add(1)
        go func() {
            defer wg.Done()
            for name := range nameChan {
                myStructs := getMyStructForName(name)
                resultChan <- myStructs
            }
        }()
    }
    
    // 发送names到channel
    go func() {
        for _, name := range names {
            nameChan <- name
        }
        close(nameChan)
    }()
    
    // 收集结果并分块写入CSV
    go func() {
        wg.Wait()
        close(resultChan)
    }()
    
    processAndWriteCSV(resultChan)
}

func getNames() []string {
    // 返回names列表
    return []string{"name1", "name2", "name3", "name4", "name5"}
}

func getMyStructForName(name string) []MyStruct {
    // 根据name获取对应的MyStruct数据
    var myStructs []MyStruct
    // 模拟数据获取逻辑
    return myStructs
}

func processAndWriteCSV(resultChan <-chan []MyStruct) {
    var currentBatch []MyStruct
    currentRowCount := 0
    fileCounter := 1
    
    for myStructs := range resultChan {
        // 检查是否应该开始新文件
        if currentRowCount+len(myStructs) > maxRowsPerFile && currentRowCount > 0 {
            writeBatchToCSV(currentBatch, fileCounter)
            fileCounter++
            currentBatch = nil
            currentRowCount = 0
        }
        
        // 添加当前name的所有数据到批次
        currentBatch = append(currentBatch, myStructs...)
        currentRowCount += len(myStructs)
    }
    
    // 写入最后一批数据
    if len(currentBatch) > 0 {
        writeBatchToCSV(currentBatch, fileCounter)
    }
}

func writeBatchToCSV(batch []MyStruct, fileNumber int) {
    filename := fmt.Sprintf("output_%d.csv", fileNumber)
    file, err := os.Create(filename)
    if err != nil {
        panic(err)
    }
    defer file.Close()
    
    writer := csv.NewWriter(file)
    defer writer.Flush()
    
    // 写入CSV头部
    header := []string{"field1", "field2", "field3", "field4"}
    writer.Write(header)
    
    // 写入数据行
    for _, item := range batch {
        record := []string{
            item.field1,
            fmt.Sprintf("%f", item.field2),
            fmt.Sprintf("%d", item.field3),
            fmt.Sprintf("%t", item.field4),
        }
        writer.Write(record)
    }
}

// 闭包版本的解决方案
func createBatchProcessor() func([]MyStruct) {
    var currentBatch []MyStruct
    currentRowCount := 0
    fileCounter := 1
    
    return func(myStructs []MyStruct) {
        if currentRowCount+len(myStructs) > maxRowsPerFile && currentRowCount > 0 {
            writeBatchToCSV(currentBatch, fileCounter)
            fileCounter++
            currentBatch = nil
            currentRowCount = 0
        }
        
        currentBatch = append(currentBatch, myStructs...)
        currentRowCount += len(myStructs)
    }
}

这个解决方案提供了两种方法:

  1. 并发处理:使用worker池并发获取不同name的MyStruct数据,同时保持每个name数据的完整性。

  2. 闭包方法createBatchProcessor函数返回一个闭包,该闭包维护内部状态来跟踪当前批次的行数和文件计数。

关键特性:

  • 确保同一name的所有行不会被分割到不同文件中
  • 当累计行数超过50万时自动创建新文件
  • 使用goroutine并发处理数据获取
  • 使用channel进行goroutine间的安全通信

您可以根据实际需求调整worker数量(for i := 0; i < 3; i++中的3)和maxRowsPerFile常量。

回到顶部