golang轻量高效异步任务队列管理插件库async-job的使用
Golang轻量高效异步任务队列管理插件库async-job的使用
概述
AsyncJob是一个轻量级、高效且清晰的异步任务管理库。
特性
- AsyncJob是一个简单的异步任务管理器
- 完整的代码覆盖率
- 异步队列
- 可定义异步任务数量(默认: runtime.NumCPU())
- 处理托管和非托管错误
- 提供简单的ETA(预计完成时间)功能
- 完整的代码描述
使用示例
基础用法
package main
import (
"github.com/lab210-dev/async-job"
"log"
)
func main() {
// 创建一个新的AsyncJob实例
asj := asyncjob.New[string]()
// 设置异步任务数量(默认: runtime.NumCPU())
asj.SetWorkers(2)
// 监听进度状态
asj.OnProgress(func(progress asyncjob.Progress) {
log.Printf("Progress: %s\n", progress.String())
})
// 运行所有任务
err := asj.Run(func(job asyncjob.Job[string]) error {
// 在任务函数中接收job
// 如果返回错误或panic,任务将被标记为失败,所有进度将被取消
return nil
}, []string{"Hello", "World"})
// 如果任务返回错误,将停止处理
if err != nil {
log.Fatal(err)
}
}
性能优化示例
使用模数减少ETA显示频率(快速示例):
package main
import (
"github.com/lab210-dev/async-job"
"log"
"time"
)
func main() {
// 创建任务切片
var list []time.Duration
for i := 1; i <= 100; i++ {
list = append(list, time.Duration(1)*time.Millisecond)
}
err := asyncjob.New[time.Duration]().
SetWorkers(2).
OnProgress(func(progress asyncjob.Progress) {
// 每10个任务显示一次ETA
if progress.Current()%10 != 0 {
return
}
// 打印ETA
log.Printf("Progress: %s\n", progress.String())
}).
Run(func(job asyncjob.Job[time.Duration]) error {
// 模拟任务耗时
time.Sleep(job.Data())
return nil
}, list)
// 如果任务返回错误,将停止处理
if err != nil {
log.Fatal(err)
}
}
贡献
欢迎贡献者帮助改进代码。
更多关于golang轻量高效异步任务队列管理插件库async-job的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang轻量高效异步任务队列管理插件库async-job的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang轻量高效异步任务队列管理插件库async-job使用指南
async-job简介
async-job是一个轻量级的Golang异步任务队列管理库,它提供了简单易用的API来管理后台任务的执行。主要特点包括:
- 轻量级设计,无外部依赖
- 支持任务重试机制
- 支持任务超时控制
- 简单的任务优先级管理
- 易于集成到现有项目中
安装
go get github.com/vgmdj/async-job
基本使用示例
1. 创建任务队列
package main
import (
"context"
"fmt"
"time"
"github.com/vgmdj/async-job"
)
func main() {
// 创建任务队列
queue := async.NewQueue(
async.WithMaxWorkers(10), // 最大工作协程数
async.WithMaxRetry(3), // 最大重试次数
async.WithTimeout(30*time.Second), // 任务超时时间
)
// 启动队列
queue.Start()
defer queue.Stop() // 程序退出时停止队列
}
2. 添加任务
// 定义一个简单的任务函数
func sendEmail(payload interface{}) error {
email, ok := payload.(string)
if !ok {
return fmt.Errorf("invalid payload type")
}
// 模拟发送邮件
fmt.Printf("Sending email to %s...\n", email)
time.Sleep(1 * time.Second) // 模拟耗时操作
fmt.Printf("Email sent to %s successfully\n", email)
return nil
}
func main() {
// ... 创建队列代码同上 ...
// 添加任务到队列
err := queue.AddTask(context.Background(), &async.Task{
Fn: sendEmail,
Payload: "user@example.com",
})
if err != nil {
fmt.Printf("Failed to add task: %v\n", err)
}
}
3. 带重试的任务
func unreliableOperation(payload interface{}) error {
// 模拟有时会失败的操作
if time.Now().Unix()%2 == 0 {
return fmt.Errorf("random failure")
}
fmt.Println("Operation succeeded")
return nil
}
func main() {
// ... 创建队列代码同上 ...
// 添加带重试的任务
err := queue.AddTask(context.Background(), &async.Task{
Fn: unreliableOperation,
Payload: nil,
Retry: 3, // 自定义重试次数
})
// ... 错误处理 ...
}
高级功能
1. 任务优先级
func main() {
// ... 创建队列代码同上 ...
// 高优先级任务
queue.AddTask(context.Background(), &async.Task{
Fn: processUrgent,
Payload: "urgent data",
Priority: async.HighPriority,
})
// 低优先级任务
queue.AddTask(context.Background(), &async.Task{
Fn: processNormal,
Payload: "normal data",
Priority: async.LowPriority,
})
}
2. 任务超时控制
func longRunningTask(payload interface{}) error {
// 模拟长时间运行的任务
time.Sleep(5 * time.Second)
return nil
}
func main() {
queue := async.NewQueue(
async.WithTimeout(2*time.Second), // 全局超时设置
)
// 添加任务并设置单独的超时时间
queue.AddTask(context.Background(), &async.Task{
Fn: longRunningTask,
Payload: nil,
Timeout: 1 * time.Second, // 任务特定超时
})
}
3. 任务结果处理
func main() {
// 创建带结果通道的任务
resultChan := make(chan async.Result, 1)
queue.AddTask(context.Background(), &async.Task{
Fn: processData,
Payload: "some data",
Result: resultChan,
})
// 等待任务结果
select {
case res := <-resultChan:
if res.Err != nil {
fmt.Printf("Task failed: %v\n", res.Err)
} else {
fmt.Printf("Task succeeded with result: %v\n", res.Value)
}
case <-time.After(5 * time.Second):
fmt.Println("Timeout waiting for task result")
}
}
最佳实践
-
合理设置工作协程数量:根据服务器CPU核心数和任务特性设置MaxWorkers
-
任务幂等性:确保任务函数可以安全重试,具有幂等性
-
资源清理:使用defer queue.Stop()确保程序退出时清理资源
-
上下文传递:使用context.Context传递取消信号和超时控制
-
错误处理:为关键任务实现结果通道监控
性能考虑
async-job作为轻量级库,性能主要受以下因素影响:
- 任务函数本身的执行时间
- 工作协程数量设置
- 任务队列长度
- 任务切换开销
对于高吞吐量场景,建议:
- 适当增加MaxWorkers数量
- 将耗时任务拆分为更小的子任务
- 考虑使用更专业的消息队列系统如RabbitMQ、Kafka等
总结
async-job是一个简单易用的Golang异步任务队列管理库,适合需要轻量级后台任务处理的场景。它提供了基本的任务队列功能,包括任务执行、重试、超时控制和简单的优先级管理,是中小型项目中实现异步处理的不错选择。