golang嵌入式高性能MQTT v5/v3代理服务器插件mochi mqtt的使用

Golang嵌入式高性能MQTT v5/v3代理服务器插件mochi mqtt的使用

Mochi-MQTT是一个完全兼容、可嵌入的高性能Go语言MQTT v5(和v3.1.1)代理服务器/服务器,专为遥测和物联网项目开发而设计。

主要特性

  • 完全兼容MQTTv5规范,同时支持MQTT v3.1.1和v3.0.0
  • 开发者友好:大部分核心代理代码已导出并可访问
  • 高性能且稳定:采用经典的trie主题订阅模型
  • 支持TCP、WebSocket(包括SSL/TLS)和$SYS仪表板监听器
  • 内置Redis、Badger、Pebble和Bolt持久化存储钩子
  • 内置基于规则的认证和ACL账本钩子

快速开始

作为Go包导入使用

import (
  "log"
  "os"
  "os/signal"
  "syscall"

  mqtt "github.com/mochi-mqtt/server/v2"
  "github.com/mochi-mqtt/server/v2/hooks/auth"
  "github.com/mochi-mqtt/server/v2/listeners"
)

func main() {
  // 创建信号通道以便在中断时运行服务器
  sigs := make(chan os.Signal, 1)
  done := make(chan bool, 1)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  go func() {
    <-sigs
    done <- true
  }()

  // 创建新的MQTT服务器
  server := mqtt.New(nil)
  
  // 允许所有连接
  _ = server.AddHook(new(auth.AllowHook), nil)
  
  // 在标准端口上创建TCP监听器
  tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
  err := server.AddListener(tcp)
  if err != nil {
    log.Fatal(err)
  }

  go func() {
    err := server.Serve()
    if err != nil {
      log.Fatal(err)
    }
  }()

  // 运行服务器直到被中断
  <-done

  // 清理
  server.Close()
}

使用Docker运行

docker pull mochimqtt/server
# 或者
docker run -v $(pwd)/config.yaml:/config.yaml mochimqtt/server

文件配置示例

listeners:
  - type: "tcp"
    id: "tcp12"
    address: ":1883"
  - type: "ws"
    id: "ws1"
    address: ":1882"
  - type: "sysinfo"
    id: "stats"
    address: ":1880"
hooks:
  auth:
    allow_all: true
options:
  inline_client: true

事件钩子(Hooks)

Mochi MQTT提供了通用的事件钩子系统,允许开发者钩入服务器和客户端生命周期的各个部分来添加和修改代理功能。

认证钩子示例

err := server.AddHook(new(auth.Hook), &auth.Options{
    Ledger: &auth.Ledger{
    Auth: auth.AuthRules{ // 默认拒绝所有认证
      {Username: "peach", Password: "password1", Allow: true},
      {Username: "melon", Password: "password2", Allow: true},
      {Remote: "127.0.0.1:*", Allow: true},
      {Remote: "localhost:*", Allow: true},
    },
    ACL: auth.ACLRules{ // ACL默认允许所有
      {Remote: "127.0.0.1:*"}, // 本地超级用户允许所有
      {
        // 用户melon可以读写自己的主题
        Username: "melon", Filters: auth.Filters{
          "melon/#":   auth.ReadWrite,
          "updates/#": auth.WriteOnly, // 可以写入updates,但不能读取其他人的updates
        },
      },
      {
        // 否则,没有客户端有发布权限
        Filters: auth.Filters{
          "#":         auth.ReadOnly,
          "updates/#": auth.Deny,
        },
      },
    },
  }
})

持久化存储示例

Redis存储

err := server.AddHook(new(redis.Hook), &redis.Options{
  Options: &rv8.Options{
    Addr:     "localhost:6379", // 默认redis地址
    Password: "",               // 你的密码
    DB:       0,                // 你的redis db
  },
})
if err != nil {
  log.Fatal(err)
}

Pebble DB存储

err := server.AddHook(new(pebble.Hook), &pebble.Options{
  Path: pebblePath,
  Mode: pebble.NoSync,
})
if err != nil {
  log.Fatal(err)
}

内联客户端(Inline Client)

从v2.4.0开始,可以直接从嵌入代码中订阅和发布主题:

server := mqtt.New(&mqtt.Options{
  InlineClient: true,
})

// 发布消息
err := server.Publish("direct/publish", []byte("packet scheduled message"), false, 0)

// 订阅主题
callbackFn := func(cl *mqtt.Client, sub packets.Subscription, pk packets.Packet) {
    server.Log.Info("inline client received message from subscription", "client", cl.ID, "subscriptionId", sub.Identifier, "topic", pk.TopicName, "payload", string(pk.Payload))
}
server.Subscribe("direct/#", 1, callbackFn)

// 取消订阅
server.Unsubscribe("direct/#", 1)

性能基准

Mochi MQTT性能与Mosquitto、EMQX等流行代理相当。以下是基准测试结果示例:

Broker 发布最快 中位数 最慢 接收最快 中位数 最慢
Mochi v2.2.10 124,772 125,456 124,614 314,461 313,186 311,910
Mosquitto v2.0.15 155,920 155,919 155,918 185,485 185,097 184,709

