Golang中使用Go协程处理外部事件的实现方法
Golang中使用Go协程处理外部事件的实现方法 我想验证一个包含事件和 goroutine 的模式。问题是——从 go 子例程中调用在主循环中运行的事件是否是一个好方法。考虑以下示例:
https://play.golang.org/p/PpBfXG34GN8
package main
import (
"fmt"
"github.com/asaskevich/EventBus"
)
type SomeRandomState struct {
GlobalBusObj EventBus.Bus
}
func NewRandomState() *SomeRandomState {
return &SomeRandomState{
GlobalBusObj: EventBus.New(),
}
}
func GlobalEventBusMessageReceived(msg string) {
fmt.Println("Message Received to Global Bus:" + msg)
}
func main() {
done := make(chan bool)
fmt.Println("Hello, playground")
s := NewRandomState()
s.GlobalBusObj.Subscribe("message_to_global_event_bus", GlobalEventBusMessageReceived)
s.GlobalBusObj.Publish("message_to_global_event_bus", "Publishing msg:Hello World") //the
publishing of event works fine
go func() {
fmt.Println("Inside go subrountine")
s.GlobalBusObj.Publish("message_to_global_event_bus", "This is msg from the subroutine")
//is this a good idea to send msg via events from a go subroutine to the main subroutine?
done <- true
}()
<-done
}
更多关于Golang中使用Go协程处理外部事件的实现方法的实战教程也可以访问 https://www.itying.com/category-94-b0.html
3 回复
更多关于Golang中使用Go协程处理外部事件的实现方法的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
这对我来说似乎有道理。我认为这是一个有效的用法。
此外,它也无法轻易地被通道(channels)替代,因为同一个事件总线(event bus)可能有多个订阅者。
在Go语言中,从goroutine向主循环发送事件是完全可行的设计模式,特别是使用事件总线时。这种模式可以实现松耦合的并发通信。以下是针对你代码的分析和改进示例:
package main
import (
"fmt"
"sync"
"time"
"github.com/asaskevich/EventBus"
)
type SomeRandomState struct {
GlobalBusObj EventBus.Bus
mu sync.RWMutex
messageCount int
}
func NewRandomState() *SomeRandomState {
return &SomeRandomState{
GlobalBusObj: EventBus.New(),
messageCount: 0,
}
}
func (s *SomeRandomState) incrementCount() {
s.mu.Lock()
defer s.mu.Unlock()
s.messageCount++
}
func (s *SomeRandomState) getCount() int {
s.mu.RLock()
defer s.mu.RUnlock()
return s.messageCount
}
func GlobalEventBusMessageReceived(msg string) {
fmt.Printf("Message Received: %s at %v\n", msg, time.Now().Format("15:04:05.000"))
}
func main() {
const workerCount = 3
var wg sync.WaitGroup
s := NewRandomState()
// 订阅事件
s.GlobalBusObj.Subscribe("message_to_global_event_bus", GlobalEventBusMessageReceived)
// 主goroutine先发布一个事件
s.GlobalBusObj.Publish("message_to_global_event_bus", "Main goroutine message")
s.incrementCount()
// 启动多个worker goroutine
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 模拟一些处理
time.Sleep(time.Duration(id) * 100 * time.Millisecond)
// 从goroutine发布事件
msg := fmt.Sprintf("Worker %d message", id)
s.GlobalBusObj.Publish("message_to_global_event_bus", msg)
s.incrementCount()
fmt.Printf("Worker %d published message\n", id)
}(i)
}
// 启动一个监控goroutine
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 5; i++ {
<-ticker.C
s.GlobalBusObj.Publish("message_to_global_event_bus",
fmt.Sprintf("Monitor tick %d", i))
s.incrementCount()
}
}()
// 等待所有goroutine完成
wg.Wait()
// 验证所有消息都已处理
fmt.Printf("\nTotal messages processed: %d\n", s.getCount())
// 演示异步事件处理
asyncDone := make(chan bool, 1)
s.GlobalBusObj.SubscribeAsync("async_event", func(data interface{}) {
msg := data.(string)
fmt.Printf("Async event received: %s\n", msg)
asyncDone <- true
}, false)
go func() {
time.Sleep(50 * time.Millisecond)
s.GlobalBusObj.Publish("async_event", "Async message from goroutine")
}()
<-asyncDone
fmt.Println("All events processed successfully")
}
这个示例展示了几个关键点:
- 并发安全:使用
sync.RWMutex保护共享状态 - 多goroutine通信:多个worker goroutine都可以安全地向事件总线发布消息
- 异步事件处理:EventBus支持异步事件处理模式
- 等待组:使用
sync.WaitGroup正确等待所有goroutine完成
从goroutine调用主循环中的事件是推荐的做法,特别是当:
- 需要解耦事件生产者和消费者
- 有多个goroutine需要通信
- 需要广播消息到多个订阅者
- 希望实现事件驱动的架构
EventBus内部已经处理了并发安全问题,但你的业务状态(如messageCount)需要自己保护。这种模式在微服务、实时系统和高并发应用中很常见。

