Golang如何通过WebSocket的Pub/Sub消息机制向浏览器发送数据
Golang如何通过WebSocket的Pub/Sub消息机制向浏览器发送数据 我有一个GCP Pub/Sub,它处于订阅模式,会持续接收来自一个Windows工具客户端的消息。我处理这些消息并得到一些结果,对于负面结果,我希望将其再次发送给Windows工具。为此,我考虑创建一个WebSocket连接,并仅通过此连接发送负面响应。但我不知道如何使用Pub/Sub消息并将其通过WebSocket发送。如果有人之前遇到过这个问题,请提供建议。
1 回复
更多关于Golang如何通过WebSocket的Pub/Sub消息机制向浏览器发送数据的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Golang中通过WebSocket将Pub/Sub消息发送到浏览器,可以结合gorilla/websocket包和GCP Pub/Sub客户端库实现。以下是一个完整示例:
首先,建立WebSocket连接处理:
package main
import (
"log"
"net/http"
"sync"
"github.com/gorilla/websocket"
"cloud.google.com/go/pubsub"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
type Client struct {
conn *websocket.Conn
send chan []byte
}
var (
clients = make(map[*Client]bool)
clientsMu sync.RWMutex
)
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket升级失败: %v", err)
return
}
client := &Client{
conn: conn,
send: make(chan []byte, 256),
}
clientsMu.Lock()
clients[client] = true
clientsMu.Unlock()
go client.writePump()
go client.readPump()
}
func (c *Client) writePump() {
defer func() {
c.conn.Close()
clientsMu.Lock()
delete(clients, c)
clientsMu.Unlock()
}()
for message := range c.send {
err := c.conn.WriteMessage(websocket.TextMessage, message)
if err != nil {
break
}
}
}
然后,集成Pub/Sub订阅处理:
func processPubSubMessages(projectID, subscriptionID string) {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
log.Fatalf("创建Pub/Sub客户端失败: %v", err)
}
defer client.Close()
sub := client.Subscription(subscriptionID)
err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// 处理消息,检查是否为负面结果
if isNegativeResult(msg.Data) {
// 通过WebSocket广播负面结果
broadcastToClients(msg.Data)
msg.Ack() // 确认消息已处理
} else {
msg.Ack() // 正常结果,仅确认
}
})
if err != nil {
log.Printf("接收Pub/Sub消息失败: %v", err)
}
}
func isNegativeResult(data []byte) bool {
// 实现你的负面结果判断逻辑
// 示例:检查消息内容是否包含错误标识
return bytes.Contains(data, []byte("error")) ||
bytes.Contains(data, []byte("failure"))
}
func broadcastToClients(message []byte) {
clientsMu.RLock()
defer clientsMu.RUnlock()
for client := range clients {
select {
case client.send <- message:
// 消息已排队发送
default:
// 通道满,关闭连接
close(client.send)
delete(clients, client)
}
}
}
主函数整合:
func main() {
// 启动WebSocket服务器
http.HandleFunc("/ws", handleWebSocket)
// 启动Pub/Sub消息处理
go processPubSubMessages("your-project-id", "your-subscription-id")
log.Println("服务器启动在 :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
浏览器端JavaScript连接示例:
const socket = new WebSocket('ws://localhost:8080/ws');
socket.onmessage = function(event) {
const message = JSON.parse(event.data);
console.log('收到负面结果:', message);
// 在这里处理负面结果,可以显示给用户或采取其他操作
};
socket.onopen = function() {
console.log('WebSocket连接已建立');
};
socket.onerror = function(error) {
console.error('WebSocket错误:', error);
};
这个实现中,processPubSubMessages函数持续监听Pub/Sub订阅,当检测到负面结果时,通过broadcastToClients函数将消息发送给所有连接的WebSocket客户端。WebSocket连接管理使用线程安全的map和互斥锁,确保并发安全。

