golang结构化并发编程插件库nursery的使用

Golang结构化并发编程插件库nursery的使用

简介

nursery是一个Go语言的结构化并发编程库,它提供了更高级的并发抽象,帮助开发者避免常见的并发问题如死锁、goroutine泄漏、竞态条件和错误处理不当等。

安装

go get -u github.com/arunsworld/nursery

基本使用示例

package main

import (
	"context"
	"log"
	"time"
	"github.com/arunsworld/nursery"
)

func main() {
	// 使用RunConcurrently并行执行多个任务
	err := nursery.RunConcurrently(
		// 任务1
		func(ctx context.Context, errCh chan error) {
			time.Sleep(time.Millisecond * 10)
			log.Println("Job 1 done...")
		},
		// 任务2
		func(ctx context.Context, errCh chan error) {
			time.Sleep(time.Millisecond * 5)
			log.Println("Job 2 done...")
		},
	)
	
	if err != nil {
		log.Fatal(err)
	}
	log.Println("All jobs done...")
}

主要功能函数

  1. RunConcurrently: 并行执行多个任务,确保所有任务完成后才返回
  2. RunConcurrentlyWithContext: 带上下文的并行执行
  3. RunMultipleCopiesConcurrently: 并行运行多个相同任务的副本
  4. RunMultipleCopiesConcurrentlyWithContext: 带上下文的并行运行多个副本
  5. RunUntilFirstCompletion: 并行执行多个任务,第一个任务完成后即返回
  6. RunUntilFirstCompletionWithContext: 带上下文的RunUntilFirstCompletion
  7. RunConcurrentlyWithTimeout: 带超时的并行执行
  8. RunUntilFirstCompletionWithTimeout: 带超时的RunUntilFirstCompletion

带上下文的示例

package main

import (
	"context"
	"log"
	"time"
	"github.com/arunsworld/nursery"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 带上下文的并行执行
	err := nursery.RunConcurrentlyWithContext(ctx,
		func(ctx context.Context, errCh chan error) {
			for {
				if nursery.IsContextDone(ctx) { // 检查上下文是否已完成
					log.Println("Job 1 cancelled")
					return
				}
				time.Sleep(100 * time.Millisecond)
				log.Println("Job 1 working...")
			}
		},
		func(ctx context.Context, errCh chan error) {
			time.Sleep(500 * time.Millisecond)
			log.Println("Job 2 done...")
			cancel() // 取消所有任务
		},
	)
	
	if err != nil {
		log.Fatal(err)
	}
}

运行多个副本示例

package main

import (
	"context"
	"log"
	"time"
	"github.com/arunsworld/nursery"
)

func main() {
	// 创建5个worker并行处理任务
	err := nursery.RunMultipleCopiesConcurrently(5,
		func(ctx context.Context, errCh chan error) {
			jobID := ctx.Value(nursery.JobID).(int) // 获取任务ID
			for i := 0; i < 3; i++ {
				if nursery.IsContextDone(ctx) {
					return
				}
				log.Printf("Worker %d processing item %d", jobID, i)
				time.Sleep(100 * time.Millisecond)
			}
		},
	)
	
	if err != nil {
		log.Fatal(err)
	}
}

带超时的示例

package main

import (
	"log"
	"time"
	"github.com/arunsworld/nursery"
)

func main() {
	// 带1秒超时的并行执行
	err := nursery.RunConcurrentlyWithTimeout(time.Second,
		func(ctx context.Context, errCh chan error) {
			time.Sleep(2 * time.Second) // 这个任务会超时
			log.Println("This won't be printed")
		},
		func(ctx context.Context, errCh chan error) {
			time.Sleep(500 * time.Millisecond)
			log.Println("This will be printed before timeout")
		},
	)
	
	if err != nil {
		log.Println("Error:", err) // 会输出超时错误
	}
}

注意事项

  1. 每个ConcurrentJob函数需要监听上下文的Done()通道,以便在取消时能正确清理资源
  2. 错误需要通过error通道报告
  3. 库不能防止共享资源导致的竞态条件或数据损坏
  4. 可以使用IsContextDone()作为循环中的保护条件

nursery库通过结构化并发模式简化了Go并发编程,使代码更易于理解和维护。


更多关于golang结构化并发编程插件库nursery的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang结构化并发编程插件库nursery的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang结构化并发编程插件库Nursery的使用

Nursery是一个用于Go语言的结构化并发编程库,它提供了一种更优雅的方式来管理和协调多个goroutine的执行。下面我将详细介绍Nursery的使用方法和示例代码。

Nursery的核心概念

Nursery的核心思想是"结构化并发",即并发操作应该有明确的开始和结束点,并且生命周期应该与其父级上下文相关联。这有助于避免goroutine泄漏和更清晰地管理并发流程。

