Golang中如何高效实现数千个API请求的并行处理

Golang中如何高效实现数千个API请求的并行处理 大家好,

我需要从同一个端点同时获取5000多个项目。该API端点每次以JSON格式返回500个项目,我必须通过偏移量参数进行迭代以获取其余项目。每个项目都包含特定产品的信息,最终会得到5000多个独立的产品。

我有两个主要需求: a) 我希望并行执行所有请求。 b) 我想探索处理数据的最佳方法,包括如何保存数据以及最合适的数据结构。

4 回复

该端点为私有访问。我可以使用白名单代理绕过速率限制。

我所说的数据结构,是指我需要以某种形式保存JSON响应,例如Go切片、数组,或者像您说的那样使用Go结构体,以便我能将每个产品通过一系列条件和业务逻辑进行过滤。

几周前,我用Python按照您的建议做过类似的事情,但当我想提升性能时,情况变得非常复杂。这就是我转向Go的原因。在您看来,当我需要额外的性能提升时,在Go中进行调整是否要容易得多?

更多关于Golang中如何高效实现数千个API请求的并行处理的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我认为这个问题可能过于宽泛,难以获得非常详细的回复。但是,通常返回偏移量的API之所以这样做,是因为它们设计为被串行调用。有没有办法事先获取一个总数,以便确定需要拉取多少组500个项目?

并发处理是一个相对简单的问题。可以参考《Go程序设计语言》中的这个简单示例。你甚至可以免费阅读这本书的相关部分。滚动到该PDF的第36页(即书的第17页)。我经常推荐这本书是有充分理由的;它非常出色。

在数据结构方面,如果你指的是用于表示反序列化JSON的Go结构体,可以取一段示例JSON并粘贴到这个工具中来生成结构体。这至少会是一个良好的起点。

我在与外部API交互方面有很多经验,可以告诉你:在这里获得最佳性能很可能需要一些尝试和错误。你可能试图在并发方面过于取巧,结果发现自己被限速或者饱和了你/他们的连接。我最好的建议是:从一个能工作的简单方案开始,暂时不要太担心性能。5000个产品对我来说听起来数据量并不巨大,所以性能可能根本不是问题。

总而言之:保持简单。先做出一个能工作的东西。然后随着时间的推移进行调整以获得更好的性能。但这只是我的一点浅见,其他人可能会有更具体的建议!另外,如果你能提及你正在使用什么API,可能会更有帮助。

bureauvanorton:

我所说的数据结构,是指我需要以某种形式保存JSON响应,比如Go的切片、数组,甚至像你说的Go结构体,这样我才能将每个产品通过一系列条件和业务逻辑进行过滤。

好的,如果是那样的话,我想我之前给你的那个链接会非常有用。只需将你的示例JSON粘贴进去,你就会得到一个很好的起点。那个工具是Caddy的作者构建的,我一直在使用它。

bureauvanorton:

几周前我用Python按照你的建议做了,但当我想提升性能时,情况变得非常复杂。这就是我转向Go的原因。在你看来,当我需要额外的性能提升时,在Go中进行调整会容易得多吗?

我不确定我会这么说。但总的来说,在静态类型语言中进行重构更容易,也更不容易出错。当然,简单的并发性是Go的卖点之一;如果你想找一本关于这方面的好书,Katherine Cox-Buday的书非常出色。尽量保持你的函数简短精炼,因为重构小函数比重构一大团意大利面条式的代码要容易得多。

但再次强调,我建议先让程序运行起来,然后再决定如何以及在何处进行优化。你甚至知道从API拉取数据是你的瓶颈吗?也许你对产品进行的任何处理才是缓慢的部分。也许你需要同步地从API拉取数据,并通过通道发送产品以进行异步处理/保存/或其他操作。我想说的是:我不知道,你也不知道。至少现在还不知道。😊

此外,选择Go的另一个理由是:测试/基准测试功能是内置在语言中的,而且非常容易(并且有一种崇尚基准测试的文化)。想知道某件事是否缓慢吗?对它进行基准测试。

在Golang中高效处理数千个API请求的并行处理,可以使用goroutine和channel结合worker pool模式。以下是具体实现方案:

package main

import (
    "encoding/json"
    "fmt"
    "sync"
    "time"
    "net/http"
    "strconv"
    "log"
)

const (
    totalItems     = 5000
    pageSize       = 500
    maxConcurrency = 50 // 控制并发数,避免过多连接
)

type Product struct {
    ID       int     `json:"id"`
    Name     string  `json:"name"`
    Price    float64 `json:"price"`
    Category string  `json:"category"`
}

type APIResponse struct {
    Products []Product `json:"products"`
    Offset   int       `json:"offset"`
    Total    int       `json:"total"`
}

