golang高性能MQTT协议V3.1.1代理服务器插件库gmqtt的使用
Golang高性能MQTT协议V3.1.1代理服务器插件库gmqtt的使用
项目状态
由于个人原因,本项目维护较少。以下是给想要使用本项目的人的一些信息:
Gmqtt已在生产环境中使用并运行良好。生产环境服务于数百个(或可能是数千甚至上万)客户端,每个客户端每15秒发布一条QoS 1消息来报告其状态。云端的订阅者会将这些状态存储到持久化后端。所有客户端都是非持久会话并使用v3.1.1协议。
建议在生产环境运行前仔细评估和测试。
特性
- 提供钩子方法来自定义代理行为(认证、ACL等)。详情见
server/hooks.go
- 支持tls/ssl和websocket
- 提供灵活的插件机制。详情见
server/plugin.go
和/plugin
- 提供Go接口供扩展与服务器交互
- 提供指标(使用Prometheus)
- 提供GRPC和REST API与服务器交互
- 提供会话持久化
- 提供集群功能(实验性功能)
快速开始
要从源代码编译gmqtt,请确保您有一个可用的Go环境。
以下命令将使用默认配置启动gmqtt代理。代理监听1883端口用于TCP服务器,8883端口用于websocket服务器,并加载admin
和prometheus
插件。
$ git clone https://github.com/DrmagicE/gmqtt
$ cd gmqtt/cmd/gmqttd
$ go run . start -c default_config.yml
配置
Gmqtt使用-c
标志定义配置路径。如果未设置,gmqtt默认读取$HOME/gmqtt.yml
。
会话持久化
Gmqtt默认使用内存存储会话数据,这是推荐的方式,因为性能良好。但代理重启后会话数据会丢失。您可以使用redis作为后端存储来防止数据丢失:
persistence:
type: redis
redis:
# redis服务器地址
addr: "127.0.0.1:6379"
# redis连接池中的最大空闲连接数
max_idle: 1000
# redis连接池在给定时间分配的最大连接数
max_active: 0
# 连接空闲超时时间
idle_timeout: 240s
password: ""
# redis数据库编号
database: 0
认证
Gmqtt提供简单的用户名/密码认证机制(由auth
插件提供)。默认配置中未启用,您可以更改配置来启用它:
# 插件加载顺序
plugin_order:
- auth
- prometheus
- admin
当auth插件启用时,每个客户端都需要一个账户来连接。您可以通过HTTP API添加账户:
# 创建: 用户名 = user1, 密码 = user1pass
$ curl -X POST -d '{"password":"user1pass"}' 127.0.0.1:8083/v1/accounts/user1
{}
# 查询
$ curl 127.0.0.1:8083/v1/accounts/user1
{"account":{"username":"user1","password":"20a0db53bc1881a7f739cd956b740039"}}
Docker
$ docker build -t gmqtt .
$ docker run -p 1883:1883 -p 8883:8883 -p 8082:8082 -p 8083:8083 -p 8084:8084 gmqtt
文档
钩子
Gmqtt实现了以下钩子:
名称 | 钩子点 | 可能用途 |
---|---|---|
OnAccept | 当接受TCP连接时 | 连接速率限制,IP允许/阻止列表 |
OnStop | 当gmqtt停止时 | |
OnSubscribe | 当收到订阅包时 | 订阅访问控制,修改订阅 |
OnSubscribed | 当订阅成功时 | |
OnUnsubscribe | 当收到取消订阅包时 | 取消订阅访问控制,修改要取消订阅的主题 |
OnUnsubscribed | 当取消订阅成功时 | |
OnMsgArrived | 当收到发布包时 | 发布访问控制,在交付前修改消息 |
OnBasicAuth | 当收到没有AuthMethod属性的连接包时 | 认证 |
OnEnhancedAuth | 当收到带有AuthMethod属性的连接包时(仅适用于v5客户端) | 认证 |
OnReAuth | 当收到认证包时(仅适用于v5客户端) | 认证 |
OnConnected | 当客户端连接成功时 | |
OnSessionCreated | 当创建新会话时 | |
OnSessionResumed | 当从旧会话恢复时 | |
OnSessionTerminated | 当会话终止时 | |
OnDelivered | 当消息交付给客户端时 | |
OnClosed | 当客户端关闭时 | |
OnMsgDropped | 当消息因某些原因被丢弃时 | |
OnWillPublish | 当客户端即将交付遗嘱消息时 | 修改或丢弃遗嘱消息 |
OnWillPublished | 当遗嘱消息已交付时 |
如何编写插件
贡献
欢迎贡献,请参阅贡献指南获取完整的贡献指南。
测试
单元测试
$ go test -race ./...
集成测试
完整示例代码
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/DrmagicE/gmqtt"
"github.com/DrmagicE/gmqtt/config"
"github.com/DrmagicE/gmqtt/server"
)
func main() {
// 创建服务器实例
s := server.New(
server.WithConfig(config.DefaultConfig()),
)
// 添加钩子示例
s.AddHook(&ExampleHook{}, nil)
// 启动服务器
if err := s.Init(); err != nil {
log.Fatal(err)
}
go func() {
if err := s.Run(); err != nil {
log.Fatal(err)
}
}()
// 等待中断信号
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
<-sig
// 关闭服务器
s.Stop(context.Background())
fmt.Println("server stopped")
}
// ExampleHook 是一个简单的钩子实现示例
type ExampleHook struct{}
func (e *ExampleHook) OnSubscribe(ctx *gmqtt.HookSubscribeContext) {
fmt.Printf("client %s subscribed to topic %s\n", ctx.ClientID, ctx.Subscribe.Topics[0].Name)
}
func (e *ExampleHook) OnConnected(ctx *gmqtt.HookConnectedContext) {
fmt.Printf("client %s connected\n", ctx.ClientID)
}
// 确保实现了所有必要的钩子方法
var _ gmqtt.Hook = (*ExampleHook)(nil)
这个示例展示了如何创建一个基本的gmqtt服务器实例,添加自定义钩子,并启动服务器。您可以根据需要扩展这个示例,添加更多钩子实现或配置选项。
更多关于golang高性能MQTT协议V3.1.1代理服务器插件库gmqtt的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang高性能MQTT协议V3.1.1代理服务器插件库gmqtt的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用gmqtt构建高性能MQTT V3.1.1代理服务器
gmqtt是一个基于Go语言开发的高性能MQTT代理服务器实现,支持MQTT 3.1.1和5.0协议。下面我将介绍如何使用gmqtt来构建一个MQTT代理服务器。
基本安装
首先安装gmqtt库:
go get github.com/DrmagicE/gmqtt
快速启动一个MQTT服务器
下面是一个最简单的gmqtt服务器实现:
package main
import (
"log"
"os"
"os/signal"
"syscall"
"github.com/DrmagicE/gmqtt/server"
)
func main() {
// 创建服务器实例
s := server.New(server.WithTCPListener(":1883"))
// 设置服务器关闭钩子
s.Run()
// 等待中断信号
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
// 关闭服务器
s.Stop()
log.Println("MQTT server stopped")
}
添加认证功能
gmqtt支持多种认证方式,下面是一个简单的用户名密码认证示例:
package main
import (
"log"
"os"
"os/signal"
"syscall"
"github.com/DrmagicE/gmqtt/server"
"github.com/DrmagicE/gmqtt/server/auth"
)
func main() {
// 创建认证管理器
authenticator := auth.NewMockAuthenticator(nil)
// 添加用户
authenticator.AddUser("user1", "password1")
authenticator.AddUser("user2", "password2")
// 创建服务器并配置认证
s := server.New(
server.WithTCPListener(":1883"),
server.WithAuthenticator(authenticator),
)
s.Run()
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
s.Stop()
log.Println("MQTT server stopped")
}
实现消息持久化
gmqtt支持多种持久化方式,下面使用内存存储的示例:
package main
import (
"log"
"os"
"os/signal"
"syscall"
"github.com/DrmagicE/gmqtt/persistence/memory"
"github.com/DrmagicE/gmqtt/server"
)
func main() {
// 创建内存存储
storage := memory.New(memory.DefaultConfig)
// 创建服务器并配置存储
s := server.New(
server.WithTCPListener(":1883"),
server.WithPersistence(storage),
)
s.Run()
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
s.Stop()
log.Println("MQTT server stopped")
}
添加WebSocket支持
gmqtt也支持WebSocket协议:
package main
import (
"log"
"os"
"os/signal"
"syscall"
"github.com/DrmagicE/gmqtt/server"
)
func main() {
// 创建服务器并配置WebSocket监听
s := server.New(
server.WithTCPListener(":1883"),
server.WithWebsocketServer(":8080", "/mqtt"),
)
s.Run()
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
s.Stop()
log.Println("MQTT server stopped")
}
实现插件系统
gmqtt支持插件扩展,下面是一个简单的插件示例:
package main
import (
"log"
"os"
"os/signal"
"syscall"
"github.com/DrmagicE/gmqtt/server"
"github.com/DrmagicE/gmqtt/server/plugin"
)
// 定义一个简单的插件
type myPlugin struct{}
func (p *myPlugin) Load(service server.Server) error {
log.Println("MyPlugin loaded")
return nil
}
func (p *myPlugin) Unload() error {
log.Println("MyPlugin unloaded")
return nil
}
func (p *myPlugin) Name() string {
return "MyPlugin"
}
func main() {
// 创建服务器并添加插件
s := server.New(
server.WithTCPListener(":1883"),
server.WithPlugin(&myPlugin{}),
)
s.Run()
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
s.Stop()
log.Println("MQTT server stopped")
}
性能优化建议
- 使用合适的持久化后端:对于高吞吐量场景,考虑使用Redis或其他高性能存储
- 调整并发参数:通过
server.WithOptions
配置并发参数 - 启用TCP Keepalive:减少无效连接占用资源
- 监控和指标:集成Prometheus等监控工具
完整示例
下面是一个结合了上述功能的完整示例:
package main
import (
"log"
"os"
"os/signal"
"syscall"
"github.com/DrmagicE/gmqtt/server"
"github.com/DrmagicE/gmqtt/server/auth"
"github.com/DrmagicE/gmqtt/persistence/memory"
)
func main() {
// 认证配置
authenticator := auth.NewMockAuthenticator(nil)
authenticator.AddUser("admin", "admin123")
// 持久化配置
storage := memory.New(memory.DefaultConfig)
// 创建服务器
s := server.New(
server.WithTCPListener(":1883"),
server.WithWebsocketServer(":8080", "/mqtt"),
server.WithAuthenticator(authenticator),
server.WithPersistence(storage),
)
// 启动服务器
if err := s.Run(); err != nil {
log.Fatal(err)
}
log.Println("MQTT server started on :1883")
log.Println("MQTT over WebSocket available on :8080/mqtt")
// 等待中断信号
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
// 关闭服务器
s.Stop()
log.Println("MQTT server stopped gracefully")
}
gmqtt提供了丰富的配置选项和扩展接口,可以根据实际需求进行定制。官方文档提供了更多高级功能和配置选项的详细信息。