golang分布式消息推送服务插件库guble的使用

Golang分布式消息推送服务插件库Guble的使用

Guble是一个用Go编写的简单面向用户的消息传递和数据复制服务器。

概述

Guble目前处于早期状态(版本0.4)。它已经可以很好地工作并且非常有用,但协议、API和存储格式可能仍然会发生变化(直到达到0.7版本)。如果您打算使用guble,请与我们联系。

Guble的目标是成为一个简单快速的消息总线,用于用户交互和多个设备之间的数据复制:

  • 非常容易使用Web和移动客户端消费消息
  • 快速实时消息传递,以及从持久提交日志中回放消息
  • 在多个节点上可靠且可扩展
  • 用户感知语义,轻松支持使用多个设备的人之间的消息传递场景
  • 内置电池:可用作前端服务器,无需代理层
  • 自包含:不强制依赖其他服务

功能特性(0.4版本)

  • 发布和订阅主题和子主题的消息
  • 具有透明在线和离线获取的持久消息存储
  • 用于消息发布的WebSocket和REST API
  • 命令行客户端和Go客户端库
  • Firebase云消息(FCM)适配器:将消息作为FCM推送通知传递
  • 服务器和客户端的Docker镜像
  • 简单的身份验证和访问管理
  • 干净的关闭
  • 使用logrus和logstash格式化程序改进日志记录
  • 带有端点的健康检查
  • 基本指标的收集,带有端点
  • 添加Postgresql作为KV后端
  • 支持5000条消息/实例的负载测试
  • 支持Apple推送通知服务(与Firebase并列的新连接器)
  • Firebase连接器的升级、清理、抽象、文档和测试覆盖率
  • 获取订阅者列表/每个订阅者的主题列表(userID, deviceID)
  • 支持使用Nexmo发送短信(与Firebase并列的新连接器)

示例代码

使用Docker启动Guble服务器

docker run -p 8080:8080 smancke/guble

使用Guble CLI客户端连接

docker run -d --name guble smancke/guble
docker exec -it guble /usr/local/bin/guble-cli

从源代码构建和运行

sudo apt-get install golang
mkdir guble && cd guble
export GOPATH=`pwd`
go get github.com/smancke/guble
bin/guble --log=info

Go客户端示例

package main

import (
	"fmt"
	"github.com/smancke/guble/client"
	"time"
)

func main() {
	// 创建客户端
	c := client.New("ws://localhost:8080/stream", "http://localhost:8080")
	
	// 连接服务器
	err := c.Start()
	if err != nil {
		panic(err)
	}
	defer c.Stop()
	
	// 订阅主题
	err = c.Subscribe("/foo", func(msg *client.Message) {
		fmt.Printf("收到消息: %+v\n", msg)
	})
	if err != nil {
		panic(err)
	}
	
	// 发布消息
	err = c.Publish("/foo", "Hello World", nil)
	if err != nil {
		panic(err)
	}
	
	time.Sleep(1 * time.Second)
}

REST API示例

发布消息:

curl -X POST -H "x-Guble-Key: Value" --data Hello 'http://127.0.0.1:8080/api/message/foo?userId=marvin&messageId=42'

WebSocket协议示例

发送消息:

> /foo

Hello World

订阅主题:

+ /foo

取消订阅:

- /foo

协议参考

消息格式

所有从服务器发送到客户端的有效负载消息都使用以下格式:

<path:string>,<sequenceId:int64>,<publisherUserId:string>,<publisherApplicationId:string>,<publisherMessageId:string>,<messagePublishingTime:unix-timestamp>\n
[<application headers json>]\n
<body>

示例:

/foo/bar,42,user01,phone1,id123,1420110000
{"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"}
Hello World

主题

消息可以通过主题分层路由,因此它们由路径表示,用/分隔。服务器确保一条消息只传递一次,即使它匹配多个订阅路径。

子主题:

路径分隔符给出了子主题的语义。这样,对父主题(例如/foo)的订阅也会接收所有子主题(例如/foo/bar)的消息。

配置选项

CLI选项 环境变量 默认值 描述
--env GUBLE_ENV development | integration | preproduction | production development 应用程序运行的环境名称。主要用于日志记录
--health-endpoint GUBLE_HEALTH_ENDPOINT resource/path/to/healthendpoint /admin/healthcheck HTTP服务器使用的健康端点。通过将值设置为""可以禁用
--http GUBLE_HTTP_LISTEN 格式: [host]:port HTTP服务器监听的地址
--kvs GUBLE_KVS memory | file | postgres file 用于键值存储的存储后端
--log GUBLE_LOG panic | fatal | error | warn | info | debug error 进程记录的日志级别
--metrics-endpoint GUBLE_METRICS_ENDPOINT resource/path/to/metricsendpoint /admin/metrics HTTP服务器使用的指标端点。通过将值设置为""可以禁用
--ms GUBLE_MS memory | file file 消息存储后端
--profile GUBLE_PROFILE cpu | mem | block 要使用的分析器
--storage-path GUBLE_STORAGE_PATH path/to/storage /var/lib/guble 用于存储消息和键值数据(如订阅)的路径。路径必须存在!

