golang异步同步替代库插件async的使用
Golang异步同步替代库插件async的使用
Async是一个用于Go语言的同步和异步计算包,提供了多种实用的并发工具。
主要功能
- ConcurrentMap - 通过委托给底层的
sync.Map
来实现线程安全的通用async.Map
接口 - ShardedMap - 使用键哈希计算分片号,将加载/存储操作委托给底层
async.SynchronizedMap
之一,实现线程安全的通用async.Map
接口 - Future - 一个可能还不存在的值的占位符对象
- Promise - 可写、单次赋值的容器,用于完成future
- Executor - 用于执行异步任务的worker池,每个提交返回一个代表任务结果的Future实例
- Task - 用于控制可能延迟和异步计算的数据类型
- Once - 类似sync.Once的对象,但Do方法接受
f func() (T, error)
并返回(T, error)
- Value - 类似atomic.Value的对象,但没有一致的类型约束
- WaitGroupContext - 支持
context.Context
的WaitGroup,用于优雅解除阻塞 - ReentrantLock - 允许goroutine多次进入资源锁的互斥锁
- PriorityLock - 非重入互斥锁,允许指定锁获取优先级
使用示例
Future/Promise示例
package main
import (
"fmt"
"time"
"github.com/reugn/async"
)
func main() {
// 创建一个新的Promise
promise := async.NewPromise[int]()
// 异步设置Promise的值
go func() {
time.Sleep(1 * time.Second)
promise.Success(42) // 设置成功值
// 或者 promise.Failure(err) 设置错误
}()
// 获取关联的Future
future := promise.Future()
// 阻塞等待结果
result, err := future.Get()
if err != nil {
fmt.Println("Error:", err)
return
}
fmt.Println("Result:", result) // 输出: Result: 42
// 非阻塞检查
if future.IsComplete() {
fmt.Println("Future is complete")
}
}
Executor示例
package main
import (
"fmt"
"time"
"github.com/reugn/async"
)
func main() {
// 创建一个固定大小的executor
executor := async.NewFixedExecutor(4, 100)
defer executor.Close()
// 提交任务
future := executor.Submit(func() (interface{}, error) {
time.Sleep(500 * time.Millisecond)
return "task completed", nil
})
// 获取结果
result, err := future.Get()
if err != nil {
fmt.Println("Error:", err)
return
}
fmt.Println(result) // 输出: task completed
}
ConcurrentMap示例
package main
import (
"fmt"
"github.com/reugn/async"
)
func main() {
// 创建并发安全的map
cmap := async.NewConcurrentMap[string, int]()
// 存储值
cmap.Store("key1", 100)
cmap.Store("key2", 200)
// 加载值
if value, ok := cmap.Load("key1"); ok {
fmt.Println("key1:", value) // 输出: key1: 100
}
// 遍历map
cmap.Range(func(key string, value int) bool {
fmt.Printf("%s: %d\n", key, value)
return true // 继续迭代
})
}
ReentrantLock示例
package main
import (
"fmt"
"time"
"github.com/reugn/async"
)
func main() {
lock := async.NewReentrantLock()
// 第一个锁获取
lock.Lock()
fmt.Println("First lock acquired")
// 可重入锁获取
lock.Lock()
fmt.Println("Second lock acquired (reentrant)")
// 释放锁
lock.Unlock()
lock.Unlock()
// 尝试锁
if lock.TryLock() {
fmt.Println("TryLock succeeded")
lock.Unlock()
}
// 带超时的锁
if lock.TryLockWithTimeout(1 * time.Second) {
fmt.Println("TryLockWithTimeout succeeded")
lock.Unlock()
}
}
Async库提供了丰富的并发工具,可以根据具体需求选择合适的组件来实现异步和同步操作。
更多关于golang异步同步替代库插件async的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang异步同步替代库插件async的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang异步同步替代库async的使用
在Go语言中,虽然原生支持goroutine实现并发,但有时我们需要更高级的异步/同步控制。async库是一个流行的Go异步编程工具,提供了Promise/Future模式,让异步代码更易管理。
安装async库
go get github.com/reugn/async
基本使用示例
1. 创建Promise
package main
import (
"fmt"
"time"
"github.com/reugn/async"
)
func main() {
promise := async.NewPromise()
go func() {
time.Sleep(1 * time.Second)
promise.Success("Operation completed!")
}()
result, err := promise.Get()
if err != nil {
fmt.Println("Error:", err)
return
}
fmt.Println("Result:", result)
}
2. 链式调用
func chainExample() {
async.NewPromise().
Then(func(v interface{}) (interface{}, error) {
fmt.Println("Step 1:", v)
return "processed " + v.(string), nil
}).
Then(func(v interface{}) (interface{}, error) {
fmt.Println("Step 2:", v)
return len(v.(string)), nil
}).
OnSuccess(func(v interface{}) {
fmt.Println("Final result:", v)
}).
OnFailure(func(err error) {
fmt.Println("Error occurred:", err)
})
}
3. 异步任务组合
func combineExample() {
p1 := async.NewPromise()
p2 := async.NewPromise()
go func() {
time.Sleep(500 * time.Millisecond)
p1.Success("Hello")
}()
go func() {
time.Sleep(1 * time.Second)
p2.Success("World")
}()
// 等待所有Promise完成
combined := async.WhenAll(p1, p2)
result, _ := combined.Get()
fmt.Println("Combined results:", result) // [Hello World]
// 等待任意一个Promise完成
first := async.WhenAny(p1, p2)
firstResult, _ := first.Get()
fmt.Println("First completed:", firstResult)
}
4. Future模式
func futureExample() {
future := async.NewFuture(func() (interface{}, error) {
time.Sleep(2 * time.Second)
return 42, nil
})
fmt.Println("Waiting for future...")
result, err := future.Get()
if err != nil {
fmt.Println("Error:", err)
return
}
fmt.Println("Future result:", result)
}
高级功能
1. 超时控制
func withTimeout() {
promise := async.NewPromise()
go func() {
time.Sleep(3 * time.Second)
promise.Success("Too late!")
}()
// 设置1秒超时
result, err := promise.GetWithTimeout(1 * time.Second)
if err == async.ErrTimeout {
fmt.Println("Operation timed out")
return
}
fmt.Println("Result:", result)
}
2. 错误处理
func errorHandling() {
promise := async.NewPromise()
go func() {
time.Sleep(1 * time.Second)
promise.Failure(fmt.Errorf("something went wrong"))
}()
_, err := promise.Get()
if err != nil {
fmt.Println("Caught error:", err)
}
}
3. 转换回调为Promise
func callbackToPromise() {
// 模拟一个回调函数
callbackFunc := func(callback func(string, error)) {
time.Sleep(1 * time.Second)
callback("callback result", nil)
}
// 将回调转换为Promise
promise := async.FromCallback(func() (string, error) {
var result string
var err error
done := make(chan struct{})
callbackFunc(func(s string, e error) {
result = s
err = e
close(done)
})
<-done
return result, err
})
result, _ := promise.Get()
fmt.Println("Converted result:", result)
}
实际应用场景
- 并发API调用:同时调用多个API并合并结果
- 超时控制:为长时间运行的操作设置超时
- 任务流水线:创建一系列依赖的异步任务
- 错误传播:在异步链中优雅地处理错误
注意事项
- Promise一旦完成(成功或失败),状态就不能再改变
- Get()方法是阻塞的,考虑使用GetWithTimeout()
- 大量使用Promise可能会增加代码复杂性,评估是否真的需要
async库为Go提供了更灵活的异步编程模式,特别适合需要复杂异步控制的场景。但对于简单并发,标准库的goroutine和channel可能更合适。