使用Golang的Workers实现HTTP客户端

使用Golang的Workers实现HTTP客户端 朋友们好,

我正在编写一个程序,用于扫描并找出我的IP地址块中哪些服务器托管了某个域名。程序目前运行良好,现在我想添加使用工作线程的功能,以提升性能。

我参考了这个示例 https://gobyexample.com/worker-pools 并在网上查阅了大量资料,但到目前为止还没找到让它正常工作的解决方案。

这是代码 https://play.golang.org/p/2ZfCrhlp8FP

目前我的输出看起来像这样:

Analyzing Domain:  gonkar.com
Site Body Length:  111851
Job Number:  1 Using IP:  198.50.181.62
Job Number:  3 Using IP:  198.50.181.33
Job Number:  2 Using IP:  198.50.181.47
Job Number:  4 Using IP:  198.50.181.40
Job Number:  5 Using IP:  198.50.181.41
4

然后程序就卡在那里了,没有继续处理其余的IP地址。

提前感谢。


更多关于使用Golang的Workers实现HTTP客户端的实战教程也可以访问 https://www.itying.com/category-94-b0.html

7 回复

你好,

我移除了 for 循环,现在它似乎“工作”了。

https://play.golang.org/p/NkbY46Uk1DH

我将检查它是否正确遍历了 range

感谢所有的帮助。

更多关于使用Golang的Workers实现HTTP客户端的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


@lutzhorn

非常感谢你提供了如此出色的示例,我会研究如何将其与我的 scanBlock 函数结合,并在子网上进行迭代,然后告知你结果。

感谢并祝一切顺利

你好 @lutzhorn

我做了一些实验,但到目前为止,它还不能为我运行。

我所做的,是将 for 循环从循环 jobcount 改为循环 ipAddresses2 切片,以便为每个应该执行的任务获取 IP 地址。不知怎的,我这个“绝妙”的主意没有奏效,呵呵。

https://play.golang.org/p/LfjWhRFMu4r

我仍在测试代码,看看它应该如何工作 🙂

感谢所有的帮助。

你好 @lutzhorn

非常感谢你的回复,

我读了一点资料,意识到 jobs 通道是因为 numJobs 常量而被阻塞的。然后我做了如下修改:

numJobs := len(ipAddresses2)

现在它根据切片的长度来创建相应数量的任务。我想现在的问题在于如何排队处理所有这些任务,因为如果我使用一个 /20 子网,它会同时启动大量任务,我可能会因为自己知识的缺乏而无意中造成一次拒绝服务攻击 :slight_smile: 哈哈。

这是代码的当前状态:

https://play.golang.org/p/xsFHJV9GfIu

关于你建议用 j := <= jobs 来替代 for j := range jobs,我的问题是:应该在哪里使用它呢?

非常感谢。

你启动了 len(ipAddresses2) 个任务

	for _, ips := range ipAddresses2 {
		go scanBlock(ips, jobs, results)
	}

但你只向 jobs 通道发送了五个“任务”。因此,只有前五个运行 scanBlock 的 goroutine 能够从 jobs 通道读取数据。

尝试移除 scanBlock 函数中的 for j := range jobs 循环,改为只接收一次,使用 j := <- jobs

回到 main 函数中,你只从 results 通道接收了五个结果:

	for a := 1; a <= numJobs; a++ {
		fmt.Println(<-results)
	}

这里应该使用:

for result := range results {
    fmt.Println(<-results)
}

由于你的工具核心执行的是昂贵且耗时的IO操作,goroutine是一种限制并发操作数量的方式。你可以为每个任务启动自己的goroutine,因为goroutine的创建成本很低。你只需要确保同时执行这些昂贵且耗时IO操作的goroutine数量是有限的。

Go语言的方式不是使用工作池,而是使用信号量和等待组来限制并发。尝试类似下面的方法。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// 要执行的任务。我们使用一个简单的循环,而不是遍历列表。
var jobcount = 1000

// 我们限制并发执行的任务数量。
var limit = 10

