Golang中处理未知数量数据行的信号量实现

Golang中处理未知数量数据行的信号量实现 我需要循环处理大量数据。我们不知道输入数据包含多少条记录。可以假设类似于逐行读取CSV文件的情况。

我的目标是限制并行协程的数量,假设为10个。这意味着同一时刻最多只能处理10条记录。我编写了一个脚本,其中创建了一个监听器,使用通道作为信号量来判断所有记录是否已处理完毕。

以下是脚本:

package main

import (
	"fmt"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"time"
)

const (
	goroutinesNum = 100
	quotaLimit    = 10
)

func startWorker(i int, wg *sync.WaitGroup, ch chan map[int]interface{}, increment chan int) {
	defer wg.Done()

	record := make(map[int]interface{})
	record[i] = "test " + strconv.Itoa(i)

	ch <- record
	<-increment

	cnt := len(ch)

	fmt.Print(fmt.Sprintln("increment (", i, ") [channels ", cnt, "]", strings.Repeat("  ", cnt), "█"))
	runtime.Gosched()
}

func main() {
	//runtime.GOMAXPROCS(1)
	wg := &sync.WaitGroup{}

	increment := make(chan int, 30)
	ch := make(chan map[int]interface{}, quotaLimit)

	wg.Add(1)
	go func(wg *sync.WaitGroup, increment chan int, ch chan map[int]interface{}) {
		defer wg.Done()
		for i := 0; i < goroutinesNum; i++ {

			increment <- i

			for len(ch) == quotaLimit {
				time.Sleep(time.Second * 1)
			}
			wg.Add(1)
			go startWorker(i, wg, ch, increment)
		}
	}(wg, increment, ch)

	time.Sleep(time.Second)
	for {
		for len(ch) > 0 {
			cnt := len(ch)
			fmt.Print(fmt.Sprintln("channel (", cnt, ")", strings.Repeat("  ", cnt), "■"))
			<-ch
		}

		if len(increment) == 0 {
			break
		}
	}

	wg.Wait()
}

我不确定这样做是否正确。我尝试过使用select和定时器结构,似乎在没有时间等待器的情况下也实现了相同的效果。也许在这里使用NewTicker或上下文会更好。我尝试了不同的方法,但无法确定哪种是正确的。非常感谢任何批评和建议。

更新: 我也尝试过这个解决方案:

大家好 在下面的代码中,我的循环在c.ReadMessage行会无限期地阻塞,只有在有消息返回时才会继续执行,但这个时间点是未知的。假设终止信号来了,但循环仍然卡在c.readMessage处无法继续,那么我应该如何终止这个不再继续执行的循环?

