如何优化Golang并发代码

如何优化Golang并发代码 这个函数用于获取一些XML内容并保存。我遇到的一个问题是,如何一次只发送 case s.updates <- pending[0]: 这一个条目。目前的代码存在访问索引越界的错误。

这里是完整代码的链接:Better Go Playground

func(s *sub) loop(){
	var pending []Item // appended by fetch; consumed by send
	var next time.Time
	var err error
	var seen = make(map[string]bool) // set of item.GUIDS
	for{
		// var updates chan Item
		var fetchDone chan fetchResult// if non-nil, fetch is running
		var fetchDelay time.Duration // initally 0
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		var startFetch <-chan time.Time
		if fetchDone == nil && len(pending) < maxPending {
			startFetch = time.After(fetchDelay)
		}
		select{
		case errc := <-s.closing:
			errc <- err
			close(s.updates)// tells receiver we're done
			return
		case <-startFetch:
			fmt.Println("start fetch")
			var fetched []Item
			fetchDone = make(chan fetchResult, 1)
			go func(){
				fetched, next, err = s.fetcher.Fetch()
				fetchDone <- fetchResult{fetched, next, err}
			}()


		case s.updates <- pending[0]:
			fmt.Println("add pending")
      if len(pending) > 0 {
				pending = pending[1:]
			}
		case result := <-fetchDone:
			fmt.Println("fetch done")
			fetchDone = nil
			if result.err != nil {
					next = time.Now().Add(10 * time.Second)
					break
			}
			for _, item := range result.fetched {
					if !seen[item.Channel.Link] {
							pending = append(pending, item)
							seen[item.Channel.Title] = true
					}
			}
		}
	
	}
}

更多关于如何优化Golang并发代码的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

你好, 链接 Better Go Playground 中的代码是关于从网络上获取 .xml 数据并存储。我希望每次获取 10 个项目,并通过一个名为 updates 的通道逐个存储它们。 谢谢

更多关于如何优化Golang并发代码的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


这段代码存在很多问题。首先是变量的作用域问题:变量 fetchDone 是在 for 循环内部声明的,因此每次迭代后它都会被重置。这破坏了您的大部分逻辑。

总的来说,对于您试图实现的目标,您的代码过于复杂。编写并行函数(扇出、扇入)有既定的模式,可以使用 sync.WaitGroup 或类似的机制。

尝试构建易于阅读的代码:一组工作协程,它们都从一个单一的工作输入通道读取数据,并将结果推送到一个单一的结果输出通道。使用 for w := range over channel 来优雅地处理通道的关闭。

Go Concurrency Patterns: Pipelines and cancellation - The Go Programming...

Go Concurrency Patterns: Pipelines and cancellation - The Go Programming…

如何使用 Go 的并发性来构建数据处理管道。

问题在于 pending[0] 的访问可能发生在 pending 切片为空时,这会导致索引越界错误。以下是修复后的代码,通过检查 pending 长度来避免越界访问,并确保只有在有数据时才发送到 updates 通道:

func (s *sub) loop() {
	var pending []Item // appended by fetch; consumed by send
	var next time.Time
	var err error
	var seen = make(map[string]bool) // set of item.GUIDS
	for {
		var fetchDone chan fetchResult // if non-nil, fetch is running
		var fetchDelay time.Duration   // initially 0
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		var startFetch <-chan time.Time
		if fetchDone == nil && len(pending) < maxPending {
			startFetch = time.After(fetchDelay)
		}

		select {
		case errc := <-s.closing:
			errc <- err
			close(s.updates) // tells receiver we're done
			return
		case <-startFetch:
			fmt.Println("start fetch")
			var fetched []Item
			fetchDone = make(chan fetchResult, 1)
			go func() {
				fetched, next, err = s.fetcher.Fetch()
				fetchDone <- fetchResult{fetched, next, err}
			}()

		case s.updates <- pending[0]:
			fmt.Println("add pending")
			if len(pending) > 0 {
				pending = pending[1:]
			}
		case result := <-fetchDone:
			fmt.Println("fetch done")
			fetchDone = nil
			if result.err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range result.fetched {
				if !seen[item.Channel.Link] {
					pending = append(pending, item)
					seen[item.Channel.Title] = true
				}
			}
		}
	}
}

主要修改:

  1. case s.updates <- pending[0]: 前添加了长度检查,确保 pending 不为空时才执行发送操作。
  2. 修正了 pending 切片移除逻辑,确保在发送后正确移除已发送的元素。

这个修复确保了并发安全,避免了索引越界错误,同时保持了原有的功能逻辑。

回到顶部