Python中如何操作Redis进行玩法收藏和云服务器管理
又在腾云阁发现一篇 Python 的爬虫文章,顺便存了。
收录待用,修改转载已取得腾讯云授权
节选:
...
下面这段代码用来下载美图网上 100 个页面中的所有图片:
import requests
import re
import time
from redis import Redis
# Browser-like User-Agent header; passed to requests.get() by download().
headers={ 'User-Agent':'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36' }
def push_redis_list():
    """Scrape image URLs from 100 gallery pages and queue them in Redis.

    Requests pages 5100..5199 under ``http://www.meizitu.com/a/``, extracts
    every ``mm.howkuai.com`` ``.jpg`` URL from the HTML, and left-pushes each
    URL that does not contain ``limg`` onto the Redis list ``meizitu``.
    A page that cannot be fetched is reported and skipped.

    Returns:
        int: always 0, after all pages have been attempted.
    """
    r = Redis(host='10.66.149.8', port=6379, password='')
    # Non-greedy match with escaped dots: several URLs on the same line of
    # HTML are captured individually instead of being merged into one match.
    img_pattern = re.compile(
        r'http://mm\.howkuai\.com/wp-content/uploads/201.*?\.jpg'
    )
    for i in range(100):
        num = 5100 + i
        url = 'http://www.meizitu.com/a/' + str(num) + '.html'
        try:
            # Send the same browser-like headers that download() uses.
            page = requests.get(url, headers=headers, timeout=30)
        except requests.RequestException as e:
            # One unreachable page must not abort the remaining pages.
            print("failed to fetch", url, ":", e)
            continue
        img_url_list = img_pattern.findall(page.text)
        print(img_url_list)
        for temp_img_url in img_url_list:
            # URLs containing "limg" are skipped (presumably thumbnails).
            if 'limg' not in temp_img_url:
                print("url: ", temp_img_url)
                r.lpush('meizitu', temp_img_url)
        print(r.llen('meizitu'))
    return 0
def get_big_img_url():
    """Pop queued image URLs from Redis and download them one at a time.

    Repeatedly left-pops a URL from the Redis list ``meizitu``, passes it to
    ``download`` and then sleeps one second. When an error occurs the loop
    waits ten seconds and retries. The loop ends once the list is empty.

    Returns:
        int: always 0, after the queue has been drained.
    """
    r = Redis(host='10.66.149.8', port=6379, password='')
    while True:
        try:
            url = r.lpop('meizitu')
            if url is None:
                # lpop returns None on an empty list: nothing left to fetch.
                break
            download(url)
            time.sleep(1)
            print(url)
        except Exception:
            # Catch Exception rather than everything, so that Ctrl-C
            # (KeyboardInterrupt) can still stop the worker.
            print("请求求发送失败重试")
            time.sleep(10)
            continue
    return 0
def download(url):
    """Download one image and save it under ``./pic/``.

    The file is named after the current Unix time in whole seconds, with a
    ``.jpg`` suffix. Any error (network failure, HTTP error status, file
    system error) is printed and swallowed so a caller's loop keeps running.

    Args:
        url: Address of the image to fetch.
    """
    import os

    try:
        r = requests.get(url, headers=headers, timeout=50)
        # Do not save an HTML error page as if it were an image.
        r.raise_for_status()
        # The target directory may not exist on a first run.
        os.makedirs('./pic', exist_ok=True)
        name = int(time.time())
        # The context manager closes the file even if the write fails.
        with open('./pic/' + str(name) + '.jpg', 'wb') as f:
            f.write(r.content)
    except Exception as e:
        print(Exception, ":", e)
if __name__ == '__main__':
    # NOTE(review): `url` is not used by either step below.
    url = 'http://www.meizitu.com/a/list_1_'
    print("begin")
    push_redis_list()  # enable to fill the task queue
    # get_big_img_url()  # enable to run the download worker
