golang实现实时消息通信(WebSockets/SockJS)的服务器插件Centrifugo的使用

Golang 实现实时消息通信(WebSockets/SockJS)的服务器插件 Centrifugo 的使用

Centrifugo 简介

Centrifugo 是一个开源的、可扩展的实时消息服务器。它可以通过支持的传输协议(WebSocket、HTTP-streaming、Server-Sent Events、GRPC、WebTransport)即时将消息传递给在线的应用程序用户。Centrifugo 采用频道订阅的概念,是一个面向用户的 PUB/SUB 服务器。

Centrifugo 与语言无关,可以与任何后端结合使用,用于构建聊天应用、实时评论、多人在线游戏、实时数据可视化、协作工具等。它非常适合现代架构,可以将业务逻辑与实时传输层解耦。

核心概念

Centrifugo 的核心思想很简单 - 它是基于现代实时传输协议的 PUB/SUB 服务器:

Centrifugo PUB/SUB 协议

Golang 使用 Centrifugo 的完整示例

下面是一个使用 Golang 与 Centrifugo 集成的完整示例:

1. 安装 Centrifugo

# 使用 Docker 运行 Centrifugo
docker run -p 8000:8000 centrifugo/centrifugo:v5 centrifugo --config /centrifugo/config.json

2. 创建 config.json 配置文件

{
  "token_hmac_secret_key": "your_secret_key",
  "admin_password": "your_admin_password",
  "admin_secret": "your_admin_secret",
  "api_key": "your_api_key",
  "allowed_origins": ["*"]
}

3. Golang 客户端代码示例

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/centrifugal/centrifuge-go"
)

// 定义消息结构体
type Message struct {
	Text      string `json:"text"`
	Timestamp int64  `json:"timestamp"`
}

func main() {
	// 创建客户端
	client := centrifuge.NewJsonClient(
		"ws://localhost:8000/connection/websocket",
		centrifuge.Config{
			Token: generateToken("user123"), // 生成 JWT token
		},
	)

	// 设置事件处理器
	client.OnConnecting(func(e centrifuge.ConnectingEvent) {
		log.Printf("Connecting: %d (%s)", e.Code, e.Reason)
	})
	client.OnConnected(func(e centrifuge.ConnectedEvent) {
		log.Printf("Connected with ID: %s", e.ClientID)
	})
	client.OnDisconnected(func(e centrifuge.DisconnectedEvent) {
		log.Printf("Disconnected: %d (%s)", e.Code, e.Reason)
	})

	// 连接
	err := client.Connect()
	if err != nil {
		log.Fatalf("Could not connect: %v", err)
	}
	defer client.Close()

	// 订阅频道
	sub, err := client.NewSubscription("chat:general")
	if err != nil {
		log.Fatalf("Could not create subscription: %v", err)
	}

	// 设置订阅事件处理器
	sub.OnSubscribing(func(e centrifuge.SubscribingEvent) {
		log.Printf("Subscribing: %d (%s)", e.Code, e.Reason)
	})
	sub.OnSubscribed(func(e centrifuge.SubscribedEvent) {
		log.Printf("Subscribed to channel %s", sub.Channel())
	})
	sub.OnUnsubscribed(func(e centrifuge.UnsubscribedEvent) {
		log.Printf("Unsubscribed from channel %s", sub.Channel())
	})

	// 处理接收到的消息
	sub.OnPublication(func(e centrifuge.PublicationEvent) {
		var msg Message
		err := json.Unmarshal(e.Data, &msg)
		if err != nil {
			log.Printf("Error unmarshaling message: %v", err)
			return
		}
		log.Printf("Received message: %s (timestamp: %d)", msg.Text, msg.Timestamp)
	})

	// 订阅
	err = sub.Subscribe()
	if err != nil {
		log.Fatalf("Could not subscribe: %v", err)
	}
	defer sub.Unsubscribe()

	// 发布消息示例
	go func() {
		for i := 0; i < 5; i++ {
			msg := Message{
				Text:      fmt.Sprintf("Hello from Go! (%d)", i),
				Timestamp: time.Now().Unix(),
			}
			data, _ := json.Marshal(msg)
			_, err := sub.Publish(context.Background(), data)
			if err != nil {
				log.Printf("Error publishing message: %v", err)
			}
			time.Sleep(2 * time.Second)
		}
	}()

	// 保持运行
	select {}
}

