Golang处理io.Reader时遇到的问题请教

Golang处理io.Reader时遇到的问题请教 我正在尝试在Go语言中实现从云存储(例如S1)到我的存储S2的流式传输。

// 备份函数
func Backup(<some arguments>, uploadPathName string, objectReader io.Reader) {

	ctx := context.Background()

	// 创建一个上传句柄。
	// 部分代码
	fmt.Printf("\n正在上传 ....")

	_, err = io.Copy(upload, objectReader)
	if err != nil {
		abortErr := upload.Abort()
		log.Fatal("无法上传数据", err, abortErr)
	}

	// 提交上传
        // 部分代码
}

假设我有一个大小为15 MB的文件存储在我的S1实例中。 当我运行我的代码时,备份会在中途停止,不显示任何错误,并且只将部分文件备份到我的存储中。

使用此代码时,其他云存储没有出现此类错误。

对于S1,我不得不专门使用这段涉及复杂SectionReader的代码,它可以与minio.Object一起使用,但不能与io.Reader一起使用:

// 备份函数
func Backup(<some arguments>, uploadPathName string, objectReader *minio.Object) {

	ctx := context.Background()

	// 创建一个上传句柄。
	// 部分代码
	fmt.Printf("\n正在上传 ....")

	var lastIndex int64
	var numOfBytesRead int
	lastIndex = 0
	var buf = make([]byte, 32768)

	var err1 error
	for err1 != io.EOF {
		sectionReader := io.NewSectionReader(objectReader, lastIndex, int64(cap(buf)))
		numOfBytesRead, err1 = sectionReader.ReadAt(buf, 0)
		if numOfBytesRead > 0 {
			reader := bytes.NewBuffer(buf[0:numOfBytesRead])
			// 尝试上传数据n次
			retry := 0
			for retry < MAXRETRY {
				_, err = io.Copy(upload, reader)
				if err != nil {
					retry++
				} else {
					break
				}
			}
			if retry == MAXRETRY {
				log.Fatal("无法上传数据: ", err)
			}
		}

		lastIndex = lastIndex + int64(numOfBytesRead)
	}

	// 提交上传
    // 部分代码
	
    // 读取后关闭文件句柄。
	if err = objectReader.Close(); err != nil {
		log.Fatal(err)
	}
}

但是,我希望该函数兼容io.Reader,并且能够完整地备份文件。

所以,我想知道: 问题是出在那个云存储S1上,还是出在我的代码上?


更多关于Golang处理io.Reader时遇到的问题请教的实战教程也可以访问 https://www.itying.com/category-94-b0.html

5 回复

test:

问题是在那个云存储 S1 中还是在我的代码中?

没有访问您的 S1 权限,无法判断。

更多关于Golang处理io.Reader时遇到的问题请教的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


S1 是 Zenko 云存储

我的 Zenko 实例位于 Zenko 沙箱免费实例,地址是 https://admin.zenko.io(每 48 小时重置一次)

skillian:

这或许能帮助你定位问题是在你的代码中还是在 io.Copy 中发生。

也许更简单的方法(不过在 Windows 上不行)是你可以对进程执行 kill -QUIT 命令,这会生成一个回溯信息(这相当于用 CTRL-\ 而不是 CTRL-C 来终止进程)。如果你在进程卡住时这样做,通常可以很快看出它是否发生了死锁。

这绝对算不上“优雅”,但它帮我捕获过无限循环;或许对你也有帮助!runtime/pprof 包中的 Profile 类型有一个 WriteTo 方法,你可以用它来打印所有 goroutine 的堆栈跟踪。你可以使用 os/signal.Notify 注册一个处理器来捕获 os.Interrupt(即 Ctrl + C),并在关闭前打印你的 goroutine 跟踪信息:

import (
    "os/signal"
    "runtime/pprof"
)

func main() {
    // ...
    c := make(chan os.Signal)
    signal.Notify(c, os.Interrupt)
    defer close(c)
    go func() {
        for sig := range c {
            pprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
            signal.Stop(c)
            break
        }
    }
    // ...
}

这可能有助于你定位问题发生在你的代码中还是 io.Copy 中。

从你的代码来看,问题很可能出在云存储S1的io.Reader实现上。某些云存储的io.Reader可能没有正确实现完整的读取语义,特别是在遇到网络中断或特定EOF处理时。

以下是兼容io.Reader的改进版本,它包含了更健壮的错误处理和重试机制:

func Backup(upload io.Writer, objectReader io.Reader) error {
    const bufferSize = 32768
    const maxRetry = 3
    
    buf := make([]byte, bufferSize)
    var totalWritten int64
    
    for {
        // 读取数据
        n, readErr := objectReader.Read(buf)
        if n > 0 {
            // 写入数据(带重试)
            retry := 0
            for retry < maxRetry {
                written, writeErr := upload.Write(buf[:n])
                if writeErr != nil {
                    retry++
                    if retry == maxRetry {
                        return fmt.Errorf("上传失败,重试%d次后错误: %w", maxRetry, writeErr)
                    }
                    time.Sleep(time.Duration(retry) * 100 * time.Millisecond)
                    continue
                }
                totalWritten += int64(written)
                break
            }
        }
        
        // 处理读取错误
        if readErr != nil {
            if readErr == io.EOF {
                break // 正常结束
            }
            return fmt.Errorf("读取源数据失败: %w", readErr)
        }
    }
    
    fmt.Printf("上传完成,总共传输: %d 字节\n", totalWritten)
    return nil
}

如果你需要保持与io.Copy类似的接口但增加重试逻辑,可以使用包装器:

type retryWriter struct {
    writer   io.Writer
    maxRetry int
}

func (rw *retryWriter) Write(p []byte) (int, error) {
    for i := 0; i < rw.maxRetry; i++ {
        n, err := rw.writer.Write(p)
        if err == nil {
            return n, nil
        }
        if i == rw.maxRetry-1 {
            return n, err
        }
        time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
    }
    return 0, io.ErrShortWrite
}

func BackupWithRetry(upload io.Writer, objectReader io.Reader) error {
    retryUpload := &retryWriter{
        writer:   upload,
        maxRetry: 3,
    }
    
    _, err := io.Copy(retryUpload, objectReader)
    if err != nil {
        return fmt.Errorf("流式传输失败: %w", err)
    }
    
    return nil
}

对于需要处理部分读取的情况,可以添加缓冲区监控:

func BackupWithMonitoring(upload io.Writer, objectReader io.Reader) error {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
    defer cancel()
    
    type result struct {
        n   int
        err error
    }
    
    ch := make(chan result, 1)
    buf := make([]byte, 32768)
    
    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            default:
                n, err := objectReader.Read(buf)
                ch <- result{n, err}
                if err != nil {
                    return
                }
            }
        }
    }()
    
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case res := <-ch:
            if res.err != nil && res.err != io.EOF {
                return res.err
            }
            
            if res.n > 0 {
                _, err := io.CopyN(upload, bytes.NewReader(buf[:res.n]), int64(res.n))
                if err != nil {
                    return err
                }
            }
            
            if res.err == io.EOF {
                return nil
            }
        }
    }
}

这些实现提供了更好的错误处理和重试机制,能够处理不完整的io.Reader实现。问题很可能出在S1的io.Reader没有正确处理EOF或网络中断,导致io.Copy提前返回而没有错误。

回到顶部