golang实现代码容错与简化开发的持久化执行系统插件Temporal的使用
Golang实现代码容错与简化开发的持久化执行系统插件Temporal的使用
Temporal是一个分布式、可扩展、持久化且高可用的编排引擎,用于以可扩展和弹性的方式执行异步长时间运行的业务逻辑。"Temporal Go SDK"是Temporal的框架,用于使用Go语言编写工作流和活动。
如何使用
克隆仓库到指定位置:
git clone https://github.com/temporalio/sdk-go.git
使用slog
如果使用Go 1.21+版本,Go SDK提供了与标准slog包的内置集成。
package main
import (
"log/slog"
"os"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/worker"
)
func main() {
clientOptions := client.Options{
Logger: log.NewStructuredLogger(
slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
AddSource: true,
Level: slog.LevelDebug,
}))),
}
temporalClient, err := client.Dial(clientOptions)
// ...
}
完整示例Demo
下面是一个完整的Temporal Go SDK使用示例,包含工作流和活动的定义:
package main
import (
"context"
"log"
"time"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
// 定义工作流接口
type GreetingWorkflow interface {
GetGreeting(ctx workflow.Context, name string) (string, error)
}
// 实现工作流
type greetingWorkflow struct{}
func (w *greetingWorkflow) GetGreeting(ctx workflow.Context, name string) (string, error) {
// 定义活动选项
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
// 执行活动
var result string
err := workflow.ExecuteActivity(ctx, ComposeGreeting, name).Get(ctx, &result)
if err != nil {
return "", err
}
return result, nil
}
// 定义活动函数
func ComposeGreeting(ctx context.Context, name string) (string, error) {
greeting := "Hello " + name + "!"
return greeting, nil
}
func main() {
// 创建Temporal客户端
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
// 创建工作流worker
w := worker.New(c, "greeting-task-queue", worker.Options{})
// 注册工作流和活动
w.RegisterWorkflowWithOptions(
(*greetingWorkflow).GetGreeting,
workflow.RegisterOptions{Name: "GreetingWorkflow"},
)
w.RegisterActivity(ComposeGreeting)
// 启动worker
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
工作流确定性检查器
Temporal提供了工具来检测工作流定义中的非确定性行为。
贡献指南
欢迎贡献代码使Temporal Go SDK变得更好。请查看贡献指南了解详情。
升级注意事项
从v1.25.1升级到更高版本时需要注意:
- proto类型的变化:time.Time变为timestamppb.Timestamp,time.Duration变为durationpb.Duration
- V2生成的struct嵌入了锁,因此不能直接解引用
- Proto枚举的JSON格式从PascalCase变为SCREAMING_SNAKE_CASE
数据转换器配置
升级时可能需要修改默认数据转换器来处理这些payload:
converter.NewCompositeDataConverter(
converter.NewNilPayloadConverter(),
converter.NewByteSlicePayloadConverter(),
converter.NewProtoPayloadConverter(),
converter.NewProtoJSONPayloadConverterWithOptions(converter.ProtoJSONPayloadConverterOptions{
LegacyTemporalProtoCompat: true,
}),
converter.NewJSONPayloadConverter(),
)
许可证
MIT许可证,详情请查看LICENSE文件。
更多关于golang实现代码容错与简化开发的持久化执行系统插件Temporal的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现代码容错与简化开发的持久化执行系统插件Temporal的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用Temporal实现Golang代码容错与简化开发的持久化执行系统
Temporal是一个开源的持久化执行系统,它可以帮助开发者构建可靠、可扩展的分布式应用,同时简化开发流程。下面我将介绍如何在Golang中使用Temporal来实现代码容错和简化开发。
Temporal核心概念
- Workflow - 业务逻辑的组织单元,可以长时间运行(几天甚至几个月)
- Activity - 执行具体任务的单元,如调用API、数据库操作等
- Worker - 执行Workflow和Activity的进程
安装Temporal Go SDK
go get go.temporal.io/sdk
基本使用示例
1. 定义Activity
package activities
import (
"context"
"time"
)
func GreetingActivity(ctx context.Context, name string) (string, error) {
// 模拟耗时操作
time.Sleep(2 * time.Second)
// 这里可以添加业务逻辑
greeting := "Hello " + name + "!"
return greeting, nil
}
2. 定义Workflow
package workflows
import (
"time"
"go.temporal.io/sdk/workflow"
"yourproject/activities"
)
func GreetingWorkflow(ctx workflow.Context, name string) (string, error) {
options := workflow.ActivityOptions{
StartToCloseTimeout: time.Second * 5,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, options)
var result string
err := workflow.ExecuteActivity(ctx, activities.GreetingActivity, name).Get(ctx, &result)
if err != nil {
return "", err
}
return result, nil
}
3. 创建Worker
package main
import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"log"
"yourproject/activities"
"yourproject/workflows"
)
func main() {
// 创建Temporal客户端
c, err := client.NewClient(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
// 创建Worker
w := worker.New(c, "greeting-task-queue", worker.Options{})
// 注册Workflow和Activity
w.RegisterWorkflow(workflows.GreetingWorkflow)
w.RegisterActivity(activities.GreetingActivity)
// 启动Worker
err = w.Run()
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
4. 启动Workflow
package main
import (
"context"
"log"
"time"
"go.temporal.io/sdk/client"
"yourproject/workflows"
)
func main() {
// 创建Temporal客户端
c, err := client.NewClient(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
// Workflow选项
options := client.StartWorkflowOptions{
ID: "greeting-workflow",
TaskQueue: "greeting-task-queue",
}
// 启动Workflow
name := "World"
we, err := c.ExecuteWorkflow(context.Background(), options, workflows.GreetingWorkflow, name)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
// 获取结果
var result string
err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Unable to get workflow result", err)
}
log.Println("Workflow result:", result)
}
Temporal的容错特性
- 自动重试 - 通过RetryPolicy配置自动重试策略
- 持久化执行 - 即使Worker崩溃,工作流也会从中断处恢复
- 超时控制 - 可以设置各种超时参数
- 幂等性 - 确保Activity可以安全重试
高级特性
信号机制(Signal)
// Workflow中接收信号
func OrderWorkflow(ctx workflow.Context, orderID string) error {
var status string
// 设置信号通道
signalChan := workflow.GetSignalChannel(ctx, "update-status")
// 在select中处理信号
selector := workflow.NewSelector(ctx)
selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &status)
// 处理状态更新
})
// 等待信号
selector.Select(ctx)
return nil
}
// 客户端发送信号
err := c.SignalWorkflow(context.Background(), "order-workflow-id", "", "update-status", "shipped")
定时器(Timer)
func ReminderWorkflow(ctx workflow.Context) error {
// 设置24小时后触发的定时器
timerCtx, _ := workflow.NewTimer(ctx, time.Hour*24)
// 等待定时器触发
selector := workflow.NewSelector(ctx)
selector.AddFuture(timerCtx, func(f workflow.Future) {
// 定时器触发后的处理
})
selector.Select(ctx)
return nil
}
最佳实践
- 保持Workflow确定性 - 不要在Workflow中使用随机数、时间等非确定性操作
- 合理设置超时 - 根据业务需求设置适当的超时时间
- 使用合理的重试策略 - 根据Activity特性配置重试参数
- 合理设计Workflow和Activity的粒度 - 不要过于细粒度或粗粒度
Temporal通过将业务逻辑与基础设施分离,大大简化了分布式系统的开发,同时提供了强大的容错能力。通过上述示例和概念,您可以在Golang项目中快速集成Temporal来实现可靠的持久化执行系统。