Golang并发文件上传性能问题:上传时间线性增长,求解决方案

Golang并发文件上传性能问题:上传时间线性增长,求解决方案

package main

import (
	"bytes"
	"crypto/tls"
	"fmt"
	"io"
	"io/ioutil"
	"mime/multipart"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"
)

func main() {
	test()

	time.Sleep(10 * time.Minute)
}

func test() {
	for i := 0; i < 50; i++ {
		go UploadFile("myFile", "C:\\XXXXX\\installation\\jmeter\\TinyFileUploadDocs\\file-example_PDF_500_kB.pdf", 0, 0)
	}
}
func UploadFile(key, filePath string, offset int64, limit int64) {

	url := "https://www.xyz.com/test"
	file, _ := os.Open(filePath)
	fi, _ := file.Stat()
	defer file.Close()

	//
	body := &bytes.Buffer{}
	writer := multipart.NewWriter(body)
	part, err := writer.CreateFormFile(key, filepath.Base(filePath))
	if err != nil {
		fmt.Println(err)
	}
	_, err = io.Copy(part, file)

	err = writer.Close()
	if err != nil {
		fmt.Println(err)
	}
	if limit <= 0 {
		limit = fi.Size()
	}

	contentType := writer.FormDataContentType()
	req, err := http.NewRequest("POST", url, body)
	req.Header.Add("Content-Type", contentType)
	req.Header.Add("content-length", strconv.Itoa(int(limit)))
	transp, _ := createTransport()
	jobCreateTime := time.Now()
	resp, err := transp.RoundTrip(req)
	timeDifference := time.Now().Sub(jobCreateTime)
	fmt.Println("time difference is ", timeDifference)
	if resp != nil {
		defer resp.Body.Close()
		responseBody, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			fmt.Println("Response is %s %s", responseBody, err)
		}
	}

}

var m sync.Mutex
var defaultTransport *http.Transport

func createTransport() (*http.Transport, error) {
	m.Lock()
	defer m.Unlock()
	if defaultTransport != nil {
		fmt.Println("returning cached instance of httpclient", time.Now())
		return defaultTransport, nil
	}
	defaultRoundTripper := http.DefaultTransport
	defaultTransport = defaultRoundTripper.(*http.Transport).Clone()
	//if !ok {
	//	panic(fmt.Sprintf("defaultRoundTripper not an *http.Transport"))
	//}
	defaultTransport.TLSClientConfig = &tls.Config{
		InsecureSkipVerify: true,
	}
	defaultTransport.MaxIdleConns = 400
	defaultTransport.MaxIdleConnsPerHost = 400
	defaultTransport.IdleConnTimeout = 60 * time.Minute
	defaultTransport.ForceAttemptHTTP2 = false

	fmt.Println("returning newly created httpclient")
	return defaultTransport, nil
}

响应时间呈线性增长 返回新创建的 httpclient 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9637231 +0530 IST m=+0.014956001 返回缓存的 httpclient 实例 2023-01-24 22:09:53.971713 +0530 IST m=+0.022946601 返回缓存的 httpclient 实例 2023-01-24 22:09:53.971713 +0530 IST m=+0.022946601 返回缓存的 httpclient 实例 2023-01-24 22:09:53.971713 +0530 IST m=+0.022946601 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9722399 +0530 IST m=+0.023473501 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9722399 +0530 IST m=+0.023473501 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9727521 +0530 IST m=+0.023985801 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9727617 +0530 IST m=+0.023995401 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9727617 +0530 IST m=+0.023995401 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9727617 +0530 IST m=+0.023995401 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9727617 +0530 IST m=+0.023995401 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9727617 +0530 IST m=+0.023995401 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9727617 +0530 IST m=+0.023995401 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9727617 +0530 IST m=+0.023995401 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9727617 +0530 IST m=+0.023995401 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9727617 +0530 IST m=+0.023995401 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9732854 +0530 IST m=+0.024519101 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9732854 +0530 IST m=+0.024519101 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9732854 +0530 IST m=+0.024519101 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9732854 +0530 IST m=+0.024519101 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9732854 +0530 IST m=+0.024519101 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9732854 +0530 IST m=+0.024519101 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9738049 +0530 IST m=+0.025038701 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9738049 +0530 IST m=+0.025038701 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9738049 +0530 IST m=+0.025038701 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9738049 +0530 IST m=+0.025038701 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9738049 +0530 IST m=+0.025038701 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9743254 +0530 IST m=+0.025559201 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9743254 +0530 IST m=+0.025559201 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9743254 +0530 IST m=+0.025559201 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9743254 +0530 IST m=+0.025559201 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9743254 +0530 IST m=+0.025559201 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9743254 +0530 IST m=+0.025559201 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9748602 +0530 IST m=+0.026094001 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9748602 +0530 IST m=+0.026094001 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9748602 +0530 IST m=+0.026094001 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9748602 +0530 IST m=+0.026094001 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9753908 +0530 IST m=+0.026624701 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9753908 +0530 IST m=+0.026624701 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9753908 +0530 IST m=+0.026624701 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9753908 +0530 IST m=+0.026624701 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9753908 +0530 IST m=+0.026624701 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9759277 +0530 IST m=+0.027161601 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9759277 +0530 IST m=+0.027161601 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9759277 +0530 IST m=+0.027161601 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9759277 +0530 IST m=+0.027161601 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9759277 +0530 IST m=+0.027161601 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9759277 +0530 IST m=+0.027161601 返回缓存的 httpclient 实例 2023-01-24 22:09:53.9759277 +0530 IST m=+0.027161601 时间差为 2.2112483秒 时间差为 2.2788669秒 时间差为 2.682173秒 时间差为 2.746857秒 时间差为 2.848968秒 时间差为 2.8817055秒 时间差为 2.8923872秒 时间差为 3.0118433秒 时间差为 3.0381271秒 时间差为 3.069961秒 时间差为 3.2203927秒 时间差为 3.3128912秒 时间差为 3.3236621秒 时间差为 3.3923878秒 时间差为 3.4503079秒 时间差为 3.4574394秒 时间差为 3.4735691秒 时间差为 3.4856638秒 时间差为 3.4835582秒 时间差为 3.4962755秒 时间差为 3.5307242秒 时间差为 3.5364922秒 时间差为 3.5358658秒 时间差为 3.5781682秒 时间差为 3.5832667秒 时间差为 3.5837974秒 时间差为 3.6138955秒 时间差为 3.6152253秒 时间差为 3.6367311秒 时间差为 3.6708537秒 时间差为 3.6875839秒 时间差为 3.6918802秒 时间差为 3.7155376秒 时间差为 3.7180674秒 时间差为 3.7208604秒 时间差为 3.7445374秒 时间差为 3.7464671秒 时间差为 3.7464801秒 时间差为 3.7584079秒 时间差为 3.8687172秒 时间差为 3.8977421秒 时间差为 3.9151934秒 时间差为 3.9218858秒 时间差为 3.9335794秒 时间差为 3.9870066秒 时间差为 4.0831174秒 时间差为 4.2388037秒 时间差为 4.2442227秒 时间差为 5.8370773秒 时间差为 6.4261305秒


