Golang中如何实现周期性数据生产与消费的同步处理

Golang中如何实现周期性数据生产与消费的同步处理 我使用 time.NewTicker 在两个 goroutine 中周期性地生成数据,并将其放入一个 channel。 多个消费者 goroutine 从 channel 中读取数据,并将数据存入一个 slice

我希望在一个单独的函数中处理这个 slice,然后重置它(slice = slice[:0])并恢复生产者的工作。 这个过程应该在每次 ticker 触发时重复,直到用户按下 CTRL+C。

遇到的问题

  1. 我不知道如何暂停生产者,然后在切片处理完成后恢复它们
  2. 我不知道如何检查消费者是否已经完全清空了通道
  3. 我不知道如何确定一个安全的时机来处理 slice

最小可验证完整示例

目前我可以提供从我的 VS Code 中复制的可运行代码。如果我解决了之前列出的任何问题,我将更新代码以反映我的进展。代码如下:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func producer(ctx context.Context, waitGroup *sync.WaitGroup, dataChannel chan<- int, ticker <-chan time.Time, foo func(int) int) {
    defer waitGroup.Done()
    // recordset slice 用于模拟复杂数据,
    // 变量 i 用于模拟每次 tick 时数据的变化
    i := 0
    recordset := make([]int, 3)
    for {
        select {
        case <-ticker:
            // 模拟数据变化
            i = foo(i)
            recordset[0] = i
            i = foo(i)
            recordset[1] = i
            i = foo(i)
            recordset[2] = i
            // 将数据转储到 dataChannel
            for _, record := range recordset {
                dataChannel <- record
            }
        case <-ctx.Done():
            return
        }
    }
}

func consumer(ctx context.Context, waitGroup *sync.WaitGroup, dataChannel <-chan int, slice *[]int, mutex *sync.Mutex) {
    defer waitGroup.Done()
    for {
        select {
        case <-ctx.Done():
            return
        case value := <-dataChannel:
            mutex.Lock()
            *slice = append(*slice, value)
            mutex.Unlock()
        }
    }
}

func main() {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

    slice := make([]int, 0, 3)

    dataChannel := make(chan int, 100)
    mutex := &sync.Mutex{}
    waitGroup := &sync.WaitGroup{}

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    waitGroup.Add(1)
    // 我们将使用简单的切片元素递增来模拟数据变化
    go producer(ctx, waitGroup, dataChannel, ticker.C, func(i int) int { return i + 1 })
    waitGroup.Add(1)
    // 我们将使用简单的切片元素递减来模拟数据变化
    // 这样我们可以轻松地从终端跟踪程序行为
    go producer(ctx, waitGroup, dataChannel, ticker.C, func(i int) int { return i - 1 })

    for i := 0; i < 2; i++ {
        waitGroup.Add(1)
        go consumer(ctx, waitGroup, dataChannel, &slice, mutex)
    }

    waitGroup.Add(1)
    go func() {
        defer waitGroup.Done()
        <-sigs
        cancel()
        close(dataChannel)
    }()

    waitGroup.Wait()

    // 让我们检查一切是否正常
    fmt.Println("Final slice: ")
    for _, element := range slice {
        fmt.Printf("%d ", element)
    }
}

更多关于Golang中如何实现周期性数据生产与消费的同步处理的实战教程也可以访问 https://www.itying.com/category-94-b0.html

4 回复

Ticker 是共享的,因为两个生产者应该以相同的间隔产生数据。 你能提供更好的方法吗?

更多关于Golang中如何实现周期性数据生产与消费的同步处理的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


通常不共享 time.Ticker。因为当时间触发时,只会产生一个数据。这意味着时间到来时,只有一个生产者会工作。 如果你期望生产者同时工作,应该使用通道来触发生产者工作,而不是共享 time.Ticker

你的代码看起来太乱了。我重写了一下,逻辑不难看出:

stop -> producer -> consumer -> print

所以,很容易得到以下代码:

func producer(ctx context.Context, waitGroup *sync.WaitGroup, dataChannel chan<- int, ticker <-chan time.Time, foo func(int) int) {
	defer waitGroup.Done()
	// recordset 切片用于模拟复杂数据,
	// 变量 i 用于模拟每次 tick 时数据的变化
	i := 0
	recordset := make([]int, 3)
	for {
		select {
		case <-ticker:
			// 模拟数据变化
			i = foo(i)
			recordset[0] = i
			i = foo(i)
			recordset[1] = i
			i = foo(i)
			recordset[2] = i
			// 将数据转储到 dataChannel
			for _, record := range recordset {
				dataChannel <- record
			}
		case <-ctx.Done():
			return
		}
	}
}

func consumer(waitGroup *sync.WaitGroup, dataChannel <-chan int, slice *[]int, mutex *sync.Mutex) {
	defer waitGroup.Done()
	for {
		value, ok := <-dataChannel
		if !ok {
			return
		}
		mutex.Lock()
		*slice = append(*slice, value)
		mutex.Unlock()
	}
}

func main() {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	dataChannel := make(chan int, 100)
	slice := make([]int, 0, 3)

	// producer
	ticker := time.NewTicker(3 * time.Second) //为什么生产者共享一个计时器?这似乎是个bug。
	defer ticker.Stop()
	go func() {
		var wg sync.WaitGroup

		wg.Add(1)
		go producer(ctx, &wg, dataChannel, ticker.C, func(i int) int { return i + 1 })
		wg.Add(1)
		go producer(ctx, &wg, dataChannel, ticker.C, func(i int) int { return i - 1 })

		wg.Wait()
		close(dataChannel)
	}()

	// consumer
	var mutex sync.Mutex
	var waitGroup sync.WaitGroup
	for i := 0; i < 2; i++ {
		waitGroup.Add(1)
		go consumer(&waitGroup, dataChannel, &slice, &mutex)
	}

	// stop
	<-sigs
	cancel()
	waitGroup.Wait()

	// 让我们检查一下一切是否正常
	fmt.Println("Final slice: ")
	for _, element := range slice {
		fmt.Printf("%d ", element)
	}
}

