Golang中布尔变量在作用域外被修改的问题

Golang中布尔变量在作用域外被修改的问题 我在找到解决方案前花了很多时间处理这个问题,尽管我不明白为什么需要这样做。

我有一个函数级变量,在名为 RealTimeQuotes() 的函数中有一个 doRealtime bool。它被设置为 true,用于决定函数是否应该继续运行。在循环内部,通过 WebSocket 获取数据。数据被添加到一个缓冲通道中,缓冲区大小为 30。当数据(目前最多 14 条记录)被添加到通道,并且为每条记录添加了等待组后,会有一个 wg.wait()wg 是一个模块级变量(不确定正确的术语是什么)。它是在函数外部声明的。

我有另一个函数 SseHandlerRealTime(w http.ResponseWriter, request *http.Request),它是一个 HTTP 处理器,大约每 5 秒通过服务器端事件从浏览器调用一次。

以下是相关代码的基本结构:

var wg sync.WaitGroup
var channelStocks chan *data.StockRealTime

func SseHandlerRealTime(w http.ResponseWriter, request *http.Request) {
	stocksToSend := make([]*data.StockRealTime, 0)

	for stock := range channelStocks {
		// add to array (stocksToSend) to send to browser
	}
	channelStocks = make(chan *data.StockRealTime, channelBufferSize)
	
	jsonStock, _ := json.Marshal(stocksToSend)
	stringJSONStock := string(jsonStock)
	fmt.Fprintf(w, "data: %v\n\n", stringJSONStock)
	
}

func (realTime RealTime) RealTimeQuotes(){
	var doRealTime bool. //later moved to modular-level to fix issue

	wg = sync.WaitGroup{}
	doRealTime = true
	channelStocks = make(chan *data.StockRealTime, channelBufferSize)
	
	go func() {
		<-realTimeExecutionChannel
		doRealTime = false
		channelStocks = make(chan *data.StockRealTime, channelBufferSize)
		realTimeExecutionChannel <- struct{}{}
	}()

	for doRealTime{
		// get latest ticks from websocket-based function
		for tickKey, tick := range newTicksMap {
			var stockRealTime data.StockRealTime
			// populate stockRealTime 
			wg.Add(1)
			channelStocks <- &stockRealTime
			
		}
		close(channelStocks)
		wg.Wait()
	}
}

这个 Go 协程允许我从浏览器取消操作。这段代码在第一次运行时工作正常。如果我取消了它,然后再次运行它(不重启 Web 服务器),当它第二次运行到 wg.wait() 时,不知何故 doRealTime 变成了 false,导致函数退出。我添加了大量的日志记录来尝试解决这个问题。我最终通过将 doRealTime 的作用域更改为模块级来使其正常工作,尽管它只在 RealTimeQuotes() 中被访问。这解决了问题,但我不明白为什么。


更多关于Golang中布尔变量在作用域外被修改的问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html

4 回复

你好 @rschluet 抱歉,但我没有完全理解问题所在。 仅从代码来看,我注意到你每次调用 RealTimeQuotes 函数时都在初始化 wg 这个包变量。这对我来说没有意义。也许这与问题有关?

更多关于Golang中布尔变量在作用域外被修改的问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


该函数仅被调用一次。它包含一个循环,持续获取股票价格并将其放入缓冲通道,最多约20个。然后等待通道清空,再获取下一批数据,依此类推。浏览器页面有一个取消按钮,通过将 doRealTime 设置为 false 来通知循环退出。这按预期工作;但是,如果我再次运行它,当它执行到 wg.wait() 时,doRealTime 不知何故被设置为 false,尽管没有执行任何代码来这样做。将 doRealTime 设为模块级变量可以解决问题,但我不明白原因。

