Golang中函数调用内返回SSH客户端命令输出及延迟关闭SSH资源的实现

Golang中函数调用内返回SSH客户端命令输出及延迟关闭SSH资源的实现 我正在处理一些代码,这些代码返回一个用于数据提取的 io.Reader,而不是将完整输出作为字节切片返回,因为对于非常大的数据,后者会占用更多内存。

对于这个问题,它是针对 SSH 客户端接口提出的,但也可以适用于任何其底层资源在某个时刻会被关闭的读取器。

示例测试/演示代码:Go Playground - The Go Programming Language。它在我的计算机上本地运行,也可以在 Go Playground 中运行,不过有时 Playground 沙箱环境的执行会失败。我之前也针对真实的 SSH 连接测试过代码,而不仅仅是使用这个使用测试/模拟 SSH 服务器的演示代码,但在这里展示此代码更容易进行演示。

package main

import (
	"fmt"
	"io"
	"net"

	"github.com/metarsit/sshtest"
	"golang.org/x/crypto/ssh"
)

func main() {
	addr := "localhost:2222"
  // 对于真实的演示,数据理想情况下应该是几百 MB 大小的非常大的字符串或字节数组
	data := "supposedly some very large data being streamed for I/O processing"

	// 初始化虚拟测试服务器,以连接到正在测试的 SSH 客户端相关代码
	hp := sshtest.NewHoneyPot(addr)

	// 在后台启动服务器
	go func() {
		hp.ListenAndServe()
	}()
	defer hp.Close()

	hp.SetReturnString(data)

	// 初始化 SSH 客户端依赖项,以测试执行命令和获取输出流
	cfg := &ssh.ClientConfig{
		User: "jdoe",
		Auth: []ssh.AuthMethod{
			ssh.Password("secret"),
		},
		HostKeyCallback: ssh.HostKeyCallback(
			func(hostname string, remote net.Addr, key ssh.PublicKey) error {
				return nil
			},
		),
	}

	outs, err := runSshCommand(addr, cfg, "echo \"hello world!\"")
	if err != nil {
		fmt.Printf("%v\n", err)
    return
	}
	//result, err := io.ReadAll(outs)
  result := make([]byte, 20) // 演示对读取器进行任意的部分读取
  // 注意/待办:当完全读取返回的 "outs" 读取器
  // 花费很长时间时会发生什么?SSH 会话或客户端连接是否会在
  // 远程端关闭并导致流读取失败?
  //
  // 更重要的是,`runSshCommand` 函数内部对会话和客户端的延迟关闭
  // 是否会影响调用方在函数已经返回但读取器尚未被
  // 完全读取的情况下对返回流的读取?从这个简单的
  // 演示来看,后一种情况似乎没有影响?
  _, err = io.ReadFull(outs, result)
	if err != nil {
		fmt.Printf("%v\n", err)
    return
	}
	fmt.Printf("main/caller output:\n%s\n", result)
}

func runSshCommand(addr string, cfg *ssh.ClientConfig, cmd string) (io.Reader, error) {

  client, err := ssh.Dial("tcp", addr, cfg)
	if err != nil {
		return nil, fmt.Errorf("Create client failed %v", err)
	}
	defer client.Close()

	// 打开会话
	session, err := client.NewSession()
	if err != nil {
		return nil, fmt.Errorf("Create session failed %v", err)
	}
	defer session.Close()

	stderr, err := session.StderrPipe()
	if err != nil {
		err = fmt.Errorf("cannot open stderr pipe for cmd '%s': %s", cmd, err)
		return nil, err
	}

	stdout, err := session.StdoutPipe()
	if err != nil {
		err = fmt.Errorf("cannot open stdout pipe for cmd '%s': %s", cmd, err)
		return nil, err
	}

	err = session.Run(cmd)
	if err != nil {
		err = fmt.Errorf("cannot run cmd '%s': %s", cmd, err)
		return nil, err
	}

	combinedOutputStream := io.MultiReader(stdout, stderr)

	return combinedOutputStream, nil
}

