golang异步任务处理插件库async的使用

Golang异步任务处理插件库async的使用

Async是一个类似async/await的任务处理包,用于Go语言中实现异步任务处理。

特性

  • 支持TaskAction的Wait/WaitAny/WaitN方法
  • 支持带有timeoutcancelcontext.Context
  • 使用泛型而不是interface{}

安装async

安装main分支最新提交

go get github.com/yaitoo/async@main

安装最新发布版本

go get github.com/yaitoo/async@latest

使用示例

Wait - 等待所有任务完成

t := async.New[int](func(ctx context.Context) (int, error) {
    return 1, nil
}, func(ctx context.Context) (int, error) {
    return 2, nil
})

result, err, taskErrs := t.Wait(context.Background())

fmt.Println(result)  // [1,2] 或 [2,1]
fmt.Println(err)     // nil
fmt.Println(taskErrs) // nil

WaitAny - 等待任意一个任务完成

t := async.New[int](func(ctx context.Context) (int, error) {
    time.Sleep(2 * time.Second)
    return 1, nil
}, func(ctx context.Context) (int, error) {
    return 2, nil
})

result, err, taskErrs := t.WaitAny(context.Background())

fmt.Println(result)  // 2
fmt.Println(err)     // nil
fmt.Println(taskErrs) // nil

WaitN - 等待N个任务完成

t := async.New[int](func(ctx context.Context) (int, error) {
    time.Sleep(2 * time.Second)
    return 1, nil
}, func(ctx context.Context) (int, error) {
    return 2, nil
}, func(ctx context.Context) (int, error) {
    return 3, nil
})

result, err, taskErrs := t.WaitN(context.Background(), 2)

fmt.Println(result)  // [2,3] 或 [3,2]
fmt.Println(err)     // nil
fmt.Println(taskErrs) // nil

Timeout - 超时控制

t := async.New[int](func(ctx context.Context) (int, error) {
    time.Sleep(2 * time.Second)
    return 1, nil
}, func(ctx context.Context) (int, error) {
    time.Sleep(2 * time.Second)
    return 2, nil
})

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

result, err, tasks := t.WaitAny(ctx)
// 或者 result, err, tasks := t.Wait(ctx)

fmt.Println(result) // nil
fmt.Println(err)   // context.DeadlineExceeded
fmt.Println(taskErrs) // nil

Cancel - 手动取消任务

t := async.New[int](func(ctx context.Context) (int, error) {
    time.Sleep(2 * time.Second)
    return 1, nil
}, func(ctx context.Context) (int, error) {
    time.Sleep(2 * time.Second)
    return 2, nil
})

ctx, cancel := context.WithCancel(context.Background())
go func(){
    time.Sleep(1 * time.Second)
    cancel()
}()

// result, err, taskErrs := t.WaitAny(ctx)
result, err, taskErrs := t.Wait(ctx)

fmt.Println(result)  // nil
fmt.Println(err)     // context.Cancelled
fmt.Println(taskErrs) // nil

贡献

欢迎贡献!如果您有兴趣参与贡献,请随时参与。

许可证

MIT License


更多关于golang异步任务处理插件库async的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang异步任务处理插件库async的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang异步任务处理插件库async的使用

async是一个轻量级的Go语言异步任务处理库,它提供了简单易用的API来执行并发任务。下面我将详细介绍async库的使用方法。

安装

首先安装async库:

go get github.com/reugn/async

基本用法

1. Future模式

Future模式允许你提交一个任务并稍后获取结果:

package main

import (
	"fmt"
	"time"
	"github.com/reugn/async"
)

func main() {
	// 创建一个Future
	future := async.NewFuture(func() (interface{}, error) {
		time.Sleep(1 * time.Second) // 模拟耗时操作
		return "任务完成", nil
	})

	// 执行其他操作...

	// 获取结果(会阻塞直到任务完成)
	result, err := future.Get()
	if err != nil {
		fmt.Println("任务出错:", err)
		return
	}
	fmt.Println("结果:", result)
}

2. Promise模式

