Golang中如何解决这个规范问题
Golang中如何解决这个规范问题 我有一些 Golang 代码,如下所示:
package main
type MyStruct struct {
field1 string
field2 float64
field3 int
field4 bool
}
func main() {
names := getNames()
myStruct := getMyStruct(names)
writeToCsv(myStruct)
}
func getNames() []string {
// get list of names then return
}
func getMyStruct(names []string) []Mystruct {
myStruct := []MyStruct{}
for i := range names {
// do something then assign the computed values
myStruct = append(myStruct, MyStruct{
field1: value1,
field2: value2,
field3: value3,
field4: value4,
})
}
return myStruct
}
func writeToCsv(myStruct []MyStruct) {
// prepare writer then write header
for i := range myStruct {
// create the slice of string to be written to csv
}
}
代码运行正常。但是,我希望能够将单个 CSV 文件中的行数限制为例如 50 万行,同时避免将相同 name(来自 names 切片)的行分开。
例如,name1 有 20 万行 MyStruct 数据,name2 有 28.9 万行 MyStruct 数据,name3 有 18 万行 MyStruct 数据。由于从 name1 到 name3 的总行数已经超过 50 万,我希望将它们写入单个 CSV 文件。之后,我希望继续获取 name4、name5 等的 MyStruct 数据,直到它们的总数再次超过 50 万。
使用闭包能否妥善解决上述问题?
另外,由于我使用的是 Golang,我认为如果我能了解如何并发执行以下操作会更好:
1. 获取 MyStruct 行数据
2. 将它们写入 CSV
感谢您的帮助。
更多关于Golang中如何解决这个规范问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html
我会创建一个 map[string][]MyStruct,其中键是名称,这些名称将从 getNames() 返回。然后,通过简单地检查这个映射中 MyStructs 的 len(),我会决定将哪些 []MyStructs 传递给可变参数写入函数。或许可以这样做。换句话说:获取所有名称 >> 获取所有名称对应的结构体 >> 决定在单个 CSV 中包含哪些结构体。
func main() {
fmt.Println("hello world")
}
更多关于Golang中如何解决这个规范问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Golang中,您可以使用分块处理和并发来优化这个场景。以下是一个解决方案,使用闭包来管理状态,并通过goroutine并发处理数据获取和CSV写入:
package main
import (
"encoding/csv"
"os"
"sync"
)
type MyStruct struct {
field1 string
field2 float64
field3 int
field4 bool
}
const maxRowsPerFile = 500000
func main() {
names := getNames()
// 使用工作池并发处理
var wg sync.WaitGroup
nameChan := make(chan string, len(names))
resultChan := make(chan []MyStruct, len(names))
// 启动worker goroutines来获取MyStruct数据
for i := 0; i < 3; i++ { // 3个并发worker
wg.Add(1)
go func() {
defer wg.Done()
for name := range nameChan {
myStructs := getMyStructForName(name)
resultChan <- myStructs
}
}()
}
// 发送names到channel
go func() {
for _, name := range names {
nameChan <- name
}
close(nameChan)
}()
// 收集结果并分块写入CSV
go func() {
wg.Wait()
close(resultChan)
}()
processAndWriteCSV(resultChan)
}
func getNames() []string {
// 返回names列表
return []string{"name1", "name2", "name3", "name4", "name5"}
}
func getMyStructForName(name string) []MyStruct {
// 根据name获取对应的MyStruct数据
var myStructs []MyStruct
// 模拟数据获取逻辑
return myStructs
}
func processAndWriteCSV(resultChan <-chan []MyStruct) {
var currentBatch []MyStruct
currentRowCount := 0
fileCounter := 1
for myStructs := range resultChan {
// 检查是否应该开始新文件
if currentRowCount+len(myStructs) > maxRowsPerFile && currentRowCount > 0 {
writeBatchToCSV(currentBatch, fileCounter)
fileCounter++
currentBatch = nil
currentRowCount = 0
}
// 添加当前name的所有数据到批次
currentBatch = append(currentBatch, myStructs...)
currentRowCount += len(myStructs)
}
// 写入最后一批数据
if len(currentBatch) > 0 {
writeBatchToCSV(currentBatch, fileCounter)
}
}
func writeBatchToCSV(batch []MyStruct, fileNumber int) {
filename := fmt.Sprintf("output_%d.csv", fileNumber)
file, err := os.Create(filename)
if err != nil {
panic(err)
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
// 写入CSV头部
header := []string{"field1", "field2", "field3", "field4"}
writer.Write(header)
// 写入数据行
for _, item := range batch {
record := []string{
item.field1,
fmt.Sprintf("%f", item.field2),
fmt.Sprintf("%d", item.field3),
fmt.Sprintf("%t", item.field4),
}
writer.Write(record)
}
}
// 闭包版本的解决方案
func createBatchProcessor() func([]MyStruct) {
var currentBatch []MyStruct
currentRowCount := 0
fileCounter := 1
return func(myStructs []MyStruct) {
if currentRowCount+len(myStructs) > maxRowsPerFile && currentRowCount > 0 {
writeBatchToCSV(currentBatch, fileCounter)
fileCounter++
currentBatch = nil
currentRowCount = 0
}
currentBatch = append(currentBatch, myStructs...)
currentRowCount += len(myStructs)
}
}
这个解决方案提供了两种方法:
-
并发处理:使用worker池并发获取不同name的MyStruct数据,同时保持每个name数据的完整性。
-
闭包方法:
createBatchProcessor函数返回一个闭包,该闭包维护内部状态来跟踪当前批次的行数和文件计数。
关键特性:
- 确保同一name的所有行不会被分割到不同文件中
- 当累计行数超过50万时自动创建新文件
- 使用goroutine并发处理数据获取
- 使用channel进行goroutine间的安全通信
您可以根据实际需求调整worker数量(for i := 0; i < 3; i++中的3)和maxRowsPerFile常量。

