golang结构化并发编程工具插件库conc的使用

Golang结构化并发编程工具插件库conc的使用

conc

简介

conc是Go语言的结构化并发工具包,使常见任务更简单、更安全。

安装:

go get github.com/sourcegraph/conc

主要功能概览

  • 使用conc.WaitGroup作为更安全的sync.WaitGroup替代
  • 使用pool.Pool实现并发限制的任务运行器
  • 使用pool.ResultPool收集任务结果
  • 使用pool.(Result)?ErrorPool处理可能失败的任务
  • 使用pool.(Result)?ContextPool实现失败时取消任务
  • 使用stream.Stream并行处理有序任务流
  • 使用iter.Map并发映射切片
  • 使用iter.ForEach并发迭代切片
  • 使用panics.Catcher捕获goroutine中的panic

完整示例

基本WaitGroup使用

func main() {
    var wg conc.WaitGroup
    defer wg.Wait()  // 确保等待所有goroutine完成

    startTheThing(&wg)
}

func startTheThing(wg *conc.WaitGroup) {
    wg.Go(func() { 
        // 并发任务代码
    })
}

带并发限制的任务池

func process(stream chan int) {
    p := pool.New().WithMaxGoroutines(10)  // 限制10个并发
    for elem := range stream {
        elem := elem  // 重要:创建局部变量副本
        p.Go(func() {
            handle(elem)
        })
    }
    p.Wait()
}

并发映射切片

func concMap(input []int, f func(*int) int) []int {
    return iter.Map(input, f)
}

处理有序流

func mapStream(in chan int, out chan int, f func(int) int) {
    s := stream.New().WithMaxGoroutines(10)
    for elem := range in {
        elem := elem
        s.Go(func() stream.Callback {
            res := f(elem)
            return func() { out <- res }
        })
    }
    s.Wait()
}

主要目标

  1. 防止goroutine泄漏:所有并发操作都有明确的作用域和所有者
  2. 优雅处理panic:自动捕获并传播panic信息
  3. 简化并发代码:减少样板代码,提高可读性

状态说明

当前版本为1.0之前的预览版,在1.0正式发布前可能会有小的API调整。


更多关于golang结构化并发编程工具插件库conc的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang结构化并发编程工具插件库conc的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang结构化并发编程工具库conc的使用指南

conc是一个Go语言的并发工具库,旨在提供更安全、更结构化的并发编程方式。它通过提供高级抽象来简化并发模式,减少goroutine泄漏和竞态条件的风险。

主要特性

  1. 结构化并发:确保所有goroutine都能正确完成或取消
  2. 任务组:方便地管理一组相关goroutine
  3. 迭代器:简化并行迭代操作
  4. 流处理:提供类似管道的并发数据处理

安装

go get github.com/sourcegraph/conc

核心组件及使用示例

1. WaitGroup替代品 - conc.WaitGroup

比标准库的sync.WaitGroup更安全,能捕获panic并提供更好的错误处理。

package main

import (
	"fmt"
	"github.com/sourcegraph/conc"
)

func main() {
	var wg conc.WaitGroup
	defer wg.Wait() // 确保所有任务完成

	wg.Go(func() {
		fmt.Println("Task 1 running")
	})

	wg.Go(func() {
		fmt.Println("Task 2 running")
	})

	// 会自动等待所有任务完成
}

2. 并行迭代 - conc.Iterator

package main

import (
	"fmt"
	"github.com/sourcegraph/conc"
)

func main() {
	items := []int{1, 2, 3, 4, 5}

	// 并行处理每个元素
	conc.Iterator(len(items), func(i int) {
		item := items[i]
		fmt.Printf("Processing item %d\n", item)
		// 处理逻辑...
	})
}

3. 结构化并发 - conc.Context

package main

import (
	"context"
	"fmt"
	"time"
	
	"github.com/sourcegraph/conc"
)

func main() {
	ctx := context.Background()
	concCtx := conc.NewContext(ctx)

	concCtx.Go(func(ctx context.Context) error {
		for {
			select {
			case <-ctx.Done():
				return nil
			default:
				fmt.Println("Worker running")
				time.Sleep(500 * time.Millisecond)
			}
		}
	})

	// 5秒后取消所有goroutine
	time.Sleep(5 * time.Second)
	concCtx.Cancel()
}

4. 流处理 - conc.Stream

package main

import (
	"fmt"
	"github.com/sourcegraph/conc"
)

func main() {
	// 创建数据源
	source := make(chan int)
	go func() {
		defer close(source)
		for i := 0; i < 10; i++ {
			source <- i
		}
	}()

	// 创建流处理器
	stream := conc.NewStream(source)

	// 添加处理阶段 - 平方运算
	squared := stream.Map(func(x int) int {
		return x * x
	})

	// 添加处理阶段 - 过滤偶数
	filtered := squared.Filter(func(x int) bool {
		return x%2 == 0
	})

	// 收集结果
	results := make([]int, 0)
	for x := range filtered.Out() {
		results = append(results, x)
		fmt.Println("Processed:", x)
	}

	fmt.Println("Final results:", results)
}

5. 错误处理 - conc.WithRecover

package main

import (
	"fmt"
	"github.com/sourcegraph/conc"
)

func main() {
	var wg conc.WaitGroup
	wg.Go(conc.WithRecover(func() {
		// 这个任务可能会panic
		panic("something went wrong")
	}))

	wg.Go(conc.WithRecover(func() {
		fmt.Println("This task will run successfully")
	}))

	wg.Wait() // 不会因为panic而崩溃
}

最佳实践

  1. 始终使用defer wg.Wait():确保所有goroutine完成
  2. 限制并发度:使用conc.NewPool(workers int)创建有限数量的worker
  3. 处理错误:使用conc.WithRecover捕获panic
  4. 使用context:通过context实现优雅取消

性能考虑

conc在提供安全性的同时会有轻微的性能开销,适合以下场景:

  • 需要结构化并发管理的业务逻辑
  • 复杂的并发流程控制
  • 需要更好的错误处理和可观测性的场景

对于极高性能要求的底层并发,可能更适合直接使用原生goroutine和channel。

conc库通过提供更高级的抽象,使Go并发编程更安全、更易维护,特别适合中大型项目中的并发控制需求。

回到顶部