golang异步任务处理插件库async的使用
Golang异步任务处理插件库async的使用
Async是一个类似async/await的任务处理包,用于Go语言中实现异步任务处理。
特性
- 支持
Task
和Action
的Wait/WaitAny/WaitN方法 - 支持带有
timeout
和cancel
的context.Context
- 使用泛型而不是
interface{}
安装async
安装main分支最新提交
go get github.com/yaitoo/async@main
安装最新发布版本
go get github.com/yaitoo/async@latest
使用示例
Wait - 等待所有任务完成
t := async.New[int](func(ctx context.Context) (int, error) {
return 1, nil
}, func(ctx context.Context) (int, error) {
return 2, nil
})
result, err, taskErrs := t.Wait(context.Background())
fmt.Println(result) // [1,2] 或 [2,1]
fmt.Println(err) // nil
fmt.Println(taskErrs) // nil
WaitAny - 等待任意一个任务完成
t := async.New[int](func(ctx context.Context) (int, error) {
time.Sleep(2 * time.Second)
return 1, nil
}, func(ctx context.Context) (int, error) {
return 2, nil
})
result, err, taskErrs := t.WaitAny(context.Background())
fmt.Println(result) // 2
fmt.Println(err) // nil
fmt.Println(taskErrs) // nil
WaitN - 等待N个任务完成
t := async.New[int](func(ctx context.Context) (int, error) {
time.Sleep(2 * time.Second)
return 1, nil
}, func(ctx context.Context) (int, error) {
return 2, nil
}, func(ctx context.Context) (int, error) {
return 3, nil
})
result, err, taskErrs := t.WaitN(context.Background(), 2)
fmt.Println(result) // [2,3] 或 [3,2]
fmt.Println(err) // nil
fmt.Println(taskErrs) // nil
Timeout - 超时控制
t := async.New[int](func(ctx context.Context) (int, error) {
time.Sleep(2 * time.Second)
return 1, nil
}, func(ctx context.Context) (int, error) {
time.Sleep(2 * time.Second)
return 2, nil
})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
result, err, tasks := t.WaitAny(ctx)
// 或者 result, err, tasks := t.Wait(ctx)
fmt.Println(result) // nil
fmt.Println(err) // context.DeadlineExceeded
fmt.Println(taskErrs) // nil
Cancel - 手动取消任务
t := async.New[int](func(ctx context.Context) (int, error) {
time.Sleep(2 * time.Second)
return 1, nil
}, func(ctx context.Context) (int, error) {
time.Sleep(2 * time.Second)
return 2, nil
})
ctx, cancel := context.WithCancel(context.Background())
go func(){
time.Sleep(1 * time.Second)
cancel()
}()
// result, err, taskErrs := t.WaitAny(ctx)
result, err, taskErrs := t.Wait(ctx)
fmt.Println(result) // nil
fmt.Println(err) // context.Cancelled
fmt.Println(taskErrs) // nil
贡献
欢迎贡献!如果您有兴趣参与贡献,请随时参与。
许可证
MIT License
更多关于golang异步任务处理插件库async的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang异步任务处理插件库async的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang异步任务处理插件库async的使用
async是一个轻量级的Go语言异步任务处理库,它提供了简单易用的API来执行并发任务。下面我将详细介绍async库的使用方法。
安装
首先安装async库:
go get github.com/reugn/async
基本用法
1. Future模式
Future模式允许你提交一个任务并稍后获取结果:
package main
import (
"fmt"
"time"
"github.com/reugn/async"
)
func main() {
// 创建一个Future
future := async.NewFuture(func() (interface{}, error) {
time.Sleep(1 * time.Second) // 模拟耗时操作
return "任务完成", nil
})
// 执行其他操作...
// 获取结果(会阻塞直到任务完成)
result, err := future.Get()
if err != nil {
fmt.Println("任务出错:", err)
return
}
fmt.Println("结果:", result)
}
2. Promise模式
Promise模式允许你链式处理异步操作:
package main
import (
"fmt"
"time"
"github.com/reugn/async"
)
func main() {
promise := async.NewPromise()
// 启动goroutine执行任务
go func() {
time.Sleep(1 * time.Second)
promise.Success("任务成功完成")
// 如果出错可以使用 promise.Failure(err)
}()
// 设置回调
promise.OnSuccess(func(v interface{}) {
fmt.Println("成功回调:", v)
}).OnFailure(func(err error) {
fmt.Println("失败回调:", err)
})
// 阻塞等待结果
result, err := promise.Get()
if err != nil {
fmt.Println("获取结果出错:", err)
return
}
fmt.Println("最终结果:", result)
}
3. 并行执行多个任务
package main
import (
"fmt"
"time"
"github.com/reugn/async"
)
func main() {
// 创建多个Future
future1 := async.NewFuture(func() (interface{}, error) {
time.Sleep(2 * time.Second)
return "任务1完成", nil
})
future2 := async.NewFuture(func() (interface{}, error) {
time.Sleep(1 * time.Second)
return "任务2完成", nil
})
// 并行执行并等待所有任务完成
results, errs := async.AwaitAll(future1, future2)
for i, result := range results {
if errs[i] != nil {
fmt.Printf("任务%d出错: %v\n", i+1, errs[i])
continue
}
fmt.Printf("任务%d结果: %v\n", i+1, result)
}
}
4. 超时控制
package main
import (
"fmt"
"time"
"github.com/reugn/async"
)
func main() {
future := async.NewFuture(func() (interface{}, error) {
time.Sleep(3 * time.Second)
return "任务完成", nil
})
// 设置2秒超时
result, err := future.GetWithTimeout(2 * time.Second)
if err != nil {
if err == async.ErrTimeout {
fmt.Println("任务超时")
} else {
fmt.Println("其他错误:", err)
}
return
}
fmt.Println("结果:", result)
}
高级用法
1. 任务取消
package main
import (
"fmt"
"time"
"github.com/reugn/async"
)
func main() {
future := async.NewCancelableFuture(func() (interface{}, error) {
for i := 0; i < 10; i++ {
select {
case <-future.Cancelled():
return nil, async.ErrCancelled
default:
time.Sleep(500 * time.Millisecond)
fmt.Println("处理中...", i)
}
}
return "完成", nil
})
// 启动goroutine在2秒后取消任务
go func() {
time.Sleep(2 * time.Second)
future.Cancel()
}()
_, err := future.Get()
if err == async.ErrCancelled {
fmt.Println("任务被取消")
} else if err != nil {
fmt.Println("其他错误:", err)
} else {
fmt.Println("任务正常完成")
}
}
2. 结果转换
package main
import (
"fmt"
"strconv"
"time"
"github.com/reugn/async"
)
func main() {
future := async.NewFuture(func() (interface{}, error) {
time.Sleep(1 * time.Second)
return 42, nil
})
// 转换结果为字符串
strFuture := future.Then(func(v interface{}) (interface{}, error) {
return strconv.Itoa(v.(int)), nil
})
result, err := strFuture.Get()
if err != nil {
fmt.Println("出错:", err)
return
}
fmt.Printf("转换后的结果: %v (类型: %T)\n", result, result)
}
注意事项
- async库是线程安全的,可以在多个goroutine中使用
- Future.Get()方法会阻塞直到任务完成
- 使用CancelableFuture时,任务函数需要定期检查Cancelled()通道
- 对于长时间运行的任务,建议使用WithTimeout或实现取消逻辑
async库提供了简单而强大的异步任务处理能力,适合各种并发场景。根据你的需求选择合适的模式,可以显著提高程序的并发性能。