golang轻量级NSQ主题与通道封装插件库nsq-event-bus的使用

Golang轻量级NSQ主题与通道封装插件库nsq-event-bus的使用

Event Bus NSQ

  • 一个围绕go-nsq主题和通道的轻量级封装
  • 使用gobreaker保护NSQ调用

安装

go get -u github.com/rafaeljesus/nsq-event-bus

使用

nsq-event-bus包提供了发送和监听事件的接口。

发送器(Emitter)

import "github.com/rafaeljesus/nsq-event-bus"

topic := "events"
emitter, err := bus.NewEmitter(bus.EmitterConfig{
  Address: "localhost:4150",
  MaxInFlight: 25,
})

e := event{}
if err = emitter.Emit(topic, &e); err != nil {
  // 处理发送消息失败的情况
}

// 异步发送消息
if err = emitter.EmitAsync(topic, &e); err != nil {
  // 处理发送消息失败的情况
}

监听器(Listener)

import "github.com/rafaeljesus/nsq-event-bus"

if err = bus.On(bus.ListenerConfig{
  Topic:              "topic",
  Channel:            "test_on",
  HandlerFunc:        handler,
  HandlerConcurrency: 4,
}); err != nil {
  // 处理监听消息失败的情况
}

func handler(message *Message) (reply interface{}, err error) {
  e := event{}
  if err = message.DecodePayload(&e); err != nil {
    message.Finish()
    return
  }

  if message.Attempts > MAX_DELIVERY_ATTEMPTS {
    message.Finish()
    return
  }

  err, _ = doWork(&e)
  if err != nil {
    message.Requeue(BACKOFF_TIME)
    return
  }

  message.Finish()
  return
}

请求(Request/Reply)

import "github.com/rafaeljesus/nsq-event-bus"

topic := "user_signup"
emitter, err = bus.NewEmitter(bus.EmitterConfig{})

e := event{Login: "rafa", Password: "ilhabela_is_the_place"}
if err = bus.Request(topic, &e, handler); err != nil {
  // 处理监听消息失败的情况
}

func handler(message *Message) (reply interface{}, err error) {
  e := event{}
  if err = message.DecodePayload(&e); err != nil {
    message.Finish()
    return
  }

  reply = &Reply{}
  message.Finish()
  return
}

完整示例Demo

生产者示例

package main

import (
	"github.com/rafaeljesus/nsq-event-bus"
	"log"
)

type UserEvent struct {
	ID    int    `json:"id"`
	Name  string `json:"name"`
	Email string `json:"email"`
}

func main() {
	// 创建发送器
	emitter, err := bus.NewEmitter(bus.EmitterConfig{
		Address:    "localhost:4150",
		MaxInFlight: 10,
	})
	if err != nil {
		log.Fatal(err)
	}

	// 创建事件
	user := UserEvent{
		ID:    1,
		Name:  "John Doe",
		Email: "john@example.com",
	}

	// 发送事件
	topic := "user_created"
	if err := emitter.Emit(topic, &user); err != nil {
		log.Printf("Failed to emit message: %v", err)
	}

	log.Printf("Event emitted to topic %s", topic)
}

消费者示例

package main

import (
	"github.com/rafaeljesus/nsq-event-bus"
	"log"
)

type UserEvent struct {
	ID    int    `json:"id"`
	Name  string `json:"name"`
	Email string `json:"email"`
}

func main() {
	// 监听事件
	err := bus.On(bus.ListenerConfig{
		Topic:              "user_created",
		Channel:            "user_service",
		HandlerFunc:        handleUserCreated,
		HandlerConcurrency: 5,
	})
	if err != nil {
		log.Fatal(err)
	}

	// 保持程序运行
	select {}
}

func handleUserCreated(message *bus.Message) (reply interface{}, err error) {
	var user UserEvent
	if err := message.DecodePayload(&user); err != nil {
		log.Printf("Failed to decode message: %v", err)
		message.Finish()
		return nil, err
	}

	log.Printf("Received new user: ID=%d, Name=%s, Email=%s", 
		user.ID, user.Name, user.Email)

	// 处理业务逻辑...

	message.Finish()
	return nil, nil
}

贡献

  • Fork仓库
  • 创建你的特性分支 (git checkout -b my-new-feature)
  • 提交你的更改 (git commit -am 'Add some feature')
  • 推送到分支 (git push origin my-new-feature)
  • 创建新的Pull Request

更多关于golang轻量级NSQ主题与通道封装插件库nsq-event-bus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang轻量级NSQ主题与通道封装插件库nsq-event-bus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


