golang高性能分布式MQTT/Websocket发布订阅平台插件库emitter-io的使用

Golang高性能分布式MQTT/Websocket发布订阅平台插件库emitter-io的使用

Emitter Logo

Emitter: 分布式发布订阅平台

Emitter是一个基于MQTT协议构建的分布式、可扩展和容错的发布订阅平台,具有消息存储、安全性、监控等功能:

  • 使用MQTT通过TCP或Websocket进行发布/订阅
  • 具有弹性,高可用性和分区容错性(CAP中的AP)
  • 能够在单个代理上处理每秒300万+的消息
  • 支持消息存储和历史记录以及消息级过期
  • 提供安全通道密钥和权限,可以面向互联网
  • 自动TLS/SSL和加密的代理间通信
  • 内置监控与Prometheus、StatsD等集成
  • 共享订阅、链接和频道的私有链接
  • 通过Docker和Kubernetes轻松部署生产就绪的集群

快速开始

运行服务器

使用docker run命令快速启动Emitter代理:

docker run -d --name emitter -p 8080:8080 --restart=unless-stopped emitter/server

或者从源代码构建并运行:

# 安装git和go
# 根据系统可能需要安装gcc和musl-dev
git clone https://github.com/emitter-io/emitter
cd emitter
go get -x .
go build -x .
./emitter

获取许可证

启动服务器后,如果没有提供配置或环境变量,它将打印出类似以下消息:

[service] unable to find a license, make sure 'license' value is set in the config file or EMITTER_LICENSE environment variable
[service] generated new license: uppD0PFIcNK6VY-7PTo7uWH8EobaOGgRAAAAAAAAAAI
[service] generated new secret key: JUoOxjoXLc4muSxXynOpTc60nWtwUI3o

重新运行命令

使用生成的许可证重新运行:

docker run -d --name emitter -p 8080:8080 -e EMITTER_LICENSE=uppD0PFIcNK6VY-7PTo7uWH8EobaOGgRAAAAAAAAAAI --restart=unless-stopped emitter/server

生成密钥

打开浏览器访问http://127.0.0.1:8080/keygen生成密钥。

警告:如果使用上述命令,你的密钥是JUoOxjoXLc4muSxXynOpTc60nWtwUI3o,这不安全!

Go语言使用示例

下面是一个使用Go语言与Emitter交互的完整示例:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/emitter-io/go/v2"
)

