golang简单异步消息总线插件库messagebus的使用
Golang简单异步消息总线插件库messagebus的使用
关于message-bus
message-bus是一个Go语言的简单异步消息总线库,由Rafał Lorenz开发。它是一个轻量级的消息总线实现,支持异步消息发布和订阅。
功能特点
- 简单易用的异步消息总线
- 高性能(基准测试显示发布操作约250ns/op)
- 支持基本的发布/订阅模式
安装
go get github.com/vardius/message-bus
基本使用示例
下面是一个完整的使用message-bus的示例代码:
package main
import (
"fmt"
"time"
"github.com/vardius/message-bus"
)
func main() {
// 创建一个新的消息总线,设置队列大小为100
bus := messagebus.New(100)
// 定义一个消息处理函数
handler := func(msg string) {
fmt.Printf("Received message: %s\n", msg)
}
// 订阅主题"test"
bus.Subscribe("test", handler)
// 发布消息到主题"test"
bus.Publish("test", "Hello, World!")
// 等待消息处理完成
time.Sleep(100 * time.Millisecond)
}
发布/订阅模式示例
package main
import (
"fmt"
"time"
"github.com/vardius/message-bus"
)
func main() {
bus := messagebus.New(100)
// 订阅者1
bus.Subscribe("topic", func(msg string) {
fmt.Printf("Subscriber 1 received: %s\n", msg)
})
// 订阅者2
bus.Subscribe("topic", func(msg string) {
fmt.Printf("Subscriber 2 received: %s\n", msg)
})
// 发布消息
bus.Publish("topic", "First message")
bus.Publish("topic", "Second message")
// 等待消息处理完成
time.Sleep(100 * time.Millisecond)
}
性能基准
根据基准测试结果:
- 发布操作(Publish)平均耗时约250纳秒/次,零内存分配
- 订阅操作(Subscribe)平均耗时约2037纳秒/次,分配735字节内存
许可证
message-bus采用MIT许可证发布。
更多关于golang简单异步消息总线插件库messagebus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang简单异步消息总线插件库messagebus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang异步消息总线库MessageBus使用指南
MessageBus是一个轻量级的Go语言异步消息总线库,它提供了一种简单的方式来实现发布/订阅模式。下面我将详细介绍它的使用方法。
安装
首先安装MessageBus库:
go get github.com/vardius/message-bus
基本使用
1. 创建消息总线
package main
import (
"fmt"
"github.com/vardius/message-bus"
)
func main() {
// 创建一个消息总线,参数表示最大并发goroutine数
bus := messagebus.New(10)
// 发布订阅消息
bus.Publish("topic", "Hello, World!")
// 订阅主题
bus.Subscribe("topic", func(msg string) {
fmt.Println("Received:", msg)
})
// 再次发布消息,这次订阅者会收到
bus.Publish("topic", "Hello again!")
}
2. 多订阅者示例
func main() {
bus := messagebus.New(10)
// 订阅者1
bus.Subscribe("news", func(title string) {
fmt.Println("Subscriber 1:", title)
})
// 订阅者2
bus.Subscribe("news", func(title string) {
fmt.Println("Subscriber 2:", title)
})
// 发布消息
bus.Publish("news", "Breaking News: Go 1.20 released!")
// 等待消息处理完成
time.Sleep(time.Second)
}
3. 取消订阅
func main() {
bus := messagebus.New(10)
// 定义一个处理函数
handler := func(msg string) {
fmt.Println("Received:", msg)
}
// 订阅
bus.Subscribe("topic", handler)
// 发布消息
bus.Publish("topic", "First message") // 会收到
// 取消订阅
bus.Unsubscribe("topic", handler)
// 再次发布消息
bus.Publish("topic", "Second message") // 不会收到
}
高级用法
1. 多参数消息
func main() {
bus := messagebus.New(10)
// 订阅多参数消息
bus.Subscribe("user", func(id int, name string, active bool) {
fmt.Printf("User: ID=%d, Name=%s, Active=%t\n", id, name, active)
})
// 发布多参数消息
bus.Publish("user", 1, "Alice", true)
}
2. 错误处理
func main() {
bus := messagebus.New(10)
bus.Subscribe("data", func(data string) error {
if data == "error" {
return fmt.Errorf("invalid data")
}
fmt.Println("Processed:", data)
return nil
})
// 发布会导致错误的消
bus.Publish("data", "error")
// 发布正常消息
bus.Publish("data", "valid data")
}
3. 并发控制
func main() {
// 限制最大并发goroutine数为2
bus := messagebus.New(2)
for i := 0; i < 5; i++ {
id := i
bus.Subscribe("task", func() {
fmt.Printf("Task %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Task %d finished\n", id)
})
}
// 发布任务
for i := 0; i < 5; i++ {
bus.Publish("task")
}
// 等待所有任务完成
time.Sleep(3 * time.Second)
}
实际应用示例
下面是一个更完整的示例,模拟用户注册后发送欢迎邮件和记录日志:
package main
import (
"fmt"
"log"
"time"
"github.com/vardius/message-bus"
)
type User struct {
ID int
Name string
Email string
}
func main() {
bus := messagebus.New(5)
// 订阅发送欢迎邮件
bus.Subscribe("user.registered", func(u User) {
// 模拟发送邮件耗时
time.Sleep(500 * time.Millisecond)
fmt.Printf("Sending welcome email to %s <%s>\n", u.Name, u.Email)
})
// 订阅记录日志
bus.Subscribe("user.registered", func(u User) {
log.Printf("New user registered: ID=%d, Name=%s", u.ID, u.Name)
})
// 模拟用户注册
registerUser := func(id int, name, email string) {
user := User{ID: id, Name: name, Email: email}
fmt.Printf("Registering user %s...\n", name)
// 发布用户注册事件
bus.Publish("user.registered", user)
fmt.Printf("User %s registered\n", name)
}
// 注册几个用户
registerUser(1, "Alice", "alice@example.com")
registerUser(2, "Bob", "bob@example.com")
// 等待异步任务完成
time.Sleep(time.Second)
}
注意事项
- MessageBus是异步的,发布消息后不会等待订阅者处理完成
- 订阅者的处理函数必须与发布的消息参数类型和数量完全匹配
- 如果处理函数返回错误,MessageBus不会自动处理这些错误
- 在高并发场景下,合理设置最大goroutine数可以避免资源耗尽
MessageBus是一个简单但功能强大的库,适合需要轻量级消息总线的场景。对于更复杂的企业级需求,可能需要考虑更完整的消息队列系统如NATS、RabbitMQ等。