Golang代码解析与实现原理详解
Golang代码解析与实现原理详解 我在查阅azcopy的源代码时,无法理解下面代码中“os.Stdin”部分是如何工作的。有人能好心解释一下吗?
func (cca *cookedCopyCmdArgs) processRedirectionUpload(blobResource common.ResourceString, blockSize int64) error {
ctx := context.WithValue(context.TODO(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
// if no block size is set, then use default value
if blockSize == 0 {
blockSize = pipingDefaultBlockSize
}
// step 0: initialize pipeline
p, err := createBlobPipeline(ctx, common.CredentialInfo{CredentialType: common.ECredentialType.Anonymous()})
if err != nil {
return err
}
// step 1: parse destination url
u, err := blobResource.FullURL()
if err != nil {
return fmt.Errorf("fatal: cannot parse destination blob URL due to error: %s", err.Error())
}
// step 2: leverage high-level call in Blob SDK to upload stdin in parallel
blockBlobUrl := azblob.NewBlockBlobURL(*u, p)
_, err = azblob.UploadStreamToBlockBlob(ctx, os.Stdin, blockBlobUrl, azblob.UploadStreamToBlockBlobOptions{
BufferSize: int(blockSize),
MaxBuffers: pipingUploadParallelism,
})
return err
}
源代码位于:https://github.com/Azure/azure-storage-azcopy/blob/master/cmd/copy.go
更多关于Golang代码解析与实现原理详解的实战教程也可以访问 https://www.itying.com/category-94-b0.html
说得好 🙂 这是我唯一能找到的 azblob.Upload 实现,但他们可以使用 bloburl 方法(我现在是远程操作,无法确认)。
但问题依然存在,如何将文件的字节数据获取到标准输入中?还是说需要在命令行中通过管道输入?
@ncw
尼克,感谢你的回答。看来我的问题表述得不够清楚。我自己也使用过 azblob.UploadStreamToBlockBlob 函数,我不理解的是文件中的数据字节是如何被发送到标准输入管道,从而被该函数获取的。
示例
azcopy copy "C:\somefile.txt" "https://account.blob.core.windows.net/mycontainer
azblob.UploadStreamToBlockBlob 函数接收一个 io.Reader 接口,并持续读取数据直到结束,同时将数据上传到 Blob。
os.Stdin 实现了 io.Reader 接口,因此可以作为此函数的输入。
许多类型都实现了这个接口,例如 bytes.Buffer,这是在 Go 中进行 I/O 操作的主要方式之一。
希望这解答了你的疑问,如果没有,请告诉我!
如何从文件中获取字节数据到标准输入?
$ cat stdin.go
package main
import (
"bufio"
"fmt"
"os"
)
func main() {
s := bufio.NewScanner(os.Stdin)
for s.Scan() {
b := s.Bytes()
fmt.Println(string(b))
}
if err := s.Err(); err != nil {
fmt.Println(err)
}
}
$ cat stdin.file
This is data from a file.
$ go build stdin.go
$ ./stdin < stdin.file
This is data from a file.
$ cat stdin.file | ./stdin
This is data from a file.
Fatman:
我自己使用了
azblob.UploadStreamToBlockBlob函数,我不理解的是文件中的数据字节是如何发送到 Stdin 管道并被该函数获取的。
// 步骤 2:利用 Blob SDK 中的高级调用并行上传 stdin
blockBlobUrl := azblob.NewBlockBlobURL(*u, p)
_, err = azblob.UploadStreamToBlockBlob(ctx, os.Stdin, blockBlobUrl, azblob.UploadStreamToBlockBlobOptions{
BufferSize: int(blockSize),
MaxBuffers: pipingUploadParallelism,
})
在这一步中,请注意 os.Stdin 是如何传递给 azblob.UploadStreamToBlockBlob 的。
os.Stdin 实现了一个 io.Reader 接口,该接口从标准输入读取字节。
Fatman:
我正在查看azcopy的源代码,但无法理解下面代码中“os.Stdin”部分是如何工作的。
func (cca *cookedCopyCmdArgs) processRedirectionUpload(blobResource common.ResourceString, blockSize int64) error { // . . . // step 2: leverage high-level call in Blob SDK to upload stdin in parallel _, err = azblob.UploadStreamToBlockBlob(ctx, os.Stdin, blockBlobUrl, azblob.UploadStreamToBlockBlobOptions{ BufferSize: int(blockSize), MaxBuffers: pipingUploadParallelism, }) return err }
Fatman:
@ncw 我自己也使用过azblob.UploadStreamToBlockBlob函数,我不理解的是文件中的数据字节是如何发送到Stdin管道并被该函数接收的。
示例
azcopy copy "C:\somefile.txt" "https://account.blob.core.windows.net/mycontainer
processRedirectionUpload 是 UploadStreamToBlockBlob 的一个包装器。你为什么期望你的示例会执行 processRedirectionUpload?你并没有提供任何证据证明它确实执行了。对于你的示例,cca.isRedirection() 是 true 吗?
在azcopy的这段代码中,os.Stdin作为标准输入流被传递给Azure Blob存储的上传函数。这里的关键在于理解Go语言中标准输入的工作原理以及azcopy如何利用它进行数据流式传输。
工作原理详解
1. os.Stdin的本质
os.Stdin是Go语言中*os.File类型的全局变量,实现了io.Reader接口。当azcopy以管道方式运行时,操作系统会将前一个命令的输出重定向到这个标准输入。
// os.Stdin的定义(简化)
var (
Stdin = NewFile(uintptr(syscall.Stdin), "/dev/stdin")
)
// 实际使用示例:从标准输入读取数据
func readFromStdin() {
data := make([]byte, 1024)
n, err := os.Stdin.Read(data)
if err != nil && err != io.EOF {
log.Fatal(err)
}
fmt.Printf("Read %d bytes from stdin\n", n)
}
2. 管道重定向的工作方式
当你在终端中使用管道操作符时:
# 示例:将文件内容通过管道传递给azcopy上传到Blob存储
cat largefile.bin | azcopy cp "https://mystorage.blob.core.windows.net/mycontainer/largefile.bin"
在这个场景中:
cat命令的输出被重定向到azcopy进程的标准输入os.Stdin自动接收这些数据流UploadStreamToBlockBlob函数从os.Stdin持续读取数据
3. UploadStreamToBlockBlob的内部机制
Azure SDK的UploadStreamToBlockBlob函数内部实现了一个高效的分块上传机制:
// 模拟UploadStreamToBlockBlob的简化工作流程
func uploadStreamInternal(ctx context.Context, reader io.Reader, blockBlobURL BlockBlobURL, options UploadStreamToBlockBlobOptions) error {
buffer := make([]byte, options.BufferSize)
for {
// 从标准输入读取数据块
n, err := reader.Read(buffer)
if err != nil && err != io.EOF {
return err
}
if n > 0 {
// 上传数据块到Azure Blob存储
blockID := generateBlockID()
_, err := blockBlobURL.StageBlock(ctx, blockID, bytes.NewReader(buffer[:n]), nil)
if err != nil {
return err
}
}
if err == io.EOF {
break
}
}
// 提交所有块
_, err := blockBlobURL.CommitBlockList(ctx, blockList, nil)
return err
}
4. 并发上传的实现
UploadStreamToBlockBlob使用多个缓冲区并行上传:
// 并发上传的简化示例
func concurrentUpload(ctx context.Context, stdin io.Reader, blockSize int, parallelism int) error {
buffers := make(chan []byte, parallelism)
errChan := make(chan error, 1)
// 启动多个goroutine处理上传
for i := 0; i < parallelism; i++ {
go func() {
for buffer := range buffers {
// 实际上传逻辑
if err := uploadBlock(buffer); err != nil {
errChan <- err
return
}
}
}()
}
// 从标准输入读取并分发到缓冲区
for {
buffer := make([]byte, blockSize)
n, err := stdin.Read(buffer)
if n > 0 {
select {
case buffers <- buffer[:n]:
case err := <-errChan:
return err
}
}
if err == io.EOF {
break
}
if err != nil {
return err
}
}
close(buffers)
return nil
}
关键点总结
- 流式处理:
os.Stdin允许azcopy以流式方式处理数据,无需将整个文件加载到内存 - 零拷贝优化:数据直接从标准输入缓冲区传输到网络缓冲区,减少内存复制
- 背压控制:当上传速度慢于读取速度时,
Read调用会阻塞,自然形成流量控制 - 错误传播:如果标准输入关闭或发生错误,
Read方法会返回错误,上传过程相应终止
这种设计使得azcopy能够高效处理大文件上传,即使文件大小超过可用内存也能正常工作。os.Stdin在这里充当了数据源的角色,Azure SDK的UploadStreamToBlockBlob函数负责将数据流分块并并行上传到Blob存储。