更多关于Golang并发文件上传性能问题:上传时间线性增长,求解决方案的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

你是否尝试过使用任何性能监控工具来检查是否达到了磁盘读取、网络或CPU的极限?

更多关于Golang并发文件上传性能问题:上传时间线性增长,求解决方案的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你在那里测量了几样不同的东西。 一些建议:

  • 使用 sync.Once 来创建“缓存的 httpclient”,而不是每次都获取锁。
  • 使用单一的 http.Client 即可……不需要创建一个传输器。
  • 限制逻辑存在一些问题,因为在通过 CreateFormFile 之后,大小会出错。在 Go 中你不必设置内容长度,它会自动处理正确。
  • 使用 io.Pipe 的话,我认为你不需要提前读取整个文件。
  • time.Since > time.Now().Sub()

希望其中一些建议对你有帮助。

问题在于并发上传时所有goroutine都使用同一个http.Transport实例,虽然连接池配置了MaxIdleConnsPerHost: 400,但服务器可能限制了并发连接数或请求处理能力。同时,所有goroutine几乎同时发起请求,导致服务器端排队处理。

以下是优化后的代码示例:

package main

import (
	"bytes"
	"crypto/tls"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"
)

func main() {
	test()
	time.Sleep(10 * time.Minute)
}

func test() {
	// 使用工作池控制并发数
	const workerCount = 10
	sem := make(chan struct{}, workerCount)
	var wg sync.WaitGroup
	
	for i := 0; i < 50; i++ {
		wg.Add(1)
		sem <- struct{}{}
		go func(idx int) {
			defer wg.Done()
			defer func() { <-sem }()
			UploadFile("myFile", "C:\\XXXXX\\installation\\jmeter\\TinyFileUploadDocs\\file-example_PDF_500_kB.pdf", 0, 0, idx)
		}(i)
	}
	wg.Wait()
}