...
原文链接: https://www.qcloud.com/community/article/337567001488804157
Python中如何操作Redis进行玩法收藏和云服务器管理
我理解你想用 Python 操作 Redis 来实现两个功能:玩法收藏和云服务器管理。下面分别给出代码示例。
1. 玩法收藏功能
import redis
import json
from datetime import datetime
class GameCollectionManager:
    """Per-user game favourites stored in Redis sorted sets.

    Each user has one sorted set at ``user:<user_id>:collections``. Every
    member is a JSON document with ``game_id``, ``game_info`` and
    ``collected_at``; the score is the collection time as a Unix timestamp.
    """

    # Lifetime of a user's collection key; refreshed on every add.
    COLLECTION_TTL_SECONDS = 30 * 24 * 3600

    def __init__(self, host='localhost', port=6379, db=0):
        """Connect to Redis, decoding responses to ``str``."""
        self.redis_client = redis.Redis(host=host, port=port, db=db, decode_responses=True)

    @staticmethod
    def _collection_key(user_id):
        """Return the sorted-set key holding *user_id*'s favourites."""
        return f"user:{user_id}:collections"

    def _members_for_game(self, collection_key, game_id):
        """Return every stored member whose JSON ``game_id`` equals *game_id*."""
        return [
            member
            for member in self.redis_client.zrange(collection_key, 0, -1)
            if json.loads(member)['game_id'] == game_id
        ]

    def add_collection(self, user_id, game_id, game_info):
        """Add a game to the user's favourites, replacing any earlier entry.

        Args:
            user_id: Owner of the collection.
            game_id: Identifier of the game being collected.
            game_info: JSON-serialisable details stored with the entry.

        Returns:
            bool: always True.
        """
        collection_key = self._collection_key(user_id)
        # The member embeds the timestamp, so re-adding a game would create a
        # second, different member; drop earlier entries for this game first.
        stale_members = self._members_for_game(collection_key, game_id)
        if stale_members:
            self.redis_client.zrem(collection_key, *stale_members)
        # Take the time once so the payload and the score agree.
        now = datetime.now()
        game_data = {
            'game_id': game_id,
            'game_info': game_info,
            'collected_at': now.isoformat()
        }
        self.redis_client.zadd(collection_key, {json.dumps(game_data): now.timestamp()})
        # Refresh the expiry of the whole collection (30 days).
        self.redis_client.expire(collection_key, self.COLLECTION_TTL_SECONDS)
        return True

    def get_collections(self, user_id, start=0, end=-1):
        """Return the user's favourites, most recently collected first.

        Args:
            user_id: Owner of the collection.
            start: First rank to return (0-based, newest first).
            end: Last rank to return, inclusive; -1 means "to the end".

        Returns:
            list: decoded entries, each with ``collected_at`` rebuilt from
            the sorted-set score.
        """
        collection_key = self._collection_key(user_id)
        entries = self.redis_client.zrevrange(collection_key, start, end, withscores=True)
        result = []
        for member, score in entries:
            game_data = json.loads(member)
            game_data['collected_at'] = datetime.fromtimestamp(score).isoformat()
            result.append(game_data)
        return result

    def remove_collection(self, user_id, game_id):
        """Remove a game from the user's favourites.

        All entries for *game_id* are removed, including duplicates.

        Returns:
            bool: True if at least one entry was removed, otherwise False.
        """
        collection_key = self._collection_key(user_id)
        members = self._members_for_game(collection_key, game_id)
        if not members:
            return False
        self.redis_client.zrem(collection_key, *members)
        return True

    def is_collected(self, user_id, game_id):
        """Return True if *game_id* is in the user's favourites."""
        collection_key = self._collection_key(user_id)
        return bool(self._members_for_game(collection_key, game_id))
# 使用示例
if __name__ == "__main__":
    # Demo run against the default local Redis instance.
    manager = GameCollectionManager()
    demo_user = "user123"
    demo_games = [
        ("game_001", {"name": "王者荣耀", "type": "MOBA"}),
        ("game_002", {"name": "原神", "type": "RPG"}),
    ]

    # Collect both demo games for the demo user.
    for demo_game_id, demo_game_info in demo_games:
        manager.add_collection(demo_user, demo_game_id, demo_game_info)

    # Show the stored favourites.
    collections = manager.get_collections(demo_user)
    print("收藏列表:", collections)

    # Check membership, then remove the first game again.
    print("是否收藏game_001:", manager.is_collected(demo_user, "game_001"))
    manager.remove_collection(demo_user, "game_001")
