Golang中如何高效使用io.Pipe实现HTTP多部分文件上传
Golang中如何高效使用io.Pipe实现HTTP多部分文件上传 背景:
通过预签名URL将文件上传到S3存储。文件大小约为500MB。 需要某种流式处理并减少内存使用。
type TraceFile struct {
systemPath string
fileName string
}
func uploadToS3(URL string, traceFile TraceFile) error {
fileSize, err := util.GetFileSize(traceFile.systemPath)
if err != nil {
return err
}
// S3不接受没有内容长度的分块多部分MIME
contentLength := emptyMultipartSize("file", traceFile.fileName) + fileSize
readBody, writeBody := io.Pipe()
defer readBody.Close()
form := multipart.NewWriter(writeBody)
errChan := make(chan error, 1)
go flushToWriter(errChan, writeBody, traceFile)
req, err := http.NewRequest(http.MethodPut, URL, readBody)
req.Header.Set("Content-Type", form.FormDataContentType())
if contentLength > 0 {
req.ContentLength = contentLength
}
res, err := http.DefaultClient.Do(req)
log.Info("Response from s3 ", res)
if err != nil {
log.Error("Uploading failed due to", err)
//如何通知flushToWriter协程停止?
<-errChan
return err
}
//如果flushToWriter出现错误,如何停止发送请求?
return <-errChan
}
func emptyMultipartSize(fieldname, filename string) int64 {
body := &bytes.Buffer{}
form := multipart.NewWriter(body)
form.CreateFormFile(fieldname, filename)
form.Close()
return int64(body.Len())
}
func flushToWriter(errChan chan error, writeBody *io.PipeWriter, traceFile TraceFile) {
form := multipart.NewWriter(writeBody)
defer writeBody.Close()
filePart, err := form.CreateFormFile(cnst.MultiPartFieldName, traceFile.fileName)
if err != nil {
errChan <- err
return
}
err = copyToWriter(traceFile.systemPath, filePart)
if err != nil {
errChan <- err
return
}
errChan <- form.Close()
}
func copyToWriter(path string, filePart io.Writer) error {
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
fi, err := file.Stat()
if err != nil {
return err
}
_, err = io.CopyN(filePart, file, fi.Size())
return err
}
我的问题:
- 如何将uploadToS3中的错误通知给flushToWriter?(代码注释中有此问题)
- 当flushToWriter出现错误时,如何停止对端点的请求?(代码注释中有此问题)
- 我是否正确使用了管道?
如果还有改进的空间,请提出建议。 注意:代码在成功情况下没有问题。我只是针对失败情况和更好的错误处理采取预防措施。
观察结果:
- 错误的端点(uploadS3函数中的错误),代码立即退出并给出正确的HTTP错误。
- 在flushToWriter函数出现错误的情况下,代码立即退出,仅给出类似以下的HTTP错误:
http: ContentLength=189849860 with Body length 192
但实际错误发生在flushToWriter,未能传播到uploadS3函数。但它进入了下面的错误块:
if err != nil {
log.Error("Uploading failed due to", err)
<-errChan
return err
}
更多关于Golang中如何高效使用io.Pipe实现HTTP多部分文件上传的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于Golang中如何高效使用io.Pipe实现HTTP多部分文件上传的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
以下是针对您问题的专业解答,包含代码示例:
问题1:如何将uploadToS3中的错误通知给flushToWriter
使用context.Context来协调goroutine的取消:
func uploadToS3(URL string, traceFile TraceFile) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fileSize, err := util.GetFileSize(traceFile.systemPath)
if err != nil {
return err
}
contentLength := emptyMultipartSize("file", traceFile.fileName) + fileSize
readBody, writeBody := io.Pipe()
defer readBody.Close()
form := multipart.NewWriter(writeBody)
errChan := make(chan error, 1)
// 传递context
go flushToWriter(ctx, errChan, writeBody, traceFile)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, URL, readBody)
if err != nil {
cancel() // 取消context
return err
}
req.Header.Set("Content-Type", form.FormDataContentType())
if contentLength > 0 {
req.ContentLength = contentLength
}
res, err := http.DefaultClient.Do(req)
if err != nil {
log.Error("Uploading failed due to", err)
cancel() // 通知flushToWriter停止
<-errChan
return err
}
defer res.Body.Close()
log.Info("Response from s3 ", res)
return <-errChan
}
func flushToWriter(ctx context.Context, errChan chan error, writeBody *io.PipeWriter, traceFile TraceFile) {
form := multipart.NewWriter(writeBody)
defer writeBody.Close()
filePart, err := form.CreateFormFile("file", traceFile.fileName)
if err != nil {
select {
case errChan <- err:
case <-ctx.Done():
}
return
}
err = copyToWriter(ctx, traceFile.systemPath, filePart)
if err != nil {
select {
case errChan <- err:
case <-ctx.Done():
}
return
}
select {
case errChan <- form.Close():
case <-ctx.Done():
errChan <- ctx.Err()
}
}
func copyToWriter(ctx context.Context, path string, filePart io.Writer) error {
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
fi, err := file.Stat()
if err != nil {
return err
}
// 使用带context的Copy
_, err = io.Copy(filePart, file)
return err
}
问题2:当flushToWriter出现错误时如何停止请求
在flushToWriter中关闭管道writer会中断HTTP请求:
func flushToWriter(ctx context.Context, errChan chan error, writeBody *io.PipeWriter, traceFile TraceFile) {
form := multipart.NewWriter(writeBody)
defer func() {
// 确保管道关闭以中断HTTP请求
writeBody.CloseWithError(err)
}()
filePart, err := form.CreateFormFile("file", traceFile.fileName)
if err != nil {
errChan <- err
return
}
err = copyToWriter(ctx, traceFile.systemPath, filePart)
if err != nil {
errChan <- err
return
}
err = form.Close()
if err != nil {
errChan <- err
return
}
errChan <- nil
}
问题3:管道使用是否正确
您的管道使用基本正确,但可以优化错误处理:
func uploadToS3(URL string, traceFile TraceFile) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fileSize, err := util.GetFileSize(traceFile.systemPath)
if err != nil {
return err
}
contentLength := emptyMultipartSize("file", traceFile.fileName) + fileSize
readBody, writeBody := io.Pipe()
errChan := make(chan error, 1)
go func() {
defer close(errChan)
defer readBody.Close()
form := multipart.NewWriter(writeBody)
defer writeBody.Close()
filePart, err := form.CreateFormFile("file", traceFile.fileName)
if err != nil {
errChan <- err
return
}
file, err := os.Open(traceFile.systemPath)
if err != nil {
errChan <- err
return
}
defer file.Close()
// 流式复制,避免内存占用
_, err = io.Copy(filePart, file)
if err != nil {
errChan <- err
return
}
err = form.Close()
errChan <- err
}()
req, err := http.NewRequestWithContext(ctx, http.MethodPut, URL, readBody)
if err != nil {
cancel()
return err
}
req.Header.Set("Content-Type", "multipart/form-data")
if contentLength > 0 {
req.ContentLength = contentLength
}
res, err := http.DefaultClient.Do(req)
if err != nil {
cancel()
// 等待goroutine完成或超时
select {
case <-errChan:
case <-time.After(5 * time.Second):
}
return err
}
defer res.Body.Close()
log.Info("Response from s3 ", res)
// 检查goroutine中的错误
if err := <-errChan; err != nil {
return fmt.Errorf("upload failed: %w", err)
}
return nil
}
这些修改确保了:
- 使用context进行协调取消
- 管道错误正确传播
- 资源正确清理
- 内存高效的流式处理

