Golang中如何实现net/http与并发调度器的同时运行

Golang中如何实现net/http与并发调度器的同时运行 大家好,

我正在开发一个使用REST API构建的项目,该项目可以管理多个商户账户。我们需要为每个商户每10分钟运行一次定时任务。这些定时任务的URL是我们服务器自身的API。因此,我使用了Golang的net/http包来满足这个需求。

我有成千上万的商户,每个商户都有数百个需要通过定时任务完成的预订。为了完成一个预订,我需要通过HTTP调用另外两个调度器。

我需要使用Go协程并发地调用所有这些调度器。

当我尝试为1000个商户运行代码时,它开始出现EOF错误。 于是,我将系统中的打开文件数限制提高到了999000。

然后它开始报错 [dial tcp 127.0.0.1:3333: can't assign requested address]。经过大量研究,我得出结论,这个错误是由于系统中可用的TCP端口耗尽造成的,它已经用完了系统中所有可用的TCP端口。

接着,我为商户添加了一些延迟。如果系统中的所有端口都被占用,那么后续的代码将在系统有可用端口时再运行。

以下是我正在使用的代码:

func TestHttp2(c *gin.Context) {
	fileName := "portlog" + time.Now().String() + ".txt"
	_, e := os.Create(fileName)
	if e != nil {
		fmt.Println(1, e)
	}
	nStr := c.Query("num")
	n, _ := strconv.Atoi(nStr)
	fmt.Println("running")
	var ports int
	for i := 1; i <= n; i++ {
		url := "http://localhost:8000/public/print-test?num=" + strconv.Itoa(i)
	check:
		if i%1000 == 0 {
			ports = PortCheck()
			if ports >= 20000 {
				time.Sleep(10 * time.Second)
				goto check
			}
		}

		go MakeRequest2(url, fileName, ports)
	}
	return
}

/*
 * Function to hit url with get method and close the response body
 */
func MakeRequest2(url, fileName string, port int) {
	data := test{
		FileName: fileName,
		Url:      url,
		Port:     port,
	}
	b, err := json.Marshal(data)
	if err != nil {
		fmt.Println(234234324, err)
	}
	jsonString := string(b)
	client := &http.Client{}
	url2 := url + "?num2="
	req, err := http.NewRequest("GET", url2, bytes.NewBuffer([]byte(jsonString)))
	if err != nil {
		fmt.Println(2, "===========================================", err, "==================================")
		panic(err)
		return
	}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println(1, "===========================================", err, "==================================")
		panic(err)
		return
	}
	io.Copy(ioutil.Discard, resp.Body)
	client.CloseIdleConnections()
	resp.Body.Close()
}

func PortCheck() int {
	cmd := exec.Command("netstat", "-an")
	grep := exec.Command("grep", "8000")
	wc := exec.Command("wc", "-l")
	pipe, _ := cmd.StdoutPipe()
	defer pipe.Close()

	grep.Stdin = pipe
	pipe2, _ := grep.StdoutPipe()
	defer pipe2.Close()
	wc.Stdin = pipe2
	// Run ps first.
	cmd.Start()
	grep.Start()
	out, err := wc.Output()
	if err != nil {
		// if there was any error, print it here
		fmt.Println("could not run command: ", err)
	}
	// otherwise, print the output from running the command
	fmt.Println("Output: ", string(out))
	fmt.Println(cmd)
	wc.Wait()
	ports, err := strconv.Atoi(strings.Split(string(out), "\n")[0])
	if err != nil {
		fmt.Println(err)
	}
	return ports
}

func PrintTest(c *gin.Context) {
	data := test{}
	_ = json.NewDecoder(c.Request.Body).Decode(&data)
	file, err := os.OpenFile(data.FileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		fmt.Println(1, err)
		return
	}
	defer file.Close()
	_, err = file.WriteString(time.Now().String() + "==>" + data.Url + " ==> " + strconv.Itoa(data.Port) + "\n")
	if err != nil {
		fmt.Println(err)
		return
	}
	for i := 1; i <= 100; i++ {
		time.Sleep(2 * time.Second)
		url1 := "http://localhost:8000/public/test-func1?num=" + strconv.Itoa(i)
		url2 := "http://localhost:8000/public/test-func2?num=" + strconv.Itoa(i)
		go MakeRequest1(url1)
		go MakeRequest1(url2)
	}
	return
}

type test struct {
	Url      string `json:"url"`
	FileName string `json:"file_name"`
	Port     int    `json:"port"`
}

