golang实现可中断执行流程的灵活机制插件库breaker的使用

Golang实现可中断执行流程的灵活机制插件库breaker的使用

💡 核心思想

breaker提供了一种灵活的机制来使执行流程可中断。它携带一个取消信号来中断动作执行。

var NewYear = time.Time{}.AddDate(time.Now().Year(), 0, 0)

interrupter := breaker.Multiplex(
    breaker.BreakByContext(context.WithTimeout(req.Context(), time.Minute)),
    breaker.BreakByDeadline(NewYear),
    breaker.BreakBySignal(os.Interrupt),
)
defer interrupter.Close()

<-interrupter.Done() // 等待上下文取消、超时或中断信号

🏆 使用动机

breaker可以与其他库如retry和semaphore配合使用,使它们更加一致和可靠。

// 与retry配合使用
if err := retry.Retry(breaker.BreakByTimeout(time.Minute), action); err != nil {
    log.Fatal(err)
}

// 与semaphore配合使用
if err := semaphore.Acquire(breaker.BreakByTimeout(time.Minute), 5); err != nil {
    log.Fatal(err)
}

🤼‍♂️ 使用示例

带重试的HTTP请求

interrupter := breaker.Multiplex(
    breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
    breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()

ctx := breaker.ToContext(interrupter)
ctx = context.WithValue(ctx, header, "...")

req, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL, nil)
if err != nil {
    panic(err)
}

var resp *http.Response
action := func(ctx context.Context) (err error) {
    req = req.Clone(ctx)

    source := ctx.Value(header).(string)
    req.Header.Set(header, source)

    resp, err = http.DefaultClient.Do(req)
    return err
}

if err := retry.Do(ctx, action); err != nil {
    panic(err)
}

完整示例:

package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "net/http/httptest"
    "os"
    "syscall"
    "time"

    "github.com/kamilsk/breaker"
    "github.com/kamilsk/retry/v5"
)

func main() {
    const (
        header  = "X-Message"
        timeout = time.Minute
    )

    server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
        time.Sleep(timeout / 10)
        _, _ = rw.Write([]byte(req.Header.Get(header)))
    }))
    defer server.Close()

    interrupter := breaker.Multiplex(
        breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
        breaker.BreakByTimeout(timeout),
    )
    defer interrupter.Close()

    ctx := breaker.ToContext(interrupter)
    ctx = context.WithValue(ctx, header, "flexible mechanism to make execution flow interruptible")

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL, nil)
    if err != nil {
        panic(err)
    }

    var resp *http.Response
    action := func(ctx context.Context) (err error) {
        req = req.Clone(ctx)

        source := ctx.Value(header).(string)
        req.Header.Set(header, source)

        resp, err = http.DefaultClient.Do(req)
        return err
    }

    if err := retry.Do(ctx, action); err != nil {
        fmt.Println("error:", err)
        return
    }
    _, _ = io.Copy(os.Stdout, resp.Body)
}

HTTP服务器的优雅关闭

interrupter := breaker.Multiplex(
    breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
    breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()

server := http.Server{
    BaseContext: func(net.Listener) context.Context {
        return breaker.ToContext(interrupter)
    },
}
go func() {
    if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
        log.Fatal(err)
    }
}()

<-interrupter.Done()
if errors.Is(interrupter.Err(), breaker.Interrupted) {
    if err := server.Shutdown(context.TODO()); err != nil {
        panic(err)
    }
}

完整示例:

package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "net"
    "net/http"
    "os"
    "syscall"
    "time"

    "github.com/kamilsk/breaker"
)

func main() {
    const timeout = time.Minute

    interrupter := breaker.Multiplex(
        breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
        breaker.BreakByTimeout(timeout),
    )
    defer interrupter.Close()

    server := http.Server{
        Addr:    ":8080",
        Handler: http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}),
        BaseContext: func(net.Listener) context.Context {
            return breaker.ToContext(interrupter)
        },
    }
    go func() {
        if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
            log.Fatal(err)
        }
    }()

    <-interrupter.Done()
    if err := interrupter.Err(); errors.Is(err, breaker.Interrupted) {
        if err := server.Shutdown(context.TODO()); err != nil {
            panic(err)
        }
    }
    fmt.Println("graceful shutdown")
}

🧩 集成安装

使用go modules管理版本:

$ go get github.com/kamilsk/breaker@latest

🤲 应用场景

控制台工具执行限时命令

示例展示了如何执行限时10分钟的console命令:

$ date
# Thu Jan  7 21:02:21
$ breakit after 10m -- server run --port=8080
$ breakit ps
# +--------------------------+----------------------------+----------+----------+
# | Process                  | Status                     | Since    | Until    |
# +--------------------------+----------------------------+----------+----------+
# | server run --port=8080   | exit 1; panic: database... | 21:02:36 | -        |
# +--------------------------+----------------------------+----------+----------+
# |                          |                            |    Total |        1 |
# +--------------------------+----------------------------+----------+----------+
$ breakit after 10m -- database run --port=5432
$ breakit after 10m delay 5s -- server run --port=8080
$ breakit ps
# +--------------------------+----------------------------+----------+----------+
# | Process                  | Status                     | Since    | Until    |
# +--------------------------+----------------------------+----------+----------+
# | database run --port=5432 | running                    | 21:04:09 | 21:14:09 |
# | server run --port=8080   | delayed                    | 21:04:30 | 21:14:25 |
# +--------------------------+----------------------------+----------+----------+
# |                          |                            |    Total |        2 |
# +--------------------------+----------------------------+----------+----------+

