golang分布式消息推送服务插件库guble的使用
Golang分布式消息推送服务插件库Guble的使用
Guble是一个用Go编写的简单面向用户的消息传递和数据复制服务器。
概述
Guble目前处于早期状态(版本0.4)。它已经可以很好地工作并且非常有用,但协议、API和存储格式可能仍然会发生变化(直到达到0.7版本)。如果您打算使用guble,请与我们联系。
Guble的目标是成为一个简单快速的消息总线,用于用户交互和多个设备之间的数据复制:
- 非常容易使用Web和移动客户端消费消息
- 快速实时消息传递,以及从持久提交日志中回放消息
- 在多个节点上可靠且可扩展
- 用户感知语义,轻松支持使用多个设备的人之间的消息传递场景
- 内置电池:可用作前端服务器,无需代理层
- 自包含:不强制依赖其他服务
功能特性(0.4版本)
- 发布和订阅主题和子主题的消息
- 具有透明在线和离线获取的持久消息存储
- 用于消息发布的WebSocket和REST API
- 命令行客户端和Go客户端库
- Firebase云消息(FCM)适配器:将消息作为FCM推送通知传递
- 服务器和客户端的Docker镜像
- 简单的身份验证和访问管理
- 干净的关闭
- 使用logrus和logstash格式化程序改进日志记录
- 带有端点的健康检查
- 基本指标的收集,带有端点
- 添加Postgresql作为KV后端
- 支持5000条消息/实例的负载测试
- 支持Apple推送通知服务(与Firebase并列的新连接器)
- Firebase连接器的升级、清理、抽象、文档和测试覆盖率
- 获取订阅者列表/每个订阅者的主题列表(userID, deviceID)
- 支持使用Nexmo发送短信(与Firebase并列的新连接器)
示例代码
使用Docker启动Guble服务器
docker run -p 8080:8080 smancke/guble
使用Guble CLI客户端连接
docker run -d --name guble smancke/guble
docker exec -it guble /usr/local/bin/guble-cli
从源代码构建和运行
sudo apt-get install golang
mkdir guble && cd guble
export GOPATH=`pwd`
go get github.com/smancke/guble
bin/guble --log=info
Go客户端示例
package main
import (
"fmt"
"github.com/smancke/guble/client"
"time"
)
func main() {
// 创建客户端
c := client.New("ws://localhost:8080/stream", "http://localhost:8080")
// 连接服务器
err := c.Start()
if err != nil {
panic(err)
}
defer c.Stop()
// 订阅主题
err = c.Subscribe("/foo", func(msg *client.Message) {
fmt.Printf("收到消息: %+v\n", msg)
})
if err != nil {
panic(err)
}
// 发布消息
err = c.Publish("/foo", "Hello World", nil)
if err != nil {
panic(err)
}
time.Sleep(1 * time.Second)
}
REST API示例
发布消息:
curl -X POST -H "x-Guble-Key: Value" --data Hello 'http://127.0.0.1:8080/api/message/foo?userId=marvin&messageId=42'
WebSocket协议示例
发送消息:
> /foo
Hello World
订阅主题:
+ /foo
取消订阅:
- /foo
协议参考
消息格式
所有从服务器发送到客户端的有效负载消息都使用以下格式:
<path:string>,<sequenceId:int64>,<publisherUserId:string>,<publisherApplicationId:string>,<publisherMessageId:string>,<messagePublishingTime:unix-timestamp>\n
[<application headers json>]\n
<body>
示例:
/foo/bar,42,user01,phone1,id123,1420110000
{"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"}
Hello World
主题
消息可以通过主题分层路由,因此它们由路径表示,用/
分隔。服务器确保一条消息只传递一次,即使它匹配多个订阅路径。
子主题:
路径分隔符给出了子主题的语义。这样,对父主题(例如/foo
)的订阅也会接收所有子主题(例如/foo/bar
)的消息。
配置选项
CLI选项 | 环境变量 | 值 | 默认值 | 描述 |
---|---|---|---|---|
--env |
GUBLE_ENV | development | integration | preproduction | production | development | 应用程序运行的环境名称。主要用于日志记录 |
--health-endpoint |
GUBLE_HEALTH_ENDPOINT | resource/path/to/healthendpoint | /admin/healthcheck | HTTP服务器使用的健康端点。通过将值设置为""可以禁用 |
--http |
GUBLE_HTTP_LISTEN | 格式: [host]:port | HTTP服务器监听的地址 | |
--kvs |
GUBLE_KVS | memory | file | postgres | file | 用于键值存储的存储后端 |
--log |
GUBLE_LOG | panic | fatal | error | warn | info | debug | error | 进程记录的日志级别 |
--metrics-endpoint |
GUBLE_METRICS_ENDPOINT | resource/path/to/metricsendpoint | /admin/metrics | HTTP服务器使用的指标端点。通过将值设置为""可以禁用 |
--ms |
GUBLE_MS | memory | file | file | 消息存储后端 |
--profile |
GUBLE_PROFILE | cpu | mem | block | 要使用的分析器 | |
--storage-path |
GUBLE_STORAGE_PATH | path/to/storage | /var/lib/guble | 用于存储消息和键值数据(如订阅)的路径。路径必须存在! |
性能
在一台配备i5-2520M、双核和SSD的旧笔记本上测量。消息负载为"Hello Word"。负载驱动程序和服务器设置在同一台机器上,因此50%的CPU分配给负载驱动程序。
- 端到端:每秒传递约35,000条持久消息
- 获取:每秒接收约70,000条持久消息
在测试期间,服务器的内存消耗约为25 MB。
更多关于golang分布式消息推送服务插件库guble的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang分布式消息推送服务插件库guble的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Guble - Go分布式消息推送服务插件库
Guble是一个轻量级的分布式消息推送服务库,专为Go语言设计。它提供了发布/订阅模式的消息推送功能,支持WebSocket和HTTP长轮询协议。
主要特性
- 轻量级设计
- 支持主题(topic)和队列(queue)两种消息模式
- 提供WebSocket和HTTP长轮询接口
- 可扩展的存储后端
- 简单的API设计
安装
go get github.com/smancke/guble
基本使用示例
1. 启动Guble服务器
package main
import (
"log"
"github.com/smancke/guble/server"
)
func main() {
// 创建配置
cfg := server.NewConfig()
cfg.HttpListen = ":8080" // HTTP监听端口
cfg.WSListen = ":8081" // WebSocket监听端口
// 创建并启动服务器
srv := server.NewServer(cfg)
if err := srv.Start(); err != nil {
log.Fatal("启动服务器失败:", err)
}
log.Println("Guble服务器已启动")
// 阻塞主线程
select {}
}
2. 发布消息
package main
import (
"log"
"github.com/smancke/guble/client"
)
func main() {
// 创建客户端
c, err := client.NewWebSocketClient("ws://localhost:8081/")
if err != nil {
log.Fatal("创建客户端失败:", err)
}
// 连接到服务器
if err := c.Start(); err != nil {
log.Fatal("连接服务器失败:", err)
}
// 发布消息到主题
err = c.Send("/topic/example", "这是一条测试消息", nil)
if err != nil {
log.Fatal("发送消息失败:", err)
}
log.Println("消息已发布")
c.Stop()
}
3. 订阅消息
package main
import (
"log"
"time"
"github.com/smancke/guble/client"
)
func main() {
// 创建客户端
c, err := client.NewWebSocketClient("ws://localhost:8081/")
if err != nil {
log.Fatal("创建客户端失败:", err)
}
// 连接到服务器
if err := c.Start(); err != nil {
log.Fatal("连接服务器失败:", err)
}
// 订阅主题
err = c.Subscribe("/topic/example", func(msg *client.Message) {
log.Printf("收到消息: 主题=%s, 内容=%s, ID=%d",
msg.Path, msg.Body, msg.Id)
})
if err != nil {
log.Fatal("订阅失败:", err)
}
log.Println("已订阅主题 /topic/example")
// 保持连接
time.Sleep(5 * time.Minute)
c.Stop()
}
高级功能
消息过滤
Guble支持基于消息ID的过滤:
// 只接收ID大于100的消息
err = c.Subscribe("/topic/example", func(msg *client.Message) {
log.Printf("收到新消息: %s", msg.Body)
}, client.StartId(100))
使用HTTP长轮询
package main
import (
"log"
"net/http"
"github.com/smancke/guble/client"
)
func main() {
// 创建HTTP客户端
c := client.NewHttpClient("http://localhost:8080/")
// 订阅主题
err := c.Subscribe("/topic/example", func(msg *client.Message) {
log.Printf("收到消息: %s", msg.Body)
})
if err != nil {
log.Fatal("订阅失败:", err)
}
// 启动HTTP服务器来接收回调
http.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) {
c.HandleCallback(w, r)
})
log.Println("启动回调服务器 :8082")
http.ListenAndServe(":8082", nil)
}
集群模式
Guble支持简单的集群模式,多个节点可以通过配置共享存储后端:
cfg := server.NewConfig()
cfg.ClusterEnabled = true
cfg.ClusterNodes = []string{"node1:8080", "node2:8080"} // 其他节点地址
存储后端
Guble支持多种存储后端,默认使用内存存储,也可以配置为使用文件或Redis:
// 使用文件存储
cfg.StoragePath = "./data"
// 使用Redis存储
cfg.RedisEnabled = true
cfg.RedisAddr = "localhost:6379"
性能考虑
- 对于高吞吐量场景,建议使用WebSocket协议
- 集群模式可以水平扩展处理能力
- Redis存储后端可以提高持久化性能
Guble是一个简单但功能强大的消息推送解决方案,特别适合需要实时通知功能的Go应用程序。它的轻量级设计使得它可以很容易地集成到现有系统中。