Golang中如何实现周期性数据生产与消费的同步处理
Golang中如何实现周期性数据生产与消费的同步处理
我使用 time.NewTicker 在两个 goroutine 中周期性地生成数据,并将其放入一个 channel。
多个消费者 goroutine 从 channel 中读取数据,并将数据存入一个 slice。
我希望在一个单独的函数中处理这个 slice,然后重置它(slice = slice[:0])并恢复生产者的工作。
这个过程应该在每次 ticker 触发时重复,直到用户按下 CTRL+C。
遇到的问题
- 我不知道如何暂停生产者,然后在切片处理完成后恢复它们
- 我不知道如何检查消费者是否已经完全清空了通道
- 我不知道如何确定一个安全的时机来处理
slice
最小可验证完整示例
目前我可以提供从我的 VS Code 中复制的可运行代码。如果我解决了之前列出的任何问题,我将更新代码以反映我的进展。代码如下:
package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
func producer(ctx context.Context, waitGroup *sync.WaitGroup, dataChannel chan<- int, ticker <-chan time.Time, foo func(int) int) {
defer waitGroup.Done()
// recordset slice 用于模拟复杂数据,
// 变量 i 用于模拟每次 tick 时数据的变化
i := 0
recordset := make([]int, 3)
for {
select {
case <-ticker:
// 模拟数据变化
i = foo(i)
recordset[0] = i
i = foo(i)
recordset[1] = i
i = foo(i)
recordset[2] = i
// 将数据转储到 dataChannel
for _, record := range recordset {
dataChannel <- record
}
case <-ctx.Done():
return
}
}
}
func consumer(ctx context.Context, waitGroup *sync.WaitGroup, dataChannel <-chan int, slice *[]int, mutex *sync.Mutex) {
defer waitGroup.Done()
for {
select {
case <-ctx.Done():
return
case value := <-dataChannel:
mutex.Lock()
*slice = append(*slice, value)
mutex.Unlock()
}
}
}
func main() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
slice := make([]int, 0, 3)
dataChannel := make(chan int, 100)
mutex := &sync.Mutex{}
waitGroup := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
waitGroup.Add(1)
// 我们将使用简单的切片元素递增来模拟数据变化
go producer(ctx, waitGroup, dataChannel, ticker.C, func(i int) int { return i + 1 })
waitGroup.Add(1)
// 我们将使用简单的切片元素递减来模拟数据变化
// 这样我们可以轻松地从终端跟踪程序行为
go producer(ctx, waitGroup, dataChannel, ticker.C, func(i int) int { return i - 1 })
for i := 0; i < 2; i++ {
waitGroup.Add(1)
go consumer(ctx, waitGroup, dataChannel, &slice, mutex)
}
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
<-sigs
cancel()
close(dataChannel)
}()
waitGroup.Wait()
// 让我们检查一切是否正常
fmt.Println("Final slice: ")
for _, element := range slice {
fmt.Printf("%d ", element)
}
}
更多关于Golang中如何实现周期性数据生产与消费的同步处理的实战教程也可以访问 https://www.itying.com/category-94-b0.html
Ticker 是共享的,因为两个生产者应该以相同的间隔产生数据。 你能提供更好的方法吗?
更多关于Golang中如何实现周期性数据生产与消费的同步处理的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
通常不共享 time.Ticker。因为当时间触发时,只会产生一个数据。这意味着时间到来时,只有一个生产者会工作。
如果你期望生产者同时工作,应该使用通道来触发生产者工作,而不是共享 time.Ticker。
针对你的问题,这里提供一个完整的解决方案。核心思路是使用额外的控制通道来协调生产者和消费者的同步:
package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
func producer(ctx context.Context, wg *sync.WaitGroup, dataChan chan<- int,
ticker <-chan time.Time, pauseChan <-chan struct{}, resumeChan <-chan struct{},
foo func(int) int) {
defer wg.Done()
i := 0
recordset := make([]int, 3)
for {
select {
case <-ticker:
// 检查是否需要暂停
select {
case <-pauseChan:
// 等待恢复信号
<-resumeChan
default:
}
// 生成数据
i = foo(i)
recordset[0] = i
i = foo(i)
recordset[1] = i
i = foo(i)
recordset[2] = i
// 发送数据
for _, record := range recordset {
select {
case dataChan <- record:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}
func consumer(ctx context.Context, wg *sync.WaitGroup, dataChan <-chan int,
slice *[]int, mu *sync.Mutex, doneChan chan<- struct{}) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case value, ok := <-dataChan:
if !ok {
return
}
mu.Lock()
*slice = append(*slice, value)
mu.Unlock()
// 通知数据处理完成
select {
case doneChan <- struct{}{}:
default:
}
}
}
}
func processSlice(ctx context.Context, wg *sync.WaitGroup, slice *[]int, mu *sync.Mutex,
pauseChan chan<- struct{}, resumeChan chan<- struct{}, doneChan <-chan struct{},
ticker <-chan time.Time) {
defer wg.Done()
for {
select {
case <-ticker:
// 1. 暂停生产者
pauseChan <- struct{}{}
// 2. 等待所有消费者完成当前批次
// 简单方案:等待通道为空
time.Sleep(50 * time.Millisecond) // 给消费者一点时间
// 3. 处理切片
mu.Lock()
if len(*slice) > 0 {
fmt.Printf("Processing slice (len=%d): ", len(*slice))
for _, v := range *slice {
fmt.Printf("%d ", v)
}
fmt.Println()
// 重置切片
*slice = (*slice)[:0]
}
mu.Unlock()
// 4. 恢复生产者
resumeChan <- struct{}{}
case <-ctx.Done():
return
}
}
}
func main() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
slice := make([]int, 0, 100)
dataChan := make(chan int, 100)
mu := &sync.Mutex{}
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
// 控制通道
pauseChan := make(chan struct{}, 1)
resumeChan := make(chan struct{}, 1)
doneChan := make(chan struct{}, 10)
// 启动生产者
wg.Add(2)
go producer(ctx, wg, dataChan, ticker.C, pauseChan, resumeChan,
func(i int) int { return i + 1 })
go producer(ctx, wg, dataChan, ticker.C, pauseChan, resumeChan,
func(i int) int { return i - 1 })
// 启动消费者
for i := 0; i < 2; i++ {
wg.Add(1)
go consumer(ctx, wg, dataChan, &slice, mu, doneChan)
}
// 启动切片处理器
wg.Add(1)
go processSlice(ctx, wg, &slice, mu, pauseChan, resumeChan, doneChan, ticker.C)
// 信号处理
wg.Add(1)
go func() {
defer wg.Done()
<-sigs
cancel()
close(dataChan)
}()
wg.Wait()
fmt.Println("Program terminated")
}
这个解决方案的关键点:
- 暂停/恢复机制:使用
pauseChan和resumeChan控制生产者的执行流程 - 通道清空检查:通过短暂延迟确保消费者处理完当前数据
- 安全时机处理:在暂停生产者并确认通道清空后处理切片
- 消费者通知:消费者通过
doneChan通知数据处理完成
对于更精确的通道清空检查,可以使用以下改进方案:
func waitForChannelEmpty(dataChan chan int, timeout time.Duration) bool {
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-timer.C:
return false
default:
if len(dataChan) == 0 {
// 双重检查
time.Sleep(10 * time.Millisecond)
if len(dataChan) == 0 {
return true
}
}
time.Sleep(5 * time.Millisecond)
}
}
}
// 在 processSlice 中使用:
case <-ticker:
pauseChan <- struct{}{}
// 等待通道清空
if waitForChannelEmpty(dataChan, 100*time.Millisecond) {
mu.Lock()
// 处理切片...
mu.Unlock()
}
resumeChan <- struct{}{}
这个方案提供了生产者-消费者-处理器之间的精确同步,确保在安全时机处理数据切片。


