Python中如何用socket实现FTP并处理用户登录状态?

当用户正确登录以后,怎么保存这个状态,达到本次所有 ftp 操作不需要验证身份,客户端正常或意外退出后及时注销。如果再次登录则还需要验证身份的效果。小白求解。
Python中如何用socket实现FTP并处理用户登录状态?

6 回复

每次连接实例化一个 client 类?这不就有状态了


import socket
import threading
import os
import hashlib
import json

class FTPServer:
    def __init__(self, host='0.0.0.0', port=2121):
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.active_users = {}  # {socket: {'username': 'xxx', 'logged_in': True}}
        self.user_credentials = self.load_users()
        
    def load_users(self):
        """加载用户凭证(实际项目应该用数据库)"""
        try:
            with open('users.json', 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            # 默认用户
            return {
                'admin': hashlib.sha256('admin123'.encode()).hexdigest(),
                'user': hashlib.sha256('password'.encode()).hexdigest()
            }
    
    def save_users(self):
        """保存用户数据"""
        with open('users.json', 'w') as f:
            json.dump(self.user_credentials, f)
    
    def authenticate(self, username, password):
        """验证用户登录"""
        if username in self.user_credentials:
            hashed_pw = hashlib.sha256(password.encode()).hexdigest()
            return self.user_credentials[username] == hashed_pw
        return False
    
    def handle_client(self, client_socket, address):
        """处理客户端连接"""
        print(f"[+] 新连接: {address}")
        self.active_users[client_socket] = {'logged_in': False, 'username': None}
        
        try:
            client_socket.send(b"220 FTP Server Ready\r\n")
            
            while True:
                data = client_socket.recv(1024).decode().strip()
                if not data:
                    break
                    
                print(f"[{address}] 命令: {data}")
                
                if data.startswith("USER"):
                    self.handle_user(client_socket, data)
                elif data.startswith("PASS"):
                    self.handle_pass(client_socket, data)
                elif data.startswith("LIST"):
                    self.handle_list(client_socket)
                elif data.startswith("RETR"):
                    self.handle_retr(client_socket, data)
                elif data.startswith("STOR"):
                    self.handle_stor(client_socket, data)
                elif data.startswith("QUIT"):
                    client_socket.send(b"221 Goodbye\r\n")
                    break
                else:
                    client_socket.send(b"500 Unknown command\r\n")
                    
        except Exception as e:
            print(f"[-] 连接错误 {address}: {e}")
        finally:
            if client_socket in self.active_users:
                del self.active_users[client_socket]
            client_socket.close()
            print(f"[-] 连接关闭: {address}")
    
    def handle_user(self, client_socket, data):
        """处理USER命令"""
        try:
            username = data.split()[1]
            self.active_users[client_socket]['username'] = username
            client_socket.send(b"331 User name okay, need password\r\n")
        except:
            client_socket.send(b"501 Syntax error in parameters\r\n")
    
    def handle_pass(self, client_socket, data):
        """处理PASS命令"""
        if not self.active_users[client_socket]['username']:
            client_socket.send(b"503 Bad sequence of commands\r\n")
            return
            
        try:
            password = data.split()[1]
            username = self.active_users[client_socket]['username']
            
            if self.authenticate(username, password):
                self.active_users[client_socket]['logged_in'] = True
                client_socket.send(b"230 User logged in\r\n")
            else:
                client_socket.send(b"530 Login incorrect\r\n")
        except:
            client_socket.send(b"501 Syntax error\r\n")
    
    def handle_list(self, client_socket):
        """处理LIST命令"""
        if not self.check_login(client_socket):
            return
            
        try:
            files = os.listdir('.')
            file_list = "\r\n".join(files)
            client_socket.send(f"150 Opening ASCII mode data connection\r\n".encode())
            client_socket.send(f"{file_list}\r\n".encode())
            client_socket.send(b"226 Transfer complete\r\n")
        except:
            client_socket.send(b"550 Failed to list directory\r\n")
    
    def handle_retr(self, client_socket, data):
        """处理文件下载"""
        if not self.check_login(client_socket):
            return
            
        try:
            filename = data.split()[1]
            if os.path.exists(filename):
                client_socket.send(f"150 Opening data connection for {filename}\r\n".encode())
                with open(filename, 'rb') as f:
                    while chunk := f.read(1024):
                        client_socket.send(chunk)
                client_socket.send(b"226 Transfer complete\r\n")
            else:
                client_socket.send(b"550 File not found\r\n")
        except:
            client_socket.send(b"550 Failed to download file\r\n")
    
    def handle_stor(self, client_socket, data):
        """处理文件上传"""
        if not self.check_login(client_socket):
            return
            
        try:
            filename = data.split()[1]
            client_socket.send(b"150 Ready to receive file\r\n")
            
            with open(filename, 'wb') as f:
                while True:
                    chunk = client_socket.recv(1024)
                    if not chunk or b"226" in chunk:
                        break
                    f.write(chunk)
            
            client_socket.send(b"226 Transfer complete\r\n")
        except:
            client_socket.send(b"550 Failed to upload file\r\n")
    
    def check_login(self, client_socket):
        """检查用户是否登录"""
        if not self.active_users[client_socket]['logged_in']:
            client_socket.send(b"530 Not logged in\r\n")
            return False
        return True
    
    def start(self):
        """启动服务器"""
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        print(f"[*] FTP服务器监听 {self.host}:{self.port}")
        
        while True:
            client_socket, address = self.server_socket.accept()
            client_thread = threading.Thread(
                target=self.handle_client,
                args=(client_socket, address),
                daemon=True
            )
            client_thread.start()

class FTPClient:
    """简单的FTP客户端示例"""
    def __init__(self, host='127.0.0.1', port=2121):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))
        print(self.sock.recv(1024).decode())
    
    def send_command(self, command):
        self.sock.send(f"{command}\r\n".encode())
        return self.sock.recv(4096).decode()
    
    def login(self, username, password):
        print(self.send_command(f"USER {username}"))
        print(self.send_command(f"PASS {password}"))

# 使用示例
if __name__ == "__main__":
    # 启动服务器
    server = FTPServer()
    
    # 在另一个线程启动服务器
    import threading
    server_thread = threading.Thread(target=server.start, daemon=True)
    server_thread.start()
    
    # 等待服务器启动
    import time
    time.sleep(1)
    
    # 测试客户端
    client = FTPClient()
    client.login("admin", "admin123")
    print(client.send_command("LIST"))
    print(client.send_command("QUIT"))

这个实现包含了FTP服务器的核心功能:用户认证、登录状态管理、文件列表、上传下载。关键点:

  1. 用户状态管理:用active_users字典跟踪每个连接的登录状态
  2. 认证流程:标准的USER/PASS命令序列,密码用SHA256哈希存储
  3. 线程安全:每个客户端连接在独立线程中处理
  4. 基本FTP命令:实现了LIST、RETR、STOR、QUIT等基本命令

用户数据保存在users.json文件中,实际项目应该用数据库。登录状态通过检查active_users[client_socket]['logged_in']来验证,确保未登录用户不能执行文件操作。

总结:用字典管理连接状态,每个命令前检查登录状态。

ftp 不像 http ,是有状态的。你需要维护一个套接字=>状态的映射,记下连接的状态,包括是否已经验证过身份

好吧上面说的服务端的情况。不过客户端你仍然需要记下状态

不考虑其他身份验证的话,只能存密码,或者保证连接不断

谢谢,我试试,貌似得抓各种退出的异常。

回到顶部