更多关于golang实现可中断执行流程的灵活机制插件库breaker的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现可中断执行流程的灵活机制插件库breaker的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang 实现可中断执行流程的机制 - breaker 库详解

在 Go 语言中实现可中断的执行流程是一个常见需求,特别是在需要处理长时间运行任务或需要优雅终止的场景。下面我将介绍如何使用 breaker 库来实现这一功能。

breaker 库简介

breaker 是一个轻量级的 Go 库,用于创建可中断的执行流程。它提供了类似于上下文(Context)的中断机制,但更加灵活和易用。

安装 breaker 库

go get github.com/kamilsk/breaker

基本用法示例

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/kamilsk/breaker"
)

func main() {
	// 创建一个新的 Breaker
	br := breaker.New(false)

	// 启动一个协程来执行可中断的任务
	go func() {
		err := br.Do(context.Background(), func() error {
			for i := 0; i < 10; i++ {
				// 检查是否被中断
				if br.IsAborted() {
					fmt.Println("任务被中断")
					return nil
				}
				fmt.Printf("正在处理第 %d 项\n", i)
				time.Sleep(500 * time.Millisecond)
			}
			return nil
		})
		if err != nil {
			fmt.Println("执行出错:", err)
		}
	}()

	// 3秒后中断任务
	time.Sleep(3 * time.Second)
	br.Abort()

	// 等待任务结束
	time.Sleep(1 * time.Second)
	fmt.Println("程序结束")
}

高级特性

1. 带上下文的执行

func main() {
	br := breaker.New(false)
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	err := br.Do(ctx, func() error {
		// 长时间运行的任务
		for i := 0; i < 10; i++ {
			if br.IsAborted() {
				return fmt.Errorf("任务被中断")
			}
			fmt.Println("处理中...")
			time.Sleep(500 * time.Millisecond)
		}
		return nil
	})

	if err != nil {
		fmt.Println("错误:", err)
	}
}

2. 组合多个 breaker

func main() {
	br1 := breaker.New(false)
	br2 := breaker.New(false)

	// 创建一个组合 breaker
	combined := breaker.Combine(br1, br2)

	go func() {
		err := combined.Do(context.Background(), func() error {
			for i := 0; i < 10; i++ {
				if combined.IsAborted() {
					fmt.Println("任一 breaker 中断了任务")
					return nil
				}
				fmt.Println("处理中...")
				time.Sleep(500 * time.Millisecond)
			}
			return nil
		})
		if err != nil {
			fmt.Println("错误:", err)
		}
	}()

	// 2秒后通过 br1 中断
	time.Sleep(2 * time.Second)
	br1.Abort()

	time.Sleep(1 * time.Second)
}

3. 自定义中断信号

func main() {
	br := breaker.New(false)
	
	// 自定义中断信号
	interrupt := make(chan struct{})
	br.OnAbort(func() {
		close(interrupt)
	})

	go func() {
		select {
		case <-interrupt:
			fmt.Println("收到自定义中断信号")
		case <-time.After(5 * time.Second):
			fmt.Println("正常完成")
		}
	}()

	// 2秒后中断
	time.Sleep(2 * time.Second)
	br.Abort()

	time.Sleep(1 * time.Second)
}

实际应用场景

1. HTTP 服务优雅关闭

func main() {
	br := breaker.New(false)
	server := &http.Server{Addr: ":8080"}

	// 启动服务器
	go func() {
		fmt.Println("服务器启动...")
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			fmt.Printf("服务器错误: %v\n", err)
		}
	}()

	// 处理中断信号
	go func() {
		<-br.Done()
		fmt.Println("收到中断信号,关闭服务器...")
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := server.Shutdown(ctx); err != nil {
			fmt.Printf("服务器关闭错误: %v\n", err)
		}
	}()

	// 模拟中断信号
	time.Sleep(5 * time.Second)
	br.Abort()
	time.Sleep(1 * time.Second)
}

2. 并行任务控制

func processTask(id int, br *breaker.Breaker) error {
	return br.Do(context.Background(), func() error {
		for i := 0; i < 5; i++ {
			if br.IsAborted() {
				return fmt.Errorf("任务 %d 被中断", id)
			}
			fmt.Printf("任务 %d 进度 %d/5\n", id, i+1)
			time.Sleep(time.Duration(id) * 200 * time.Millisecond)
		}
		return nil
	})
}

func main() {
	br := breaker.New(false)
	var wg sync.WaitGroup

	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := processTask(id, br); err != nil {
				fmt.Println(err)
			}
		}(i)
	}

	// 3秒后中断所有任务
	time.Sleep(3 * time.Second)
	br.Abort()

	wg.Wait()
	fmt.Println("所有任务完成")
}

总结

breaker 库提供了以下主要优势:

  1. 简单易用:比标准库 context 更简单的 API
  2. 灵活性:可以组合多个 breaker,自定义中断行为
  3. 明确的中断检查:通过 IsAborted() 方法明确检查中断状态
  4. 与 context 集成:可以与标准库 context 一起使用

通过合理使用 breaker 库,可以轻松实现 Go 程序中的可中断执行流程,提高程序的健壮性和可控性。

回到顶部