Golang中如何正确退出通道的迭代
Golang中如何正确退出通道的迭代 你好,
我想知道在发生错误或所有项目成功处理完毕后,退出通道迭代的最佳方式是什么。下面的示例完全不会退出,因此需求如下:
- 所有项目都在一个 goroutine 中处理。
- 一旦返回错误就立即退出。
- 当所有项目都处理完毕时退出。
- 如果发生错误,则停止所有其他正在运行的 goroutine。
谢谢
https://play.golang.org/p/jVTOQbexVER
package main
import (
"context"
"log"
)
func main() {
job := NewJob(5)
for i := 1; i <= 5; i++ {
go job.Add(i)
}
if err := job.Process(context.Background()); err != nil {
log.Fatalf("failed: %s", err.Error())
}
log.Println("all done")
}
type Job struct {
items chan int
}
func NewJob(size int) Job {
return Job{
items: make(chan int, size),
}
}
func (j Job) Add(job int) {
j.items <- job
}
func (j Job) Process(ctx context.Context) error {
for item := range j.items {
log.Println(item)
// - All items are handled in a goroutine.
// - Exit as soon as an error is returned.
// - Exit when all the items are processed
// - If an error occurs, all other running goroutines.
}
return nil
}
更多关于Golang中如何正确退出通道的迭代的实战教程也可以访问 https://www.itying.com/category-94-b0.html
2 回复
这个方法对我有效。已修正,我之前搞错了。
package main
import (
"context"
"log"
"sync"
)
func main() {
job := NewJob(5)
for i := 1; i <= 5; i++ {
job.wg.Add(1)
go job.Add(i)
}
job.wg.Wait()
if err := job.Process(context.Background()); err != nil {
log.Fatalf("failed: %s", err.Error())
}
log.Println("all done")
}
type Job struct {
items chan int
wg *sync.WaitGroup
}
func NewJob(size int) Job {
return Job{
items: make(chan int, size),
wg: &sync.WaitGroup{},
}
}
func (j Job) Add(job int) {
j.items <- job
j.wg.Done()
}
func (j Job) Process(ctx context.Context) error {
for len(j.items) > 0 {
item := <-j.items
log.Println(item)
// - All items are handled in a goroutine.
// - Exit as soon as an error is returned.
// - Exit when all the items are processed
// - If an error occurs, all other running goroutines.
}
return nil
}
更多关于Golang中如何正确退出通道的迭代的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Golang中正确退出通道迭代需要结合context和sync.WaitGroup。以下是修改后的代码示例:
package main
import (
"context"
"errors"
"log"
"sync"
"time"
)
func main() {
job := NewJob(5)
for i := 1; i <= 5; i++ {
go job.Add(i)
}
if err := job.Process(context.Background()); err != nil {
log.Fatalf("failed: %s", err.Error())
}
log.Println("all done")
}
type Job struct {
items chan int
wg sync.WaitGroup
}
func NewJob(size int) *Job {
return &Job{
items: make(chan int, size),
}
}
func (j *Job) Add(job int) {
j.items <- job
}
func (j *Job) Process(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
errCh := make(chan error, 1)
done := make(chan struct{})
// 启动工作goroutine
j.wg.Add(1)
go func() {
defer j.wg.Done()
defer close(done)
for item := range j.items {
select {
case <-ctx.Done():
return
default:
// 模拟处理逻辑
log.Println("Processing item:", item)
time.Sleep(100 * time.Millisecond)
// 模拟错误发生
if item == 3 {
errCh <- errors.New("error processing item 3")
return
}
}
}
}()
// 等待所有项目添加完成
time.Sleep(200 * time.Millisecond)
close(j.items)
// 等待处理完成或发生错误
select {
case err := <-errCh:
cancel() // 取消context以停止其他goroutine
j.wg.Wait()
return err
case <-done:
j.wg.Wait()
return nil
case <-ctx.Done():
j.wg.Wait()
return ctx.Err()
}
}
更简洁的实现方式使用errgroup:
package main
import (
"context"
"errors"
"log"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
job := NewJob(5)
for i := 1; i <= 5; i++ {
go job.Add(i)
}
if err := job.Process(context.Background()); err != nil {
log.Fatalf("failed: %s", err.Error())
}
log.Println("all done")
}
type Job struct {
items chan int
}
func NewJob(size int) *Job {
return &Job{
items: make(chan int, size),
}
}
func (j *Job) Add(job int) {
j.items <- job
}
func (j *Job) Process(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
// 启动工作goroutine
g.Go(func() error {
for item := range j.items {
select {
case <-ctx.Done():
return ctx.Err()
default:
log.Println("Processing item:", item)
time.Sleep(100 * time.Millisecond)
if item == 3 {
return errors.New("error processing item 3")
}
}
}
return nil
})
// 等待所有项目添加完成
time.Sleep(200 * time.Millisecond)
close(j.items)
return g.Wait()
}
使用select和done通道的经典模式:
func (j *Job) Process(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
done := make(chan error, 1)
go func() {
var err error
defer func() {
done <- err
close(done)
}()
for item := range j.items {
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
log.Println("Processing item:", item)
if item == 3 {
err = errors.New("error processing item 3")
cancel()
return
}
}
}
}()
time.Sleep(200 * time.Millisecond)
close(j.items)
return <-done
}
这些实现都满足你的需求:
- 所有项目在goroutine中处理
- 发生错误时立即退出
- 所有项目处理完成后退出
- 通过context取消机制停止其他goroutine

