golang轻量级事件总线实现插件库go-eventbus的使用

Golang轻量级事件总线实现插件库go-eventbus的使用

简介

这是一个简单的Golang事件总线实现,支持以下模式:

  • 发布/订阅模式
  • 请求/响应模式

快速开始

在你的项目中使用go-eventbus,可以运行以下命令:

go get github.com/stanipetrosyan/go-eventbus

然后导入:

import (
	goeventbus "github.com/stanipetrosyan/go-eventbus"
)

发布/订阅模式

这是一个简单的发布/订阅模式示例:

// 创建事件总线实例
eventbus := goeventbus.NewEventBus()

// 定义主题地址
address := "topic"

// 创建消息头选项
options := goeventbus.NewMessageHeadersBuilder().SetHeader("header", "value").Build()

// 创建消息
message := goeventbus.NewMessageBuilder().SetPayload("Hi Topic").SetHeaders(options).Build()

// 订阅主题并设置处理函数
eventbus.Channel(address).Subscriber().Listen(func(dc goeventbus.Context) {
	fmt.Printf("Message %s\n", dc.Result().Data)
})

// 发布消息
eventbus.Channel(address).Publisher().Publish(message)

请求/响应模式

这是一个简单的请求/响应模式示例:

// 创建事件总线实例
eventbus := goeventbus.NewEventBus()

// 定义主题地址
address := "topic"

// 创建消息
message := goeventbus.NewMessageBuilder().SetPayload("Hi Topic").Build()

// 订阅主题并设置处理函数
eventbus.Channel(address).Subscriber().Listen(func(context goeventbus.Context) {
	fmt.Printf("Message %s\n", context.Result().Extract())
	context.Reply("Hello from subscriber") // 响应消息
})

// 发送请求并设置响应处理函数
eventbus.Channel(address).Publisher().Request(message, func(context goeventbus.Context) {
	fmt.Printf("Message %s\n", context.Result().Extract()) // 处理响应
})

消息构造

要发布消息,你需要使用MessageBuilder创建消息对象:

// 创建消息头选项
options := goeventbus.NewMessageHeadersBuilder().SetHeader("header", "value").Build()

// 创建消息
message := goeventbus.NewMessageBuilder().SetPayload("Hi Topic").SetHeaders(options).Build()

// 发布消息
eventBus.Channel("address").Publisher().Publish(message)

处理器(Processor)

处理器类似于中间件,只有在调用context.Next()方法时才会转发消息。处理器会拦截从发布者到订阅者的消息。

eventbus.Channel("topic1").Processor().Listen(func(context goeventbus.Context) {
	// 检查消息头中是否包含特定header
	if context.Result().ExtractHeaders().Contains("header") {
		context.Next() // 只有满足条件才转发消息
	}
})

在处理器中,可以使用Map方法修改要转发的消息:

eventbus.Channel("topic1").Processor().Listen(func(context goeventbus.Context) {
	if context.Result().ExtractHeaders().Contains("header") {
		// 创建新消息
		newMessage := NewMessageBuilder().SetPayload("new message").Build()
		
		// 修改消息并转发
		context.Map(newMessage).Next()
	}
})

网络总线(Network Bus)

网络总线可以在不同服务之间建立TCP连接。NetworkBus是本地事件总线的包装器。

一个简单的服务器/客户端示例可以在examples/networkbus目录中找到。


更多关于golang轻量级事件总线实现插件库go-eventbus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang轻量级事件总线实现插件库go-eventbus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Go-EventBus: 轻量级事件总线实现

Go-EventBus 是一个轻量级的 Golang 事件总线库,它实现了发布-订阅模式,非常适合在应用程序中实现组件间的松耦合通信。下面我将详细介绍它的使用方法和示例代码。

基本概念

Go-EventBus 的核心概念包括:

  • 事件总线(EventBus): 负责管理订阅者和事件的中心枢纽
  • 主题(Topic): 事件的分类标识
  • 订阅者(Subscriber): 监听特定主题事件的函数
  • 发布者(Publisher): 向特定主题发布事件的组件

安装

go get github.com/asaskevich/EventBus

基本使用示例

package main

import (
	"fmt"
	"github.com/asaskevich/EventBus"
)

