Golang中使用Go协程处理外部事件的实现方法

Golang中使用Go协程处理外部事件的实现方法 我想验证一个包含事件和 goroutine 的模式。问题是——从 go 子例程中调用在主循环中运行的事件是否是一个好方法。考虑以下示例:

https://play.golang.org/p/PpBfXG34GN8

package main

import (
"fmt"

"github.com/asaskevich/EventBus"
)

type SomeRandomState struct {
GlobalBusObj EventBus.Bus
}

func NewRandomState() *SomeRandomState {
return &SomeRandomState{
	GlobalBusObj: EventBus.New(),
}

}

func GlobalEventBusMessageReceived(msg string) {
fmt.Println("Message Received to Global Bus:" + msg)
}
func main() {
done := make(chan bool)

fmt.Println("Hello, playground")

s := NewRandomState()
s.GlobalBusObj.Subscribe("message_to_global_event_bus", GlobalEventBusMessageReceived)
s.GlobalBusObj.Publish("message_to_global_event_bus", "Publishing msg:Hello World") //the 
publishing of event works fine

go func() {
	fmt.Println("Inside go subrountine")
	s.GlobalBusObj.Publish("message_to_global_event_bus", "This is msg from the subroutine") 
//is this a good idea to send msg via events from a go subroutine to the main subroutine?

	done <- true
}()

<-done

}

更多关于Golang中使用Go协程处理外部事件的实现方法的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

更多关于Golang中使用Go协程处理外部事件的实现方法的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


这对我来说似乎有道理。我认为这是一个有效的用法。

此外,它也无法轻易地被通道(channels)替代,因为同一个事件总线(event bus)可能有多个订阅者。

在Go语言中,从goroutine向主循环发送事件是完全可行的设计模式,特别是使用事件总线时。这种模式可以实现松耦合的并发通信。以下是针对你代码的分析和改进示例:

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/asaskevich/EventBus"
)

type SomeRandomState struct {
	GlobalBusObj EventBus.Bus
	mu           sync.RWMutex
	messageCount int
}

func NewRandomState() *SomeRandomState {
	return &SomeRandomState{
		GlobalBusObj: EventBus.New(),
		messageCount: 0,
	}
}

func (s *SomeRandomState) incrementCount() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.messageCount++
}

func (s *SomeRandomState) getCount() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.messageCount
}

func GlobalEventBusMessageReceived(msg string) {
	fmt.Printf("Message Received: %s at %v\n", msg, time.Now().Format("15:04:05.000"))
}

func main() {
	const workerCount = 3
	var wg sync.WaitGroup
	
	s := NewRandomState()
	
	// 订阅事件
	s.GlobalBusObj.Subscribe("message_to_global_event_bus", GlobalEventBusMessageReceived)
	
	// 主goroutine先发布一个事件
	s.GlobalBusObj.Publish("message_to_global_event_bus", "Main goroutine message")
	s.incrementCount()
	
	// 启动多个worker goroutine
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			// 模拟一些处理
			time.Sleep(time.Duration(id) * 100 * time.Millisecond)
			
			// 从goroutine发布事件
			msg := fmt.Sprintf("Worker %d message", id)
			s.GlobalBusObj.Publish("message_to_global_event_bus", msg)
			s.incrementCount()
			
			fmt.Printf("Worker %d published message\n", id)
		}(i)
	}
	
	// 启动一个监控goroutine
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(200 * time.Millisecond)
		defer ticker.Stop()
		
		for i := 0; i < 5; i++ {
			<-ticker.C
			s.GlobalBusObj.Publish("message_to_global_event_bus", 
				fmt.Sprintf("Monitor tick %d", i))
			s.incrementCount()
		}
	}()
	
	// 等待所有goroutine完成
	wg.Wait()
	
	// 验证所有消息都已处理
	fmt.Printf("\nTotal messages processed: %d\n", s.getCount())
	
	// 演示异步事件处理
	asyncDone := make(chan bool, 1)
	
	s.GlobalBusObj.SubscribeAsync("async_event", func(data interface{}) {
		msg := data.(string)
		fmt.Printf("Async event received: %s\n", msg)
		asyncDone <- true
	}, false)
	
	go func() {
		time.Sleep(50 * time.Millisecond)
		s.GlobalBusObj.Publish("async_event", "Async message from goroutine")
	}()
	
	<-asyncDone
	fmt.Println("All events processed successfully")
}

这个示例展示了几个关键点:

  1. 并发安全:使用sync.RWMutex保护共享状态
  2. 多goroutine通信:多个worker goroutine都可以安全地向事件总线发布消息
  3. 异步事件处理:EventBus支持异步事件处理模式
  4. 等待组:使用sync.WaitGroup正确等待所有goroutine完成

从goroutine调用主循环中的事件是推荐的做法,特别是当:

  • 需要解耦事件生产者和消费者
  • 有多个goroutine需要通信
  • 需要广播消息到多个订阅者
  • 希望实现事件驱动的架构

EventBus内部已经处理了并发安全问题,但你的业务状态(如messageCount)需要自己保护。这种模式在微服务、实时系统和高并发应用中很常见。

回到顶部