golang轻量级内部通信消息总线插件库Bus的使用

Golang轻量级内部通信消息总线插件库Bus的使用

Bus是一个用于内部通信的极简事件/消息总线实现,灵感来源于Elixir语言的event_bus包。

安装

通过go包安装:

go get github.com/mustafaturan/bus/v3

完整示例

下面是一个完整的Bus使用示例:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/mustafaturan/bus/v3"
	"github.com/mustafaturan/monoton/v2"
	"github.com/mustafaturan/monoton/v2/sequencer"
)

func main() {
	// 初始化Bus
	b := NewBus()
	
	// 注册订单处理handler
	handler := bus.Handler{
		Handle: func(ctx context.Context, e bus.Event) {
			// 异步处理订单事件
			go func() {
				fmt.Printf("处理订单事件: %+v\n", e)
				// 这里可以添加具体的业务逻辑
			}()
		},
		Matcher: "order.*", // 匹配所有order开头的topic
	}
	b.RegisterHandler("order-handler", handler)

	// 模拟订单事件
	ctx := context.Background()
	ctx = context.WithValue(ctx, bus.CtxKeyTxID, "tx-123456")
	ctx = context.WithValue(ctx, bus.CtxKeySource, "order-service")

	order := map[string]string{
		"orderID":     "123456",
		"orderAmount": "112.20",
		"currency":    "USD",
	}

	// 发送订单接收事件
	err := b.Emit(ctx, "order.received", order)
	if err != nil {
		fmt.Println("发送事件失败:", err)
	}

	// 等待handler处理
	time.Sleep(100 * time.Millisecond)
}

func NewBus() *bus.Bus {
	// 配置ID生成器
	node := uint64(1)
	initialTime := uint64(1577865600000) // 设置2020-01-01 PST为初始时间
	m, err := monoton.New(sequencer.NewMillisecond(), node, initialTime)
	if err != nil {
		panic(err)
	}

	// 初始化ID生成器
	var idGenerator bus.Next = m.Next

	// 创建新的Bus实例
	b, err := bus.NewBus(idGenerator)
	if err != nil {
		panic(err)
	}

	// 注册topic
	b.RegisterTopics("order.received", "order.fulfilled")

	return b
}

核心功能

1. 配置Bus

Bus需要一个唯一的ID生成器来为事件分配ID:

import (
    "github.com/mustafaturan/bus/v3"
    "github.com/mustafaturan/monoton/v2"
    "github.com/mustafaturan/monoton/v2/sequencer"
)

func NewBus() *bus.Bus {
    // 配置ID生成器
    node := uint64(1)
    initialTime := uint64(1577865600000) // 设置2020-01-01 PST为初始时间
    m, err := monoton.New(sequencer.NewMillisecond(), node, initialTime)
    if err != nil {
        panic(err)
    }

    // 初始化ID生成器
    var idGenerator bus.Next = m.Next

    // 创建新的Bus实例
    b, err := bus.NewBus(idGenerator)
    if err != nil {
        panic(err)
    }

    // 注册topic
    b.RegisterTopics("order.received", "order.fulfilled")

    return b
}

2. 注册事件Topic

在发送事件到topic之前,需要先注册topic名称:

// 注册topic
b.RegisterTopics("order.received", "order.fulfilled")

3. 注册事件处理器

要接收topic事件,需要注册处理器:

handler := bus.Handler{
    Handle: func(ctx context.Context, e bus.Event) {
        // 处理事件
        // 注意:强烈建议以异步方式处理事件
    },
    Matcher: ".*", // 匹配所有topic
}
b.RegisterHandler("handler的唯一key", handler)

4. 发送事件

发送事件的基本方式:

// 如果txID为空,bus会使用ID生成器生成一个
ctx := context.Background()
ctx = context.WithValue(ctx, bus.CtxKeyTxID, "some-transaction-id-if-exists")
// 可选来源
ctx = context.WithValue(ctx, bus.CtxKeySource, "source-of-the-event")

// 事件topic名称(必须提前注册)
topic := "order.received"

// 事件数据
order := make(map[string]string)
order["orderID"] = "123456"
order["orderAmount"] = "112.20"
order["currency"] = "USD"

// 发送事件
err := b.Emit(ctx, topic, order)

if err != nil {
    // 处理错误
    fmt.Println(err)
}

// 使用选项发送事件
err := b.EmitWithOpts(ctx, topic, order, bus.WithTxID("some-tx-id"))

if err != nil {
    // 处理错误
    fmt.Println(err)
}

5. 事件结构

处理器接收的事件结构如下:

// 事件数据结构
type Event struct {
    ID         string      // 标识符
    TxID       string      // 事务标识符
    Topic      string      // topic名称
    Source     string      // 事件来源
    OccurredAt time.Time   // 创建时间(纳秒)
    Data       interface{} // 实际事件数据
}

性能基准

基准测试结果:

goos: darwin
goarch: amd64
pkg: github.com/mustafaturan/bus/v3
cpu: Intel(R) Core(TM) i5-6267U CPU @ 2.90GHz
BenchmarkEmit-4                      10000000       180.5 ns/op       8 B/op       0 allocs/op
BenchmarkEmitWithoutTxID-4           10000000       244.6 ns/op      72 B/op       2 allocs/op
BenchmarkEmitWithOpts-4              10000000       280.8 ns/op     112 B/op       4 allocs/op
BenchmarkEmitWithOptsUnspecified-4   10000000       169.4 ns/op       8 B/op       0 allocs/op
PASS
ok      github.com/mustafaturan/bus/v3       8.884s

许可证

Apache License 2.0