func main() {
	// 创建事件总线实例
	bus := EventBus.New()

	// 定义事件处理函数
	handler := func(topic string, data interface{}) {
		fmt.Printf("收到事件: 主题=%s, 数据=%v\n", topic, data)
	}

	// 订阅主题
	if err := bus.Subscribe("user:created", handler); err != nil {
		fmt.Println("订阅失败:", err)
		return
	}

	// 发布事件
	bus.Publish("user:created", map[string]interface{}{
		"id":    123,
		"name":  "张三",
		"email": "zhangsan@example.com",
	})

	// 取消订阅
	bus.Unsubscribe("user:created", handler)
}

高级特性

1. 异步事件处理

func main() {
	bus := EventBus.New()

	// 异步订阅
	bus.SubscribeAsync("order:completed", func(topic string, data interface{}) {
		order := data.(map[string]interface{})
		fmt.Printf("处理订单 #%d (异步)\n", order["id"].(int))
		time.Sleep(1 * time.Second)
	}, false) // false表示不保证顺序

	// 发布多个事件
	for i := 1; i <= 5; i++ {
		bus.Publish("order:completed", map[string]interface{}{
			"id":    i,
			"total": 100 * i,
		})
	}

	// 等待所有异步处理完成
	bus.WaitAsync()
	fmt.Println("所有订单处理完成")
}

2. 错误处理

func main() {
	bus := EventBus.New()

	// 带有错误处理的事件订阅
	bus.Subscribe("payment:failed", func(topic string, data interface{}) {
		payment := data.(map[string]interface{})
		if payment["amount"].(float64) > 1000 {
			panic("大额支付失败!")
		}
		fmt.Printf("处理失败支付: %v\n", payment)
	})

	// 设置全局错误处理器
	bus.SetErrorHandler(func(err error) {
		fmt.Printf("事件处理出错: %v\n", err)
	})

	// 触发会panic的事件
	bus.Publish("payment:failed", map[string]interface{}{
		"id":     "txn123",
		"amount": 1500.00,
	})
}

3. 通配符订阅

func main() {
	bus := EventBus.New()

	// 订阅所有用户相关事件
	bus.Subscribe("user.*", func(topic string, data interface{}) {
		fmt.Printf("用户事件[%s]: %v\n", topic, data)
	})

	// 发布不同类型的事件
	bus.Publish("user.created", map[string]interface{}{"id": 1, "name": "Alice"})
	bus.Publish("user.updated", map[string]interface{}{"id": 1, "name": "Alice Smith"})
	bus.Publish("user.deleted", map[string]interface{}{"id": 1})
}

实际应用场景示例

微服务间通信

package main

import (
	"fmt"
	"github.com/asaskevich/EventBus"
	"time"
)

type User struct {
	ID    int
	Name  string
	Email string
}

func main() {
	bus := EventBus.New()

	// 用户服务
	bus.Subscribe("user.create", func(topic string, data interface{}) {
		user := data.(User)
		fmt.Printf("创建用户: %+v\n", user)
		// 模拟处理延迟
		time.Sleep(500 * time.Millisecond)
		// 发布用户创建完成事件
		bus.Publish("user.created", user)
	})

	// 邮件服务
	bus.Subscribe("user.created", func(topic string, data interface{}) {
		user := data.(User)
		fmt.Printf("发送欢迎邮件给: %s <%s>\n", user.Name, user.Email)
	})

	// 日志服务
	bus.Subscribe("user.*", func(topic string, data interface{}) {
		user := data.(User)
		fmt.Printf("[日志] 事件: %s, 用户ID: %d\n", topic, user.ID)
	})

	// 创建新用户
	newUser := User{
		ID:    1,
		Name:  "李四",
		Email: "lisi@example.com",
	}
	bus.Publish("user.create", newUser)

	// 等待异步处理完成
	time.Sleep(1 * time.Second)
}

性能考虑

Go-EventBus 是非常轻量级的,但在高并发场景下仍需注意:

  1. 对于高频事件,考虑使用异步处理
  2. 避免在事件处理函数中执行耗时操作
  3. 对于需要顺序处理的事件,设置 SubscribeAsync 的第三个参数为 true

总结

Go-EventBus 提供了简单而强大的事件总线功能,适合以下场景:

  • 微服务内部组件通信
  • 插件系统的事件通知
  • 需要解耦的模块间通信
  • 需要广播通知的多接收者场景

它的轻量级特性使得它非常适合嵌入到各种规模的 Go 应用程序中,而不会带来显著的性能开销。

回到顶部