Golang如何对CSV文件的行进行排序

Golang如何对CSV文件的行进行排序 我想对一个如下所示的CSV文件进行排序:

Timestamp,Message 2019-12-03T11:10:35,mymessage1 2019-12-03T10:10:10,mymessage2 2019-11-03T12:140:35,mymessage3

排序应基于时间戳字段进行。 我编写了以下代码来导入并排序CSV文件:

package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"sort"
)

func readCsvFile(filePath string) [][]string {
	f, err := os.Open(filePath)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer f.Close()

	csvReader := csv.NewReader(f)
	records, err := csvReader.ReadAll()
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	sort.Slice(records, func(i, j int) bool {
		return records[i][0] < records[j][0]
	})

	return records
}

func main() {
	records := readCsvFile("myfile.csv")
	fmt.Println(records)
}

我的问题是,如何在排序时跳过表头?

此外,这段代码将在一台资源有限的机器上运行(大部分资源已被另一个程序占用)。很难随时确定有多少可用资源,考虑到要排序的CSV文件大约有10万行,总文件大小约为50 MB,从效率角度来看,这种方法是最佳的吗?或者您是否建议采用更好的方法来降低内存不足错误的风险?

谢谢!


更多关于Golang如何对CSV文件的行进行排序的实战教程也可以访问 https://www.itying.com/category-94-b0.html

5 回复

如果标题行是第一行,则添加特殊条件,使得任何行都不会超过第一行。

更多关于Golang如何对CSV文件的行进行排序的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


解决方案其实很简单,就在我眼前,但直到你建议后我才想明白。

sort.Slice(records[1:], func(i, j int) bool {
	return records[1:][i][0] < records[1:][j][0]
})

这样排序会处理除标题行之外的所有内容。

我会研究一下外部归并排序算法,看看它是否适用于我的应用场景。

关于标题行的问题——在你执行 records, err := csvReader.ReadAll()(该操作会读取CSV文件中的所有记录)之前,先执行 header, err := csvReader.Read(),这将读取第一条记录(即你的标题行),然后再执行 ReadAll() 来读取剩余部分,这样你就可以将它们分别返回。

关于第二个资源有限的问题,我想你正在寻找的是外部归并排序算法

对于跳过表头进行排序,可以在排序前将表头分离出来,排序完成后再重新组合。以下是修改后的代码示例:

package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"sort"
)

func readCsvFile(filePath string) [][]string {
	f, err := os.Open(filePath)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer f.Close()

	csvReader := csv.NewReader(f)
	records, err := csvReader.ReadAll()
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	if len(records) == 0 {
		return records
	}

	// 分离表头和数据
	header := records[0]
	data := records[1:]

	// 对数据进行排序
	sort.Slice(data, func(i, j int) bool {
		return data[i][0] < data[j][0]
	})

	// 重新组合表头和数据
	sortedRecords := make([][]string, 0, len(records))
	sortedRecords = append(sortedRecords, header)
	sortedRecords = append(sortedRecords, data...)

	return sortedRecords
}

func main() {
	records := readCsvFile("myfile.csv")
	fmt.Println(records)
}

对于10万行、50MB的CSV文件,使用ReadAll()方法会将整个文件加载到内存中。在资源受限的环境中,建议使用流式处理来降低内存使用。以下是改进的版本:

package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"sort"
)

type csvRecord struct {
	timestamp string
	message   string
	original  []string
}

func sortCSVFile(inputFile, outputFile string) error {
	// 读取文件
	inFile, err := os.Open(inputFile)
	if err != nil {
		return err
	}
	defer inFile.Close()

	csvReader := csv.NewReader(inFile)
	
	// 读取表头
	header, err := csvReader.Read()
	if err != nil {
		return err
	}

	// 读取所有数据行到内存
	var records []csvRecord
	for {
		row, err := csvReader.Read()
		if err != nil {
			break
		}
		if len(row) >= 2 {
			records = append(records, csvRecord{
				timestamp: row[0],
				message:   row[1],
				original:  row,
			})
		}
	}

	// 按时间戳排序
	sort.Slice(records, func(i, j int) bool {
		return records[i].timestamp < records[j].timestamp
	})

	// 写入排序后的文件
	outFile, err := os.Create(outputFile)
	if err != nil {
		return err
	}
	defer outFile.Close()

	csvWriter := csv.NewWriter(outFile)
	defer csvWriter.Flush()

	// 写入表头
	if err := csvWriter.Write(header); err != nil {
		return err
	}

	// 写入排序后的数据
	for _, record := range records {
		if err := csvWriter.Write(record.original); err != nil {
			return err
		}
	}

	return nil
}

func main() {
	err := sortCSVFile("myfile.csv", "sorted_myfile.csv")
	if err != nil {
		fmt.Println("Error:", err)
		os.Exit(1)
	}
	fmt.Println("CSV file sorted successfully")
}

如果内存仍然受限,可以考虑使用外部排序算法。以下是基于磁盘的外部排序示例:

package main

