在低配Windows PC上利用Golang并发实现百万次GET请求

在低配Windows PC上利用Golang并发实现百万次GET请求 我的代码在 ip.txt 文件有 3000 行时运行良好。当我增加行数,尝试了 5 万行时,这意味着我想并发地发起 5 万个 HTTP GET 请求。但我遇到了:

SOCKET: Too many open files

我尝试了许多解决方案。我尝试减少 Go 协程的数量,但仍然遇到同样的错误。我的代码:

const (
    // this is where you can specify how many maxFileDescriptors
    // you want to allow open
    maxFileDescriptors = 1000
)
var wg sync.WaitGroup

// Final Literation
func main() {
    file, err := os.Open("ip.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()
    outfile, err := os.Create("urls.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer outfile.Close()
    results := make(chan []string, maxFileDescriptors)
    go func() {
        for output := range results {
            for _, url := range output {
                fmt.Fprintln(outfile, url)
            }
        }
    }()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        wg.Add(1)
        go Grabber(scanner.Text(), results)

    }
    wg.Wait()
    close(results)

    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}

Grabber() 函数只是发送请求、解析数据,并将字符串列表返回给它的通道。有什么解决方案吗?如何在不改变 ulimit 的情况下使其工作?我不知道如何在 Windows 上更改 ulimit。😭😭

func Grabber(ip string, results chan []string) {
	defer wg.Done()
	var output []string
	if ip == "" {
		return
	}
	page := 1
	for page < 251 {
		client := &http.Client{}
		req, err := http.NewRequest(
			http.MethodGet,
			fmt.Sprintf(
				"http://www.bing.com/search?q=ip:%s+&count=50&first=1",
				url.QueryEscape(ip),
			),
			nil,
		)
		if err != nil {
			fmt.Println(err.Error())
		}
		req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; rv:57.0) Gecko/20100101 Firefox/57.0")
		res, err := client.Do(req)
		if err != nil {
            fmt.Println("Invalid Request")
            return
		}
		defer res.Body.Close()
		body, err := ioutil.ReadAll(res.Body)
		if err != nil {
			fmt.Println("Couldn't Read")
		}
		re := regexp.MustCompile(`<h2><a href="(.*?)"`)
		links := re.FindAllString(string(body), -1)
		if links != nil {
			for l := range links {
				o := strings.Split(links[l], `"`)
				d := strings.Split(o[1], "/")
				s := d[0] + "//" + d[2]
				if !stringInArray(s, output) {
					output = append(output, s)
				}
			}
		}
		page = page + 50
	}
	results <- output
	for _, links := range output {
		fmt.Println(links)
	}
}

更多关于在低配Windows PC上利用Golang并发实现百万次GET请求的实战教程也可以访问 https://www.itying.com/category-94-b0.html

6 回复
  1. 你可以像使用 WaitGroup 一样,一次性编译正则表达式。
re := regexp.MustCompile(`<h2><a href="(.*?)"`)
  1. 你可以使用 fasthttp 来复用内存。

更多关于在低配Windows PC上利用Golang并发实现百万次GET请求的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我相当确定这就是问题所在。如果响应体(body)没有正确关闭,连接就不会被妥善关闭。这将导致操作系统无法立即关闭连接(超时后才会关闭),从而迅速达到连接数限制。(在我们的网络环境中就发生过这种情况,几天后它几乎搞垮了我们的互联网连接。别提了 😉)

// 代码示例:确保关闭响应体
resp, err := http.Get("http://example.com")
if err != nil {
    // 处理错误
}
defer resp.Body.Close()
// 处理响应...

你的代码中存在一个延迟泄漏问题。在 for page < 251 循环内部,你执行了一个 HTTP 请求并延迟关闭响应体。这个延迟操作只有在函数结束时才会执行。因此,你累积了 251 个延迟操作,从而导致 251 个响应体没有被关闭。

要修复这个错误,我建议你将这段代码:

defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)

替换为:

body, err := ioutil.ReadAll(res.Body)
res.Body.Close()

我不确定这是否能完全解决你的问题,但这可能会有所帮助,因为不关闭响应体会导致连接保持打开状态。

您可以调整设计以绕过Windows处理程序的限制。创建一个缓冲通道来接收您的IP地址,以及一个从该通道接收IP的函数。类似这样:

jobs := make(chan string, 100)// 对不同通道长度进行基准测试,找到最适合您的方案(CPU+IO)
func run(ctx context.Context) (results chan Response, err error){
 for {
      select {
        // case ip:= <-jobs{} 发起请求,将响应发送到结果通道
       //  case timeout 实现超时
      }
      select{
         // 处理上下文取消
         // case ok: ctx.Done(){}
      }
  }
}

这个链接可以为您提供一些关于如何创建工作池的提示。

我从不认为更多的 goroutine 能让程序运行得更快。我们的硬件资源是有限的。在纯 CPU 计算程序中,我希望我的 goroutine 数量不要过多地超过 CPU 逻辑核心数,越接近越好。这将减少 CPU 的上下文切换,保持更快的计算速度。

