Golang实现TCP客户端重连与错误捕获的最佳实践

到目前为止,我已经找到了一些关于创建简单 TCP 客户端的示例,例如:

http://www.inanzzz.com/index.php/post/j3n1/creating-a-concurrent-tcp-client-and-server-example-with-golang

Linode Guides & Tutorials

使用Go创建TCP和UDP客户端及服务器

使用Go编程语言创建TCP和UDP客户端及服务器。

这些文章是一个很好的开端。

我在哪里可以找到更复杂的示例?例如,具有重连功能的 TCP 客户端,或能够妥善处理各种错误情况的客户端。

您能推荐一些与此相关的学习文章、书籍或开源项目吗?

非常感谢您的帮助。


更多关于Golang实现TCP客户端重连与错误捕获的最佳实践的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 条回复

我编写了一个 RPC 框架,你可以试试:

https://github.com/lesismal/arpc

更多关于Golang实现TCP客户端重连与错误捕获的最佳实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你可以查看以下链接以获取更复杂的示例。

图片

通过构建TCP协议来理解Go中的字节

通过构建一个基于TCP的聊天协议,学习处理字节和字节切片([]byte)所需的一切知识。

以下是一个具备重连机制和错误处理的TCP客户端实现示例:

package main

import (
    "bufio"
    "context"
    "errors"
    "fmt"
    "net"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

// TCPClient is a newline-delimited TCP client that keeps one connection to
// address open and re-dials it in the background when reads or writes fail.
//
// conn is guarded by mu; the background loops take the read lock only long
// enough to copy the current connection out.
type TCPClient struct {
    // Connection state, guarded by mu.
    mu   sync.RWMutex
    conn net.Conn

    address string // target passed to the dialer, e.g. "localhost:8080"

    // Signalling channels, created by NewTCPClient.
    reconnectCh chan struct{} // pending reconnect requests (NewTCPClient gives it capacity 1)
    stopCh      chan struct{} // closed by Stop to tell every loop to exit

    wg sync.WaitGroup // tracks the goroutines launched by Start
}

// NewTCPClient returns an unstarted client for address. No connection is
// made until Start is called.
func NewTCPClient(address string) *TCPClient {
    c := new(TCPClient)
    c.address = address
    // Room for exactly one pending request: further requests made while one
    // is queued are coalesced by scheduleReconnect.
    c.reconnectCh = make(chan struct{}, 1)
    c.stopCh = make(chan struct{})
    return c
}

// connect closes any existing connection and dials c.address. On success it
// stores the new connection in c.conn.
//
// The whole operation runs under the write lock, so readers never observe a
// half-replaced connection. Stop closes stopCh and then takes the same lock,
// so it either closes the connection installed here, or this call sees
// stopCh closed and refuses to dial. The dial uses a 5-second timeout and a
// 30-second TCP keep-alive period.
//
// It returns an error if the client has already been stopped, or if the dial
// fails. After a failed dial, c.conn is nil rather than the closed connection.
func (c *TCPClient) connect() error {
    c.mu.Lock()
    defer c.mu.Unlock()

    // Without this check, a reconnect racing with shutdown could install a
    // fresh connection after Stop has already closed the old one, and
    // nobody would ever close it.
    select {
    case <-c.stopCh:
        return errors.New("client stopped")
    default:
    }

    if c.conn != nil {
        _ = c.conn.Close() // best effort: the connection is being replaced
        // Clear the field so a failed dial below does not leave the loops
        // holding an already-closed connection.
        c.conn = nil
    }

    dialer := &net.Dialer{
        Timeout:   5 * time.Second,
        KeepAlive: 30 * time.Second,
    }

    conn, err := dialer.Dial("tcp", c.address)
    if err != nil {
        return fmt.Errorf("dial failed: %w", err)
    }

    c.conn = conn
    fmt.Printf("Connected to %s\n", c.address)
    return nil
}

// reconnect re-dials the server with exponential backoff until connect
// succeeds or the client is stopped. It waits 1s before the first attempt and
// doubles the wait after each failure, capping it at 30s.
//
// The wait is a timer inside a select on stopCh rather than time.Sleep, so
// Stop (which waits for reconnectLoop via c.wg) is not held up for a whole
// backoff period.
func (c *TCPClient) reconnect() {
    const maxBackoff = 30 * time.Second
    backoff := time.Second

    for {
        select {
        case <-c.stopCh:
            return
        default:
        }

        fmt.Printf("Attempting to reconnect in %v...\n", backoff)

        timer := time.NewTimer(backoff)
        select {
        case <-c.stopCh:
            timer.Stop()
            return
        case <-timer.C:
        }

        err := c.connect()
        if err == nil {
            return
        }
        // Surface the cause instead of silently retrying.
        fmt.Printf("Reconnect failed: %v\n", err)

        backoff *= 2
        if backoff > maxBackoff {
            backoff = maxBackoff
        }
    }
}

// readLoop prints every newline-terminated message received from the server
// until stopCh is closed. It runs as one of the goroutines tracked by c.wg.
//
// Buffering:
//   - One bufio.Reader is kept per connection. Creating a new one on every
//     iteration would silently drop whatever the previous reader had
//     buffered past the first '\n'.
//   - The 10-second read deadline only exists so the loop wakes up
//     periodically to check stopCh. A timeout is not an error.
//   - Any partial line read before a timeout is kept and completed by the
//     next read.
//
// Error handling:
//   - On any other read error the loop does not exit.
//   - It asks reconnectLoop for a new connection, unless the failed
//     connection has already been replaced.
//   - It then waits for c.conn to change instead of reading the dead
//     connection again, so reading resumes after every reconnect.
func (c *TCPClient) readLoop() {
    defer c.wg.Done()

    var (
        reader     *bufio.Reader // buffered reader bound to readerConn
        readerConn net.Conn
        failed     net.Conn // last connection that returned a fatal read error
        pending    string   // bytes of the current line read so far
    )

    for {
        select {
        case <-c.stopCh:
            return
        default:
        }

        c.mu.RLock()
        conn := c.conn
        c.mu.RUnlock()

        // No connection yet, or only the one that already failed: wait for
        // reconnect to install a new one.
        if conn == nil || conn == failed {
            time.Sleep(100 * time.Millisecond)
            continue
        }

        if conn != readerConn {
            reader = bufio.NewReader(conn)
            readerConn = conn
            pending = ""
        }

        // Error ignored: if the connection is closed, the read below fails too.
        _ = conn.SetReadDeadline(time.Now().Add(10 * time.Second))
        chunk, err := reader.ReadString('\n')
        pending += chunk

        if err != nil {
            var netErr net.Error
            if errors.As(err, &netErr) && netErr.Timeout() {
                continue
            }

            // Stop closes the connection to unblock this read; that is a
            // shutdown, not a failure.
            select {
            case <-c.stopCh:
                return
            default:
            }

            fmt.Printf("Read error: %v\n", err)
            failed = conn

            // Only request a reconnect if this connection is still current.
            // If connect already replaced it, the error is just the old
            // connection being closed. Reconnecting again would tear down
            // the fresh connection, which makes this read fail again, in an
            // endless loop.
            c.mu.RLock()
            current := c.conn == conn
            c.mu.RUnlock()
            if current {
                c.scheduleReconnect()
            }
            continue
        }

        fmt.Printf("Received: %s", pending)
        pending = ""
    }
}

// writeLoop sends a "Ping at <RFC3339 time>\n" line every 5 seconds until
// stopCh is closed. It runs as one of the goroutines tracked by c.wg.
//
// After a write error the loop keeps running. It asks reconnectLoop for a new
// connection, unless the failed one has already been replaced, and skips
// ticks until c.conn changes. Pings therefore resume automatically after a
// reconnect instead of stopping for good.
func (c *TCPClient) writeLoop() {
    defer c.wg.Done()

    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    var failed net.Conn // last connection that returned a write error

    for {
        select {
        case <-c.stopCh:
            return
        case <-ticker.C:
        }

        c.mu.RLock()
        conn := c.conn
        c.mu.RUnlock()

        if conn == nil || conn == failed {
            continue
        }

        msg := fmt.Sprintf("Ping at %s\n", time.Now().Format(time.RFC3339))
        // Error ignored: if the connection is closed, the write below fails too.
        _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
        if _, err := conn.Write([]byte(msg)); err != nil {
            // Stop closes the connection to unblock this write; that is a
            // shutdown, not a failure.
            select {
            case <-c.stopCh:
                return
            default:
            }

            fmt.Printf("Write error: %v\n", err)
            failed = conn

            // Only request a reconnect if this connection is still current.
            // Otherwise we would tear down a connection that was just
            // established.
            c.mu.RLock()
            current := c.conn == conn
            c.mu.RUnlock()
            if current {
                c.scheduleReconnect()
            }
            continue
        }

        fmt.Printf("Sent: %s", msg)
    }
}

// scheduleReconnect asks reconnectLoop to re-dial, without ever blocking the
// caller. If reconnectCh has no free slot because a request is already
// queued, this call is a no-op and the two requests coalesce.
func (c *TCPClient) scheduleReconnect() {
    select {
    default:
        // A request is already pending; nothing more to do.
    case c.reconnectCh <- struct{}{}:
    }
}

// reconnectLoop serves reconnect requests queued by scheduleReconnect, one at
// a time, until stopCh is closed. It runs as one of the goroutines tracked by
// c.wg.
func (c *TCPClient) reconnectLoop() {
    defer c.wg.Done()

    for {
        select {
        case <-c.reconnectCh:
        case <-c.stopCh:
            return
        }
        c.reconnect()
    }
}

// Start dials the server once. On success it launches the read, write and
// reconnect loops, each in its own goroutine tracked by c.wg. If the initial
// dial fails, the error is returned and no goroutines are started.
func (c *TCPClient) Start() error {
    err := c.connect()
    if err != nil {
        return err
    }

    loops := []func(){c.readLoop, c.writeLoop, c.reconnectLoop}
    c.wg.Add(len(loops))
    for _, loop := range loops {
        go loop()
    }
    return nil
}

// Stop shuts the client down and waits for the loops started by Start to
// return. It does this by:
//   - closing stopCh, which signals every background loop to exit;
//   - closing the current connection, which unblocks any pending read or
//     write.
//
// Stop may be called more than once, including concurrently. Closing stopCh
// happens under c.mu and only if it is still open, so later calls do not
// panic; they just wait.
func (c *TCPClient) Stop() {
    c.mu.Lock()
    select {
    case <-c.stopCh:
        // Already stopped; closing stopCh again would panic.
    default:
        close(c.stopCh)
    }
    if c.conn != nil {
        _ = c.conn.Close() // best effort during shutdown
        c.conn = nil
    }
    c.mu.Unlock()

    c.wg.Wait()
}

// main runs a TCPClient against localhost:8080 until it receives SIGINT or
// SIGTERM, then shuts the client down. It exits with status 1 if the initial
// connection fails.
func main() {
    // ctx is only cancelled by the deferred cancel, so in this program the
    // select below returns only on a signal. ctx.Done would matter only if
    // cancel were handed to some other shutdown trigger.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    client := NewTCPClient("localhost:8080")

    if err := client.Start(); err != nil {
        fmt.Printf("Failed to start client: %v\n", err)
        os.Exit(1)
    }

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    select {
    case <-sigCh:
        fmt.Println("Shutting down...")
    case <-ctx.Done():
    }

    // Restore default signal handling before the potentially slow Stop
    // (e.g. it may wait for an in-progress dial). Otherwise a second Ctrl-C
    // would be swallowed by sigCh, which nobody reads any more, instead of
    // terminating the process.
    signal.Stop(sigCh)

    client.Stop()
    fmt.Println("Client stopped")
}

错误处理增强版示例:

// ErrorHandler logs the kind of each error it is given and decides whether
// the failed operation may be retried, based on a fixed retry budget.
type ErrorHandler struct {
    maxRetries    int         // retries are allowed while retryCount < maxRetries
    retryCount    int         // attempts used so far; updated by the caller, not by HandleError
    errorCallback func(error) // optional hook called with every error before it is classified
}

// HandleError passes err to errorCallback (if set), logs a classification of
// err, and reports whether the caller may retry (retryCount < maxRetries).
//
// Classification uses errors.As instead of a type switch, so errors wrapped
// with %w are still recognised. connect, for example, returns
// "dial failed: %w" errors. In a type switch, a *net.OpError case placed
// after a net.Error case can never match, because *net.OpError implements
// net.Error. The deprecated net.Error.Temporary method is no longer
// consulted; it never changed the result.
func (h *ErrorHandler) HandleError(err error) bool {
    if h.errorCallback != nil {
        h.errorCallback(err)
    }

    switch netErrorKind(err) {
    case "timeout":
        fmt.Println("Network timeout error")
    case "operation":
        fmt.Printf("Operation error: %v\n", err)
    default:
        fmt.Printf("Unknown error: %v\n", err)
    }

    return h.retryCount < h.maxRetries
}

// netErrorKind classifies err for logging, unwrapping it as needed:
//   - "timeout" for any net.Error whose Timeout method reports true;
//   - "operation" for any other *net.OpError;
//   - "unknown" for everything else, including nil.
func netErrorKind(err error) string {
    var netErr net.Error
    if errors.As(err, &netErr) && netErr.Timeout() {
        return "timeout"
    }
    var opErr *net.OpError
    if errors.As(err, &opErr) {
        return "operation"
    }
    return "unknown"
}

// connectWithRetry tries connect up to 5 times. Before the next attempt it
// sleeps for (attempt number) seconds: 1s, 2s, 3s, 4s. Each failure is
// reported through an ErrorHandler whose callback prints
// "Connection error: ...". It returns nil on the first success, or a wrapped
// error once the handler declines to retry.
func (c *TCPClient) connectWithRetry() error {
    h := &ErrorHandler{
        maxRetries:    5,
        errorCallback: func(err error) { fmt.Printf("Connection error: %v\n", err) },
    }

    for h.retryCount < h.maxRetries {
        err := c.connect()
        if err == nil {
            return nil
        }

        h.retryCount++
        if retry := h.HandleError(err); !retry {
            return fmt.Errorf("max retries exceeded: %w", err)
        }
        time.Sleep(time.Duration(h.retryCount) * time.Second)
    }

    return fmt.Errorf("failed to connect after %d attempts", h.maxRetries)
}

连接健康检查示例:

// healthCheck probes the current connection with a 1-second read.
//
// It reports true (healthy) when:
//   - the probe times out, meaning the socket is open but idle; or
//   - data arrives, meaning the peer is alive.
//
// It reports false when there is no connection, or when the read fails with
// any other error (EOF, reset, use of a closed connection). Previously,
// receiving data was reported as unhealthy, which forced a reconnect on any
// connection that was actively delivering messages.
//
// WARNING: the probe reads from the same net.Conn as readLoop.
//   - A byte it receives is consumed here and never reaches readLoop.
//   - Its deadline changes also apply to a read readLoop may have in flight.
//
// Only use it when nothing else is reading the connection.
func (c *TCPClient) healthCheck() bool {
    c.mu.RLock()
    conn := c.conn
    c.mu.RUnlock()

    if conn == nil {
        return false
    }

    if err := conn.SetReadDeadline(time.Now().Add(1 * time.Second)); err != nil {
        return false // the deadline cannot be set, e.g. the connection is closed
    }
    // Clear the deadline on every path, not only after a timeout, so later
    // reads are not cut short by this probe.
    defer func() { _ = conn.SetReadDeadline(time.Time{}) }()

    var probe [1]byte
    n, err := conn.Read(probe[:])
    if n > 0 {
        return true
    }

    var netErr net.Error
    return errors.As(err, &netErr) && netErr.Timeout()
}

// monitorConnection runs healthCheck every 30 seconds and requests a
// reconnect whenever it reports false. It returns when stopCh is closed.
// Start does not launch it and it does not use c.wg; a caller that wants
// monitoring must run it in its own goroutine.
func (c *TCPClient) monitorConnection() {
    t := time.NewTicker(30 * time.Second)
    defer t.Stop()

    for {
        select {
        case <-t.C:
            if c.healthCheck() {
                continue
            }
            fmt.Println("Connection unhealthy, scheduling reconnect")
            c.scheduleReconnect()
        case <-c.stopCh:
            return
        }
    }
}

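这些示例展示了 TCP 客户端重连机制、错误分类处理和连接健康监控的实现方法。需要注意:Start() 并不会启动 monitorConnection,如需健康监控,要自行在单独的 goroutine 中运行它;另外,healthCheck 与 readLoop 从同一个连接读取数据,探测时可能吞掉一个字节,因此二者不宜同时使用。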

回到顶部