Golang中waitgroup和goroutine混合使用会导致程序异常或挂起吗?

Golang中waitgroup和goroutine混合使用会导致程序异常或挂起吗?

你做了什么?

type waitGroup struct {
	sync.WaitGroup
}

func (w *waitGroup) Wrap(fn func()) {
	w.Add(1)
	go func() {
		fn()
		w.Done()
	}()
}

func (w *waitGroup) WrapRecover(fn func()) {
	w.Add(1)
	go func() {
		defer func() {
			if err := recover(); err != nil {
				log.Println("exec recover: ", err)
			}
		}()

		fn()
		w.Done()
	}()
}

func getData(i int64) string {
	return strconv.FormatInt(i, 10)
}

func demo(ctx context.Context) error {
	go func() {
		ticker := time.NewTicker(500 * time.Millisecond)
		defer ticker.Stop()
		var i int64
		var wg = &waitGroup{}
		for {
			select {
			case <-ticker.C:
				data := getData(i)
				i++
				wg.Wrap(func() {
					log.Println("data: ", data)
				})
			case <-ctx.Done():
				wg.Wait()
				return
			}
		}
	}()

	return nil
}

main.go

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"net/http/pprof"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
	"time"
)

var (
	port = 1339
	wait = 3 * time.Second
)

func init() {
	httpMux := http.NewServeMux()
	httpMux.HandleFunc("/debug/pprof/", pprof.Index)
	httpMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	httpMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	httpMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	httpMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	httpMux.HandleFunc("/check", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte(`{"alive": true}`))
	})

	go func() {
		defer func() {
			if err := recover(); err != nil {
				log.Println("PProf exec recover: ", err)
			}
		}()

		log.Println("server PProf run on: ", port)

		if err := http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", port), httpMux); err != nil {
			log.Println("PProf listen error: ", err)
		}

	}()

}

func main() {
	ctx1, cancelFn := context.WithCancel(context.Background())
	demo(ctx1)

	// graceful exit
	ch := make(chan os.Signal, 1)
	// We'll accept graceful shutdowns when quit via SIGINT (Ctrl+C)
	// recv signal to exit main goroutine
	// window signal
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, syscall.SIGHUP)

	// linux signal if you use linux on production,please use this code.
	// signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR2, os.Interrupt, syscall.SIGHUP)

	// Block until we receive our signal.
	sig := <-ch

	cancelFn()

	log.Println("exit signal: ", sig.String())
	// Create a deadline to wait for.
	ctx, cancel := context.WithTimeout(context.Background(), wait)
	defer cancel()

	<-ctx.Done()

	log.Println("services shutting down")
}

type waitGroup struct {
	sync.WaitGroup
}

func (w *waitGroup) Wrap(fn func()) {
	w.Add(1)
	go func() {
		fn()
		w.Done()
	}()
}

func (w *waitGroup) WrapRecover(fn func()) {
	w.Add(1)
	go func() {
		defer func() {
			if err := recover(); err != nil {
				log.Println("exec recover: ", err)
			}
		}()

		fn()
		w.Done()
	}()
}

func getData(i int64) string {
	return strconv.FormatInt(i, 10)
}

func demo(ctx context.Context) error {
	go func() {
		ticker := time.NewTicker(500 * time.Millisecond)
		defer ticker.Stop()

		var i int64
		var wg = &waitGroup{}
		for {
			select {
			case <-ticker.C:
				data := getData(i)
				i++
				wg.Wrap(func() {
					log.Println("data: ", data)
				})
			case <-ctx.Done():
				wg.Wait() // when the program exits, call the wait method here 
				return
			}
		}
	}()

	return nil
}

当我在Web服务中启动这个演示函数时,该演示的ctx1尚未被取消。这里我们使用wrap方法作为回调函数。内部的waitgroup只进行Add(1)和Done()操作,请注意我在这里的使用方式。当我进行pprof分析时,发现sync block和blocking syscall指标比较严重。我这样使用waitgroup是否存在问题? 在并发场景下,混合使用waitgroup和goroutine是否会导致程序异常或挂起?


更多关于Golang中waitgroup和goroutine混合使用会导致程序异常或挂起吗?的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

简短的答案是“如果使用得当,就不会”。然而,分析你上面的代码需要花费大量时间。

更多关于Golang中waitgroup和goroutine混合使用会导致程序异常或挂起吗?的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


是的,混合使用 WaitGroup 和 goroutine 不当确实会导致程序异常或挂起。在你的代码中,存在几个关键问题:

主要问题分析

1. WaitGroup.Add() 调用位置错误

WaitGroup.Add(1) 应该在启动 goroutine 之前调用,而不是在 goroutine 内部。你的 Wrap 方法虽然看起来正确,但在并发场景下存在竞态条件:

func (w *waitGroup) Wrap(fn func()) {
    w.Add(1)  // 这里可能被并发调用
    go func() {
        fn()
        w.Done()
    }()
}

2. goroutine 泄漏

ctx.Done() 触发时,已经通过 Wrap 启动的 goroutine 可能还在执行,但 wg.Wait() 会立即等待,这可能导致死锁:

case <-ctx.Done():
    wg.Wait()  // 如果还有 goroutine 没执行完,这里会阻塞
    return

3. 数据竞争

多个 goroutine 同时访问 wg.Add(1) 可能导致 WaitGroup 内部计数器不一致。

修复后的示例代码

type SafeWaitGroup struct {
    sync.WaitGroup
    mu sync.Mutex
}

func (w *SafeWaitGroup) Wrap(fn func()) {
    w.mu.Lock()
    w.Add(1)
    w.mu.Unlock()
    
    go func() {
        defer w.Done()
        fn()
    }()
}

func demo(ctx context.Context) error {
    go func() {
        ticker := time.NewTicker(500 * time.Millisecond)
        defer ticker.Stop()
        
        var wg = &SafeWaitGroup{}
        var stop bool
        
        // 启动一个单独的 goroutine 来监听退出信号
        go func() {
            <-ctx.Done()
            stop = true
            wg.Wait()
        }()
        
        for !stop {
            select {
            case <-ticker.C:
                data := getData(time.Now().UnixNano())
                wg.Wrap(func() {
                    log.Println("data: ", data)
                })
            default:
                time.Sleep(10 * time.Millisecond)
            }
        }
    }()
    
    return nil
}

更安全的模式

func demoFixed(ctx context.Context) error {
    var wg sync.WaitGroup
    done := make(chan struct{})
    
    go func() {
        ticker := time.NewTicker(500 * time.Millisecond)
        defer ticker.Stop()
        
        for {
            select {
            case <-ticker.C:
                wg.Add(1)
                go func(data string) {
                    defer wg.Done()
                    log.Println("data: ", data)
                }(getData(time.Now().UnixNano()))
                
            case <-ctx.Done():
                close(done)
                return
            }
        }
    }()
    
    go func() {
        <-done
        wg.Wait()
    }()
    
    return nil
}

关键要点

  1. WaitGroup.Add() 必须在 goroutine 外部调用
  2. 确保所有 Add() 调用都有对应的 Done()
  3. 避免在可能并发的场景下直接操作 WaitGroup
  4. 使用带缓冲的 channel 或 sync.Mutex 保护 WaitGroup 操作

你的代码中出现的 sync block 和 blocking syscall 指标异常,很可能是由于 WaitGroup 使用不当导致的竞态条件和死锁。

回到顶部