Python中如何实现远程执行命令的工具并优化代码?

闲着没事写了个轻量的远程命令执行的小工具,没什么头绪了,求前辈们给点意见或者方向,地址: https://github.com/tonnie17/flowlight ,轻喷...


Python中如何实现远程执行命令的工具并优化代码?
8 回复

问题在哪?


import paramiko
import concurrent.futures
import logging
from typing import List, Dict, Optional

class RemoteCommandExecutor:
    def __init__(self, host: str, username: str, password: str = None, key_file: str = None):
        self.host = host
        self.username = username
        self.password = password
        self.key_file = key_file
        self.client = None
        
    def __enter__(self):
        self.connect()
        return self
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()
        
    def connect(self):
        """建立SSH连接"""
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        
        if self.key_file:
            self.client.connect(self.host, username=self.username, key_filename=self.key_file)
        else:
            self.client.connect(self.host, username=self.username, password=self.password)
            
    def disconnect(self):
        """关闭SSH连接"""
        if self.client:
            self.client.close()
            
    def execute(self, command: str, timeout: int = 30) -> Dict:
        """执行单个命令"""
        try:
            stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
            return {
                'success': True,
                'output': stdout.read().decode('utf-8').strip(),
                'error': stderr.read().decode('utf-8').strip(),
                'exit_code': stdout.channel.recv_exit_status()
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'output': '',
                'exit_code': -1
            }

class BatchRemoteExecutor:
    def __init__(self, max_workers: int = 10):
        self.max_workers = max_workers
        
    def execute_on_hosts(self, hosts_config: List[Dict], command: str) -> Dict[str, Dict]:
        """批量在多台主机上执行命令"""
        results = {}
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_host = {
                executor.submit(self._execute_single, config, command): config['host']
                for config in hosts_config
            }
            
            for future in concurrent.futures.as_completed(future_to_host):
                host = future_to_host[future]
                try:
                    results[host] = future.result()
                except Exception as e:
                    results[host] = {
                        'success': False,
                        'error': str(e)
                    }
                    
        return results
        
    def _execute_single(self, config: Dict, command: str) -> Dict:
        """单台主机执行命令"""
        with RemoteCommandExecutor(
            host=config['host'],
            username=config['username'],
            password=config.get('password'),
            key_file=config.get('key_file')
        ) as executor:
            return executor.execute(command)

# 使用示例
if __name__ == "__main__":
    # 配置多台主机
    hosts = [
        {'host': '192.168.1.100', 'username': 'user1', 'password': 'pass1'},
        {'host': '192.168.1.101', 'username': 'user2', 'key_file': '/path/to/key.pem'}
    ]
    
    # 创建批量执行器
    batch_executor = BatchRemoteExecutor(max_workers=5)
    
    # 在所有主机上执行命令
    results = batch_executor.execute_on_hosts(
        hosts_config=hosts,
        command="uptime && df -h"
    )
    
    # 输出结果
    for host, result in results.items():
        print(f"Host: {host}")
        print(f"Success: {result['success']}")
        print(f"Output: {result.get('output', '')}")
        print(f"Error: {result.get('error', '')}")
        print("-" * 50)

这个实现用了几个关键优化:

  1. 连接管理:用上下文管理器自动处理SSH连接的打开和关闭,避免资源泄漏
  2. 错误处理:每个命令执行都有完整的成功/失败状态和错误信息
  3. 批量执行:用线程池并发执行,通过max_workers控制并发数
  4. 结果结构:统一的结果字典格式,包含输出、错误码和状态
  5. 认证灵活:支持密码和密钥文件两种认证方式

核心是BatchRemoteExecutor类,它管理多台主机的并发命令执行。RemoteCommandExecutor处理单台主机的SSH连接和命令执行。用ThreadPoolExecutor实现并发,通过max_workers参数控制最大并发连接数,避免同时打开太多SSH连接。

参数化所有配置,命令执行有超时机制,返回结构化的结果方便后续处理。代码里用了类型提示,让参数和返回值更清晰。

用的时候先配好主机列表,创建执行器实例,调用execute_on_hosts方法就行。结果以主机名为key的字典返回,包含每台主机的执行详情。

简单说就是:用paramiko做SSH,线程池做并发,上下文管理器管资源。

我想做成一个方便调试集群的东西,怎么提高在实际开发中的实用性?

看看 ansible ?

Fabric 就行了

多看看轮子 再造轮子

感谢各位建议, ansible 和 Fabric 都用过,有很多功能都用不上,其实目的不在于造轮子,而是方便在自己工作上使用,想着封装 paramiko 定制一个更易用的东西

saltstack 值得看下

如果还想 hacking , 可以看看 ZeroMQ 的网络编程相关

btw, 没有头绪,最好先定几个”需求场景”,再去 start coding

回到顶部