Python中如何存储和获取随机100个网页的状态?

需求:获取排名前 100 个 url 的 HTTP 状态码存储,前台要显示排名前 100 个 url 非 200 状态码的页面有哪些。

目前想法:

  1. 获取排名前 100 的 url
  2. 轮询 100 个 url 状态码
  3. 删除之前的数据库数据
  4. 保存获取到的状态码

目前数据库设计:

数据库

遇到的难题:

  1. 在第 3 步删除之前数据的时候,需要一定的时间,前台用户查询的时候会显示没有数据,体验不好。
  2. 因为是获取排名前 100 的 url,所以每次查询的 url 可能不一样,无法在之前的数据上更新

请教: 有没有更优雅的存储方式? 后续可能会扩展到 1000 个、10000 个 url,遇到类似情况该如何解决呢?

先提前感谢各位大佬的回复了,感谢!


Python中如何存储和获取随机100个网页的状态?

23 回复

写进去的时候,加个时间戳,每次页面展示的时候,都请求最近的时间戳的。


import requests
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import json

def check_url_status(url):
    """检查单个URL的状态"""
    try:
        response = requests.get(url, timeout=5)
        return {
            'url': url,
            'status_code': response.status_code,
            'success': response.status_code == 200,
            'timestamp': time.time()
        }
    except requests.RequestException as e:
        return {
            'url': url,
            'status_code': None,
            'success': False,
            'error': str(e),
            'timestamp': time.time()
        }

def generate_random_urls(count=100):
    """生成随机URL列表"""
    domains = [
        'https://www.google.com',
        'https://www.github.com',
        'https://www.stackoverflow.com',
        'https://www.python.org',
        'https://www.wikipedia.org',
        'https://www.reddit.com',
        'https://www.amazon.com',
        'https://www.microsoft.com'
    ]
    
    urls = []
    for _ in range(count):
        domain = random.choice(domains)
        # 添加随机路径增加URL多样性
        path = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz', k=random.randint(3, 10)))
        urls.append(f"{domain}/{path}")
    
    return urls

def store_results(results, filename='url_status.json'):
    """存储结果到JSON文件"""
    with open(filename, 'w') as f:
        json.dump(results, f, indent=2)