当我最初处理这段代码时,我不确定其运行结果(我的同事也有同样的想法,我们稍后会谈到),但示例代码确实可以工作。当我在工作中发布类似的代码进行审查时,一位同事提出了一个问题:io.Reader 的底层来源是 SSH 会话的 stdout,而被调用的函数对 SSH 资源(会话和客户端连接)有延迟关闭语句,那么在函数退出时,这些延迟关闭不会导致与返回的读取器关联的 stdout 被关闭(从而导致我们无法读取数据)吗?

因此,这里代码审查的问题是:

  • 为什么示例代码仍然有效?我和我的同事是否对 stdout 相对于 SSH 客户端会话的运作方式有错误的假设?
  • 在什么条件下代码会失效?如何修改示例以突出有问题的场景?

我假设一种思路是返回或传回对被关闭资源的引用,以便在从关联的读取器读取完数据或出错时根据需要关闭它们,而不是在被调用函数内部延迟关闭。这可能会让你认为,当传回给调用者时,读取器会超出作用域。但我认为这样做可能会让调用者变得繁琐和复杂,因为他们还必须管理关闭操作,并可能需要进行某种异步处理。在函数只是简单地读取所有 stdout 数据并返回一个字节切片而不是读取器的简化情况下,所有这些都不需要担心,但这是以消耗内存为代价来换取简化。

当您想要读取/传输大量数据(例如通过 SSH)但又不想在此过程中占用内存(或临时文件和磁盘空间)时,我们尝试使用像 io.Reader 接口这样的其他方式,对于这种情况,Go 的最佳实践是什么?我的方向对吗?还是有其他方法可以做到,或者我可以在这里进行一些改进?


更多关于Golang中函数调用内返回SSH客户端命令输出及延迟关闭SSH资源的实现的实战教程也可以访问 https://www.itying.com/category-94-b0.html

6 回复
combinedOutputStream := io.MultiReader(stdout, stderr)

io.Copy(os.Stdout, combinedOutputStream)

return combinedOutputStream, nil

通过插入这段文字,我想你应该能理解答案了。你实际执行的是 ssh user@host cmd。执行后,输出就会结束。

至于读取和生成大数据的问题,当然是转发,谁使用谁承担开销,而不是我们自己来解决这个问题。例如,解析视频时,经济高效地存储每一帧困难吗?不可能。 如果生成的是流数据,就应该将其视为流开销来处理,比如通过 io.Copy 复制到其他消费者连接,而不应过度缓冲并存储在本地。这是网络编程的基本思路。不要在应用层存储过多的缓冲区,因为内核层有自己的一套缓冲区(例如 TCP 的缓冲区)。流压力应尽可能放在内核层面处理。

更多关于Golang中函数调用内返回SSH客户端命令输出及延迟关闭SSH资源的实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


session.StderrPipe() 和 session.StdoutPipe() 是由 session 生成的。在你的代码中,你调用了 defer session.Close()。当你跳出 runSshCommand 函数时,你已经关闭了所有的管道,这意味着你返回的 io.Reader 已经没有意义了。 当父对象被关闭时,子函数自然也就不可用了。这个设计是合理的。

是的,我也是这么理解的。让我感到困惑(因此才有了这个论坛帖子)的是,如果你实际运行示例代码,在该函数外部返回的 reader 并不是 nil,并且可以读取数据,而不会立即返回错误或 EOF。这是在您建议修改测试代码之前的情况。它为什么会这样,正是我想知道的,但也许这个问题应该去问 Go SSH 库的开发者,以澄清其行为。

感谢您提供的修改后的代码建议。就我的情况而言,我希望保持接口,将一个 reader 交给调用者/用户,而不是 writer。对于建议的代码,这仍然有效,但用户现在需要处理如何将 writer 转换为 reader(如果他们需要那样做的话)。因此,如果我们在函数中封装这个逻辑,那么我想就需要使用 io.Pipe 来复制到管道 writer,然后返回管道 reader,并使用 goroutine 来防止管道 writer 和 reader 之间的阻塞。