贡献指南

欢迎贡献和反馈!提交PR时请遵循以下准则:

  • 尽可能保持测试覆盖率
  • 清楚说明PR的作用和原因
  • 记得在修改的文件中添加SPDX FileContributor标签
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt
// SPDX-FileContributor: Your name or alias <optional@email.address>

package name

Mochi MQTT是一个功能强大且灵活的MQTT代理服务器,非常适合需要嵌入式MQTT解决方案的Go项目。


更多关于golang嵌入式高性能MQTT v5/v3代理服务器插件mochi mqtt的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang嵌入式高性能MQTT v5/v3代理服务器插件mochi mqtt的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Mochi MQTT - 高性能嵌入式MQTT代理服务器插件

Mochi MQTT 是一个用Go语言编写的高性能嵌入式MQTT(v5/v3.1.1)代理服务器,设计轻量且易于集成到现有应用中。以下是它的主要特点和使用方法。

主要特性

  • 支持MQTT v5和v3.1.1协议
  • 高性能,低内存占用
  • 嵌入式设计,可作为库使用
  • 支持持久化存储
  • 支持WebSocket
  • 灵活的认证和授权机制

基本使用示例

安装

go get github.com/mochi-mqtt/server/v2

创建简单MQTT服务器

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	mqtt "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/hooks/auth"
	"github.com/mochi-mqtt/server/v2/listeners"
)

func main() {
	// 创建新的MQTT服务器
	server := mqtt.New(&mqtt.Options{
		InlineClient: true, // 启用内联客户端
	})

	// 允许所有客户端连接(生产环境应配置认证)
	server.AddHook(new(auth.AllowHook), nil)

	// 创建TCP监听器
	tcp := listeners.NewTCP("t1", ":1883", nil)
	err := server.AddListener(tcp)
	if err != nil {
		log.Fatal(err)
	}

	// 捕获中断信号以优雅关闭
	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		done <- true
	}()

	// 启动服务器
	go func() {
		err := server.Serve()
		if err != nil {
			log.Fatal(err)
		}
	}()

	// 等待关闭信号
	<-done
	server.Log.Warn("caught signal, stopping...")
	server.Close()
	server.Log.Info("mqtt server stopped")
}

高级功能示例

添加认证

// 创建基于用户名密码的认证
server.AddHook(new(auth.Hook), &auth.Options{
	Ledger: &auth.Ledger{
		Auth: auth.AuthRules{ // 认证规则
			{Username: "user1", Password: "pass1", Allow: true},
			{Username: "user2", Password: "pass2", Allow: true},
		},
		ACL: auth.ACLRules{ // 访问控制规则
			{Username: "user1", Filters: auth.Filters{
				"#": auth.ReadWrite, // user1可以读写所有主题
			}},
			{Username: "user2", Filters: auth.Filters{
				"public/#": auth.Read, // user2只能读取public/前缀的主题
			}},
		},
	},
})

添加WebSocket支持

ws := listeners.NewWebSocket("ws1", ":1882", nil)
err := server.AddListener(ws)
if err != nil {
	log.Fatal(err)
}

处理客户端事件

// 自定义钩子处理客户端事件
type Hook struct {
	mqtt.HookBase
}

func (h *Hook) ID() string {
	return "events-example"
}

func (h *Hook) Provides(b byte) bool {
	return bytes.Contains([]byte{
		mqtt.OnConnect,
		mqtt.OnDisconnect,
		mqtt.OnPublished,
		mqtt.OnSubscribed,
		mqtt.OnUnsubscribed,
	}, []byte{b})
}

func (h *Hook) OnConnect(cl *mqtt.Client, pk packets.Packet) {
	h.Log.Info("client connected", "client", cl.ID)
}

// 注册钩子
server.AddHook(new(Hook), nil)

持久化存储

import "github.com/mochi-mqtt/server/v2/hooks/storage/bolt"

// 使用BoltDB持久化存储
server.AddHook(new(bolt.Hook), &bolt.Options{
	Path: "mqtt.db",
})

性能调优

server := mqtt.New(&mqtt.Options{
	// 增加工作池大小提高并发处理能力
	WorkerPoolSize: 1024,
	
	// 调整客户端会话缓冲大小
	ClientNetWriteBufferSize: 2048,
	ClientNetReadBufferSize:  2048,
	
	// 限制最大连接数
	Capabilities: &mqtt.Capabilities{
		MaxClients: 5000,
	},
})

集群模式

Mochi MQTT本身是单机版本,但可以通过以下方式实现简单集群:

// 使用Redis作为共享订阅的存储后端
import "github.com/mochi-mqtt/server/v2/hooks/storage/redis"

server.AddHook(new(redis.Hook), &redis.Options{
	Options: &redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	},
})

Mochi MQTT是一个灵活且高性能的MQTT代理实现,非常适合嵌入式或需要轻量级MQTT服务的场景。根据实际需求,你可以进一步定制认证、存储和事件处理逻辑。

回到顶部