// 生成 JWT token (简化版,实际应用中应该在后端生成)
func generateToken(userID string) string {
	// 实际应用中应该使用 HMAC 签名生成 JWT
	// 这里简化返回一个示例 token
	return "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyMTIzIn0.1234567890"
}

4. Golang 服务端 API 示例

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

type PublishRequest struct {
	Channel string      `json:"channel"`
	Data    interface{} `json:"data"`
}

func main() {
	// Centrifugo API 地址
	apiURL := "http://localhost:8000/api"

	// 准备发布的消息
	message := map[string]interface{}{
		"text":      "Server message",
		"timestamp": 1234567890,
	}

	// 创建请求体
	reqBody := PublishRequest{
		Channel: "chat:general",
		Data:    message,
	}
	jsonBody, err := json.Marshal(reqBody)
	if err != nil {
		log.Fatal(err)
	}

	// 创建 HTTP 请求
	req, err := http.NewRequest("POST", apiURL+"/publish", bytes.NewBuffer(jsonBody))
	if err != nil {
		log.Fatal(err)
	}

	// 设置请求头
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "apikey your_api_key") // 使用配置中的 API key

	// 发送请求
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// 读取响应
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Response: %s\n", body)
}

主要特性

  • 高效的实时传输协议:WebSocket、HTTP-streaming、Server-Sent Events、GRPC、WebTransport
  • 内置可扩展性,支持 Redis、Redis Cluster 或兼容 Redis 的存储
  • 简单的 HTTP 和 GRPC 服务器 API,用于从应用后端与 Centrifugo 通信
  • 异步 PostgreSQL 和 Kafka 消费者,支持事务性发件箱和 CDC 模式
  • 灵活的连接认证机制:JWT 和代理式(通过 Centrifugo 向后端发送请求)
  • 在单个连接上多路复用频道订阅
  • 不同类型的订阅:客户端订阅和服务器端订阅
  • 各种频道权限策略,频道命名空间概念
  • 频道中的热消息历史记录,在重新连接时自动恢复消息

总结

Centrifugo 是一个功能强大的实时消息服务器,Golang 可以轻松地与其集成。上面的示例展示了如何创建客户端连接、订阅频道、发布和接收消息,以及如何从后端服务器通过 API 发布消息。


更多关于golang实现实时消息通信(WebSockets/SockJS)的服务器插件Centrifugo的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现实时消息通信(WebSockets/SockJS)的服务器插件Centrifugo的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Centrifugo在Golang中的实时消息通信实现

Centrifugo是一个用Golang编写的开源实时消息服务器,支持WebSocket和SockJS协议。下面我将介绍如何在Golang应用中使用Centrifugo实现实时消息通信。

Centrifugo基本概念

Centrifugo作为独立的实时消息服务器,与您的应用后端通过API进行交互。主要特点包括:

  • 支持WebSocket和SockJS协议
  • 水平可扩展
  • 支持多种消息协议(JSON、Protobuf)
  • 提供多种认证机制

安装Centrifugo

首先需要安装Centrifugo服务器:

# 使用Docker安装
docker pull centrifugo/centrifugo
docker run -p 8000:8000 centrifugo/centrifugo centrifugo --config config.json

或者从官网下载二进制文件直接运行。

基础配置

创建config.json配置文件:

{
  "token_hmac_secret_key": "your-secret-key",
  "admin_password": "admin-password",
  "admin_secret": "admin-secret",
  "api_key": "api-key",
  "allowed_origins": ["*"],
  "prometheus": true,
  "namespaces": [
    {
      "name": "chat",
      "publish": true,
      "subscribe": true,
      "presence": true,
      "join_leave": true
    }
  ]
}

Golang客户端实现

