golang事件聚合与批量处理解决方案插件库go-accumulator的使用

go-accumulator - Golang事件聚合与批量处理解决方案

简介

go-accumulator是一个用于事件聚合和批量处理的Golang库。它解决了单条数据处理耗时过长的问题,通过将数据累积到一定数量或时间间隔后再进行批量处理。

功能特点

  1. 提供两种触发处理的条件:

    • 存储达到最大容量(flushSize)
    • 达到累积时间间隔(flushInterval)
  2. 提供两种添加数据的方法:

    • AddAsync - 异步添加数据,不等待处理完成
    • AddSync - 同步添加数据,等待处理完成

安装

go get github.com/nar10z/go-accumulator

使用示例

下面是一个完整的使用示例,展示了如何使用go-accumulator进行事件聚合和批量处理:

package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	goaccum "github.com/nar10z/go-accumulator"
)

func main() {
	// 创建带有超时的上下文
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	const (
		countSync  = 4 // 同步添加的事件数量
		countAsync = 3 // 异步添加的事件数量
	)

	// 创建accumulator实例
	// 参数说明:
	// - 3: 最大flush大小(flushSize)
	// - time.Second: flush间隔(flushInterval)
	// - 200*time.Millisecond: 处理超时时间
	// - 最后一个参数是处理函数(flushFunc)
	accumulator := goaccum.New[string](3, time.Second, 200*time.Millisecond, func(ctx context.Context, events []string) error {
		fmt.Printf("开始处理 %d 个事件:\n", len(events))
		for _, e := range events {
			fmt.Printf(" - %s\n", e)
		}
		fmt.Printf("处理完成\n%s\n", strings.Repeat("-", 20))
		return nil
	})

	var wg sync.WaitGroup
	wg.Add(countSync + countAsync)

	// 异步添加事件
	go func() {
		for i := 0; i < countAsync; i++ {
			err := accumulator.AddAsync(ctx, fmt.Sprintf("异步事件 #%d", i))
			if err != nil {
				fmt.Printf("添加事件失败: %v\n", err)
			}
			wg.Done()
		}
	}()

	// 同步添加事件
	go func() {
		for i := 0; i < countSync; i++ {
			i := i
			go func() {
				err := accumulator.AddSync(ctx, fmt.Sprintf("同步事件 #%d", i))
				if err != nil {
					fmt.Printf("添加事件失败: %v\n", err)
				}
				wg.Done()
			}()
		}
	}()

	// 等待所有事件添加完成
	wg.Wait()

	// 停止accumulator
	accumulator.Stop()
}

输出示例

运行上述代码可能会产生如下输出:

开始处理 3 个事件:
 - 同步事件 #3
 - 异步事件 #0
 - 异步事件 #1
处理完成
--------------------
开始处理 3 个事件:
 - 异步事件 #2
 - 同步事件 #0
 - 同步事件 #1
处理完成
--------------------
开始处理 1 个事件:
 - 同步事件 #2
处理完成
--------------------

工作原理

  1. 当添加的事件数量达到flushSize(本例中为3)时,会触发处理函数
  2. 如果未达到flushSize但达到了flushInterval(本例中为1秒),也会触发处理函数
  3. 处理函数会对累积的事件进行批量处理

适用场景

go-accumulator特别适合以下场景:

  • 单条数据处理开销较大
  • 需要减少频繁处理的开销
  • 可以容忍一定延迟的批量处理

许可证

MIT License


更多关于golang事件聚合与批量处理解决方案插件库go-accumulator的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang事件聚合与批量处理解决方案插件库go-accumulator的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


go-accumulator 使用指南

go-accumulator 是一个用于事件聚合与批量处理的 Golang 库,它可以帮助开发者高效地收集、聚合和批量处理事件数据。下面我将详细介绍它的使用方法和示例代码。

核心特性

  1. 事件聚合:将多个小事件聚合成批量数据
  2. 定时触发:支持按时间间隔自动触发处理
  3. 大小触发:当累积事件达到指定数量时触发
  4. 并发安全:支持多协程并发操作
  5. 自定义处理:灵活定义批量数据处理逻辑

安装

go get github.com/go-accumulator/accumulator

基本使用示例

package main

import (
	"fmt"
	"time"
	
	"github.com/go-accumulator/accumulator"
)

func main() {
	// 创建一个新的累加器
	acc := accumulator.NewAccumulator(
		accumulator.WithBatchSize(10),     // 每10条触发一次
		accumulator.WithInterval(5*time.Second), // 或5秒触发一次
		accumulator.WithHandler(handleBatch), // 设置批处理函数
	)
	
	// 启动累加器
	acc.Start()
	
	// 模拟产生事件
	for i := 0; i < 25; i++ {
		acc.Add(i) // 添加事件
		time.Sleep(500 * time.Millisecond)
	}
	
	// 关闭累加器
	acc.Stop()
}

// 批处理函数
func handleBatch(items []interface{}) {
	fmt.Printf("处理批量数据(%d条): %v\n", len(items), items)
}

高级配置选项

acc := accumulator.NewAccumulator(
	accumulator.WithBatchSize(100),          // 每100条触发
	accumulator.WithInterval(30*time.Second), // 或30秒触发
	accumulator.WithHandler(handleBatch),     // 处理函数
	accumulator.WithMaxWait(5*time.Second),  // 最大等待时间
	accumulator.WithBufferSize(1000),        // 缓冲区大小
	accumulator.WithSharding(4),             // 分片数,提高并发性能
)

实际应用场景示例

1. 日志批量上传

func main() {
	// 创建日志累加器
	logAcc := accumulator.NewAccumulator(
		accumulator.WithBatchSize(50),
		accumulator.WithInterval(10*time.Second),
		accumulator.WithHandler(uploadLogs),
	)
	
	// 在HTTP处理中使用
	http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
		log := r.URL.Query().Get("message")
		if log != "" {
			logAcc.Add(log)
		}
		w.WriteHeader(http.StatusOK)
	})
	
	logAcc.Start()
	http.ListenAndServe(":8080", nil)
}

func uploadLogs(logs []interface{}) {
	// 将日志批量上传到日志服务器
	fmt.Printf("上传%d条日志\n", len(logs))
}

2. 数据库批量插入

func main() {
	dbAcc := accumulator.NewAccumulator(
		accumulator.WithBatchSize(200),
		accumulator.WithInterval(15*time.Second),
		accumulator.WithHandler(batchInsert),
	)
	
	// 模拟数据产生
	for i := 0; i < 1000; i++ {
		record := map[string]interface{}{
			"id":   i,
			"name": fmt.Sprintf("user%d", i),
		}
		dbAcc.Add(record)
	}
	
	dbAcc.Start()
	time.Sleep(1 * time.Minute)
	dbAcc.Stop()
}

func batchInsert(records []interface{}) {
	// 这里实现批量插入数据库逻辑
	fmt.Printf("批量插入%d条记录\n", len(records))
}

性能优化建议

  1. 合理设置批大小:根据后端处理能力调整
  2. 使用分片:高并发场景下使用WithSharding
  3. 监控指标:添加处理耗时和批大小的监控
  4. 错误处理:在批处理函数中添加重试机制

注意事项

  1. 处理函数应该快速完成,避免阻塞
  2. 程序退出前调用Stop()确保所有数据被处理
  3. 处理函数中的panic会被捕获并打印,但不会中断程序

go-accumulator 是一个简单而强大的工具,特别适合需要将分散操作聚合成批量处理的场景,能显著提高系统吞吐量和降低I/O压力。

回到顶部