Golang实现TCP客户端重连与错误捕获的最佳实践
到目前为止，我已经找到了一些关于创建简单TCP客户端的示例，例如：
使用Go创建TCP和UDP客户端及服务器
使用Go编程语言创建TCP和UDP客户端及服务器。
这些文章是一个很好的开端。
我在哪里可以找到更复杂的示例？例如，具有重连功能的TCP客户端，或者能够应对各种错误情况的客户端。
您能推荐一些与此学习相关的文章、书籍或开源项目吗?
非常感谢您的帮助。
更多关于Golang实现TCP客户端重连与错误捕获的最佳实践的实战教程也可以访问 https://www.itying.com/category-94-b0.html
3 回复
我编写了一个RPC框架,你可以试试:
更多关于Golang实现TCP客户端重连与错误捕获的最佳实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
以下是一个具备重连机制和错误处理的TCP客户端实现示例:
package main
import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)
// TCPClient is a TCP client that keeps a single connection to address and
// re-establishes it in the background when a read or write fails.
type TCPClient struct {
// address is the "host:port" string passed to the dialer by connect.
address string
// conn is the current connection, or nil when none is installed.
// It is read and replaced under mu.
conn net.Conn
// mu guards conn.
mu sync.RWMutex
// reconnectCh carries reconnect requests. NewTCPClient creates it with a
// buffer of one and scheduleReconnect sends without blocking, so at most
// one request is pending at a time.
reconnectCh chan struct{}
// stopCh is closed by Stop to tell every loop to exit.
stopCh chan struct{}
// wg counts the goroutines launched by Start so Stop can wait for them.
wg sync.WaitGroup
}
// NewTCPClient returns a client configured for address. No connection is
// opened here; the reconnect channel gets a buffer of one and the stop
// channel is unbuffered.
func NewTCPClient(address string) *TCPClient {
	client := new(TCPClient)
	client.address = address
	client.reconnectCh = make(chan struct{}, 1)
	client.stopCh = make(chan struct{})
	return client
}
// connect closes any existing connection, dials c.address and installs the
// new connection as c.conn.
//
// The dial runs without holding c.mu so that readers of c.conn and Stop are
// not blocked for the duration of the dial timeout. While the dial is in
// flight c.conn is nil rather than a closed connection. Closing stopCh
// aborts an in-flight dial, and a connection that completes after the client
// was stopped is closed instead of installed.
//
// It returns a wrapped dial error on failure, or an error if the client has
// been stopped.
func (c *TCPClient) connect() error {
	// Retire the previous connection up front and clear the field so no
	// other goroutine keeps using a closed connection if the dial fails.
	c.mu.Lock()
	if c.conn != nil {
		c.conn.Close()
		c.conn = nil
	}
	c.mu.Unlock()

	// Tie the dial to stopCh so Stop does not have to wait for the timeout.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		select {
		case <-c.stopCh:
			cancel()
		case <-ctx.Done():
		}
	}()

	dialer := &net.Dialer{
		Timeout:   5 * time.Second,
		KeepAlive: 30 * time.Second,
	}
	conn, err := dialer.DialContext(ctx, "tcp", c.address)
	if err != nil {
		return fmt.Errorf("dial failed: %w", err)
	}

	c.mu.Lock()
	select {
	case <-c.stopCh:
		// Stopped while dialing: nobody would ever close this connection.
		c.mu.Unlock()
		conn.Close()
		return fmt.Errorf("dial aborted: client stopped")
	default:
	}
	if c.conn != nil {
		// A concurrent connect installed a connection first; replace it.
		c.conn.Close()
	}
	c.conn = conn
	c.mu.Unlock()

	fmt.Printf("Connected to %s\n", c.address)
	return nil
}
// reconnect retries connect until it succeeds or the client is stopped.
//
// The delay before each attempt starts at one second and doubles after every
// failure, capped at 30 seconds. The wait is interruptible: closing stopCh
// ends the wait immediately instead of after the current backoff interval.
func (c *TCPClient) reconnect() {
	const (
		initialBackoff = time.Second
		maxBackoff     = 30 * time.Second
	)

	backoff := initialBackoff
	for {
		// Do not announce another attempt once the client is stopping.
		select {
		case <-c.stopCh:
			return
		default:
		}

		fmt.Printf("Attempting to reconnect in %v...\n", backoff)

		timer := time.NewTimer(backoff)
		select {
		case <-c.stopCh:
			timer.Stop()
			return
		case <-timer.C:
		}

		if err := c.connect(); err == nil {
			return
		}

		// Clamp after doubling so the delay never exceeds the cap.
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}
// readLoop prints newline-terminated messages received on the current
// connection until the client is stopped.
//
// On a read error it requests a reconnect and keeps running, waiting for a
// different connection to be installed, so reading resumes after a
// reconnect. One bufio.Reader is kept per connection so bytes buffered
// beyond the current line are not dropped, and a partial line returned with
// a read timeout is kept and prefixed to the rest of the line.
func (c *TCPClient) readLoop() {
	defer c.wg.Done()

	var (
		current net.Conn      // connection that reader wraps
		failed  net.Conn      // connection already reported as broken
		reader  *bufio.Reader // buffered reader over current
		pending string        // partial line carried across read timeouts
	)

	for {
		select {
		case <-c.stopCh:
			return
		default:
		}

		c.mu.RLock()
		conn := c.conn
		c.mu.RUnlock()

		// Nothing usable yet: either no connection, or still the one that
		// already failed. Wait briefly, but stay responsive to Stop.
		if conn == nil || conn == failed {
			select {
			case <-c.stopCh:
				return
			case <-time.After(100 * time.Millisecond):
			}
			continue
		}

		if conn != current {
			current = conn
			reader = bufio.NewReader(conn)
			pending = ""
		}

		err := conn.SetReadDeadline(time.Now().Add(10 * time.Second))
		var chunk string
		if err == nil {
			chunk, err = reader.ReadString('\n')
		}
		if err != nil {
			if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
				// ReadString hands back what it consumed before the
				// timeout; keep it so the line is not truncated.
				pending += chunk
				continue
			}
			fmt.Printf("Read error: %v\n", err)
			failed = conn
			c.scheduleReconnect()
			continue
		}

		fmt.Printf("Received: %s", pending+chunk)
		pending = ""
	}
}
// writeLoop sends a timestamped "Ping" line on the current connection every
// five seconds until the client is stopped.
//
// On a write error it requests a reconnect and keeps running, so pings
// resume once a new connection is installed. The connection that failed is
// remembered and skipped, so one broken connection produces one reconnect
// request from this loop, not one per tick.
func (c *TCPClient) writeLoop() {
	defer c.wg.Done()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	var failed net.Conn // connection already reported as broken

	for {
		select {
		case <-c.stopCh:
			return
		case <-ticker.C:
		}

		c.mu.RLock()
		conn := c.conn
		c.mu.RUnlock()
		if conn == nil || conn == failed {
			continue
		}

		msg := fmt.Sprintf("Ping at %s\n", time.Now().Format(time.RFC3339))
		err := conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
		if err == nil {
			_, err = conn.Write([]byte(msg))
		}
		if err != nil {
			fmt.Printf("Write error: %v\n", err)
			failed = conn
			c.scheduleReconnect()
			continue
		}

		failed = nil
		fmt.Printf("Sent: %s", msg)
	}
}
// scheduleReconnect posts a reconnect request without blocking. If the
// channel cannot accept the value right now, the request is dropped.
func (c *TCPClient) scheduleReconnect() {
	requests := c.reconnectCh
	var token struct{}

	select {
	case requests <- token:
		// Request queued.
	default:
		// Channel not ready to receive; nothing to do.
	}
}
// reconnectLoop runs reconnect once for every request received on
// reconnectCh and exits when stopCh is closed. It marks the wait group done
// on return.
func (c *TCPClient) reconnectLoop() {
	defer c.wg.Done()

	for {
		select {
		case <-c.reconnectCh:
			c.reconnect()
			continue
		case <-c.stopCh:
		}
		return
	}
}
// Start opens the initial connection and, if that succeeds, launches the
// read, write and reconnect loops in their own goroutines. It returns the
// connect error without starting anything when the first connection fails.
func (c *TCPClient) Start() error {
	err := c.connect()
	if err != nil {
		return err
	}

	workers := []func(){c.readLoop, c.writeLoop, c.reconnectLoop}
	c.wg.Add(len(workers))
	for _, worker := range workers {
		go worker()
	}
	return nil
}
// Stop signals every loop to exit, closes the current connection and waits
// for the goroutines started by Start to finish.
//
// Stop may be called more than once. The stop channel is closed only on the
// first call, so later calls do not panic on closing a closed channel. The
// check and the close happen under c.mu so concurrent callers cannot both
// close it.
func (c *TCPClient) Stop() {
	c.mu.Lock()
	select {
	case <-c.stopCh:
		// Already stopped by an earlier call.
	default:
		close(c.stopCh)
	}
	if c.conn != nil {
		// Closing also unblocks a read or write in progress on it.
		c.conn.Close()
		c.conn = nil
	}
	c.mu.Unlock()

	c.wg.Wait()
}
// main starts a client against localhost:8080, blocks until SIGINT or
// SIGTERM arrives (or the context is cancelled), then stops the client.
// It exits with status 1 if the client cannot be started.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client := NewTCPClient("localhost:8080")
	err := client.Start()
	if err != nil {
		fmt.Printf("Failed to start client: %v\n", err)
		os.Exit(1)
	}

	// Buffered so a signal delivered before the select is not lost.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	select {
	case <-ctx.Done():
	case <-signals:
		fmt.Println("Shutting down...")
	}

	client.Stop()
	fmt.Println("Client stopped")
}
错误处理增强版示例:
// ErrorHandler classifies connection errors and decides whether the caller
// should retry.
type ErrorHandler struct {
	// maxRetries is the retry budget compared against retryCount.
	maxRetries int
	// retryCount is the number of attempts made so far; the caller
	// increments it, HandleError only reads it.
	retryCount int
	// errorCallback, when non-nil, is invoked with every error passed to
	// HandleError before it is classified.
	errorCallback func(error)
}

