golang分布式容错任务队列插件库hatchet的使用
Golang分布式容错任务队列插件库Hatchet的使用
什么是Hatchet?
Hatchet是一个基于Postgres构建的运行后台任务的平台。您可以使用Hatchet在最小配置或基础设施的情况下,将函数分发到一组工作节点之间,而不需要管理自己的任务队列或发布/订阅系统。
何时使用Hatchet?
当您的任务变得复杂时,传统的基于Redis或RabbitMQ的队列(如Celery或BullMQ)会变得难以调试、监控,并开始以意外的方式失败。Hatchet是一个功能齐全的后台任务管理平台,内置支持将复杂任务链接成工作流、失败警报、使任务更持久以及在实时Web仪表板中查看任务。
Go语言使用示例
基本任务队列
// 1. 定义任务输入
type SimpleInput struct {
Message string `json:"message"`
}
// 2. 使用factory.NewTask定义任务
simple := factory.NewTask(
create.StandaloneTask{
Name: "simple-task",
}, func(ctx worker.HatchetContext, input SimpleInput) (*SimpleResult, error) {
return &SimpleResult{
TransformedMessage: strings.ToLower(input.Message),
}, nil
},
hatchet,
)
// 3. 在工作节点上注册任务
worker, err := hatchet.Worker(v1worker.WorkerOpts{
Name: "simple-worker",
Workflows: []workflow.WorkflowBase{
simple,
},
})
worker.StartBlocking()
// 4. 从应用程序调用任务
simple.Run(context.Background(), SimpleInput{Message: "Hello, World!"})
工作流(DAG)
// 1. 定义工作流(工作流是任务的集合)
simple := v1.WorkflowFactory[DagInput, DagOutput](
workflow.CreateOpts[DagInput]{
Name: "simple-workflow",
},
hatchet,
)
// 2. 将第一个任务附加到工作流
const task1 = simple.Task(
task.CreateOpts[DagInput]{
Name: "task-1",
Fn: func(ctx worker.HatchetContext, _ DagInput) (*SimpleOutput, error) {
return &SimpleOutput{
Result: "task-1",
}, nil
},
},
);
// 3. 将第二个任务附加到工作流,该任务在task-1之后执行
const task2 = simple.Task(
task.CreateOpts[DagInput]{
Name: "task-2",
Parents: []task.NamedTask{
step1,
},
Fn: func(ctx worker.HatchetContext, _ DagInput) (*SimpleOutput, error) {
return &SimpleOutput{
Result: "task-2",
}, nil
},
},
);
// 4. 从应用程序调用工作流
simple.Run(ctx, DagInput{})
流控制(限流)
// 基于每个用户限制并发
flowControlWorkflow := factory.NewWorkflow[DagInput, DagResult](
create.WorkflowCreateOpts[DagInput]{
Name: "simple-dag",
Concurrency: []*types.Concurrency{
{
Expression: "input.userId",
MaxRuns: 1,
LimitStrategy: types.GroupRoundRobin,
},
},
},
hatchet,
)
// 将任务按用户限制为每分钟10个任务,每个任务消耗1个单位
flowControlWorkflow.Task(
create.WorkflowTask[FlowControlInput, FlowControlOutput]{
Name: "rate-limit-task",
RateLimits: []*types.RateLimit{
{
Key: "user-rate-limit",
KeyExpr: "input.userId",
Units: 1,
LimitValueExpr: 10,
Duration: types.Minute,
},
},
}, func(ctx worker.HatchetContext, input FlowControlInput) (interface{}, error) {
return &SimpleOutput{
Step: 1,
}, nil
},
)
调度任务
const tomorrow = time.Now().Add(24 * time.Hour);
// 安排任务明天运行
simple.Schedule(ctx, tomorrow, ScheduleInput{
Message: "Hello, World!",
})
// 安排任务每天午夜运行
simple.Cron(ctx, "every-day", "0 0 * * *", CronInput{
Message: "Hello, World!",
})
事件触发
// 创建一个等待外部用户事件或休眠10秒的任务
simple.Task(
conditionOpts{
Name: "Step2",
Parents: []create.NamedTask{
step1,
},
WaitFor: condition.Conditions(
condition.UserEventCondition("user:event", "'true'"),
condition.SleepCondition(10 * time.Second),
),
}, func(ctx worker.HatchetContext, input DagWithConditionsInput) (interface{}, error) {
// ...
},
);
主要特性
- 队列: 持久化任务队列,确保工作完成(或收到警报)
- 任务编排: 支持构建复杂的工作流,可以并行生成多个任务
- 流控制: 支持并发限制和速率限制
- 调度: 支持cron、一次性调度和暂停执行
- 任务路由: 支持粘性分配和工作节点亲和性
- 事件触发和监听: 支持基于事件的架构
- 实时Web UI: 内置监控、日志和告警功能
快速开始
Hatchet可作为云版本或自托管版本使用。请参阅文档快速入门。
文档
最新文档可以在Hatchet官方文档中找到。
更多关于golang分布式容错任务队列插件库hatchet的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang分布式容错任务队列插件库hatchet的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Hatchet - Go分布式容错任务队列库使用指南
Hatchet是一个轻量级的Go分布式任务队列库,提供了容错、重试和分布式执行能力。下面我将详细介绍Hatchet的使用方法。
安装
go get github.com/hatchet-dev/hatchet-go
基本使用
1. 初始化Hatchet客户端
package main
import (
"context"
"log"
"time"
"github.com/hatchet-dev/hatchet-go"
)
func main() {
// 创建Hatchet客户端
client, err := hatchet.New(
hatchet.WithLogLevel("debug"),
hatchet.WithRetries(3),
hatchet.WithRetryDelay(2*time.Second),
)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 使用client进行操作...
}
2. 定义工作任务
// 定义一个简单的任务
type EmailTask struct{}
func (e *EmailTask) Name() string {
return "send-email"
}
func (e *EmailTask) Execute(ctx context.Context, payload []byte) error {
// 解析任务数据
var emailData struct {
To string `json:"to"`
Subject string `json:"subject"`
Body string `json:"body"`
}
if err := json.Unmarshal(payload, &emailData); err != nil {
return fmt.Errorf("failed to unmarshal email data: %w", err)
}
// 模拟发送邮件
log.Printf("Sending email to %s with subject: %s", emailData.To, emailData.Subject)
// 模拟可能发生的错误
if emailData.To == "fail@example.com" {
return fmt.Errorf("deliberate failure for testing")
}
return nil
}
3. 注册工作任务
func registerTasks(client *hatchet.Client) error {
// 注册邮件任务
emailTask := &EmailTask{}
if err := client.RegisterTask(emailTask); err != nil {
return fmt.Errorf("failed to register email task: %w", err)
}
// 可以注册更多任务...
return nil
}
4. 启动Worker
func startWorker(client *hatchet.Client) error {
// 启动worker处理任务
worker := client.NewWorker("email-worker")
// 设置并发数
worker.SetConcurrency(5)
// 启动worker
if err := worker.Start(); err != nil {
return fmt.Errorf("failed to start worker: %w", err)
}
return nil
}
5. 提交任务
func submitEmailTask(client *hatchet.Client) error {
emailData := map[string]interface{}{
"to": "user@example.com",
"subject": "Welcome to our service",
"body": "Thank you for signing up!",
}
payload, err := json.Marshal(emailData)
if err != nil {
return fmt.Errorf("failed to marshal email data: %w", err)
}
// 提交任务
taskID, err := client.SubmitTask("send-email", payload)
if err != nil {
return fmt.Errorf("failed to submit email task: %w", err)
}
log.Printf("Submitted email task with ID: %s", taskID)
return nil
}
高级功能
1. 任务重试
Hatchet内置了自动重试机制。当任务执行失败时,会根据配置自动重试:
client, err := hatchet.New(
hatchet.WithRetries(3), // 最大重试次数
hatchet.WithRetryDelay(5*time.Second), // 重试间隔
hatchet.WithRetryBackoff(true), // 启用指数退避
)
2. 定时任务
// 提交一个延迟10分钟执行的任务
taskID, err := client.SubmitScheduledTask(
"send-email",
payload,
time.Now().Add(10*time.Minute),
)
// 提交一个周期性任务(每1小时执行一次)
taskID, err := client.SubmitPeriodicTask(
"cleanup-task",
payload,
1*time.Hour,
)
3. 任务结果处理
// 获取任务状态
status, err := client.GetTaskStatus(taskID)
if err != nil {
log.Printf("Failed to get task status: %v", err)
} else {
log.Printf("Task status: %s, Last error: %v", status.State, status.LastError)
}
// 设置任务完成回调
client.SetTaskCompletionHandler(func(taskID string, success bool, err error) {
if success {
log.Printf("Task %s completed successfully", taskID)
} else {
log.Printf("Task %s failed: %v", taskID, err)
}
})
分布式部署
Hatchet支持多worker分布式部署,只需在不同节点上启动worker即可:
// Worker节点1
worker1 := client.NewWorker("worker-group-1")
worker1.Start()
// Worker节点2
worker2 := client.NewWorker("worker-group-1")
worker2.Start()
任务会自动在可用worker之间分配。
最佳实践
- 幂等性设计:确保任务可以安全地多次执行
- 合理设置重试:根据任务特性设置适当的重试次数和间隔
- 资源控制:根据服务器资源设置合适的并发数
- 监控任务:实现任务状态监控和报警机制
- 优雅关闭:处理SIGTERM信号确保任务完成后再退出
// 优雅关闭示例
func main() {
// ...初始化代码...
// 捕获中断信号
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
go func() {
<-ctx.Done()
log.Println("Shutting down gracefully...")
// 停止worker
if err := worker.Stop(); err != nil {
log.Printf("Error stopping worker: %v", err)
}
// 关闭客户端
client.Close()
}()
// 启动worker
if err := worker.Start(); err != nil {
log.Fatal(err)
}
}
Hatchet为Go开发者提供了一个简单而强大的分布式任务队列解决方案,适合需要可靠异步任务处理的场景。