Golang设计问题:带状态Web GUI的TCP服务器实现

Golang设计问题:带状态Web GUI的TCP服务器实现 我想开发一个带有状态网页界面的TCP服务器。该服务器将维护许多长期运行的TCP连接。状态网页应显示每个连接的状态…

例如,单个TCP连接(即一个goroutine)包含一个存储了大量状态信息的结构体。如何以优雅的方式(尽可能减少"锁"的使用)读取这些正在运行的众多TCP连接的信息!?

1 回复

更多关于Golang设计问题:带状态Web GUI的TCP服务器实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中实现带状态Web GUI的TCP服务器时,确实需要谨慎处理并发访问问题。以下是一种使用通道和原子操作来优雅读取连接状态的方案:

package main

import (
    "fmt"
    "net"
    "net/http"
    "sync"
    "sync/atomic"
    "time"
)

// ConnectionState 表示TCP连接的状态
type ConnectionState struct {
    ID          string
    RemoteAddr  string
    BytesRead   uint64
    BytesWritten uint64
    ConnectedAt time.Time
    Status      string
}

// ConnectionManager 管理所有TCP连接
type ConnectionManager struct {
    connections sync.Map // 使用sync.Map存储连接状态
    totalConnections int32
}

// TCPConnection 表示单个TCP连接
type TCPConnection struct {
    conn     net.Conn
    state    *ConnectionState
    manager  *ConnectionManager
    stopChan chan struct{}
}

func NewConnectionManager() *ConnectionManager {
    return &ConnectionManager{
        connections: sync.Map{},
    }
}

func (cm *ConnectionManager) AddConnection(conn net.Conn) *TCPConnection {
    connID := fmt.Sprintf("conn-%d", atomic.AddInt32(&cm.totalConnections, 1))
    
    state := &ConnectionState{
        ID:          connID,
        RemoteAddr:  conn.RemoteAddr().String(),
        ConnectedAt: time.Now(),
        Status:      "connected",
    }
    
    tcpConn := &TCPConnection{
        conn:     conn,
        state:    state,
        manager:  cm,
        stopChan: make(chan struct{}),
    }
    
    cm.connections.Store(connID, tcpConn)
    return tcpConn
}

func (cm *ConnectionManager) RemoveConnection(connID string) {
    cm.connections.Delete(connID)
}

func (cm *ConnectionManager) GetAllStates() []ConnectionState {
    var states []ConnectionState
    
    cm.connections.Range(func(key, value interface{}) bool {
        if tcpConn, ok := value.(*TCPConnection); ok {
            // 使用原子操作读取状态,避免锁竞争
            state := ConnectionState{
                ID:          tcpConn.state.ID,
                RemoteAddr:  tcpConn.state.RemoteAddr,
                BytesRead:   atomic.LoadUint64(&tcpConn.state.BytesRead),
                BytesWritten: atomic.LoadUint64(&tcpConn.state.BytesWritten),
                ConnectedAt: tcpConn.state.ConnectedAt,
                Status:      tcpConn.state.Status,
            }
            states = append(states, state)
        }
        return true
    })
    
    return states
}

func (tc *TCPConnection) Handle() {
    defer tc.conn.Close()
    defer tc.manager.RemoveConnection(tc.state.ID)
    
    buffer := make([]byte, 1024)
    
    for {
        select {
        case <-tc.stopChan:
            return
        default:
            tc.conn.SetReadDeadline(time.Now().Add(10 * time.Second))
            
            n, err := tc.conn.Read(buffer)
            if err != nil {
                tc.state.Status = "disconnected"
                return
            }
            
            // 原子更新读取字节数
            atomic.AddUint64(&tc.state.BytesRead, uint64(n))
            
            // 处理业务逻辑...
            response := fmt.Sprintf("Received %d bytes", n)
            _, err = tc.conn.Write([]byte(response))
            if err == nil {
                atomic.AddUint64(&tc.state.BytesWritten, uint64(len(response)))
            }
        }
    }
}

func (tc *TCPConnection) Stop() {
    close(tc.stopChan)
}

// Web界面处理函数
func (cm *ConnectionManager) StatusHandler(w http.ResponseWriter, r *http.Request) {
    states := cm.GetAllStates()
    
    w.Header().Set("Content-Type", "application/json")
    fmt.Fprintf(w, `{"connections": [`)
    
    for i, state := range states {
        if i > 0 {
            fmt.Fprintf(w, ",")
        }
        fmt.Fprintf(w, `{"id": "%s", "remote_addr": "%s", "bytes_read": %d, "bytes_written": %d, "connected_at": "%s", "status": "%s"}`,
            state.ID, state.RemoteAddr, state.BytesRead, state.BytesWritten, 
            state.ConnectedAt.Format(time.RFC3339), state.Status)
    }
    
    fmt.Fprintf(w, `]}`)
}

func main() {
    manager := NewConnectionManager()
    
    // 启动TCP服务器
    go func() {
        listener, err := net.Listen("tcp", ":8080")
        if err != nil {
            panic(err)
        }
        defer listener.Close()
        
        for {
            conn, err := listener.Accept()
            if err != nil {
                continue
            }
            
            tcpConn := manager.AddConnection(conn)
            go tcpConn.Handle()
        }
    }()
    
    // 启动Web服务器
    http.HandleFunc("/status", manager.StatusHandler)
    fmt.Println("Web GUI available at http://localhost:8081/status")
    http.ListenAndServe(":8081", nil)
}

这个实现的关键特点:

  1. 使用sync.Map:用于存储连接状态,内部使用更细粒度的锁机制
  2. 原子操作:对频繁更新的计数器使用atomic包操作
  3. 通道控制:使用stopChan优雅关闭连接
  4. 无锁读取:在获取所有状态时,通过原子加载避免数据竞争

Web界面通过/status端点提供JSON格式的连接状态信息,可以通过前端框架(如React、Vue)构建实时监控界面。

回到顶部