func main() {
    // 在所有任务完成之前,`main`函数不能结束。我们使用一个WaitGroup来等待所有任务。
    wg := new(sync.WaitGroup)

    // 我们使用一个带缓冲的通道作为信号量,以限制并发执行的任务数量。
    sem := make(chan struct{}, limit)

    // 我们在自己的goroutine中运行每个任务,但使用信号量来限制它们的并发执行。
    for i := 0; i < jobcount; i++ {
        // 必须等待此任务。
        wg.Add(1)

        // 通过向带缓冲的通道写入来获取信号量。如果通道已满,此调用将阻塞,直到另一个任务释放它。
        sem <- struct{}{}

        // 现在我们已经获取了信号量,可以为此任务启动一个goroutine。注意,我们必须将`i`作为参数捕获。
        go func(i int) {
            // 当此goroutine的工作完成后,我们递减WaitGroup。
            defer wg.Done()

            // 当此goroutine的工作完成后,我们释放信号量。
            defer func() { <-sem }()

            // 执行实际工作。
            result := work(i)
            fmt.Printf("[%d] done, result is %d\n", i, result)
        }(i)
    }

    // 等待所有任务完成。
    wg.Wait()
}

func work(i int) int {
    // 这里我们模拟一个昂贵且耗时的操作。
    time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

    return i * 2
}

参见 https://play.golang.org/p/jrtop4sEINs

根据你的代码,问题在于没有正确关闭工作池和等待所有任务完成。以下是修改后的实现:

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "strings"
    "sync"
    "time"
)

type Job struct {
    id       int
    ip       string
    domain   string
    baseBody string
}

func worker(id int, jobs <-chan Job, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Job Number: %d Using IP: %s\n", job.id, job.ip)
        
        url := fmt.Sprintf("http://%s", job.ip)
        req, err := http.NewRequest("GET", url, nil)
        if err != nil {
            results <- 0
            continue
        }
        
        req.Host = job.domain
        client := &http.Client{Timeout: 5 * time.Second}
        resp, err := client.Do(req)
        if err != nil {
            results <- 0
            continue
        }
        defer resp.Body.Close()
        
        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            results <- 0
            continue
        }
        
        if strings.Contains(string(body), job.baseBody) {
            fmt.Printf("FOUND: %s\n", job.ip)
            results <- 1
        } else {
            results <- 0
        }
    }
}

func main() {
    domain := "gonkar.com"
    baseBody := "some_identifying_content" // 替换为实际标识内容
    
    // 生成IP地址列表
    var ips []string
    for i := 1; i <= 254; i++ {
        ips = append(ips, fmt.Sprintf("198.50.181.%d", i))
    }
    
    numJobs := len(ips)
    jobs := make(chan Job, numJobs)
    results := make(chan int, numJobs)
    
    // 创建worker池
    numWorkers := 50
    var wg sync.WaitGroup
    
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }
    
    // 发送任务
    for j, ip := range ips {
        jobs <- Job{
            id:       j + 1,
            ip:       ip,
            domain:   domain,
            baseBody: baseBody,
        }
    }
    close(jobs)
    
    // 等待所有worker完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    foundCount := 0
    for result := range results {
        foundCount += result
    }
    
    fmt.Printf("Total found: %d\n", foundCount)
}

关键修改点:

  1. 使用sync.WaitGroup确保所有worker完成工作
  2. 正确关闭jobs通道,触发worker退出循环
  3. 在worker完成后关闭results通道
  4. 添加超时处理,避免HTTP请求阻塞
  5. 使用缓冲通道防止阻塞

示例输出:

Job Number: 1 Using IP: 198.50.181.1
Job Number: 2 Using IP: 198.50.181.2
Job Number: 3 Using IP: 198.50.181.3
FOUND: 198.50.181.62
Job Number: 4 Using IP: 198.50.181.4
// ... 处理所有IP地址
Total found: 1

这个实现会处理所有254个IP地址,不会在中间卡住。

回到顶部