// HandleError reports err to the callback (if any), prints a line
// describing the kind of error, and returns true while the retry budget is
// not exhausted (retryCount < maxRetries).
//
// Classification uses errors.As, so errors wrapped with %w are recognised
// by their underlying network error. Timeout and temporary conditions are
// checked before *net.OpError because *net.OpError itself satisfies
// net.Error and would otherwise hide them.
func (h *ErrorHandler) HandleError(err error) bool {
	if h.errorCallback != nil {
		h.errorCallback(err)
	}

	var (
		netErr net.Error
		opErr  *net.OpError
	)
	switch {
	case errors.As(err, &netErr) && netErr.Timeout():
		fmt.Println("Network timeout error")
	case errors.As(err, &netErr) && netErr.Temporary():
		// NOTE(review): net.Error.Temporary is deprecated in current Go
		// releases; kept so the existing classification is preserved.
		fmt.Println("Temporary network error")
	case errors.As(err, &opErr):
		fmt.Printf("Operation error: %v\n", opErr)
	default:
		fmt.Printf("Unknown error: %v\n", err)
	}

	return h.retryCount < h.maxRetries
}
// connectWithRetry calls connect up to five times. After each failure it
// bumps the handler's retry count, lets the handler decide whether to go on,
// and sleeps for as many seconds as attempts made so far before retrying.
// It returns nil on the first success, and an error once the handler refuses
// to continue or the attempts run out.
func (c *TCPClient) connectWithRetry() error {
	handler := &ErrorHandler{maxRetries: 5}
	handler.errorCallback = func(err error) {
		fmt.Printf("Connection error: %v\n", err)
	}

	for handler.retryCount < handler.maxRetries {
		err := c.connect()
		if err == nil {
			return nil
		}

		handler.retryCount++
		if handler.HandleError(err) {
			pause := time.Duration(handler.retryCount) * time.Second
			time.Sleep(pause)
			continue
		}
		return fmt.Errorf("max retries exceeded: %w", err)
	}

	return fmt.Errorf("failed to connect after %d attempts", handler.maxRetries)
}
连接健康检查示例:
// healthCheck probes the current connection with a one-second read and
// reports whether it still looks usable.
//
// It returns false when there is no connection, when the deadline cannot be
// set, or when the read fails with anything other than a timeout. A timeout
// (peer silent but connection open) and a successful read (peer sent data)
// both count as healthy. The read deadline is cleared again on every path
// that set it.
//
// NOTE(review): the probe consumes one byte when data is available, and it
// changes the read deadline on a connection that readLoop may be reading
// from at the same time. Confirm it is not run alongside readLoop, or
// replace the probe with a protocol-level ping.
func (c *TCPClient) healthCheck() bool {
	c.mu.RLock()
	conn := c.conn
	c.mu.RUnlock()
	if conn == nil {
		return false
	}

	if err := conn.SetReadDeadline(time.Now().Add(1 * time.Second)); err != nil {
		return false
	}
	// Never leave the short probe deadline behind.
	defer conn.SetReadDeadline(time.Time{})

	probe := make([]byte, 1)
	_, err := conn.Read(probe)
	if err == nil {
		// Data arrived, so the peer is alive.
		return true
	}

	netErr, ok := err.(net.Error)
	return ok && netErr.Timeout()
}
// monitorConnection runs healthCheck every 30 seconds and requests a
// reconnect whenever the check fails. It returns when stopCh is closed.
func (c *TCPClient) monitorConnection() {
	interval := time.NewTicker(30 * time.Second)
	defer interval.Stop()

	for {
		select {
		case <-interval.C:
			if c.healthCheck() {
				continue
			}
			fmt.Println("Connection unhealthy, scheduling reconnect")
			c.scheduleReconnect()
		case <-c.stopCh:
			return
		}
	}
}
这些示例展示了TCP客户端重连机制、错误分类处理和连接健康监控的实现方法。


