在低配Windows PC上利用Golang并发实现百万次GET请求
在低配Windows PC上利用Golang并发实现百万次GET请求 我的代码在 ip.txt 文件有 3000 行时运行良好。当我增加行数,尝试了 5 万行时,这意味着我想并发地发起 5 万个 HTTP GET 请求。但我遇到了:
SOCKET: Too many open files
我尝试了许多解决方案。我尝试减少 Go 协程的数量,但仍然遇到同样的错误。我的代码:
const (
// this is where you can specify how many maxFileDescriptors
// you want to allow open
maxFileDescriptors = 1000
)
var wg sync.WaitGroup
// Final Literation
func main() {
file, err := os.Open("ip.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
outfile, err := os.Create("urls.txt")
if err != nil {
log.Fatal(err)
}
defer outfile.Close()
results := make(chan []string, maxFileDescriptors)
go func() {
for output := range results {
for _, url := range output {
fmt.Fprintln(outfile, url)
}
}
}()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
wg.Add(1)
go Grabber(scanner.Text(), results)
}
wg.Wait()
close(results)
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}
Grabber() 函数只是发送请求、解析数据,并将字符串列表返回给它的通道。有什么解决方案吗?如何在不改变 ulimit 的情况下使其工作?我不知道如何在 Windows 上更改 ulimit。😭😭
func Grabber(ip string, results chan []string) {
defer wg.Done()
var output []string
if ip == "" {
return
}
page := 1
for page < 251 {
client := &http.Client{}
req, err := http.NewRequest(
http.MethodGet,
fmt.Sprintf(
"http://www.bing.com/search?q=ip:%s+&count=50&first=1",
url.QueryEscape(ip),
),
nil,
)
if err != nil {
fmt.Println(err.Error())
}
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; rv:57.0) Gecko/20100101 Firefox/57.0")
res, err := client.Do(req)
if err != nil {
fmt.Println("Invalid Request")
return
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
fmt.Println("Couldn't Read")
}
re := regexp.MustCompile(`<h2><a href="(.*?)"`)
links := re.FindAllString(string(body), -1)
if links != nil {
for l := range links {
o := strings.Split(links[l], `"`)
d := strings.Split(o[1], "/")
s := d[0] + "//" + d[2]
if !stringInArray(s, output) {
output = append(output, s)
}
}
}
page = page + 50
}
results <- output
for _, links := range output {
fmt.Println(links)
}
}
更多关于在低配Windows PC上利用Golang并发实现百万次GET请求的实战教程也可以访问 https://www.itying.com/category-94-b0.html
- 你可以像使用 WaitGroup 一样,一次性编译正则表达式。
re := regexp.MustCompile(`<h2><a href="(.*?)"`)
- 你可以使用 fasthttp 来复用内存。
更多关于在低配Windows PC上利用Golang并发实现百万次GET请求的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
我相当确定这就是问题所在。如果响应体(body)没有正确关闭,连接就不会被妥善关闭。这将导致操作系统无法立即关闭连接(超时后才会关闭),从而迅速达到连接数限制。(在我们的网络环境中就发生过这种情况,几天后它几乎搞垮了我们的互联网连接。别提了 😉)
// 代码示例:确保关闭响应体
resp, err := http.Get("http://example.com")
if err != nil {
// 处理错误
}
defer resp.Body.Close()
// 处理响应...
你的代码中存在一个延迟泄漏问题。在 for page < 251 循环内部,你执行了一个 HTTP 请求并延迟关闭响应体。这个延迟操作只有在函数结束时才会执行。因此,你累积了 251 个延迟操作,从而导致 251 个响应体没有被关闭。
要修复这个错误,我建议你将这段代码:
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
替换为:
body, err := ioutil.ReadAll(res.Body)
res.Body.Close()
我不确定这是否能完全解决你的问题,但这可能会有所帮助,因为不关闭响应体会导致连接保持打开状态。
您可以调整设计以绕过Windows处理程序的限制。创建一个缓冲通道来接收您的IP地址,以及一个从该通道接收IP的函数。类似这样:
jobs := make(chan string, 100)// 对不同通道长度进行基准测试,找到最适合您的方案(CPU+IO)
func run(ctx context.Context) (results chan Response, err error){
for {
select {
// case ip:= <-jobs{} 发起请求,将响应发送到结果通道
// case timeout 实现超时
}
select{
// 处理上下文取消
// case ok: ctx.Done(){}
}
}
}
这个链接可以为您提供一些关于如何创建工作池的提示。
我从不认为更多的 goroutine 能让程序运行得更快。我们的硬件资源是有限的。在纯 CPU 计算程序中,我希望我的 goroutine 数量不要过多地超过 CPU 逻辑核心数,越接近越好。这将减少 CPU 的上下文切换,保持更快的计算速度。
当我需要处理大量 IO 时,我期望有更多的 goroutine,以便在 IO 阻塞时 CPU 可以切换到其他任务。根据 IO 延迟,我们应该有选择地保持 goroutine 的数量。例如,如果我在一个 8 核服务器上请求一个延迟为 1 秒的网页,我通常会启动 50-100 个 goroutine 来处理任务,这使得大部分 CPU 时间用于处理程序逻辑,而不是上下文切换或等待 IO。
以下是一个任务分发的示例。我不知道这是否对您有帮助。您需要多少个 goroutine?您需要进行实际验证,以确保能获得一个 CPU 利用率更高的高效数量。
package main
import (
"net/http"
"sync"
)
func main() {
var wg sync.WaitGroup
urls := make([]string, 1000000)
gnum := 100
wg.Add(gnum)
queue := make(chan string)
for i := 0; i < gnum; i++ {
go func() {
defer wg.Done()
request(queue)
}()
}
for _, url := range urls {
queue <- url
}
close(queue)
wg.Wait()
}
func request(queue chan string) {
for url := range queue {
http.Get(url)
}
}
你的代码问题在于为每个IP地址创建了独立的HTTP客户端,并且没有控制并发协程数量。当处理5万个IP时,每个IP还会发起最多5次请求(page < 251),这会导致同时打开大量网络连接和文件描述符。
以下是改进方案,使用连接池和协程池来控制并发:
package main
import (
"bufio"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"regexp"
"strings"
"sync"
"time"
)
const (
maxConcurrent = 100 // 控制最大并发协程数
maxFileDescriptors = 100 // 控制最大文件描述符
timeout = 30 * time.Second // 请求超时时间
)
var (
wg sync.WaitGroup
client *http.Client
)
func init() {
// 创建共享的HTTP客户端,启用连接池
client = &http.Client{
Transport: &http.Transport{
MaxIdleConns: maxFileDescriptors,
MaxIdleConnsPerHost: maxFileDescriptors,
IdleConnTimeout: 90 * time.Second,
},
Timeout: timeout,
}
}
func main() {
file, err := os.Open("ip.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
outfile, err := os.Create("urls.txt")
if err != nil {
log.Fatal(err)
}
defer outfile.Close()
// 创建带缓冲的通道控制并发
semaphore := make(chan struct{}, maxConcurrent)
results := make(chan []string, maxConcurrent)
// 结果写入协程
go func() {
for output := range results {
for _, url := range output {
fmt.Fprintln(outfile, url)
}
}
}()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
ip := scanner.Text()
if ip == "" {
continue
}
wg.Add(1)
semaphore <- struct{}{} // 获取信号量
go func(ip string) {
defer wg.Done()
defer func() { <-semaphore }() // 释放信号量
Grabber(ip, results)
}(ip)
}
wg.Wait()
close(results)
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}
func Grabber(ip string, results chan []string) {
var output []string
page := 1
for page < 251 {
req, err := http.NewRequest(
http.MethodGet,
fmt.Sprintf(
"http://www.bing.com/search?q=ip:%s+&count=50&first=%d",
url.QueryEscape(ip),
page,
),
nil,
)
if err != nil {
log.Println("创建请求失败:", err)
break
}
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; rv:57.0) Gecko/20100101 Firefox/57.0")
res, err := client.Do(req)
if err != nil {
log.Println("请求失败:", err)
break
}
body, err := ioutil.ReadAll(res.Body)
res.Body.Close() // 立即关闭响应体
if err != nil {
log.Println("读取响应失败:", err)
break
}
re := regexp.MustCompile(`<h2><a href="(.*?)"`)
links := re.FindAllString(string(body), -1)
if links != nil {
for _, link := range links {
o := strings.Split(link, `"`)
if len(o) >= 2 {
d := strings.Split(o[1], "/")
if len(d) >= 3 {
s := d[0] + "//" + d[2]
if !stringInArray(s, output) {
output = append(output, s)
}
}
}
}
}
page += 50
}
if len(output) > 0 {
results <- output
}
}
func stringInArray(str string, arr []string) bool {
for _, s := range arr {
if s == str {
return true
}
}
return false
}
关键改进:
- 使用共享HTTP客户端:避免为每个请求创建新客户端,启用连接池复用连接
- 协程池模式:通过带缓冲的通道控制最大并发协程数
- 立即关闭响应体:使用
res.Body.Close()替代defer,及时释放资源 - 超时控制:为HTTP客户端设置超时,防止长时间占用连接
- 连接数限制:通过
MaxIdleConns和MaxIdleConnsPerHost限制最大连接数
这个方案将并发协程数限制在100个,同时通过连接池复用TCP连接,显著减少文件描述符的使用。对于5万个IP地址,每个IP最多5次请求,总共25万次请求,但并发连接数被控制在100个以内。

