Python中如何实现远程执行命令的工具并优化代码?
闲着没事写了个轻量的远程命令执行的小工具,没什么头绪了,求前辈们给点意见或者方向,地址: https://github.com/tonnie17/flowlight ,轻喷...
Python中如何实现远程执行命令的工具并优化代码?
8 回复
问题在哪?
import paramiko
import concurrent.futures
import logging
from typing import List, Dict, Optional
class RemoteCommandExecutor:
def __init__(self, host: str, username: str, password: str = None, key_file: str = None):
self.host = host
self.username = username
self.password = password
self.key_file = key_file
self.client = None
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
def connect(self):
"""建立SSH连接"""
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.key_file:
self.client.connect(self.host, username=self.username, key_filename=self.key_file)
else:
self.client.connect(self.host, username=self.username, password=self.password)
def disconnect(self):
"""关闭SSH连接"""
if self.client:
self.client.close()
def execute(self, command: str, timeout: int = 30) -> Dict:
"""执行单个命令"""
try:
stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
return {
'success': True,
'output': stdout.read().decode('utf-8').strip(),
'error': stderr.read().decode('utf-8').strip(),
'exit_code': stdout.channel.recv_exit_status()
}
except Exception as e:
return {
'success': False,
'error': str(e),
'output': '',
'exit_code': -1
}
class BatchRemoteExecutor:
def __init__(self, max_workers: int = 10):
self.max_workers = max_workers
def execute_on_hosts(self, hosts_config: List[Dict], command: str) -> Dict[str, Dict]:
"""批量在多台主机上执行命令"""
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_host = {
executor.submit(self._execute_single, config, command): config['host']
for config in hosts_config
}
for future in concurrent.futures.as_completed(future_to_host):
host = future_to_host[future]
try:
results[host] = future.result()
except Exception as e:
results[host] = {
'success': False,
'error': str(e)
}
return results
def _execute_single(self, config: Dict, command: str) -> Dict:
"""单台主机执行命令"""
with RemoteCommandExecutor(
host=config['host'],
username=config['username'],
password=config.get('password'),
key_file=config.get('key_file')
) as executor:
return executor.execute(command)
# 使用示例
if __name__ == "__main__":
# 配置多台主机
hosts = [
{'host': '192.168.1.100', 'username': 'user1', 'password': 'pass1'},
{'host': '192.168.1.101', 'username': 'user2', 'key_file': '/path/to/key.pem'}
]
# 创建批量执行器
batch_executor = BatchRemoteExecutor(max_workers=5)
# 在所有主机上执行命令
results = batch_executor.execute_on_hosts(
hosts_config=hosts,
command="uptime && df -h"
)
# 输出结果
for host, result in results.items():
print(f"Host: {host}")
print(f"Success: {result['success']}")
print(f"Output: {result.get('output', '')}")
print(f"Error: {result.get('error', '')}")
print("-" * 50)
这个实现用了几个关键优化:
- 连接管理:用上下文管理器自动处理SSH连接的打开和关闭,避免资源泄漏
- 错误处理:每个命令执行都有完整的成功/失败状态和错误信息
- 批量执行:用线程池并发执行,通过
max_workers控制并发数 - 结果结构:统一的结果字典格式,包含输出、错误码和状态
- 认证灵活:支持密码和密钥文件两种认证方式
核心是BatchRemoteExecutor类,它管理多台主机的并发命令执行。RemoteCommandExecutor处理单台主机的SSH连接和命令执行。用ThreadPoolExecutor实现并发,通过max_workers参数控制最大并发连接数,避免同时打开太多SSH连接。
参数化所有配置,命令执行有超时机制,返回结构化的结果方便后续处理。代码里用了类型提示,让参数和返回值更清晰。
用的时候先配好主机列表,创建执行器实例,调用execute_on_hosts方法就行。结果以主机名为key的字典返回,包含每台主机的执行详情。
简单说就是:用paramiko做SSH,线程池做并发,上下文管理器管资源。
我想做成一个方便调试集群的东西,怎么提高在实际开发中的实用性?
看看 ansible ?
Fabric 就行了
多看看轮子 再造轮子
感谢各位建议, ansible 和 Fabric 都用过,有很多功能都用不上,其实目的不在于造轮子,而是方便在自己工作上使用,想着封装 paramiko 定制一个更易用的东西
saltstack 值得看下
如果还想 hacking , 可以看看 ZeroMQ 的网络编程相关
btw, 没有头绪,最好先定几个”需求场景”,再去 start coding

