Python中如何处理因Elasticsearch挂掉导致Redis阻塞的日志问题?

之前因为 es 挂了,导致 redis 阻塞,应用写入一直在等待,直到机器上所有进程都被阻塞。
期望是让 log 的功能更完善,更灵活,如果当某种通道出现故障的时候,不可影响应用程序
1 如何判断出通道故障了?比如 10 秒内连续出现错误
2 自动关闭该通道,或切换至其他通道
3 尝试自动恢复该通道
4 报警手段 + 恢复后提醒手段

有没有什么其他的思路,请大家指教,小弟对这个项目还不是很熟,可能描述的不是很清楚。希望大家指点指点。
Python中如何处理因Elasticsearch挂掉导致Redis阻塞的日志问题?

3 回复

我理解你的问题。当ES挂掉导致Redis写入阻塞,日志堆积在内存里,典型的异步日志生产者-消费者模型出了问题。核心思路是解耦和降级。

下面是一个完整的解决方案,用内存队列做缓冲,加上降级写入本地文件:

import logging
import threading
import queue
import time
import json
from logging.handlers import QueueHandler, QueueListener
import redis
from elasticsearch import Elasticsearch, exceptions
import os
from datetime import datetime

class FallbackFileHandler(logging.Handler):
    """降级处理器:当ES失败时写入本地文件"""
    def __init__(self, fallback_path='./logs_fallback'):
        super().__init__()
        self.fallback_path = fallback_path
        os.makedirs(fallback_path, exist_ok=True)
        
    def emit(self, record):
        try:
            log_entry = self.format(record)
            filename = f"{self.fallback_path}/fallback_{datetime.now().strftime('%Y%m%d')}.log"
            with open(filename, 'a', encoding='utf-8') as f:
                f.write(log_entry + '\n')
        except Exception:
            pass  # 降级也失败就静默丢弃

class ElasticsearchHandler(logging.Handler):
    """ES日志处理器,带健康检查和失败重试"""
    def __init__(self, es_hosts, index_prefix='logs', 
                 fallback_handler=None, max_retries=3):
        super().__init__()
        self.es_hosts = es_hosts
        self.index_prefix = index_prefix
        self.fallback = fallback_handler
        self.max_retries = max_retries
        self._es = None
        self._connect_es()
        
    def _connect_es(self):
        """连接ES,失败返回None"""
        try:
            self._es = Elasticsearch(self.es_hosts, timeout=10)
            if self._es.ping():
                return True
        except Exception:
            self._es = None
        return False
        
    def emit(self, record):
        """发送日志到ES,失败时降级"""
        if not self._es and not self._connect_es():
            # ES不可用,直接走降级
            if self.fallback:
                self.fallback.emit(record)
            return
            
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'message': self.format(record),
            'logger': record.name,
            'module': record.module,
            'funcName': record.funcName
        }
        
        index_name = f"{self.index_prefix}-{datetime.now().strftime('%Y.%m.%d')}"
        
        # 带重试的发送
        for retry in range(self.max_retries):
            try:
                self._es.index(index=index_name, body=log_data)
                return  # 成功发送
            except exceptions.ConnectionError:
                self._es = None  # 标记连接失效
                if retry == self.max_retries - 1:
                    break  # 重试用完
                time.sleep(1 * (retry + 1))  # 指数退避
            except Exception:
                break  # 其他错误直接跳出
        
        # 所有重试失败,走降级
        if self.fallback:
            self.fallback.emit(record)

def setup_logging():
    """配置日志系统"""
    # 创建内存队列
    log_queue = queue.Queue(maxsize=10000)  # 控制队列大小防止内存溢出
    
    # 创建处理器
    fallback = FallbackFileHandler()
    es_handler = ElasticsearchHandler(
        es_hosts=['localhost:9200'],
        fallback_handler=fallback
    )
    es_handler.setLevel(logging.INFO)
    
    # 控制台处理器(可选)
    console = logging.StreamHandler()
    console.setLevel(logging.WARNING)
    
    # 创建QueueListener
    listener = QueueListener(
        log_queue,
        es_handler,
        console  # 同时输出到控制台
    )
    listener.start()
    
    # 配置根logger
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(QueueHandler(log_queue))
    
    return listener

# 使用示例
if __name__ == "__main__":
    # 初始化日志系统
    listener = setup_logging()
    
    logger = logging.getLogger(__name__)
    
    try:
        # 正常业务代码
        for i in range(100):
            logger.info(f"Processing item {i}")
            time.sleep(0.1)
            
            # 模拟ES挂掉
            if i == 50:
                print("模拟ES故障...")
                # 这里ES会失败,日志会自动降级到文件
                
    finally:
        # 程序退出时停止监听器
        listener.stop()

关键点:

  1. QueueHandlerQueueListener实现生产消费解耦,日志先入内存队列
  2. ES处理器自带连接检查和重试机制
  3. 失败时降级到本地文件,避免阻塞
  4. 队列设置大小限制,防止内存爆炸

这样即使ES挂一小时,日志也不会丢,都在本地文件里,等ES恢复了可以写个脚本把文件日志重新导入。

简单说就是:队列缓冲 + 失败降级本地文件。


1 的话,kafka 加消息从产生到消费之间延迟的检测? 2,4,查找微服务的服务发现,负载均衡相关的东西。3,守护进程通知重启吧,挂的原因是未知的,可能很难自动化。重启解决大部分问题,然后记录下重启前的事故现场,分析以便以后作规避吧。
我的经验也不多,看看楼下怎么说。

发帖的节点似乎不是很合适

回到顶部