golang事件聚合与批量处理解决方案插件库go-accumulator的使用
go-accumulator - Golang事件聚合与批量处理解决方案
简介
go-accumulator是一个用于事件聚合和批量处理的Golang库。它解决了单条数据处理耗时过长的问题,通过将数据累积到一定数量或时间间隔后再进行批量处理。
功能特点
-
提供两种触发处理的条件:
- 存储达到最大容量(flushSize)
- 达到累积时间间隔(flushInterval)
-
提供两种添加数据的方法:
- AddAsync - 异步添加数据,不等待处理完成
- AddSync - 同步添加数据,等待处理完成
安装
go get github.com/nar10z/go-accumulator
使用示例
下面是一个完整的使用示例,展示了如何使用go-accumulator进行事件聚合和批量处理:
package main
import (
"context"
"fmt"
"strings"
"sync"
"time"
goaccum "github.com/nar10z/go-accumulator"
)
func main() {
// 创建带有超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
const (
countSync = 4 // 同步添加的事件数量
countAsync = 3 // 异步添加的事件数量
)
// 创建accumulator实例
// 参数说明:
// - 3: 最大flush大小(flushSize)
// - time.Second: flush间隔(flushInterval)
// - 200*time.Millisecond: 处理超时时间
// - 最后一个参数是处理函数(flushFunc)
accumulator := goaccum.New[string](3, time.Second, 200*time.Millisecond, func(ctx context.Context, events []string) error {
fmt.Printf("开始处理 %d 个事件:\n", len(events))
for _, e := range events {
fmt.Printf(" - %s\n", e)
}
fmt.Printf("处理完成\n%s\n", strings.Repeat("-", 20))
return nil
})
var wg sync.WaitGroup
wg.Add(countSync + countAsync)
// 异步添加事件
go func() {
for i := 0; i < countAsync; i++ {
err := accumulator.AddAsync(ctx, fmt.Sprintf("异步事件 #%d", i))
if err != nil {
fmt.Printf("添加事件失败: %v\n", err)
}
wg.Done()
}
}()
// 同步添加事件
go func() {
for i := 0; i < countSync; i++ {
i := i
go func() {
err := accumulator.AddSync(ctx, fmt.Sprintf("同步事件 #%d", i))
if err != nil {
fmt.Printf("添加事件失败: %v\n", err)
}
wg.Done()
}()
}
}()
// 等待所有事件添加完成
wg.Wait()
// 停止accumulator
accumulator.Stop()
}
输出示例
运行上述代码可能会产生如下输出:
开始处理 3 个事件:
- 同步事件 #3
- 异步事件 #0
- 异步事件 #1
处理完成
--------------------
开始处理 3 个事件:
- 异步事件 #2
- 同步事件 #0
- 同步事件 #1
处理完成
--------------------
开始处理 1 个事件:
- 同步事件 #2
处理完成
--------------------
工作原理
- 当添加的事件数量达到flushSize(本例中为3)时,会触发处理函数
- 如果未达到flushSize但达到了flushInterval(本例中为1秒),也会触发处理函数
- 处理函数会对累积的事件进行批量处理
适用场景
go-accumulator特别适合以下场景:
- 单条数据处理开销较大
- 需要减少频繁处理的开销
- 可以容忍一定延迟的批量处理
许可证
MIT License
更多关于golang事件聚合与批量处理解决方案插件库go-accumulator的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang事件聚合与批量处理解决方案插件库go-accumulator的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
go-accumulator 使用指南
go-accumulator 是一个用于事件聚合与批量处理的 Golang 库,它可以帮助开发者高效地收集、聚合和批量处理事件数据。下面我将详细介绍它的使用方法和示例代码。
核心特性
- 事件聚合:将多个小事件聚合成批量数据
- 定时触发:支持按时间间隔自动触发处理
- 大小触发:当累积事件达到指定数量时触发
- 并发安全:支持多协程并发操作
- 自定义处理:灵活定义批量数据处理逻辑
安装
go get github.com/go-accumulator/accumulator
基本使用示例
package main
import (
"fmt"
"time"
"github.com/go-accumulator/accumulator"
)
func main() {
// 创建一个新的累加器
acc := accumulator.NewAccumulator(
accumulator.WithBatchSize(10), // 每10条触发一次
accumulator.WithInterval(5*time.Second), // 或5秒触发一次
accumulator.WithHandler(handleBatch), // 设置批处理函数
)
// 启动累加器
acc.Start()
// 模拟产生事件
for i := 0; i < 25; i++ {
acc.Add(i) // 添加事件
time.Sleep(500 * time.Millisecond)
}
// 关闭累加器
acc.Stop()
}
// 批处理函数
func handleBatch(items []interface{}) {
fmt.Printf("处理批量数据(%d条): %v\n", len(items), items)
}
高级配置选项
acc := accumulator.NewAccumulator(
accumulator.WithBatchSize(100), // 每100条触发
accumulator.WithInterval(30*time.Second), // 或30秒触发
accumulator.WithHandler(handleBatch), // 处理函数
accumulator.WithMaxWait(5*time.Second), // 最大等待时间
accumulator.WithBufferSize(1000), // 缓冲区大小
accumulator.WithSharding(4), // 分片数,提高并发性能
)
实际应用场景示例
1. 日志批量上传
func main() {
// 创建日志累加器
logAcc := accumulator.NewAccumulator(
accumulator.WithBatchSize(50),
accumulator.WithInterval(10*time.Second),
accumulator.WithHandler(uploadLogs),
)
// 在HTTP处理中使用
http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
log := r.URL.Query().Get("message")
if log != "" {
logAcc.Add(log)
}
w.WriteHeader(http.StatusOK)
})
logAcc.Start()
http.ListenAndServe(":8080", nil)
}
func uploadLogs(logs []interface{}) {
// 将日志批量上传到日志服务器
fmt.Printf("上传%d条日志\n", len(logs))
}
2. 数据库批量插入
func main() {
dbAcc := accumulator.NewAccumulator(
accumulator.WithBatchSize(200),
accumulator.WithInterval(15*time.Second),
accumulator.WithHandler(batchInsert),
)
// 模拟数据产生
for i := 0; i < 1000; i++ {
record := map[string]interface{}{
"id": i,
"name": fmt.Sprintf("user%d", i),
}
dbAcc.Add(record)
}
dbAcc.Start()
time.Sleep(1 * time.Minute)
dbAcc.Stop()
}
func batchInsert(records []interface{}) {
// 这里实现批量插入数据库逻辑
fmt.Printf("批量插入%d条记录\n", len(records))
}
性能优化建议
- 合理设置批大小:根据后端处理能力调整
- 使用分片:高并发场景下使用WithSharding
- 监控指标:添加处理耗时和批大小的监控
- 错误处理:在批处理函数中添加重试机制
注意事项
- 处理函数应该快速完成,避免阻塞
- 程序退出前调用Stop()确保所有数据被处理
- 处理函数中的panic会被捕获并打印,但不会中断程序
go-accumulator 是一个简单而强大的工具,特别适合需要将分散操作聚合成批量处理的场景,能显著提高系统吞吐量和降低I/O压力。