Promise模式允许你链式处理异步操作:

package main

import (
	"fmt"
	"time"
	"github.com/reugn/async"
)

func main() {
	promise := async.NewPromise()

	// 启动goroutine执行任务
	go func() {
		time.Sleep(1 * time.Second)
		promise.Success("任务成功完成")
		// 如果出错可以使用 promise.Failure(err)
	}()

	// 设置回调
	promise.OnSuccess(func(v interface{}) {
		fmt.Println("成功回调:", v)
	}).OnFailure(func(err error) {
		fmt.Println("失败回调:", err)
	})

	// 阻塞等待结果
	result, err := promise.Get()
	if err != nil {
		fmt.Println("获取结果出错:", err)
		return
	}
	fmt.Println("最终结果:", result)
}

3. 并行执行多个任务

package main

import (
	"fmt"
	"time"
	"github.com/reugn/async"
)

func main() {
	// 创建多个Future
	future1 := async.NewFuture(func() (interface{}, error) {
		time.Sleep(2 * time.Second)
		return "任务1完成", nil
	})

	future2 := async.NewFuture(func() (interface{}, error) {
		time.Sleep(1 * time.Second)
		return "任务2完成", nil
	})

	// 并行执行并等待所有任务完成
	results, errs := async.AwaitAll(future1, future2)
	for i, result := range results {
		if errs[i] != nil {
			fmt.Printf("任务%d出错: %v\n", i+1, errs[i])
			continue
		}
		fmt.Printf("任务%d结果: %v\n", i+1, result)
	}
}

4. 超时控制

package main

import (
	"fmt"
	"time"
	"github.com/reugn/async"
)

func main() {
	future := async.NewFuture(func() (interface{}, error) {
		time.Sleep(3 * time.Second)
		return "任务完成", nil
	})

	// 设置2秒超时
	result, err := future.GetWithTimeout(2 * time.Second)
	if err != nil {
		if err == async.ErrTimeout {
			fmt.Println("任务超时")
		} else {
			fmt.Println("其他错误:", err)
		}
		return
	}
	fmt.Println("结果:", result)
}

高级用法

1. 任务取消

package main

import (
	"fmt"
	"time"
	"github.com/reugn/async"
)

func main() {
	future := async.NewCancelableFuture(func() (interface{}, error) {
		for i := 0; i < 10; i++ {
			select {
			case <-future.Cancelled():
				return nil, async.ErrCancelled
			default:
				time.Sleep(500 * time.Millisecond)
				fmt.Println("处理中...", i)
			}
		}
		return "完成", nil
	})

	// 启动goroutine在2秒后取消任务
	go func() {
		time.Sleep(2 * time.Second)
		future.Cancel()
	}()

	_, err := future.Get()
	if err == async.ErrCancelled {
		fmt.Println("任务被取消")
	} else if err != nil {
		fmt.Println("其他错误:", err)
	} else {
		fmt.Println("任务正常完成")
	}
}

2. 结果转换

package main

import (
	"fmt"
	"strconv"
	"time"
	"github.com/reugn/async"
)

func main() {
	future := async.NewFuture(func() (interface{}, error) {
		time.Sleep(1 * time.Second)
		return 42, nil
	})

	// 转换结果为字符串
	strFuture := future.Then(func(v interface{}) (interface{}, error) {
		return strconv.Itoa(v.(int)), nil
	})

	result, err := strFuture.Get()
	if err != nil {
		fmt.Println("出错:", err)
		return
	}
	fmt.Printf("转换后的结果: %v (类型: %T)\n", result, result)
}

注意事项

  1. async库是线程安全的,可以在多个goroutine中使用
  2. Future.Get()方法会阻塞直到任务完成
  3. 使用CancelableFuture时,任务函数需要定期检查Cancelled()通道
  4. 对于长时间运行的任务,建议使用WithTimeout或实现取消逻辑

async库提供了简单而强大的异步任务处理能力,适合各种并发场景。根据你的需求选择合适的模式,可以显著提高程序的并发性能。

回到顶部