Golang中如何限制goroutine的数量

Golang中如何限制goroutine的数量 下面的代码将输入的所有URL字符串解析为字符串切片

inputString := flag.String("input", "http://golang.org\nhttp://golang.org", "url's")
flag.Parse()
k := strings.Split(*inputString,`\n`)

k = [http://golang.org http://golang.org]

如何只调用5个goroutine,这些goroutine使用GET方法对切片中的任意地址执行某些操作, 但每个地址只能使用一次 - 如果一个goroutine正在处理字符串A,其他goroutine就不能再使用这个字符串? 谢谢!


更多关于Golang中如何限制goroutine的数量的实战教程也可以访问 https://www.itying.com/category-94-b0.html

9 回复

请查看信号量包及其示例。https://godoc.org/golang.org/x/sync/semaphore#example-package--WorkerPool

这基本上就是文档中示例所实现的功能——限制工作池中的goroutine数量。

更多关于Golang中如何限制goroutine的数量的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我写了一篇文章,描述了限制并发性的不同方法。文中包含可运行的代码,并使用相同的示例来说明所有展示的不同方法。

https://pocketgophers.com/limit-concurrent-use/

一些使用工作池的示例

https://gobyexample.com/worker-pools

缓冲通道,例如像这样:

// 示例代码

参考链接:http://jmoiron.net/blog/limiting-concurrency-in-go/

您的代码没有展示您目前如何使用 Goroutine 来处理 k 中的 URL。为了帮助您,我们需要:

  • 查看您的代码
  • 了解 k 可能有多大
  • 了解您希望运行多少个 Goroutine 来处理这些 URL

使用缓冲通道

jobs := make(chan int, 100)
results := make(chan int, 100)

在这个例子中,只有当任务数量不超过缓冲区大小时才能正常工作。该示例为五个任务使用了100这个荒谬的缓冲区大小,这是不好的实践。

这不是一个非常灵活的方法,如果程序终止,很容易导致阻塞或goroutine无法完成它们的工作。应该使用信号量或等待组来进行同步。

我做了类似这样的操作 但需要某种机制来确保只有5个goroutine能并行工作

package main

import (
	"flag"
	"fmt"
	"strings"
	"net/http"
	"io/ioutil"
	"sync"
	"strconv"
)

func main() {

	var wg sync.WaitGroup
	inputString := flag.String("input", "http://golang.org\nhttp://golang.org", "url's")
	flag.Parse()

	k := strings.Split(*inputString,`\n`)
	wg.Add(len(k)) //错误的方式

	var result = 0
	for _, v := range k {
		go func() { 
			defer wg.Done()
			var url = ""
			var total = 0
			url, total = callGet(s)
			result = result + total
			fmt.Printf("Count for " + url + ":" + strconv.Itoa(total) + "\n")
		}()
	}

	wg.Wait()
	fmt.Println("Total ", result)
}


func callGet(s string) (name string, value int)  {
	r, _ := http.Get(s) 
	defer r.Body.Close()

	body, _ := ioutil.ReadAll(r.Body)
    bodyStr := string(body)
	name = s
	value = strings.Count(bodyStr, "Go")

	return
}

如果你只想运行五个goroutine,就不能启动k个goroutine。只需启动五个,并使用通道在它们之间传递URL和结果。

尝试类似这样的代码:

package main

import (
	"fmt"
	"strings"
	"sync"
)

func main() {
	input := `http://www.example.com/1
http://www.example.com/2
http://www.example.com/3
http://www.example.com/4
http://www.example.com/5
http://www.example.com/6
http://www.example.com/7
http://www.example.com/8
http://www.example.com/9
http://www.example.com/10`
	urls := strings.Split(input, "\n")

	// 用于传递URL的通道
	work := make(chan string)

	// 用于传递结果的通道
	results := make(chan string)

	// 启动三个工作goroutine
	var done sync.WaitGroup
	for i := 0; i < 3; i++ {
		done.Add(1)
		go func() {
			defer done.Done()
			for w := range work {
				// TODO: 对任务w执行实际工作
				results <- fmt.Sprintf("my result for '%s': '%s", w, strings.ToUpper(w))
			}
		}()
	}

	// 等待WaitGroup,然后关闭结果通道
	go func() {
		done.Wait()
		// 所有工作goroutine已完成,不会再有结果
		close(results)
	}()

	// 将所有URL作为任务传递到工作通道
	go func() {
		for _, url := range urls {
			work <- url
		}
		// 所有URL已传递,可以关闭工作通道
		close(work)
	}()

	// 等待结果
	var combinedResult string
	for result := range results {
		combinedResult += result + "\n"
	}

	// 完成
	fmt.Println(combinedResult)
}

另请参阅 https://goplay.space/#o9nqqSCswq1

在Go语言中,可以使用带缓冲的channel和sync.WaitGroup来限制goroutine的数量并确保每个URL只被处理一次。以下是实现方案:

package main

import (
	"flag"
	"fmt"
	"net/http"
	"strings"
	"sync"
)

func main() {
	inputString := flag.String("input", "http://golang.org\nhttp://golang.org", "url's")
	flag.Parse()
	k := strings.Split(*inputString, "\n")
	
	// 创建带缓冲的channel来限制goroutine数量
	maxGoroutines := 5
	guard := make(chan struct{}, maxGoroutines)
	
	// 创建URL channel
	urls := make(chan string, len(k))
	
	// 创建WaitGroup等待所有goroutine完成
	var wg sync.WaitGroup
	
	// 将所有URL发送到channel
	for _, url := range k {
		urls <- url
	}
	close(urls)
	
	// 启动固定数量的goroutine处理URL
	for url := range urls {
		wg.Add(1)
		guard <- struct{}{} // 获取信号量
		
		go func(u string) {
			defer wg.Done()
			defer func() { <-guard }() // 释放信号量
			
			// 使用GET方法处理URL
			resp, err := http.Get(u)
			if err != nil {
				fmt.Printf("Error fetching %s: %v\n", u, err)
				return
			}
			defer resp.Body.Close()
			
			fmt.Printf("Successfully processed %s with status: %d\n", u, resp.StatusCode)
		}(url)
	}
	
	wg.Wait()
}

另一种更简洁的方法是使用worker pool模式:

package main

import (
	"flag"
	"fmt"
	"net/http"
	"strings"
	"sync"
)

func worker(id int, urls <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	
	for url := range urls {
		// 使用GET方法处理URL
		resp, err := http.Get(url)
		if err != nil {
			fmt.Printf("Worker %d: Error fetching %s: %v\n", id, url, err)
			continue
		}
		defer resp.Body.Close()
		
		fmt.Printf("Worker %d: Successfully processed %s with status: %d\n", id, url, resp.StatusCode)
	}
}

func main() {
	inputString := flag.String("input", "http://golang.org\nhttp://golang.org", "url's")
	flag.Parse()
	k := strings.Split(*inputString, "\n")
	
	// 创建URL channel
	urls := make(chan string, len(k))
	
	// 创建WaitGroup
	var wg sync.WaitGroup
	
	// 启动5个worker goroutine
	numWorkers := 5
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, urls, &wg)
	}
	
	// 发送所有URL到channel
	for _, url := range k {
		urls <- url
	}
	close(urls)
	
	wg.Wait()
}

两种方法都能确保:

  1. 最多同时运行5个goroutine
  2. 每个URL只被处理一次
  3. 所有URL都会被处理

第一种方法使用信号量模式,第二种使用worker pool模式。两种都是Go中常见的并发控制模式。

回到顶部