这是一个猜测,但我可以想象类似这样的情况会发生:

  1. 代码已经运行了一段时间;它目前执行到这里: // get latest ticks from websocket-based function

    • 无论这段代码做什么,它都在等待获取下一批“ticks”。
    • SseHandlerRealTime 正在等待当前的 channelStocks 通道值。
  2. 通过点击取消来关闭 realTimeExecutionChannel

  3. <-realTimeExecutionChannel 执行:

    • 设置 doRealTime = false
    • channelStocks 分配一个新的通道(注意 SseHandlerRealTime 仍在等待从旧的 channelStocks 通道值中接收数据)
  4. // get latest ticks from websocket-based function 完成

    • for tickKey, tick := range newTicksMap { 遍历 map 并将值加载到新的 channelStocks 通道中,
    • 关闭新的 channelStocks 通道
    • 等待 SseHandlerRealTime 完成,但它永远不会完成,因为它正在等待旧的 channelStocks 通道值。

然而,这并没有真正解释为什么将 doRealTime 移到包级变量就能解决问题,但我可以说,从两个或更多 goroutine 对 doRealTimechannelStocks 变量进行读写操作存在数据竞争,如果你没有进行任何同步操作,就无法保证这两个操作的相对顺序。一个潜在的修复方法是使用原子操作:

  • 将 doRealTime 切换为 int32 = 1
  • 将你的 for 条件从 for doRealTime 改为 for atomic.LoadInt32(&doRealTime) == 1
  • doRealTime = false 改为 atomic.StoreInt32(&doRealTime, 0)

或者使用通道:

doRealTime := make(chan struct{})

// ...

go func() {
	<-realTimeExecutionChannel
	close(doRealTime)
	channelStocks = make(chan *data.StockRealTime, channelBufferSize)
	realTimeExecutionChannel <- struct{}{}
}()

for {
	if _, _, closed := tryRecv(done); closed {
		break
	}

	// the rest of the loop
}

func tryRecv[T any](ch chan T) (v T, received, closed bool) {
	select {
	case v, received = <-ch:
		closed = !received
	default:
	}
	return
}

如果你没有更多数据了,能否使用单个通道并从 SseHandlerRealTime 中跳出循环?

var channelStocksFeed = make(chan []*data.StockRealTime, 30)

func SseHandlerRealTime(w http.ResponseWriter, request *http.Request) {
	stocksToSend := make([]*data.StockRealTime, 0)

	for {
		channelStocks, received, closed := tryRecv(channelStocksFeed)
		if !received || closed {
			// 目前没有(更多)股票数据;跳出循环以发送我们已有的数据
			// (如果目前什么都没有,则发送空切片)。
			break
		}
		for _, channelStock := range channelStocks {
			// 添加到数组 (stocksToSend) 中以发送给浏览器
		}
	}

	jsonStock, _ := json.Marshal(stocksToSend)
	stringJSONStock := string(jsonStock)
	fmt.Fprintf(w, "data: %v\n\n", stringJSONStock)
}

func (realTime RealTime) RealTimeQuotes(){
	for {
		if _, received, closed := tryRecv(realTimeExecutionChannel); received || closed {
			break
		}

		// get latest ticks from websocket-based function

		channelStocks := make([]*data.StockRealTime, 0, len(newTicksMap))
		for tickKey, tick := range newTicksMap {
			var stockRealTime data.StockRealTime
			// populate stockRealTime 
			wg.Add(1)
			channelStocks = append(channelStocks, &stockRealTime)
		}

		channelStocksFeed <- channelStocks
	}
}

在Golang中,这个问题涉及到闭包捕获变量goroutine执行时机的问题。当你将doRealTime声明为函数级变量时,在goroutine中捕获的是这个局部变量的引用。当RealTimeQuotes()函数返回后,这个局部变量可能被垃圾回收或处于未定义状态,导致goroutine中访问到意外的值。

以下是问题重现和解决方案的示例:

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup
var channelStocks chan string
var channelBufferSize = 30

// 问题版本:doRealTime作为局部变量
func RealTimeQuotesProblem() {
	var doRealTime bool // 局部变量
	
	wg = sync.WaitGroup{}
	doRealTime = true
	channelStocks = make(chan string, channelBufferSize)
	
	// goroutine捕获局部变量doRealTime的引用
	go func() {
		time.Sleep(100 * time.Millisecond) // 模拟延迟
		doRealTime = false // 这里修改的是局部变量的副本
		fmt.Println("Goroutine set doRealTime to false")
	}()

	counter := 0
	for doRealTime {
		counter++
		if counter > 3 {
			break // 防止无限循环
		}
		fmt.Printf("Loop iteration %d, doRealTime=%v\n", counter, doRealTime)
		time.Sleep(50 * time.Millisecond)
	}
	fmt.Println("Function exited, doRealTime=", doRealTime)
}

// 解决方案:使用模块级变量
var doRealTimeGlobal bool

func RealTimeQuotesSolution() {
	wg = sync.WaitGroup{}
	doRealTimeGlobal = true // 修改模块级变量
	channelStocks = make(chan string, channelBufferSize)
	
	go func() {
		time.Sleep(100 * time.Millisecond)
		doRealTimeGlobal = false // 修改的是同一个模块级变量
		fmt.Println("Goroutine set doRealTimeGlobal to false")
	}()

	counter := 0
	for doRealTimeGlobal {
		counter++
		if counter > 3 {
			break
		}
		fmt.Printf("Loop iteration %d, doRealTimeGlobal=%v\n", counter, doRealTimeGlobal)
		time.Sleep(50 * time.Millisecond)
	}
	fmt.Println("Function exited, doRealTimeGlobal=", doRealTimeGlobal)
}

// 更好的解决方案:使用通道进行同步控制
func RealTimeQuotesBetter() {
	stopChan := make(chan struct{}) // 控制通道
	wg = sync.WaitGroup{}
	channelStocks = make(chan string, channelBufferSize)
	
	// 启动控制goroutine
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(stopChan) // 发送停止信号
		fmt.Println("Goroutine sent stop signal")
	}()

	counter := 0
	for {
		select {
		case <-stopChan:
			fmt.Println("Received stop signal, exiting")
			return
		default:
			counter++
			if counter > 3 {
				return
			}
			fmt.Printf("Loop iteration %d\n", counter)
			time.Sleep(50 * time.Millisecond)
		}
	}
}

