golang分布式容错任务队列插件库hatchet的使用

Golang分布式容错任务队列插件库Hatchet的使用

什么是Hatchet?

Hatchet是一个基于Postgres构建的运行后台任务的平台。您可以使用Hatchet在最小配置或基础设施的情况下,将函数分发到一组工作节点之间,而不需要管理自己的任务队列或发布/订阅系统。

何时使用Hatchet?

当您的任务变得复杂时,传统的基于Redis或RabbitMQ的队列(如Celery或BullMQ)会变得难以调试、监控,并开始以意外的方式失败。Hatchet是一个功能齐全的后台任务管理平台,内置支持将复杂任务链接成工作流、失败警报、使任务更持久以及在实时Web仪表板中查看任务。

Go语言使用示例

基本任务队列

// 1. 定义任务输入
type SimpleInput struct {
  Message string `json:"message"`
}

// 2. 使用factory.NewTask定义任务
simple := factory.NewTask(
  create.StandaloneTask{
    Name: "simple-task",
  }, func(ctx worker.HatchetContext, input SimpleInput) (*SimpleResult, error) {
    return &SimpleResult{
      TransformedMessage: strings.ToLower(input.Message),
    }, nil
  },
  hatchet,
)

// 3. 在工作节点上注册任务
worker, err := hatchet.Worker(v1worker.WorkerOpts{
  Name: "simple-worker",
  Workflows: []workflow.WorkflowBase{
    simple,
  },
})

worker.StartBlocking()

// 4. 从应用程序调用任务
simple.Run(context.Background(), SimpleInput{Message: "Hello, World!"})

工作流(DAG)

// 1. 定义工作流(工作流是任务的集合)
simple := v1.WorkflowFactory[DagInput, DagOutput](
    workflow.CreateOpts[DagInput]{
        Name: "simple-workflow",
    },
    hatchet,
)

// 2. 将第一个任务附加到工作流
const task1 = simple.Task(
    task.CreateOpts[DagInput]{
        Name: "task-1",
        Fn: func(ctx worker.HatchetContext, _ DagInput) (*SimpleOutput, error) {
            return &SimpleOutput{
                Result: "task-1",
            }, nil
        },
    },
);

// 3. 将第二个任务附加到工作流,该任务在task-1之后执行
const task2 = simple.Task(
    task.CreateOpts[DagInput]{
        Name: "task-2",
        Parents: []task.NamedTask{
            step1,
        },
        Fn: func(ctx worker.HatchetContext, _ DagInput) (*SimpleOutput, error) {
            return &SimpleOutput{
                Result: "task-2",
            }, nil
        },
    },
);

// 4. 从应用程序调用工作流
simple.Run(ctx, DagInput{})

流控制(限流)

// 基于每个用户限制并发
flowControlWorkflow := factory.NewWorkflow[DagInput, DagResult](
  create.WorkflowCreateOpts[DagInput]{
    Name: "simple-dag",
    Concurrency: []*types.Concurrency{
      {
        Expression:    "input.userId",
        MaxRuns:       1,
        LimitStrategy: types.GroupRoundRobin,
      },
    },
  },
  hatchet,
)

// 将任务按用户限制为每分钟10个任务,每个任务消耗1个单位
flowControlWorkflow.Task(
  create.WorkflowTask[FlowControlInput, FlowControlOutput]{
    Name: "rate-limit-task",
    RateLimits: []*types.RateLimit{
      {
        Key:            "user-rate-limit",
        KeyExpr:        "input.userId",
        Units:          1,
        LimitValueExpr: 10,
        Duration:       types.Minute,
      },
    },
  }, func(ctx worker.HatchetContext, input FlowControlInput) (interface{}, error) {
    return &SimpleOutput{
      Step: 1,
    }, nil
  },
)

调度任务

const tomorrow = time.Now().Add(24 * time.Hour);

// 安排任务明天运行
simple.Schedule(ctx, tomorrow, ScheduleInput{
  Message: "Hello, World!",
})

// 安排任务每天午夜运行
simple.Cron(ctx, "every-day", "0 0 * * *", CronInput{
  Message: "Hello, World!",
})

事件触发

// 创建一个等待外部用户事件或休眠10秒的任务
simple.Task(
  conditionOpts{
    Name: "Step2",
    Parents: []create.NamedTask{
      step1,
    },
    WaitFor: condition.Conditions(
      condition.UserEventCondition("user:event", "'true'"),
      condition.SleepCondition(10 * time.Second),
    ),
  }, func(ctx worker.HatchetContext, input DagWithConditionsInput) (interface{}, error) {
    // ...
  },
);

主要特性

  • 队列: 持久化任务队列,确保工作完成(或收到警报)
  • 任务编排: 支持构建复杂的工作流,可以并行生成多个任务
  • 流控制: 支持并发限制和速率限制
  • 调度: 支持cron、一次性调度和暂停执行
  • 任务路由: 支持粘性分配和工作节点亲和性
  • 事件触发和监听: 支持基于事件的架构
  • 实时Web UI: 内置监控、日志和告警功能

快速开始

Hatchet可作为云版本或自托管版本使用。请参阅文档快速入门。

文档

最新文档可以在Hatchet官方文档中找到。


更多关于golang分布式容错任务队列插件库hatchet的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang分布式容错任务队列插件库hatchet的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Hatchet - Go分布式容错任务队列库使用指南