func main() {
	// 连接到Emitter服务
	client, err := emitter.Connect("tcp://127.0.0.1:8080", func(_ *emitter.Client, msg emitter.Message) {
		fmt.Printf("收到消息: %s\n", msg.Payload())
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(0)

	// 订阅频道
	client.Subscribe("my_channel_key", "chat", func(client *emitter.Client, msg emitter.Message) {
		fmt.Printf("收到聊天消息: %s\n", msg.Payload())
	})

	// 发布消息
	for i := 0; i < 5; i++ {
		client.Publish("my_channel_key", "chat", fmt.Sprintf("Hello from Go %d", i))
		time.Sleep(1 * time.Second)
	}
}

配置选项

Emitter可以通过配置文件、环境变量或Hashicorp Vault进行配置。以下是主要配置选项:

属性 环境变量 描述
license EMITTER_LICENSE 用于代理的许可证文件
listen EMITTER_LISTEN 用于TCP和Websocket通信的API地址
tls.listen EMITTER_TLS_LISTEN 用于安全TCP和Websocket通信的API地址
cluster.name EMITTER_CLUSTER_NAME 集群中此节点的名称
cluster.seed EMITTER_CLUSTER_SEED 用于集群加入的种子地址
storage.provider EMITTER_STORAGE_PROVIDER 消息存储模式(inmemoryssd

构建和测试

Emitter需要Golang 1.9+。安装后,运行以下命令:

go get -u github.com/emitter-io/emitter && emitter

运行测试:

go test ./...

许可证

Copyright © 2009-2019 Misakai Ltd. 本项目采用Affero General Public License v3许可。

Emitter提供支持合同,现在也提供商业许可证。更多信息请联系info@emitter.io


更多关于golang高性能分布式MQTT/Websocket发布订阅平台插件库emitter-io的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高性能分布式MQTT/Websocket发布订阅平台插件库emitter-io的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Emitter-io: 高性能分布式MQTT/Websocket发布订阅平台

Emitter-io是一个高性能的分布式MQTT和Websocket发布/订阅平台,专为实时通信设计。它提供了低延迟、高吞吐量的消息传递能力,非常适合IoT、实时应用和微服务通信场景。

主要特性

  • 支持MQTT协议和Websocket协议
  • 分布式架构,支持水平扩展
  • 持久化消息存储
  • 基于频道的安全模型
  • 低延迟、高吞吐量

安装Emitter

go get github.com/emitter-io/go

基本使用示例

1. 连接到Emitter服务器

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/emitter-io/go"
)

func main() {
	// 创建Emitter客户端
	client, err := emitter.Connect("tcp://127.0.0.1:8080", func(_ *emitter.Client, msg emitter.Message) {
		fmt.Printf("收到消息: %s\n", msg.Payload())
	})
	
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(0)
	
	// 其他操作...
}

2. 发布消息

// 发布消息到指定频道
key := "your-channel-key" // 需要替换为实际的key
channel := "demo-channel"
message := "Hello, Emitter!"

// 发布消息
err = client.Publish(key, channel, message)
if err != nil {
	log.Fatal(err)
}

3. 订阅频道

// 订阅频道
_, err = client.Subscribe(key, channel, func(client *emitter.Client, msg emitter.Message) {
	fmt.Printf("收到订阅消息: %s\n", msg.Payload())
})
if err != nil {
	log.Fatal(err)
}

// 保持连接
time.Sleep(30 * time.Second)

高级功能

1. 使用通配符订阅

// 订阅所有以demo开头的频道
_, err = client.Subscribe(key, "demo/+", func(client *emitter.Client, msg emitter.Message) {
	fmt.Printf("通配符订阅收到消息: %s from %s\n", msg.Payload(), msg.Channel())
})

2. 消息持久化

// 发布持久化消息 (在频道名后加"/"和保留时间)
err = client.Publish(key, "demo-channel/10m", "This message will persist for 10 minutes")

3. 安全控制

Emitter使用基于密钥的访问控制:

// 生成只读密钥
readKey, err := client.GenerateKey("master-key", "demo-channel", emitter.AllowRead)
if err != nil {
	log.Fatal(err)
}
fmt.Println("Read-only key:", readKey)

// 生成读写密钥
writeKey, err := client.GenerateKey("master-key", "demo-channel", emitter.AllowRead|emitter.AllowWrite)
if err != nil {
	log.Fatal(err)
}
fmt.Println("Read-write key:", writeKey)

性能优化建议

  1. 连接池: 对于高并发场景,使用连接池管理Emitter连接
  2. 批量发布: 合并小消息为批量消息减少网络开销
  3. QoS选择: 根据场景选择合适的QoS级别
  4. 合理使用通配符: 避免过于宽泛的通配符订阅

完整示例

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/emitter-io/go"
)

func main() {
	// 1. 连接到Emitter服务器
	client, err := emitter.Connect("tcp://127.0.0.1:8080", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(0)

	// 2. 生成密钥
	const masterKey = "your-master-key"
	const channel = "demo-channel"
	
	// 生成订阅密钥
	subKey, err := client.GenerateKey(masterKey, channel, emitter.AllowRead)
	if err != nil {
		log.Fatal(err)
	}
	
	// 生成发布密钥
	pubKey, err := client.GenerateKey(masterKey, channel, emitter.AllowWrite)
	if err != nil {
		log.Fatal(err)
	}

	// 3. 订阅频道
	_, err = client.Subscribe(subKey, channel, func(_ *emitter.Client, msg emitter.Message) {
		fmt.Printf("收到消息: %s\n", msg.Payload())
	})
	if err != nil {
		log.Fatal(err)
	}

	// 4. 发布消息
	for i := 0; i < 5; i++ {
		msg := fmt.Sprintf("消息 %d", i+1)
		err = client.Publish(pubKey, channel, msg)
		if err != nil {
			log.Fatal(err)
		}
		time.Sleep(1 * time.Second)
	}
}

部署建议

  1. 集群部署: 对于生产环境,建议部署Emitter集群
  2. 监控: 监控消息吞吐量和延迟
  3. 持久化配置: 根据业务需求配置适当的消息保留策略

Emitter-io是一个功能强大且灵活的发布/订阅平台,通过合理使用可以构建高性能的实时通信系统。

回到顶部