golang轻量高效异步任务队列管理插件库async-job的使用

Golang轻量高效异步任务队列管理插件库async-job的使用

概述

AsyncJob是一个轻量级、高效且清晰的异步任务管理库。

Go GitHub go.mod Go version Go Report Card codecov License

特性

  • AsyncJob是一个简单的异步任务管理器
  • 完整的代码覆盖率
  • 异步队列
  • 可定义异步任务数量(默认: runtime.NumCPU())
  • 处理托管和非托管错误
  • 提供简单的ETA(预计完成时间)功能
  • 完整的代码描述

使用示例

基础用法

package main

import (
	"github.com/lab210-dev/async-job"
	"log"
)

func main() {
	// 创建一个新的AsyncJob实例
	asj := asyncjob.New[string]()

	// 设置异步任务数量(默认: runtime.NumCPU())
	asj.SetWorkers(2)

	// 监听进度状态
	asj.OnProgress(func(progress asyncjob.Progress) {
            log.Printf("Progress: %s\n", progress.String())
	})

	// 运行所有任务
	err := asj.Run(func(job asyncjob.Job[string]) error {
            // 在任务函数中接收job
            // 如果返回错误或panic,任务将被标记为失败,所有进度将被取消
            return nil
	}, []string{"Hello", "World"})

	// 如果任务返回错误,将停止处理
	if err != nil {
            log.Fatal(err)
	}
}

性能优化示例

使用模数减少ETA显示频率(快速示例):

package main

import (
	"github.com/lab210-dev/async-job"
	"log"
	"time"
)

func main() {
	// 创建任务切片
	var list []time.Duration
	for i := 1; i <= 100; i++ {
		list = append(list, time.Duration(1)*time.Millisecond)
	}
	err := asyncjob.New[time.Duration]().
		SetWorkers(2).
		OnProgress(func(progress asyncjob.Progress) {
			// 每10个任务显示一次ETA
			if progress.Current()%10 != 0 {
				return
			}
			// 打印ETA
		    log.Printf("Progress: %s\n", progress.String())
		}).
		Run(func(job asyncjob.Job[time.Duration]) error {
			// 模拟任务耗时
			time.Sleep(job.Data())
			return nil
		}, list)
	
	// 如果任务返回错误,将停止处理
	if err != nil {
		log.Fatal(err)
	}
}

贡献

欢迎贡献者帮助改进代码。


更多关于golang轻量高效异步任务队列管理插件库async-job的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang轻量高效异步任务队列管理插件库async-job的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang轻量高效异步任务队列管理插件库async-job使用指南

async-job简介

async-job是一个轻量级的Golang异步任务队列管理库,它提供了简单易用的API来管理后台任务的执行。主要特点包括:

  • 轻量级设计,无外部依赖
  • 支持任务重试机制
  • 支持任务超时控制
  • 简单的任务优先级管理
  • 易于集成到现有项目中

安装

go get github.com/vgmdj/async-job

基本使用示例

1. 创建任务队列

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/vgmdj/async-job"
)

func main() {
	// 创建任务队列
	queue := async.NewQueue(
		async.WithMaxWorkers(10),      // 最大工作协程数
		async.WithMaxRetry(3),         // 最大重试次数
		async.WithTimeout(30*time.Second), // 任务超时时间
	)
	
	// 启动队列
	queue.Start()
	defer queue.Stop() // 程序退出时停止队列
}

2. 添加任务

// 定义一个简单的任务函数
func sendEmail(payload interface{}) error {
	email, ok := payload.(string)
	if !ok {
		return fmt.Errorf("invalid payload type")
	}
	
	// 模拟发送邮件
	fmt.Printf("Sending email to %s...\n", email)
	time.Sleep(1 * time.Second) // 模拟耗时操作
	fmt.Printf("Email sent to %s successfully\n", email)
	return nil
}

func main() {
	// ... 创建队列代码同上 ...
	
	// 添加任务到队列
	err := queue.AddTask(context.Background(), &async.Task{
		Fn:      sendEmail,
		Payload: "user@example.com",
	})
	
	if err != nil {
		fmt.Printf("Failed to add task: %v\n", err)
	}
}

3. 带重试的任务

func unreliableOperation(payload interface{}) error {
	// 模拟有时会失败的操作
	if time.Now().Unix()%2 == 0 {
		return fmt.Errorf("random failure")
	}
	fmt.Println("Operation succeeded")
	return nil
}

func main() {
	// ... 创建队列代码同上 ...
	
	// 添加带重试的任务
	err := queue.AddTask(context.Background(), &async.Task{
		Fn:      unreliableOperation,
		Payload: nil,
		Retry:   3, // 自定义重试次数
	})
	
	// ... 错误处理 ...
}

高级功能

1. 任务优先级

func main() {
	// ... 创建队列代码同上 ...
	
	// 高优先级任务
	queue.AddTask(context.Background(), &async.Task{
		Fn:       processUrgent,
		Payload:  "urgent data",
		Priority: async.HighPriority,
	})
	
	// 低优先级任务
	queue.AddTask(context.Background(), &async.Task{
		Fn:       processNormal,
		Payload:  "normal data",
		Priority: async.LowPriority,
	})
}

2. 任务超时控制

func longRunningTask(payload interface{}) error {
	// 模拟长时间运行的任务
	time.Sleep(5 * time.Second)
	return nil
}

func main() {
	queue := async.NewQueue(
		async.WithTimeout(2*time.Second), // 全局超时设置
	)
	
	// 添加任务并设置单独的超时时间
	queue.AddTask(context.Background(), &async.Task{
		Fn:      longRunningTask,
		Payload: nil,
		Timeout: 1 * time.Second, // 任务特定超时
	})
}

3. 任务结果处理

func main() {
	// 创建带结果通道的任务
	resultChan := make(chan async.Result, 1)
	
	queue.AddTask(context.Background(), &async.Task{
		Fn:      processData,
		Payload: "some data",
		Result:  resultChan,
	})
	
	// 等待任务结果
	select {
	case res := <-resultChan:
		if res.Err != nil {
			fmt.Printf("Task failed: %v\n", res.Err)
		} else {
			fmt.Printf("Task succeeded with result: %v\n", res.Value)
		}
	case <-time.After(5 * time.Second):
		fmt.Println("Timeout waiting for task result")
	}
}

最佳实践

  1. 合理设置工作协程数量:根据服务器CPU核心数和任务特性设置MaxWorkers

  2. 任务幂等性:确保任务函数可以安全重试,具有幂等性

  3. 资源清理:使用defer queue.Stop()确保程序退出时清理资源

  4. 上下文传递:使用context.Context传递取消信号和超时控制

  5. 错误处理:为关键任务实现结果通道监控

性能考虑

async-job作为轻量级库,性能主要受以下因素影响:

  1. 任务函数本身的执行时间
  2. 工作协程数量设置
  3. 任务队列长度
  4. 任务切换开销

对于高吞吐量场景,建议:

  • 适当增加MaxWorkers数量
  • 将耗时任务拆分为更小的子任务
  • 考虑使用更专业的消息队列系统如RabbitMQ、Kafka等

总结

async-job是一个简单易用的Golang异步任务队列管理库,适合需要轻量级后台任务处理的场景。它提供了基本的任务队列功能,包括任务执行、重试、超时控制和简单的优先级管理,是中小型项目中实现异步处理的不错选择。

回到顶部