Golang中如何正确退出通道的迭代

Golang中如何正确退出通道的迭代 你好,

我想知道在发生错误或所有项目成功处理完毕后,退出通道迭代的最佳方式是什么。下面的示例完全不会退出,因此需求如下:

  • 所有项目都在一个 goroutine 中处理。
  • 一旦返回错误就立即退出。
  • 当所有项目都处理完毕时退出。
  • 如果发生错误,则停止所有其他正在运行的 goroutine。

谢谢

https://play.golang.org/p/jVTOQbexVER

package main

import (
	"context"
	"log"
)

func main() {
	job := NewJob(5)

	for i := 1; i <= 5; i++ {
		go job.Add(i)
	}

	if err := job.Process(context.Background()); err != nil {
		log.Fatalf("failed: %s", err.Error())
	}
	log.Println("all done")
}

type Job struct {
	items chan int
}

func NewJob(size int) Job {
	return Job{
		items: make(chan int, size),
	}
}

func (j Job) Add(job int) {
	j.items <- job
}

func (j Job) Process(ctx context.Context) error {
	for item := range j.items {
		log.Println(item)

		// - All items are handled in a goroutine.
		// - Exit as soon as an error is returned.
		// - Exit when all the items are processed
		// - If an error occurs, all other running goroutines.
	}

	return nil
}

更多关于Golang中如何正确退出通道的迭代的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

这个方法对我有效。已修正,我之前搞错了。

package main

import (
	"context"
	"log"
	"sync"
)

func main() {

	job := NewJob(5)

	for i := 1; i <= 5; i++ {
		job.wg.Add(1)
		go job.Add(i)
	}
	job.wg.Wait()
	if err := job.Process(context.Background()); err != nil {
		log.Fatalf("failed: %s", err.Error())
	}
	log.Println("all done")
}

type Job struct {
	items chan int
	wg    *sync.WaitGroup
}

func NewJob(size int) Job {

	return Job{
		items: make(chan int, size),
		wg:    &sync.WaitGroup{},
	}

}

func (j Job) Add(job int) {
	j.items <- job
	j.wg.Done()
}

func (j Job) Process(ctx context.Context) error {

	for len(j.items) > 0 {
		item := <-j.items
		log.Println(item)

		// - All items are handled in a goroutine.
		// - Exit as soon as an error is returned.
		// - Exit when all the items are processed
		// - If an error occurs, all other running goroutines.
	}

	return nil
}

更多关于Golang中如何正确退出通道的迭代的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Golang中正确退出通道迭代需要结合context和sync.WaitGroup。以下是修改后的代码示例:

package main

import (
	"context"
	"errors"
	"log"
	"sync"
	"time"
)

func main() {
	job := NewJob(5)

	for i := 1; i <= 5; i++ {
		go job.Add(i)
	}

	if err := job.Process(context.Background()); err != nil {
		log.Fatalf("failed: %s", err.Error())
	}
	log.Println("all done")
}

type Job struct {
	items chan int
	wg    sync.WaitGroup
}

func NewJob(size int) *Job {
	return &Job{
		items: make(chan int, size),
	}
}

func (j *Job) Add(job int) {
	j.items <- job
}

func (j *Job) Process(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	errCh := make(chan error, 1)
	done := make(chan struct{})

	// 启动工作goroutine
	j.wg.Add(1)
	go func() {
		defer j.wg.Done()
		defer close(done)

		for item := range j.items {
			select {
			case <-ctx.Done():
				return
			default:
				// 模拟处理逻辑
				log.Println("Processing item:", item)
				time.Sleep(100 * time.Millisecond)

				// 模拟错误发生
				if item == 3 {
					errCh <- errors.New("error processing item 3")
					return
				}
			}
		}
	}()

	// 等待所有项目添加完成
	time.Sleep(200 * time.Millisecond)
	close(j.items)

	// 等待处理完成或发生错误
	select {
	case err := <-errCh:
		cancel() // 取消context以停止其他goroutine
		j.wg.Wait()
		return err
	case <-done:
		j.wg.Wait()
		return nil
	case <-ctx.Done():
		j.wg.Wait()
		return ctx.Err()
	}
}

更简洁的实现方式使用errgroup:

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	job := NewJob(5)

	for i := 1; i <= 5; i++ {
		go job.Add(i)
	}

	if err := job.Process(context.Background()); err != nil {
		log.Fatalf("failed: %s", err.Error())
	}
	log.Println("all done")
}

type Job struct {
	items chan int
}

func NewJob(size int) *Job {
	return &Job{
		items: make(chan int, size),
	}
}

func (j *Job) Add(job int) {
	j.items <- job
}

func (j *Job) Process(ctx context.Context) error {
	g, ctx := errgroup.WithContext(ctx)

	// 启动工作goroutine
	g.Go(func() error {
		for item := range j.items {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				log.Println("Processing item:", item)
				time.Sleep(100 * time.Millisecond)

				if item == 3 {
					return errors.New("error processing item 3")
				}
			}
		}
		return nil
	})

	// 等待所有项目添加完成
	time.Sleep(200 * time.Millisecond)
	close(j.items)

	return g.Wait()
}

使用select和done通道的经典模式:

func (j *Job) Process(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	done := make(chan error, 1)
	
	go func() {
		var err error
		defer func() {
			done <- err
			close(done)
		}()

		for item := range j.items {
			select {
			case <-ctx.Done():
				err = ctx.Err()
				return
			default:
				log.Println("Processing item:", item)
				
				if item == 3 {
					err = errors.New("error processing item 3")
					cancel()
					return
				}
			}
		}
	}()

	time.Sleep(200 * time.Millisecond)
	close(j.items)

	return <-done
}

这些实现都满足你的需求:

  1. 所有项目在goroutine中处理
  2. 发生错误时立即退出
  3. 所有项目处理完成后退出
  4. 通过context取消机制停止其他goroutine
回到顶部