golang实现可中断执行流程的灵活机制插件库breaker的使用
Golang实现可中断执行流程的灵活机制插件库breaker的使用
💡 核心思想
breaker提供了一种灵活的机制来使执行流程可中断。它携带一个取消信号来中断动作执行。
var NewYear = time.Time{}.AddDate(time.Now().Year(), 0, 0)
interrupter := breaker.Multiplex(
breaker.BreakByContext(context.WithTimeout(req.Context(), time.Minute)),
breaker.BreakByDeadline(NewYear),
breaker.BreakBySignal(os.Interrupt),
)
defer interrupter.Close()
<-interrupter.Done() // 等待上下文取消、超时或中断信号
🏆 使用动机
breaker可以与其他库如retry和semaphore配合使用,使它们更加一致和可靠。
// 与retry配合使用
if err := retry.Retry(breaker.BreakByTimeout(time.Minute), action); err != nil {
log.Fatal(err)
}
// 与semaphore配合使用
if err := semaphore.Acquire(breaker.BreakByTimeout(time.Minute), 5); err != nil {
log.Fatal(err)
}
🤼♂️ 使用示例
带重试的HTTP请求
interrupter := breaker.Multiplex(
breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()
ctx := breaker.ToContext(interrupter)
ctx = context.WithValue(ctx, header, "...")
req, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL, nil)
if err != nil {
panic(err)
}
var resp *http.Response
action := func(ctx context.Context) (err error) {
req = req.Clone(ctx)
source := ctx.Value(header).(string)
req.Header.Set(header, source)
resp, err = http.DefaultClient.Do(req)
return err
}
if err := retry.Do(ctx, action); err != nil {
panic(err)
}
完整示例:
package main
import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"syscall"
"time"
"github.com/kamilsk/breaker"
"github.com/kamilsk/retry/v5"
)
func main() {
const (
header = "X-Message"
timeout = time.Minute
)
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
time.Sleep(timeout / 10)
_, _ = rw.Write([]byte(req.Header.Get(header)))
}))
defer server.Close()
interrupter := breaker.Multiplex(
breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()
ctx := breaker.ToContext(interrupter)
ctx = context.WithValue(ctx, header, "flexible mechanism to make execution flow interruptible")
req, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL, nil)
if err != nil {
panic(err)
}
var resp *http.Response
action := func(ctx context.Context) (err error) {
req = req.Clone(ctx)
source := ctx.Value(header).(string)
req.Header.Set(header, source)
resp, err = http.DefaultClient.Do(req)
return err
}
if err := retry.Do(ctx, action); err != nil {
fmt.Println("error:", err)
return
}
_, _ = io.Copy(os.Stdout, resp.Body)
}
HTTP服务器的优雅关闭
interrupter := breaker.Multiplex(
breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()
server := http.Server{
BaseContext: func(net.Listener) context.Context {
return breaker.ToContext(interrupter)
},
}
go func() {
if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatal(err)
}
}()
<-interrupter.Done()
if errors.Is(interrupter.Err(), breaker.Interrupted) {
if err := server.Shutdown(context.TODO()); err != nil {
panic(err)
}
}
完整示例:
package main
import (
"context"
"errors"
"fmt"
"log"
"net"
"net/http"
"os"
"syscall"
"time"
"github.com/kamilsk/breaker"
)
func main() {
const timeout = time.Minute
interrupter := breaker.Multiplex(
breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()
server := http.Server{
Addr: ":8080",
Handler: http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}),
BaseContext: func(net.Listener) context.Context {
return breaker.ToContext(interrupter)
},
}
go func() {
if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatal(err)
}
}()
<-interrupter.Done()
if err := interrupter.Err(); errors.Is(err, breaker.Interrupted) {
if err := server.Shutdown(context.TODO()); err != nil {
panic(err)
}
}
fmt.Println("graceful shutdown")
}
🧩 集成安装
使用go modules管理版本:
$ go get github.com/kamilsk/breaker@latest
🤲 应用场景
控制台工具执行限时命令
示例展示了如何执行限时10分钟的console命令:
$ date
# Thu Jan 7 21:02:21
$ breakit after 10m -- server run --port=8080
$ breakit ps
# +--------------------------+----------------------------+----------+----------+
# | Process | Status | Since | Until |
# +--------------------------+----------------------------+----------+----------+
# | server run --port=8080 | exit 1; panic: database... | 21:02:36 | - |
# +--------------------------+----------------------------+----------+----------+
# | | | Total | 1 |
# +--------------------------+----------------------------+----------+----------+
$ breakit after 10m -- database run --port=5432
$ breakit after 10m delay 5s -- server run --port=8080
$ breakit ps
# +--------------------------+----------------------------+----------+----------+
# | Process | Status | Since | Until |
# +--------------------------+----------------------------+----------+----------+
# | database run --port=5432 | running | 21:04:09 | 21:14:09 |
# | server run --port=8080 | delayed | 21:04:30 | 21:14:25 |
# +--------------------------+----------------------------+----------+----------+
# | | | Total | 2 |
# +--------------------------+----------------------------+----------+----------+
更多关于golang实现可中断执行流程的灵活机制插件库breaker的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang实现可中断执行流程的灵活机制插件库breaker的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang 实现可中断执行流程的机制 - breaker 库详解
在 Go 语言中实现可中断的执行流程是一个常见需求,特别是在需要处理长时间运行任务或需要优雅终止的场景。下面我将介绍如何使用 breaker 库来实现这一功能。
breaker 库简介
breaker 是一个轻量级的 Go 库,用于创建可中断的执行流程。它提供了类似于上下文(Context)的中断机制,但更加灵活和易用。
安装 breaker 库
go get github.com/kamilsk/breaker
基本用法示例
package main
import (
"context"
"fmt"
"time"
"github.com/kamilsk/breaker"
)
func main() {
// 创建一个新的 Breaker
br := breaker.New(false)
// 启动一个协程来执行可中断的任务
go func() {
err := br.Do(context.Background(), func() error {
for i := 0; i < 10; i++ {
// 检查是否被中断
if br.IsAborted() {
fmt.Println("任务被中断")
return nil
}
fmt.Printf("正在处理第 %d 项\n", i)
time.Sleep(500 * time.Millisecond)
}
return nil
})
if err != nil {
fmt.Println("执行出错:", err)
}
}()
// 3秒后中断任务
time.Sleep(3 * time.Second)
br.Abort()
// 等待任务结束
time.Sleep(1 * time.Second)
fmt.Println("程序结束")
}
高级特性
1. 带上下文的执行
func main() {
br := breaker.New(false)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
err := br.Do(ctx, func() error {
// 长时间运行的任务
for i := 0; i < 10; i++ {
if br.IsAborted() {
return fmt.Errorf("任务被中断")
}
fmt.Println("处理中...")
time.Sleep(500 * time.Millisecond)
}
return nil
})
if err != nil {
fmt.Println("错误:", err)
}
}
2. 组合多个 breaker
func main() {
br1 := breaker.New(false)
br2 := breaker.New(false)
// 创建一个组合 breaker
combined := breaker.Combine(br1, br2)
go func() {
err := combined.Do(context.Background(), func() error {
for i := 0; i < 10; i++ {
if combined.IsAborted() {
fmt.Println("任一 breaker 中断了任务")
return nil
}
fmt.Println("处理中...")
time.Sleep(500 * time.Millisecond)
}
return nil
})
if err != nil {
fmt.Println("错误:", err)
}
}()
// 2秒后通过 br1 中断
time.Sleep(2 * time.Second)
br1.Abort()
time.Sleep(1 * time.Second)
}
3. 自定义中断信号
func main() {
br := breaker.New(false)
// 自定义中断信号
interrupt := make(chan struct{})
br.OnAbort(func() {
close(interrupt)
})
go func() {
select {
case <-interrupt:
fmt.Println("收到自定义中断信号")
case <-time.After(5 * time.Second):
fmt.Println("正常完成")
}
}()
// 2秒后中断
time.Sleep(2 * time.Second)
br.Abort()
time.Sleep(1 * time.Second)
}
实际应用场景
1. HTTP 服务优雅关闭
func main() {
br := breaker.New(false)
server := &http.Server{Addr: ":8080"}
// 启动服务器
go func() {
fmt.Println("服务器启动...")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Printf("服务器错误: %v\n", err)
}
}()
// 处理中断信号
go func() {
<-br.Done()
fmt.Println("收到中断信号,关闭服务器...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
fmt.Printf("服务器关闭错误: %v\n", err)
}
}()
// 模拟中断信号
time.Sleep(5 * time.Second)
br.Abort()
time.Sleep(1 * time.Second)
}
2. 并行任务控制
func processTask(id int, br *breaker.Breaker) error {
return br.Do(context.Background(), func() error {
for i := 0; i < 5; i++ {
if br.IsAborted() {
return fmt.Errorf("任务 %d 被中断", id)
}
fmt.Printf("任务 %d 进度 %d/5\n", id, i+1)
time.Sleep(time.Duration(id) * 200 * time.Millisecond)
}
return nil
})
}
func main() {
br := breaker.New(false)
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
if err := processTask(id, br); err != nil {
fmt.Println(err)
}
}(i)
}
// 3秒后中断所有任务
time.Sleep(3 * time.Second)
br.Abort()
wg.Wait()
fmt.Println("所有任务完成")
}
总结
breaker 库提供了以下主要优势:
- 简单易用:比标准库 context 更简单的 API
- 灵活性:可以组合多个 breaker,自定义中断行为
- 明确的中断检查:通过 IsAborted() 方法明确检查中断状态
- 与 context 集成:可以与标准库 context 一起使用
通过合理使用 breaker 库,可以轻松实现 Go 程序中的可中断执行流程,提高程序的健壮性和可控性。