Golang中处理未知数量数据行的信号量实现
Golang中处理未知数量数据行的信号量实现 我需要循环处理大量数据。我们不知道输入数据包含多少条记录。可以假设类似于逐行读取CSV文件的情况。
我的目标是限制并行协程的数量,假设为10个。这意味着同一时刻最多只能处理10条记录。我编写了一个脚本,其中创建了一个监听器,使用通道作为信号量来判断所有记录是否已处理完毕。
以下是脚本:
package main
import (
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"time"
)
const (
goroutinesNum = 100
quotaLimit = 10
)
func startWorker(i int, wg *sync.WaitGroup, ch chan map[int]interface{}, increment chan int) {
defer wg.Done()
record := make(map[int]interface{})
record[i] = "test " + strconv.Itoa(i)
ch <- record
<-increment
cnt := len(ch)
fmt.Print(fmt.Sprintln("increment (", i, ") [channels ", cnt, "]", strings.Repeat(" ", cnt), "█"))
runtime.Gosched()
}
func main() {
//runtime.GOMAXPROCS(1)
wg := &sync.WaitGroup{}
increment := make(chan int, 30)
ch := make(chan map[int]interface{}, quotaLimit)
wg.Add(1)
go func(wg *sync.WaitGroup, increment chan int, ch chan map[int]interface{}) {
defer wg.Done()
for i := 0; i < goroutinesNum; i++ {
increment <- i
for len(ch) == quotaLimit {
time.Sleep(time.Second * 1)
}
wg.Add(1)
go startWorker(i, wg, ch, increment)
}
}(wg, increment, ch)
time.Sleep(time.Second)
for {
for len(ch) > 0 {
cnt := len(ch)
fmt.Print(fmt.Sprintln("channel (", cnt, ")", strings.Repeat(" ", cnt), "■"))
<-ch
}
if len(increment) == 0 {
break
}
}
wg.Wait()
}
我不确定这样做是否正确。我尝试过使用select和定时器结构,似乎在没有时间等待器的情况下也实现了相同的效果。也许在这里使用NewTicker或上下文会更好。我尝试了不同的方法,但无法确定哪种是正确的。非常感谢任何批评和建议。
更新: 我也尝试过这个解决方案:
大家好 在下面的代码中,我的循环在
c.ReadMessage行会无限期地阻塞,只有在有消息返回时才会继续执行,但这个时间点是未知的。假设终止信号来了,但循环仍然卡在c.readMessage处无法继续,那么我应该如何终止这个不再继续执行的循环?for { select { case <-sign: break default: //我的循环卡在下面这行注释处 // 并且终止信号会发送信号来终...
但如果没有超时,它就无法正常工作,对吗?我不需要超时,因为我必须确保所有行都已处理完毕,并且它需要挂起足够长的时间。
更新1: 我发现如果协程数量非常多,这部分代码会挂起:
for len(ch) == quotaLimit {
time.Sleep(time.Second * 1)
}
看起来这是一个错误的方法。那么问题来了,如何暂停循环直到队列有空闲位置?
更多关于Golang中处理未知数量数据行的信号量实现的实战教程也可以访问 https://www.itying.com/category-94-b0.html
感谢您的回复!是的,您的理解是正确的。 在您的代码中,如果某个通道运行时间较长,那么工作线程会调用 break,对吗?
for item := range items {
time.Pause(time.Second * 10)
inputChan <- item
}
与此同时,我似乎在 StackOverflow 上找到了解决方案: https://play.golang.org/p/rSqcU9EZLzn
据我理解,在 range 结构中的 jobs 会等待通道中的任何元素,直到通道关闭,是这样吗?
for j := range jobs {
更多关于Golang中处理未知数量数据行的信号量实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
我不理解你问题中的这一部分:
anthonys: 以防某些通道运行时间较长
通道只是一种将项目安全传递给多个协程的方式,那么你说它“运行时间较长”是什么意思呢?
也许你的意思是,如果你尝试从通道中获取项目一段时间,但一直没有获取到任何东西? 如果是这种情况,那么你需要一个带有两个 case 的 select 语句:一个用于拉取新项目,另一个用于超时处理。因此,工作函数将如下所示:
func worker(input chan int) {
for {
select {
case item, more := <-input :
if more {
item++
fmt.Println(item)
} else {
break
}
case <-time.After(time.Second * 10):
break
}
}
}
这个 select 语句将等待从通道中获取一个项目或者超时结束。如果超时结束,它将跳出循环。
你的描述有点模糊,以下是我的理解:
- 你有一组数据需要并行处理
- 你想并行处理它们,但最多同时处理10项?还是总共就处理10项?
- 我假设是第一种情况(最多同时并行处理10项),但当我阅读代码时,它对于你的描述来说似乎过于复杂了。
下面的代码片段演示了如何获取一组数据项并进行并行处理。请告诉我我对你问题的理解哪里不对,我们可以一起改进它。
package main
import (
"fmt"
"sync"
)
func main() {
items := []int{1, 2, 3, 4, 5, 6, 7}
maxWorkers := 10
inputChan := make(chan int, maxWorkers)
wg := sync.WaitGroup{}
for i := 0; i < maxWorkers; i++ {
wg.Add(1)
go func() {
worker(inputChan)
wg.Done()
}()
}
go func() {
for item := range items {
inputChan <- item
}
close(inputChan)
}()
wg.Wait()
}
func worker(input chan int) {
for {
item, more := <-input
if more {
item++
fmt.Println(item)
} else {
break
}
}
}
这是一个典型的并发控制场景,使用缓冲通道作为信号量是正确的思路。你的代码可以简化,以下是更清晰的实现:
package main
import (
"fmt"
"sync"
"time"
)
const (
maxWorkers = 10
totalTasks = 100 // 模拟未知数量的数据行
)
func processRecord(id int) {
time.Sleep(100 * time.Millisecond) // 模拟处理时间
fmt.Printf("Processed record %d\n", id)
}
func main() {
var wg sync.WaitGroup
semaphore := make(chan struct{}, maxWorkers) // 信号量通道
// 模拟读取未知数量的数据行
for i := 0; i < totalTasks; i++ {
wg.Add(1)
semaphore <- struct{}{} // 获取信号量
go func(id int) {
defer wg.Done()
defer func() { <-semaphore }() // 释放信号量
processRecord(id)
}(i)
}
wg.Wait()
fmt.Println("All records processed")
}
对于你的具体场景(逐行读取CSV),可以这样实现:
package main
import (
"encoding/csv"
"fmt"
"io"
"os"
"sync"
)
const maxConcurrent = 10
func worker(id int, record []string, wg *sync.WaitGroup, sem chan struct{}) {
defer wg.Done()
defer func() { <-sem }()
fmt.Printf("Worker %d processing: %v\n", id, record)
// 处理记录的逻辑
}
func main() {
file, _ := os.Open("data.csv")
defer file.Close()
reader := csv.NewReader(file)
var wg sync.WaitGroup
sem := make(chan struct{}, maxConcurrent)
recordNum := 0
for {
record, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
continue // 或处理错误
}
wg.Add(1)
sem <- struct{}{} // 等待有空闲槽位
recordNum++
go worker(recordNum, record, &wg, sem)
}
wg.Wait()
fmt.Println("Processing complete")
}
对于你的阻塞读取场景,可以使用带超时的select:
func processStream(sign chan struct{}, c *websocket.Conn) {
sem := make(chan struct{}, 10)
var wg sync.WaitGroup
for {
select {
case <-sign:
wg.Wait()
return
default:
sem <- struct{}{}
wg.Add(1)
go func() {
defer wg.Done()
defer func() { <-sem }()
msg, err := c.ReadMessage()
if err != nil {
return
}
processMessage(msg)
}()
}
}
}
关键点:
- 缓冲通道作为信号量(
sem := make(chan struct{}, N)) sem <- struct{}{}会阻塞直到有空闲槽位<-sem在goroutine完成时释放槽位sync.WaitGroup等待所有goroutine完成
这种模式避免了忙等待(time.Sleep循环),更高效且不会挂起。

