Golang如何通过WebSocket的Pub/Sub消息机制向浏览器发送数据

Golang如何通过WebSocket的Pub/Sub消息机制向浏览器发送数据 我有一个GCP Pub/Sub,它处于订阅模式,会持续接收来自一个Windows工具客户端的消息。我处理这些消息并得到一些结果,对于负面结果,我希望将其再次发送给Windows工具。为此,我考虑创建一个WebSocket连接,并仅通过此连接发送负面响应。但我不知道如何使用Pub/Sub消息并将其通过WebSocket发送。如果有人之前遇到过这个问题,请提供建议。

1 回复

更多关于Golang如何通过WebSocket的Pub/Sub消息机制向浏览器发送数据的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Golang中通过WebSocket将Pub/Sub消息发送到浏览器,可以结合gorilla/websocket包和GCP Pub/Sub客户端库实现。以下是一个完整示例:

首先,建立WebSocket连接处理:

package main

import (
    "log"
    "net/http"
    "sync"

    "github.com/gorilla/websocket"
    "cloud.google.com/go/pubsub"
)

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool { return true },
}

type Client struct {
    conn *websocket.Conn
    send chan []byte
}

var (
    clients   = make(map[*Client]bool)
    clientsMu sync.RWMutex
)

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("WebSocket升级失败: %v", err)
        return
    }
    
    client := &Client{
        conn: conn,
        send: make(chan []byte, 256),
    }
    
    clientsMu.Lock()
    clients[client] = true
    clientsMu.Unlock()
    
    go client.writePump()
    go client.readPump()
}

func (c *Client) writePump() {
    defer func() {
        c.conn.Close()
        clientsMu.Lock()
        delete(clients, c)
        clientsMu.Unlock()
    }()
    
    for message := range c.send {
        err := c.conn.WriteMessage(websocket.TextMessage, message)
        if err != nil {
            break
        }
    }
}

然后,集成Pub/Sub订阅处理:

func processPubSubMessages(projectID, subscriptionID string) {
    ctx := context.Background()
    
    client, err := pubsub.NewClient(ctx, projectID)
    if err != nil {
        log.Fatalf("创建Pub/Sub客户端失败: %v", err)
    }
    defer client.Close()
    
    sub := client.Subscription(subscriptionID)
    
    err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        // 处理消息,检查是否为负面结果
        if isNegativeResult(msg.Data) {
            // 通过WebSocket广播负面结果
            broadcastToClients(msg.Data)
            msg.Ack() // 确认消息已处理
        } else {
            msg.Ack() // 正常结果,仅确认
        }
    })
    
    if err != nil {
        log.Printf("接收Pub/Sub消息失败: %v", err)
    }
}

func isNegativeResult(data []byte) bool {
    // 实现你的负面结果判断逻辑
    // 示例:检查消息内容是否包含错误标识
    return bytes.Contains(data, []byte("error")) || 
           bytes.Contains(data, []byte("failure"))
}

func broadcastToClients(message []byte) {
    clientsMu.RLock()
    defer clientsMu.RUnlock()
    
    for client := range clients {
        select {
        case client.send <- message:
            // 消息已排队发送
        default:
            // 通道满,关闭连接
            close(client.send)
            delete(clients, client)
        }
    }
}

主函数整合:

func main() {
    // 启动WebSocket服务器
    http.HandleFunc("/ws", handleWebSocket)
    
    // 启动Pub/Sub消息处理
    go processPubSubMessages("your-project-id", "your-subscription-id")
    
    log.Println("服务器启动在 :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

浏览器端JavaScript连接示例:

const socket = new WebSocket('ws://localhost:8080/ws');

socket.onmessage = function(event) {
    const message = JSON.parse(event.data);
    console.log('收到负面结果:', message);
    // 在这里处理负面结果,可以显示给用户或采取其他操作
};

socket.onopen = function() {
    console.log('WebSocket连接已建立');
};

socket.onerror = function(error) {
    console.error('WebSocket错误:', error);
};

这个实现中,processPubSubMessages函数持续监听Pub/Sub订阅,当检测到负面结果时,通过broadcastToClients函数将消息发送给所有连接的WebSocket客户端。WebSocket连接管理使用线程安全的map和互斥锁,确保并发安全。

回到顶部