Golang中如何在不将HTTP请求体读入内存的情况下重复使用它?

Golang中如何在不将HTTP请求体读入内存的情况下重复使用它? 我正在使用 curl -T <FILE> 上传文件,文件大小可能高达5GB甚至更大。服务器实现可能是基于 HTTP/1.1 的 Go、Rust 或 Java 的 HTTP 服务器。

我需要在服务器端计算文件的校验和,包括 MD5SHA1SHA256。这需要进行三次计算,因此最好并行计算,并且我绝不能将文件读入内存,因为这会导致服务直接 OOM(同时也不能写入临时文件)。然而,我们遇到的最大问题是 HTTP 请求体 不能 被多次读取:使用它计算一次校验和后,再次尝试读取会变为空。我们尝试过使用 Go 的 io.Pipe() 来实现,但效率似乎不高。

计算完校验和后,我们需要根据业务逻辑决定是否将文件上传到某个 S3,这又需要文件流。

这个需求有可能实现吗?能否提供一些具体的代码?

func main() {
    fmt.Println("hello world")
}

更多关于Golang中如何在不将HTTP请求体读入内存的情况下重复使用它?的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

我在Rust语言论坛上找到了你的另一个问题,你在那里提供了更多关于你提问动机的背景信息:

我需要在校验和计算完成后,判断是否将文件上传到S3,以避免冗余存储(当然,另一个思路是边计算边上传,如果上传后发现重复,可以异步删除)。

虽然这帮助我理解了为什么你不想在接收数据的同时上传到S3,但我仍然不明白你为什么不能使用本地临时文件。

更多关于Golang中如何在不将HTTP请求体读入内存的情况下重复使用它?的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


cxy77:

这需要进行三次计算……并且不写入临时文件……计算完校验和后,我们需要根据业务逻辑判断是否将文件上传到某个S3,这又需要文件流。

你不能像无法重读 os.Stdin 一样重读请求体:你是直接从流输入中读取的(对于 stdin 来说:是用户在键盘上输入的内容或从另一个应用程序管道传入的内容;对于请求来说:是服务器在其响应中发送的内容),一旦它被消费掉,它就消失了。如果你想重读它,或者重新请求信息(例如,读取一次来计算哈希,然后再次请求并直接发送到 S3),你必须在某个地方存储它。

cxy77:

我们尝试使用 Go 的 io.Pipe() 来实现这一点,但它似乎效率不高。

你能澄清一下你所说的“效率不高”是什么意思吗?

在Go中实现不将大文件读入内存的情况下重复使用HTTP请求体是可行的,关键是要使用io.TeeReaderio.MultiWriter来分流数据。以下是具体实现方案:

package main

import (
	"crypto/md5"
	"crypto/sha1"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"sync"
)

func main() {
	http.HandleFunc("/upload", uploadHandler)
	http.ListenAndServe(":8080", nil)
}

func uploadHandler(w http.ResponseWriter, r *http.Request) {
	// 创建三个哈希计算器
	md5Hash := md5.New()
	sha1Hash := sha1.New()
	sha256Hash := sha256.New()

	// 使用MultiWriter将数据同时写入三个哈希计算器
	multiWriter := io.MultiWriter(md5Hash, sha1Hash, sha256Hash)

	// 创建管道用于后续处理
	pr, pw := io.Pipe()
	
	// 使用TeeReader将请求体同时复制到哈希计算器和管道
	teeReader := io.TeeReader(r.Body, multiWriter)

	var wg sync.WaitGroup
	wg.Add(1)

	// 启动goroutine将数据写入管道
	go func() {
		defer pw.Close()
		defer wg.Done()
		_, err := io.Copy(pw, teeReader)
		if err != nil {
			pw.CloseWithError(err)
		}
	}()

	// 并行计算哈希值
	var (
		md5Result    string
		sha1Result   string
		sha256Result string
		hashErr      error
	)

	wg.Add(1)
	go func() {
		defer wg.Done()
		// 等待哈希计算完成
		io.Copy(io.Discard, teeReader)
		
		// 获取哈希结果
		md5Result = hex.EncodeToString(md5Hash.Sum(nil))
		sha1Result = hex.EncodeToString(sha1Hash.Sum(nil))
		sha256Result = hex.EncodeToString(sha256Hash.Sum(nil))
	}()

	// 这里可以添加S3上传逻辑,使用pr作为数据源
	// 例如:s3uploader.Upload(pr, "filename")
	
	// 等待所有操作完成
	wg.Wait()

	if hashErr != nil {
		http.Error(w, "Hash calculation failed", http.StatusInternalServerError)
		return
	}

	// 返回计算结果
	fmt.Fprintf(w, "MD5: %s\nSHA1: %s\nSHA256: %s\n", 
		md5Result, sha1Result, sha256Result)
}

