golang异步流程控制更直观的插件库Hunch的使用

Golang异步流程控制更直观的插件库Hunch的使用

Hunch是一个Golang库,提供了AllFirstRetryWaterfall等功能,使异步流程控制更加直观。

关于Hunch

Hunch基于context包,提供了可以帮助你轻松处理复杂异步逻辑的函数。

安装

go get

$ go get -u -v github.com/aaronjan/hunch

go mod (推荐)

import "github.com/aaronjan/hunch"
$ go mod tidy

类型

type Executable func(context.Context) (interface{}, error)

type ExecutableInSequence func(context.Context, interface{}) (interface{}, error)

API

All

func All(parentCtx context.Context, execs ...Executable) ([]interface{}, error)

All返回所有Executables的所有输出,顺序保证。

示例

ctx := context.Background()
r, err := hunch.All(
    ctx,
    func(ctx context.Context) (interface{}, error) {
        time.Sleep(300 * time.Millisecond)
        return 1, nil
    },
    func(ctx context.Context) (interface{}, error) {
        time.Sleep(200 * time.Millisecond)
        return 2, nil
    },
    func(ctx context.Context) (interface{}, error) {
        time.Sleep(100 * time.Millisecond)
        return 3, nil
    },
)

fmt.Println(r, err)
// Output:
// [1 2 3] <nil>

Take

func Take(parentCtx context.Context, num int, execs ...Executable) ([]interface{}, error)

Take返回Executables输出的前num个值。

示例

ctx := context.Background()
r, err := hunch.Take(
    ctx,
    // 只需要前2个值
    2,
    func(ctx context.Context) (interface{}, error) {
        time.Sleep(300 * time.Millisecond)
        return 1, nil
    },
    func(ctx context.Context) (interface{}, error) {
        time.Sleep(200 * time.Millisecond)
        return 2, nil
    },
    func(ctx context.Context) (interface{}, error) {
        time.Sleep(100 * time.Millisecond)
        return 3, nil
    },
)

fmt.Println(r, err)
// Output:
// [3 2] <nil>

Last

func Last(parentCtx context.Context, num int, execs ...Executable) ([]interface{}, error)

Last返回Executables输出的最后num个值。

示例

ctx := context.Background()
r, err := hunch.Last(
    ctx,
    // 只需要最后2个值
    2,
    func(ctx context.Context) (interface{}, error) {
        time.Sleep(300 * time.Millisecond)
        return 1, nil
    },
    func(ctx context.Context) (interface{}, error) {
        time.Sleep(200 * time.Millisecond)
        return 2, nil
    },
    func(ctx context.Context) (interface{}, error) {
        time.Sleep(100 * time.Millisecond)
        return 3, nil
    },
)

fmt.Println(r, err)
// Output:
// [2 1] <nil>

Waterfall

func Waterfall(parentCtx context.Context, execs ...ExecutableInSequence) (interface{}, error)

Waterfall一个接一个地运行ExecutableInSequence,将前一个结果作为输入传递给下一个Executable。当发生错误时,它会停止处理并返回错误。当父Context被取消时,它会立即返回其Err()

示例

ctx := context.Background()
r, err := hunch.Waterfall(
    ctx,
    func(ctx context.Context, n interface{}) (interface{}, error) {
        return 1, nil
    },
    func(ctx context.Context, n interface{}) (interface{}, error) {
        return n.(int) + 1, nil
    },
    func(ctx context.Context, n interface{}) (interface{}, error) {
        return n.(int) + 1, nil
    },
)

fmt.Println(r, err)
// Output:
// 3 <nil>

Retry

func Retry(parentCtx context.Context, retries int, fn Executable) (interface{}, error)

Retry尝试从Executable获取值而不是错误。当失败时,它会继续重新运行Executable,不超过retries次。此外,当父Context被取消时,它会立即返回其Err()

示例

count := 0
getStuffFromAPI := func() (int, error) {
    if count == 5 {
        return 1, nil
    }
    count++

    return 0, fmt.Errorf("timeout")
}

ctx := context.Background()
r, err := hunch.Retry(
    ctx,
    10,
    func(ctx context.Context) (interface{}, error) {
        rs, err := getStuffFromAPI()

        return rs, err
    },
)

fmt.Println(r, err, count)
// Output:
// 1 <nil> 5

许可证

Apache 2.0


更多关于golang异步流程控制更直观的插件库Hunch的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang异步流程控制更直观的插件库Hunch的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Hunch - Golang异步流程控制库

Hunch是一个让Golang异步流程控制更直观的插件库,它提供了类似JavaScript中Promise的链式调用风格,使得异步代码更易读和维护。

主要特性

  1. 链式调用风格
  2. 支持并行执行
  3. 支持顺序执行
  4. 错误处理机制
  5. 超时控制

安装

go get github.com/AaronJan/Hunch

基本使用示例

1. 并行执行多个任务

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/AaronJan/Hunch"
)

