Python中如何操作Redis进行玩法收藏和云服务器管理

又在腾云阁发现一篇 Python 的爬虫文章,顺便存了。

收录待用,修改转载已取得腾讯云授权


节选:

...

这是用来下载美图网上 100 个页面的所有的图片

import requests
import re
import time
from redis import Redis
headers={ 'User-Agent':'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36' }

def push_redis_list(): r = Redis(host=‘10.66.149.8’,port=6379,password=’’) for i in range(100): num = 5100+i; url =‘http://www.meizitu.com/a/’+ str(num) +’.html’ img_url = requests.get(url,timeout=30) #print img_url.text #time.sleep(10) img_url_list = re.findall(‘http://mm.howkuai.com/wp-content/uploads/201.*.jpg’,img_url.text) print(img_url_list) for temp_img_url in img_url_list: l = len(re.findall(‘limg’,temp_img_url)) #print l if(l == 0): print("url: ",temp_img_url) r.lpush(‘meizitu’,temp_img_url) print(r.llen(‘meizitu’)) return 0

def get_big_img_url(): r = Redis(host=‘10.66.149.8’,port=6379,password=’’) while(1): try: url = r.lpop(‘meizitu’) download(url) time.sleep(1) print(url) except: print(“请求求发送失败重试”) time.sleep(10) continue return 0

def download(url): try: r = requests.get(url,headers=headers,timeout = 50) name = int(time.time()) f = open(’./pic/’+str(name)+’.jpg’,‘wb’) f.write(r.content) f.close() except Exception as e: print(Exception,":",e)

if name == ‘main’: url = ‘http://www.meizitu.com/a/list_1_’ print(“begin”) push_redis_list()#开启则加任务队列 #get_big_img_url()#开启则运行爬取任务

...


原文链接: https://www.qcloud.com/community/article/337567001488804157


Python中如何操作Redis进行玩法收藏和云服务器管理

1 回复

我理解你想用Python的Redis来实现两个功能:玩法收藏和云服务器管理。下面我分别给出代码示例。

1. 玩法收藏功能

import redis
import json
from datetime import datetime

class GameCollectionManager:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    
    def add_collection(self, user_id, game_id, game_info):
        """添加玩法收藏"""
        collection_key = f"user:{user_id}:collections"
        
        # 存储游戏信息
        game_data = {
            'game_id': game_id,
            'game_info': game_info,
            'collected_at': datetime.now().isoformat()
        }
        
        # 使用有序集合,按收藏时间排序
        self.redis_client.zadd(collection_key, {json.dumps(game_data): datetime.now().timestamp()})
        
        # 设置过期时间(可选,比如30天)
        self.redis_client.expire(collection_key, 30*24*3600)
        
        return True
    
    def get_collections(self, user_id, start=0, end=-1):
        """获取用户的收藏列表"""
        collection_key = f"user:{user_id}:collections"
        collections = self.redis_client.zrevrange(collection_key, start, end, withscores=True)
        
        result = []
        for item, score in collections:
            game_data = json.loads(item)
            game_data['collected_at'] = datetime.fromtimestamp(score).isoformat()
            result.append(game_data)
        
        return result
    
    def remove_collection(self, user_id, game_id):
        """移除收藏"""
        collection_key = f"user:{user_id}:collections"
        collections = self.redis_client.zrange(collection_key, 0, -1)
        
        for item in collections:
            game_data = json.loads(item)
            if game_data['game_id'] == game_id:
                self.redis_client.zrem(collection_key, item)
                return True
        
        return False
    
    def is_collected(self, user_id, game_id):
        """检查是否已收藏"""
        collection_key = f"user:{user_id}:collections"
        collections = self.redis_client.zrange(collection_key, 0, -1)
        
        for item in collections:
            game_data = json.loads(item)
            if game_data['game_id'] == game_id:
                return True
        
        return False

# 使用示例
if __name__ == "__main__":
    manager = GameCollectionManager()
    
    # 添加收藏
    manager.add_collection("user123", "game_001", {"name": "王者荣耀", "type": "MOBA"})
    manager.add_collection("user123", "game_002", {"name": "原神", "type": "RPG"})
    
    # 获取收藏列表
    collections = manager.get_collections("user123")
    print("收藏列表:", collections)
    
    # 检查是否收藏
    print("是否收藏game_001:", manager.is_collected("user123", "game_001"))
    
    # 移除收藏
    manager.remove_collection("user123", "game_001")

