Python爬虫如何实时监控900家中国企业新闻动态?

开源一个项目 https://github.com/NolanZhao/news_feed

简介: 此项目可监控近千家中国企业的官方网站的新闻动态,如有更新,系统能在 5 分钟之内通过邮件发送更新的标题和链接。 更新的信息流也可通过浏览器查看。监控的公司和站点可以添加删除。

原理: 采用 celery 任务队列,定期抓取网站 html, 使用 difflib 比对新旧页面源码,发现增加的部分,提取 url 和 text,过滤筛选,保存 MySQL 数据库。 定期把更新的 url 和 text,通过邮件发送给订阅者。

方法简单粗暴,没有摘取网页结构化数据,仅仅获取更新的链接和标题。优点:实时性可以保障

第一个版本,功能尚不完善,欢迎吐槽、贡献代码~

我的邮箱 [email protected]


Python爬虫如何实时监控900家中国企业新闻动态?

18 回复

谢谢分享


核心方案:异步并发 + 消息队列 + 分布式调度

要实时监控900家企业的新闻,靠单线程或简单循环肯定不行。得用异步IO提高抓取效率,用消息队列解耦调度和抓取,再用分布式架构横向扩展。下面是一个基于aiohttpCeleryRedis的可行方案框架:

# 1. 定义企业列表和新闻源(示例)
companies = [
    {"id": 1, "name": "公司A", "news_url": "https://example.com/news/a"},
    {"id": 2, "name": "公司B", "news_url": "https://example.com/news/b"},
    # ... 其他898家
]

# 2. 异步抓取模块(aiohttp)
import aiohttp
import asyncio
from bs4 import BeautifulSoup

async def fetch_news(session, company):
    try:
        async with session.get(company['news_url'], timeout=10) as resp:
            html = await resp.text()
            soup = BeautifulSoup(html, 'lxml')
            # 这里写具体的解析逻辑,提取新闻标题、时间、链接等
            news_item = {
                'company_id': company['id'],
                'title': soup.find('h1').text.strip(),
                'url': company['news_url']
            }
            return news_item
    except Exception as e:
        print(f"抓取{company['name']}失败: {e}")
        return None

async def monitor_all():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_news(session, company) for company in companies]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # 过滤有效结果并发送到消息队列
        valid_news = [r for r in results if r]
        return valid_news

# 3. Celery任务调度(tasks.py)
from celery import Celery
import asyncio

app = Celery('monitor', broker='redis://localhost:6379/0')

@app.task
def start_monitoring():
    # 运行异步抓取
    loop = asyncio.get_event_loop()
    news_items = loop.run_until_complete(monitor_all())
    # 将抓取结果发送到处理管道
    for item in news_items:
        process_news.delay(item)

@app.task
def process_news(news_item):
    # 这里写新闻去重、存储、分析等后续处理
    print(f"处理新闻: {news_item['title']}")
    # 存入数据库或发送通知
    # save_to_db(news_item)

# 4. 启动定时监控(Beat调度)
from celery.schedules import crontab

app.conf.beat_schedule = {
    'monitor-every-5-minutes': {
        'task': 'tasks.start_monitoring',
        'schedule': 300.0,  # 每5分钟执行一次
    },
}

关键点:

  1. 异步并发:用aiohttp同时抓取多家网站,比同步请求快几十倍。
  2. 任务队列:Celery把抓取任务拆解,支持重试和分布式部署。
  3. 去重机制:一定要在存储前根据URL或内容哈希去重,避免重复数据。
  4. 反爬处理:需要加代理池、随机User-Agent和请求间隔,这里没展开写。

一句话建议:用异步+队列把抓取和处理拆开,再加个定时调度就能跑起来了。

啊哈·

谢谢分享

直接比较源码有些暴力吧, 至少是抽取出文章来比较, 甚至是比较 simhash 比较好一点

这么多新闻,看得过来吗?

加关键词过滤,就可以看你关注的。

怎么操作,链接失效

你想多了,没人说爬出来的结果只能肉眼看而不能做进一步分析

太厉害了,1024 个赞。希望以后能参与进来维护代码。

太厉害了,1024 个赞。希望以后能参与进来维护代码。

正好有需要学习这个 比心

感谢!

既然是开源我也就不好 BB 啥了,我只想说虽然是第一版既然开源了能不能搞的开箱即用。。。。。。各种错误,各种补丁给你打上才跑起来…

我在 ubuntu 部署很顺利,除了 Python3 环境有些麻烦之外,亲测没有留坑。。。

我擦,我 SB 了,用的 python2.7

厉害了,顶一个

怎么抓到的都是 N 久前的文章:?

回到顶部