golang实现代码容错与简化开发的持久化执行系统插件Temporal的使用

Golang实现代码容错与简化开发的持久化执行系统插件Temporal的使用

Temporal是一个分布式、可扩展、持久化且高可用的编排引擎,用于以可扩展和弹性的方式执行异步长时间运行的业务逻辑。"Temporal Go SDK"是Temporal的框架,用于使用Go语言编写工作流和活动。

如何使用

克隆仓库到指定位置:

git clone https://github.com/temporalio/sdk-go.git

使用slog

如果使用Go 1.21+版本,Go SDK提供了与标准slog包的内置集成。

package main

import (
	"log/slog"
	"os"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/log"
	"go.temporal.io/sdk/worker"
)

func main() {
	clientOptions := client.Options{
		Logger: log.NewStructuredLogger(
			slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
				AddSource: true,
				Level:     slog.LevelDebug,
			}))),
	}
	temporalClient, err := client.Dial(clientOptions)
	// ...
}

完整示例Demo

下面是一个完整的Temporal Go SDK使用示例,包含工作流和活动的定义:

package main

import (
	"context"
	"log"
	"time"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

// 定义工作流接口
type GreetingWorkflow interface {
	GetGreeting(ctx workflow.Context, name string) (string, error)
}

// 实现工作流
type greetingWorkflow struct{}

func (w *greetingWorkflow) GetGreeting(ctx workflow.Context, name string) (string, error) {
	// 定义活动选项
	ao := workflow.ActivityOptions{
		StartToCloseTimeout: 10 * time.Second,
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	// 执行活动
	var result string
	err := workflow.ExecuteActivity(ctx, ComposeGreeting, name).Get(ctx, &result)
	if err != nil {
		return "", err
	}

	return result, nil
}

// 定义活动函数
func ComposeGreeting(ctx context.Context, name string) (string, error) {
	greeting := "Hello " + name + "!"
	return greeting, nil
}

func main() {
	// 创建Temporal客户端
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalln("Unable to create client", err)
	}
	defer c.Close()

	// 创建工作流worker
	w := worker.New(c, "greeting-task-queue", worker.Options{})
	
	// 注册工作流和活动
	w.RegisterWorkflowWithOptions(
		(*greetingWorkflow).GetGreeting,
		workflow.RegisterOptions{Name: "GreetingWorkflow"},
	)
	w.RegisterActivity(ComposeGreeting)

	// 启动worker
	err = w.Run(worker.InterruptCh())
	if err != nil {
		log.Fatalln("Unable to start worker", err)
	}
}

工作流确定性检查器

Temporal提供了工具来检测工作流定义中的非确定性行为。

贡献指南

欢迎贡献代码使Temporal Go SDK变得更好。请查看贡献指南了解详情。

升级注意事项

从v1.25.1升级到更高版本时需要注意:

  • proto类型的变化:time.Time变为timestamppb.Timestamp,time.Duration变为durationpb.Duration
  • V2生成的struct嵌入了锁,因此不能直接解引用
  • Proto枚举的JSON格式从PascalCase变为SCREAMING_SNAKE_CASE

数据转换器配置

升级时可能需要修改默认数据转换器来处理这些payload:

converter.NewCompositeDataConverter(
	converter.NewNilPayloadConverter(),
	converter.NewByteSlicePayloadConverter(),
	converter.NewProtoPayloadConverter(),
	converter.NewProtoJSONPayloadConverterWithOptions(converter.ProtoJSONPayloadConverterOptions{
		LegacyTemporalProtoCompat: true,
	}),
	converter.NewJSONPayloadConverter(),
)

许可证

MIT许可证,详情请查看LICENSE文件。


更多关于golang实现代码容错与简化开发的持久化执行系统插件Temporal的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现代码容错与简化开发的持久化执行系统插件Temporal的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用Temporal实现Golang代码容错与简化开发的持久化执行系统

Temporal是一个开源的持久化执行系统,它可以帮助开发者构建可靠、可扩展的分布式应用,同时简化开发流程。下面我将介绍如何在Golang中使用Temporal来实现代码容错和简化开发。

Temporal核心概念

  1. Workflow - 业务逻辑的组织单元,可以长时间运行(几天甚至几个月)
  2. Activity - 执行具体任务的单元,如调用API、数据库操作等
  3. Worker - 执行Workflow和Activity的进程

安装Temporal Go SDK

go get go.temporal.io/sdk

基本使用示例

1. 定义Activity

package activities

import (
	"context"
	"time"
)

func GreetingActivity(ctx context.Context, name string) (string, error) {
	// 模拟耗时操作
	time.Sleep(2 * time.Second)
	
	// 这里可以添加业务逻辑
	greeting := "Hello " + name + "!"
	
	return greeting, nil
}

2. 定义Workflow

package workflows

import (
	"time"

	"go.temporal.io/sdk/workflow"
	"yourproject/activities"
)

func GreetingWorkflow(ctx workflow.Context, name string) (string, error) {
	options := workflow.ActivityOptions{
		StartToCloseTimeout: time.Second * 5,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    time.Minute,
			MaximumAttempts:    3,
		},
	}
	
	ctx = workflow.WithActivityOptions(ctx, options)
	
	var result string
	err := workflow.ExecuteActivity(ctx, activities.GreetingActivity, name).Get(ctx, &result)
	if err != nil {
		return "", err
	}
	
	return result, nil
}

