Golang中如何实现net/http与并发调度器的同时运行
Golang中如何实现net/http与并发调度器的同时运行 大家好,
我正在开发一个使用REST API构建的项目,该项目可以管理多个商户账户。我们需要为每个商户每10分钟运行一次定时任务。这些定时任务的URL是我们服务器自身的API。因此,我使用了Golang的net/http包来满足这个需求。
我有成千上万的商户,每个商户都有数百个需要通过定时任务完成的预订。为了完成一个预订,我需要通过HTTP调用另外两个调度器。
我需要使用Go协程并发地调用所有这些调度器。
当我尝试为1000个商户运行代码时,它开始出现EOF错误。 于是,我将系统中的打开文件数限制提高到了999000。
然后它开始报错 [dial tcp 127.0.0.1:3333: can't assign requested address]。经过大量研究,我得出结论,这个错误是由于系统中可用的TCP端口耗尽造成的,它已经用完了系统中所有可用的TCP端口。
接着,我为商户添加了一些延迟。如果系统中的所有端口都被占用,那么后续的代码将在系统有可用端口时再运行。
以下是我正在使用的代码:
func TestHttp2(c *gin.Context) {
fileName := "portlog" + time.Now().String() + ".txt"
_, e := os.Create(fileName)
if e != nil {
fmt.Println(1, e)
}
nStr := c.Query("num")
n, _ := strconv.Atoi(nStr)
fmt.Println("running")
var ports int
for i := 1; i <= n; i++ {
url := "http://localhost:8000/public/print-test?num=" + strconv.Itoa(i)
check:
if i%1000 == 0 {
ports = PortCheck()
if ports >= 20000 {
time.Sleep(10 * time.Second)
goto check
}
}
go MakeRequest2(url, fileName, ports)
}
return
}
/*
* Function to hit url with get method and close the response body
*/
func MakeRequest2(url, fileName string, port int) {
data := test{
FileName: fileName,
Url: url,
Port: port,
}
b, err := json.Marshal(data)
if err != nil {
fmt.Println(234234324, err)
}
jsonString := string(b)
client := &http.Client{}
url2 := url + "?num2="
req, err := http.NewRequest("GET", url2, bytes.NewBuffer([]byte(jsonString)))
if err != nil {
fmt.Println(2, "===========================================", err, "==================================")
panic(err)
return
}
resp, err := client.Do(req)
if err != nil {
fmt.Println(1, "===========================================", err, "==================================")
panic(err)
return
}
io.Copy(ioutil.Discard, resp.Body)
client.CloseIdleConnections()
resp.Body.Close()
}
func PortCheck() int {
cmd := exec.Command("netstat", "-an")
grep := exec.Command("grep", "8000")
wc := exec.Command("wc", "-l")
pipe, _ := cmd.StdoutPipe()
defer pipe.Close()
grep.Stdin = pipe
pipe2, _ := grep.StdoutPipe()
defer pipe2.Close()
wc.Stdin = pipe2
// Run ps first.
cmd.Start()
grep.Start()
out, err := wc.Output()
if err != nil {
// if there was any error, print it here
fmt.Println("could not run command: ", err)
}
// otherwise, print the output from running the command
fmt.Println("Output: ", string(out))
fmt.Println(cmd)
wc.Wait()
ports, err := strconv.Atoi(strings.Split(string(out), "\n")[0])
if err != nil {
fmt.Println(err)
}
return ports
}
func PrintTest(c *gin.Context) {
data := test{}
_ = json.NewDecoder(c.Request.Body).Decode(&data)
file, err := os.OpenFile(data.FileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Println(1, err)
return
}
defer file.Close()
_, err = file.WriteString(time.Now().String() + "==>" + data.Url + " ==> " + strconv.Itoa(data.Port) + "\n")
if err != nil {
fmt.Println(err)
return
}
for i := 1; i <= 100; i++ {
time.Sleep(2 * time.Second)
url1 := "http://localhost:8000/public/test-func1?num=" + strconv.Itoa(i)
url2 := "http://localhost:8000/public/test-func2?num=" + strconv.Itoa(i)
go MakeRequest1(url1)
go MakeRequest1(url2)
}
return
}
type test struct {
Url string `json:"url"`
FileName string `json:"file_name"`
Port int `json:"port"`
}
func TestFunc1(c *gin.Context) {
time.Sleep(5 * time.Second)
fmt.Println("test function 1")
}
func TestFunc2(c *gin.Context) {
time.Sleep(5 * time.Second)
fmt.Println("test function 2")
}
/*
* Function to hit url with get method and close the response body
*/
func MakeRequest1(url string) {
client := &http.Client{}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
fmt.Println(3, "===========================================", err, "==================================")
panic(err)
return
}
resp, err := client.Do(req)
if err != nil {
fmt.Println(4, "===========================================", err, "==================================")
panic(err)
return
}
io.Copy(ioutil.Discard, resp.Body)
client.CloseIdleConnections()
resp.Body.Close()
return
}
在这段代码中,我首先调用TestHttp2函数,它将为5000个商户运行。每处理1000个商户后,它会检查端口使用情况。如果已用端口数大于20000,我们将等待端口释放。
然后,我们通过HTTP调用PrintTest函数。在该函数中,我们通过HTTP命令调用TestFunc1和TestFunc2。
现在它开始出现同样的错误:
can’t assign requested address
因为TestFunc1和TestFunc2正在消耗端口。如果我不调用这两个函数,代码可以完美地处理200万个商户。但我需要并发地调用这两个函数。
于是,我在PrintTest函数中也添加了端口检查条件:
func PrintTest(c *gin.Context) {
data := test{}
_ = json.NewDecoder(c.Request.Body).Decode(&data)
file, err := os.OpenFile(data.FileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Println(1, err)
return
}
defer file.Close()
_, err = file.WriteString(time.Now().String() + "==>" + data.Url + " ==> " + strconv.Itoa(data.Port) + "\n")
if err != nil {
fmt.Println(err)
return
}
check:
ports := PortCheck()
if ports >= 20000 {
time.Sleep(10 * time.Second)
goto check
}
for i := 1; i <= 100; i++ {
time.Sleep(2 * time.Second)
url1 := "http://localhost:8000/public/test-func1?num=" + strconv.Itoa(i)
url2 := "http://localhost:8000/public/test-func2?num=" + strconv.Itoa(i)
go MakeRequest1(url1)
go MakeRequest1(url2)
}
return
}
现在系统的所有端口没有被耗尽,但它在最后开始报错:
fork/exec /usr/bin/wc: resource temporarily unavailable
之后,我尝试使用curl命令调用TestFunc1和TestFunc2:
/*
* Function to hit url with get method and close the response body
*/
func CurlTest(url string) {
cmd := exec.Command("curl", "-H", "Connection: close", "-H", "--no-keepalive", "-X", "GET", url)
var out bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &out
cmd.Stderr = &stderr
err := cmd.Start()
if err == nil {
_ = cmd.Wait()
}
// fmt.Println(i, time.Now())
return
}
func PrintTest(c *gin.Context) {
data := test{}
_ = json.NewDecoder(c.Request.Body).Decode(&data)
file, err := os.OpenFile(data.FileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Println(1, err)
return
}
defer file.Close()
_, err = file.WriteString(time.Now().String() + "==>" + data.Url + " ==> " + strconv.Itoa(data.Port) + "\n")
if err != nil {
fmt.Println(err)
return
}
for i := 1; i <= 100; i++ {
time.Sleep(2 * time.Second)
url1 := "http://localhost:8000/public/test-func1?num=" + strconv.Itoa(i)
url2 := "http://localhost:8000/public/test-func2?num=" + strconv.Itoa(i)
go CurlTest(url1)
go CurlTest(url2)
}
return
}
如果我使用Go协程调用CurlTest函数,它会在系统中打开太多文件,然后开始报错:
http: Accept error: accept tcp [::]:8080: accept4: too many open files; retrying in 1s
即使我将打开文件数限制设置为大约100万,每当打开文件数达到超过100万时,它就开始出现这个错误。
请帮我找出这个问题的最佳解决方案。
我希望使用net/http而不是curl来调用调度器,因为curl消耗了太多资源。
请为这个问题建议最佳的架构。
我们能否以其他方式处理TCP端口的情况,而不是添加延迟?
更多关于Golang中如何实现net/http与并发调度器的同时运行的实战教程也可以访问 https://www.itying.com/category-94-b0.html
感谢您的回复。 如果我提高软限制,那么每次重启系统时我都必须重新提高它。 只有在使用curl运行调度程序时才会出现打开文件错误, 我希望使用net/http来运行调度程序。
更多关于Golang中如何实现net/http与并发调度器的同时运行的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
- 实现一个速率限制/工作池,不要尝试同时连接所有远程主机。分散负载。
- 检查你是否可能只提高了软限制,而硬限制仍然保持在一百万。
在Golang中实现高并发HTTP请求时,端口耗尽是常见问题。根本原因在于每个HTTP客户端连接都会占用一个本地端口,当并发量极大时,会快速耗尽可用端口。以下是针对你问题的解决方案:
1. 使用连接池和HTTP客户端复用
创建全局HTTP客户端并复用连接,而不是为每个请求创建新客户端:
var (
httpClient *http.Client
once sync.Once
)
func getHTTPClient() *http.Client {
once.Do(func() {
httpClient = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 1000,
MaxIdleConnsPerHost: 1000,
MaxConnsPerHost: 1000,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
},
Timeout: 30 * time.Second,
}
})
return httpClient
}
2. 使用工作池控制并发
创建固定大小的goroutine池来控制并发数量:
type Job struct {
MerchantID int
URL string
}
func worker(id int, jobs <-chan Job, results chan<- error) {
client := getHTTPClient()
for job := range jobs {
err := processMerchant(client, job)
results <- err
}
}
func processMerchant(client *http.Client, job Job) error {
req, err := http.NewRequest("GET", job.URL, nil)
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
_, err = io.Copy(io.Discard, resp.Body)
return err
}
func ScheduleMerchantTasks(merchants []Merchant, workerCount int) {
jobs := make(chan Job, len(merchants))
results := make(chan error, len(merchants))
// 启动worker
for w := 1; w <= workerCount; w++ {
go worker(w, jobs, results)
}
// 发送任务
for _, m := range merchants {
jobs <- Job{
MerchantID: m.ID,
URL: fmt.Sprintf("http://localhost:8000/public/print-test?num=%d", m.ID),
}
}
close(jobs)
// 收集结果
for i := 0; i < len(merchants); i++ {
err := <-results
if err != nil {
log.Printf("Error processing merchant: %v", err)
}
}
}
3. 使用semaphore控制并发goroutine数量
import "golang.org/x/sync/semaphore"
func ProcessMerchantsConcurrently(merchants []Merchant, maxConcurrent int) {
sem := semaphore.NewWeighted(int64(maxConcurrent))
var wg sync.WaitGroup
ctx := context.Background()
for _, m := range merchants {
wg.Add(1)
go func(merchant Merchant) {
defer wg.Done()
// 获取信号量,控制并发
if err := sem.Acquire(ctx, 1); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
return
}
defer sem.Release(1)
// 处理商户任务
processMerchantTask(merchant)
}(m)
}
wg.Wait()
}
func processMerchantTask(m Merchant) {
client := getHTTPClient()
// 调用调度器1
err := callScheduler(client, fmt.Sprintf("http://localhost:3333/scheduler1/%d", m.ID))
if err != nil {
log.Printf("Scheduler1 error for merchant %d: %v", m.ID, err)
}
// 调用调度器2
err = callScheduler(client, fmt.Sprintf("http://localhost:3333/scheduler2/%d", m.ID))
if err != nil {
log.Printf("Scheduler2 error for merchant %d: %v", m.ID, err)
}
}
4. 定时任务调度器实现
type MerchantScheduler struct {
client *http.Client
merchantMap map[int]*time.Ticker
mu sync.RWMutex
}
func NewMerchantScheduler() *MerchantScheduler {
return &MerchantScheduler{
client: getHTTPClient(),
merchantMap: make(map[int]*time.Ticker),
}
}
func (ms *MerchantScheduler) ScheduleMerchant(merchantID int, interval time.Duration) {
ms.mu.Lock()
defer ms.mu.Unlock()
// 如果已经存在定时器,先停止
if ticker, exists := ms.merchantMap[merchantID]; exists {
ticker.Stop()
}
ticker := time.NewTicker(interval)
ms.merchantMap[merchantID] = ticker
go func(id int, t *time.Ticker) {
for range t.C {
ms.executeMerchantTask(id)
}
}(merchantID, ticker)
}
func (ms *MerchantScheduler) executeMerchantTask(merchantID int) {
// 使用工作池执行任务,避免创建过多goroutine
go func() {
urls := []string{
fmt.Sprintf("http://localhost:3333/scheduler1/%d", merchantID),
fmt.Sprintf("http://localhost:3333/scheduler2/%d", merchantID),
}
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
req, _ := http.NewRequest("GET", u, nil)
resp, err := ms.client.Do(req)
if err != nil {
log.Printf("Error calling %s: %v", u, err)
return
}
defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
}(url)
}
wg.Wait()
}()
}
5. 完整的优化版本
package main
import (
"context"
"fmt"
"io"
"log"
"net/http"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
const (
maxConcurrentRequests = 500 // 根据系统调整
requestTimeout = 30 * time.Second
)
type Scheduler struct {
client *http.Client
sem *semaphore.Weighted
}
func NewScheduler() *Scheduler {
transport := &http.Transport{
MaxIdleConns: maxConcurrentRequests,
MaxIdleConnsPerHost: maxConcurrentRequests,
MaxConnsPerHost: maxConcurrentRequests,
IdleConnTimeout: 90 * time.Second,
}
return &Scheduler{
client: &http.Client{
Transport: transport,
Timeout: requestTimeout,
},
sem: semaphore.NewWeighted(maxConcurrentRequests),
}
}
func (s *Scheduler) ProcessMerchant(merchantID int) error {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
// 获取信号量控制并发
if err := s.sem.Acquire(ctx, 1); err != nil {
return fmt.Errorf("failed to acquire semaphore: %w", err)
}
defer s.sem.Release(1)
// 并发调用两个调度器
var wg sync.WaitGroup
errors := make(chan error, 2)
urls := []string{
fmt.Sprintf("http://localhost:3333/scheduler1/%d", merchantID),
fmt.Sprintf("http://localhost:3333/scheduler2/%d", merchantID),
}
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
if err := s.makeRequest(u); err != nil {
errors <- err
}
}(url)
}
wg.Wait()
close(errors)
// 返回第一个错误
for err := range errors {
return err
}
return nil
}
func (s *Scheduler) makeRequest(url string) error {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return err
}
resp, err := s.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// 读取并丢弃响应体
_, err = io.Copy(io.Discard, resp.Body)
return err
}
// 定时执行任务
func (s *Scheduler) StartPeriodicTasks(merchants []int, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
s.processBatch(merchants)
}
}
func (s *Scheduler) processBatch(merchants []int) {
var wg sync.WaitGroup
for _, merchantID := range merchants {
wg.Add(1)
go func(id int) {
defer wg.Done()
if err := s.ProcessMerchant(id); err != nil {
log.Printf("Error processing merchant %d: %v", id, err)
}
}(merchantID)
}
wg.Wait()
}
关键优化点:
- 复用HTTP客户端和连接池
- 使用信号量控制最大并发数
- 避免为每个请求创建新客户端
- 合理设置连接超时和空闲超时
- 使用context控制请求超时
这个架构可以处理成千上万的商户定时任务,而不会耗尽系统资源。通过控制并发数量和复用连接,可以避免端口耗尽和文件描述符不足的问题。

