golang高性能MQTT协议V3.1.1代理服务器插件库gmqtt的使用

Golang高性能MQTT协议V3.1.1代理服务器插件库gmqtt的使用

项目状态

由于个人原因,本项目维护较少。以下是给想要使用本项目的人的一些信息:

Gmqtt已在生产环境中使用并运行良好。生产环境服务于数百个(或可能是数千甚至上万)客户端,每个客户端每15秒发布一条QoS 1消息来报告其状态。云端的订阅者会将这些状态存储到持久化后端。所有客户端都是非持久会话并使用v3.1.1协议。

建议在生产环境运行前仔细评估和测试。

特性

  • 提供钩子方法来自定义代理行为(认证、ACL等)。详情见server/hooks.go
  • 支持tls/ssl和websocket
  • 提供灵活的插件机制。详情见server/plugin.go/plugin
  • 提供Go接口供扩展与服务器交互
  • 提供指标(使用Prometheus)
  • 提供GRPC和REST API与服务器交互
  • 提供会话持久化
  • 提供集群功能(实验性功能)

快速开始

要从源代码编译gmqtt,请确保您有一个可用的Go环境。

以下命令将使用默认配置启动gmqtt代理。代理监听1883端口用于TCP服务器,8883端口用于websocket服务器,并加载adminprometheus插件。

$ git clone https://github.com/DrmagicE/gmqtt
$ cd gmqtt/cmd/gmqttd
$ go run . start -c default_config.yml

配置

Gmqtt使用-c标志定义配置路径。如果未设置,gmqtt默认读取$HOME/gmqtt.yml

会话持久化

Gmqtt默认使用内存存储会话数据,这是推荐的方式,因为性能良好。但代理重启后会话数据会丢失。您可以使用redis作为后端存储来防止数据丢失:

persistence:
  type: redis  
  redis:
    # redis服务器地址
    addr: "127.0.0.1:6379"
    # redis连接池中的最大空闲连接数
    max_idle: 1000
    # redis连接池在给定时间分配的最大连接数
    max_active: 0
    # 连接空闲超时时间
    idle_timeout: 240s
    password: ""
    # redis数据库编号
    database: 0

认证

Gmqtt提供简单的用户名/密码认证机制(由auth插件提供)。默认配置中未启用,您可以更改配置来启用它:

# 插件加载顺序
plugin_order:
  - auth
  - prometheus
  - admin

当auth插件启用时,每个客户端都需要一个账户来连接。您可以通过HTTP API添加账户:

# 创建: 用户名 = user1, 密码 = user1pass
$ curl -X POST -d '{"password":"user1pass"}' 127.0.0.1:8083/v1/accounts/user1
{}
# 查询
$ curl 127.0.0.1:8083/v1/accounts/user1
{"account":{"username":"user1","password":"20a0db53bc1881a7f739cd956b740039"}}

Docker

$ docker build -t gmqtt .
$ docker run -p 1883:1883 -p 8883:8883 -p 8082:8082 -p 8083:8083 -p 8084:8084 gmqtt

文档

钩子

Gmqtt实现了以下钩子:

名称 钩子点 可能用途
OnAccept 当接受TCP连接时 连接速率限制,IP允许/阻止列表
OnStop 当gmqtt停止时
OnSubscribe 当收到订阅包时 订阅访问控制,修改订阅
OnSubscribed 当订阅成功时
OnUnsubscribe 当收到取消订阅包时 取消订阅访问控制,修改要取消订阅的主题
OnUnsubscribed 当取消订阅成功时
OnMsgArrived 当收到发布包时 发布访问控制,在交付前修改消息
OnBasicAuth 当收到没有AuthMethod属性的连接包时 认证
OnEnhancedAuth 当收到带有AuthMethod属性的连接包时(仅适用于v5客户端) 认证
OnReAuth 当收到认证包时(仅适用于v5客户端) 认证
OnConnected 当客户端连接成功时
OnSessionCreated 当创建新会话时
OnSessionResumed 当从旧会话恢复时
OnSessionTerminated 当会话终止时
OnDelivered 当消息交付给客户端时
OnClosed 当客户端关闭时
OnMsgDropped 当消息因某些原因被丢弃时
OnWillPublish 当客户端即将交付遗嘱消息时 修改或丢弃遗嘱消息
OnWillPublished 当遗嘱消息已交付时

如何编写插件

贡献

欢迎贡献,请参阅贡献指南获取完整的贡献指南。

测试

单元测试

$ go test -race ./...

集成测试

完整示例代码

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/DrmagicE/gmqtt"
	"github.com/DrmagicE/gmqtt/config"
	"github.com/DrmagicE/gmqtt/server"
)

func main() {
	// 创建服务器实例
	s := server.New(
		server.WithConfig(config.DefaultConfig()),
	)
	
	// 添加钩子示例
	s.AddHook(&ExampleHook{}, nil)
	
	// 启动服务器
	if err := s.Init(); err != nil {
		log.Fatal(err)
	}
	
	go func() {
		if err := s.Run(); err != nil {
			log.Fatal(err)
		}
	}()
	
	// 等待中断信号
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
	<-sig
	
	// 关闭服务器
	s.Stop(context.Background())
	fmt.Println("server stopped")
}

// ExampleHook 是一个简单的钩子实现示例
type ExampleHook struct{}

func (e *ExampleHook) OnSubscribe(ctx *gmqtt.HookSubscribeContext) {
	fmt.Printf("client %s subscribed to topic %s\n", ctx.ClientID, ctx.Subscribe.Topics[0].Name)
}

