Python爬虫:如何实现爬取所有IP数据并存入MongoDB,且支持中断恢复执行

数据采集自 ip-api.com

在 celery 队列中获取数据并存入 mongodb 数据库

支持程序中断后从上次停下的地方继续采集

支持 python3+

Github 地址: https://github.com/xiaojieluo/ip-database

下面截图是我在本地电脑爬的,开了 4 个 worker,celery 很容易横向拓展,如果多加几台服务器,做成分布式爬虫,速度应该还是可以的

截图

安装

$ pip install -r requirements.txt

使用

1. 配置 mongodb 连接信息

ip-database 会将采集的数据存储到 mongodb 中,所以需要在 db.py 中配置 mongodb 的连接信息

2. 启动 redis-server 和 celery

celery 使用 redis 存储任务队列,需要启动 redis-server


$ redis-server

$ celery -A task worker --loglevel=info

3. 启动主程序


$ python ip.py


Python爬虫:如何实现爬取所有IP数据并存入MongoDB,且支持中断恢复执行

1 回复

帖子回复:

要实现带中断恢复的IP数据爬虫并存入MongoDB,核心是记录爬取状态。这里用 requests 爬取示例IP API,用 pymongo 操作数据库,通过记录最后一个成功爬取的IP索引来实现断点续传。

import requests
import pymongo
from pymongo import MongoClient
import time

class ResumeableIPCrawler:
    def __init__(self, db_url='mongodb://localhost:27017/', db_name='ip_db', collection_name='ip_data'):
        self.client = MongoClient(db_url)
        self.db = self.client[db_name]
        self.collection = self.db[collection_name]
        # 创建状态记录集合
        self.state_collection = self.db['crawl_state']
        # 确保有状态记录文档
        if self.state_collection.count_documents({}) == 0:
            self.state_collection.insert_one({'last_index': 0})
    
    def fetch_ip_data(self, ip_list):
        """爬取IP数据,支持断点续传"""
        # 获取上次中断的位置
        state = self.state_collection.find_one()
        start_index = state['last_index']
        
        for i, ip in enumerate(ip_list[start_index:], start=start_index):
            try:
                # 模拟API请求(这里用httpbin作为示例)
                url = f'http://httpbin.org/ip'
                response = requests.get(url, timeout=10)
                data = response.json()
                
                # 添加IP信息
                data['target_ip'] = ip
                data['crawl_time'] = time.time()
                
                # 存入MongoDB
                self.collection.insert_one(data)
                print(f"成功爬取 IP {ip}")
                
                # 更新状态记录
                self.state_collection.update_one(
                    {}, 
                    {'$set': {'last_index': i + 1}}
                )
                
                time.sleep(1)  # 礼貌爬取间隔
                
            except Exception as e:
                print(f"爬取 IP {ip} 时出错: {e}")
                # 出错时保持当前状态,下次从这继续
                break
    
    def get_ip_list(self):
        """这里返回要爬取的IP列表,实际可以从文件或数据库读取"""
        # 示例:生成100个示例IP
        return [f'192.168.1.{i}' for i in range(1, 101)]

# 使用示例
if __name__ == '__main__':
    crawler = ResumeableIPCrawler()
    ip_list = crawler.get_ip_list()
    crawler.fetch_ip_data(ip_list)

关键点说明:

  1. 状态记录:用单独的 crawl_state 集合记录最后处理的索引,中断后能从这里继续
  2. 错误处理:单个IP爬取出错时中断循环,保持状态不变
  3. 数据存储:每个IP的数据作为独立文档存入MongoDB,包含原始数据和爬取时间戳
  4. 续传逻辑:每次成功爬取后立即更新状态,确保中断时最多丢失一个IP的数据

一句话总结:用状态集合记录爬取进度,每次成功存储后更新位置,下次从记录点继续。

回到顶部