Python 多线程爬取微博数据,写入 MongoDB 后数量对不上怎么办?

迫于 sf 没人看,所以到这里来,具体

截图

链接: https://segmentfault.com/q/1010000018385188

主要是想间隔性的爬取防止被 Ban,例如这个人的微博量很多,例如有 300 多页,我就想着多进程一次请求 9 个页面,然后赞停几秒,然后继续一次请求 9 个页面,循环直到请求完是我思路哪里不对么? 在线等,挺急的

if __name__ == '__main__':
	p = Pool(9)
	x = 1
	y = 10
	while y <= 30:
		for i  in range(x,y):
			p.apply_async(getPage, args = (i,))
		print('9 page done!')
		time.sleep(random.randint(3,5)+random.random())
		x += 10
		y += 10
	p.close()
	p.join()
	print('Done!')

Python 多线程爬取微博数据,写入 MongoDB 后数量对不上怎么办?

7 回复

微博的数据好像有保护。你检查一下 25 页之后是不是正常的访问。


我遇到过类似问题,多线程爬虫写入MongoDB时数据丢失通常有几个原因:

  1. 线程竞争导致重复覆盖:多个线程同时写入相同_id的文档
  2. 连接池配置不当:MongoDB连接在多个线程间共享出现问题
  3. 异常处理不完整:线程异常导致数据没写入但计数了

这是修复后的核心代码:

import threading
from queue import Queue
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class WeiboCrawler:
    def __init__(self, mongo_uri='mongodb://localhost:27017/', db_name='weibo'):
        # 每个线程创建独立连接,避免线程安全问题
        self.mongo_uri = mongo_uri
        self.db_name = db_name
        self.data_queue = Queue()
        self.lock = threading.Lock()
        self.counter = 0
        
    def worker(self, thread_id):
        # 每个worker有自己的MongoDB连接
        client = MongoClient(self.mongo_uri)
        db = client[self.db_name]
        collection = db['weibo_posts']
        
        # 确保有唯一索引
        collection.create_index('weibo_id', unique=True)
        
        while True:
            try:
                data = self.data_queue.get(timeout=5)
                if data is None:
                    break
                    
                # 使用upsert避免重复
                result = collection.update_one(
                    {'weibo_id': data['weibo_id']},
                    {'$set': data},
                    upsert=True
                )
                
                with self.lock:
                    if result.upserted_id:
                        self.counter += 1
                        logger.info(f"Thread-{thread_id} 插入新数据,总数: {self.counter}")
                    else:
                        logger.debug(f"Thread-{thread_id} 更新已有数据")
                        
            except DuplicateKeyError:
                logger.warning(f"Thread-{thread_id} 检测到重复数据: {data.get('weibo_id')}")
            except Exception as e:
                logger.error(f"Thread-{thread_id} 写入失败: {str(e)}")
            finally:
                self.data_queue.task_done()
                
        client.close()
    
    def start_crawling(self, urls, num_threads=5):
        # 先清空队列
        while not self.data_queue.empty():
            self.data_queue.get()
            
        # 添加任务到队列
        for url in urls:
            self.data_queue.put(self.fetch_weibo_data(url))
            
        # 启动线程
        threads = []
        for i in range(num_threads):
            t = threading.Thread(target=self.worker, args=(i,))
            t.daemon = True
            t.start()
            threads.append(t)
            
        # 等待所有任务完成
        self.data_queue.join()
        
        # 发送停止信号
        for _ in range(num_threads):
            self.data_queue.put(None)
            
        for t in threads:
            t.join()
            
        logger.info(f"爬取完成,实际写入数量: {self.counter}")
        
    def fetch_weibo_data(self, url):
        # 这里实现实际的微博数据抓取逻辑
        # 返回包含weibo_id的字典
        pass

# 使用示例
if __name__ == '__main__':
    crawler = WeiboCrawler()
    urls = ['http://weibo.com/...']  # 你的微博URL列表
    crawler.start_crawling(urls, num_threads=5)

关键点:

  • 每个线程用独立的MongoDB连接
  • update_one + upsert=True替代直接插入
  • weibo_id字段加唯一索引
  • 用线程锁保护计数器
  • 完整捕获DuplicateKeyError异常

检查你的代码是不是少了这些处理。另外,可以在MongoDB里跑个查询验证实际数量:db.weibo_posts.countDocuments({})

建议:用upsert替代insert,给关键字段加唯一索引。

第 10 页,20 页的数据请求不到的吧,如果我没有理解错你的代码

如果你不确定你爬取的接口有没有做什么特殊的处理,建议先别使用多线程(协程)进行爬取,测试一次爬取的数据对不对得上

好的,谢谢,我试试
这个应该没有吧,首先是 1-10,( 1-9 ), 然后 10-20,( 10-19 ),20-30 (20-29), 应该是没有落下

第 10 和 20 没取到吧,range1 到 10 不包含 10,后面加 10 又从 11 开始了

是的,应该加 9 的,我弄错了

回到顶部