Golang中函数调用内返回SSH客户端命令输出及延迟关闭SSH资源的实现
Golang中函数调用内返回SSH客户端命令输出及延迟关闭SSH资源的实现
我正在处理一些代码,这些代码返回一个用于数据提取的 io.Reader,而不是将完整输出作为字节切片返回,因为对于非常大的数据,后者会占用更多内存。
对于这个问题,它是针对 SSH 客户端接口提出的,但也可以适用于任何其底层资源在某个时刻会被关闭的读取器。
示例测试/演示代码:Go Playground - The Go Programming Language。它在我的计算机上本地运行,也可以在 Go Playground 中运行,不过有时 Playground 沙箱环境的执行会失败。我之前也针对真实的 SSH 连接测试过代码,而不仅仅是使用这个使用测试/模拟 SSH 服务器的演示代码,但在这里展示此代码更容易进行演示。
package main
import (
"fmt"
"io"
"net"
"github.com/metarsit/sshtest"
"golang.org/x/crypto/ssh"
)
func main() {
addr := "localhost:2222"
// 对于真实的演示,数据理想情况下应该是几百 MB 大小的非常大的字符串或字节数组
data := "supposedly some very large data being streamed for I/O processing"
// 初始化虚拟测试服务器,以连接到正在测试的 SSH 客户端相关代码
hp := sshtest.NewHoneyPot(addr)
// 在后台启动服务器
go func() {
hp.ListenAndServe()
}()
defer hp.Close()
hp.SetReturnString(data)
// 初始化 SSH 客户端依赖项,以测试执行命令和获取输出流
cfg := &ssh.ClientConfig{
User: "jdoe",
Auth: []ssh.AuthMethod{
ssh.Password("secret"),
},
HostKeyCallback: ssh.HostKeyCallback(
func(hostname string, remote net.Addr, key ssh.PublicKey) error {
return nil
},
),
}
outs, err := runSshCommand(addr, cfg, "echo \"hello world!\"")
if err != nil {
fmt.Printf("%v\n", err)
return
}
//result, err := io.ReadAll(outs)
result := make([]byte, 20) // 演示对读取器进行任意的部分读取
// 注意/待办:当完全读取返回的 "outs" 读取器
// 花费很长时间时会发生什么?SSH 会话或客户端连接是否会在
// 远程端关闭并导致流读取失败?
//
// 更重要的是,`runSshCommand` 函数内部对会话和客户端的延迟关闭
// 是否会影响调用方在函数已经返回但读取器尚未被
// 完全读取的情况下对返回流的读取?从这个简单的
// 演示来看,后一种情况似乎没有影响?
_, err = io.ReadFull(outs, result)
if err != nil {
fmt.Printf("%v\n", err)
return
}
fmt.Printf("main/caller output:\n%s\n", result)
}
func runSshCommand(addr string, cfg *ssh.ClientConfig, cmd string) (io.Reader, error) {
client, err := ssh.Dial("tcp", addr, cfg)
if err != nil {
return nil, fmt.Errorf("Create client failed %v", err)
}
defer client.Close()
// 打开会话
session, err := client.NewSession()
if err != nil {
return nil, fmt.Errorf("Create session failed %v", err)
}
defer session.Close()
stderr, err := session.StderrPipe()
if err != nil {
err = fmt.Errorf("cannot open stderr pipe for cmd '%s': %s", cmd, err)
return nil, err
}
stdout, err := session.StdoutPipe()
if err != nil {
err = fmt.Errorf("cannot open stdout pipe for cmd '%s': %s", cmd, err)
return nil, err
}
err = session.Run(cmd)
if err != nil {
err = fmt.Errorf("cannot run cmd '%s': %s", cmd, err)
return nil, err
}
combinedOutputStream := io.MultiReader(stdout, stderr)
return combinedOutputStream, nil
}
当我最初处理这段代码时,我不确定其运行结果(我的同事也有同样的想法,我们稍后会谈到),但示例代码确实可以工作。当我在工作中发布类似的代码进行审查时,一位同事提出了一个问题:io.Reader 的底层来源是 SSH 会话的 stdout,而被调用的函数对 SSH 资源(会话和客户端连接)有延迟关闭语句,那么在函数退出时,这些延迟关闭不会导致与返回的读取器关联的 stdout 被关闭(从而导致我们无法读取数据)吗?
因此,这里代码审查的问题是:
- 为什么示例代码仍然有效?我和我的同事是否对 stdout 相对于 SSH 客户端会话的运作方式有错误的假设?
- 在什么条件下代码会失效?如何修改示例以突出有问题的场景?
我假设一种思路是返回或传回对被关闭资源的引用,以便在从关联的读取器读取完数据或出错时根据需要关闭它们,而不是在被调用函数内部延迟关闭。这可能会让你认为,当传回给调用者时,读取器会超出作用域。但我认为这样做可能会让调用者变得繁琐和复杂,因为他们还必须管理关闭操作,并可能需要进行某种异步处理。在函数只是简单地读取所有 stdout 数据并返回一个字节切片而不是读取器的简化情况下,所有这些都不需要担心,但这是以消耗内存为代价来换取简化。
当您想要读取/传输大量数据(例如通过 SSH)但又不想在此过程中占用内存(或临时文件和磁盘空间)时,我们尝试使用像 io.Reader 接口这样的其他方式,对于这种情况,Go 的最佳实践是什么?我的方向对吗?还是有其他方法可以做到,或者我可以在这里进行一些改进?
更多关于Golang中函数调用内返回SSH客户端命令输出及延迟关闭SSH资源的实现的实战教程也可以访问 https://www.itying.com/category-94-b0.html
combinedOutputStream := io.MultiReader(stdout, stderr)
io.Copy(os.Stdout, combinedOutputStream)
return combinedOutputStream, nil
通过插入这段文字,我想你应该能理解答案了。你实际执行的是 ssh user@host cmd。执行后,输出就会结束。
至于读取和生成大数据的问题,当然是转发,谁使用谁承担开销,而不是我们自己来解决这个问题。例如,解析视频时,经济高效地存储每一帧困难吗?不可能。
如果生成的是流数据,就应该将其视为流开销来处理,比如通过 io.Copy 复制到其他消费者连接,而不应过度缓冲并存储在本地。这是网络编程的基本思路。不要在应用层存储过多的缓冲区,因为内核层有自己的一套缓冲区(例如 TCP 的缓冲区)。流压力应尽可能放在内核层面处理。
更多关于Golang中函数调用内返回SSH客户端命令输出及延迟关闭SSH资源的实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
session.StderrPipe() 和 session.StdoutPipe() 是由 session 生成的。在你的代码中,你调用了
defer session.Close()。当你跳出 runSshCommand 函数时,你已经关闭了所有的管道,这意味着你返回的 io.Reader 已经没有意义了。 当父对象被关闭时,子函数自然也就不可用了。这个设计是合理的。
是的,我也是这么理解的。让我感到困惑(因此才有了这个论坛帖子)的是,如果你实际运行示例代码,在该函数外部返回的 reader 并不是 nil,并且可以读取数据,而不会立即返回错误或 EOF。这是在您建议修改测试代码之前的情况。它为什么会这样,正是我想知道的,但也许这个问题应该去问 Go SSH 库的开发者,以澄清其行为。
感谢您提供的修改后的代码建议。就我的情况而言,我希望保持接口,将一个 reader 交给调用者/用户,而不是 writer。对于建议的代码,这仍然有效,但用户现在需要处理如何将 writer 转换为 reader(如果他们需要那样做的话)。因此,如果我们在函数中封装这个逻辑,那么我想就需要使用 io.Pipe 来复制到管道 writer,然后返回管道 reader,并使用 goroutine 来防止管道 writer 和 reader 之间的阻塞。
等等,我查看了SSH的源代码,这非常有趣。
你使用的是session.Run(cmd)。它会一直阻塞,直到命令执行完成或发生异常才会结束。同时,因为你使用了stderr, err:= session.StderrPipe()和stdout, err:= session.StdoutPipe(),它们直接使用了一个内部缓冲区。描述中是链表类型的。
这意味着你的函数会在内部将输出数据存储在一个缓冲区中。
根据你的需求,一种有效的写法应该是这样的:
func runSshCommand(addr string, cfg *ssh.ClientConfig, cmd string) (r io.ReadCloser, err error) {
client, err := ssh.Dial("tcp", addr, cfg)
if err != nil {
return nil, fmt.Errorf("Create client failed %v", err)
}
defer func() {
if err != nil {
_ = client.Close()
}
}()
// open session
session, err := client.NewSession()
if err != nil {
return nil, fmt.Errorf("Create session failed %v", err)
}
defer func() {
if err != nil {
_ = session.Close()
}
}()
reader, writer := io.Pipe()
session.Stdout = writer
session.Stderr = writer
r = &_reader{ReadCloser: reader, s: session}
err = session.Start(cmd)
if err != nil {
err = fmt.Errorf("cannot run cmd '%s': %s", cmd, err)
return nil, err
}
go func() {
defer client.Close()
defer session.Close()
_ = writer.CloseWithError(session.Wait())
}()
return r, nil
}
type _reader struct {
io.ReadCloser
s *ssh.Session
}
func (r *_reader) Close() error {
if r.s != nil {
_ = r.s.Close()
}
return r.ReadCloser.Close()
}
通过插入这段文字,我想你应该能理解答案。你实际执行的是
ssh user@host cmd。执行后,输出就会结束。 … 如果生成了流数据,应该将其视为流开销来处理,例如使用io.Copy复制到其他消费者连接,而不应过度缓冲并存储在本地。这是网络编程的基本思路。不要在应用层存储过多缓冲区,因为内核层有自己的一套缓冲区(例如 TCP)。流压力应尽可能放在内核层面处理。
能否请你澄清/详细说明一下?你的例子和描述对我来说不是完全清楚,所以想确认一下。
你的意思是,返回的(指向)io.Reader(那个多重读取器)会导致 SSH 会话保持打开状态,直到管道中的数据被完全消费(你提到的 io.Copy 例子?我的代码里没有这个),或者至少直到远程端的 SSH 连接/会话关闭,以先发生者为准?如果是这样,这是否意味着,在函数返回(或由于返回读取器而退出作用域)后,会话和客户端的延迟关闭没有影响,或者实际上因为读取器在函数退出作用域时尚未被消费而取消了?这正是我感到困惑并想确认的地方。延迟语句绑定在它们所在的作用域内,也就是那个返回读取器供函数外部消费的函数。
或者,你的意思是(根据你下一段话),SSH 会话和连接确实在函数调用返回时已经关闭了,但操作系统/网络层以某种方式缓存了这些数据以便在本地处理,从而对 Go 代码进行了抽象?对于额外的、看不见的缓冲,这一点我多少能理解,但仅限于我给出的那个数据缓冲量很小的简单例子。对于大数据流经 SSH 连接的情况,比如一个需要几分钟到一小时才能完成的 MB 或 GB 级别的文件传输,这种设计就难以理解了——假设这些大数据在返回的 combinedOutputStream 中等待消费,而返回的函数有延迟关闭——根据 Go 代码,SSH 会话何时关闭?还是操作系统/网络层将实际的关闭时间从 Go 代码中抽象掉了?抱歉,我没有为这种大数据情况设置一个示例演示代码来测试。
我的问题/代码行为是否与这些相关?
x/crypto/ssh: cannot close ssh session
标签: NeedsInvestigation
问题描述
你使用的是哪个 Go 版本 (go version)?
$ go version
1.17
这个问题在最新版本中是否仍然存在?
是的
你使用的操作系统和处理器架构是什么 (go env)?
`go env` 输出
$ go env
GO111MODULE="on"
GOARCH="amd64"
GOHOSTARCH="amd64"
GOHOSTOS="darwin"
GOPRIVATE=""
GOPROXY="https://goproxy.cn,direct"
GOROOT="/usr/local/go"
GOSUMDB="sum.golang.org"
GOTMPDIR=""
GOTOOLDIR="/usr/local/go/pkg/tool/darwin_amd64"
GOVCS=""
GOVERSION="go1.17.6"
GCCGO="gccgo"
AR="ar"
CC="clang"
CXX="clang++"
CGO_ENABLED="1"
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"
PKG_CONFIG="pkg-config"
GOGCCFLAGS="-fPIC -arch x86_64 -m64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -fdebug-prefix-map=/var/folders/jx/d_08_gm93psc4v9lm3zzjc300000gn/T/go-build1736071007=/tmp/go-build -gno-record-gcc-switches -fno-common"
你做了什么?
func test() {
client, err := ssh.Dial("tcp", "xx.xx.xx.xx:22", &ssh.ClientConfig{
User: "root",
Auth: []ssh.AuthMethod{ssh.Password("123")},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
})
if err != nil {
log.Panic(err)
}
session, err := client.NewSession()
if err != nil {
log.Panic(err)
}
defer session.Close()
go func() {
// err := session.Run("while true;do echo test;done")
err := session.Run("sleep 6000")
if err != nil {
log.Println(err)
}
}()
time.Sleep(time.Second * 5)
err = session.Close()
if err != nil {
log.Println(err)
}
log.Println("finished")
}
你期望看到什么?
2022/04/12 13:35:17 finished
2022/04/12 13:35:17 Process exited with status 141 from signal PIPE
你实际看到了什么?
2022/04/12 13:35:17 finished
如果我运行 while true;do echo test;done,会话可以正常关闭。然而,sleep 6000 在这个进程结束后仍在运行。
os/exec: consider changing Wait to stop copying goroutines rather than waiting for them
标签: Proposal, Proposal-Accepted, FrozenDueToAge, early-in-cycle
当 os/exec.Cmd 的 Stdin、Stdout 或 Stderr 字段被设置为除 nil 或 *os.File 之外的任何值(常见情况是 *bytes.Buffer)时,我们会调用 os.Pipe 来获取管道并创建 goroutine 来复制数据进出。(*Cmd).Wait 方法首先等待子进程退出,然后等待这些 goroutine 完成数据复制。
如果子进程 C1 本身启动了一个子子进程 C2,并且如果 C1 将其任何 stdin/stdout/stderr 描述符传递给 C2,并且如果 C1 在等待 C2 退出之前就退出了,那么 C2 将持有由 os/exec 包创建的管道的打开端。(*Cmd).Wait 方法将等待直到 goroutine 完成,这意味着等待直到这些管道被关闭,实际上就是等待直到 C2 退出。这令人困惑,因为用户看到 C1 已经完成,但不理解为什么他们的程序仍在等待它。
这种困惑已经被多次报告为问题,至少包括 #7378、#18874、#20730、#21922、#22485。
它不一定非要这样工作。尽管当前的 goroutine 调用 io.Copy,但我们可以将它们改为使用循环。在每次 Read 之后,循环可以检查进程是否已被等待。然后 Wait 可以等待子进程,告诉 goroutine 停止,给它们最后一次写入的机会,然后返回。stdout/stderr goroutine 将关闭它们那一端的管道。不会有任何竞争条件,也不会有任何意外的等待。但在存在 C2 进程的情况下,我们当前收集的所有标准输出和标准错误输出将不会全部可用。
需要明确的是,程序员已经可以通过自己调用 os.Pipe 并在 Cmd 结构中使用管道来处理这些情况。现在是这样,如果我们改变它的工作方式,也仍然是这样。
我想提出的问题是:进行这种更改是否会减少人们的困惑?我们能否在不破坏 Go 1 保证的情况下进行这种更改?
os/exec: Wait waits for EOF on stdout pipe
标签: FrozenDueToAge
重现问题的步骤? 运行 http://play.golang.org/p/VwzEpAHA7M (无法在 playground 中运行,请在 mac/linux 上运行)
期望的输出是什么? 所有 3 个步骤都应该立即返回
你实际看到了什么?
第 2 步,当将 Stdout 重定向到 bytes.buffer 时,花费了 3 秒钟,这意味着它在后台进程 ("sleep 3 &") 返回后才返回
你使用的是哪个编译器 (5g, 6g, 8g, gccgo)? 6g
你使用的是哪个操作系统? Mac OS X 10.9.1 (13B42)
你使用的是哪个版本? (运行 ‘go version’) go version go1.2 darwin/amd64
请提供任何额外的信息。 在 exec.Cmd.Wait 中,它将等待所有 c.goroutine 的 errch 通道,无论进程是已完成还是在后台运行。这就是调用阻塞的地方。
是否意味着在我的代码作为客户端和远程服务器端之间,持有 stdout/reader 引用会保持 SSH 客户端端的连接处于活动状态,无论会话/客户端是否调用了 close(),直到读取器的数据被完全读取(或从任何代码执行/引用中超出作用域)?


