如何优化Golang并发代码
如何优化Golang并发代码
这个函数用于获取一些XML内容并保存。我遇到的一个问题是,如何一次只发送 case s.updates <- pending[0]: 这一个条目。目前的代码存在访问索引越界的错误。
这里是完整代码的链接:Better Go Playground
func(s *sub) loop(){
var pending []Item // appended by fetch; consumed by send
var next time.Time
var err error
var seen = make(map[string]bool) // set of item.GUIDS
for{
// var updates chan Item
var fetchDone chan fetchResult// if non-nil, fetch is running
var fetchDelay time.Duration // initally 0
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
var startFetch <-chan time.Time
if fetchDone == nil && len(pending) < maxPending {
startFetch = time.After(fetchDelay)
}
select{
case errc := <-s.closing:
errc <- err
close(s.updates)// tells receiver we're done
return
case <-startFetch:
fmt.Println("start fetch")
var fetched []Item
fetchDone = make(chan fetchResult, 1)
go func(){
fetched, next, err = s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, next, err}
}()
case s.updates <- pending[0]:
fmt.Println("add pending")
if len(pending) > 0 {
pending = pending[1:]
}
case result := <-fetchDone:
fmt.Println("fetch done")
fetchDone = nil
if result.err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range result.fetched {
if !seen[item.Channel.Link] {
pending = append(pending, item)
seen[item.Channel.Title] = true
}
}
}
}
}
更多关于如何优化Golang并发代码的实战教程也可以访问 https://www.itying.com/category-94-b0.html
你好,
链接 Better Go Playground 中的代码是关于从网络上获取 .xml 数据并存储。我希望每次获取 10 个项目,并通过一个名为 updates 的通道逐个存储它们。
谢谢
更多关于如何优化Golang并发代码的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
这段代码存在很多问题。首先是变量的作用域问题:变量 fetchDone 是在 for 循环内部声明的,因此每次迭代后它都会被重置。这破坏了您的大部分逻辑。
总的来说,对于您试图实现的目标,您的代码过于复杂。编写并行函数(扇出、扇入)有既定的模式,可以使用 sync.WaitGroup 或类似的机制。
尝试构建易于阅读的代码:一组工作协程,它们都从一个单一的工作输入通道读取数据,并将结果推送到一个单一的结果输出通道。使用 for w := range over channel 来优雅地处理通道的关闭。

Go Concurrency Patterns: Pipelines and cancellation - The Go Programming…
如何使用 Go 的并发性来构建数据处理管道。
问题在于 pending[0] 的访问可能发生在 pending 切片为空时,这会导致索引越界错误。以下是修复后的代码,通过检查 pending 长度来避免越界访问,并确保只有在有数据时才发送到 updates 通道:
func (s *sub) loop() {
var pending []Item // appended by fetch; consumed by send
var next time.Time
var err error
var seen = make(map[string]bool) // set of item.GUIDS
for {
var fetchDone chan fetchResult // if non-nil, fetch is running
var fetchDelay time.Duration // initially 0
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
var startFetch <-chan time.Time
if fetchDone == nil && len(pending) < maxPending {
startFetch = time.After(fetchDelay)
}
select {
case errc := <-s.closing:
errc <- err
close(s.updates) // tells receiver we're done
return
case <-startFetch:
fmt.Println("start fetch")
var fetched []Item
fetchDone = make(chan fetchResult, 1)
go func() {
fetched, next, err = s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, next, err}
}()
case s.updates <- pending[0]:
fmt.Println("add pending")
if len(pending) > 0 {
pending = pending[1:]
}
case result := <-fetchDone:
fmt.Println("fetch done")
fetchDone = nil
if result.err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range result.fetched {
if !seen[item.Channel.Link] {
pending = append(pending, item)
seen[item.Channel.Title] = true
}
}
}
}
}
主要修改:
- 在
case s.updates <- pending[0]:前添加了长度检查,确保pending不为空时才执行发送操作。 - 修正了
pending切片移除逻辑,确保在发送后正确移除已发送的元素。
这个修复确保了并发安全,避免了索引越界错误,同时保持了原有的功能逻辑。

