Golang中如何高效使用io.Pipe实现HTTP多部分文件上传

Golang中如何高效使用io.Pipe实现HTTP多部分文件上传 背景:

通过预签名URL将文件上传到S3存储。文件大小约为500MB。 需要某种流式处理并减少内存使用。

type TraceFile struct {
	systemPath string
	fileName   string
}

func uploadToS3(URL string, traceFile TraceFile) error {

	fileSize, err := util.GetFileSize(traceFile.systemPath)
	if err != nil {
		return err
	}
	// S3不接受没有内容长度的分块多部分MIME
	contentLength := emptyMultipartSize("file", traceFile.fileName) + fileSize

	readBody, writeBody := io.Pipe()
	defer readBody.Close()

	form := multipart.NewWriter(writeBody)
	errChan := make(chan error, 1)
	go flushToWriter(errChan, writeBody, traceFile)
	req, err := http.NewRequest(http.MethodPut, URL, readBody)
	req.Header.Set("Content-Type", form.FormDataContentType())
	if contentLength > 0 {
		req.ContentLength = contentLength
	}
	res, err := http.DefaultClient.Do(req)
	log.Info("Response from s3 ", res)
	if err != nil {
		log.Error("Uploading failed due to", err)
                 //如何通知flushToWriter协程停止?
		<-errChan
		return err
	}
       //如果flushToWriter出现错误,如何停止发送请求?
	return <-errChan
}

func emptyMultipartSize(fieldname, filename string) int64 {
	body := &bytes.Buffer{}
	form := multipart.NewWriter(body)
	form.CreateFormFile(fieldname, filename)
	form.Close()
	return int64(body.Len())
}

func flushToWriter(errChan chan error, writeBody *io.PipeWriter, traceFile TraceFile) {

	form := multipart.NewWriter(writeBody)
	defer writeBody.Close()

	filePart, err := form.CreateFormFile(cnst.MultiPartFieldName, traceFile.fileName)
	if err != nil {
		errChan <- err
		return
	}

	err = copyToWriter(traceFile.systemPath, filePart)
	if err != nil {
		errChan <- err
		return
	}

	errChan <- form.Close()
}

func copyToWriter(path string, filePart io.Writer) error {
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()

	fi, err := file.Stat()
	if err != nil {
		return err
	}
	_, err = io.CopyN(filePart, file, fi.Size())
	return err
}

我的问题:

  1. 如何将uploadToS3中的错误通知给flushToWriter?(代码注释中有此问题)
  2. 当flushToWriter出现错误时,如何停止对端点的请求?(代码注释中有此问题)
  3. 我是否正确使用了管道?

如果还有改进的空间,请提出建议。 注意:代码在成功情况下没有问题。我只是针对失败情况和更好的错误处理采取预防措施。

观察结果:

  • 错误的端点(uploadS3函数中的错误),代码立即退出并给出正确的HTTP错误。
  • 在flushToWriter函数出现错误的情况下,代码立即退出,仅给出类似以下的HTTP错误:

http: ContentLength=189849860 with Body length 192

但实际错误发生在flushToWriter,未能传播到uploadS3函数。但它进入了下面的错误块:

if err != nil {
      log.Error("Uploading failed due to", err)
       <-errChan
       return err
   }

更多关于Golang中如何高效使用io.Pipe实现HTTP多部分文件上传的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中如何高效使用io.Pipe实现HTTP多部分文件上传的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


以下是针对您问题的专业解答,包含代码示例:

问题1:如何将uploadToS3中的错误通知给flushToWriter

使用context.Context来协调goroutine的取消:

func uploadToS3(URL string, traceFile TraceFile) error {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    fileSize, err := util.GetFileSize(traceFile.systemPath)
    if err != nil {
        return err
    }
    
    contentLength := emptyMultipartSize("file", traceFile.fileName) + fileSize
    readBody, writeBody := io.Pipe()
    defer readBody.Close()

    form := multipart.NewWriter(writeBody)
    errChan := make(chan error, 1)
    
    // 传递context
    go flushToWriter(ctx, errChan, writeBody, traceFile)
    
    req, err := http.NewRequestWithContext(ctx, http.MethodPut, URL, readBody)
    if err != nil {
        cancel() // 取消context
        return err
    }
    
    req.Header.Set("Content-Type", form.FormDataContentType())
    if contentLength > 0 {
        req.ContentLength = contentLength
    }
    
    res, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Error("Uploading failed due to", err)
        cancel() // 通知flushToWriter停止
        <-errChan
        return err
    }
    defer res.Body.Close()
    
    log.Info("Response from s3 ", res)
    return <-errChan
}

