Golang处理io.Reader时遇到的问题请教
Golang处理io.Reader时遇到的问题请教 我正在尝试在Go语言中实现从云存储(例如S1)到我的存储S2的流式传输。
// 备份函数
func Backup(<some arguments>, uploadPathName string, objectReader io.Reader) {
ctx := context.Background()
// 创建一个上传句柄。
// 部分代码
fmt.Printf("\n正在上传 ....")
_, err = io.Copy(upload, objectReader)
if err != nil {
abortErr := upload.Abort()
log.Fatal("无法上传数据", err, abortErr)
}
// 提交上传
// 部分代码
}
假设我有一个大小为15 MB的文件存储在我的S1实例中。 当我运行我的代码时,备份会在中途停止,不显示任何错误,并且只将部分文件备份到我的存储中。
使用此代码时,其他云存储没有出现此类错误。
对于S1,我不得不专门使用这段涉及复杂SectionReader的代码,它可以与minio.Object一起使用,但不能与io.Reader一起使用:
// 备份函数
func Backup(<some arguments>, uploadPathName string, objectReader *minio.Object) {
ctx := context.Background()
// 创建一个上传句柄。
// 部分代码
fmt.Printf("\n正在上传 ....")
var lastIndex int64
var numOfBytesRead int
lastIndex = 0
var buf = make([]byte, 32768)
var err1 error
for err1 != io.EOF {
sectionReader := io.NewSectionReader(objectReader, lastIndex, int64(cap(buf)))
numOfBytesRead, err1 = sectionReader.ReadAt(buf, 0)
if numOfBytesRead > 0 {
reader := bytes.NewBuffer(buf[0:numOfBytesRead])
// 尝试上传数据n次
retry := 0
for retry < MAXRETRY {
_, err = io.Copy(upload, reader)
if err != nil {
retry++
} else {
break
}
}
if retry == MAXRETRY {
log.Fatal("无法上传数据: ", err)
}
}
lastIndex = lastIndex + int64(numOfBytesRead)
}
// 提交上传
// 部分代码
// 读取后关闭文件句柄。
if err = objectReader.Close(); err != nil {
log.Fatal(err)
}
}
但是,我希望该函数兼容io.Reader,并且能够完整地备份文件。
所以,我想知道: 问题是出在那个云存储S1上,还是出在我的代码上?
更多关于Golang处理io.Reader时遇到的问题请教的实战教程也可以访问 https://www.itying.com/category-94-b0.html
test:
问题是在那个云存储 S1 中还是在我的代码中?
没有访问您的 S1 权限,无法判断。
更多关于Golang处理io.Reader时遇到的问题请教的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
S1 是 Zenko 云存储
我的 Zenko 实例位于 Zenko 沙箱免费实例,地址是 https://admin.zenko.io(每 48 小时重置一次)
skillian:
这或许能帮助你定位问题是在你的代码中还是在
io.Copy中发生。
也许更简单的方法(不过在 Windows 上不行)是你可以对进程执行 kill -QUIT 命令,这会生成一个回溯信息(这相当于用 CTRL-\ 而不是 CTRL-C 来终止进程)。如果你在进程卡住时这样做,通常可以很快看出它是否发生了死锁。
这绝对算不上“优雅”,但它帮我捕获过无限循环;或许对你也有帮助!runtime/pprof 包中的 Profile 类型有一个 WriteTo 方法,你可以用它来打印所有 goroutine 的堆栈跟踪。你可以使用 os/signal.Notify 注册一个处理器来捕获 os.Interrupt(即 Ctrl + C),并在关闭前打印你的 goroutine 跟踪信息:
import (
"os/signal"
"runtime/pprof"
)
func main() {
// ...
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
defer close(c)
go func() {
for sig := range c {
pprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
signal.Stop(c)
break
}
}
// ...
}
这可能有助于你定位问题发生在你的代码中还是 io.Copy 中。
从你的代码来看,问题很可能出在云存储S1的io.Reader实现上。某些云存储的io.Reader可能没有正确实现完整的读取语义,特别是在遇到网络中断或特定EOF处理时。
以下是兼容io.Reader的改进版本,它包含了更健壮的错误处理和重试机制:
func Backup(upload io.Writer, objectReader io.Reader) error {
const bufferSize = 32768
const maxRetry = 3
buf := make([]byte, bufferSize)
var totalWritten int64
for {
// 读取数据
n, readErr := objectReader.Read(buf)
if n > 0 {
// 写入数据(带重试)
retry := 0
for retry < maxRetry {
written, writeErr := upload.Write(buf[:n])
if writeErr != nil {
retry++
if retry == maxRetry {
return fmt.Errorf("上传失败,重试%d次后错误: %w", maxRetry, writeErr)
}
time.Sleep(time.Duration(retry) * 100 * time.Millisecond)
continue
}
totalWritten += int64(written)
break
}
}
// 处理读取错误
if readErr != nil {
if readErr == io.EOF {
break // 正常结束
}
return fmt.Errorf("读取源数据失败: %w", readErr)
}
}
fmt.Printf("上传完成,总共传输: %d 字节\n", totalWritten)
return nil
}
如果你需要保持与io.Copy类似的接口但增加重试逻辑,可以使用包装器:
type retryWriter struct {
writer io.Writer
maxRetry int
}
func (rw *retryWriter) Write(p []byte) (int, error) {
for i := 0; i < rw.maxRetry; i++ {
n, err := rw.writer.Write(p)
if err == nil {
return n, nil
}
if i == rw.maxRetry-1 {
return n, err
}
time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
}
return 0, io.ErrShortWrite
}
func BackupWithRetry(upload io.Writer, objectReader io.Reader) error {
retryUpload := &retryWriter{
writer: upload,
maxRetry: 3,
}
_, err := io.Copy(retryUpload, objectReader)
if err != nil {
return fmt.Errorf("流式传输失败: %w", err)
}
return nil
}
对于需要处理部分读取的情况,可以添加缓冲区监控:
func BackupWithMonitoring(upload io.Writer, objectReader io.Reader) error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
type result struct {
n int
err error
}
ch := make(chan result, 1)
buf := make([]byte, 32768)
go func() {
for {
select {
case <-ctx.Done():
return
default:
n, err := objectReader.Read(buf)
ch <- result{n, err}
if err != nil {
return
}
}
}
}()
for {
select {
case <-ctx.Done():
return ctx.Err()
case res := <-ch:
if res.err != nil && res.err != io.EOF {
return res.err
}
if res.n > 0 {
_, err := io.CopyN(upload, bytes.NewReader(buf[:res.n]), int64(res.n))
if err != nil {
return err
}
}
if res.err == io.EOF {
return nil
}
}
}
}
这些实现提供了更好的错误处理和重试机制,能够处理不完整的io.Reader实现。问题很可能出在S1的io.Reader没有正确处理EOF或网络中断,导致io.Copy提前返回而没有错误。