等等,我查看了SSH的源代码,这非常有趣。 你使用的是session.Run(cmd)。它会一直阻塞,直到命令执行完成或发生异常才会结束。同时,因为你使用了stderr, err:= session.StderrPipe()stdout, err:= session.StdoutPipe(),它们直接使用了一个内部缓冲区。描述中是链表类型的。 这意味着你的函数会在内部将输出数据存储在一个缓冲区中。 根据你的需求,一种有效的写法应该是这样的:

func runSshCommand(addr string, cfg *ssh.ClientConfig, cmd string) (r io.ReadCloser, err error) {
	client, err := ssh.Dial("tcp", addr, cfg)
	if err != nil {
		return nil, fmt.Errorf("Create client failed %v", err)
	}
	defer func() {
		if err != nil {
			_ = client.Close()
		}
	}()

	// open session
	session, err := client.NewSession()
	if err != nil {
		return nil, fmt.Errorf("Create session failed %v", err)
	}
	defer func() {
		if err != nil {
			_ = session.Close()
		}
	}()

	reader, writer := io.Pipe()
	session.Stdout = writer
	session.Stderr = writer

	r = &_reader{ReadCloser: reader, s: session}

	err = session.Start(cmd)
	if err != nil {
		err = fmt.Errorf("cannot run cmd '%s': %s", cmd, err)
		return nil, err
	}
	go func() {
		defer client.Close()
		defer session.Close()
		_ = writer.CloseWithError(session.Wait())
	}()
	return r, nil
}

type _reader struct {
	io.ReadCloser
	s *ssh.Session
}

func (r *_reader) Close() error {
	if r.s != nil {
		_ = r.s.Close()
	}
	return r.ReadCloser.Close()
}

通过插入这段文字,我想你应该能理解答案。你实际执行的是 ssh user@host cmd。执行后,输出就会结束。 … 如果生成了流数据,应该将其视为流开销来处理,例如使用 io.Copy 复制到其他消费者连接,而不应过度缓冲并存储在本地。这是网络编程的基本思路。不要在应用层存储过多缓冲区,因为内核层有自己的一套缓冲区(例如 TCP)。流压力应尽可能放在内核层面处理。

能否请你澄清/详细说明一下?你的例子和描述对我来说不是完全清楚,所以想确认一下。

你的意思是,返回的(指向)io.Reader(那个多重读取器)会导致 SSH 会话保持打开状态,直到管道中的数据被完全消费(你提到的 io.Copy 例子?我的代码里没有这个),或者至少直到远程端的 SSH 连接/会话关闭,以先发生者为准?如果是这样,这是否意味着,在函数返回(或由于返回读取器而退出作用域)后,会话和客户端的延迟关闭没有影响,或者实际上因为读取器在函数退出作用域时尚未被消费而取消了?这正是我感到困惑并想确认的地方。延迟语句绑定在它们所在的作用域内,也就是那个返回读取器供函数外部消费的函数。

或者,你的意思是(根据你下一段话),SSH 会话和连接确实在函数调用返回时已经关闭了,但操作系统/网络层以某种方式缓存了这些数据以便在本地处理,从而对 Go 代码进行了抽象?对于额外的、看不见的缓冲,这一点我多少能理解,但仅限于我给出的那个数据缓冲量很小的简单例子。对于大数据流经 SSH 连接的情况,比如一个需要几分钟到一小时才能完成的 MB 或 GB 级别的文件传输,这种设计就难以理解了——假设这些大数据在返回的 combinedOutputStream 中等待消费,而返回的函数有延迟关闭——根据 Go 代码,SSH 会话何时关闭?还是操作系统/网络层将实际的关闭时间从 Go 代码中抽象掉了?抱歉,我没有为这种大数据情况设置一个示例演示代码来测试。