def load_results(filename='url_status.json'):
    """从JSON文件加载结果"""
    try:
        with open(filename, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return []

def main():
    # 1. 生成100个随机URL
    urls = generate_random_urls(100)
    print(f"已生成 {len(urls)} 个随机URL")
    
    # 2. 并发检查URL状态
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_url = {executor.submit(check_url_status, url): url for url in urls}
        
        for future in as_completed(future_to_url):
            result = future.result()
            results.append(result)
            print(f"Checked: {result['url']} - Status: {result.get('status_code', 'Error')}")
    
    # 3. 存储结果
    store_results(results)
    print(f"\n结果已保存到 url_status.json")
    
    # 4. 统计信息
    successful = sum(1 for r in results if r['success'])
    print(f"成功访问: {successful}/{len(results)}")
    
    # 5. 示例:如何获取特定状态的结果
    print("\n=== 状态码200的URL ===")
    for result in results:
        if result['success']:
            print(f"✓ {result['url']}")

if __name__ == "__main__":
    main()

这个方案做了几件事:

  1. 生成随机URL:从预设域名列表随机组合生成100个URL
  2. 并发检查:使用线程池同时检查多个URL,提高效率
  3. 结果存储:将结果保存为JSON格式,包含URL、状态码、时间戳等信息
  4. 错误处理:处理网络请求异常,确保程序健壮性
  5. 数据统计:提供基本的成功/失败统计

关键点说明:

  • 使用requests库进行HTTP请求
  • ThreadPoolExecutor实现并发检查
  • JSON格式便于后续分析和处理
  • 包含时间戳便于追踪检查时间

使用方式:

# 运行检查
python check_urls.py

# 加载之前的结果
results = load_results()
for item in results:
    print(f"{item['url']}: {item['status_code']}")

扩展建议: 可以改用数据库存储以便长期追踪。

直接 id 排序是否可行

#1 感谢提供解决方案,目前是可以解决这个问题,定期删除之前旧的记录就可以。

数据库这样设计:



#2 id 排序是指什么样呢?求更详细


还会有更优雅的解决方案嘛?

可以先将 100 条数据插入库中,然后设置初始状态,等待查询更新,添加更新时间戳字段,等待回写状态;若用户来查时,发现是初始状态,则直接逻辑中去查询一次这个 url,查询其状态,并将状态写入到数据库;

#4 因为是获取排名前 100 的 url,所以每次查询的 url 可能不一样,无法在之前的数据上更新

比如我上次获取的前 100 个 url 是 a1 ~ a100,最后一次获取的是 a1~a50、b1~b50,前台只要显示最后一次获取的数据,如果要覆盖的话,还需要检查更新 a51~a100 的数据……

用 loading cache ?设定好阈值后,读写过程中会自动 evict 旧数据

直接把非 200 的都用数组放到 Redis 里。

加上一个时间戳就可以解决
1.要是检测非 200 的
select url from your_table where status_code != 200 order by last_modified DESC limit 0, 100

2. 用你的程序获取状态码

3.直接 update 时间戳和状态码就完事了,根本不需要去删除老的数据

主键自增 ID,直接存进去取的时候直接 select * from xxx order by id desc limit 100 不就行了?时间戳都不要。

#6 感谢提供解决方案,我搜一下 python 相关的操作方式

#7 感谢提供方案,因为只需要展示最后获取的 100 个 url 的 200 状态,如果方式 redis 的话也还是要清空之前的记录,清空期间担心用户访问的时候会显示数据为空。

#8 #9 #10 感谢提供方案,需要获取最后一次获取到 100 个 url 的 200 状态,所以不能用 limit 0,100,因为可能最后查询到的 100 个 url 只有 20 个 url 是 200 状态,总数也不止 100 个,每次获取到的都是随机 100 个,所以 update 感觉也不太可行…


#11 感谢提供思路,请问有更详细的解决方案嘛?

用时间排序有什么不行的

Java 里有 guava cache (以及很多同类工具)正好可以限制 cache 的大小的同时,又能监听 cache evict 事件(用来回调把抛弃的数据持久化到数据库),Python 肯定有类似的轮子。

随手搜了一下,有一个叫 beaker 的模块,应该能满足需要

关键信息没说:
是否要求某次排名更新后,所有 url 的 status 都查询一遍,前端才刷?

#15 感谢提供解决方案,我去搜搜相关模块的文档

#16 每 5 分钟获取一次排前面 100 的 url,然后获取这 100 个 url 的状态,存储,前端由用户手动刷新即可

你还是没回答啊。排名出来了,状态查了 50 个,50 个没查呢,现在前端要数据,给什么?

#18 所有 url 状态查完统一存储发出,设置的 5 分钟轮询就是想着 5 分钟应该足够 100 个 url 的状态查询了;没有查询完毕之前,继续返回上次的查询结果

#19 好的 我查一下资料,非常感谢,不过后续可能会适配到 1000、10000 …

放心吧, 1000 及 10000 還是很小

我有一個類似的, 3500 多 URL 佔用 10MB 不到 RAM

es kibana 啥代码也不要写

#22 我搜索一下相关技术,感谢提供解决方案

#21 哇 非常感谢,发现 ZADD/ZRANK 太好用了,可以实现我大部分的需求,可以自动更新已存在的数据,还可以直接排序好实用,也可以直接分页查询太好了!

不过有一点问题就是无法设置过期时间,查了一圈也没找到相关的解决方案,继续求教一下

第一次轮训查到三个状态码为非 200 的,按照排名顺序插入,排名也是必须的,所以 score 无法设置时间戳…
> ZADD urls 1 xxx.com/001
> ZADD urls 3 xxx.com/002
> ZADD urls 25 xxx.com/003

前台可以展示出排名第几的 url 是非 200 状态,第二次轮训的时候只查到 1 个状态码为非 200 状态
> ZADD urls 23 xxx.com/001

问题来了,如何清空之前存储的 002、003 两个数据呢?或者如何设置自动清理超过 10 分钟没更新的数据呢?

redis 的 zrank 就可以了
多使用缓存,这类问题不需要使用数据库
如果需要的话也是异步的,非高频操作

回到顶部