func main() {
	ctx := context.Background()
	
	results, errors := Hunch.All(
		ctx,
		func(ctx context.Context) (interface{}, error) {
			time.Sleep(2 * time.Second)
			return "Task 1", nil
		},
		func(ctx context.Context) (interface{}, error) {
			time.Sleep(1 * time.Second)
			return "Task 2", nil
		},
		func(ctx context.Context) (interface{}, error) {
			time.Sleep(3 * time.Second)
			return 100, nil
		},
	)

	for i, result := range results {
		fmt.Printf("Result %d: %v\n", i, result)
	}
	
	if len(errors) > 0 {
		fmt.Println("Errors occurred:", errors)
	}
}

2. 顺序执行任务

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/AaronJan/Hunch"
)

func main() {
	ctx := context.Background()
	
	results, errors := Hunch.Waterfall(
		ctx,
		func(ctx context.Context, n interface{}) (interface{}, error) {
			fmt.Println("First task")
			return "initial data", nil
		},
		func(ctx context.Context, n interface{}) (interface{}, error) {
			fmt.Println("Second task received:", n)
			time.Sleep(1 * time.Second)
			return "processed data", nil
		},
		func(ctx context.Context, n interface{}) (interface{}, error) {
			fmt.Println("Third task received:", n)
			return "final result", nil
		},
	)

	fmt.Println("Final result:", results)
	if len(errors) > 0 {
		fmt.Println("Errors occurred:", errors)
	}
}

3. 带超时控制的并行任务

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/AaronJan/Hunch"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	
	results, errors := Hunch.All(
		ctx,
		func(ctx context.Context) (interface{}, error) {
			time.Sleep(3 * time.Second) // 这个任务会超时
			return "Task 1", nil
		},
		func(ctx context.Context) (interface{}, error) {
			time.Sleep(1 * time.Second)
			return "Task 2", nil
		},
	)

	fmt.Println("Results:", results)
	fmt.Println("Errors:", errors) // 会包含context deadline exceeded错误
}

4. 链式调用

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/AaronJan/Hunch"
)

func main() {
	ctx := context.Background()
	
	Hunch.New(ctx).
		Then(func(ctx context.Context) (interface{}, error) {
			fmt.Println("First task")
			return 10, nil
		}).
		Then(func(ctx context.Context, n interface{}) (interface{}, error) {
			num := n.(int)
			fmt.Println("Got number:", num)
			return num * 2, nil
		}).
		Then(func(ctx context.Context, n interface{}) (interface{}, error) {
			num := n.(int)
			fmt.Println("Final number:", num)
			return nil, nil
		}).
		Catch(func(ctx context.Context, err error) (interface{}, error) {
			fmt.Println("Error occurred:", err)
			return nil, err
		}).
		Finally(func(ctx context.Context, n interface{}) (interface{}, error) {
			fmt.Println("All done")
			return nil, nil
		})
}

高级用法

1. 限制并发数

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/AaronJan/Hunch"
)

func main() {
	ctx := context.Background()
	
	// 限制最多同时执行2个任务
	results, errors := Hunch.All(
		ctx,
		Hunch.WithConcurrency(2),
		func(ctx context.Context) (interface{}, error) {
			time.Sleep(2 * time.Second)
			return "Task 1", nil
		},
		func(ctx context.Context) (interface{}, error) {
			time.Sleep(1 * time.Second)
			return "Task 2", nil
		},
		func(ctx context.Context) (interface{}, error) {
			time.Sleep(3 * time.Second)
			return "Task 3", nil
		},
	)

	fmt.Println(results)
	fmt.Println(errors)
}

2. 重试机制

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/AaronJan/Hunch"
)

func main() {
	ctx := context.Background()
	
	// 重试3次,每次间隔1秒
	result, err := Hunch.Retry(
		ctx,
		3,
		time.Second,
		func(ctx context.Context) (interface{}, error) {
			fmt.Println("Attempting task...")
			return nil, fmt.Errorf("temporary error")
		},
	)

	if err != nil {
		fmt.Println("All attempts failed:", err)
	} else {
		fmt.Println("Success:", result)
	}
}

总结

Hunch库提供了以下几种核心功能:

  1. All() - 并行执行所有任务
  2. Waterfall() - 顺序执行任务,前一个任务的输出是后一个任务的输入
  3. Race() - 执行所有任务,返回第一个完成的任务结果
  4. Retry() - 重试机制
  5. 链式调用API

相比原生Golang的goroutine和channel,Hunch提供了更高层次的抽象,使得异步流程控制代码更简洁、更易读。特别适合需要复杂异步控制的场景,如微服务调用、并发数据处理等。

Hunch的API设计借鉴了JavaScript Promise的风格,对于有前端经验的开发者来说学习曲线较为平缓。同时它完全基于Golang的context包实现,可以很好地与现有的Golang生态系统集成。

回到顶部