3. 创建Worker

package main

import (
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"log"
	"yourproject/activities"
	"yourproject/workflows"
)

func main() {
	// 创建Temporal客户端
	c, err := client.NewClient(client.Options{})
	if err != nil {
		log.Fatalln("Unable to create client", err)
	}
	defer c.Close()

	// 创建Worker
	w := worker.New(c, "greeting-task-queue", worker.Options{})
	
	// 注册Workflow和Activity
	w.RegisterWorkflow(workflows.GreetingWorkflow)
	w.RegisterActivity(activities.GreetingActivity)

	// 启动Worker
	err = w.Run()
	if err != nil {
		log.Fatalln("Unable to start worker", err)
	}
}

4. 启动Workflow

package main

import (
	"context"
	"log"
	"time"

	"go.temporal.io/sdk/client"
	"yourproject/workflows"
)

func main() {
	// 创建Temporal客户端
	c, err := client.NewClient(client.Options{})
	if err != nil {
		log.Fatalln("Unable to create client", err)
	}
	defer c.Close()

	// Workflow选项
	options := client.StartWorkflowOptions{
		ID:        "greeting-workflow",
		TaskQueue: "greeting-task-queue",
	}

	// 启动Workflow
	name := "World"
	we, err := c.ExecuteWorkflow(context.Background(), options, workflows.GreetingWorkflow, name)
	if err != nil {
		log.Fatalln("Unable to execute workflow", err)
	}

	// 获取结果
	var result string
	err = we.Get(context.Background(), &result)
	if err != nil {
		log.Fatalln("Unable to get workflow result", err)
	}

	log.Println("Workflow result:", result)
}

Temporal的容错特性

  1. 自动重试 - 通过RetryPolicy配置自动重试策略
  2. 持久化执行 - 即使Worker崩溃,工作流也会从中断处恢复
  3. 超时控制 - 可以设置各种超时参数
  4. 幂等性 - 确保Activity可以安全重试

高级特性

信号机制(Signal)

// Workflow中接收信号
func OrderWorkflow(ctx workflow.Context, orderID string) error {
	var status string
	
	// 设置信号通道
	signalChan := workflow.GetSignalChannel(ctx, "update-status")
	
	// 在select中处理信号
	selector := workflow.NewSelector(ctx)
	selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
		c.Receive(ctx, &status)
		// 处理状态更新
	})
	
	// 等待信号
	selector.Select(ctx)
	
	return nil
}

// 客户端发送信号
err := c.SignalWorkflow(context.Background(), "order-workflow-id", "", "update-status", "shipped")

定时器(Timer)

func ReminderWorkflow(ctx workflow.Context) error {
	// 设置24小时后触发的定时器
	timerCtx, _ := workflow.NewTimer(ctx, time.Hour*24)
	
	// 等待定时器触发
	selector := workflow.NewSelector(ctx)
	selector.AddFuture(timerCtx, func(f workflow.Future) {
		// 定时器触发后的处理
	})
	selector.Select(ctx)
	
	return nil
}

最佳实践

  1. 保持Workflow确定性 - 不要在Workflow中使用随机数、时间等非确定性操作
  2. 合理设置超时 - 根据业务需求设置适当的超时时间
  3. 使用合理的重试策略 - 根据Activity特性配置重试参数
  4. 合理设计Workflow和Activity的粒度 - 不要过于细粒度或粗粒度

Temporal通过将业务逻辑与基础设施分离,大大简化了分布式系统的开发,同时提供了强大的容错能力。通过上述示例和概念,您可以在Golang项目中快速集成Temporal来实现可靠的持久化执行系统。

回到顶部