针对你的问题,这里提供一个完整的解决方案。核心思路是使用额外的控制通道来协调生产者和消费者的同步:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func producer(ctx context.Context, wg *sync.WaitGroup, dataChan chan<- int, 
    ticker <-chan time.Time, pauseChan <-chan struct{}, resumeChan <-chan struct{}, 
    foo func(int) int) {
    defer wg.Done()
    i := 0
    recordset := make([]int, 3)
    
    for {
        select {
        case <-ticker:
            // 检查是否需要暂停
            select {
            case <-pauseChan:
                // 等待恢复信号
                <-resumeChan
            default:
            }
            
            // 生成数据
            i = foo(i)
            recordset[0] = i
            i = foo(i)
            recordset[1] = i
            i = foo(i)
            recordset[2] = i
            
            // 发送数据
            for _, record := range recordset {
                select {
                case dataChan <- record:
                case <-ctx.Done():
                    return
                }
            }
        case <-ctx.Done():
            return
        }
    }
}

func consumer(ctx context.Context, wg *sync.WaitGroup, dataChan <-chan int, 
    slice *[]int, mu *sync.Mutex, doneChan chan<- struct{}) {
    defer wg.Done()
    
    for {
        select {
        case <-ctx.Done():
            return
        case value, ok := <-dataChan:
            if !ok {
                return
            }
            mu.Lock()
            *slice = append(*slice, value)
            mu.Unlock()
            
            // 通知数据处理完成
            select {
            case doneChan <- struct{}{}:
            default:
            }
        }
    }
}

func processSlice(ctx context.Context, wg *sync.WaitGroup, slice *[]int, mu *sync.Mutex,
    pauseChan chan<- struct{}, resumeChan chan<- struct{}, doneChan <-chan struct{},
    ticker <-chan time.Time) {
    defer wg.Done()
    
    for {
        select {
        case <-ticker:
            // 1. 暂停生产者
            pauseChan <- struct{}{}
            
            // 2. 等待所有消费者完成当前批次
            // 简单方案:等待通道为空
            time.Sleep(50 * time.Millisecond) // 给消费者一点时间
            
            // 3. 处理切片
            mu.Lock()
            if len(*slice) > 0 {
                fmt.Printf("Processing slice (len=%d): ", len(*slice))
                for _, v := range *slice {
                    fmt.Printf("%d ", v)
                }
                fmt.Println()
                
                // 重置切片
                *slice = (*slice)[:0]
            }
            mu.Unlock()
            
            // 4. 恢复生产者
            resumeChan <- struct{}{}
            
        case <-ctx.Done():
            return
        }
    }
}

func main() {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    
    slice := make([]int, 0, 100)
    dataChan := make(chan int, 100)
    mu := &sync.Mutex{}
    wg := &sync.WaitGroup{}
    
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()
    
    // 控制通道
    pauseChan := make(chan struct{}, 1)
    resumeChan := make(chan struct{}, 1)
    doneChan := make(chan struct{}, 10)
    
    // 启动生产者
    wg.Add(2)
    go producer(ctx, wg, dataChan, ticker.C, pauseChan, resumeChan, 
        func(i int) int { return i + 1 })
    go producer(ctx, wg, dataChan, ticker.C, pauseChan, resumeChan, 
        func(i int) int { return i - 1 })
    
    // 启动消费者
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go consumer(ctx, wg, dataChan, &slice, mu, doneChan)
    }
    
    // 启动切片处理器
    wg.Add(1)
    go processSlice(ctx, wg, &slice, mu, pauseChan, resumeChan, doneChan, ticker.C)
    
    // 信号处理
    wg.Add(1)
    go func() {
        defer wg.Done()
        <-sigs
        cancel()
        close(dataChan)
    }()
    
    wg.Wait()
    fmt.Println("Program terminated")
}

这个解决方案的关键点:

  1. 暂停/恢复机制:使用 pauseChanresumeChan 控制生产者的执行流程
  2. 通道清空检查:通过短暂延迟确保消费者处理完当前数据
  3. 安全时机处理:在暂停生产者并确认通道清空后处理切片
  4. 消费者通知:消费者通过 doneChan 通知数据处理完成

对于更精确的通道清空检查,可以使用以下改进方案:

func waitForChannelEmpty(dataChan chan int, timeout time.Duration) bool {
    timer := time.NewTimer(timeout)
    defer timer.Stop()
    
    for {
        select {
        case <-timer.C:
            return false
        default:
            if len(dataChan) == 0 {
                // 双重检查
                time.Sleep(10 * time.Millisecond)
                if len(dataChan) == 0 {
                    return true
                }
            }
            time.Sleep(5 * time.Millisecond)
        }
    }
}

// 在 processSlice 中使用:
case <-ticker:
    pauseChan <- struct{}{}
    
    // 等待通道清空
    if waitForChannelEmpty(dataChan, 100*time.Millisecond) {
        mu.Lock()
        // 处理切片...
        mu.Unlock()
    }
    
    resumeChan <- struct{}{}

这个方案提供了生产者-消费者-处理器之间的精确同步,确保在安全时机处理数据切片。

回到顶部