func TestFunc1(c *gin.Context) {
	time.Sleep(5 * time.Second)
	fmt.Println("test function 1")
}
func TestFunc2(c *gin.Context) {
	time.Sleep(5 * time.Second)
	fmt.Println("test function 2")
}

/*
 * Function to hit url with get method and close the response body
 */
func MakeRequest1(url string) {
	client := &http.Client{}
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		fmt.Println(3, "===========================================", err, "==================================")
		panic(err)
		return
	}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println(4, "===========================================", err, "==================================")
		panic(err)
		return
	}
	io.Copy(ioutil.Discard, resp.Body)
	client.CloseIdleConnections()
	resp.Body.Close()
	return
}

在这段代码中,我首先调用TestHttp2函数,它将为5000个商户运行。每处理1000个商户后,它会检查端口使用情况。如果已用端口数大于20000,我们将等待端口释放。 然后,我们通过HTTP调用PrintTest函数。在该函数中,我们通过HTTP命令调用TestFunc1TestFunc2。 现在它开始出现同样的错误:

can’t assign requested address

因为TestFunc1TestFunc2正在消耗端口。如果我不调用这两个函数,代码可以完美地处理200万个商户。但我需要并发地调用这两个函数。

于是,我在PrintTest函数中也添加了端口检查条件:

func PrintTest(c *gin.Context) {
	data := test{}
	_ = json.NewDecoder(c.Request.Body).Decode(&data)
	file, err := os.OpenFile(data.FileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		fmt.Println(1, err)
		return
	}
	defer file.Close()
	_, err = file.WriteString(time.Now().String() + "==>" + data.Url + " ==> " + strconv.Itoa(data.Port) + "\n")
	if err != nil {
		fmt.Println(err)
		return
	}
 check:
	 	ports := PortCheck()
         	if ports >= 20000 {
	 		time.Sleep(10 * time.Second)
	 		goto check
	 	}
	for i := 1; i <= 100; i++ {
		time.Sleep(2 * time.Second)
		url1 := "http://localhost:8000/public/test-func1?num=" + strconv.Itoa(i)
		url2 := "http://localhost:8000/public/test-func2?num=" + strconv.Itoa(i)
		go MakeRequest1(url1)
		go MakeRequest1(url2)
	}
	return
}

现在系统的所有端口没有被耗尽,但它在最后开始报错:

fork/exec /usr/bin/wc: resource temporarily unavailable

之后,我尝试使用curl命令调用TestFunc1TestFunc2

/*
 * Function to hit url with get method and close the response body
 */
func CurlTest(url string) {
	cmd := exec.Command("curl", "-H", "Connection: close", "-H", "--no-keepalive", "-X", "GET", url)
	var out bytes.Buffer
	var stderr bytes.Buffer
	cmd.Stdout = &out
	cmd.Stderr = &stderr
	err := cmd.Start()
	if err == nil {
		_ = cmd.Wait()
	}
	// fmt.Println(i, time.Now())
	return
}
func PrintTest(c *gin.Context) {
	data := test{}
	_ = json.NewDecoder(c.Request.Body).Decode(&data)
	file, err := os.OpenFile(data.FileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		fmt.Println(1, err)
		return
	}
	defer file.Close()
	_, err = file.WriteString(time.Now().String() + "==>" + data.Url + " ==> " + strconv.Itoa(data.Port) + "\n")
	if err != nil {
		fmt.Println(err)
		return
	}
	for i := 1; i <= 100; i++ {
		time.Sleep(2 * time.Second)
		url1 := "http://localhost:8000/public/test-func1?num=" + strconv.Itoa(i)
		url2 := "http://localhost:8000/public/test-func2?num=" + strconv.Itoa(i)
		go CurlTest(url1)
		go CurlTest(url2)
	}
	return
}

如果我使用Go协程调用CurlTest函数,它会在系统中打开太多文件,然后开始报错:

http: Accept error: accept tcp [::]:8080: accept4: too many open files; retrying in 1s

即使我将打开文件数限制设置为大约100万,每当打开文件数达到超过100万时,它就开始出现这个错误。

请帮我找出这个问题的最佳解决方案。 我希望使用net/http而不是curl来调用调度器,因为curl消耗了太多资源。 请为这个问题建议最佳的架构。 我们能否以其他方式处理TCP端口的情况,而不是添加延迟?


更多关于Golang中如何实现net/http与并发调度器的同时运行的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