2. 云服务器管理功能

import redis
import json
from typing import Dict, Optional

class CloudServerManager:
    def __init__(self, host='localhost', port=6379, db=1):
        self.redis_client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
        self.server_hash_key = "cloud_servers"
    
    def add_server(self, server_id: str, server_info: Dict):
        """添加或更新云服务器信息"""
        server_data = {
            'server_id': server_id,
            'status': server_info.get('status', 'stopped'),
            'ip_address': server_info.get('ip_address', ''),
            'instance_type': server_info.get('instance_type', ''),
            'launch_time': server_info.get('launch_time', ''),
            'tags': json.dumps(server_info.get('tags', []))
        }
        
        # 使用Hash存储服务器信息
        self.redis_client.hset(self.server_hash_key, server_id, json.dumps(server_data))
        
        # 使用Set按状态分类
        status = server_info.get('status', 'stopped')
        status_key = f"servers:status:{status}"
        self.redis_client.sadd(status_key, server_id)
        
        return True
    
    def get_server(self, server_id: str) -> Optional[Dict]:
        """获取单个服务器信息"""
        server_data = self.redis_client.hget(self.server_hash_key, server_id)
        if server_data:
            data = json.loads(server_data)
            data['tags'] = json.loads(data['tags'])
            return data
        return None
    
    def get_servers_by_status(self, status: str):
        """按状态获取服务器列表"""
        status_key = f"servers:status:{status}"
        server_ids = self.redis_client.smembers(status_key)
        
        servers = []
        for server_id in server_ids:
            server_data = self.get_server(server_id)
            if server_data:
                servers.append(server_data)
        
        return servers
    
    def update_server_status(self, server_id: str, new_status: str):
        """更新服务器状态"""
        server_data = self.get_server(server_id)
        if not server_data:
            return False
        
        # 从原状态集合中移除
        old_status = server_data['status']
        old_status_key = f"servers:status:{old_status}"
        self.redis_client.srem(old_status_key, server_id)
        
        # 更新到新状态集合
        new_status_key = f"servers:status:{new_status}"
        self.redis_client.sadd(new_status_key, server_id)
        
        # 更新Hash中的状态
        server_data['status'] = new_status
        self.redis_client.hset(self.server_hash_key, server_id, json.dumps({
            **server_data,
            'tags': json.dumps(server_data['tags'])
        }))
        
        return True
    
    def delete_server(self, server_id: str):
        """删除服务器"""
        server_data = self.get_server(server_id)
        if server_data:
            # 从状态集合中移除
            status_key = f"servers:status:{server_data['status']}"
            self.redis_client.srem(status_key, server_id)
            
            # 从Hash中移除
            self.redis_client.hdel(self.server_hash_key, server_id)
            return True
        return False
    
    def get_all_servers(self):
        """获取所有服务器"""
        all_servers = self.redis_client.hgetall(self.server_hash_key)
        
        servers = []
        for server_id, server_data in all_servers.items():
            data = json.loads(server_data)
            data['tags'] = json.loads(data['tags'])
            servers.append(data)
        
        return servers

# 使用示例
if __name__ == "__main__":
    manager = CloudServerManager()
    
    # 添加服务器
    manager.add_server("server_001", {
        "status": "running",
        "ip_address": "192.168.1.100",
        "instance_type": "t2.micro",
        "launch_time": "2024-01-01 10:00:00",
        "tags": ["web", "production"]
    })
    
    manager.add_server("server_002", {
        "status": "stopped",
        "ip_address": "192.168.1.101",
        "instance_type": "t2.small",
        "launch_time": "2024-01-02 14:30:00",
        "tags": ["database", "staging"]
    })
    
    # 获取运行中的服务器
    running_servers = manager.get_servers_by_status("running")
    print("运行中的服务器:", running_servers)
    
    # 更新状态
    manager.update_server_status("server_002", "running")
    
    # 获取单个服务器信息
    server_info = manager.get_server("server_001")
    print("服务器001信息:", server_info)
    
    # 获取所有服务器
    all_servers = manager.get_all_servers()
    print("所有服务器:", all_servers)

总结一下:

  • 玩法收藏用有序集合实现,方便按时间排序
  • 云服务器管理用Hash+Set组合,Hash存详细信息,Set做状态分类
  • 记得安装redis-py:pip install redis

这两个类可以直接用,根据实际需求调整数据结构就行。

回到顶部