Golang中如何高效实现数千个API请求的并行处理
Golang中如何高效实现数千个API请求的并行处理 大家好,
我需要从同一个端点同时获取5000多个项目。该API端点每次以JSON格式返回500个项目,我必须通过偏移量参数进行迭代以获取其余项目。每个项目都包含特定产品的信息,最终会得到5000多个独立的产品。
我有两个主要需求: a) 我希望并行执行所有请求。 b) 我想探索处理数据的最佳方法,包括如何保存数据以及最合适的数据结构。
该端点为私有访问。我可以使用白名单代理绕过速率限制。
我所说的数据结构,是指我需要以某种形式保存JSON响应,例如Go切片、数组,或者像您说的那样使用Go结构体,以便我能将每个产品通过一系列条件和业务逻辑进行过滤。
几周前,我用Python按照您的建议做过类似的事情,但当我想提升性能时,情况变得非常复杂。这就是我转向Go的原因。在您看来,当我需要额外的性能提升时,在Go中进行调整是否要容易得多?
更多关于Golang中如何高效实现数千个API请求的并行处理的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
我认为这个问题可能过于宽泛,难以获得非常详细的回复。但是,通常返回偏移量的API之所以这样做,是因为它们设计为被串行调用。有没有办法事先获取一个总数,以便确定需要拉取多少组500个项目?
并发处理是一个相对简单的问题。可以参考《Go程序设计语言》中的这个简单示例。你甚至可以免费阅读这本书的相关部分。滚动到该PDF的第36页(即书的第17页)。我经常推荐这本书是有充分理由的;它非常出色。
在数据结构方面,如果你指的是用于表示反序列化JSON的Go结构体,可以取一段示例JSON并粘贴到这个工具中来生成结构体。这至少会是一个良好的起点。
我在与外部API交互方面有很多经验,可以告诉你:在这里获得最佳性能很可能需要一些尝试和错误。你可能试图在并发方面过于取巧,结果发现自己被限速或者饱和了你/他们的连接。我最好的建议是:从一个能工作的简单方案开始,暂时不要太担心性能。5000个产品对我来说听起来数据量并不巨大,所以性能可能根本不是问题。
总而言之:保持简单。先做出一个能工作的东西。然后随着时间的推移进行调整以获得更好的性能。但这只是我的一点浅见,其他人可能会有更具体的建议!另外,如果你能提及你正在使用什么API,可能会更有帮助。
bureauvanorton:
我所说的数据结构,是指我需要以某种形式保存JSON响应,比如Go的切片、数组,甚至像你说的Go结构体,这样我才能将每个产品通过一系列条件和业务逻辑进行过滤。
好的,如果是那样的话,我想我之前给你的那个链接会非常有用。只需将你的示例JSON粘贴进去,你就会得到一个很好的起点。那个工具是Caddy的作者构建的,我一直在使用它。
bureauvanorton:
几周前我用Python按照你的建议做了,但当我想提升性能时,情况变得非常复杂。这就是我转向Go的原因。在你看来,当我需要额外的性能提升时,在Go中进行调整会容易得多吗?
我不确定我会这么说。但总的来说,在静态类型语言中进行重构更容易,也更不容易出错。当然,简单的并发性是Go的卖点之一;如果你想找一本关于这方面的好书,Katherine Cox-Buday的书非常出色。尽量保持你的函数简短精炼,因为重构小函数比重构一大团意大利面条式的代码要容易得多。
但再次强调,我建议先让程序运行起来,然后再决定如何以及在何处进行优化。你甚至知道从API拉取数据是你的瓶颈吗?也许你对产品进行的任何处理才是缓慢的部分。也许你需要同步地从API拉取数据,并通过通道发送产品以进行异步处理/保存/或其他操作。我想说的是:我不知道,你也不知道。至少现在还不知道。😊
此外,选择Go的另一个理由是:测试/基准测试功能是内置在语言中的,而且非常容易(并且有一种崇尚基准测试的文化)。想知道某件事是否缓慢吗?对它进行基准测试。
在Golang中高效处理数千个API请求的并行处理,可以使用goroutine和channel结合worker pool模式。以下是具体实现方案:
package main
import (
"encoding/json"
"fmt"
"sync"
"time"
"net/http"
"strconv"
"log"
)
const (
totalItems = 5000
pageSize = 500
maxConcurrency = 50 // 控制并发数,避免过多连接
)
type Product struct {
ID int `json:"id"`
Name string `json:"name"`
Price float64 `json:"price"`
Category string `json:"category"`
}
type APIResponse struct {
Products []Product `json:"products"`
Offset int `json:"offset"`
Total int `json:"total"`
}
func fetchPage(offset int, client *http.Client) ([]Product, error) {
url := fmt.Sprintf("https://api.example.com/products?limit=%d&offset=%d", pageSize, offset)
resp, err := client.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var apiResp APIResponse
if err := json.NewDecoder(resp.Body).Decode(&apiResp); err != nil {
return nil, err
}
return apiResp.Products, nil
}
func worker(id int, jobs <-chan int, results chan<- []Product, wg *sync.WaitGroup, client *http.Client) {
defer wg.Done()
for offset := range jobs {
log.Printf("Worker %d fetching offset %d", id, offset)
products, err := fetchPage(offset, client)
if err != nil {
log.Printf("Worker %d error fetching offset %d: %v", id, offset, err)
results <- []Product{} // 发送空切片保持通道同步
continue
}
results <- products
}
}
func main() {
start := time.Now()
// 计算需要的请求数量
numRequests := (totalItems + pageSize - 1) / pageSize
// 创建带缓冲的channel
jobs := make(chan int, numRequests)
results := make(chan []Product, numRequests)
// 创建HTTP客户端(复用连接)
client := &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
MaxIdleConns: maxConcurrency,
MaxIdleConnsPerHost: maxConcurrency,
IdleConnTimeout: 90 * time.Second,
},
}
var wg sync.WaitGroup
// 启动worker pool
for w := 1; w <= maxConcurrency; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg, client)
}
// 发送任务到jobs channel
for i := 0; i < numRequests; i++ {
offset := i * pageSize
jobs <- offset
}
close(jobs)
// 等待所有worker完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
var allProducts []Product
var mu sync.Mutex
// 使用多个goroutine处理结果
var resultWg sync.WaitGroup
resultWg.Add(1)
go func() {
defer resultWg.Done()
for products := range results {
if len(products) > 0 {
mu.Lock()
allProducts = append(allProducts, products...)
mu.Unlock()
}
}
}()
resultWg.Wait()
// 输出统计信息
elapsed := time.Since(start)
log.Printf("Fetched %d products in %v", len(allProducts), elapsed)
log.Printf("Average: %.2f products/second", float64(len(allProducts))/elapsed.Seconds())
// 数据存储示例
storeProducts(allProducts)
}
// 存储产品的示例函数
func storeProducts(products []Product) {
// 使用map按类别分组
productsByCategory := make(map[string][]Product)
for _, p := range products {
productsByCategory[p.Category] = append(productsByCategory[p.Category], p)
}
// 转换为JSON保存
jsonData, err := json.MarshalIndent(productsByCategory, "", " ")
if err != nil {
log.Printf("Error marshaling products: %v", err)
return
}
// 这里可以保存到文件或数据库
fmt.Printf("Total categories: %d\n", len(productsByCategory))
// 示例:打印每个类别的产品数量
for category, prods := range productsByCategory {
fmt.Printf("Category %s: %d products\n", category, len(prods))
}
_ = jsonData // 实际使用中需要保存到文件
}
// 使用sync.Map处理并发写入的替代方案
func processWithSyncMap(products []Product) {
var productMap sync.Map
var wg sync.WaitGroup
for _, p := range products {
wg.Add(1)
go func(product Product) {
defer wg.Done()
// 以ID为key存储产品
productMap.Store(strconv.Itoa(product.ID), product)
}(p)
}
wg.Wait()
// 遍历sync.Map
productMap.Range(func(key, value interface{}) bool {
fmt.Printf("Product ID %s: %v\n", key, value)
return true
})
}
关键优化点:
- Worker Pool模式:控制并发goroutine数量,避免创建5000+个goroutine
- HTTP连接复用:使用自定义Transport配置连接池
- 缓冲Channel:避免goroutine阻塞
- 并发安全:使用sync.Mutex保护共享数据,或使用sync.Map
- 错误处理:worker中单独处理错误,不影响其他请求
数据结构建议:
- 使用
[]Product切片存储所有产品 - 使用
map[string][]Product按类别分组 - 考虑使用
sync.Map处理高并发读写
存储方案:
- 内存中处理:使用切片和map
- 持久化存储:转换为JSON保存到文件,或存入数据库
- 分批处理:每收集一定数量后批量保存