性能

在一台配备i5-2520M、双核和SSD的旧笔记本上测量。消息负载为"Hello Word"。负载驱动程序和服务器设置在同一台机器上,因此50%的CPU分配给负载驱动程序。

  • 端到端:每秒传递约35,000条持久消息
  • 获取:每秒接收约70,000条持久消息

在测试期间,服务器的内存消耗约为25 MB。


更多关于golang分布式消息推送服务插件库guble的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang分布式消息推送服务插件库guble的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Guble - Go分布式消息推送服务插件库

Guble是一个轻量级的分布式消息推送服务库,专为Go语言设计。它提供了发布/订阅模式的消息推送功能,支持WebSocket和HTTP长轮询协议。

主要特性

  • 轻量级设计
  • 支持主题(topic)和队列(queue)两种消息模式
  • 提供WebSocket和HTTP长轮询接口
  • 可扩展的存储后端
  • 简单的API设计

安装

go get github.com/smancke/guble

基本使用示例

1. 启动Guble服务器

package main

import (
	"log"
	"github.com/smancke/guble/server"
)

func main() {
	// 创建配置
	cfg := server.NewConfig()
	cfg.HttpListen = ":8080"  // HTTP监听端口
	cfg.WSListen = ":8081"   // WebSocket监听端口
    
	// 创建并启动服务器
	srv := server.NewServer(cfg)
	if err := srv.Start(); err != nil {
		log.Fatal("启动服务器失败:", err)
	}
	log.Println("Guble服务器已启动")
	
	// 阻塞主线程
	select {}
}

2. 发布消息

package main

import (
	"log"
	"github.com/smancke/guble/client"
)

func main() {
	// 创建客户端
	c, err := client.NewWebSocketClient("ws://localhost:8081/")
	if err != nil {
		log.Fatal("创建客户端失败:", err)
	}
	
	// 连接到服务器
	if err := c.Start(); err != nil {
		log.Fatal("连接服务器失败:", err)
	}
	
	// 发布消息到主题
	err = c.Send("/topic/example", "这是一条测试消息", nil)
	if err != nil {
		log.Fatal("发送消息失败:", err)
	}
	
	log.Println("消息已发布")
	c.Stop()
}

3. 订阅消息

package main

import (
	"log"
	"time"
	
	"github.com/smancke/guble/client"
)

func main() {
	// 创建客户端
	c, err := client.NewWebSocketClient("ws://localhost:8081/")
	if err != nil {
		log.Fatal("创建客户端失败:", err)
	}
	
	// 连接到服务器
	if err := c.Start(); err != nil {
		log.Fatal("连接服务器失败:", err)
	}
	
	// 订阅主题
	err = c.Subscribe("/topic/example", func(msg *client.Message) {
		log.Printf("收到消息: 主题=%s, 内容=%s, ID=%d", 
			msg.Path, msg.Body, msg.Id)
	})
	if err != nil {
		log.Fatal("订阅失败:", err)
	}
	
	log.Println("已订阅主题 /topic/example")
	
	// 保持连接
	time.Sleep(5 * time.Minute)
	c.Stop()
}

高级功能

消息过滤

Guble支持基于消息ID的过滤:

// 只接收ID大于100的消息
err = c.Subscribe("/topic/example", func(msg *client.Message) {
	log.Printf("收到新消息: %s", msg.Body)
}, client.StartId(100))

使用HTTP长轮询

package main

import (
	"log"
	"net/http"
	"github.com/smancke/guble/client"
)

func main() {
	// 创建HTTP客户端
	c := client.NewHttpClient("http://localhost:8080/")
	
	// 订阅主题
	err := c.Subscribe("/topic/example", func(msg *client.Message) {
		log.Printf("收到消息: %s", msg.Body)
	})
	if err != nil {
		log.Fatal("订阅失败:", err)
	}
	
	// 启动HTTP服务器来接收回调
	http.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) {
		c.HandleCallback(w, r)
	})
	
	log.Println("启动回调服务器 :8082")
	http.ListenAndServe(":8082", nil)
}

集群模式

Guble支持简单的集群模式,多个节点可以通过配置共享存储后端:

cfg := server.NewConfig()
cfg.ClusterEnabled = true
cfg.ClusterNodes = []string{"node1:8080", "node2:8080"} // 其他节点地址

存储后端

Guble支持多种存储后端,默认使用内存存储,也可以配置为使用文件或Redis:

// 使用文件存储
cfg.StoragePath = "./data"

// 使用Redis存储
cfg.RedisEnabled = true
cfg.RedisAddr = "localhost:6379"

性能考虑

  • 对于高吞吐量场景,建议使用WebSocket协议
  • 集群模式可以水平扩展处理能力
  • Redis存储后端可以提高持久化性能

Guble是一个简单但功能强大的消息推送解决方案,特别适合需要实时通知功能的Go应用程序。它的轻量级设计使得它可以很容易地集成到现有系统中。

回到顶部