golang异步同步替代库插件async的使用

Golang异步同步替代库插件async的使用

async库logo

Async是一个用于Go语言的同步和异步计算包,提供了多种实用的并发工具。

主要功能

  1. ConcurrentMap - 通过委托给底层的sync.Map来实现线程安全的通用async.Map接口
  2. ShardedMap - 使用键哈希计算分片号,将加载/存储操作委托给底层async.SynchronizedMap之一,实现线程安全的通用async.Map接口
  3. Future - 一个可能还不存在的值的占位符对象
  4. Promise - 可写、单次赋值的容器,用于完成future
  5. Executor - 用于执行异步任务的worker池,每个提交返回一个代表任务结果的Future实例
  6. Task - 用于控制可能延迟和异步计算的数据类型
  7. Once - 类似sync.Once的对象,但Do方法接受f func() (T, error)并返回(T, error)
  8. Value - 类似atomic.Value的对象,但没有一致的类型约束
  9. WaitGroupContext - 支持context.Context的WaitGroup,用于优雅解除阻塞
  10. ReentrantLock - 允许goroutine多次进入资源锁的互斥锁
  11. PriorityLock - 非重入互斥锁,允许指定锁获取优先级

使用示例

Future/Promise示例

package main

import (
	"fmt"
	"time"

	"github.com/reugn/async"
)

func main() {
	// 创建一个新的Promise
	promise := async.NewPromise[int]()

	// 异步设置Promise的值
	go func() {
		time.Sleep(1 * time.Second)
		promise.Success(42) // 设置成功值
		// 或者 promise.Failure(err) 设置错误
	}()

	// 获取关联的Future
	future := promise.Future()

	// 阻塞等待结果
	result, err := future.Get()
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println("Result:", result) // 输出: Result: 42

	// 非阻塞检查
	if future.IsComplete() {
		fmt.Println("Future is complete")
	}
}

Executor示例

package main

import (
	"fmt"
	"time"

	"github.com/reugn/async"
)

func main() {
	// 创建一个固定大小的executor
	executor := async.NewFixedExecutor(4, 100)
	defer executor.Close()

	// 提交任务
	future := executor.Submit(func() (interface{}, error) {
		time.Sleep(500 * time.Millisecond)
		return "task completed", nil
	})

	// 获取结果
	result, err := future.Get()
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println(result) // 输出: task completed
}

ConcurrentMap示例

package main

import (
	"fmt"

	"github.com/reugn/async"
)

func main() {
	// 创建并发安全的map
	cmap := async.NewConcurrentMap[string, int]()

	// 存储值
	cmap.Store("key1", 100)
	cmap.Store("key2", 200)

	// 加载值
	if value, ok := cmap.Load("key1"); ok {
		fmt.Println("key1:", value) // 输出: key1: 100
	}

	// 遍历map
	cmap.Range(func(key string, value int) bool {
		fmt.Printf("%s: %d\n", key, value)
		return true // 继续迭代
	})
}

ReentrantLock示例

package main

import (
	"fmt"
	"time"

	"github.com/reugn/async"
)

func main() {
	lock := async.NewReentrantLock()

	// 第一个锁获取
	lock.Lock()
	fmt.Println("First lock acquired")

	// 可重入锁获取
	lock.Lock()
	fmt.Println("Second lock acquired (reentrant)")

	// 释放锁
	lock.Unlock()
	lock.Unlock()

	// 尝试锁
	if lock.TryLock() {
		fmt.Println("TryLock succeeded")
		lock.Unlock()
	}

	// 带超时的锁
	if lock.TryLockWithTimeout(1 * time.Second) {
		fmt.Println("TryLockWithTimeout succeeded")
		lock.Unlock()
	}
}

Async库提供了丰富的并发工具,可以根据具体需求选择合适的组件来实现异步和同步操作。


更多关于golang异步同步替代库插件async的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang异步同步替代库插件async的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang异步同步替代库async的使用

