golang实现实时消息通信(WebSockets/SockJS)的服务器插件Centrifugo的使用
Golang 实现实时消息通信(WebSockets/SockJS)的服务器插件 Centrifugo 的使用
Centrifugo 简介
Centrifugo 是一个开源的、可扩展的实时消息服务器。它可以通过支持的传输协议(WebSocket、HTTP-streaming、Server-Sent Events、GRPC、WebTransport)即时将消息传递给在线的应用程序用户。Centrifugo 采用频道订阅的概念,是一个面向用户的 PUB/SUB 服务器。
Centrifugo 与语言无关,可以与任何后端结合使用,用于构建聊天应用、实时评论、多人在线游戏、实时数据可视化、协作工具等。它非常适合现代架构,可以将业务逻辑与实时传输层解耦。
核心概念
Centrifugo 的核心思想很简单 - 它是基于现代实时传输协议的 PUB/SUB 服务器:
Golang 使用 Centrifugo 的完整示例
下面是一个使用 Golang 与 Centrifugo 集成的完整示例:
1. 安装 Centrifugo
# 使用 Docker 运行 Centrifugo
docker run -p 8000:8000 centrifugo/centrifugo:v5 centrifugo --config /centrifugo/config.json
2. 创建 config.json 配置文件
{
"token_hmac_secret_key": "your_secret_key",
"admin_password": "your_admin_password",
"admin_secret": "your_admin_secret",
"api_key": "your_api_key",
"allowed_origins": ["*"]
}
3. Golang 客户端代码示例
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/centrifugal/centrifuge-go"
)
// 定义消息结构体
type Message struct {
Text string `json:"text"`
Timestamp int64 `json:"timestamp"`
}
func main() {
// 创建客户端
client := centrifuge.NewJsonClient(
"ws://localhost:8000/connection/websocket",
centrifuge.Config{
Token: generateToken("user123"), // 生成 JWT token
},
)
// 设置事件处理器
client.OnConnecting(func(e centrifuge.ConnectingEvent) {
log.Printf("Connecting: %d (%s)", e.Code, e.Reason)
})
client.OnConnected(func(e centrifuge.ConnectedEvent) {
log.Printf("Connected with ID: %s", e.ClientID)
})
client.OnDisconnected(func(e centrifuge.DisconnectedEvent) {
log.Printf("Disconnected: %d (%s)", e.Code, e.Reason)
})
// 连接
err := client.Connect()
if err != nil {
log.Fatalf("Could not connect: %v", err)
}
defer client.Close()
// 订阅频道
sub, err := client.NewSubscription("chat:general")
if err != nil {
log.Fatalf("Could not create subscription: %v", err)
}
// 设置订阅事件处理器
sub.OnSubscribing(func(e centrifuge.SubscribingEvent) {
log.Printf("Subscribing: %d (%s)", e.Code, e.Reason)
})
sub.OnSubscribed(func(e centrifuge.SubscribedEvent) {
log.Printf("Subscribed to channel %s", sub.Channel())
})
sub.OnUnsubscribed(func(e centrifuge.UnsubscribedEvent) {
log.Printf("Unsubscribed from channel %s", sub.Channel())
})
// 处理接收到的消息
sub.OnPublication(func(e centrifuge.PublicationEvent) {
var msg Message
err := json.Unmarshal(e.Data, &msg)
if err != nil {
log.Printf("Error unmarshaling message: %v", err)
return
}
log.Printf("Received message: %s (timestamp: %d)", msg.Text, msg.Timestamp)
})
// 订阅
err = sub.Subscribe()
if err != nil {
log.Fatalf("Could not subscribe: %v", err)
}
defer sub.Unsubscribe()
// 发布消息示例
go func() {
for i := 0; i < 5; i++ {
msg := Message{
Text: fmt.Sprintf("Hello from Go! (%d)", i),
Timestamp: time.Now().Unix(),
}
data, _ := json.Marshal(msg)
_, err := sub.Publish(context.Background(), data)
if err != nil {
log.Printf("Error publishing message: %v", err)
}
time.Sleep(2 * time.Second)
}
}()
// 保持运行
select {}
}
// 生成 JWT token (简化版,实际应用中应该在后端生成)
func generateToken(userID string) string {
// 实际应用中应该使用 HMAC 签名生成 JWT
// 这里简化返回一个示例 token
return "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyMTIzIn0.1234567890"
}
4. Golang 服务端 API 示例
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
)
type PublishRequest struct {
Channel string `json:"channel"`
Data interface{} `json:"data"`
}
func main() {
// Centrifugo API 地址
apiURL := "http://localhost:8000/api"
// 准备发布的消息
message := map[string]interface{}{
"text": "Server message",
"timestamp": 1234567890,
}
// 创建请求体
reqBody := PublishRequest{
Channel: "chat:general",
Data: message,
}
jsonBody, err := json.Marshal(reqBody)
if err != nil {
log.Fatal(err)
}
// 创建 HTTP 请求
req, err := http.NewRequest("POST", apiURL+"/publish", bytes.NewBuffer(jsonBody))
if err != nil {
log.Fatal(err)
}
// 设置请求头
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "apikey your_api_key") // 使用配置中的 API key
// 发送请求
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
// 读取响应
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Response: %s\n", body)
}
主要特性
- 高效的实时传输协议:WebSocket、HTTP-streaming、Server-Sent Events、GRPC、WebTransport
- 内置可扩展性,支持 Redis、Redis Cluster 或兼容 Redis 的存储
- 简单的 HTTP 和 GRPC 服务器 API,用于从应用后端与 Centrifugo 通信
- 异步 PostgreSQL 和 Kafka 消费者,支持事务性发件箱和 CDC 模式
- 灵活的连接认证机制:JWT 和代理式(通过 Centrifugo 向后端发送请求)
- 在单个连接上多路复用频道订阅
- 不同类型的订阅:客户端订阅和服务器端订阅
- 各种频道权限策略,频道命名空间概念
- 频道中的热消息历史记录,在重新连接时自动恢复消息
总结
Centrifugo 是一个功能强大的实时消息服务器,Golang 可以轻松地与其集成。上面的示例展示了如何创建客户端连接、订阅频道、发布和接收消息,以及如何从后端服务器通过 API 发布消息。
更多关于golang实现实时消息通信(WebSockets/SockJS)的服务器插件Centrifugo的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现实时消息通信(WebSockets/SockJS)的服务器插件Centrifugo的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Centrifugo在Golang中的实时消息通信实现
Centrifugo是一个用Golang编写的开源实时消息服务器,支持WebSocket和SockJS协议。下面我将介绍如何在Golang应用中使用Centrifugo实现实时消息通信。
Centrifugo基本概念
Centrifugo作为独立的实时消息服务器,与您的应用后端通过API进行交互。主要特点包括:
- 支持WebSocket和SockJS协议
- 水平可扩展
- 支持多种消息协议(JSON、Protobuf)
- 提供多种认证机制
安装Centrifugo
首先需要安装Centrifugo服务器:
# 使用Docker安装
docker pull centrifugo/centrifugo
docker run -p 8000:8000 centrifugo/centrifugo centrifugo --config config.json
或者从官网下载二进制文件直接运行。
基础配置
创建config.json
配置文件:
{
"token_hmac_secret_key": "your-secret-key",
"admin_password": "admin-password",
"admin_secret": "admin-secret",
"api_key": "api-key",
"allowed_origins": ["*"],
"prometheus": true,
"namespaces": [
{
"name": "chat",
"publish": true,
"subscribe": true,
"presence": true,
"join_leave": true
}
]
}
Golang客户端实现
1. 服务端发布消息
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
)
type PublishRequest struct {
Channel string `json:"channel"`
Data interface{} `json:"data"`
}
func publishToCentrifugo(channel string, data interface{}) error {
apiURL := "http://localhost:8000/api/publish"
apiKey := "your-api-key"
reqBody := PublishRequest{
Channel: channel,
Data: data,
}
jsonBody, err := json.Marshal(reqBody)
if err != nil {
return err
}
req, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(jsonBody))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "apikey "+apiKey)
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
return nil
}
func main() {
// 示例:发布消息到chat频道
err := publishToCentrifugo("chat", map[string]string{
"text": "Hello from Go!",
"user": "gopher",
})
if err != nil {
fmt.Println("Error publishing message:", err)
}
}
2. 生成连接令牌(JWT)
客户端连接Centrifugo需要有效的JWT令牌:
package main
import (
"fmt"
"time"
"github.com/golang-jwt/jwt/v4"
)
func generateConnectionToken(userID, secret string, expiresIn time.Duration) (string, error) {
claims := jwt.MapClaims{
"sub": userID,
"exp": time.Now().Add(expiresIn).Unix(),
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString([]byte(secret))
}
func main() {
token, err := generateConnectionToken("user123", "your-secret-key", 24*time.Hour)
if err != nil {
fmt.Println("Error generating token:", err)
return
}
fmt.Println("Generated token:", token)
}
3. 前端连接示例
<!DOCTYPE html>
<html>
<head>
<title>Centrifugo Demo</title>
<script src="https://unpkg.com/centrifuge@5.0.0/dist/centrifuge.js"></script>
</head>
<body>
<script>
const token = "YOUR_GENERATED_JWT_TOKEN"; // 用上面Go代码生成的token
const centrifuge = new Centrifuge("ws://localhost:8000/connection/websocket", {
token: token
});
centrifuge.on('connect', function(ctx) {
console.log("Connected", ctx);
});
centrifuge.on('disconnect', function(ctx) {
console.log("Disconnected", ctx);
});
const subscription = centrifuge.newSubscription("chat");
subscription.on('publication', function(ctx) {
console.log("New message:", ctx.data);
});
subscription.subscribe();
centrifuge.connect();
</script>
</body>
</html>
高级功能
1. 私有频道订阅
// 生成私有频道订阅令牌
func generateSubscriptionToken(clientID, channel, secret string, expiresIn time.Duration) (string, error) {
claims := jwt.MapClaims{
"sub": clientID,
"channel": channel,
"exp": time.Now().Add(expiresIn).Unix(),
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString([]byte(secret))
}
2. 批量发布消息
type BatchPublishRequest struct {
Channels []string `json:"channels"`
Data interface{} `json:"data"`
}
func batchPublish(channels []string, data interface{}) error {
// 类似publishToCentrifugo实现,但使用/api/batch_publish端点
// ...
return nil
}
性能优化建议
- 使用连接池管理HTTP客户端
- 批量发送消息减少API调用
- 考虑使用Protobuf协议替代JSON
- 对于高并发场景,部署多个Centrifugo节点
监控和管理
Centrifugo提供Prometheus指标和Admin界面,可以通过以下方式访问:
- 管理界面:
http://localhost:8000
- Prometheus指标:
http://localhost:8000/metrics
- 健康检查:
http://localhost:8000/health
总结
Centrifugo为Golang应用提供了强大的实时消息通信能力,通过简单的API集成即可实现WebSocket/SockJS通信。它的优势在于:
- 与主应用解耦,独立运行
- 支持大规模并发连接
- 提供丰富的客户端SDK
- 灵活的认证和授权机制
以上代码示例展示了基本用法,实际项目中可以根据需求扩展更复杂的实时交互功能。