你说的话让人很困惑。我只能解释一下我认为你的困惑点在哪里。这本质上是一个流处理的问题。

session.StderrPipe()session.StdoutPipe() 是由 session 生成的。在你的代码中,你调用了 defer session.Close()。当你跳出 runSshCommand 函数时,你已经关闭了所有的管道,这意味着你返回的 io.Reader 已经没有意义了。

当父对象被关闭时,子函数自然也就不可用了。这个设计是合理的。

根据你的 runSshCommand 函数,稍微修改后的方法应该是这样的:

func runSshCommand2(addr string, cfg *ssh.ClientConfig, cmd string, dst io.Writer) error {
	client, err := ssh.Dial("tcp", addr, cfg)
	if err != nil {
		return fmt.Errorf("Create client failed %v", err)
	}
	defer client.Close()

	// open session
	session, err := client.NewSession()
	if err != nil {
		return fmt.Errorf("Create session failed %v", err)
	}
	defer session.Close()

	stderr, err := session.StderrPipe()
	if err != nil {
		err = fmt.Errorf("cannot open stderr pipe for cmd '%s': %s", cmd, err)
		return err
	}

	stdout, err := session.StdoutPipe()
	if err != nil {
		err = fmt.Errorf("cannot open stdout pipe for cmd '%s': %s", cmd, err)
		return err
	}

	err = session.Run(cmd)
	if err != nil {
		err = fmt.Errorf("cannot run cmd '%s': %s", cmd, err)
		return err
	}

	combinedOutputStream := io.MultiReader(stdout, stderr)

	_, err = io.Copy(dst, combinedOutputStream)
	return err
}

我不确定你想探究什么,但 ssh 属于网络编程,所以它也符合以下逻辑:(假设 ssh 使用的连接是 tcpconn)

  1. 当底层连接断开或异常时,上层包装器也会断开或异常(例如,tcpconn 断开,ssh 连接也断开)。
  2. 当读取 ssh 连接数据时,会触发向下读取 tcpconn 数据。
  3. 一个好的网络库不会预先将未知大小的数据从底层读入缓冲区。
  4. 当你关闭上层连接时,不一定关闭下层连接,因为它可能被复用。所以处理程序会直接丢弃关于上层被关闭的连接数据。
  5. 还有很多,但这是我目前能想到的。

我的问题/代码行为是否与这些相关?

x/crypto/ssh: cannot close ssh session

标签: NeedsInvestigation

问题描述

你使用的是哪个 Go 版本 (go version)?
$ go version
1.17
这个问题在最新版本中是否仍然存在?

是的

你使用的操作系统和处理器架构是什么 (go env)?
`go env` 输出
$ go env
GO111MODULE="on"
GOARCH="amd64"
GOHOSTARCH="amd64"
GOHOSTOS="darwin"
GOPRIVATE=""
GOPROXY="https://goproxy.cn,direct"
GOROOT="/usr/local/go"
GOSUMDB="sum.golang.org"
GOTMPDIR=""
GOTOOLDIR="/usr/local/go/pkg/tool/darwin_amd64"
GOVCS=""
GOVERSION="go1.17.6"
GCCGO="gccgo"
AR="ar"
CC="clang"
CXX="clang++"
CGO_ENABLED="1"
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"
PKG_CONFIG="pkg-config"
GOGCCFLAGS="-fPIC -arch x86_64 -m64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -fdebug-prefix-map=/var/folders/jx/d_08_gm93psc4v9lm3zzjc300000gn/T/go-build1736071007=/tmp/go-build -gno-record-gcc-switches -fno-common"
你做了什么?
func test() {
	client, err := ssh.Dial("tcp", "xx.xx.xx.xx:22", &ssh.ClientConfig{
		User:            "root",
		Auth:            []ssh.AuthMethod{ssh.Password("123")},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
	})
	if err != nil {
		log.Panic(err)
	}
	session, err := client.NewSession()
	if err != nil {
		log.Panic(err)
	}
	defer session.Close()
	go func() {
		// err := session.Run("while true;do echo test;done")
		err := session.Run("sleep 6000")
		if err != nil {
			log.Println(err)
		}
	}()
	time.Sleep(time.Second * 5)

	err = session.Close()
	if err != nil {
		log.Println(err)
	}
	log.Println("finished")
}
你期望看到什么?
2022/04/12 13:35:17 finished
2022/04/12 13:35:17 Process exited with status 141 from signal PIPE
你实际看到了什么?
2022/04/12 13:35:17 finished

