Golang中如何限制goroutine的数量
Golang中如何限制goroutine的数量 下面的代码将输入的所有URL字符串解析为字符串切片
inputString := flag.String("input", "http://golang.org\nhttp://golang.org", "url's")
flag.Parse()
k := strings.Split(*inputString,`\n`)
k = [http://golang.org http://golang.org]
如何只调用5个goroutine,这些goroutine使用GET方法对切片中的任意地址执行某些操作, 但每个地址只能使用一次 - 如果一个goroutine正在处理字符串A,其他goroutine就不能再使用这个字符串? 谢谢!
更多关于Golang中如何限制goroutine的数量的实战教程也可以访问 https://www.itying.com/category-94-b0.html
请查看信号量包及其示例。https://godoc.org/golang.org/x/sync/semaphore#example-package--WorkerPool
这基本上就是文档中示例所实现的功能——限制工作池中的goroutine数量。
更多关于Golang中如何限制goroutine的数量的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
我写了一篇文章,描述了限制并发性的不同方法。文中包含可运行的代码,并使用相同的示例来说明所有展示的不同方法。
一些使用工作池的示例
使用缓冲通道
jobs := make(chan int, 100)
results := make(chan int, 100)
在这个例子中,只有当任务数量不超过缓冲区大小时才能正常工作。该示例为五个任务使用了100这个荒谬的缓冲区大小,这是不好的实践。
这不是一个非常灵活的方法,如果程序终止,很容易导致阻塞或goroutine无法完成它们的工作。应该使用信号量或等待组来进行同步。
我做了类似这样的操作 但需要某种机制来确保只有5个goroutine能并行工作
package main
import (
"flag"
"fmt"
"strings"
"net/http"
"io/ioutil"
"sync"
"strconv"
)
func main() {
var wg sync.WaitGroup
inputString := flag.String("input", "http://golang.org\nhttp://golang.org", "url's")
flag.Parse()
k := strings.Split(*inputString,`\n`)
wg.Add(len(k)) //错误的方式
var result = 0
for _, v := range k {
go func() {
defer wg.Done()
var url = ""
var total = 0
url, total = callGet(s)
result = result + total
fmt.Printf("Count for " + url + ":" + strconv.Itoa(total) + "\n")
}()
}
wg.Wait()
fmt.Println("Total ", result)
}
func callGet(s string) (name string, value int) {
r, _ := http.Get(s)
defer r.Body.Close()
body, _ := ioutil.ReadAll(r.Body)
bodyStr := string(body)
name = s
value = strings.Count(bodyStr, "Go")
return
}
如果你只想运行五个goroutine,就不能启动k个goroutine。只需启动五个,并使用通道在它们之间传递URL和结果。
尝试类似这样的代码:
package main
import (
"fmt"
"strings"
"sync"
)
func main() {
input := `http://www.example.com/1
http://www.example.com/2
http://www.example.com/3
http://www.example.com/4
http://www.example.com/5
http://www.example.com/6
http://www.example.com/7
http://www.example.com/8
http://www.example.com/9
http://www.example.com/10`
urls := strings.Split(input, "\n")
// 用于传递URL的通道
work := make(chan string)
// 用于传递结果的通道
results := make(chan string)
// 启动三个工作goroutine
var done sync.WaitGroup
for i := 0; i < 3; i++ {
done.Add(1)
go func() {
defer done.Done()
for w := range work {
// TODO: 对任务w执行实际工作
results <- fmt.Sprintf("my result for '%s': '%s", w, strings.ToUpper(w))
}
}()
}
// 等待WaitGroup,然后关闭结果通道
go func() {
done.Wait()
// 所有工作goroutine已完成,不会再有结果
close(results)
}()
// 将所有URL作为任务传递到工作通道
go func() {
for _, url := range urls {
work <- url
}
// 所有URL已传递,可以关闭工作通道
close(work)
}()
// 等待结果
var combinedResult string
for result := range results {
combinedResult += result + "\n"
}
// 完成
fmt.Println(combinedResult)
}
在Go语言中,可以使用带缓冲的channel和sync.WaitGroup来限制goroutine的数量并确保每个URL只被处理一次。以下是实现方案:
package main
import (
"flag"
"fmt"
"net/http"
"strings"
"sync"
)
func main() {
inputString := flag.String("input", "http://golang.org\nhttp://golang.org", "url's")
flag.Parse()
k := strings.Split(*inputString, "\n")
// 创建带缓冲的channel来限制goroutine数量
maxGoroutines := 5
guard := make(chan struct{}, maxGoroutines)
// 创建URL channel
urls := make(chan string, len(k))
// 创建WaitGroup等待所有goroutine完成
var wg sync.WaitGroup
// 将所有URL发送到channel
for _, url := range k {
urls <- url
}
close(urls)
// 启动固定数量的goroutine处理URL
for url := range urls {
wg.Add(1)
guard <- struct{}{} // 获取信号量
go func(u string) {
defer wg.Done()
defer func() { <-guard }() // 释放信号量
// 使用GET方法处理URL
resp, err := http.Get(u)
if err != nil {
fmt.Printf("Error fetching %s: %v\n", u, err)
return
}
defer resp.Body.Close()
fmt.Printf("Successfully processed %s with status: %d\n", u, resp.StatusCode)
}(url)
}
wg.Wait()
}
另一种更简洁的方法是使用worker pool模式:
package main
import (
"flag"
"fmt"
"net/http"
"strings"
"sync"
)
func worker(id int, urls <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for url := range urls {
// 使用GET方法处理URL
resp, err := http.Get(url)
if err != nil {
fmt.Printf("Worker %d: Error fetching %s: %v\n", id, url, err)
continue
}
defer resp.Body.Close()
fmt.Printf("Worker %d: Successfully processed %s with status: %d\n", id, url, resp.StatusCode)
}
}
func main() {
inputString := flag.String("input", "http://golang.org\nhttp://golang.org", "url's")
flag.Parse()
k := strings.Split(*inputString, "\n")
// 创建URL channel
urls := make(chan string, len(k))
// 创建WaitGroup
var wg sync.WaitGroup
// 启动5个worker goroutine
numWorkers := 5
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, urls, &wg)
}
// 发送所有URL到channel
for _, url := range k {
urls <- url
}
close(urls)
wg.Wait()
}
两种方法都能确保:
- 最多同时运行5个goroutine
- 每个URL只被处理一次
- 所有URL都会被处理
第一种方法使用信号量模式,第二种使用worker pool模式。两种都是Go中常见的并发控制模式。


