asynq确实是一个优秀的Go异步任务处理库,其设计简洁且性能出色。以下是一个典型的生产者-消费者示例:
生产者示例:
package main
import (
"log"
"github.com/hibiken/asynq"
)
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
})
defer client.Close()
// 创建任务
task := asynq.NewTask("email:send",
[]byte(`{"to":"user@example.com","subject":"Hello!"}`))
// 提交任务
info, err := client.Enqueue(task)
if err != nil {
log.Fatal(err)
}
log.Printf("任务已提交: %s", info.ID)
}
消费者示例:
package main
import (
"context"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func handleEmailTask(ctx context.Context, t *asynq.Task) error {
fmt.Printf("处理邮件任务: payload=%s\n", t.Payload())
return nil
}
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:send", handleEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
任务调度示例:
// 延迟任务
info, _ := client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
// 定时任务
info, _ := client.Enqueue(task, asynq.ProcessAt(time.Now().Add(24*time.Hour)))
// 设置重试
info, _ := client.Enqueue(task, asynq.MaxRetry(5))
// 设置优先级
info, _ := client.Enqueue(task, asynq.Queue("critical"))
监控面板集成:
package main
import (
"net/http"
"github.com/hibiken/asynqmon"
)
func main() {
h := asynqmon.New(asynqmon.Options{
RootPath: "/monitoring",
RedisConnOpt: asynq.RedisClientOpt{
Addr: "localhost:6379",
},
})
http.Handle(h.RootPath()+"/", h)
http.ListenAndServe(":8080", nil)
}
asynq支持任务去重、优先级队列、定时任务、失败重试等企业级特性。其监控面板提供实时任务状态可视化,Redis作为后端存储确保可靠性。该库的API设计符合Go习惯,易于集成到现有应用中。