func fetchPage(offset int, client *http.Client) ([]Product, error) {
    url := fmt.Sprintf("https://api.example.com/products?limit=%d&offset=%d", pageSize, offset)
    
    resp, err := client.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    
    var apiResp APIResponse
    if err := json.NewDecoder(resp.Body).Decode(&apiResp); err != nil {
        return nil, err
    }
    
    return apiResp.Products, nil
}

func worker(id int, jobs <-chan int, results chan<- []Product, wg *sync.WaitGroup, client *http.Client) {
    defer wg.Done()
    
    for offset := range jobs {
        log.Printf("Worker %d fetching offset %d", id, offset)
        
        products, err := fetchPage(offset, client)
        if err != nil {
            log.Printf("Worker %d error fetching offset %d: %v", id, offset, err)
            results <- []Product{} // 发送空切片保持通道同步
            continue
        }
        
        results <- products
    }
}

func main() {
    start := time.Now()
    
    // 计算需要的请求数量
    numRequests := (totalItems + pageSize - 1) / pageSize
    
    // 创建带缓冲的channel
    jobs := make(chan int, numRequests)
    results := make(chan []Product, numRequests)
    
    // 创建HTTP客户端(复用连接)
    client := &http.Client{
        Timeout: 30 * time.Second,
        Transport: &http.Transport{
            MaxIdleConns:        maxConcurrency,
            MaxIdleConnsPerHost: maxConcurrency,
            IdleConnTimeout:     90 * time.Second,
        },
    }
    
    var wg sync.WaitGroup
    
    // 启动worker pool
    for w := 1; w <= maxConcurrency; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg, client)
    }
    
    // 发送任务到jobs channel
    for i := 0; i < numRequests; i++ {
        offset := i * pageSize
        jobs <- offset
    }
    close(jobs)
    
    // 等待所有worker完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    var allProducts []Product
    var mu sync.Mutex
    
    // 使用多个goroutine处理结果
    var resultWg sync.WaitGroup
    resultWg.Add(1)
    
    go func() {
        defer resultWg.Done()
        
        for products := range results {
            if len(products) > 0 {
                mu.Lock()
                allProducts = append(allProducts, products...)
                mu.Unlock()
            }
        }
    }()
    
    resultWg.Wait()
    
    // 输出统计信息
    elapsed := time.Since(start)
    log.Printf("Fetched %d products in %v", len(allProducts), elapsed)
    log.Printf("Average: %.2f products/second", float64(len(allProducts))/elapsed.Seconds())
    
    // 数据存储示例
    storeProducts(allProducts)
}

// 存储产品的示例函数
func storeProducts(products []Product) {
    // 使用map按类别分组
    productsByCategory := make(map[string][]Product)
    
    for _, p := range products {
        productsByCategory[p.Category] = append(productsByCategory[p.Category], p)
    }
    
    // 转换为JSON保存
    jsonData, err := json.MarshalIndent(productsByCategory, "", "  ")
    if err != nil {
        log.Printf("Error marshaling products: %v", err)
        return
    }
    
    // 这里可以保存到文件或数据库
    fmt.Printf("Total categories: %d\n", len(productsByCategory))
    
    // 示例:打印每个类别的产品数量
    for category, prods := range productsByCategory {
        fmt.Printf("Category %s: %d products\n", category, len(prods))
    }
    
    _ = jsonData // 实际使用中需要保存到文件
}

// 使用sync.Map处理并发写入的替代方案
func processWithSyncMap(products []Product) {
    var productMap sync.Map
    
    var wg sync.WaitGroup
    for _, p := range products {
        wg.Add(1)
        go func(product Product) {
            defer wg.Done()
            // 以ID为key存储产品
            productMap.Store(strconv.Itoa(product.ID), product)
        }(p)
    }
    wg.Wait()
    
    // 遍历sync.Map
    productMap.Range(func(key, value interface{}) bool {
        fmt.Printf("Product ID %s: %v\n", key, value)
        return true
    })
}

关键优化点:

  1. Worker Pool模式:控制并发goroutine数量,避免创建5000+个goroutine
  2. HTTP连接复用:使用自定义Transport配置连接池
  3. 缓冲Channel:避免goroutine阻塞
  4. 并发安全:使用sync.Mutex保护共享数据,或使用sync.Map
  5. 错误处理:worker中单独处理错误,不影响其他请求

数据结构建议:

  • 使用[]Product切片存储所有产品
  • 使用map[string][]Product按类别分组
  • 考虑使用sync.Map处理高并发读写

存储方案:

  • 内存中处理:使用切片和map
  • 持久化存储:转换为JSON保存到文件,或存入数据库
  • 分批处理:每收集一定数量后批量保存
回到顶部