for {
    select {
    case <-sign:
        break
    default:
        //我的循环卡在下面这行注释处
        // 并且终止信号会发送信号来终...

但如果没有超时,它就无法正常工作,对吗?我不需要超时,因为我必须确保所有行都已处理完毕,并且它需要挂起足够长的时间。

更新1: 我发现如果协程数量非常多,这部分代码会挂起:

for len(ch) == quotaLimit {
	time.Sleep(time.Second * 1)
}

看起来这是一个错误的方法。那么问题来了,如何暂停循环直到队列有空闲位置?


更多关于Golang中处理未知数量数据行的信号量实现的实战教程也可以访问 https://www.itying.com/category-94-b0.html

4 回复

感谢您的回复!是的,您的理解是正确的。 在您的代码中,如果某个通道运行时间较长,那么工作线程会调用 break,对吗?

for item := range items {
    time.Pause(time.Second * 10)
	inputChan <- item
}

与此同时,我似乎在 StackOverflow 上找到了解决方案: https://play.golang.org/p/rSqcU9EZLzn

据我理解,在 range 结构中的 jobs 会等待通道中的任何元素,直到通道关闭,是这样吗?

for j := range jobs {

更多关于Golang中处理未知数量数据行的信号量实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我不理解你问题中的这一部分:

anthonys: 以防某些通道运行时间较长

通道只是一种将项目安全传递给多个协程的方式,那么你说它“运行时间较长”是什么意思呢?

也许你的意思是,如果你尝试从通道中获取项目一段时间,但一直没有获取到任何东西? 如果是这种情况,那么你需要一个带有两个 case 的 select 语句:一个用于拉取新项目,另一个用于超时处理。因此,工作函数将如下所示:

func worker(input chan int) {
	for {
    select {
		case item, more := <-input :
      if more {
        item++
        fmt.Println(item)
      } else {
        break
      }
    case <-time.After(time.Second * 10):
      break
    }
	}
}

这个 select 语句将等待从通道中获取一个项目或者超时结束。如果超时结束,它将跳出循环。

你的描述有点模糊,以下是我的理解:

  1. 你有一组数据需要并行处理
  2. 你想并行处理它们,但最多同时处理10项?还是总共就处理10项?
  3. 我假设是第一种情况(最多同时并行处理10项),但当我阅读代码时,它对于你的描述来说似乎过于复杂了。

下面的代码片段演示了如何获取一组数据项并进行并行处理。请告诉我我对你问题的理解哪里不对,我们可以一起改进它。

package main

import (
	"fmt"
	"sync"
)

func main() {
	items := []int{1, 2, 3, 4, 5, 6, 7}
	maxWorkers := 10
	inputChan := make(chan int, maxWorkers)
	wg := sync.WaitGroup{}

	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go func() {
			worker(inputChan)
			wg.Done()
		}()
	}

	go func() {
		for item := range items {
			inputChan <- item
		}
		close(inputChan)
	}()

	wg.Wait()
}

func worker(input chan int) {
	for {
		item, more := <-input
		if more {
			item++
			fmt.Println(item)
		} else {
			break
		}
	}
}

这是一个典型的并发控制场景,使用缓冲通道作为信号量是正确的思路。你的代码可以简化,以下是更清晰的实现:

package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	maxWorkers = 10
	totalTasks = 100 // 模拟未知数量的数据行
)

func processRecord(id int) {
	time.Sleep(100 * time.Millisecond) // 模拟处理时间
	fmt.Printf("Processed record %d\n", id)
}

func main() {
	var wg sync.WaitGroup
	semaphore := make(chan struct{}, maxWorkers) // 信号量通道

	// 模拟读取未知数量的数据行
	for i := 0; i < totalTasks; i++ {
		wg.Add(1)
		semaphore <- struct{}{} // 获取信号量

		go func(id int) {
			defer wg.Done()
			defer func() { <-semaphore }() // 释放信号量

			processRecord(id)
		}(i)
	}

	wg.Wait()
	fmt.Println("All records processed")
}

对于你的具体场景(逐行读取CSV),可以这样实现:

package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"sync"
)

const maxConcurrent = 10

func worker(id int, record []string, wg *sync.WaitGroup, sem chan struct{}) {
	defer wg.Done()
	defer func() { <-sem }()

	fmt.Printf("Worker %d processing: %v\n", id, record)
	// 处理记录的逻辑
}

func main() {
	file, _ := os.Open("data.csv")
	defer file.Close()

	reader := csv.NewReader(file)
	var wg sync.WaitGroup
	sem := make(chan struct{}, maxConcurrent)

	recordNum := 0
	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			continue // 或处理错误
		}

		wg.Add(1)
		sem <- struct{}{} // 等待有空闲槽位
		recordNum++

		go worker(recordNum, record, &wg, sem)
	}

	wg.Wait()
	fmt.Println("Processing complete")
}

对于你的阻塞读取场景,可以使用带超时的select:

func processStream(sign chan struct{}, c *websocket.Conn) {
	sem := make(chan struct{}, 10)
	var wg sync.WaitGroup

	for {
		select {
		case <-sign:
			wg.Wait()
			return
		default:
			sem <- struct{}{}
			wg.Add(1)

			go func() {
				defer wg.Done()
				defer func() { <-sem }()

				msg, err := c.ReadMessage()
				if err != nil {
					return
				}
				processMessage(msg)
			}()
		}
	}
}

关键点:

  1. 缓冲通道作为信号量(sem := make(chan struct{}, N)
  2. sem <- struct{}{} 会阻塞直到有空闲槽位
  3. <-sem 在goroutine完成时释放槽位
  4. sync.WaitGroup 等待所有goroutine完成

这种模式避免了忙等待(time.Sleep循环),更高效且不会挂起。

回到顶部