安装Nursery

go get github.com/arunsworld/nursery

基本使用方法

1. 并行执行多个任务

package main

import (
	"fmt"
	"time"

	"github.com/arunsworld/nursery"
)

func main() {
	err := nursery.RunConcurrently(
		// 第一个goroutine
		func(context.Context, chan error) {
			time.Sleep(1 * time.Second)
			fmt.Println("Task 1 completed")
		},
		// 第二个goroutine
		func(context.Context, chan error) {
			time.Sleep(2 * time.Second)
			fmt.Println("Task 2 completed")
		},
	)
	
	if err != nil {
		fmt.Println("Error:", err)
	}
	fmt.Println("All tasks completed")
}

2. 带错误处理的并发执行

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/arunsworld/nursery"
)

func main() {
	err := nursery.RunConcurrently(
		func(ctx context.Context, errCh chan error) {
			time.Sleep(1 * time.Second)
			fmt.Println("Task 1 completed")
		},
		func(ctx context.Context, errCh chan error) {
			time.Sleep(500 * time.Millisecond)
			errCh <- errors.New("something went wrong in task 2")
		},
	)
	
	if err != nil {
		fmt.Println("Error occurred:", err)
	}
}

3. 使用Context取消所有goroutine

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/arunsworld/nursery"
)

func main() {
	err := nursery.RunConcurrently(
		func(ctx context.Context, errCh chan error) {
			select {
			case <-time.After(5 * time.Second):
				fmt.Println("Task 1 completed (shouldn't happen)")
			case <-ctx.Done():
				fmt.Println("Task 1 cancelled")
			}
		},
		func(ctx context.Context, errCh chan error) {
			time.Sleep(1 * time.Second)
			errCh <- fmt.Errorf("triggering cancellation")
		},
	)
	
	if err != nil {
		fmt.Println("Error:", err)
	}
}

4. 嵌套使用Nursery

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/arunsworld/nursery"
)

func main() {
	err := nursery.RunConcurrently(
		func(ctx context.Context, errCh chan error) {
			// 嵌套的nursery
			nestedErr := nursery.RunConcurrently(
				func(ctx context.Context, errCh chan error) {
					time.Sleep(1 * time.Second)
					fmt.Println("Nested task 1 completed")
				},
				func(ctx context.Context, errCh chan error) {
					time.Sleep(2 * time.Second)
					fmt.Println("Nested task 2 completed")
				},
			)
			if nestedErr != nil {
				errCh <- nestedErr
			}
		},
		func(ctx context.Context, errCh chan error) {
			time.Sleep(3 * time.Second)
			fmt.Println("Main task completed")
		},
	)
	
	if err != nil {
		fmt.Println("Error:", err)
	}
}

Nursery的优势

  1. 结构化并发:确保所有goroutine在父函数返回前完成
  2. 错误传播:自动收集和传播子goroutine中的错误
  3. 取消支持:当一个goroutine失败时自动取消其他goroutine
  4. 简洁API:比直接使用sync.WaitGroup和context更简洁

实际应用示例

并行获取多个API数据

package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/arunsworld/nursery"
)

type User struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

type Post struct {
	ID     int    `json:"id"`
	Title  string `json:"title"`
	UserID int    `json:"userId"`
}

func main() {
	var user User
	var posts []Post

	err := nursery.RunConcurrently(
		func(ctx context.Context, errCh chan error) {
			// 获取用户数据
			resp, err := http.Get("https://jsonplaceholder.typicode.com/users/1")
			if err != nil {
				errCh <- err
				return
			}
			defer resp.Body.Close()
			
			body, err := ioutil.ReadAll(resp.Body)
			if err != nil {
				errCh <- err
				return
			}
			
			if err := json.Unmarshal(body, &user); err != nil {
				errCh <- err
				return
			}
		},
		func(ctx context.Context, errCh chan error) {
			// 获取帖子数据
			resp, err := http.Get("https://jsonplaceholder.typicode.com/posts?userId=1")
			if err != nil {
				errCh <- err
				return
			}
			defer resp.Body.Close()
			
			body, err := ioutil.ReadAll(resp.Body)
			if err != nil {
				errCh <- err
				return
			}
			
			if err := json.Unmarshal(body, &posts); err != nil {
				errCh <- err
				return
			}
		},
	)
	
	if err != nil {
		fmt.Println("Error fetching data:", err)
		return
	}
	
	fmt.Printf("User: %+v\n", user)
	fmt.Printf("Number of posts: %d\n", len(posts))
}

总结

Nursery为Go语言提供了一种更结构化的并发编程方式,它简化了goroutine的管理和错误处理,使得并发代码更易于编写和维护。通过使用Nursery,你可以避免常见的goroutine泄漏问题,并确保资源得到正确清理。

回到顶部