func main() {
	fmt.Println("=== Problem Version ===")
	RealTimeQuotesProblem()
	
	time.Sleep(200 * time.Millisecond)
	
	fmt.Println("\n=== Solution Version ===")
	RealTimeQuotesSolution()
	
	time.Sleep(200 * time.Millisecond)
	
	fmt.Println("\n=== Better Solution ===")
	RealTimeQuotesBetter()
}

在你的代码中,当RealTimeQuotes()函数第一次执行完成后,局部变量doRealTime的作用域结束。当函数第二次执行时,创建了一个新的doRealTime变量,但之前启动的goroutine可能还在运行(或者其修改影响了内存状态),导致新变量被意外修改。

模块级变量之所以能解决问题,是因为:

  1. 所有访问都指向同一个内存位置
  2. 变量的生命周期与程序运行周期一致
  3. goroutine修改的是同一个变量

更健壮的解决方案是使用通道来控制goroutine的停止:

func (realTime RealTime) RealTimeQuotesBetter() {
	stopChan := make(chan struct{})
	wg = sync.WaitGroup{}
	channelStocks = make(chan *data.StockRealTime, channelBufferSize)
	
	// 启动停止监听goroutine
	go func() {
		<-realTimeExecutionChannel
		close(stopChan) // 发送停止信号
		channelStocks = make(chan *data.StockRealTime, channelBufferSize)
		realTimeExecutionChannel <- struct{}{}
	}()

	for {
		select {
		case <-stopChan:
			return // 收到停止信号,退出函数
		default:
			// 原有的业务逻辑
			for tickKey, tick := range newTicksMap {
				var stockRealTime data.StockRealTime
				wg.Add(1)
				channelStocks <- &stockRealTime
			}
			close(channelStocks)
			wg.Wait()
		}
	}
}

这种使用通道的模式是Go语言中推荐的goroutine通信方式,避免了共享变量的竞态条件问题。

回到顶部