1. 服务端发布消息

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

type PublishRequest struct {
	Channel string      `json:"channel"`
	Data    interface{} `json:"data"`
}

func publishToCentrifugo(channel string, data interface{}) error {
	apiURL := "http://localhost:8000/api/publish"
	apiKey := "your-api-key"

	reqBody := PublishRequest{
		Channel: channel,
		Data:    data,
	}

	jsonBody, err := json.Marshal(reqBody)
	if err != nil {
		return err
	}

	req, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(jsonBody))
	if err != nil {
		return err
	}

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "apikey "+apiKey)

	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	return nil
}

func main() {
	// 示例:发布消息到chat频道
	err := publishToCentrifugo("chat", map[string]string{
		"text": "Hello from Go!",
		"user": "gopher",
	})
	if err != nil {
		fmt.Println("Error publishing message:", err)
	}
}

2. 生成连接令牌(JWT)

客户端连接Centrifugo需要有效的JWT令牌:

package main

import (
	"fmt"
	"time"

	"github.com/golang-jwt/jwt/v4"
)

func generateConnectionToken(userID, secret string, expiresIn time.Duration) (string, error) {
	claims := jwt.MapClaims{
		"sub": userID,
		"exp": time.Now().Add(expiresIn).Unix(),
	}

	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	return token.SignedString([]byte(secret))
}

func main() {
	token, err := generateConnectionToken("user123", "your-secret-key", 24*time.Hour)
	if err != nil {
		fmt.Println("Error generating token:", err)
		return
	}
	fmt.Println("Generated token:", token)
}

3. 前端连接示例

<!DOCTYPE html>
<html>
<head>
    <title>Centrifugo Demo</title>
    <script src="https://unpkg.com/centrifuge@5.0.0/dist/centrifuge.js"></script>
</head>
<body>
    <script>
        const token = "YOUR_GENERATED_JWT_TOKEN"; // 用上面Go代码生成的token
        const centrifuge = new Centrifuge("ws://localhost:8000/connection/websocket", {
            token: token
        });

        centrifuge.on('connect', function(ctx) {
            console.log("Connected", ctx);
        });

        centrifuge.on('disconnect', function(ctx) {
            console.log("Disconnected", ctx);
        });

        const subscription = centrifuge.newSubscription("chat");

        subscription.on('publication', function(ctx) {
            console.log("New message:", ctx.data);
        });

        subscription.subscribe();
        centrifuge.connect();
    </script>
</body>
</html>

高级功能

1. 私有频道订阅

// 生成私有频道订阅令牌
func generateSubscriptionToken(clientID, channel, secret string, expiresIn time.Duration) (string, error) {
	claims := jwt.MapClaims{
		"sub": clientID,
		"channel": channel,
		"exp": time.Now().Add(expiresIn).Unix(),
	}

	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	return token.SignedString([]byte(secret))
}

2. 批量发布消息

type BatchPublishRequest struct {
	Channels []string    `json:"channels"`
	Data     interface{} `json:"data"`
}

func batchPublish(channels []string, data interface{}) error {
	// 类似publishToCentrifugo实现,但使用/api/batch_publish端点
	// ...
	return nil
}

性能优化建议

  1. 使用连接池管理HTTP客户端
  2. 批量发送消息减少API调用
  3. 考虑使用Protobuf协议替代JSON
  4. 对于高并发场景,部署多个Centrifugo节点

监控和管理

Centrifugo提供Prometheus指标和Admin界面,可以通过以下方式访问:

  • 管理界面: http://localhost:8000
  • Prometheus指标: http://localhost:8000/metrics
  • 健康检查: http://localhost:8000/health

总结

Centrifugo为Golang应用提供了强大的实时消息通信能力,通过简单的API集成即可实现WebSocket/SockJS通信。它的优势在于:

  • 与主应用解耦,独立运行
  • 支持大规模并发连接
  • 提供丰富的客户端SDK
  • 灵活的认证和授权机制

以上代码示例展示了基本用法,实际项目中可以根据需求扩展更复杂的实时交互功能。

回到顶部