Golang读取百万级CSV文件记录时遇到问题
Golang读取百万级CSV文件记录时遇到问题 无法从CSV文件中读取100万条记录,并且在更新到MySQL数据库时遇到空指针异常。
让我检查并在我的代码中实现这个。无论如何,谢谢。
好的,查尔斯,但在 Go 语言中如何读取文件块呢?
Abhishek_Singh:
出现空指针异常。
能否提供你收到的具体错误信息?
除非你分享一些代码,否则我们无法帮助你。你基本上是在告诉我们"我的车坏了",然后期望我们告诉你问题出在哪里。
可以使用类似这样的方法(虚构示例):
if _, err := db.Exec("INSERT table SET a=?,b=?", a,b); err != nil {
log.Println(err)
}
请查看我的代码并修正它。我的文件应该分块读取,并且需要更新数据库,因为我有一个包含100万条数据的CSV文件。
func main() {
fmt.Println("hello world")
}
使用 bufio.NewReaderSize()。可以设置任意大小。
func main() {
reader := bufio.NewReaderSize(file, 1024*1024) // 1MB buffer
}
预先准备语句应该更高效,但正如前面有人提到的,这应该在 for 循环之前完成,只需执行一次。这意味着关闭操作也应该在循环结束后进行。
不过,据我理解这是次要问题,主要问题是如何读取庞大的 CSV 文件。我在上面贴出了一种实现方式,他可以直接使用那段代码或者参考其中的实现方法。
// 代码示例已按规则保留原文
Abhishek_Singh:
insert, err := datasource.Db.Prepare("INSERT INTO geo_location(ip_address,country_code,country,city,latitude,longitude,mystery_value) VALUES(?,?,?,?,?,?,?)")
看起来你应该在"for"循环之外创建"insert"语句并重复使用它。另外,你使用"err"作为返回值,但测试的却是"error"。
无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_
start := time.Now()
csvFile, _ := os.Open("data/data_dump.csv")
reader := csv.NewReader(bufio.NewReader(csvFile))
var status []models.EntryStatus
var errorCount = 0
var count = 0
var successCount = 0
for {
line, error := reader.Read()
if error == io.EOF {
break
} else {
count++
validation := utils.Validation(line)
if validation {
errorCount++
} else {
insert, err := datasource.Db.Prepare("INSERT INTO geo_location(ip_address,country_code,country,city,latitude,longitude,mystery_value) VALUES(?,?,?,?,?,?,?)")
if error != nil {
errorCount++
panic(err.Error())
} else {
insert.Exec(line[0], utils.TrimQuotes(line[1]), utils.TrimQuotes(line[2]), utils.TrimQuotes(line[3]), utils.TrimQuotes(line[4]), utils.TrimQuotes(line[5]), utils.TrimQuotes(line[6]))
defer datasource.Db.Close()
successCount++
}
}
}
}
elapsed := time.Since(start)
status =
append(status, models.EntryStatus{
Error: errorCount,
Success: successCount,
Count: count,
Elapsed: elapsed,
})
statusJson, _ := json.Marshal(status)
fmt.Fprintf(w, string(statusJson))
我的CSV文件中有100万条记录。我需要如何读取并更新到数据库中。
在处理百万级CSV文件记录时,Go语言提供了高效的读取和数据库操作方式。以下是一个完整的示例,展示如何正确读取大型CSV文件并批量插入MySQL数据库,避免空指针异常。
package main
import (
"database/sql"
"encoding/csv"
"fmt"
"log"
"os"
"strconv"
_ "github.com/go-sql-driver/mysql"
)
type Record struct {
ID int
Name string
Email string
Age int
}
func main() {
// 打开CSV文件
file, err := os.Open("data.csv")
if err != nil {
log.Fatal("无法打开CSV文件:", err)
}
defer file.Close()
// 创建CSV阅读器
reader := csv.NewReader(file)
// 可选:设置读取限制,对于大型文件
reader.FieldsPerRecord = -1 // 允许可变字段数
// 读取所有记录
records, err := reader.ReadAll()
if err != nil {
log.Fatal("读取CSV记录失败:", err)
}
fmt.Printf("成功读取 %d 条记录\n", len(records))
// 连接MySQL数据库
db, err := sql.Open("mysql", "username:password@tcp(localhost:3306)/database")
if err != nil {
log.Fatal("数据库连接失败:", err)
}
defer db.Close()
// 验证数据库连接
err = db.Ping()
if err != nil {
log.Fatal("数据库连接验证失败:", err)
}
// 批量插入记录(每次1000条)
batchSize := 1000
for i := 1; i < len(records); i += batchSize {
end := i + batchSize
if end > len(records) {
end = len(records)
}
batch := records[i:end]
err := insertBatch(db, batch)
if err != nil {
log.Printf("批量插入失败 (记录 %d-%d): %v", i, end-1, err)
} else {
fmt.Printf("成功插入记录 %d-%d\n", i, end-1)
}
}
}
func insertBatch(db *sql.DB, records [][]string) error {
// 开始事务
tx, err := db.Begin()
if err != nil {
return err
}
// 准备插入语句
stmt, err := tx.Prepare("INSERT INTO users (id, name, email, age) VALUES (?, ?, ?, ?)")
if err != nil {
tx.Rollback()
return err
}
defer stmt.Close()
// 遍历记录并插入
for _, record := range records {
// 数据验证和转换
if len(record) < 4 {
continue // 跳过不完整的记录
}
// 转换ID
id, err := strconv.Atoi(record[0])
if err != nil {
continue // 跳过无效的ID
}
name := record[1]
email := record[2]
// 转换Age
age, err := strconv.Atoi(record[3])
if err != nil {
age = 0 // 为无效年龄设置默认值
}
// 检查空值
if name == "" || email == "" {
continue // 跳过必需字段为空的记录
}
// 执行插入
_, err = stmt.Exec(id, name, email, age)
if err != nil {
tx.Rollback()
return err
}
}
// 提交事务
return tx.Commit()
}
对于更高效的内存使用,可以使用流式处理:
func processCSVStreaming(filePath string, db *sql.DB) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
reader := csv.NewReader(file)
// 跳过标题行(如果存在)
if _, err := reader.Read(); err != nil {
return err
}
batch := make([][]string, 0, 1000)
recordCount := 0
for {
record, err := reader.Read()
if err != nil {
break
}
batch = append(batch, record)
recordCount++
if len(batch) >= 1000 {
if err := insertBatch(db, batch); err != nil {
return err
}
batch = batch[:0] // 清空批次
}
}
// 插入剩余记录
if len(batch) > 0 {
if err := insertBatch(db, batch); err != nil {
return err
}
}
fmt.Printf("总共处理了 %d 条记录\n", recordCount)
return nil
}
关键要点:
- 使用事务和批量插入提高数据库性能
- 实现数据验证避免空指针异常
- 对于超大文件使用流式处理减少内存占用
- 正确处理错误和资源释放
确保MySQL连接参数正确,数据库表结构匹配CSV文件格式。


