Python中如何处理因Elasticsearch挂掉导致Redis阻塞的日志问题?
之前因为 es 挂了,导致 redis 阻塞,应用写入一直在等待,直到机器上所有进程都被阻塞。
期望是让 log 的功能更完善,更灵活,如果当某种通道出现故障的时候,不可影响应用程序
1 如何判断出通道故障了?比如 10 秒内连续出现错误
2 自动关闭该通道,或切换至其他通道
3 尝试自动恢复该通道
4 报警手段 + 恢复后提醒手段
有没有什么其他的思路,请大家指教,小弟对这个项目还不是很熟,可能描述的不是很清楚。希望大家指点指点。
Python中如何处理因Elasticsearch挂掉导致Redis阻塞的日志问题?
3 回复
我理解你的问题。当ES挂掉导致Redis写入阻塞,日志堆积在内存里,典型的异步日志生产者-消费者模型出了问题。核心思路是解耦和降级。
下面是一个完整的解决方案,用内存队列做缓冲,加上降级写入本地文件:
import logging
import threading
import queue
import time
import json
from logging.handlers import QueueHandler, QueueListener
import redis
from elasticsearch import Elasticsearch, exceptions
import os
from datetime import datetime
class FallbackFileHandler(logging.Handler):
"""降级处理器:当ES失败时写入本地文件"""
def __init__(self, fallback_path='./logs_fallback'):
super().__init__()
self.fallback_path = fallback_path
os.makedirs(fallback_path, exist_ok=True)
def emit(self, record):
try:
log_entry = self.format(record)
filename = f"{self.fallback_path}/fallback_{datetime.now().strftime('%Y%m%d')}.log"
with open(filename, 'a', encoding='utf-8') as f:
f.write(log_entry + '\n')
except Exception:
pass # 降级也失败就静默丢弃
class ElasticsearchHandler(logging.Handler):
"""ES日志处理器,带健康检查和失败重试"""
def __init__(self, es_hosts, index_prefix='logs',
fallback_handler=None, max_retries=3):
super().__init__()
self.es_hosts = es_hosts
self.index_prefix = index_prefix
self.fallback = fallback_handler
self.max_retries = max_retries
self._es = None
self._connect_es()
def _connect_es(self):
"""连接ES,失败返回None"""
try:
self._es = Elasticsearch(self.es_hosts, timeout=10)
if self._es.ping():
return True
except Exception:
self._es = None
return False
def emit(self, record):
"""发送日志到ES,失败时降级"""
if not self._es and not self._connect_es():
# ES不可用,直接走降级
if self.fallback:
self.fallback.emit(record)
return
log_data = {
'timestamp': datetime.utcnow().isoformat(),
'level': record.levelname,
'message': self.format(record),
'logger': record.name,
'module': record.module,
'funcName': record.funcName
}
index_name = f"{self.index_prefix}-{datetime.now().strftime('%Y.%m.%d')}"
# 带重试的发送
for retry in range(self.max_retries):
try:
self._es.index(index=index_name, body=log_data)
return # 成功发送
except exceptions.ConnectionError:
self._es = None # 标记连接失效
if retry == self.max_retries - 1:
break # 重试用完
time.sleep(1 * (retry + 1)) # 指数退避
except Exception:
break # 其他错误直接跳出
# 所有重试失败,走降级
if self.fallback:
self.fallback.emit(record)
def setup_logging():
"""配置日志系统"""
# 创建内存队列
log_queue = queue.Queue(maxsize=10000) # 控制队列大小防止内存溢出
# 创建处理器
fallback = FallbackFileHandler()
es_handler = ElasticsearchHandler(
es_hosts=['localhost:9200'],
fallback_handler=fallback
)
es_handler.setLevel(logging.INFO)
# 控制台处理器(可选)
console = logging.StreamHandler()
console.setLevel(logging.WARNING)
# 创建QueueListener
listener = QueueListener(
log_queue,
es_handler,
console # 同时输出到控制台
)
listener.start()
# 配置根logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(QueueHandler(log_queue))
return listener
# 使用示例
if __name__ == "__main__":
# 初始化日志系统
listener = setup_logging()
logger = logging.getLogger(__name__)
try:
# 正常业务代码
for i in range(100):
logger.info(f"Processing item {i}")
time.sleep(0.1)
# 模拟ES挂掉
if i == 50:
print("模拟ES故障...")
# 这里ES会失败,日志会自动降级到文件
finally:
# 程序退出时停止监听器
listener.stop()
关键点:
- 用
QueueHandler和QueueListener实现生产消费解耦,日志先入内存队列 - ES处理器自带连接检查和重试机制
- 失败时降级到本地文件,避免阻塞
- 队列设置大小限制,防止内存爆炸
这样即使ES挂一小时,日志也不会丢,都在本地文件里,等ES恢复了可以写个脚本把文件日志重新导入。
简单说就是:队列缓冲 + 失败降级本地文件。
1 的话,kafka 加消息从产生到消费之间延迟的检测? 2,4,查找微服务的服务发现,负载均衡相关的东西。3,守护进程通知重启吧,挂的原因是未知的,可能很难自动化。重启解决大部分问题,然后记录下重启前的事故现场,分析以便以后作规避吧。
我的经验也不多,看看楼下怎么说。
发帖的节点似乎不是很合适

