Golang读取百万级CSV文件记录时遇到问题

Golang读取百万级CSV文件记录时遇到问题 无法从CSV文件中读取100万条记录,并且在更新到MySQL数据库时遇到空指针异常。

15 回复

好的。

更多关于Golang读取百万级CSV文件记录时遇到问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我建议您直接使用插入操作,而不是准备+插入的组合方式。

让我检查并在我的代码中实现这个。无论如何,谢谢。

好的,查尔斯,但在 Go 语言中如何读取文件块呢?

Abhishek_Singh:

出现空指针异常。

能否提供你收到的具体错误信息?

除非你分享一些代码,否则我们无法帮助你。你基本上是在告诉我们"我的车坏了",然后期望我们告诉你问题出在哪里。

可以使用类似这样的方法(虚构示例):

if _, err := db.Exec("INSERT table SET a=?,b=?", a,b); err != nil {
    log.Println(err)
}

请查看我的代码并修正它。我的文件应该分块读取,并且需要更新数据库,因为我有一个包含100万条数据的CSV文件。

func main() {
    fmt.Println("hello world")
}

使用 bufio.NewReaderSize()。可以设置任意大小。

func main() {
    reader := bufio.NewReaderSize(file, 1024*1024) // 1MB buffer
}

GitHub头像

mzimmerman/multicorecsv

一个使用Go语言编写的多核CSV读取器库。欢迎通过在GitHub上创建账户来为mzimmerman/multicorecsv的开发做出贡献。

预先准备语句应该更高效,但正如前面有人提到的,这应该在 for 循环之前完成,只需执行一次。这意味着关闭操作也应该在循环结束后进行。

不过,据我理解这是次要问题,主要问题是如何读取庞大的 CSV 文件。我在上面贴出了一种实现方式,他可以直接使用那段代码或者参考其中的实现方法。

// 代码示例已按规则保留原文

Abhishek_Singh:

insert, err := datasource.Db.Prepare("INSERT INTO geo_location(ip_address,country_code,country,city,latitude,longitude,mystery_value) VALUES(?,?,?,?,?,?,?)")

看起来你应该在"for"循环之外创建"insert"语句并重复使用它。另外,你使用"err"作为返回值,但测试的却是"error"。

