Golang互斥锁实现并发爬虫的URL去重方案

Golang互斥锁实现并发爬虫的URL去重方案 大家好,我想知道你们能否对以下代码提供一些反馈。

我想了解这个初学者方法是否达到了在同步已访问站点(存储在映射中)的同时并发抓取URL的目标。这与Go语言之旅中的练习10相关。

我是否应该使用一个结构体来存放包含已抓取URL的映射? 我是否应该使用通道(在这里如何应用?) 锁的位置是否正确?

一个有趣的现象是,注释掉 time.Sleep(time.Second) 会使代码在抓取第一个URL时中断,并且永远不会抓取其他URL。为什么这里需要 time.Sleep

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/juniormayhe/currentFilename"
)

type Fetcher interface {
	// Fetch 返回 URL 的正文和在该页面上找到的 URL 切片。
	Fetch(url string) (body string, urls []string, err error)
}

func setVisited(url string, visited map[string]bool, lock sync.Mutex) {
	lock.Lock()
	visited[url] = true
	lock.Unlock()
}

// Crawl 使用 fetcher 递归地抓取从 url 开始的页面,直到达到最大深度 depth。
func Crawl(url string, depth int, fetcher Fetcher, fetched map[string]bool) {

	var lock sync.Mutex

	_, exist := fetched[url]

	if depth <= 0 || exist {
		return
	}

	body, urls, err := fetcher.Fetch(url)

	if err != nil {
		setVisited(url, fetched, lock)

		fmt.Println(err)
		return
	}

	fmt.Printf("found: %s %q\n", url, body)

	for _, u := range urls {
		go Crawl(u, depth-1, fetcher, fetched)
	}
	setVisited(url, fetched, lock)

	time.Sleep(time.Second)

}

// fakeFetcher 是返回预设结果的 Fetcher。
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
	body string
	urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
	if res, ok := f[url]; ok {
		return res.body, res.urls, nil
	}
	return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher 是一个填充好的 fakeFetcher。
var fetcher = fakeFetcher{
	"https://golang.org/": &fakeResult{
		"The Go Programming Language",
		[]string{
			"https://golang.org/pkg/",
			"https://golang.org/cmd/",
		},
	},
	"https://golang.org/pkg/": &fakeResult{
		"Packages",
		[]string{
			"https://golang.org/",
			"https://golang.org/cmd/",
			"https://golang.org/pkg/fmt/",
			"https://golang.org/pkg/os/",
		},
	},
	"https://golang.org/pkg/fmt/": &fakeResult{
		"Package fmt",
		[]string{
			"https://golang.org/",
			"https://golang.org/pkg/",
		},
	},
	"https://golang.org/pkg/os/": &fakeResult{
		"Package os",
		[]string{
			"https://golang.org/",
			"https://golang.org/pkg/",
		},
	},
}

func main() {
	fmt.Println(currentFilename.GetCurrentFileName(), "\n")
	fetchedUrls := make(map[string]bool)
	Crawl("https://golang.org/", 4, fetcher, fetchedUrls)
	fmt.Println(fetchedUrls)
}


更多关于Golang互斥锁实现并发爬虫的URL去重方案的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

在Google上搜索“并发队列”或“并发工作队列”。

更多关于Golang互斥锁实现并发爬虫的URL去重方案的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


锁机制不会生效,因为你在每次调用 Crawl 时都创建了一个新的锁,然后通过值传递的方式将一个它的新副本传递给 visited。最简单的做法可能是使用 sync.Map,它会为你处理锁的问题。

代码启动了大量运行 Crawl 的 Goroutine。但它并没有等待它们完成。 因此 main() 函数退出,杀死了所有的 goroutine。 尝试使用 sync.WaitGroup 来等待所有 goroutine 完成。

以下是针对你代码的专业分析:

主要问题分析

1. 锁传递问题

你的 setVisited 函数接收的是 sync.Mutex 的值拷贝,这会导致每个 goroutine 使用不同的锁实例:

// 错误:传递了锁的值拷贝
func setVisited(url string, visited map[string]bool, lock sync.Mutex) {
    lock.Lock()
    visited[url] = true
    lock.Unlock()
}

// 正确:应该传递锁的指针
func setVisited(url string, visited map[string]bool, lock *sync.Mutex) {
    lock.Lock()
    visited[url] = true
    lock.Unlock()
}

2. 并发安全实现方案

这里提供一个改进的实现方案:

package main

import (
    "fmt"
    "sync"
)

type SafeVisited struct {
    mu      sync.Mutex
    visited map[string]bool
}

func (sv *SafeVisited) Set(url string) bool {
    sv.mu.Lock()
    defer sv.mu.Unlock()
    
    if sv.visited[url] {
        return false
    }
    sv.visited[url] = true
    return true
}

func (sv *SafeVisited) Get(url string) bool {
    sv.mu.Lock()
    defer sv.mu.Unlock()
    return sv.visited[url]
}

func Crawl(url string, depth int, fetcher Fetcher, sv *SafeVisited, wg *sync.WaitGroup) {
    defer wg.Done()
    
    if depth <= 0 {
        return
    }
    
    // 原子性检查并设置已访问
    if !sv.Set(url) {
        return
    }
    
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    
    fmt.Printf("found: %s %q\n", url, body)
    
    // 并发抓取子链接
    for _, u := range urls {
        wg.Add(1)
        go Crawl(u, depth-1, fetcher, sv, wg)
    }
}

func main() {
    sv := &SafeVisited{
        visited: make(map[string]bool),
    }
    
    var wg sync.WaitGroup
    wg.Add(1)
    go Crawl("https://golang.org/", 4, fetcher, sv, &wg)
    wg.Wait()
    
    fmt.Println("Visited URLs:", sv.visited)
}

3. 关于 time.Sleep 的问题

time.Sleep(time.Second) 在原始代码中是必需的,因为:

  • 主 goroutine 在启动所有子 goroutine 后立即退出
  • 子 goroutine 没有足够时间执行
  • 使用 sync.WaitGroup 可以正确同步 goroutine 的执行

4. 通道方案示例

如果需要使用通道进行协调:

func CrawlWithChannel(url string, depth int, fetcher Fetcher, sv *SafeVisited) {
    type crawlTask struct {
        url   string
        depth int
    }
    
    tasks := make(chan crawlTask, 100)
    var wg sync.WaitGroup
    
    // 启动固定数量的 worker
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for task := range tasks {
                if task.depth <= 0 {
                    continue
                }
                
                if !sv.Set(task.url) {
                    continue
                }
                
                body, urls, err := fetcher.Fetch(task.url)
                if err != nil {
                    fmt.Println(err)
                    continue
                }
                
                fmt.Printf("found: %s %q\n", task.url, body)
                
                // 将子链接加入任务队列
                for _, u := range urls {
                    tasks <- crawlTask{url: u, depth: task.depth - 1}
                }
            }
        }()
    }
    
    // 提交初始任务
    tasks <- crawlTask{url: url, depth: depth}
    close(tasks)
    wg.Wait()
}

关键改进点

  1. 使用结构体封装:将互斥锁和映射封装在一起,确保数据一致性
  2. 使用指针传递锁:避免锁的值拷贝问题
  3. 添加 WaitGroup:正确等待所有 goroutine 完成
  4. 原子性操作:在锁的保护下同时完成检查和设置操作

这个实现确保了并发安全,避免了数据竞争,并且不需要使用 time.Sleep 来等待 goroutine 完成。

回到顶部