在Go语言中,虽然原生支持goroutine实现并发,但有时我们需要更高级的异步/同步控制。async库是一个流行的Go异步编程工具,提供了Promise/Future模式,让异步代码更易管理。

安装async库

go get github.com/reugn/async

基本使用示例

1. 创建Promise

package main

import (
	"fmt"
	"time"
	"github.com/reugn/async"
)

func main() {
	promise := async.NewPromise()
	
	go func() {
		time.Sleep(1 * time.Second)
		promise.Success("Operation completed!")
	}()
	
	result, err := promise.Get()
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println("Result:", result)
}

2. 链式调用

func chainExample() {
	async.NewPromise().
		Then(func(v interface{}) (interface{}, error) {
			fmt.Println("Step 1:", v)
			return "processed " + v.(string), nil
		}).
		Then(func(v interface{}) (interface{}, error) {
			fmt.Println("Step 2:", v)
			return len(v.(string)), nil
		}).
		OnSuccess(func(v interface{}) {
			fmt.Println("Final result:", v)
		}).
		OnFailure(func(err error) {
			fmt.Println("Error occurred:", err)
		})
}

3. 异步任务组合

func combineExample() {
	p1 := async.NewPromise()
	p2 := async.NewPromise()

	go func() {
		time.Sleep(500 * time.Millisecond)
		p1.Success("Hello")
	}()

	go func() {
		time.Sleep(1 * time.Second)
		p2.Success("World")
	}()

	// 等待所有Promise完成
	combined := async.WhenAll(p1, p2)
	result, _ := combined.Get()
	fmt.Println("Combined results:", result) // [Hello World]

	// 等待任意一个Promise完成
	first := async.WhenAny(p1, p2)
	firstResult, _ := first.Get()
	fmt.Println("First completed:", firstResult)
}

4. Future模式

func futureExample() {
	future := async.NewFuture(func() (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 42, nil
	})

	fmt.Println("Waiting for future...")
	result, err := future.Get()
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println("Future result:", result)
}

高级功能

1. 超时控制

func withTimeout() {
	promise := async.NewPromise()
	
	go func() {
		time.Sleep(3 * time.Second)
		promise.Success("Too late!")
	}()
	
	// 设置1秒超时
	result, err := promise.GetWithTimeout(1 * time.Second)
	if err == async.ErrTimeout {
		fmt.Println("Operation timed out")
		return
	}
	fmt.Println("Result:", result)
}

2. 错误处理

func errorHandling() {
	promise := async.NewPromise()
	
	go func() {
		time.Sleep(1 * time.Second)
		promise.Failure(fmt.Errorf("something went wrong"))
	}()
	
	_, err := promise.Get()
	if err != nil {
		fmt.Println("Caught error:", err)
	}
}

3. 转换回调为Promise

func callbackToPromise() {
	// 模拟一个回调函数
	callbackFunc := func(callback func(string, error)) {
		time.Sleep(1 * time.Second)
		callback("callback result", nil)
	}
	
	// 将回调转换为Promise
	promise := async.FromCallback(func() (string, error) {
		var result string
		var err error
		done := make(chan struct{})
		
		callbackFunc(func(s string, e error) {
			result = s
			err = e
			close(done)
		})
		
		<-done
		return result, err
	})
	
	result, _ := promise.Get()
	fmt.Println("Converted result:", result)
}

实际应用场景

  1. 并发API调用:同时调用多个API并合并结果
  2. 超时控制:为长时间运行的操作设置超时
  3. 任务流水线:创建一系列依赖的异步任务
  4. 错误传播:在异步链中优雅地处理错误

注意事项

  1. Promise一旦完成(成功或失败),状态就不能再改变
  2. Get()方法是阻塞的,考虑使用GetWithTimeout()
  3. 大量使用Promise可能会增加代码复杂性,评估是否真的需要

async库为Go提供了更灵活的异步编程模式,特别适合需要复杂异步控制的场景。但对于简单并发,标准库的goroutine和channel可能更合适。

回到顶部