golang高效安全并发执行函数工具插件库conexec的使用

Golang高效安全并发执行函数工具插件库conexec的使用

简介

conexec是一个并发工具包,用于以高效安全的方式并发执行函数。它支持设置整体超时时间以避免阻塞。

使用方法

通常可以将其设置为单例以节省内存。以下是使用示例:

普通执行器(Actuator)

Actuator是用于并发执行函数的基础结构。

opt := &Options{TimeOut: DurationPtr(time.Millisecond * 50)}
c := NewActuator(opt)

err := c.Exec(
    func() error {
        fmt.Println(1)
        time.Sleep(time.Second * 2)
        return nil
    },
    func() error {
        fmt.Println(2)
        return nil
    },
    func() error {
        time.Sleep(time.Second * 1)
        fmt.Println(3)
        return nil
    },
)

if err != nil {
    // 处理错误
}

池化执行器(Pooled Actuator)

池化执行器使用goroutine池来执行函数,在某些情况下这是一种更高效的方式。

opt := &Options{TimeOut: DurationPtr(time.Millisecond * 50)}
c := NewPooledActuator(5, opt)

err := c.Exec(...)

if err != nil {
    // 处理错误
}

使用自定义goroutine池:

c := NewPooledActuator(5).WithPool(pool)

简单使用goroutine执行

done := Exec(...)

if !done {
    // 处理未完成情况
}

完整示例

package main

import (
    "fmt"
    "time"
    "github.com/ITcathyh/conexec"
)

func main() {
    // 普通执行器示例
    normalExample()
    
    // 池化执行器示例
    pooledExample()
}

func normalExample() {
    opt := &conexec.Options{TimeOut: conexec.DurationPtr(time.Millisecond * 50)}
    c := conexec.NewActuator(opt)
    
    err := c.Exec(
        func() error {
            fmt.Println("Task 1 started")
            time.Sleep(time.Millisecond * 30)
            fmt.Println("Task 1 completed")
            return nil
        },
        func() error {
            fmt.Println("Task 2 started")
            time.Sleep(time.Millisecond * 20)
            fmt.Println("Task 2 completed")
            return nil
        },
    )
    
    if err != nil {
        fmt.Printf("Error occurred: %v\n", err)
    }
}

func pooledExample() {
    opt := &conexec.Options{TimeOut: conexec.DurationPtr(time.Millisecond * 100)}
    c := conexec.NewPooledActuator(3, opt)
    
    err := c.Exec(
        func() error {
            fmt.Println("Pooled Task 1 started")
            time.Sleep(time.Millisecond * 50)
            fmt.Println("Pooled Task 1 completed")
            return nil
        },
        func() error {
            fmt.Println("Pooled Task 2 started")
            time.Sleep(time.Millisecond * 30)
            fmt.Println("Pooled Task 2 completed")
            return nil
        },
        func() error {
            fmt.Println("Pooled Task 3 started")
            time.Sleep(time.Millisecond * 70)
            fmt.Println("Pooled Task 3 completed")
            return nil
        },
    )
    
    if err != nil {
        fmt.Printf("Pooled Error occurred: %v\n", err)
    }
}

这个示例展示了如何使用conexec库来并发执行多个任务,包括普通执行器和池化执行器的用法。


更多关于golang高效安全并发执行函数工具插件库conexec的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高效安全并发执行函数工具插件库conexec的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


conexec - Golang高效安全并发执行函数工具库

conexec 是一个用于 Golang 的高效、安全的并发执行函数工具库,它提供了简单易用的接口来并发执行多个函数,并收集结果或错误。

主要特性

  1. 并发安全:内置同步机制,避免竞态条件
  2. 错误处理:能够收集并返回所有执行过程中产生的错误
  3. 结果收集:支持收集所有函数的执行结果
  4. 超时控制:可以设置全局超时时间
  5. 并发控制:限制最大并发数

安装

go get github.com/sourcegraph/conc

基本使用示例

1. 简单并发执行

package main

import (
	"fmt"
	"time"
	"github.com/sourcegraph/conc"
)

func main() {
	// 创建并发执行器
	executor := conc.New()

	// 添加要并发执行的函数
	executor.Go(func() {
		fmt.Println("任务1开始")
		time.Sleep(1 * time.Second)
		fmt.Println("任务1结束")
	})

	executor.Go(func() {
		fmt.Println("任务2开始")
		time.Sleep(2 * time.Second)
		fmt.Println("任务2结束")
	})

	// 等待所有任务完成
	executor.Wait()
	fmt.Println("所有任务完成")
}

