Python中pty模块怎么用?

为了学习 pty.fork(),尝试写了下面的程序:
import os
import pty
import sys

(pid, fd)=os.forkpty()

if pid==0: #child
sys.stdout.write(sys.stdin.read(1024)[::-1])
else: #parent
print “parent running”
os.write(fd, “msg from parent to child”)
print os.read(fd,1024)


结果发现,最后一行 os.read 读入的居然是刚刚 write 出去的字符串
也就是子进程并没有读到主进程发来的字符串
这是什么原因呢?

改进的程序如下:
import os
import pty
import sys
import time

(pid, fd)=os.forkpty()

if pid==0: #child
sys.stderr.write(sys.stdin.read(10)[::-1])
sys.stderr.flush()
else: #parent
print “parent running”
print os.read(fd,0)
os.write(fd, “msg from parent to child\n”)
time.sleep(1)
print os.read(fd,1024)

主进程写过之后 sleep 一下再读;子进程读取较短的字节数,就可以成功了
经过多次尝试,主进程写之后、读之前的 sleep 只能算“对整体结果有改善”但不是因果关系,偶尔还是会失败。所以看起来这个是操作系统调度多进程方面的问题

但子进程如果不缩短读取长度,在我的尝试里是全部失败的,这个我就想不明白了
Python中pty模块怎么用?


3 回复

Python的pty模块主要用于创建伪终端(pseudo-terminal),这在需要模拟终端交互的场景中很有用,比如SSH服务器、终端模拟器或自动化脚本。

基本用法示例:

import os
import pty
import select

def create_pty():
    # 创建主从伪终端
    master, slave = pty.openpty()
    
    # 获取终端名称
    slave_name = os.ttyname(slave)
    print(f"Slave terminal: {slave_name}")
    
    # 写入数据到从终端
    os.write(master, b"Hello from master!\n")
    
    # 从主终端读取数据
    rlist, _, _ = select.select([master], [], [], 1)
    if rlist:
        data = os.read(master, 1024)
        print(f"Received: {data.decode()}")
    
    os.close(master)
    os.close(slave)

if __name__ == "__main__":
    create_pty()

更实用的例子 - 运行交互式命令:

import os
import pty
import subprocess
import select

def run_interactive_command():
    master, slave = pty.openpty()
    
    # 在子进程中运行bash
    pid = os.fork()
    if pid == 0:  # 子进程
        os.close(master)
        os.setsid()
        os.dup2(slave, 0)  # 标准输入
        os.dup2(slave, 1)  # 标准输出
        os.dup2(slave, 2)  # 标准错误
        os.close(slave)
        
        # 启动交互式shell
        os.execlp('bash', 'bash')
    else:  # 父进程
        os.close(slave)
        
        # 发送命令
        os.write(master, b'echo "Hello from pty!"\n')
        os.write(master, b'exit\n')
        
        # 读取输出
        while True:
            rlist, _, _ = select.select([master], [], [], 0.5)
            if not rlist:
                break
            try:
                data = os.read(master, 1024)
                if data:
                    print(data.decode(), end='')
            except OSError:
                break
        
        os.close(master)
        os.waitpid(pid, 0)

if __name__ == "__main__":
    run_interactive_command()

关键点说明:

  1. openpty()返回主从文件描述符对
  2. 主端用于读写,从端通常交给子进程
  3. 需要处理文件描述符的关闭和清理
  4. 使用select/poll处理非阻塞读写

常见用途:

  • 终端模拟器开发
  • 自动化测试终端应用
  • 远程访问服务(如SSH)
  • 需要终端特性的子进程管理

一句话建议: 用pty处理需要终端交互的场景,注意正确管理文件描述符。


排版乱了……还好只有一层缩紧,大家努力想象一下

编辑的时候右下有个 default ,选择 makedown 就行了哦~

回到顶部