func UploadFile(key, filePath string, offset int64, limit int64, idx int) {
	url := "https://www.xyz.com/test"
	
	// 为每个请求创建独立的transport
	transport := createTransport()
	client := &http.Client{
		Transport: transport,
		Timeout:   30 * time.Second,
	}
	
	file, err := os.Open(filePath)
	if err != nil {
		fmt.Printf("[%d] Error opening file: %v\n", idx, err)
		return
	}
	defer file.Close()
	
	fi, err := file.Stat()
	if err != nil {
		fmt.Printf("[%d] Error stating file: %v\n", idx, err)
		return
	}
	
	// 重置文件指针
	file.Seek(0, 0)
	
	body := &bytes.Buffer{}
	writer := multipart.NewWriter(body)
	part, err := writer.CreateFormFile(key, filepath.Base(filePath))
	if err != nil {
		fmt.Printf("[%d] Error creating form file: %v\n", idx, err)
		return
	}
	
	_, err = io.Copy(part, file)
	if err != nil {
		fmt.Printf("[%d] Error copying file: %v\n", idx, err)
		return
	}
	
	err = writer.Close()
	if err != nil {
		fmt.Printf("[%d] Error closing writer: %v\n", idx, err)
		return
	}
	
	if limit <= 0 {
		limit = fi.Size()
	}
	
	req, err := http.NewRequest("POST", url, body)
	if err != nil {
		fmt.Printf("[%d] Error creating request: %v\n", idx, err)
		return
	}
	
	req.Header.Add("Content-Type", writer.FormDataContentType())
	req.Header.Add("Content-Length", strconv.Itoa(int(limit)))
	
	jobCreateTime := time.Now()
	resp, err := client.Do(req)
	timeDifference := time.Since(jobCreateTime)
	
	fmt.Printf("[%d] Time difference: %v\n", idx, timeDifference)
	
	if err != nil {
		fmt.Printf("[%d] Request error: %v\n", idx, err)
		return
	}
	defer resp.Body.Close()
	
	_, err = io.Copy(io.Discard, resp.Body)
	if err != nil {
		fmt.Printf("[%d] Error reading response: %v\n", idx, err)
	}
}

func createTransport() *http.Transport {
	return &http.Transport{
		TLSClientConfig: &tls.Config{
			InsecureSkipVerify: true,
		},
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 10,
		MaxConnsPerHost:     10,
		IdleConnTimeout:     90 * time.Second,
		ForceAttemptHTTP2:   false,
		DisableCompression:  true,
	}
}

或者使用带速率限制的版本:

package main

import (
	"bytes"
	"crypto/tls"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	test()
	time.Sleep(10 * time.Minute)
}

func test() {
	// 创建速率限制器:每秒5个请求,突发10个
	limiter := rate.NewLimiter(5, 10)
	
	for i := 0; i < 50; i++ {
		go func(idx int) {
			// 等待令牌
			err := limiter.WaitN(context.Background(), 1)
			if err != nil {
				fmt.Printf("[%d] Rate limit error: %v\n", idx, err)
				return
			}
			UploadFile("myFile", "C:\\XXXXX\\installation\\jmeter\\TinyFileUploadDocs\\file-example_PDF_500_kB.pdf", 0, 0, idx)
		}(i)
	}
}

func UploadFile(key, filePath string, offset int64, limit int64, idx int) {
	// 复用HTTP客户端
	client := getHTTPClient()
	
	file, err := os.Open(filePath)
	if err != nil {
		fmt.Printf("[%d] Error opening file: %v\n", idx, err)
		return
	}
	defer file.Close()
	
	fi, err := file.Stat()
	if err != nil {
		fmt.Printf("[%d] Error stating file: %v\n", idx, err)
		return
	}
	
	file.Seek(0, 0)
	
	body := &bytes.Buffer{}
	writer := multipart.NewWriter(body)
	part, err := writer.CreateFormFile(key, filepath.Base(filePath))
	if err != nil {
		fmt.Printf("[%d] Error creating form file: %v\n", idx, err)
		return
	}
	
	_, err = io.Copy(part, file)
	if err != nil {
		fmt.Printf("[%d] Error copying file: %v\n", idx, err)
		return
	}
	
	err = writer.Close()
	if err != nil {
		fmt.Printf("[%d] Error closing writer: %v\n", idx, err)
		return
	}
	
	if limit <= 0 {
		limit = fi.Size()
	}
	
	req, err := http.NewRequest("POST", "https://www.xyz.com/test", body)
	if err != nil {
		fmt.Printf("[%d] Error creating request: %v\n", idx, err)
		return
	}
	
	req.Header.Add("Content-Type", writer.FormDataContentType())
	req.Header.Add("Content-Length", strconv.Itoa(int(limit)))
	
	jobCreateTime := time.Now()
	resp, err := client.Do(req)
	timeDifference := time.Since(jobCreateTime)
	
	fmt.Printf("[%d] Time difference: %v\n", idx, timeDifference)
	
	if err != nil {
		fmt.Printf("[%d] Request error: %v\n", idx, err)
		return
	}
	defer resp.Body.Close()
	
	_, err = io.Copy(io.Discard, resp.Body)
	if err != nil {
		fmt.Printf("[%d] Error reading response: %v\n", idx, err)
	}
}

var (
	httpClient *http.Client
	once       sync.Once
)

func getHTTPClient() *http.Client {
	once.Do(func() {
		transport := &http.Transport{
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify: true,
			},
			MaxIdleConns:        100,
			MaxIdleConnsPerHost: 20,
			MaxConnsPerHost:     20,
			IdleConnTimeout:     90 * time.Second,
			ForceAttemptHTTP2:   false,
		}
		
		httpClient = &http.Client{
			Transport: transport,
			Timeout:   30 * time.Second,
		}
	})
	return httpClient
}

关键改进:

  1. 使用工作池或速率限制器控制并发数
  2. 为每个请求设置合理的超时时间
  3. 调整连接池参数,避免过多并发连接
  4. 使用单例HTTP客户端复用连接
  5. 添加错误处理和资源清理
回到顶部