无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_count 语句(当前值:16382)等待 2 秒%!(EXTRA *mysql.MySQLError=Error 1461: 无法创建超过 max_prepared_stmt_

start := time.Now()
csvFile, _ := os.Open("data/data_dump.csv")
reader := csv.NewReader(bufio.NewReader(csvFile))
var status []models.EntryStatus
var errorCount = 0
var count = 0
var successCount = 0
for {
	line, error := reader.Read()
	if error == io.EOF {
		break
	} else {
		count++
		validation := utils.Validation(line)
		if validation {
			errorCount++
		} else {
			insert, err := datasource.Db.Prepare("INSERT INTO geo_location(ip_address,country_code,country,city,latitude,longitude,mystery_value) VALUES(?,?,?,?,?,?,?)")
			if error != nil {
				errorCount++
				panic(err.Error())
			} else {
				insert.Exec(line[0], utils.TrimQuotes(line[1]), utils.TrimQuotes(line[2]), utils.TrimQuotes(line[3]), utils.TrimQuotes(line[4]), utils.TrimQuotes(line[5]), utils.TrimQuotes(line[6]))
				defer datasource.Db.Close()
				successCount++
			}
		}
	}
}
elapsed := time.Since(start)
status =
	append(status, models.EntryStatus{
		Error:   errorCount,
		Success: successCount,
		Count:   count,
		Elapsed: elapsed,
	})
statusJson, _ := json.Marshal(status)
fmt.Fprintf(w, string(statusJson))

我的CSV文件中有100万条记录。我需要如何读取并更新到数据库中。

在处理百万级CSV文件记录时,Go语言提供了高效的读取和数据库操作方式。以下是一个完整的示例,展示如何正确读取大型CSV文件并批量插入MySQL数据库,避免空指针异常。

package main

import (
    "database/sql"
    "encoding/csv"
    "fmt"
    "log"
    "os"
    "strconv"

    _ "github.com/go-sql-driver/mysql"
)

type Record struct {
    ID    int
    Name  string
    Email string
    Age   int
}

func main() {
    // 打开CSV文件
    file, err := os.Open("data.csv")
    if err != nil {
        log.Fatal("无法打开CSV文件:", err)
    }
    defer file.Close()

    // 创建CSV阅读器
    reader := csv.NewReader(file)
    
    // 可选:设置读取限制,对于大型文件
    reader.FieldsPerRecord = -1 // 允许可变字段数
    
    // 读取所有记录
    records, err := reader.ReadAll()
    if err != nil {
        log.Fatal("读取CSV记录失败:", err)
    }

    fmt.Printf("成功读取 %d 条记录\n", len(records))

    // 连接MySQL数据库
    db, err := sql.Open("mysql", "username:password@tcp(localhost:3306)/database")
    if err != nil {
        log.Fatal("数据库连接失败:", err)
    }
    defer db.Close()

    // 验证数据库连接
    err = db.Ping()
    if err != nil {
        log.Fatal("数据库连接验证失败:", err)
    }

    // 批量插入记录(每次1000条)
    batchSize := 1000
    for i := 1; i < len(records); i += batchSize {
        end := i + batchSize
        if end > len(records) {
            end = len(records)
        }

        batch := records[i:end]
        err := insertBatch(db, batch)
        if err != nil {
            log.Printf("批量插入失败 (记录 %d-%d): %v", i, end-1, err)
        } else {
            fmt.Printf("成功插入记录 %d-%d\n", i, end-1)
        }
    }
}

func insertBatch(db *sql.DB, records [][]string) error {
    // 开始事务
    tx, err := db.Begin()
    if err != nil {
        return err
    }

    // 准备插入语句
    stmt, err := tx.Prepare("INSERT INTO users (id, name, email, age) VALUES (?, ?, ?, ?)")
    if err != nil {
        tx.Rollback()
        return err
    }
    defer stmt.Close()

    // 遍历记录并插入
    for _, record := range records {
        // 数据验证和转换
        if len(record) < 4 {
            continue // 跳过不完整的记录
        }

        // 转换ID
        id, err := strconv.Atoi(record[0])
        if err != nil {
            continue // 跳过无效的ID
        }

        name := record[1]
        email := record[2]

        // 转换Age
        age, err := strconv.Atoi(record[3])
        if err != nil {
            age = 0 // 为无效年龄设置默认值
        }

        // 检查空值
        if name == "" || email == "" {
            continue // 跳过必需字段为空的记录
        }

        // 执行插入
        _, err = stmt.Exec(id, name, email, age)
        if err != nil {
            tx.Rollback()
            return err
        }
    }

    // 提交事务
    return tx.Commit()
}

对于更高效的内存使用,可以使用流式处理:

func processCSVStreaming(filePath string, db *sql.DB) error {
    file, err := os.Open(filePath)
    if err != nil {
        return err
    }
    defer file.Close()

    reader := csv.NewReader(file)
    
    // 跳过标题行(如果存在)
    if _, err := reader.Read(); err != nil {
        return err
    }

    batch := make([][]string, 0, 1000)
    recordCount := 0

    for {
        record, err := reader.Read()
        if err != nil {
            break
        }

        batch = append(batch, record)
        recordCount++

        if len(batch) >= 1000 {
            if err := insertBatch(db, batch); err != nil {
                return err
            }
            batch = batch[:0] // 清空批次
        }
    }

    // 插入剩余记录
    if len(batch) > 0 {
        if err := insertBatch(db, batch); err != nil {
            return err
        }
    }

    fmt.Printf("总共处理了 %d 条记录\n", recordCount)
    return nil
}

关键要点:

  1. 使用事务和批量插入提高数据库性能
  2. 实现数据验证避免空指针异常
  3. 对于超大文件使用流式处理减少内存占用
  4. 正确处理错误和资源释放

确保MySQL连接参数正确,数据库表结构匹配CSV文件格式。

回到顶部