Golang能否实现更高程度的并发优化

Golang能否实现更高程度的并发优化 我编写了一个小型Go代码,这是一个大型项目的一部分。我尝试将其改为并发执行,但认为在相同机器上的纯粹执行速度方面还有改进空间,比如对数据库或CSV文件进行多次调用。在本地SQLite数据库上表现良好,但我担心从远程数据库服务器获取数据时会显著变慢。以下是我的代码:

package main

import (
	"fmt"
	"reflect"

	"github.com/jinzhu/gorm"
	_ "github.com/jinzhu/gorm/dialects/sqlite"
)

type AirQuality struct {
	// gorm.Model
	// ID      uint   `gorm:"column:id"`
	Index   string `gorm:"column:index"`
	BEN     string `gorm:"column:BEN"`
	CH4     string `gorm:"column:CH4"`
	CO      string `gorm:"column:CO"`
	EBE     string `gorm:"column:EBE"`
	MXY     string `gorm:"column:MXY"`
	NMHC    string `gorm:"column:NMHC"`
	NO      string `gorm:"column:NO"`
	NO2     string `gorm:"column:NO_2"`
	NOX     string `gorm:"column:NOx"`
	OXY     string `gorm:"column:OXY"`
	O3      string `gorm:"column:O_3"`
	PM10    string `gorm:"column:PM10"`
	PM25    string `gorm:"column:PM25"`
	PXY     string `gorm:"column:PXY"`
	SO2     string `gorm:"column:SO_2"`
	TCH     string `gorm:"column:TCH"`
	TOL     string `gorm:"column:TOL"`
	Time    string `gorm:"column:date; type:timestamp"`
	Station string `gorm:"column:station"`
}

func (AirQuality) TableName() string {
	return "AQ"
}

func main() {
	c := generateRowsConcurrent("boring!!")

	for row := range c {
		fmt.Println(row)
	}
	// for {
	// 	fmt.Println(<-c)
	//  if c == nil {
	//      fmt.Println("Bye")
	//      break
	//  }
	// }
}

func generateRowsConcurrent(msg string) <-chan []string {
	c := make(chan []string)
	go func() {
		db, err := gorm.Open("sqlite3", "./load_testing_7.6m.db")
		if err != nil {
			panic("failed to connect database")
		}
		defer db.Close()
		rows, err := db.Model(&AirQuality{}).Limit(20).Rows()
		defer rows.Close()
		if err != nil {
			panic(err)
		}
		for rows.Next() {
			var aq AirQuality
			db.ScanRows(rows, &aq)
			v := reflect.Indirect(reflect.ValueOf(aq))
			var buf []string
			for i := 0; i < v.NumField(); i++ {
				buf = append(buf, v.Field(i).String())
			}
			c <- buf
		}

		defer close(c)
	}()
	return c
}

更多关于Golang能否实现更高程度的并发优化的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang能否实现更高程度的并发优化的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


以下是针对您的代码进行并发优化的具体实现方案。通过使用工作池模式、批量处理和连接池优化,可以显著提高从远程数据库获取数据时的性能。

package main

import (
	"context"
	"fmt"
	"log"
	"reflect"
	"sync"
	"time"

	"github.com/jinzhu/gorm"
	_ "github.com/jinzhu/gorm/dialects/sqlite"
)

type AirQuality struct {
	Index   string `gorm:"column:index"`
	BEN     string `gorm:"column:BEN"`
	CH4     string `gorm:"column:CH4"`
	CO      string `gorm:"column:CO"`
	EBE     string `gorm:"column:EBE"`
	MXY     string `gorm:"column:MXY"`
	NMHC    string `gorm:"column:NMHC"`
	NO      string `gorm:"column:NO"`
	NO2     string `gorm:"column:NO_2"`
	NOX     string `gorm:"column:NOx"`
	OXY     string `gorm:"column:OXY"`
	O3      string `gorm:"column:O_3"`
	PM10    string `gorm:"column:PM10"`
	PM25    string `gorm:"column:PM25"`
	PXY     string `gorm:"column:PXY"`
	SO2     string `gorm:"column:SO_2"`
	TCH     string `gorm:"column:TCH"`
	TOL     string `gorm:"column:TOL"`
	Time    string `gorm:"column:date; type:timestamp"`
	Station string `gorm:"column:station"`
}

func (AirQuality) TableName() string {
	return "AQ"
}