func (e *ExampleHook) OnConnected(ctx *gmqtt.HookConnectedContext) {
	fmt.Printf("client %s connected\n", ctx.ClientID)
}

// 确保实现了所有必要的钩子方法
var _ gmqtt.Hook = (*ExampleHook)(nil)

这个示例展示了如何创建一个基本的gmqtt服务器实例,添加自定义钩子,并启动服务器。您可以根据需要扩展这个示例,添加更多钩子实现或配置选项。


更多关于golang高性能MQTT协议V3.1.1代理服务器插件库gmqtt的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高性能MQTT协议V3.1.1代理服务器插件库gmqtt的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用gmqtt构建高性能MQTT V3.1.1代理服务器

gmqtt是一个基于Go语言开发的高性能MQTT代理服务器实现,支持MQTT 3.1.1和5.0协议。下面我将介绍如何使用gmqtt来构建一个MQTT代理服务器。

基本安装

首先安装gmqtt库:

go get github.com/DrmagicE/gmqtt

快速启动一个MQTT服务器

下面是一个最简单的gmqtt服务器实现:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/DrmagicE/gmqtt/server"
)

func main() {
	// 创建服务器实例
	s := server.New(server.WithTCPListener(":1883"))
	
	// 设置服务器关闭钩子
	s.Run()
	
	// 等待中断信号
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	
	// 关闭服务器
	s.Stop()
	log.Println("MQTT server stopped")
}

添加认证功能

gmqtt支持多种认证方式,下面是一个简单的用户名密码认证示例:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/DrmagicE/gmqtt/server"
	"github.com/DrmagicE/gmqtt/server/auth"
)

func main() {
	// 创建认证管理器
	authenticator := auth.NewMockAuthenticator(nil)
	// 添加用户
	authenticator.AddUser("user1", "password1")
	authenticator.AddUser("user2", "password2")

	// 创建服务器并配置认证
	s := server.New(
		server.WithTCPListener(":1883"),
		server.WithAuthenticator(authenticator),
	)
	
	s.Run()
	
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	
	s.Stop()
	log.Println("MQTT server stopped")
}

实现消息持久化

gmqtt支持多种持久化方式,下面使用内存存储的示例:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/DrmagicE/gmqtt/persistence/memory"
	"github.com/DrmagicE/gmqtt/server"
)

func main() {
	// 创建内存存储
	storage := memory.New(memory.DefaultConfig)
	
	// 创建服务器并配置存储
	s := server.New(
		server.WithTCPListener(":1883"),
		server.WithPersistence(storage),
	)
	
	s.Run()
	
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	
	s.Stop()
	log.Println("MQTT server stopped")
}

添加WebSocket支持

gmqtt也支持WebSocket协议:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/DrmagicE/gmqtt/server"
)

func main() {
	// 创建服务器并配置WebSocket监听
	s := server.New(
		server.WithTCPListener(":1883"),
		server.WithWebsocketServer(":8080", "/mqtt"),
	)
	
	s.Run()
	
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	
	s.Stop()
	log.Println("MQTT server stopped")
}

实现插件系统

gmqtt支持插件扩展,下面是一个简单的插件示例:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/DrmagicE/gmqtt/server"
	"github.com/DrmagicE/gmqtt/server/plugin"
)

// 定义一个简单的插件
type myPlugin struct{}

func (p *myPlugin) Load(service server.Server) error {
	log.Println("MyPlugin loaded")
	return nil
}

func (p *myPlugin) Unload() error {
	log.Println("MyPlugin unloaded")
	return nil
}

func (p *myPlugin) Name() string {
	return "MyPlugin"
}

func main() {
	// 创建服务器并添加插件
	s := server.New(
		server.WithTCPListener(":1883"),
		server.WithPlugin(&myPlugin{}),
	)
	
	s.Run()
	
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	
	s.Stop()
	log.Println("MQTT server stopped")
}

性能优化建议

  1. 使用合适的持久化后端:对于高吞吐量场景,考虑使用Redis或其他高性能存储
  2. 调整并发参数:通过server.WithOptions配置并发参数
  3. 启用TCP Keepalive:减少无效连接占用资源
  4. 监控和指标:集成Prometheus等监控工具

完整示例

下面是一个结合了上述功能的完整示例:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/DrmagicE/gmqtt/server"
	"github.com/DrmagicE/gmqtt/server/auth"
	"github.com/DrmagicE/gmqtt/persistence/memory"
)

func main() {
	// 认证配置
	authenticator := auth.NewMockAuthenticator(nil)
	authenticator.AddUser("admin", "admin123")
	
	// 持久化配置
	storage := memory.New(memory.DefaultConfig)
	
	// 创建服务器
	s := server.New(
		server.WithTCPListener(":1883"),
		server.WithWebsocketServer(":8080", "/mqtt"),
		server.WithAuthenticator(authenticator),
		server.WithPersistence(storage),
	)
	
	// 启动服务器
	if err := s.Run(); err != nil {
		log.Fatal(err)
	}
	
	log.Println("MQTT server started on :1883")
	log.Println("MQTT over WebSocket available on :8080/mqtt")
	
	// 等待中断信号
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	
	// 关闭服务器
	s.Stop()
	log.Println("MQTT server stopped gracefully")
}

gmqtt提供了丰富的配置选项和扩展接口,可以根据实际需求进行定制。官方文档提供了更多高级功能和配置选项的详细信息。

回到顶部