感谢您的回复。 如果我提高软限制,那么每次重启系统时我都必须重新提高它。 只有在使用curl运行调度程序时才会出现打开文件错误, 我希望使用net/http来运行调度程序。

更多关于Golang中如何实现net/http与并发调度器的同时运行的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


  1. 实现一个速率限制/工作池,不要尝试同时连接所有远程主机。分散负载。
  2. 检查你是否可能只提高了软限制,而硬限制仍然保持在一百万。

在Golang中实现高并发HTTP请求时,端口耗尽是常见问题。根本原因在于每个HTTP客户端连接都会占用一个本地端口,当并发量极大时,会快速耗尽可用端口。以下是针对你问题的解决方案:

1. 使用连接池和HTTP客户端复用

创建全局HTTP客户端并复用连接,而不是为每个请求创建新客户端:

var (
    httpClient *http.Client
    once sync.Once
)

func getHTTPClient() *http.Client {
    once.Do(func() {
        httpClient = &http.Client{
            Transport: &http.Transport{
                MaxIdleConns:        1000,
                MaxIdleConnsPerHost: 1000,
                MaxConnsPerHost:     1000,
                IdleConnTimeout:     90 * time.Second,
                TLSHandshakeTimeout: 10 * time.Second,
                DialContext: (&net.Dialer{
                    Timeout:   30 * time.Second,
                    KeepAlive: 30 * time.Second,
                }).DialContext,
            },
            Timeout: 30 * time.Second,
        }
    })
    return httpClient
}

2. 使用工作池控制并发

创建固定大小的goroutine池来控制并发数量:

type Job struct {
    MerchantID int
    URL        string
}

func worker(id int, jobs <-chan Job, results chan<- error) {
    client := getHTTPClient()
    for job := range jobs {
        err := processMerchant(client, job)
        results <- err
    }
}

func processMerchant(client *http.Client, job Job) error {
    req, err := http.NewRequest("GET", job.URL, nil)
    if err != nil {
        return err
    }
    
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    _, err = io.Copy(io.Discard, resp.Body)
    return err
}

func ScheduleMerchantTasks(merchants []Merchant, workerCount int) {
    jobs := make(chan Job, len(merchants))
    results := make(chan error, len(merchants))
    
    // 启动worker
    for w := 1; w <= workerCount; w++ {
        go worker(w, jobs, results)
    }
    
    // 发送任务
    for _, m := range merchants {
        jobs <- Job{
            MerchantID: m.ID,
            URL:        fmt.Sprintf("http://localhost:8000/public/print-test?num=%d", m.ID),
        }
    }
    close(jobs)
    
    // 收集结果
    for i := 0; i < len(merchants); i++ {
        err := <-results
        if err != nil {
            log.Printf("Error processing merchant: %v", err)
        }
    }
}

3. 使用semaphore控制并发goroutine数量

import "golang.org/x/sync/semaphore"

func ProcessMerchantsConcurrently(merchants []Merchant, maxConcurrent int) {
    sem := semaphore.NewWeighted(int64(maxConcurrent))
    var wg sync.WaitGroup
    ctx := context.Background()
    
    for _, m := range merchants {
        wg.Add(1)
        go func(merchant Merchant) {
            defer wg.Done()
            
            // 获取信号量,控制并发
            if err := sem.Acquire(ctx, 1); err != nil {
                log.Printf("Failed to acquire semaphore: %v", err)
                return
            }
            defer sem.Release(1)
            
            // 处理商户任务
            processMerchantTask(merchant)
        }(m)
    }
    
    wg.Wait()
}

func processMerchantTask(m Merchant) {
    client := getHTTPClient()
    
    // 调用调度器1
    err := callScheduler(client, fmt.Sprintf("http://localhost:3333/scheduler1/%d", m.ID))
    if err != nil {
        log.Printf("Scheduler1 error for merchant %d: %v", m.ID, err)
    }
    
    // 调用调度器2
    err = callScheduler(client, fmt.Sprintf("http://localhost:3333/scheduler2/%d", m.ID))
    if err != nil {
        log.Printf("Scheduler2 error for merchant %d: %v", m.ID, err)
    }
}

4. 定时任务调度器实现

type MerchantScheduler struct {
    client      *http.Client
    merchantMap map[int]*time.Ticker
    mu          sync.RWMutex
}

func NewMerchantScheduler() *MerchantScheduler {
    return &MerchantScheduler{
        client:      getHTTPClient(),
        merchantMap: make(map[int]*time.Ticker),
    }
}

