Golang能否实现更高程度的并发优化
Golang能否实现更高程度的并发优化 我编写了一个小型Go代码,这是一个大型项目的一部分。我尝试将其改为并发执行,但认为在相同机器上的纯粹执行速度方面还有改进空间,比如对数据库或CSV文件进行多次调用。在本地SQLite数据库上表现良好,但我担心从远程数据库服务器获取数据时会显著变慢。以下是我的代码:
package main
import (
"fmt"
"reflect"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/sqlite"
)
type AirQuality struct {
// gorm.Model
// ID uint `gorm:"column:id"`
Index string `gorm:"column:index"`
BEN string `gorm:"column:BEN"`
CH4 string `gorm:"column:CH4"`
CO string `gorm:"column:CO"`
EBE string `gorm:"column:EBE"`
MXY string `gorm:"column:MXY"`
NMHC string `gorm:"column:NMHC"`
NO string `gorm:"column:NO"`
NO2 string `gorm:"column:NO_2"`
NOX string `gorm:"column:NOx"`
OXY string `gorm:"column:OXY"`
O3 string `gorm:"column:O_3"`
PM10 string `gorm:"column:PM10"`
PM25 string `gorm:"column:PM25"`
PXY string `gorm:"column:PXY"`
SO2 string `gorm:"column:SO_2"`
TCH string `gorm:"column:TCH"`
TOL string `gorm:"column:TOL"`
Time string `gorm:"column:date; type:timestamp"`
Station string `gorm:"column:station"`
}
func (AirQuality) TableName() string {
return "AQ"
}
func main() {
c := generateRowsConcurrent("boring!!")
for row := range c {
fmt.Println(row)
}
// for {
// fmt.Println(<-c)
// if c == nil {
// fmt.Println("Bye")
// break
// }
// }
}
func generateRowsConcurrent(msg string) <-chan []string {
c := make(chan []string)
go func() {
db, err := gorm.Open("sqlite3", "./load_testing_7.6m.db")
if err != nil {
panic("failed to connect database")
}
defer db.Close()
rows, err := db.Model(&AirQuality{}).Limit(20).Rows()
defer rows.Close()
if err != nil {
panic(err)
}
for rows.Next() {
var aq AirQuality
db.ScanRows(rows, &aq)
v := reflect.Indirect(reflect.ValueOf(aq))
var buf []string
for i := 0; i < v.NumField(); i++ {
buf = append(buf, v.Field(i).String())
}
c <- buf
}
defer close(c)
}()
return c
}
更多关于Golang能否实现更高程度的并发优化的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于Golang能否实现更高程度的并发优化的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
以下是针对您的代码进行并发优化的具体实现方案。通过使用工作池模式、批量处理和连接池优化,可以显著提高从远程数据库获取数据时的性能。
package main
import (
"context"
"fmt"
"log"
"reflect"
"sync"
"time"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/sqlite"
)
type AirQuality struct {
Index string `gorm:"column:index"`
BEN string `gorm:"column:BEN"`
CH4 string `gorm:"column:CH4"`
CO string `gorm:"column:CO"`
EBE string `gorm:"column:EBE"`
MXY string `gorm:"column:MXY"`
NMHC string `gorm:"column:NMHC"`
NO string `gorm:"column:NO"`
NO2 string `gorm:"column:NO_2"`
NOX string `gorm:"column:NOx"`
OXY string `gorm:"column:OXY"`
O3 string `gorm:"column:O_3"`
PM10 string `gorm:"column:PM10"`
PM25 string `gorm:"column:PM25"`
PXY string `gorm:"column:PXY"`
SO2 string `gorm:"column:SO_2"`
TCH string `gorm:"column:TCH"`
TOL string `gorm:"column:TOL"`
Time string `gorm:"column:date; type:timestamp"`
Station string `gorm:"column:station"`
}
func (AirQuality) TableName() string {
return "AQ"
}
// 全局数据库连接池
var dbPool *gorm.DB
func initDB() {
var err error
dbPool, err = gorm.Open("sqlite3", "./load_testing_7.6m.db")
if err != nil {
log.Fatal("failed to connect database")
}
// 配置连接池参数
dbPool.DB().SetMaxOpenConns(20)
dbPool.DB().SetMaxIdleConns(10)
dbPool.DB().SetConnMaxLifetime(time.Hour)
}
func main() {
initDB()
defer dbPool.Close()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 使用工作池处理数据
results := processDataConcurrent(ctx, 5, 1000)
for result := range results {
if result.Err != nil {
log.Printf("Error processing batch: %v", result.Err)
continue
}
for _, row := range result.Data {
fmt.Println(row)
}
}
}
type ProcessResult struct {
Data [][]string
Err error
}
// 批量处理数据
func processDataConcurrent(ctx context.Context, workerCount, batchSize int) <-chan ProcessResult {
out := make(chan ProcessResult)
var wg sync.WaitGroup
batches := generateBatches(ctx, batchSize)
// 创建工作池
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for batch := range batches {
select {
case <-ctx.Done():
return
default:
data, err := processBatch(batch.Offset, batch.Limit)
out <- ProcessResult{Data: data, Err: err}
}
}
}(i)
}
// 等待所有worker完成并关闭输出通道
go func() {
wg.Wait()
close(out)
}()
return out
}
type Batch struct {
Offset int
Limit int
}
func generateBatches(ctx context.Context, batchSize int) <-chan Batch {
batches := make(chan Batch)
go func() {
defer close(batches)
offset := 0
for {
select {
case <-ctx.Done():
return
case batches <- Batch{Offset: offset, Limit: batchSize}:
offset += batchSize
}
}
}()
return batches
}
func processBatch(offset, limit int) ([][]string, error) {
var results [][]string
// 使用连接池中的连接
db := dbPool
rows, err := db.Model(&AirQuality{}).Offset(offset).Limit(limit).Rows()
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var aq AirQuality
if err := db.ScanRows(rows, &aq); err != nil {
return nil, err
}
v := reflect.Indirect(reflect.ValueOf(aq))
var buf []string
for i := 0; i < v.NumField(); i++ {
buf = append(buf, v.Field(i).String())
}
results = append(results, buf)
}
return results, nil
}
// 替代方案:使用原生SQL进行批量处理
func processBatchSQL(offset, limit int) ([][]string, error) {
var results [][]string
query := "SELECT * FROM AQ LIMIT ? OFFSET ?"
rows, err := dbPool.Raw(query, limit, offset).Rows()
if err != nil {
return nil, err
}
defer rows.Close()
columns, err := rows.Columns()
if err != nil {
return nil, err
}
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for rows.Next() {
for i := range columns {
valuePtrs[i] = &values[i]
}
if err := rows.Scan(valuePtrs...); err != nil {
return nil, err
}
var row []string
for i := range columns {
var val string
if values[i] != nil {
val = fmt.Sprintf("%v", values[i])
}
row = append(row, val)
}
results = append(results, row)
}
return results, nil
}
这个优化版本实现了以下改进:
- 数据库连接池:使用全局连接池避免重复创建连接的开销
- 批量处理:将数据分成多个批次并行处理
- 工作池模式:限制并发goroutine数量,防止资源耗尽
- 上下文超时控制:添加超时机制防止长时间阻塞
- 错误处理:完善的错误处理机制
对于远程数据库,还可以进一步优化:
// 添加重试机制
func processBatchWithRetry(offset, limit int, maxRetries int) ([][]string, error) {
var lastErr error
for i := 0; i < maxRetries; i++ {
data, err := processBatch(offset, limit)
if err == nil {
return data, nil
}
lastErr = err
time.Sleep(time.Duration(i+1) * time.Second) // 指数退避
}
return nil, lastErr
}
这些优化措施可以有效减少远程数据库访问时的延迟影响,提高整体并发性能。