更优化的版本,使用自定义的multiReader来更好地控制数据流:

package main

import (
	"crypto/md5"
	"crypto/sha1"
	"crypto/sha256"
	"encoding/hex"
	"io"
	"net/http"
	"sync"
)

type hashingReader struct {
	reader   io.Reader
	md5Hash  hash.Hash
	sha1Hash hash.Hash
	sha256Hash hash.Hash
	mu       sync.Mutex
}

func newHashingReader(r io.Reader) *hashingReader {
	return &hashingReader{
		reader:    r,
		md5Hash:   md5.New(),
		sha1Hash:  sha1.New(),
		sha256Hash: sha256.New(),
	}
}

func (hr *hashingReader) Read(p []byte) (n int, err error) {
	n, err = hr.reader.Read(p)
	if n > 0 {
		hr.mu.Lock()
		hr.md5Hash.Write(p[:n])
		hr.sha1Hash.Write(p[:n])
		hr.sha256Hash.Write(p[:n])
		hr.mu.Unlock()
	}
	return n, err
}

func (hr *hashingReader) GetHashes() (md5Str, sha1Str, sha256Str string) {
	hr.mu.Lock()
	defer hr.mu.Unlock()
	md5Str = hex.EncodeToString(hr.md5Hash.Sum(nil))
	sha1Str = hex.EncodeToString(hr.sha1Hash.Sum(nil))
	sha256Str = hex.EncodeToString(hr.sha256Hash.Sum(nil))
	return
}

func uploadHandler(w http.ResponseWriter, r *http.Request) {
	// 创建哈希读取器
	hashingReader := newHashingReader(r.Body)
	
	// 创建管道用于S3上传
	pr, pw := io.Pipe()
	
	var wg sync.WaitGroup
	wg.Add(2)
	
	// Goroutine 1: 计算哈希并复制数据到管道
	go func() {
		defer pw.Close()
		defer wg.Done()
		_, err := io.Copy(pw, hashingReader)
		if err != nil {
			pw.CloseWithError(err)
		}
	}()
	
	// Goroutine 2: 获取哈希结果
	var md5Result, sha1Result, sha256Result string
	go func() {
		defer wg.Done()
		// 等待所有数据读取完成
		io.Copy(io.Discard, hashingReader)
		md5Result, sha1Result, sha256Result = hashingReader.GetHashes()
	}()
	
	// 这里可以处理S3上传,使用pr作为数据源
	// uploadToS3(pr)
	
	// 等待所有操作完成
	wg.Wait()
	
	// 返回结果
	fmt.Fprintf(w, "MD5: %s\nSHA1: %s\nSHA256: %s\n", 
		md5Result, sha1Result, sha256Result)
}

这个方案的关键点:

  1. 使用io.TeeReader将数据流同时复制到多个目标
  2. 使用io.MultiWriter并行计算多个哈希值
  3. 使用io.Pipe创建可重用的数据流供后续处理
  4. 通过goroutine实现并行处理,提高效率
  5. 整个过程中文件数据不会完全加载到内存,而是以流的方式处理
回到顶部