func (ms *MerchantScheduler) ScheduleMerchant(merchantID int, interval time.Duration) {
    ms.mu.Lock()
    defer ms.mu.Unlock()
    
    // 如果已经存在定时器,先停止
    if ticker, exists := ms.merchantMap[merchantID]; exists {
        ticker.Stop()
    }
    
    ticker := time.NewTicker(interval)
    ms.merchantMap[merchantID] = ticker
    
    go func(id int, t *time.Ticker) {
        for range t.C {
            ms.executeMerchantTask(id)
        }
    }(merchantID, ticker)
}

func (ms *MerchantScheduler) executeMerchantTask(merchantID int) {
    // 使用工作池执行任务,避免创建过多goroutine
    go func() {
        urls := []string{
            fmt.Sprintf("http://localhost:3333/scheduler1/%d", merchantID),
            fmt.Sprintf("http://localhost:3333/scheduler2/%d", merchantID),
        }
        
        var wg sync.WaitGroup
        for _, url := range urls {
            wg.Add(1)
            go func(u string) {
                defer wg.Done()
                req, _ := http.NewRequest("GET", u, nil)
                resp, err := ms.client.Do(req)
                if err != nil {
                    log.Printf("Error calling %s: %v", u, err)
                    return
                }
                defer resp.Body.Close()
                io.Copy(io.Discard, resp.Body)
            }(url)
        }
        wg.Wait()
    }()
}

5. 完整的优化版本

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net/http"
    "sync"
    "time"
    
    "golang.org/x/sync/semaphore"
)

const (
    maxConcurrentRequests = 500  // 根据系统调整
    requestTimeout        = 30 * time.Second
)

type Scheduler struct {
    client *http.Client
    sem    *semaphore.Weighted
}

func NewScheduler() *Scheduler {
    transport := &http.Transport{
        MaxIdleConns:        maxConcurrentRequests,
        MaxIdleConnsPerHost: maxConcurrentRequests,
        MaxConnsPerHost:     maxConcurrentRequests,
        IdleConnTimeout:     90 * time.Second,
    }
    
    return &Scheduler{
        client: &http.Client{
            Transport: transport,
            Timeout:   requestTimeout,
        },
        sem: semaphore.NewWeighted(maxConcurrentRequests),
    }
}

func (s *Scheduler) ProcessMerchant(merchantID int) error {
    ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
    defer cancel()
    
    // 获取信号量控制并发
    if err := s.sem.Acquire(ctx, 1); err != nil {
        return fmt.Errorf("failed to acquire semaphore: %w", err)
    }
    defer s.sem.Release(1)
    
    // 并发调用两个调度器
    var wg sync.WaitGroup
    errors := make(chan error, 2)
    
    urls := []string{
        fmt.Sprintf("http://localhost:3333/scheduler1/%d", merchantID),
        fmt.Sprintf("http://localhost:3333/scheduler2/%d", merchantID),
    }
    
    for _, url := range urls {
        wg.Add(1)
        go func(u string) {
            defer wg.Done()
            if err := s.makeRequest(u); err != nil {
                errors <- err
            }
        }(url)
    }
    
    wg.Wait()
    close(errors)
    
    // 返回第一个错误
    for err := range errors {
        return err
    }
    
    return nil
}

func (s *Scheduler) makeRequest(url string) error {
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return err
    }
    
    resp, err := s.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    // 读取并丢弃响应体
    _, err = io.Copy(io.Discard, resp.Body)
    return err
}

// 定时执行任务
func (s *Scheduler) StartPeriodicTasks(merchants []int, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    for range ticker.C {
        s.processBatch(merchants)
    }
}

func (s *Scheduler) processBatch(merchants []int) {
    var wg sync.WaitGroup
    for _, merchantID := range merchants {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            if err := s.ProcessMerchant(id); err != nil {
                log.Printf("Error processing merchant %d: %v", id, err)
            }
        }(merchantID)
    }
    wg.Wait()
}

关键优化点:

  1. 复用HTTP客户端和连接池
  2. 使用信号量控制最大并发数
  3. 避免为每个请求创建新客户端
  4. 合理设置连接超时和空闲超时
  5. 使用context控制请求超时

这个架构可以处理成千上万的商户定时任务,而不会耗尽系统资源。通过控制并发数量和复用连接,可以避免端口耗尽和文件描述符不足的问题。

回到顶部