如果我运行 while true;do echo test;done,会话可以正常关闭。然而,sleep 6000 在这个进程结束后仍在运行。


os/exec: consider changing Wait to stop copying goroutines rather than waiting for them

标签: Proposal, Proposal-Accepted, FrozenDueToAge, early-in-cycle

os/exec.CmdStdinStdoutStderr 字段被设置为除 nil*os.File 之外的任何值(常见情况是 *bytes.Buffer)时,我们会调用 os.Pipe 来获取管道并创建 goroutine 来复制数据进出。(*Cmd).Wait 方法首先等待子进程退出,然后等待这些 goroutine 完成数据复制。

如果子进程 C1 本身启动了一个子子进程 C2,并且如果 C1 将其任何 stdin/stdout/stderr 描述符传递给 C2,并且如果 C1 在等待 C2 退出之前就退出了,那么 C2 将持有由 os/exec 包创建的管道的打开端。(*Cmd).Wait 方法将等待直到 goroutine 完成,这意味着等待直到这些管道被关闭,实际上就是等待直到 C2 退出。这令人困惑,因为用户看到 C1 已经完成,但不理解为什么他们的程序仍在等待它。

这种困惑已经被多次报告为问题,至少包括 #7378、#18874、#20730、#21922、#22485。

它不一定非要这样工作。尽管当前的 goroutine 调用 io.Copy,但我们可以将它们改为使用循环。在每次 Read 之后,循环可以检查进程是否已被等待。然后 Wait 可以等待子进程,告诉 goroutine 停止,给它们最后一次写入的机会,然后返回。stdout/stderr goroutine 将关闭它们那一端的管道。不会有任何竞争条件,也不会有任何意外的等待。但在存在 C2 进程的情况下,我们当前收集的所有标准输出和标准错误输出将不会全部可用。

需要明确的是,程序员已经可以通过自己调用 os.Pipe 并在 Cmd 结构中使用管道来处理这些情况。现在是这样,如果我们改变它的工作方式,也仍然是这样。

我想提出的问题是:进行这种更改是否会减少人们的困惑?我们能否在不破坏 Go 1 保证的情况下进行这种更改?


os/exec: Wait waits for EOF on stdout pipe

标签: FrozenDueToAge

重现问题的步骤? 运行 http://play.golang.org/p/VwzEpAHA7M (无法在 playground 中运行,请在 mac/linux 上运行)

期望的输出是什么? 所有 3 个步骤都应该立即返回

你实际看到了什么? 第 2 步,当将 Stdout 重定向到 bytes.buffer 时,花费了 3 秒钟,这意味着它在后台进程 ("sleep 3 &") 返回后才返回

你使用的是哪个编译器 (5g, 6g, 8g, gccgo)? 6g

你使用的是哪个操作系统? Mac OS X 10.9.1 (13B42)

你使用的是哪个版本? (运行 ‘go version’) go version go1.2 darwin/amd64

请提供任何额外的信息。 在 exec.Cmd.Wait 中,它将等待所有 c.goroutine 的 errch 通道,无论进程是已完成还是在后台运行。这就是调用阻塞的地方。


是否意味着在我的代码作为客户端和远程服务器端之间,持有 stdout/reader 引用会保持 SSH 客户端端的连接处于活动状态,无论会话/客户端是否调用了 close(),直到读取器的数据被完全读取(或从任何代码执行/引用中超出作用域)?

回到顶部