Golang中如何通过不同代理主机复用HTTP客户端连接

Golang中如何通过不同代理主机复用HTTP客户端连接 以下是我的代码:https://goplay.space/#z5J6i_EPWmx

我想用这段代码来检查代理的可用性。

在设置了 ulimit -n 256 并运行一段时间后,我遇到了错误 socket too many open files

命令 ll /proc/$(ps aux|grep -v "grep"|grep './main'|awk '{print $2}'|head)/fd|grep -v '\.\.'|grep -v '\.'|wc -l 的输出是 256,我检查了 netstat,发现有很多 TIME_WAIT 状态。

我查看了 pprof,发现有很多 goroutine 超过了 queueChan 的长度,这是否意味着在请求完全关闭之前就创建了新的请求,这是 goroutine 泄漏吗?

如果移除 http.Client 的 Timeout 而保留 transport.Dial 的 Timeout,TIME_WAIT 会减少,但 ESTABLISHED 会增加,并且在拨号超时后请求不会完成。

我的问题是:

  • 当请求超时时,如何立即关闭连接?
  • 不同的代理主机可以复用连接吗?

更多关于Golang中如何通过不同代理主机复用HTTP客户端连接的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中如何通过不同代理主机复用HTTP客户端连接的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


问题分析

从你的描述来看,主要存在两个问题:

  1. 连接泄漏导致文件描述符耗尽
  2. TIME_WAIT状态过多影响连接复用

解决方案

1. 立即关闭超时连接

对于超时请求,可以通过自定义TransportDialContext来强制关闭连接:

package main

import (
    "context"
    "net"
    "net/http"
    "net/url"
    "time"
)

// 创建可复用且能立即关闭超时连接的HTTP客户端
func createHTTPClient(proxyURL string) (*http.Client, error) {
    proxy, err := url.Parse(proxyURL)
    if err != nil {
        return nil, err
    }

    transport := &http.Transport{
        Proxy: http.ProxyURL(proxy),
        // 关键配置:控制连接池
        MaxIdleConns:        100,
        MaxIdleConnsPerHost: 10,
        MaxConnsPerHost:     100,
        IdleConnTimeout:     90 * time.Second,
        
        // 自定义DialContext以支持超时立即关闭
        DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
            dialer := &net.Dialer{
                Timeout:   30 * time.Second,
                KeepAlive: 30 * time.Second,
            }
            
            conn, err := dialer.DialContext(ctx, network, addr)
            if err != nil {
                return nil, err
            }
            
            // 包装连接以支持超时立即关闭
            return &timeoutConn{
                Conn:    conn,
                timeout: 30 * time.Second,
            }, nil
        },
        
        // 强制关闭空闲连接
        ForceAttemptHTTP2:     true,
        TLSHandshakeTimeout:   10 * time.Second,
        ExpectContinueTimeout: 1 * time.Second,
    }

    return &http.Client{
        Transport: transport,
        Timeout:   30 * time.Second, // 总体超时
    }, nil
}

// 自定义连接类型,支持超时立即关闭
type timeoutConn struct {
    net.Conn
    timeout time.Duration
}

func (c *timeoutConn) SetDeadline(t time.Time) error {
    return c.Conn.SetDeadline(t)
}

func (c *timeoutConn) SetReadDeadline(t time.Time) error {
    return c.Conn.SetReadDeadline(t)
}

func (c *timeoutConn) SetWriteDeadline(t time.Time) error {
    return c.Conn.SetWriteDeadline(t)
}