当我需要处理大量 IO 时,我期望有更多的 goroutine,以便在 IO 阻塞时 CPU 可以切换到其他任务。根据 IO 延迟,我们应该有选择地保持 goroutine 的数量。例如,如果我在一个 8 核服务器上请求一个延迟为 1 秒的网页,我通常会启动 50-100 个 goroutine 来处理任务,这使得大部分 CPU 时间用于处理程序逻辑,而不是上下文切换或等待 IO。

以下是一个任务分发的示例。我不知道这是否对您有帮助。您需要多少个 goroutine?您需要进行实际验证,以确保能获得一个 CPU 利用率更高的高效数量。

package main

import (
	"net/http"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	urls := make([]string, 1000000)
	gnum := 100
	wg.Add(gnum)

	queue := make(chan string)
	for i := 0; i < gnum; i++ {
		go func() {
			defer wg.Done()
			request(queue)
		}()
	}

	for _, url := range urls {
		queue <- url
	}

	close(queue)
	wg.Wait()
}

func request(queue chan string) {
	for url := range queue {
		http.Get(url)
	}
}

你的代码问题在于为每个IP地址创建了独立的HTTP客户端,并且没有控制并发协程数量。当处理5万个IP时,每个IP还会发起最多5次请求(page < 251),这会导致同时打开大量网络连接和文件描述符。

以下是改进方案,使用连接池和协程池来控制并发:

package main

import (
    "bufio"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "net/url"
    "os"
    "regexp"
    "strings"
    "sync"
    "time"
)

const (
    maxConcurrent = 100      // 控制最大并发协程数
    maxFileDescriptors = 100 // 控制最大文件描述符
    timeout = 30 * time.Second // 请求超时时间
)

var (
    wg sync.WaitGroup
    client *http.Client
)

func init() {
    // 创建共享的HTTP客户端,启用连接池
    client = &http.Client{
        Transport: &http.Transport{
            MaxIdleConns:        maxFileDescriptors,
            MaxIdleConnsPerHost: maxFileDescriptors,
            IdleConnTimeout:     90 * time.Second,
        },
        Timeout: timeout,
    }
}

func main() {
    file, err := os.Open("ip.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()
    
    outfile, err := os.Create("urls.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer outfile.Close()
    
    // 创建带缓冲的通道控制并发
    semaphore := make(chan struct{}, maxConcurrent)
    results := make(chan []string, maxConcurrent)
    
    // 结果写入协程
    go func() {
        for output := range results {
            for _, url := range output {
                fmt.Fprintln(outfile, url)
            }
        }
    }()
    
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        ip := scanner.Text()
        if ip == "" {
            continue
        }
        
        wg.Add(1)
        semaphore <- struct{}{} // 获取信号量
        
        go func(ip string) {
            defer wg.Done()
            defer func() { <-semaphore }() // 释放信号量
            
            Grabber(ip, results)
        }(ip)
    }
    
    wg.Wait()
    close(results)
    
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}

func Grabber(ip string, results chan []string) {
    var output []string
    page := 1
    
    for page < 251 {
        req, err := http.NewRequest(
            http.MethodGet,
            fmt.Sprintf(
                "http://www.bing.com/search?q=ip:%s+&count=50&first=%d",
                url.QueryEscape(ip),
                page,
            ),
            nil,
        )
        if err != nil {
            log.Println("创建请求失败:", err)
            break
        }
        
        req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; rv:57.0) Gecko/20100101 Firefox/57.0")
        
        res, err := client.Do(req)
        if err != nil {
            log.Println("请求失败:", err)
            break
        }
        
        body, err := ioutil.ReadAll(res.Body)
        res.Body.Close() // 立即关闭响应体
        
        if err != nil {
            log.Println("读取响应失败:", err)
            break
        }
        
        re := regexp.MustCompile(`<h2><a href="(.*?)"`)
        links := re.FindAllString(string(body), -1)
        
        if links != nil {
            for _, link := range links {
                o := strings.Split(link, `"`)
                if len(o) >= 2 {
                    d := strings.Split(o[1], "/")
                    if len(d) >= 3 {
                        s := d[0] + "//" + d[2]
                        if !stringInArray(s, output) {
                            output = append(output, s)
                        }
                    }
                }
            }
        }
        
        page += 50
    }
    
    if len(output) > 0 {
        results <- output
    }
}

func stringInArray(str string, arr []string) bool {
    for _, s := range arr {
        if s == str {
            return true
        }
    }
    return false
}

关键改进:

  1. 使用共享HTTP客户端:避免为每个请求创建新客户端,启用连接池复用连接
  2. 协程池模式:通过带缓冲的通道控制最大并发协程数
  3. 立即关闭响应体:使用res.Body.Close()替代defer,及时释放资源
  4. 超时控制:为HTTP客户端设置超时,防止长时间占用连接
  5. 连接数限制:通过MaxIdleConnsMaxIdleConnsPerHost限制最大连接数

这个方案将并发协程数限制在100个,同时通过连接池复用TCP连接,显著减少文件描述符的使用。对于5万个IP地址,每个IP最多5次请求,总共25万次请求,但并发连接数被控制在100个以内。

回到顶部