import (
	"bufio"
	"encoding/csv"
	"fmt"
	"os"
	"sort"
	"strconv"
	"strings"
)

func externalSortCSV(inputFile, outputFile string, chunkSize int) error {
	// 分割文件为多个排序好的块
	chunkFiles, err := createSortedChunks(inputFile, chunkSize)
	if err != nil {
		return err
	}
	defer cleanupChunks(chunkFiles)

	// 合并排序好的块
	err = mergeSortedChunks(chunkFiles, outputFile)
	return err
}

func createSortedChunks(inputFile string, chunkSize int) ([]string, error) {
	inFile, err := os.Open(inputFile)
	if err != nil {
		return nil, err
	}
	defer inFile.Close()

	reader := csv.NewReader(inFile)
	
	// 跳过表头
	header, err := reader.Read()
	if err != nil {
		return nil, err
	}

	var chunkFiles []string
	var chunk [][]string
	chunkIndex := 0

	for {
		row, err := reader.Read()
		if err != nil {
			break
		}
		
		chunk = append(chunk, row)
		
		if len(chunk) >= chunkSize {
			// 排序当前块
			sort.Slice(chunk, func(i, j int) bool {
				return chunk[i][0] < chunk[j][0]
			})
			
			// 写入临时文件
			chunkFile := fmt.Sprintf("chunk_%d.csv", chunkIndex)
			if err := writeChunk(chunkFile, header, chunk); err != nil {
				return nil, err
			}
			
			chunkFiles = append(chunkFiles, chunkFile)
			chunk = nil
			chunkIndex++
		}
	}

	// 处理最后一个块
	if len(chunk) > 0 {
		sort.Slice(chunk, func(i, j int) bool {
			return chunk[i][0] < chunk[j][0]
		})
		
		chunkFile := fmt.Sprintf("chunk_%d.csv", chunkIndex)
		if err := writeChunk(chunkFile, header, chunk); err != nil {
			return nil, err
		}
		chunkFiles = append(chunkFiles, chunkFile)
	}

	return chunkFiles, nil
}

func writeChunk(filename string, header []string, data [][]string) error {
	file, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer file.Close()

	writer := csv.NewWriter(file)
	if err := writer.Write(header); err != nil {
		return err
	}
	if err := writer.WriteAll(data); err != nil {
		return err
	}
	writer.Flush()
	return writer.Error()
}

func mergeSortedChunks(chunkFiles []string, outputFile string) error {
	if len(chunkFiles) == 0 {
		return nil
	}

	// 打开所有块文件
	readers := make([]*csv.Reader, len(chunkFiles))
	files := make([]*os.File, len(chunkFiles))
	
	for i, chunkFile := range chunkFiles {
		file, err := os.Open(chunkFile)
		if err != nil {
			return err
		}
		files[i] = file
		reader := csv.NewReader(file)
		// 跳过每个块的表头
		if _, err := reader.Read(); err != nil {
			return err
		}
		readers[i] = reader
	}
	defer func() {
		for _, file := range files {
			file.Close()
		}
	}()

	// 创建输出文件
	outFile, err := os.Create(outputFile)
	if err != nil {
		return err
	}
	defer outFile.Close()

	writer := csv.NewWriter(outFile)
	defer writer.Flush()

	// 写入表头(从第一个块读取)
	header := []string{"Timestamp", "Message"}
	if err := writer.Write(header); err != nil {
		return err
	}

	// 多路归并
	rows := make([]csvRow, len(readers))
	for i, reader := range readers {
		row, err := reader.Read()
		if err == nil {
			rows[i] = csvRow{data: row, readerIndex: i}
		}
	}

	for {
		// 找到最小时间戳的行
		minIndex := -1
		var minRow csvRow
		
		for i, row := range rows {
			if row.data != nil {
				if minIndex == -1 || row.data[0] < minRow.data[0] {
					minIndex = i
					minRow = row
				}
			}
		}

		if minIndex == -1 {
			break
		}

		// 写入最小行
		if err := writer.Write(minRow.data); err != nil {
			return err
		}

		// 从对应的reader读取下一行
		nextRow, err := readers[minIndex].Read()
		if err != nil {
			rows[minIndex] = csvRow{}
		} else {
			rows[minIndex] = csvRow{data: nextRow, readerIndex: minIndex}
		}
	}

	return nil
}

type csvRow struct {
	data        []string
	readerIndex int
}

func cleanupChunks(chunkFiles []string) {
	for _, file := range chunkFiles {
		os.Remove(file)
	}
}

func main() {
	// 设置每个块的大小(例如:10000行)
	chunkSize := 10000
	err := externalSortCSV("myfile.csv", "sorted_myfile.csv", chunkSize)
	if err != nil {
		fmt.Println("Error:", err)
		os.Exit(1)
	}
	fmt.Println("CSV file sorted successfully using external sort")
}

第一个示例适合内存充足的情况,第二个示例使用结构体存储数据更高效,第三个示例使用外部排序算法,通过分块和归并的方式处理大文件,适合内存受限的环境。

回到顶部