func flushToWriter(ctx context.Context, errChan chan error, writeBody *io.PipeWriter, traceFile TraceFile) {
    form := multipart.NewWriter(writeBody)
    defer writeBody.Close()

    filePart, err := form.CreateFormFile("file", traceFile.fileName)
    if err != nil {
        select {
        case errChan <- err:
        case <-ctx.Done():
        }
        return
    }

    err = copyToWriter(ctx, traceFile.systemPath, filePart)
    if err != nil {
        select {
        case errChan <- err:
        case <-ctx.Done():
        }
        return
    }

    select {
    case errChan <- form.Close():
    case <-ctx.Done():
        errChan <- ctx.Err()
    }
}

func copyToWriter(ctx context.Context, path string, filePart io.Writer) error {
    file, err := os.Open(path)
    if err != nil {
        return err
    }
    defer file.Close()

    fi, err := file.Stat()
    if err != nil {
        return err
    }
    
    // 使用带context的Copy
    _, err = io.Copy(filePart, file)
    return err
}

问题2:当flushToWriter出现错误时如何停止请求

在flushToWriter中关闭管道writer会中断HTTP请求:

func flushToWriter(ctx context.Context, errChan chan error, writeBody *io.PipeWriter, traceFile TraceFile) {
    form := multipart.NewWriter(writeBody)
    defer func() {
        // 确保管道关闭以中断HTTP请求
        writeBody.CloseWithError(err)
    }()

    filePart, err := form.CreateFormFile("file", traceFile.fileName)
    if err != nil {
        errChan <- err
        return
    }

    err = copyToWriter(ctx, traceFile.systemPath, filePart)
    if err != nil {
        errChan <- err
        return
    }

    err = form.Close()
    if err != nil {
        errChan <- err
        return
    }
    
    errChan <- nil
}

问题3:管道使用是否正确

您的管道使用基本正确,但可以优化错误处理:

func uploadToS3(URL string, traceFile TraceFile) error {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    fileSize, err := util.GetFileSize(traceFile.systemPath)
    if err != nil {
        return err
    }
    
    contentLength := emptyMultipartSize("file", traceFile.fileName) + fileSize
    readBody, writeBody := io.Pipe()

    errChan := make(chan error, 1)
    go func() {
        defer close(errChan)
        defer readBody.Close()
        
        form := multipart.NewWriter(writeBody)
        defer writeBody.Close()

        filePart, err := form.CreateFormFile("file", traceFile.fileName)
        if err != nil {
            errChan <- err
            return
        }

        file, err := os.Open(traceFile.systemPath)
        if err != nil {
            errChan <- err
            return
        }
        defer file.Close()

        // 流式复制,避免内存占用
        _, err = io.Copy(filePart, file)
        if err != nil {
            errChan <- err
            return
        }

        err = form.Close()
        errChan <- err
    }()

    req, err := http.NewRequestWithContext(ctx, http.MethodPut, URL, readBody)
    if err != nil {
        cancel()
        return err
    }
    
    req.Header.Set("Content-Type", "multipart/form-data")
    if contentLength > 0 {
        req.ContentLength = contentLength
    }
    
    res, err := http.DefaultClient.Do(req)
    if err != nil {
        cancel()
        // 等待goroutine完成或超时
        select {
        case <-errChan:
        case <-time.After(5 * time.Second):
        }
        return err
    }
    defer res.Body.Close()

    log.Info("Response from s3 ", res)
    
    // 检查goroutine中的错误
    if err := <-errChan; err != nil {
        return fmt.Errorf("upload failed: %w", err)
    }
    
    return nil
}

这些修改确保了:

  1. 使用context进行协调取消
  2. 管道错误正确传播
  3. 资源正确清理
  4. 内存高效的流式处理
回到顶部