// 全局数据库连接池
var dbPool *gorm.DB

func initDB() {
	var err error
	dbPool, err = gorm.Open("sqlite3", "./load_testing_7.6m.db")
	if err != nil {
		log.Fatal("failed to connect database")
	}
	
	// 配置连接池参数
	dbPool.DB().SetMaxOpenConns(20)
	dbPool.DB().SetMaxIdleConns(10)
	dbPool.DB().SetConnMaxLifetime(time.Hour)
}

func main() {
	initDB()
	defer dbPool.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// 使用工作池处理数据
	results := processDataConcurrent(ctx, 5, 1000)
	
	for result := range results {
		if result.Err != nil {
			log.Printf("Error processing batch: %v", result.Err)
			continue
		}
		for _, row := range result.Data {
			fmt.Println(row)
		}
	}
}

type ProcessResult struct {
	Data [][]string
	Err  error
}

// 批量处理数据
func processDataConcurrent(ctx context.Context, workerCount, batchSize int) <-chan ProcessResult {
	out := make(chan ProcessResult)
	
	var wg sync.WaitGroup
	batches := generateBatches(ctx, batchSize)
	
	// 创建工作池
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for batch := range batches {
				select {
				case <-ctx.Done():
					return
				default:
					data, err := processBatch(batch.Offset, batch.Limit)
					out <- ProcessResult{Data: data, Err: err}
				}
			}
		}(i)
	}
	
	// 等待所有worker完成并关闭输出通道
	go func() {
		wg.Wait()
		close(out)
	}()
	
	return out
}

type Batch struct {
	Offset int
	Limit  int
}

func generateBatches(ctx context.Context, batchSize int) <-chan Batch {
	batches := make(chan Batch)
	
	go func() {
		defer close(batches)
		offset := 0
		for {
			select {
			case <-ctx.Done():
				return
			case batches <- Batch{Offset: offset, Limit: batchSize}:
				offset += batchSize
			}
		}
	}()
	
	return batches
}

func processBatch(offset, limit int) ([][]string, error) {
	var results [][]string
	
	// 使用连接池中的连接
	db := dbPool
	rows, err := db.Model(&AirQuality{}).Offset(offset).Limit(limit).Rows()
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var aq AirQuality
		if err := db.ScanRows(rows, &aq); err != nil {
			return nil, err
		}
		
		v := reflect.Indirect(reflect.ValueOf(aq))
		var buf []string
		for i := 0; i < v.NumField(); i++ {
			buf = append(buf, v.Field(i).String())
		}
		results = append(results, buf)
	}
	
	return results, nil
}

// 替代方案:使用原生SQL进行批量处理
func processBatchSQL(offset, limit int) ([][]string, error) {
	var results [][]string
	
	query := "SELECT * FROM AQ LIMIT ? OFFSET ?"
	rows, err := dbPool.Raw(query, limit, offset).Rows()
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		return nil, err
	}

	values := make([]interface{}, len(columns))
	valuePtrs := make([]interface{}, len(columns))
	
	for rows.Next() {
		for i := range columns {
			valuePtrs[i] = &values[i]
		}
		
		if err := rows.Scan(valuePtrs...); err != nil {
			return nil, err
		}
		
		var row []string
		for i := range columns {
			var val string
			if values[i] != nil {
				val = fmt.Sprintf("%v", values[i])
			}
			row = append(row, val)
		}
		results = append(results, row)
	}
	
	return results, nil
}

这个优化版本实现了以下改进:

  1. 数据库连接池:使用全局连接池避免重复创建连接的开销
  2. 批量处理:将数据分成多个批次并行处理
  3. 工作池模式:限制并发goroutine数量,防止资源耗尽
  4. 上下文超时控制:添加超时机制防止长时间阻塞
  5. 错误处理:完善的错误处理机制

对于远程数据库,还可以进一步优化:

// 添加重试机制
func processBatchWithRetry(offset, limit int, maxRetries int) ([][]string, error) {
	var lastErr error
	for i := 0; i < maxRetries; i++ {
		data, err := processBatch(offset, limit)
		if err == nil {
			return data, nil
		}
		lastErr = err
		time.Sleep(time.Duration(i+1) * time.Second) // 指数退避
	}
	return nil, lastErr
}

这些优化措施可以有效减少远程数据库访问时的延迟影响,提高整体并发性能。

回到顶部