golang轻量级NSQ主题与通道封装插件库nsq-event-bus的使用
Golang轻量级NSQ主题与通道封装插件库nsq-event-bus的使用
Event Bus NSQ
- 一个围绕go-nsq主题和通道的轻量级封装
- 使用gobreaker保护NSQ调用
安装
go get -u github.com/rafaeljesus/nsq-event-bus
使用
nsq-event-bus包提供了发送和监听事件的接口。
发送器(Emitter)
import "github.com/rafaeljesus/nsq-event-bus"
topic := "events"
emitter, err := bus.NewEmitter(bus.EmitterConfig{
Address: "localhost:4150",
MaxInFlight: 25,
})
e := event{}
if err = emitter.Emit(topic, &e); err != nil {
// 处理发送消息失败的情况
}
// 异步发送消息
if err = emitter.EmitAsync(topic, &e); err != nil {
// 处理发送消息失败的情况
}
监听器(Listener)
import "github.com/rafaeljesus/nsq-event-bus"
if err = bus.On(bus.ListenerConfig{
Topic: "topic",
Channel: "test_on",
HandlerFunc: handler,
HandlerConcurrency: 4,
}); err != nil {
// 处理监听消息失败的情况
}
func handler(message *Message) (reply interface{}, err error) {
e := event{}
if err = message.DecodePayload(&e); err != nil {
message.Finish()
return
}
if message.Attempts > MAX_DELIVERY_ATTEMPTS {
message.Finish()
return
}
err, _ = doWork(&e)
if err != nil {
message.Requeue(BACKOFF_TIME)
return
}
message.Finish()
return
}
请求(Request/Reply)
import "github.com/rafaeljesus/nsq-event-bus"
topic := "user_signup"
emitter, err = bus.NewEmitter(bus.EmitterConfig{})
e := event{Login: "rafa", Password: "ilhabela_is_the_place"}
if err = bus.Request(topic, &e, handler); err != nil {
// 处理监听消息失败的情况
}
func handler(message *Message) (reply interface{}, err error) {
e := event{}
if err = message.DecodePayload(&e); err != nil {
message.Finish()
return
}
reply = &Reply{}
message.Finish()
return
}
完整示例Demo
生产者示例
package main
import (
"github.com/rafaeljesus/nsq-event-bus"
"log"
)
type UserEvent struct {
ID int `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
}
func main() {
// 创建发送器
emitter, err := bus.NewEmitter(bus.EmitterConfig{
Address: "localhost:4150",
MaxInFlight: 10,
})
if err != nil {
log.Fatal(err)
}
// 创建事件
user := UserEvent{
ID: 1,
Name: "John Doe",
Email: "john@example.com",
}
// 发送事件
topic := "user_created"
if err := emitter.Emit(topic, &user); err != nil {
log.Printf("Failed to emit message: %v", err)
}
log.Printf("Event emitted to topic %s", topic)
}
消费者示例
package main
import (
"github.com/rafaeljesus/nsq-event-bus"
"log"
)
type UserEvent struct {
ID int `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
}
func main() {
// 监听事件
err := bus.On(bus.ListenerConfig{
Topic: "user_created",
Channel: "user_service",
HandlerFunc: handleUserCreated,
HandlerConcurrency: 5,
})
if err != nil {
log.Fatal(err)
}
// 保持程序运行
select {}
}
func handleUserCreated(message *bus.Message) (reply interface{}, err error) {
var user UserEvent
if err := message.DecodePayload(&user); err != nil {
log.Printf("Failed to decode message: %v", err)
message.Finish()
return nil, err
}
log.Printf("Received new user: ID=%d, Name=%s, Email=%s",
user.ID, user.Name, user.Email)
// 处理业务逻辑...
message.Finish()
return nil, nil
}
贡献
- Fork仓库
- 创建你的特性分支 (
git checkout -b my-new-feature
) - 提交你的更改 (
git commit -am 'Add some feature'
) - 推送到分支 (
git push origin my-new-feature
) - 创建新的Pull Request
更多关于golang轻量级NSQ主题与通道封装插件库nsq-event-bus的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang轻量级NSQ主题与通道封装插件库nsq-event-bus的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
NSQ-Event-Bus: Golang轻量级NSQ封装库使用指南
NSQ-Event-Bus是一个轻量级的Golang库,用于简化NSQ消息队列的使用。它提供了主题(Topic)和通道(Channel)的封装,使得在Golang项目中使用NSQ变得更加简单和高效。
安装
首先,使用go get安装nsq-event-bus:
go get github.com/rafaeljesus/nsq-event-bus
基本使用
1. 初始化连接
package main
import (
"log"
"time"
"github.com/rafaeljesus/nsq-event-bus"
)
func main() {
// 创建NSQ事件总线实例
bus, err := nsq.NewEventBus(
nsq.SetNsqdAddr("127.0.0.1:4150"), // NSQD地址
nsq.SetNsqLookupdAddr("127.0.0.1:4161"), // Lookupd地址
nsq.SetMaxInFlight(100), // 最大飞行中消息数
)
if err != nil {
log.Fatalf("Failed to create event bus: %v", err)
}
defer bus.Stop()
}
2. 发布消息
// 发布消息到指定主题
err := bus.Emit("user.created", []byte(`{"id": 123, "name": "John"}`))
if err != nil {
log.Printf("Failed to emit event: %v", err)
}
3. 订阅消息
// 定义消息处理函数
handler := func(message *nsq.Message) error {
log.Printf("Received message: %s", string(message.Body))
// 处理消息逻辑...
return nil // 返回nil表示成功处理
}
// 订阅主题和通道
err := bus.On("user.created", "user-service", handler)
if err != nil {
log.Printf("Failed to subscribe: %v", err)
}
高级功能
1. 消息重试机制
handler := func(message *nsq.Message) error {
// 模拟处理失败
if someCondition {
return errors.New("processing failed") // 返回错误会触发重试
}
return nil
}
2. 并发处理
bus, err := nsq.NewEventBus(
nsq.SetNsqdAddr("127.0.0.1:4150"),
nsq.SetConcurrency(10), // 设置并发处理数为10
)
3. 自定义配置
bus, err := nsq.NewEventBus(
nsq.SetNsqdAddr("127.0.0.1:4150"),
nsq.SetMaxAttempts(5), // 最大重试次数
nsq.SetRequeueDelay(10 * time.Second), // 重试延迟
nsq.SetBackoffStrategy(nsq.ExponentialBackoff), // 退避策略
)
完整示例
package main
import (
"encoding/json"
"log"
"time"
"github.com/rafaeljesus/nsq-event-bus"
)
type User struct {
ID int `json:"id"`
Name string `json:"name"`
}
func main() {
// 初始化NSQ事件总线
bus, err := nsq.NewEventBus(
nsq.SetNsqdAddr("127.0.0.1:4150"),
nsq.SetNsqLookupdAddr("127.0.0.1:4161"),
nsq.SetMaxInFlight(100),
nsq.SetConcurrency(5),
)
if err != nil {
log.Fatalf("Failed to create event bus: %v", err)
}
defer bus.Stop()
// 订阅用户创建事件
err = bus.On("user.created", "user-service", func(msg *nsq.Message) error {
var user User
if err := json.Unmarshal(msg.Body, &user); err != nil {
log.Printf("Failed to unmarshal user: %v", err)
return err
}
log.Printf("Processing user: %+v", user)
// 模拟处理耗时
time.Sleep(500 * time.Millisecond)
return nil
})
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
// 模拟发布用户创建事件
for i := 1; i <= 10; i++ {
user := User{ID: i, Name: "User " + string(i)}
payload, _ := json.Marshal(user)
if err := bus.Emit("user.created", payload); err != nil {
log.Printf("Failed to emit event: %v", err)
}
}
// 保持程序运行
select {}
}
最佳实践
- 错误处理:始终处理消息处理函数中的错误,NSQ会根据返回值决定是否重试
- 幂等性:确保消息处理是幂等的,因为NSQ可能会重试失败的消息
- 资源清理:使用defer bus.Stop()确保在程序退出时正确关闭连接
- 监控:考虑添加监控来跟踪消息处理时间和成功率
- 消息大小:NSQ适合中小型消息,大消息应考虑其他方案
NSQ-Event-Bus通过简单的API封装了NSQ的复杂性,使得在Golang项目中使用NSQ变得更加便捷。它特别适合需要轻量级消息队列的微服务架构。