golang简单异步消息总线插件库messagebus的使用

Golang简单异步消息总线插件库messagebus的使用

关于message-bus

message-bus是一个Go语言的简单异步消息总线库,由Rafał Lorenz开发。它是一个轻量级的消息总线实现,支持异步消息发布和订阅。

功能特点

  • 简单易用的异步消息总线
  • 高性能(基准测试显示发布操作约250ns/op)
  • 支持基本的发布/订阅模式

安装

go get github.com/vardius/message-bus

基本使用示例

下面是一个完整的使用message-bus的示例代码:

package main

import (
	"fmt"
	"time"

	"github.com/vardius/message-bus"
)

func main() {
	// 创建一个新的消息总线,设置队列大小为100
	bus := messagebus.New(100)

	// 定义一个消息处理函数
	handler := func(msg string) {
		fmt.Printf("Received message: %s\n", msg)
	}

	// 订阅主题"test"
	bus.Subscribe("test", handler)

	// 发布消息到主题"test"
	bus.Publish("test", "Hello, World!")

	// 等待消息处理完成
	time.Sleep(100 * time.Millisecond)
}

发布/订阅模式示例

package main

import (
	"fmt"
	"time"

	"github.com/vardius/message-bus"
)

func main() {
	bus := messagebus.New(100)

	// 订阅者1
	bus.Subscribe("topic", func(msg string) {
		fmt.Printf("Subscriber 1 received: %s\n", msg)
	})

	// 订阅者2
	bus.Subscribe("topic", func(msg string) {
		fmt.Printf("Subscriber 2 received: %s\n", msg)
	})

	// 发布消息
	bus.Publish("topic", "First message")
	bus.Publish("topic", "Second message")

	// 等待消息处理完成
	time.Sleep(100 * time.Millisecond)
}

性能基准

根据基准测试结果:

  • 发布操作(Publish)平均耗时约250纳秒/次,零内存分配
  • 订阅操作(Subscribe)平均耗时约2037纳秒/次,分配735字节内存

许可证

message-bus采用MIT许可证发布。


更多关于golang简单异步消息总线插件库messagebus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang简单异步消息总线插件库messagebus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang异步消息总线库MessageBus使用指南

MessageBus是一个轻量级的Go语言异步消息总线库,它提供了一种简单的方式来实现发布/订阅模式。下面我将详细介绍它的使用方法。

安装

首先安装MessageBus库:

go get github.com/vardius/message-bus

基本使用

1. 创建消息总线

package main

import (
	"fmt"
	"github.com/vardius/message-bus"
)

func main() {
	// 创建一个消息总线,参数表示最大并发goroutine数
	bus := messagebus.New(10)
	
	// 发布订阅消息
	bus.Publish("topic", "Hello, World!")
	
	// 订阅主题
	bus.Subscribe("topic", func(msg string) {
		fmt.Println("Received:", msg)
	})
	
	// 再次发布消息,这次订阅者会收到
	bus.Publish("topic", "Hello again!")
}

2. 多订阅者示例

func main() {
	bus := messagebus.New(10)
	
	// 订阅者1
	bus.Subscribe("news", func(title string) {
		fmt.Println("Subscriber 1:", title)
	})
	
	// 订阅者2
	bus.Subscribe("news", func(title string) {
		fmt.Println("Subscriber 2:", title)
	})
	
	// 发布消息
	bus.Publish("news", "Breaking News: Go 1.20 released!")
	
	// 等待消息处理完成
	time.Sleep(time.Second)
}

3. 取消订阅

func main() {
	bus := messagebus.New(10)
	
	// 定义一个处理函数
	handler := func(msg string) {
		fmt.Println("Received:", msg)
	}
	
	// 订阅
	bus.Subscribe("topic", handler)
	
	// 发布消息
	bus.Publish("topic", "First message") // 会收到
	
	// 取消订阅
	bus.Unsubscribe("topic", handler)
	
	// 再次发布消息
	bus.Publish("topic", "Second message") // 不会收到
}

高级用法

1. 多参数消息

func main() {
	bus := messagebus.New(10)
	
	// 订阅多参数消息
	bus.Subscribe("user", func(id int, name string, active bool) {
		fmt.Printf("User: ID=%d, Name=%s, Active=%t\n", id, name, active)
	})
	
	// 发布多参数消息
	bus.Publish("user", 1, "Alice", true)
}

2. 错误处理

func main() {
	bus := messagebus.New(10)
	
	bus.Subscribe("data", func(data string) error {
		if data == "error" {
			return fmt.Errorf("invalid data")
		}
		fmt.Println("Processed:", data)
		return nil
	})
	
	// 发布会导致错误的消
	bus.Publish("data", "error")
	
	// 发布正常消息
	bus.Publish("data", "valid data")
}

3. 并发控制

func main() {
	// 限制最大并发goroutine数为2
	bus := messagebus.New(2)
	
	for i := 0; i < 5; i++ {
		id := i
		bus.Subscribe("task", func() {
			fmt.Printf("Task %d started\n", id)
			time.Sleep(time.Second)
			fmt.Printf("Task %d finished\n", id)
		})
	}
	
	// 发布任务
	for i := 0; i < 5; i++ {
		bus.Publish("task")
	}
	
	// 等待所有任务完成
	time.Sleep(3 * time.Second)
}

实际应用示例

下面是一个更完整的示例,模拟用户注册后发送欢迎邮件和记录日志:

package main

import (
	"fmt"
	"log"
	"time"
	
	"github.com/vardius/message-bus"
)

type User struct {
	ID    int
	Name  string
	Email string
}

func main() {
	bus := messagebus.New(5)
	
	// 订阅发送欢迎邮件
	bus.Subscribe("user.registered", func(u User) {
		// 模拟发送邮件耗时
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Sending welcome email to %s <%s>\n", u.Name, u.Email)
	})
	
	// 订阅记录日志
	bus.Subscribe("user.registered", func(u User) {
		log.Printf("New user registered: ID=%d, Name=%s", u.ID, u.Name)
	})
	
	// 模拟用户注册
	registerUser := func(id int, name, email string) {
		user := User{ID: id, Name: name, Email: email}
		fmt.Printf("Registering user %s...\n", name)
		
		// 发布用户注册事件
		bus.Publish("user.registered", user)
		
		fmt.Printf("User %s registered\n", name)
	}
	
	// 注册几个用户
	registerUser(1, "Alice", "alice@example.com")
	registerUser(2, "Bob", "bob@example.com")
	
	// 等待异步任务完成
	time.Sleep(time.Second)
}

注意事项

  1. MessageBus是异步的,发布消息后不会等待订阅者处理完成
  2. 订阅者的处理函数必须与发布的消息参数类型和数量完全匹配
  3. 如果处理函数返回错误,MessageBus不会自动处理这些错误
  4. 在高并发场景下,合理设置最大goroutine数可以避免资源耗尽

MessageBus是一个简单但功能强大的库,适合需要轻量级消息总线的场景。对于更复杂的企业级需求,可能需要考虑更完整的消息队列系统如NATS、RabbitMQ等。

回到顶部