2. 带返回值的并发执行

package main

import (
	"fmt"
	"github.com/sourcegraph/conc"
)

func main() {
	executor := conc.New()

	// 定义带返回值的任务
	task1 := func() int {
		return 42
	}

	task2 := func() string {
		return "hello"
	}

	// 执行并获取结果
	var result1 int
	var result2 string

	executor.Go(func() {
		result1 = task1()
	})

	executor.Go(func() {
		result2 = task2()
	})

	executor.Wait()

	fmt.Printf("结果1: %d, 结果2: %s\n", result1, result2)
}

3. 错误处理

package main

import (
	"errors"
	"fmt"
	"github.com/sourcegraph/conc"
)

func main() {
	executor := conc.New()

	// 定义可能出错的任务
	task1 := func() error {
		return errors.New("任务1出错")
	}

	task2 := func() error {
		return nil
	}

	var err1, err2 error

	executor.Go(func() {
		err1 = task1()
	})

	executor.Go(func() {
		err2 = task2()
	})

	executor.Wait()

	if err1 != nil || err2 != nil {
		fmt.Printf("错误1: %v, 错误2: %v\n", err1, err2)
	} else {
		fmt.Println("所有任务成功完成")
	}
}

4. 限制并发数

package main

import (
	"fmt"
	"time"
	"github.com/sourcegraph/conc"
)

func main() {
	// 创建最大并发数为2的执行器
	executor := conc.New().WithMaxGoroutines(2)

	for i := 0; i < 5; i++ {
		id := i
		executor.Go(func() {
			fmt.Printf("任务%d开始\n", id)
			time.Sleep(1 * time.Second)
			fmt.Printf("任务%d结束\n", id)
		})
	}

	executor.Wait()
	fmt.Println("所有任务完成")
}

5. 带超时的并发执行

package main

import (
	"context"
	"fmt"
	"time"
	"github.com/sourcegraph/conc"
)

func main() {
	// 创建带超时的执行器(全局超时2秒)
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	executor := conc.New().WithContext(ctx)

	executor.Go(func() {
		time.Sleep(1 * time.Second)
		fmt.Println("任务1完成")
	})

	executor.Go(func() {
		time.Sleep(3 * time.Second)
		fmt.Println("任务2完成") // 这个任务会因为超时而不会完成
	})

	err := executor.Wait()
	if err != nil {
		fmt.Println("执行出错:", err)
	}
}

高级用法

批量执行任务并收集结果

package main

import (
	"fmt"
	"github.com/sourcegraph/conc"
)

func main() {
	tasks := []func() (int, error){
		func() (int, error) { return 1, nil },
		func() (int, error) { return 2, nil },
		func() (int, error) { return 3, nil },
	}

	results := make([]int, len(tasks))
	errs := make([]error, len(tasks))

	executor := conc.New()

	for i, task := range tasks {
		i, task := i, task // 创建局部变量副本
		executor.Go(func() {
			res, err := task()
			results[i] = res
			errs[i] = err
		})
	}

	executor.Wait()

	fmt.Println("结果:", results)
	fmt.Println("错误:", errs)
}

使用 WaitGroup 替代方案

package main

import (
	"fmt"
	"sync"
	"github.com/sourcegraph/conc"
)

func main() {
	var wg conc.WaitGroup
	var mu sync.Mutex
	sum := 0

	for i := 0; i < 10; i++ {
		wg.Go(func() {
			mu.Lock()
			defer mu.Unlock()
			sum += 1
		})
	}

	wg.Wait()
	fmt.Println("总和:", sum) // 输出: 总和: 10
}

性能考虑

  1. 对于大量小任务,考虑使用 conc.Pool 来重用 goroutine
  2. 避免在任务函数中使用阻塞操作,除非确实需要
  3. 合理设置最大并发数,通常设置为 CPU 核心数的 2-4 倍
package main

import (
	"fmt"
	"runtime"
	"github.com/sourcegraph/conc"
)

func main() {
	// 根据CPU核心数设置并发数
	maxConcurrent := runtime.NumCPU() * 2
	executor := conc.New().WithMaxGoroutines(maxConcurrent)

	// 添加任务...
}

conexec 库提供了比标准库 sync.WaitGroup 更高级的抽象,使得并发编程更加简单和安全。它特别适合需要并发执行多个独立任务并收集结果的场景。

回到顶部