NSQ-Event-Bus: Golang轻量级NSQ封装库使用指南

NSQ-Event-Bus是一个轻量级的Golang库,用于简化NSQ消息队列的使用。它提供了主题(Topic)和通道(Channel)的封装,使得在Golang项目中使用NSQ变得更加简单和高效。

安装

首先,使用go get安装nsq-event-bus:

go get github.com/rafaeljesus/nsq-event-bus

基本使用

1. 初始化连接

package main

import (
	"log"
	"time"

	"github.com/rafaeljesus/nsq-event-bus"
)

func main() {
	// 创建NSQ事件总线实例
	bus, err := nsq.NewEventBus(
		nsq.SetNsqdAddr("127.0.0.1:4150"), // NSQD地址
		nsq.SetNsqLookupdAddr("127.0.0.1:4161"), // Lookupd地址
		nsq.SetMaxInFlight(100), // 最大飞行中消息数
	)
	if err != nil {
		log.Fatalf("Failed to create event bus: %v", err)
	}
	defer bus.Stop()
}

2. 发布消息

// 发布消息到指定主题
err := bus.Emit("user.created", []byte(`{"id": 123, "name": "John"}`))
if err != nil {
	log.Printf("Failed to emit event: %v", err)
}

3. 订阅消息

// 定义消息处理函数
handler := func(message *nsq.Message) error {
	log.Printf("Received message: %s", string(message.Body))
	// 处理消息逻辑...
	return nil // 返回nil表示成功处理
}

// 订阅主题和通道
err := bus.On("user.created", "user-service", handler)
if err != nil {
	log.Printf("Failed to subscribe: %v", err)
}

高级功能

1. 消息重试机制

handler := func(message *nsq.Message) error {
	// 模拟处理失败
	if someCondition {
		return errors.New("processing failed") // 返回错误会触发重试
	}
	return nil
}

2. 并发处理

bus, err := nsq.NewEventBus(
	nsq.SetNsqdAddr("127.0.0.1:4150"),
	nsq.SetConcurrency(10), // 设置并发处理数为10
)

3. 自定义配置

bus, err := nsq.NewEventBus(
	nsq.SetNsqdAddr("127.0.0.1:4150"),
	nsq.SetMaxAttempts(5), // 最大重试次数
	nsq.SetRequeueDelay(10 * time.Second), // 重试延迟
	nsq.SetBackoffStrategy(nsq.ExponentialBackoff), // 退避策略
)

完整示例

package main

import (
	"encoding/json"
	"log"
	"time"

	"github.com/rafaeljesus/nsq-event-bus"
)

type User struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {
	// 初始化NSQ事件总线
	bus, err := nsq.NewEventBus(
		nsq.SetNsqdAddr("127.0.0.1:4150"),
		nsq.SetNsqLookupdAddr("127.0.0.1:4161"),
		nsq.SetMaxInFlight(100),
		nsq.SetConcurrency(5),
	)
	if err != nil {
		log.Fatalf("Failed to create event bus: %v", err)
	}
	defer bus.Stop()

	// 订阅用户创建事件
	err = bus.On("user.created", "user-service", func(msg *nsq.Message) error {
		var user User
		if err := json.Unmarshal(msg.Body, &user); err != nil {
			log.Printf("Failed to unmarshal user: %v", err)
			return err
		}
		log.Printf("Processing user: %+v", user)
		// 模拟处理耗时
		time.Sleep(500 * time.Millisecond)
		return nil
	})
	if err != nil {
		log.Fatalf("Failed to subscribe: %v", err)
	}

	// 模拟发布用户创建事件
	for i := 1; i <= 10; i++ {
		user := User{ID: i, Name: "User " + string(i)}
		payload, _ := json.Marshal(user)
		if err := bus.Emit("user.created", payload); err != nil {
			log.Printf("Failed to emit event: %v", err)
		}
	}

	// 保持程序运行
	select {}
}

最佳实践

  1. 错误处理:始终处理消息处理函数中的错误,NSQ会根据返回值决定是否重试
  2. 幂等性:确保消息处理是幂等的,因为NSQ可能会重试失败的消息
  3. 资源清理:使用defer bus.Stop()确保在程序退出时正确关闭连接
  4. 监控:考虑添加监控来跟踪消息处理时间和成功率
  5. 消息大小:NSQ适合中小型消息,大消息应考虑其他方案

NSQ-Event-Bus通过简单的API封装了NSQ的复杂性,使得在Golang项目中使用NSQ变得更加便捷。它特别适合需要轻量级消息队列的微服务架构。

回到顶部