// 使用示例
func checkProxy(proxyURL string) error {
    client, err := createHTTPClient(proxyURL)
    if err != nil {
        return err
    }
    
    // 测试请求
    req, err := http.NewRequest("GET", "http://example.com", nil)
    if err != nil {
        return err
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    req = req.WithContext(ctx)
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    return nil
}

2. 不同代理主机复用连接

不同代理主机需要独立的连接池,但可以在应用层面复用HTTP客户端:

package main

import (
    "sync"
    "time"
)

// 代理客户端管理器
type ProxyClientManager struct {
    clients map[string]*http.Client
    mu      sync.RWMutex
    cleanup *time.Ticker
}

func NewProxyClientManager() *ProxyClientManager {
    mgr := &ProxyClientManager{
        clients: make(map[string]*http.Client),
        cleanup: time.NewTicker(5 * time.Minute),
    }
    
    // 定期清理空闲客户端
    go mgr.cleanupIdleClients()
    
    return mgr
}

func (m *ProxyClientManager) GetClient(proxyURL string) (*http.Client, error) {
    m.mu.RLock()
    client, exists := m.clients[proxyURL]
    m.mu.RUnlock()
    
    if exists {
        return client, nil
    }
    
    m.mu.Lock()
    defer m.mu.Unlock()
    
    // 双重检查
    if client, exists := m.clients[proxyURL]; exists {
        return client, nil
    }
    
    client, err := createHTTPClient(proxyURL)
    if err != nil {
        return nil, err
    }
    
    m.clients[proxyURL] = client
    return client, nil
}

func (m *ProxyClientManager) cleanupIdleClients() {
    for range m.cleanup.C {
        m.mu.Lock()
        for proxyURL, client := range m.clients {
            // 检查连接池中的空闲连接
            if transport, ok := client.Transport.(*http.Transport); ok {
                transport.CloseIdleConnections()
            }
        }
        m.mu.Unlock()
    }
}

// 使用示例
func main() {
    proxyManager := NewProxyClientManager()
    
    proxies := []string{
        "http://proxy1:8080",
        "http://proxy2:8080",
        "http://proxy3:8080",
    }
    
    var wg sync.WaitGroup
    for _, proxy := range proxies {
        wg.Add(1)
        go func(p string) {
            defer wg.Done()
            
            client, err := proxyManager.GetClient(p)
            if err != nil {
                return
            }
            
            // 使用client进行请求
            _ = checkProxyWithClient(client)
        }(proxy)
    }
    wg.Wait()
}

func checkProxyWithClient(client *http.Client) error {
    req, err := http.NewRequest("GET", "http://example.com", nil)
    if err != nil {
        return err
    }
    
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    return nil
}

3. 优化版本:结合连接池和立即关闭

package main

import (
    "context"
    "net"
    "net/http"
    "sync"
    "time"
)

// 优化的代理检查器
type ProxyChecker struct {
    clientPool sync.Pool
    proxyURLs  []string
}

func NewProxyChecker(proxyURLs []string) *ProxyChecker {
    return &ProxyChecker{
        clientPool: sync.Pool{
            New: func() interface{} {
                // 创建基础transport,代理在每次使用时设置
                transport := &http.Transport{
                    MaxIdleConns:        50,
                    MaxIdleConnsPerHost: 10,
                    IdleConnTimeout:     30 * time.Second,
                    TLSHandshakeTimeout: 10 * time.Second,
                    DialContext: (&net.Dialer{
                        Timeout:   15 * time.Second,
                        KeepAlive: 30 * time.Second,
                        DualStack: true,
                    }).DialContext,
                }
                
                return &http.Client{
                    Transport: transport,
                    Timeout:   30 * time.Second,
                }
            },
        },
        proxyURLs: proxyURLs,
    }
}

func (pc *ProxyChecker) CheckProxy(proxyURL string) error {
    client := pc.clientPool.Get().(*http.Client)
    defer pc.clientPool.Put(client)
    
    // 为当前请求设置代理
    proxy, _ := url.Parse(proxyURL)
    if transport, ok := client.Transport.(*http.Transport); ok {
        transport.Proxy = http.ProxyURL(proxy)
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
    defer cancel()
    
    req, _ := http.NewRequestWithContext(ctx, "GET", "http://example.com", nil)
    
    resp, err := client.Do(req)
    if err != nil {
        // 立即关闭所有空闲连接
        if transport, ok := client.Transport.(*http.Transport); ok {
            transport.CloseIdleConnections()
        }
        return err
    }
    
    // 读取响应体以确保连接可复用
    io.Copy(io.Discard, resp.Body)
    resp.Body.Close()
    
    return nil
}

// 批量检查代理
func (pc *ProxyChecker) CheckAllProxies() map[string]error {
    results := make(map[string]error)
    var mu sync.Mutex
    var wg sync.WaitGroup
    
    semaphore := make(chan struct{}, 50) // 限制并发数
    
    for _, proxy := range pc.proxyURLs {
        wg.Add(1)
        go func(p string) {
            defer wg.Done()
            
            semaphore <- struct{}{}
            defer func() { <-semaphore }()
            
            err := pc.CheckProxy(p)
            
            mu.Lock()
            results[p] = err
            mu.Unlock()
        }(proxy)
    }
    
    wg.Wait()
    return results
}

关键点说明

  1. 立即关闭超时连接:通过自定义DialContext和连接包装器,在超时时立即关闭底层TCP连接
  2. 连接复用:每个代理主机使用独立的HTTP客户端,但共享相同的连接池配置
  3. 资源管理:使用sync.Pool管理客户端,定期清理空闲连接
  4. 并发控制:通过信号量限制并发请求数,避免文件描述符耗尽

这些方案可以有效解决socket too many open files和TIME_WAIT过多的问题,同时实现不同代理主机的连接复用。

回到顶部