Python中如何用socket实现FTP并处理用户登录状态?
当用户正确登录以后,怎么保存这个状态,达到本次所有 ftp 操作不需要验证身份,客户端正常或意外退出后及时注销。如果再次登录则还需要验证身份的效果。小白求解。
Python中如何用socket实现FTP并处理用户登录状态?
6 回复
每次连接实例化一个 client 类?这不就有状态了
import socket
import threading
import os
import hashlib
import json
class FTPServer:
def __init__(self, host='0.0.0.0', port=2121):
self.host = host
self.port = port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.active_users = {} # {socket: {'username': 'xxx', 'logged_in': True}}
self.user_credentials = self.load_users()
def load_users(self):
"""加载用户凭证(实际项目应该用数据库)"""
try:
with open('users.json', 'r') as f:
return json.load(f)
except FileNotFoundError:
# 默认用户
return {
'admin': hashlib.sha256('admin123'.encode()).hexdigest(),
'user': hashlib.sha256('password'.encode()).hexdigest()
}
def save_users(self):
"""保存用户数据"""
with open('users.json', 'w') as f:
json.dump(self.user_credentials, f)
def authenticate(self, username, password):
"""验证用户登录"""
if username in self.user_credentials:
hashed_pw = hashlib.sha256(password.encode()).hexdigest()
return self.user_credentials[username] == hashed_pw
return False
def handle_client(self, client_socket, address):
"""处理客户端连接"""
print(f"[+] 新连接: {address}")
self.active_users[client_socket] = {'logged_in': False, 'username': None}
try:
client_socket.send(b"220 FTP Server Ready\r\n")
while True:
data = client_socket.recv(1024).decode().strip()
if not data:
break
print(f"[{address}] 命令: {data}")
if data.startswith("USER"):
self.handle_user(client_socket, data)
elif data.startswith("PASS"):
self.handle_pass(client_socket, data)
elif data.startswith("LIST"):
self.handle_list(client_socket)
elif data.startswith("RETR"):
self.handle_retr(client_socket, data)
elif data.startswith("STOR"):
self.handle_stor(client_socket, data)
elif data.startswith("QUIT"):
client_socket.send(b"221 Goodbye\r\n")
break
else:
client_socket.send(b"500 Unknown command\r\n")
except Exception as e:
print(f"[-] 连接错误 {address}: {e}")
finally:
if client_socket in self.active_users:
del self.active_users[client_socket]
client_socket.close()
print(f"[-] 连接关闭: {address}")
def handle_user(self, client_socket, data):
"""处理USER命令"""
try:
username = data.split()[1]
self.active_users[client_socket]['username'] = username
client_socket.send(b"331 User name okay, need password\r\n")
except:
client_socket.send(b"501 Syntax error in parameters\r\n")
def handle_pass(self, client_socket, data):
"""处理PASS命令"""
if not self.active_users[client_socket]['username']:
client_socket.send(b"503 Bad sequence of commands\r\n")
return
try:
password = data.split()[1]
username = self.active_users[client_socket]['username']
if self.authenticate(username, password):
self.active_users[client_socket]['logged_in'] = True
client_socket.send(b"230 User logged in\r\n")
else:
client_socket.send(b"530 Login incorrect\r\n")
except:
client_socket.send(b"501 Syntax error\r\n")
def handle_list(self, client_socket):
"""处理LIST命令"""
if not self.check_login(client_socket):
return
try:
files = os.listdir('.')
file_list = "\r\n".join(files)
client_socket.send(f"150 Opening ASCII mode data connection\r\n".encode())
client_socket.send(f"{file_list}\r\n".encode())
client_socket.send(b"226 Transfer complete\r\n")
except:
client_socket.send(b"550 Failed to list directory\r\n")
def handle_retr(self, client_socket, data):
"""处理文件下载"""
if not self.check_login(client_socket):
return
try:
filename = data.split()[1]
if os.path.exists(filename):
client_socket.send(f"150 Opening data connection for {filename}\r\n".encode())
with open(filename, 'rb') as f:
while chunk := f.read(1024):
client_socket.send(chunk)
client_socket.send(b"226 Transfer complete\r\n")
else:
client_socket.send(b"550 File not found\r\n")
except:
client_socket.send(b"550 Failed to download file\r\n")
def handle_stor(self, client_socket, data):
"""处理文件上传"""
if not self.check_login(client_socket):
return
try:
filename = data.split()[1]
client_socket.send(b"150 Ready to receive file\r\n")
with open(filename, 'wb') as f:
while True:
chunk = client_socket.recv(1024)
if not chunk or b"226" in chunk:
break
f.write(chunk)
client_socket.send(b"226 Transfer complete\r\n")
except:
client_socket.send(b"550 Failed to upload file\r\n")
def check_login(self, client_socket):
"""检查用户是否登录"""
if not self.active_users[client_socket]['logged_in']:
client_socket.send(b"530 Not logged in\r\n")
return False
return True
def start(self):
"""启动服务器"""
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
print(f"[*] FTP服务器监听 {self.host}:{self.port}")
while True:
client_socket, address = self.server_socket.accept()
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address),
daemon=True
)
client_thread.start()
class FTPClient:
"""简单的FTP客户端示例"""
def __init__(self, host='127.0.0.1', port=2121):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
print(self.sock.recv(1024).decode())
def send_command(self, command):
self.sock.send(f"{command}\r\n".encode())
return self.sock.recv(4096).decode()
def login(self, username, password):
print(self.send_command(f"USER {username}"))
print(self.send_command(f"PASS {password}"))
# 使用示例
if __name__ == "__main__":
# 启动服务器
server = FTPServer()
# 在另一个线程启动服务器
import threading
server_thread = threading.Thread(target=server.start, daemon=True)
server_thread.start()
# 等待服务器启动
import time
time.sleep(1)
# 测试客户端
client = FTPClient()
client.login("admin", "admin123")
print(client.send_command("LIST"))
print(client.send_command("QUIT"))
这个实现包含了FTP服务器的核心功能:用户认证、登录状态管理、文件列表、上传下载。关键点:
- 用户状态管理:用
active_users字典跟踪每个连接的登录状态 - 认证流程:标准的USER/PASS命令序列,密码用SHA256哈希存储
- 线程安全:每个客户端连接在独立线程中处理
- 基本FTP命令:实现了LIST、RETR、STOR、QUIT等基本命令
用户数据保存在users.json文件中,实际项目应该用数据库。登录状态通过检查active_users[client_socket]['logged_in']来验证,确保未登录用户不能执行文件操作。
总结:用字典管理连接状态,每个命令前检查登录状态。
ftp 不像 http ,是有状态的。你需要维护一个套接字=>状态的映射,记下连接的状态,包括是否已经验证过身份
好吧上面说的服务端的情况。不过客户端你仍然需要记下状态
不考虑其他身份验证的话,只能存密码,或者保证连接不断
谢谢,我试试,貌似得抓各种退出的异常。