更多关于golang轻量级内部通信消息总线插件库Bus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang轻量级内部通信消息总线插件库Bus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang轻量级内部通信消息总线插件库Bus使用指南

Bus是一个轻量级的Go语言内部通信消息总线库,它提供了一种简单的方式来实现发布/订阅模式的消息通信。下面我将详细介绍如何使用这个库。

安装

首先安装Bus库:

go get github.com/mustafaturan/bus

基本使用

1. 初始化Bus

package main

import (
	"fmt"
	"github.com/mustafaturan/bus"
	"github.com/mustafaturan/bus/topic"
)

func main() {
	// 创建一个新的Bus实例
	b, err := bus.NewBus()
	if err != nil {
		panic(err)
	}

	// 注册一个主题
	userCreatedTopic := topic.Topic{
		Name:        "user.created",
		Description: "Triggered when a new user is created",
		Source:      "/users",
	}
	err = b.RegisterTopics(userCreatedTopic)
	if err != nil {
		panic(err)
	}
}

2. 订阅消息

// 定义一个处理器函数
func userCreatedHandler(topic string, data interface{}) {
	fmt.Printf("Received message on topic %s: %+v\n", topic, data)
}

// 注册处理器
handlerID, err := b.RegisterHandler("user.created", "user.created.handler", userCreatedHandler)
if err != nil {
	panic(err)
}

// 也可以使用闭包
handlerID2, err := b.RegisterHandler("user.created", "user.created.handler2", func(topic string, data interface{}) {
	user := data.(map[string]interface{})
	fmt.Printf("User %s created!\n", user["name"])
})

3. 发布消息

// 发布消息到主题
err = b.Emit("user.created", map[string]interface{}{
	"id":   "123",
	"name": "John Doe",
	"email": "john@example.com",
})
if err != nil {
	fmt.Printf("Failed to emit message: %v\n", err)
}

高级功能

1. 取消订阅

// 取消之前注册的处理器
err = b.DeregisterHandler(handlerID)
if err != nil {
	fmt.Printf("Failed to deregister handler: %v\n", err)
}

2. 通配符订阅

Bus支持通配符订阅,可以监听多个相关主题:

// 订阅所有user相关事件
handlerID, err := b.RegisterHandler("user.*", "user.all.handler", func(topic string, data interface{}) {
	fmt.Printf("User event %s occurred: %+v\n", topic, data)
})

3. 错误处理

// 带错误处理的处理器
errorHandler := func(topic string, data interface{}) error {
	user, ok := data.(map[string]interface{})
	if !ok {
		return fmt.Errorf("invalid user data format")
	}
	
	if user["name"] == "" {
		return fmt.Errorf("user name cannot be empty")
	}
	
	fmt.Printf("Processing user: %s\n", user["name"])
	return nil
}

// 注册带错误处理的处理器
handlerID, err := b.RegisterHandler("user.created", "user.error.handler", errorHandler)

实际应用示例

下面是一个完整的示例,展示如何在微服务中使用Bus进行内部通信:

package main

import (
	"fmt"
	"time"
	
	"github.com/mustafaturan/bus"
	"github.com/mustafaturan/bus/topic"
)

type User struct {
	ID    string
	Name  string
	Email string
}

func main() {
	// 初始化Bus
	b, err := bus.NewBus()
	if err != nil {
		panic(err)
	}

	// 注册主题
	topics := []topic.Topic{
		{
			Name:        "user.created",
			Description: "User creation event",
			Source:      "/users",
		},
		{
			Name:        "user.deleted",
			Description: "User deletion event",
			Source:      "/users",
		},
	}
	err = b.RegisterTopics(topics...)
	if err != nil {
		panic(err)
	}

	// 注册用户创建处理器
	_, err = b.RegisterHandler("user.created", "email.service", func(topic string, data interface{}) {
		user := data.(User)
		fmt.Printf("[Email Service] Sending welcome email to %s (%s)\n", user.Name, user.Email)
	})
	if err != nil {
		panic(err)
	}

	// 注册用户删除处理器
	_, err = b.RegisterHandler("user.deleted", "audit.service", func(topic string, data interface{}) {
		user := data.(User)
		fmt.Printf("[Audit Service] Recording deletion of user %s\n", user.ID)
	})
	if err != nil {
		panic(err)
	}

	// 模拟用户服务
	go func() {
		for i := 1; i <= 5; i++ {
			user := User{
				ID:    fmt.Sprintf("user-%d", i),
				Name:  fmt.Sprintf("User %d", i),
				Email: fmt.Sprintf("user%d@example.com", i),
			}
			
			// 发布用户创建事件
			err = b.Emit("user.created", user)
			if err != nil {
				fmt.Printf("Error emitting user.created: %v\n", err)
			}
			
			time.Sleep(1 * time.Second)
		}
	}()

	// 模拟删除用户
	time.Sleep(3 * time.Second)
	userToDelete := User{ID: "user-2", Name: "User 2"}
	err = b.Emit("user.deleted", userToDelete)
	if err != nil {
		fmt.Printf("Error emitting user.deleted: %v\n", err)
	}

	// 保持程序运行
	time.Sleep(5 * time.Second)
}

性能考虑

Bus库设计为轻量级,但在高并发场景下仍需注意:

  1. 避免在处理器中执行耗时操作
  2. 考虑使用缓冲通道处理高吞吐量消息
  3. 对于关键业务,实现重试机制

Bus库非常适合中小型应用的内部通信需求,但对于大规模分布式系统,可能需要考虑更专业的消息队列系统如RabbitMQ或Kafka。

希望这个指南能帮助你快速上手使用Bus库进行Go应用的内部通信!

回到顶部