2. 云服务器管理功能
import redis
import json
from typing import Dict, Optional
class CloudServerManager:
    """Cloud server registry kept in Redis.

    Server records live in one hash (``cloud_servers``), keyed by server id,
    each value a JSON document whose ``tags`` field is itself a JSON-encoded
    list. A set per status (``servers:status:<status>``) indexes the server
    ids currently in that status.
    """

    def __init__(self, host='localhost', port=6379, db=1):
        """Connect to Redis, decoding responses to ``str``."""
        self.redis_client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
        self.server_hash_key = "cloud_servers"

    @staticmethod
    def _status_key(status: str) -> str:
        """Return the key of the set indexing servers in *status*."""
        return f"servers:status:{status}"

    @staticmethod
    def _encode(server_data: Dict) -> str:
        """Serialise a record whose ``tags`` is a list into the stored form."""
        return json.dumps({**server_data, 'tags': json.dumps(server_data['tags'])})

    @staticmethod
    def _decode(raw: str) -> Dict:
        """Parse a stored record and turn ``tags`` back into a list."""
        data = json.loads(raw)
        data['tags'] = json.loads(data['tags'])
        return data

    def add_server(self, server_id: str, server_info: Dict) -> bool:
        """Add a server, or overwrite the record of an existing one.

        Args:
            server_id: Identifier used as the hash field.
            server_info: May contain ``status`` (default ``'stopped'``),
                ``ip_address``, ``instance_type``, ``launch_time`` (each
                default ``''``) and ``tags`` (default ``[]``).

        Returns:
            bool: always True.
        """
        status = server_info.get('status', 'stopped')
        # When overwriting a server whose status changed, drop it from the
        # old status set so it is not listed under two statuses.
        previous = self.get_server(server_id)
        if previous and previous['status'] != status:
            self.redis_client.srem(self._status_key(previous['status']), server_id)
        server_data = {
            'server_id': server_id,
            'status': status,
            'ip_address': server_info.get('ip_address', ''),
            'instance_type': server_info.get('instance_type', ''),
            'launch_time': server_info.get('launch_time', ''),
            'tags': server_info.get('tags', [])
        }
        self.redis_client.hset(self.server_hash_key, server_id, self._encode(server_data))
        self.redis_client.sadd(self._status_key(status), server_id)
        return True

    def get_server(self, server_id: str) -> Optional[Dict]:
        """Return the record for *server_id*, or None if it is not stored."""
        raw = self.redis_client.hget(self.server_hash_key, server_id)
        if raw:
            return self._decode(raw)
        return None

    def get_servers_by_status(self, status: str) -> list:
        """Return the records of all servers indexed under *status*.

        Ids present in the status set but missing from the hash are skipped.
        """
        server_ids = self.redis_client.smembers(self._status_key(status))
        servers = []
        for server_id in server_ids:
            server_data = self.get_server(server_id)
            if server_data:
                servers.append(server_data)
        return servers

    def update_server_status(self, server_id: str, new_status: str) -> bool:
        """Move a server to *new_status*.

        Returns:
            bool: False if the server is unknown, otherwise True.
        """
        server_data = self.get_server(server_id)
        if not server_data:
            return False
        # Re-index: leave the old status set, join the new one.
        self.redis_client.srem(self._status_key(server_data['status']), server_id)
        self.redis_client.sadd(self._status_key(new_status), server_id)
        # Persist the new status in the record itself.
        server_data['status'] = new_status
        self.redis_client.hset(self.server_hash_key, server_id, self._encode(server_data))
        return True

    def delete_server(self, server_id: str) -> bool:
        """Delete a server record and its status index entry.

        Returns:
            bool: True if the server existed, otherwise False.
        """
        server_data = self.get_server(server_id)
        if not server_data:
            return False
        self.redis_client.srem(self._status_key(server_data['status']), server_id)
        self.redis_client.hdel(self.server_hash_key, server_id)
        return True

    def get_all_servers(self) -> list:
        """Return the records of every stored server."""
        all_servers = self.redis_client.hgetall(self.server_hash_key)
        return [self._decode(raw) for raw in all_servers.values()]
# 使用示例
if __name__ == "__main__":
    # Demo run against the default local Redis instance.
    manager = CloudServerManager()

    # Register two demo servers, in this order.
    demo_servers = [
        ("server_001", {
            "status": "running",
            "ip_address": "192.168.1.100",
            "instance_type": "t2.micro",
            "launch_time": "2024-01-01 10:00:00",
            "tags": ["web", "production"],
        }),
        ("server_002", {
            "status": "stopped",
            "ip_address": "192.168.1.101",
            "instance_type": "t2.small",
            "launch_time": "2024-01-02 14:30:00",
            "tags": ["database", "staging"],
        }),
    ]
    for demo_server_id, demo_server_spec in demo_servers:
        manager.add_server(demo_server_id, demo_server_spec)

    # List the servers that are currently running.
    running_servers = manager.get_servers_by_status("running")
    print("运行中的服务器:", running_servers)

    # Start the second server.
    manager.update_server_status("server_002", "running")

    # Look up one server, then dump all of them.
    server_info = manager.get_server("server_001")
    print("服务器001信息:", server_info)
    all_servers = manager.get_all_servers()
    print("所有服务器:", all_servers)
总结一下:
- 玩法收藏用有序集合实现,方便按时间排序
- 云服务器管理用Hash+Set组合,Hash存详细信息,Set做状态分类
- 记得安装redis-py:
pip install redis
这两个类可以直接用,根据实际需求调整数据结构就行。

