Golang中Gocolly并行事务无法正常工作的解决方案

Golang中Gocolly并行事务无法正常工作的解决方案 我尝试按照此页面上的逻辑抓取论坛数据。通过这种方式,我可以毫无问题地抓取页面中的数据。但是,当我为For循环创建多线程时,程序在抓取数据一段时间后会退出,无法抓取所有数据。 示例:如果总共有300条数据,它将抓取120条数据并结束程序。

我期待您的解决方案建议。提前感谢您。

Colly示例:http://go-colly.org/docs/examples/reddit/

我的代码步骤:

  1. 从数据库中选择要扫描的URL
  2. 在for循环中执行“c.Visit(URL)”
  3. 使用“c.OnHtml”进行抓取

我的代码示例:

	counter := 0
	resultsLen := len(results)
	var wg sync.WaitGroup
	wg.Add(resultsLen)
	for _, result := range results {
		go func(result primitive.M) {
			defer wg.Done()
			counter++
			
			mainForumUrl := result["url"].(string)
			fmt.Println("##### Looking Main Forum Url #####",counter,":",mainForumUrl)
			c.Visit(mainForumUrl)
			c.Wait()
		}(result)

	}
	wg.Wait()

更多关于Golang中Gocolly并行事务无法正常工作的解决方案的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中Gocolly并行事务无法正常工作的解决方案的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


问题在于您为每个URL使用了同一个Colly收集器实例。Colly收集器不是并发安全的,当多个goroutine同时调用c.Visit()时会导致竞争条件。

解决方案是为每个goroutine创建独立的收集器实例:

var wg sync.WaitGroup
wg.Add(len(results))

for _, result := range results {
    go func(result primitive.M) {
        defer wg.Done()
        
        mainForumUrl := result["url"].(string)
        fmt.Println("##### Looking Main Forum Url #####", mainForumUrl)
        
        // 为每个goroutine创建新的收集器
        c := colly.NewCollector(
            colly.AllowedDomains("example.com"),
            // 添加其他配置...
        )
        
        // 设置回调函数
        c.OnHTML("selector", func(e *colly.HTMLElement) {
            // 处理数据
        })
        
        c.OnError(func(r *colly.Response, err error) {
            fmt.Printf("Error visiting %s: %v\n", mainForumUrl, err)
        })
        
        c.Visit(mainForumUrl)
        c.Wait()
    }(result)
}

wg.Wait()

或者使用带有限制的并行处理:

// 限制并发数
maxConcurrent := 10
semaphore := make(chan struct{}, maxConcurrent)
var wg sync.WaitGroup

for _, result := range results {
    wg.Add(1)
    semaphore <- struct{}{} // 获取信号量
    
    go func(result primitive.M) {
        defer wg.Done()
        defer func() { <-semaphore }() // 释放信号量
        
        mainForumUrl := result["url"].(string)
        fmt.Println("##### Looking Main Forum Url #####", mainForumUrl)
        
        c := colly.NewCollector(
            colly.Async(true),
            colly.MaxDepth(1),
        )
        
        c.Limit(&colly.LimitRule{
            DomainGlob:  "*",
            Parallelism: 2,
            Delay:       1 * time.Second,
        })
        
        c.OnHTML("selector", func(e *colly.HTMLElement) {
            // 抓取逻辑
        })
        
        c.Visit(mainForumUrl)
        c.Wait()
    }(result)
}

wg.Wait()

如果需要在多个goroutine间共享数据,使用线程安全的同步机制:

type SafeCounter struct {
    mu sync.Mutex
    counter int
}

func (sc *SafeCounter) Increment() {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    sc.counter++
}

func (sc *SafeCounter) Value() int {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    return sc.counter
}

// 使用示例
var safeCounter SafeCounter
var wg sync.WaitGroup
wg.Add(len(results))

for _, result := range results {
    go func(result primitive.M) {
        defer wg.Done()
        
        safeCounter.Increment()
        count := safeCounter.Value()
        mainForumUrl := result["url"].(string)
        fmt.Printf("##### Looking Main Forum Url %d: %s #####\n", count, mainForumUrl)
        
        c := colly.NewCollector()
        // ... 收集器配置和回调
        
        c.Visit(mainForumUrl)
        c.Wait()
    }(result)
}

wg.Wait()

关键点是每个goroutine必须使用独立的Colly收集器实例,避免共享状态导致的并发问题。

回到顶部