golang高性能分布式MQTT/Websocket发布订阅平台插件库emitter-io的使用
Golang高性能分布式MQTT/Websocket发布订阅平台插件库emitter-io的使用
Emitter: 分布式发布订阅平台
Emitter是一个基于MQTT协议构建的分布式、可扩展和容错的发布订阅平台,具有消息存储、安全性、监控等功能:
- 使用MQTT通过TCP或Websocket进行发布/订阅
- 具有弹性,高可用性和分区容错性(CAP中的AP)
- 能够在单个代理上处理每秒300万+的消息
- 支持消息存储和历史记录以及消息级过期
- 提供安全通道密钥和权限,可以面向互联网
- 自动TLS/SSL和加密的代理间通信
- 内置监控与Prometheus、StatsD等集成
- 共享订阅、链接和频道的私有链接
- 通过Docker和Kubernetes轻松部署生产就绪的集群
快速开始
运行服务器
使用docker run
命令快速启动Emitter代理:
docker run -d --name emitter -p 8080:8080 --restart=unless-stopped emitter/server
或者从源代码构建并运行:
# 安装git和go
# 根据系统可能需要安装gcc和musl-dev
git clone https://github.com/emitter-io/emitter
cd emitter
go get -x .
go build -x .
./emitter
获取许可证
启动服务器后,如果没有提供配置或环境变量,它将打印出类似以下消息:
[service] unable to find a license, make sure 'license' value is set in the config file or EMITTER_LICENSE environment variable
[service] generated new license: uppD0PFIcNK6VY-7PTo7uWH8EobaOGgRAAAAAAAAAAI
[service] generated new secret key: JUoOxjoXLc4muSxXynOpTc60nWtwUI3o
重新运行命令
使用生成的许可证重新运行:
docker run -d --name emitter -p 8080:8080 -e EMITTER_LICENSE=uppD0PFIcNK6VY-7PTo7uWH8EobaOGgRAAAAAAAAAAI --restart=unless-stopped emitter/server
生成密钥
打开浏览器访问http://127.0.0.1:8080/keygen
生成密钥。
警告:如果使用上述命令,你的密钥是JUoOxjoXLc4muSxXynOpTc60nWtwUI3o
,这不安全!
Go语言使用示例
下面是一个使用Go语言与Emitter交互的完整示例:
package main
import (
"fmt"
"log"
"time"
"github.com/emitter-io/go/v2"
)
func main() {
// 连接到Emitter服务
client, err := emitter.Connect("tcp://127.0.0.1:8080", func(_ *emitter.Client, msg emitter.Message) {
fmt.Printf("收到消息: %s\n", msg.Payload())
})
if err != nil {
log.Fatal(err)
}
defer client.Disconnect(0)
// 订阅频道
client.Subscribe("my_channel_key", "chat", func(client *emitter.Client, msg emitter.Message) {
fmt.Printf("收到聊天消息: %s\n", msg.Payload())
})
// 发布消息
for i := 0; i < 5; i++ {
client.Publish("my_channel_key", "chat", fmt.Sprintf("Hello from Go %d", i))
time.Sleep(1 * time.Second)
}
}
配置选项
Emitter可以通过配置文件、环境变量或Hashicorp Vault进行配置。以下是主要配置选项:
属性 | 环境变量 | 描述 |
---|---|---|
license |
EMITTER_LICENSE |
用于代理的许可证文件 |
listen |
EMITTER_LISTEN |
用于TCP和Websocket通信的API地址 |
tls.listen |
EMITTER_TLS_LISTEN |
用于安全TCP和Websocket通信的API地址 |
cluster.name |
EMITTER_CLUSTER_NAME |
集群中此节点的名称 |
cluster.seed |
EMITTER_CLUSTER_SEED |
用于集群加入的种子地址 |
storage.provider |
EMITTER_STORAGE_PROVIDER |
消息存储模式(inmemory 或ssd ) |
构建和测试
Emitter需要Golang 1.9+。安装后,运行以下命令:
go get -u github.com/emitter-io/emitter && emitter
运行测试:
go test ./...
许可证
Copyright © 2009-2019 Misakai Ltd. 本项目采用Affero General Public License v3许可。
Emitter提供支持合同,现在也提供商业许可证。更多信息请联系info@emitter.io。
更多关于golang高性能分布式MQTT/Websocket发布订阅平台插件库emitter-io的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang高性能分布式MQTT/Websocket发布订阅平台插件库emitter-io的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Emitter-io: 高性能分布式MQTT/Websocket发布订阅平台
Emitter-io是一个高性能的分布式MQTT和Websocket发布/订阅平台,专为实时通信设计。它提供了低延迟、高吞吐量的消息传递能力,非常适合IoT、实时应用和微服务通信场景。
主要特性
- 支持MQTT协议和Websocket协议
- 分布式架构,支持水平扩展
- 持久化消息存储
- 基于频道的安全模型
- 低延迟、高吞吐量
安装Emitter
go get github.com/emitter-io/go
基本使用示例
1. 连接到Emitter服务器
package main
import (
"fmt"
"log"
"time"
"github.com/emitter-io/go"
)
func main() {
// 创建Emitter客户端
client, err := emitter.Connect("tcp://127.0.0.1:8080", func(_ *emitter.Client, msg emitter.Message) {
fmt.Printf("收到消息: %s\n", msg.Payload())
})
if err != nil {
log.Fatal(err)
}
defer client.Disconnect(0)
// 其他操作...
}
2. 发布消息
// 发布消息到指定频道
key := "your-channel-key" // 需要替换为实际的key
channel := "demo-channel"
message := "Hello, Emitter!"
// 发布消息
err = client.Publish(key, channel, message)
if err != nil {
log.Fatal(err)
}
3. 订阅频道
// 订阅频道
_, err = client.Subscribe(key, channel, func(client *emitter.Client, msg emitter.Message) {
fmt.Printf("收到订阅消息: %s\n", msg.Payload())
})
if err != nil {
log.Fatal(err)
}
// 保持连接
time.Sleep(30 * time.Second)
高级功能
1. 使用通配符订阅
// 订阅所有以demo开头的频道
_, err = client.Subscribe(key, "demo/+", func(client *emitter.Client, msg emitter.Message) {
fmt.Printf("通配符订阅收到消息: %s from %s\n", msg.Payload(), msg.Channel())
})
2. 消息持久化
// 发布持久化消息 (在频道名后加"/"和保留时间)
err = client.Publish(key, "demo-channel/10m", "This message will persist for 10 minutes")
3. 安全控制
Emitter使用基于密钥的访问控制:
// 生成只读密钥
readKey, err := client.GenerateKey("master-key", "demo-channel", emitter.AllowRead)
if err != nil {
log.Fatal(err)
}
fmt.Println("Read-only key:", readKey)
// 生成读写密钥
writeKey, err := client.GenerateKey("master-key", "demo-channel", emitter.AllowRead|emitter.AllowWrite)
if err != nil {
log.Fatal(err)
}
fmt.Println("Read-write key:", writeKey)
性能优化建议
- 连接池: 对于高并发场景,使用连接池管理Emitter连接
- 批量发布: 合并小消息为批量消息减少网络开销
- QoS选择: 根据场景选择合适的QoS级别
- 合理使用通配符: 避免过于宽泛的通配符订阅
完整示例
package main
import (
"fmt"
"log"
"time"
"github.com/emitter-io/go"
)
func main() {
// 1. 连接到Emitter服务器
client, err := emitter.Connect("tcp://127.0.0.1:8080", nil)
if err != nil {
log.Fatal(err)
}
defer client.Disconnect(0)
// 2. 生成密钥
const masterKey = "your-master-key"
const channel = "demo-channel"
// 生成订阅密钥
subKey, err := client.GenerateKey(masterKey, channel, emitter.AllowRead)
if err != nil {
log.Fatal(err)
}
// 生成发布密钥
pubKey, err := client.GenerateKey(masterKey, channel, emitter.AllowWrite)
if err != nil {
log.Fatal(err)
}
// 3. 订阅频道
_, err = client.Subscribe(subKey, channel, func(_ *emitter.Client, msg emitter.Message) {
fmt.Printf("收到消息: %s\n", msg.Payload())
})
if err != nil {
log.Fatal(err)
}
// 4. 发布消息
for i := 0; i < 5; i++ {
msg := fmt.Sprintf("消息 %d", i+1)
err = client.Publish(pubKey, channel, msg)
if err != nil {
log.Fatal(err)
}
time.Sleep(1 * time.Second)
}
}
部署建议
- 集群部署: 对于生产环境,建议部署Emitter集群
- 监控: 监控消息吞吐量和延迟
- 持久化配置: 根据业务需求配置适当的消息保留策略
Emitter-io是一个功能强大且灵活的发布/订阅平台,通过合理使用可以构建高性能的实时通信系统。