Golang代码解析与实现原理详解

Golang代码解析与实现原理详解 我在查阅azcopy的源代码时,无法理解下面代码中“os.Stdin”部分是如何工作的。有人能好心解释一下吗?

func (cca *cookedCopyCmdArgs) processRedirectionUpload(blobResource common.ResourceString, blockSize int64) error {
	ctx := context.WithValue(context.TODO(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)

	// if no block size is set, then use default value
	if blockSize == 0 {
		blockSize = pipingDefaultBlockSize
	}

	// step 0: initialize pipeline
	p, err := createBlobPipeline(ctx, common.CredentialInfo{CredentialType: common.ECredentialType.Anonymous()})
	if err != nil {
		return err
	}

	// step 1: parse destination url
	u, err := blobResource.FullURL()
	if err != nil {
		return fmt.Errorf("fatal: cannot parse destination blob URL due to error: %s", err.Error())
	}

	// step 2: leverage high-level call in Blob SDK to upload stdin in parallel
	blockBlobUrl := azblob.NewBlockBlobURL(*u, p)
	_, err = azblob.UploadStreamToBlockBlob(ctx, os.Stdin, blockBlobUrl, azblob.UploadStreamToBlockBlobOptions{
		BufferSize: int(blockSize),
		MaxBuffers: pipingUploadParallelism,
	})

	return err
}

源代码位于:https://github.com/Azure/azure-storage-azcopy/blob/master/cmd/copy.go


更多关于Golang代码解析与实现原理详解的实战教程也可以访问 https://www.itying.com/category-94-b0.html

8 回复

太棒了,谢谢!

更多关于Golang代码解析与实现原理详解的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


说得好 🙂 这是我唯一能找到的 azblob.Upload 实现,但他们可以使用 bloburl 方法(我现在是远程操作,无法确认)。

但问题依然存在,如何将文件的字节数据获取到标准输入中?还是说需要在命令行中通过管道输入?

@ncw 尼克,感谢你的回答。看来我的问题表述得不够清楚。我自己也使用过 azblob.UploadStreamToBlockBlob 函数,我不理解的是文件中的数据字节是如何被发送到标准输入管道,从而被该函数获取的。

示例

azcopy copy "C:\somefile.txt" "https://account.blob.core.windows.net/mycontainer

azblob.UploadStreamToBlockBlob 函数接收一个 io.Reader 接口,并持续读取数据直到结束,同时将数据上传到 Blob。

os.Stdin 实现了 io.Reader 接口,因此可以作为此函数的输入。

许多类型都实现了这个接口,例如 bytes.Buffer,这是在 Go 中进行 I/O 操作的主要方式之一。

希望这解答了你的疑问,如果没有,请告诉我!

如何从文件中获取字节数据到标准输入?

$ cat stdin.go
package main

import (
    "bufio"
    "fmt"
    "os"
)

func main() {
    s := bufio.NewScanner(os.Stdin)
    for s.Scan() {
	    b := s.Bytes()
	    fmt.Println(string(b))
    }
    if err := s.Err(); err != nil {
	    fmt.Println(err)
    }
}

$ cat stdin.file
This is data from a file.
$ go build stdin.go

$ ./stdin < stdin.file
This is data from a file.

$ cat stdin.file | ./stdin
This is data from a file.

Fatman:

我自己使用了 azblob.UploadStreamToBlockBlob 函数,我不理解的是文件中的数据字节是如何发送到 Stdin 管道并被该函数获取的。

	// 步骤 2:利用 Blob SDK 中的高级调用并行上传 stdin
	blockBlobUrl := azblob.NewBlockBlobURL(*u, p)
	_, err = azblob.UploadStreamToBlockBlob(ctx, os.Stdin, blockBlobUrl, azblob.UploadStreamToBlockBlobOptions{
		BufferSize: int(blockSize),
		MaxBuffers: pipingUploadParallelism,
	})

在这一步中,请注意 os.Stdin 是如何传递给 azblob.UploadStreamToBlockBlob 的。

os.Stdin 实现了一个 io.Reader 接口,该接口从标准输入读取字节。

Fatman:

我正在查看azcopy的源代码,但无法理解下面代码中“os.Stdin”部分是如何工作的。

    func (cca *cookedCopyCmdArgs) processRedirectionUpload(blobResource common.ResourceString, blockSize int64) error {
	// . . .
	// step 2: leverage high-level call in Blob SDK to upload stdin in parallel
	_, err = azblob.UploadStreamToBlockBlob(ctx, os.Stdin, blockBlobUrl, azblob.UploadStreamToBlockBlobOptions{
		BufferSize: int(blockSize),
		MaxBuffers: pipingUploadParallelism,
	})

	return err
}

Fatman:

@ncw 我自己也使用过azblob.UploadStreamToBlockBlob函数,我不理解的是文件中的数据字节是如何发送到Stdin管道并被该函数接收的。

示例 azcopy copy "C:\somefile.txt" "https://account.blob.core.windows.net/mycontainer

processRedirectionUpload 是 UploadStreamToBlockBlob 的一个包装器。你为什么期望你的示例会执行 processRedirectionUpload?你并没有提供任何证据证明它确实执行了。对于你的示例,cca.isRedirection()true 吗?

在azcopy的这段代码中,os.Stdin作为标准输入流被传递给Azure Blob存储的上传函数。这里的关键在于理解Go语言中标准输入的工作原理以及azcopy如何利用它进行数据流式传输。

工作原理详解

1. os.Stdin的本质

os.Stdin是Go语言中*os.File类型的全局变量,实现了io.Reader接口。当azcopy以管道方式运行时,操作系统会将前一个命令的输出重定向到这个标准输入。

// os.Stdin的定义(简化)
var (
    Stdin  = NewFile(uintptr(syscall.Stdin), "/dev/stdin")
)

// 实际使用示例:从标准输入读取数据
func readFromStdin() {
    data := make([]byte, 1024)
    n, err := os.Stdin.Read(data)
    if err != nil && err != io.EOF {
        log.Fatal(err)
    }
    fmt.Printf("Read %d bytes from stdin\n", n)
}

2. 管道重定向的工作方式

当你在终端中使用管道操作符时:

# 示例:将文件内容通过管道传递给azcopy上传到Blob存储
cat largefile.bin | azcopy cp "https://mystorage.blob.core.windows.net/mycontainer/largefile.bin"

在这个场景中:

  • cat命令的输出被重定向到azcopy进程的标准输入
  • os.Stdin自动接收这些数据流
  • UploadStreamToBlockBlob函数从os.Stdin持续读取数据

3. UploadStreamToBlockBlob的内部机制

Azure SDK的UploadStreamToBlockBlob函数内部实现了一个高效的分块上传机制:

// 模拟UploadStreamToBlockBlob的简化工作流程
func uploadStreamInternal(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL, options UploadStreamToBlockBlobOptions) error {
    buffer := make([]byte, options.BufferSize)
    
    for {
        // 从标准输入读取数据块
        n, err := reader.Read(buffer)
        if err != nil && err != io.EOF {
            return err
        }
        
        if n > 0 {
            // 上传数据块到Azure Blob存储
            blockID := generateBlockID()
            _, err := blockBlobURL.StageBlock(ctx, blockID, bytes.NewReader(buffer[:n]), nil)
            if err != nil {
                return err
            }
        }
        
        if err == io.EOF {
            break
        }
    }
    
    // 提交所有块
    _, err := blockBlobURL.CommitBlockList(ctx, blockList, nil)
    return err
}

4. 并发上传的实现

UploadStreamToBlockBlob使用多个缓冲区并行上传:

// 并发上传的简化示例
func concurrentUpload(ctx context.Context, stdin io.Reader, blockSize int, parallelism int) error {
    buffers := make(chan []byte, parallelism)
    errChan := make(chan error, 1)
    
    // 启动多个goroutine处理上传
    for i := 0; i < parallelism; i++ {
        go func() {
            for buffer := range buffers {
                // 实际上传逻辑
                if err := uploadBlock(buffer); err != nil {
                    errChan <- err
                    return
                }
            }
        }()
    }
    
    // 从标准输入读取并分发到缓冲区
    for {
        buffer := make([]byte, blockSize)
        n, err := stdin.Read(buffer)
        if n > 0 {
            select {
            case buffers <- buffer[:n]:
            case err := <-errChan:
                return err
            }
        }
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
    }
    
    close(buffers)
    return nil
}

关键点总结

  1. 流式处理os.Stdin允许azcopy以流式方式处理数据,无需将整个文件加载到内存
  2. 零拷贝优化:数据直接从标准输入缓冲区传输到网络缓冲区,减少内存复制
  3. 背压控制:当上传速度慢于读取速度时,Read调用会阻塞,自然形成流量控制
  4. 错误传播:如果标准输入关闭或发生错误,Read方法会返回错误,上传过程相应终止

这种设计使得azcopy能够高效处理大文件上传,即使文件大小超过可用内存也能正常工作。os.Stdin在这里充当了数据源的角色,Azure SDK的UploadStreamToBlockBlob函数负责将数据流分块并并行上传到Blob存储。

回到顶部