Hatchet是一个轻量级的Go分布式任务队列库,提供了容错、重试和分布式执行能力。下面我将详细介绍Hatchet的使用方法。

安装

go get github.com/hatchet-dev/hatchet-go

基本使用

1. 初始化Hatchet客户端

package main

import (
	"context"
	"log"
	"time"

	"github.com/hatchet-dev/hatchet-go"
)

func main() {
	// 创建Hatchet客户端
	client, err := hatchet.New(
		hatchet.WithLogLevel("debug"),
		hatchet.WithRetries(3),
		hatchet.WithRetryDelay(2*time.Second),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	
	// 使用client进行操作...
}

2. 定义工作任务

// 定义一个简单的任务
type EmailTask struct{}

func (e *EmailTask) Name() string {
	return "send-email"
}

func (e *EmailTask) Execute(ctx context.Context, payload []byte) error {
	// 解析任务数据
	var emailData struct {
		To      string `json:"to"`
		Subject string `json:"subject"`
		Body    string `json:"body"`
	}
	
	if err := json.Unmarshal(payload, &emailData); err != nil {
		return fmt.Errorf("failed to unmarshal email data: %w", err)
	}
	
	// 模拟发送邮件
	log.Printf("Sending email to %s with subject: %s", emailData.To, emailData.Subject)
	
	// 模拟可能发生的错误
	if emailData.To == "fail@example.com" {
		return fmt.Errorf("deliberate failure for testing")
	}
	
	return nil
}

3. 注册工作任务

func registerTasks(client *hatchet.Client) error {
	// 注册邮件任务
	emailTask := &EmailTask{}
	if err := client.RegisterTask(emailTask); err != nil {
		return fmt.Errorf("failed to register email task: %w", err)
	}
	
	// 可以注册更多任务...
	return nil
}

4. 启动Worker

func startWorker(client *hatchet.Client) error {
	// 启动worker处理任务
	worker := client.NewWorker("email-worker")
	
	// 设置并发数
	worker.SetConcurrency(5)
	
	// 启动worker
	if err := worker.Start(); err != nil {
		return fmt.Errorf("failed to start worker: %w", err)
	}
	
	return nil
}

5. 提交任务

func submitEmailTask(client *hatchet.Client) error {
	emailData := map[string]interface{}{
		"to":      "user@example.com",
		"subject": "Welcome to our service",
		"body":    "Thank you for signing up!",
	}
	
	payload, err := json.Marshal(emailData)
	if err != nil {
		return fmt.Errorf("failed to marshal email data: %w", err)
	}
	
	// 提交任务
	taskID, err := client.SubmitTask("send-email", payload)
	if err != nil {
		return fmt.Errorf("failed to submit email task: %w", err)
	}
	
	log.Printf("Submitted email task with ID: %s", taskID)
	return nil
}

高级功能

1. 任务重试

Hatchet内置了自动重试机制。当任务执行失败时,会根据配置自动重试:

client, err := hatchet.New(
	hatchet.WithRetries(3),                  // 最大重试次数
	hatchet.WithRetryDelay(5*time.Second),  // 重试间隔
	hatchet.WithRetryBackoff(true),         // 启用指数退避
)

2. 定时任务

// 提交一个延迟10分钟执行的任务
taskID, err := client.SubmitScheduledTask(
	"send-email",
	payload,
	time.Now().Add(10*time.Minute),
)

// 提交一个周期性任务(每1小时执行一次)
taskID, err := client.SubmitPeriodicTask(
	"cleanup-task",
	payload,
	1*time.Hour,
)

3. 任务结果处理

// 获取任务状态
status, err := client.GetTaskStatus(taskID)
if err != nil {
	log.Printf("Failed to get task status: %v", err)
} else {
	log.Printf("Task status: %s, Last error: %v", status.State, status.LastError)
}

// 设置任务完成回调
client.SetTaskCompletionHandler(func(taskID string, success bool, err error) {
	if success {
		log.Printf("Task %s completed successfully", taskID)
	} else {
		log.Printf("Task %s failed: %v", taskID, err)
	}
})

分布式部署

Hatchet支持多worker分布式部署,只需在不同节点上启动worker即可:

// Worker节点1
worker1 := client.NewWorker("worker-group-1")
worker1.Start()

// Worker节点2
worker2 := client.NewWorker("worker-group-1")
worker2.Start()

任务会自动在可用worker之间分配。

最佳实践

  1. 幂等性设计:确保任务可以安全地多次执行
  2. 合理设置重试:根据任务特性设置适当的重试次数和间隔
  3. 资源控制:根据服务器资源设置合适的并发数
  4. 监控任务:实现任务状态监控和报警机制
  5. 优雅关闭:处理SIGTERM信号确保任务完成后再退出
// 优雅关闭示例
func main() {
	// ...初始化代码...
	
	// 捕获中断信号
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	
	go func() {
		<-ctx.Done()
		log.Println("Shutting down gracefully...")
		
		// 停止worker
		if err := worker.Stop(); err != nil {
			log.Printf("Error stopping worker: %v", err)
		}
		
		// 关闭客户端
		client.Close()
	}()
	
	// 启动worker
	if err := worker.Start(); err != nil {
		log.Fatal(err)
	}
}

Hatchet为Go开发者提供了一个简单而强大的分布式任务队列解决方案,适合需要可靠异步任务处理的场景。

回到顶部