In your early days with Python, did you also lean on print() to debug code and trace variables?

```python
# Typical print-based debugging
print("Start processing user data...")
print(f"User ID: {user_id}")
print(f"Result: {result}")
print("Done!")
```
This approach looks convenient in small scripts, but as a project grows and moves into production, its limitations show up immediately:
- Log loss: console output vanishes when the process restarts, so critical error information cannot be traced afterwards
- Single-dimensional information: no timestamps, module names, or log levels, so locating a problem is like searching for a needle in a haystack
- Performance bottleneck: high-frequency print calls block on I/O and can noticeably hurt responsiveness
- No level-based filtering: everything is mixed together, and important errors drown in a sea of debug output
By contrast, the standard library's logging module provides a complete solution:
- Level management: five levels (DEBUG, INFO, WARNING, ERROR, CRITICAL) let you control output granularity on demand
- Flexible output: write simultaneously to the console, files, the network, email, and other targets
- Rich context: automatically record time, module name, function name, line number, process ID, and other metadata
- Production-grade features: asynchronous logging, log rotation, filtering, and more
- Structured output: emit JSON logs that plug straight into ELK, Loki, and other modern log-analysis platforms
By one account, an e-commerce project that standardized on the logging module cut average troubleshooting time from 8 hours to 15 minutes and reduced log-storage costs by 65%. Today we will start from zero and work through the module's core architecture, configuration methods, and hands-on techniques, so you can build a professional-grade logging system for your Python applications.

The logging module follows the classic producer-consumer model, with four core components forming a complete data pipeline:

Logger (producer) → Filter → Handler (StreamHandler, FileHandler, RotatingFileHandler, etc.) → Formatter
```python
import logging

# 1. Create a Logger (producer)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# 2. Create a Handler (consumer)
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)

# 3. Create a Formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# 4. Wire the components together
logger.addHandler(handler)

# 5. Produce log records
logger.debug("Debug message")               # not emitted (handler level is INFO)
logger.info("Application started")          # printed to the console
logger.error("Database connection failed")  # printed to the console
```
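Running this snippet should print something like the following (timestamps will differ):

```
2024-01-01 12:00:00,000 - __main__ - INFO - Application started
2024-01-01 12:00:00,001 - __main__ - ERROR - Database connection failed
```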
logging defines five standard levels, each mapped to an integer value (the INFO, ERROR, and CRITICAL descriptions below follow standard practice):
- DEBUG (10): detailed diagnostics; enable only in development and test environments to avoid production overhead
- INFO (20): normal operational milestones such as startup and shutdown
- WARNING (30): recoverable anomalies that need no human intervention
- ERROR (40): failures that prevented an operation from completing
- CRITICAL (50): severe errors that may force the program to stop
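A quick sketch to confirm the numeric mapping and show how a level threshold filters records (force= requires Python 3.8+):

```python
import logging

# Each named level is just an integer constant
for name in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"):
    print(name, "=", getattr(logging, name))  # DEBUG = 10 ... CRITICAL = 50

# A root logger set to WARNING drops anything below 30
logging.basicConfig(level=logging.WARNING, force=True)
logging.info("filtered out")        # 20 < 30, not emitted
logging.warning("this is emitted")  # 30 >= 30, emitted
```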
The logging module supports a Logger hierarchy based on dot-separated names, giving you configuration inheritance and per-subtree overrides:

```text
root
├── app
│   ├── app.web
│   │   ├── app.web.views
│   │   └── app.web.models
│   └── app.db
│       ├── app.db.models
│       └── app.db.utils
└── third_party
    └── third_party.requests
```
- Configuration inheritance: a child Logger falls back to its parent's effective level, Handlers, and Filters by default
- Propagation: records travel up to ancestor Loggers' handlers (disable with propagate = False), as demonstrated after the next snippet
```python
# Per-module logger configuration
import logging

# In module app.db.models
db_logger = logging.getLogger("app.db.models")
db_logger.setLevel(logging.DEBUG)   # DEBUG enabled for this module only

# In module app.web.views
web_logger = logging.getLogger("app.web.views")
web_logger.setLevel(logging.INFO)   # INFO and above for this module

# Root logger configuration
root_logger = logging.getLogger()
root_logger.setLevel(logging.WARNING)  # global default level
```
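A minimal sketch of the propagation behavior mentioned above: the child logger has no handler of its own, yet its records are emitted by the root handler, until propagation is switched off (force= requires Python 3.8+):

```python
import logging

logging.basicConfig(level=logging.DEBUG, format="%(name)s: %(message)s", force=True)

child = logging.getLogger("app.db")
child.info("reaches the root handler via propagation")  # printed

child.propagate = False
child.info("dropped: no own handler, no propagation")   # not printed
```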
logging.basicConfig() is the quickest way to get started and covers the common configuration needs:

```python
import logging

# Minimal configuration: log to the console at WARNING level
logging.basicConfig(level=logging.WARNING)

# Full configuration example
logging.basicConfig(
    # Log level
    level=logging.INFO,
    # Output format
    format='%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    # Output target
    filename='app.log',   # write to a file
    filemode='a',         # append mode
    # Encoding (avoids garbled non-ASCII text; Python 3.9+)
    encoding='utf-8',
    # Override any existing configuration (Python 3.8+)
    force=True
)

# Usage
logging.debug("This will not be emitted")
logging.info("Initialization complete")
logging.warning("Config file missing, falling back to defaults")
logging.error("Database connection error", exc_info=True)
```
- One-shot by default: after the first effective call, later calls are ignored unless force=True is passed, so configure once at the program's entry point
- Handler limits: the filename/stream shortcuts configure a single Handler; since Python 3.3 a handlers= list is supported, but complex setups are better built by hand (see the sketch after this list)
- Thread safety: the module itself is thread-safe, but basicConfig() is not guaranteed to be, so call it once from the main thread before other threads start logging
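A small sketch of the two workarounds just mentioned (force= needs Python 3.8+, handlers= needs 3.3+):

```python
import logging

# The first call wins...
logging.basicConfig(level=logging.WARNING)
# ...so a later call is ignored unless force=True tears down the old handlers
logging.basicConfig(level=logging.DEBUG, force=True)

# handlers= installs several handlers in one basicConfig call
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
    handlers=[
        logging.StreamHandler(),                           # console
        logging.FileHandler('app.log', encoding='utf-8'),  # file
    ],
    force=True,
)
logging.info("configured with two handlers")
```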
For production, centralized configuration management with logging.config.dictConfig() is recommended:

```python
import logging.config

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,

    # Formatters
    'formatters': {
        'standard': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s',
            'datefmt': '%Y-%m-%d %H:%M:%S'
        },
        'json': {
            '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
            'format': '%(asctime)s %(name)s %(levelname)s %(module)s %(funcName)s %(lineno)d %(message)s'
        }
    },

    # Handlers
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'standard',
            'stream': 'ext://sys.stdout'
        },
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'level': 'INFO',
            'formatter': 'standard',
            'filename': 'logs/app.log',
            'maxBytes': 10485760,  # 10MB
            'backupCount': 5,
            'encoding': 'utf-8'
        },
        'error_file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'level': 'ERROR',
            'formatter': 'standard',
            'filename': 'logs/error.log',
            'maxBytes': 10485760,
            'backupCount': 3,
            'encoding': 'utf-8'
        },
        'json_file': {
            'class': 'logging.handlers.TimedRotatingFileHandler',
            'level': 'INFO',
            'formatter': 'json',
            'filename': 'logs/app.json',
            'when': 'midnight',
            'backupCount': 30,
            'encoding': 'utf-8'
        }
    },

    # Loggers
    'loggers': {
        '': {  # root logger
            'handlers': ['console', 'file', 'error_file'],
            'level': 'INFO',
            'propagate': True
        },
        'app.db': {
            'handlers': ['file', 'json_file'],
            'level': 'DEBUG',
            'propagate': False
        },
        'app.web': {
            'handlers': ['console', 'json_file'],
            'level': 'INFO',
            'propagate': False
        }
    }
}

# Apply the configuration
logging.config.dictConfig(LOGGING_CONFIG)

# Get a logger and use it
logger = logging.getLogger(__name__)
logger.info("Logging system initialized")
```
- Centralized management: all logging configuration lives in a single dictionary, easy to maintain and version-control
- Complex scenarios: multiple Handlers, multiple Formatters, Logger hierarchies, and more (a file-loading sketch follows this list)
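A common companion pattern (an assumption here, not from the original) is to keep the same dictionary in a YAML file and load it with PyYAML:

```python
import logging.config
import yaml  # pip install pyyaml

# logging.yaml holds the same structure as LOGGING_CONFIG above
with open('logging.yaml', encoding='utf-8') as f:
    logging.config.dictConfig(yaml.safe_load(f))
```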
logging.config.fileConfig() loads configuration from an ini-style file, a good fit for projects with an existing configuration system:

```ini
# Example logging.conf
[loggers]
keys=root,app_db,app_web

[handlers]
keys=consoleHandler,fileHandler,errorHandler

[formatters]
keys=simpleFormatter,detailedFormatter

# Logger configuration
[logger_root]
level=INFO
handlers=consoleHandler,fileHandler

[logger_app_db]
level=DEBUG
handlers=fileHandler
qualname=app.db
propagate=0

[logger_app_web]
level=INFO
handlers=consoleHandler,errorHandler
qualname=app.web
propagate=0

# Handler configuration
[handler_consoleHandler]
class=StreamHandler
level=INFO
formatter=simpleFormatter
args=(sys.stdout,)

[handler_fileHandler]
class=handlers.RotatingFileHandler
level=DEBUG
formatter=detailedFormatter
args=('logs/app.log', 'a', 10485760, 5)

[handler_errorHandler]
class=handlers.RotatingFileHandler
level=ERROR
formatter=detailedFormatter
args=('logs/error.log', 'a', 5242880, 3)

# Formatter configuration
[formatter_simpleFormatter]
format=%(asctime)s - %(levelname)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S

[formatter_detailedFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s
datefmt=%Y-%m-%d %H:%M:%S
```
```python
# Load the configuration from Python
import logging.config

logging.config.fileConfig('logging.conf')
logger = logging.getLogger(__name__)
```
3.1 Web application logging (Flask/Django)

```python
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from flask import Flask

app = Flask(__name__)

def setup_flask_logging():
    """Configure logging for a Flask application"""
    # Remove Flask's default handlers
    app.logger.handlers.clear()

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(console_formatter)

    # File handler (size-based rotation)
    file_handler = RotatingFileHandler(
        'logs/flask_app.log',
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5,
        encoding='utf-8'
    )
    file_handler.setLevel(logging.DEBUG)
    file_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s'
    )
    file_handler.setFormatter(file_formatter)

    # Error handler (time-based rotation)
    error_handler = TimedRotatingFileHandler(
        'logs/error.log',
        when='midnight',
        backupCount=30,
        encoding='utf-8'
    )
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(file_formatter)

    # Attach the handlers to the Flask logger
    app.logger.addHandler(console_handler)
    app.logger.addHandler(file_handler)
    app.logger.addHandler(error_handler)
    app.logger.setLevel(logging.DEBUG)

    app.logger.info("Flask logging initialized")

setup_flask_logging()

@app.route('/')
def index():
    app.logger.info("Index page requested")
    return "Hello, World!"

@app.route('/api/data')
def get_data():
    try:
        # Simulated business logic
        data = {"status": "success", "data": []}
        app.logger.debug(f"API response: {data}")
        return data
    except Exception as e:
        app.logger.error(f"API error: {str(e)}", exc_info=True)
        return {"status": "error", "message": "Internal server error"}, 500
```
```python
# LOGGING configuration in settings.py
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
        'simple': {
            'format': '{levelname} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'simple'
        },
        'file': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'logs/django.log',
            'maxBytes': 1024*1024*10,  # 10MB
            'backupCount': 10,
            'formatter': 'verbose',
            'encoding': 'utf-8'
        },
        'error_file': {
            'level': 'ERROR',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'logs/error.log',
            'maxBytes': 1024*1024*5,  # 5MB
            'backupCount': 5,
            'formatter': 'verbose',
            'encoding': 'utf-8'
        },
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'include_html': True,
        }
    },
    'loggers': {
        'django': {
            'handlers': ['console', 'file'],
            'level': 'INFO',
            'propagate': True,
        },
        'django.request': {
            'handlers': ['error_file', 'mail_admins'],
            'level': 'ERROR',
            'propagate': False,
        },
        'myapp': {
            'handlers': ['console', 'file'],
            'level': 'DEBUG',
            'propagate': False,
        }
    }
}
```
In a multiprocess environment, naive use of logging can interleave or lose records. The following approaches keep multiprocess logging safe:

```python
import logging
import logging.handlers
import os
from multiprocessing import Process

def worker_process(worker_id):
    """Logging setup inside a worker process"""
    # A dedicated Logger per process
    logger = logging.getLogger(f"worker_{worker_id}")
    logger.setLevel(logging.DEBUG)

    # A log file owned by this process alone
    handler = logging.handlers.RotatingFileHandler(
        f'logs/worker_{worker_id}.log',
        maxBytes=10*1024*1024,
        backupCount=3,
        encoding='utf-8'
    )
    formatter = logging.Formatter(
        '%(asctime)s - PID:%(process)d - %(name)s - %(levelname)s - %(message)s'
    )
    handler.setFormatter(formatter)

    # Drop any inherited handlers to avoid duplicates
    logger.handlers.clear()
    logger.addHandler(handler)

    logger.info(f"Worker {worker_id} started, PID: {os.getpid()}")

    # Simulated work
    for i in range(5):
        logger.debug(f"Processing task {i}")
    logger.info("Worker finished")

def main():
    """Logging setup in the main process"""
    main_logger = logging.getLogger("main")
    main_logger.setLevel(logging.INFO)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter('%(message)s'))
    main_logger.addHandler(console_handler)

    main_logger.info("Starting multiprocess job...")

    # Spawn the workers
    processes = []
    for i in range(3):
        p = Process(target=worker_process, args=(i,))
        p.start()
        processes.append(p)

    # Wait for all workers to finish
    for p in processes:
        p.join()
    main_logger.info("All workers finished")

if __name__ == '__main__':
    main()
```
```python
import logging
import logging.handlers
import multiprocessing
import queue
from logging.handlers import SocketHandler, QueueHandler, QueueListener

# Log server address
LOG_HOST = 'localhost'
LOG_PORT = 9020

def log_server():
    """Log server process: receives and handles records from all workers"""
    import socketserver
    import struct
    import pickle

    class LogRecordStreamHandler(socketserver.StreamRequestHandler):
        def handle(self):
            while True:
                chunk = self.connection.recv(4)
                if len(chunk) < 4:
                    break
                slen = struct.unpack('>L', chunk)[0]
                chunk = self.connection.recv(slen)
                while len(chunk) < slen:
                    chunk = chunk + self.connection.recv(slen - len(chunk))
                obj = pickle.loads(chunk)
                record = logging.makeLogRecord(obj)
                logger = logging.getLogger(record.name)
                logger.handle(record)

    server = socketserver.TCPServer((LOG_HOST, LOG_PORT), LogRecordStreamHandler)
    print(f"Log server listening on {LOG_HOST}:{LOG_PORT}")
    server.serve_forever()

def worker_with_socket_logging(worker_id):
    """Worker process: ships records to the log server over a socket"""
    logger = logging.getLogger(f"worker_{worker_id}")
    logger.setLevel(logging.DEBUG)

    # A SocketHandler connected to the log server
    socket_handler = SocketHandler(LOG_HOST, LOG_PORT)
    logger.addHandler(socket_handler)

    logger.info(f"Worker {worker_id} started")
    # ... worker logic

def setup_queue_logging():
    """Asynchronous logging with QueueHandler"""
    log_queue = queue.Queue(-1)  # unbounded queue

    # Handlers that do the real work
    console_handler = logging.StreamHandler()
    file_handler = logging.handlers.RotatingFileHandler(
        'logs/app.log',
        maxBytes=10*1024*1024,
        backupCount=5
    )

    # A QueueListener drains the queue into the handlers
    listener = QueueListener(
        log_queue, console_handler, file_handler,
        respect_handler_level=True
    )
    listener.start()

    # Loggers only enqueue records via a QueueHandler
    queue_handler = QueueHandler(log_queue)
    root_logger = logging.getLogger()
    root_logger.addHandler(queue_handler)
    root_logger.setLevel(logging.DEBUG)

    return listener

if __name__ == '__main__':
    # Start the log server process
    server_process = multiprocessing.Process(target=log_server)
    server_process.daemon = True
    server_process.start()

    # Configure and start the workers
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker_with_socket_logging, args=(i,))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
```
Modern logging is about observability, and the ELK stack (Elasticsearch + Logstash + Kibana) is the industry standard. The logging module integrates cleanly via JSON-formatted logs:

```python
# Install python-json-logger first:
# pip install python-json-logger
import logging
import logging.handlers
from pythonjsonlogger import jsonlogger

class StructuredJsonFormatter(jsonlogger.JsonFormatter):
    """Custom JSON formatter"""
    def __init__(self, *args, **kwargs):
        # Field order and date format for the JSON output;
        # extra= fields passed to log calls are merged in automatically
        super().__init__(
            fmt='%(asctime)s %(name)s %(levelname)s %(module)s %(funcName)s %(lineno)d %(message)s',
            datefmt='%Y-%m-%dT%H:%M:%S%z',  # ISO 8601
            *args, **kwargs
        )

    def add_fields(self, log_record, record, message_dict):
        """Attach custom fields"""
        super().add_fields(log_record, record, message_dict)
        # Application-specific fields
        log_record['app_name'] = 'my_python_app'
        log_record['environment'] = 'production'  # could come from an env var
        log_record['service_version'] = '1.0.0'
        # Request context, if present
        if hasattr(record, 'request_id'):
            log_record['request_id'] = record.request_id
        if hasattr(record, 'user_id'):
            log_record['user_id'] = record.user_id
        # Normalized timestamp
        log_record['timestamp'] = self.formatTime(record)

def setup_elk_logging():
    """Configure an ELK-friendly logging system"""
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # 1. Console output (development)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    console_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(console_formatter)

    # 2. JSON file output (for ELK ingestion)
    json_handler = logging.handlers.RotatingFileHandler(
        'logs/app.json',
        maxBytes=10*1024*1024,
        backupCount=10,
        encoding='utf-8'
    )
    json_handler.setLevel(logging.INFO)
    json_formatter = StructuredJsonFormatter()
    json_handler.setFormatter(json_formatter)

    # 3. Errors in a separate JSON file
    error_handler = logging.handlers.RotatingFileHandler(
        'logs/error.json',
        maxBytes=5*1024*1024,
        backupCount=5,
        encoding='utf-8'
    )
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(json_formatter)

    logger.addHandler(console_handler)
    logger.addHandler(json_handler)
    logger.addHandler(error_handler)

    return logger

# Usage
logger = setup_elk_logging()

# Structured log records
logger.info("User login succeeded", extra={
    'user_id': 12345,
    'username': 'john_doe',
    'login_method': 'password',
    'ip_address': '192.168.1.100'
})

try:
    # Simulated business logic (complex_operation is a placeholder)
    result = complex_operation()
    logger.info("Operation finished", extra={
        'operation_id': 'op_001',
        'processing_time_ms': 150,
        'result_status': 'success'
    })
except Exception as e:
    logger.error("Operation failed", extra={
        'operation_id': 'op_001',
        'error_type': type(e).__name__,
        'error_message': str(e)
    }, exc_info=True)
```
Step 2: Logstash configuration (logstash.conf)

```conf
input {
  file {
    path => "/path/to/logs/app.json"
    codec => json { charset => "UTF-8" }
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  # Parse the timestamp
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
  # Add an index field
  mutate {
    add_field => { "[@metadata][index]" => "python-app-%{+YYYY.MM.dd}" }
  }
  # Drop the original timestamp field (optional)
  mutate {
    remove_field => [ "timestamp" ]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index]}"
  }
  # Also print to the console in development
  stdout { codec => rubydebug }
}
```
Synchronous logging can become a bottleneck under high concurrency. QueueHandler provides an asynchronous solution:

```python
import logging
import logging.handlers
import queue
import time

class AsyncLoggingSystem:
    """Asynchronous logging system"""
    def __init__(self):
        self.log_queue = queue.Queue(-1)
        self.listener = None

    def setup(self):
        """Initialize asynchronous logging"""
        # Handlers that actually emit the records
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        console_handler.setFormatter(console_formatter)

        file_handler = logging.handlers.RotatingFileHandler(
            'logs/async_app.log',
            maxBytes=10*1024*1024,
            backupCount=5
        )
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(console_formatter)

        # A QueueListener fans records out to the handlers
        self.listener = logging.handlers.QueueListener(
            self.log_queue, console_handler, file_handler,
            respect_handler_level=True
        )
        self.listener.start()

        # Producers enqueue via a QueueHandler
        queue_handler = logging.handlers.QueueHandler(self.log_queue)
        root_logger = logging.getLogger()
        root_logger.addHandler(queue_handler)
        root_logger.setLevel(logging.DEBUG)

        root_logger.info("Asynchronous logging started")
        return root_logger

    def shutdown(self):
        """Stop the listener and flush pending records"""
        if self.listener:
            self.listener.stop()

# Synchronous vs asynchronous benchmark
def performance_test():
    """Compare synchronous and asynchronous logging throughput"""
    # 1. Synchronous setup
    sync_logger = logging.getLogger('sync')
    sync_handler = logging.StreamHandler()
    sync_logger.addHandler(sync_handler)
    sync_logger.setLevel(logging.INFO)
    sync_logger.propagate = False  # keep the benchmark isolated from the root logger

    # 2. Asynchronous setup
    async_system = AsyncLoggingSystem()
    async_logger = async_system.setup()

    def test_logging(logger, name, count=10000):
        """Time a burst of log calls"""
        start = time.time()
        for i in range(count):
            logger.info(f"test record {i}")
        end = time.time()
        print(f"{name}: {count} records in {end-start:.4f}s")
        return end - start

    print("=== Logging performance comparison ===")
    sync_time = test_logging(sync_logger, "synchronous")
    async_time = test_logging(async_logger, "asynchronous")

    improvement = (sync_time - async_time) / sync_time * 100
    print(f"Improvement: {improvement:.1f}%")

    async_system.shutdown()

if __name__ == '__main__':
    performance_test()
```
```python
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

def setup_rotation_logging():
    """Configure log rotation"""
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    # 1. Size-based rotation: roll over past 10MB, keep 5 backups
    size_handler = RotatingFileHandler(
        'logs/app_size.log',
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5,
        encoding='utf-8'
    )
    size_handler.setLevel(logging.INFO)
    size_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    )

    # 2. Time-based rotation: new file each midnight, keep 30 days
    time_handler = TimedRotatingFileHandler(
        'logs/app_time.log',
        when='midnight',  # also: 'S' (seconds), 'M' (minutes), 'H' (hours), 'D' (days), 'W0'-'W6' (weekday)
        interval=1,       # every 1 day
        backupCount=30,
        encoding='utf-8'
    )
    time_handler.setLevel(logging.INFO)
    time_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )

    # 3. Errors rotate separately (by size)
    error_handler = RotatingFileHandler(
        'logs/error.log',
        maxBytes=5*1024*1024,
        backupCount=3,
        encoding='utf-8'
    )
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s')
    )

    logger.addHandler(size_handler)
    logger.addHandler(time_handler)
    logger.addHandler(error_handler)
    return logger

# Usage
logger = setup_rotation_logging()

# Simulated log volume
for i in range(100000):
    logger.info(f"Processing record {i}")
    if i % 1000 == 0:
        logger.debug(f"Detail: {i}")
    if i % 10000 == 0:
        logger.error(f"Simulated error: {i}")
```
```python
import logging
import re

class SensitiveInfoFilter(logging.Filter):
    """Filter that masks sensitive information"""
    def __init__(self):
        super().__init__()
        # Patterns for sensitive data
        self.patterns = [
            (r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b', '****-****-****-****'),     # credit card numbers
            (r'\b\d{3}[- ]?\d{2}[- ]?\d{4}\b', '***-**-****'),                       # US SSN
            (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '***@***.***'),  # email addresses
            (r'\b\d{10,11}\b', '***********'),                                       # phone numbers
            (r'\b[A-Z]{2}\d{6,8}[A-Z]?\b', '*********'),                             # passport numbers
        ]

    def filter(self, record):
        """Scrub the record in place"""
        try:
            record.msg = self._sanitize(record.msg)
            # Scrub string arguments as well; note that extra= fields are
            # merged straight into the record's __dict__ and are not
            # scrubbed by this sketch
            if record.args:
                record.args = tuple(
                    self._sanitize(a) if isinstance(a, str) else a
                    for a in record.args
                )
        except Exception:
            # A scrubbing failure must never block logging
            pass
        return True

    def _sanitize(self, text):
        """Apply the masking patterns"""
        if not isinstance(text, str):
            return text
        result = text
        for pattern, replacement in self.patterns:
            result = re.sub(pattern, replacement, result)
        return result

class SecurityAwareLogger:
    """Security-aware logger wrapper"""
    def __init__(self, name):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # Attach the masking filter
        self.logger.addFilter(SensitiveInfoFilter())
        self._setup_handlers()

    def _setup_handlers(self):
        """Configure handlers"""
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(formatter)
        self.logger.addHandler(console_handler)

    def log_user_action(self, user_id, action, details=None):
        """Record a user action (message scrubbed automatically)"""
        extra = {
            'user_id': user_id,
            'action': action,
            'ip_address': self._get_client_ip(),   # stand-in for a real lookup
            'user_agent': self._get_user_agent()   # stand-in for a real lookup
        }
        if details:
            extra.update(details)
        self.logger.info(f"User action: {action}", extra=extra)

    def _get_client_ip(self):
        """Client IP (example stub)"""
        return '192.168.1.100'

    def _get_user_agent(self):
        """User-Agent (example stub)"""
        return 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

    def __getattr__(self, name):
        """Delegate everything else to the underlying logger"""
        return getattr(self.logger, name)

# Usage
secure_logger = SecurityAwareLogger('security')

# The filter scrubs the log *message* (cards, emails, phones, and so on);
# the password values below sit in extra= fields, which this sketch does
# not scrub; masking them would need key-based filtering on the extra dict
secure_logger.log_user_action(
    user_id=12345,
    action='password_change',
    details={
        'old_password': 'MySecret123!',
        'new_password': 'NewPass456@',
        'email': 'user@example.com'  # would be masked if it appeared in the message
    }
)

# Example console output:
# 2024-03-21 10:30:00 - security - INFO - User action: password_change
```
- basicConfig() called more than once, or the same Handler added repeatedly
```python
import logging

def fix_duplicate_logs():
    """Resolve duplicated log output"""
    # Grab the root Logger
    root_logger = logging.getLogger()

    # Option 1: clear all existing handlers
    root_logger.handlers.clear()

    # Option 2: stop propagation (for a specific Logger)
    app_logger = logging.getLogger('myapp')
    app_logger.propagate = False  # stop records travelling upward

    # Option 3: a single configuration entry point
    def setup_logging_once():
        if not root_logger.handlers:  # configure only when no handler exists
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            formatter = logging.Formatter('%(message)s')
            console_handler.setFormatter(formatter)
            root_logger.addHandler(console_handler)
            root_logger.setLevel(logging.INFO)

    setup_logging_once()

    # Option 4: a module-level configuration singleton
    class LoggingConfig:
        _configured = False

        @classmethod
        def setup(cls):
            if cls._configured:
                return
            # configuration logic...
            cls._configured = True

    LoggingConfig.setup()
    return root_logger

# Test
logger = fix_duplicate_logs()
logger.info("This line should appear exactly once")
```
Symptom: DEBUG records appear in production, or ERROR records never show up.

```python
import logging

def diagnose_logging_levels():
    """Diagnose level-related problems"""
    logger = logging.getLogger('test')

    print("=== Current logger configuration ===")
    print(f"Name: {logger.name}")
    print(f"Level: {logger.level} ({logging.getLevelName(logger.level)})")
    print(f"Parent: {logger.parent}")
    print(f"Propagate: {logger.propagate}")
    print(f"Handler count: {len(logger.handlers)}")

    # Inspect each handler's level
    for i, handler in enumerate(logger.handlers):
        print(f"\nHandler {i}:")
        print(f"  Type: {type(handler).__name__}")
        print(f"  Level: {handler.level} ({logging.getLevelName(handler.level)})")

    # Inspect the root logger
    root_logger = logging.getLogger()
    print("\n=== Root logger configuration ===")
    print(f"Root level: {root_logger.level} ({logging.getLevelName(root_logger.level)})")
    print(f"Root handler count: {len(root_logger.handlers)}")

    # Exercise every level
    print("\n=== Level test ===")
    logger.setLevel(logging.DEBUG)
    for level_name in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
        level = getattr(logging, level_name)
        logger.log(level, f"testing {level_name}")

    return logger

# Common misconfigurations
def common_mistakes():
    """Common configuration mistakes"""
    # Mistake 1: adding another handler after basicConfig;
    # the root logger now has two handlers, so every record prints twice
    logging.basicConfig(level=logging.WARNING)
    logger = logging.getLogger()
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    logger.addHandler(console_handler)

    # Mistake 2: Logger level higher than Handler level
    logger = logging.getLogger('test')
    logger.setLevel(logging.ERROR)    # the Logger admits only ERROR and above
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)   # the Handler would accept DEBUG and above
    logger.addHandler(handler)
    # Result: DEBUG records are filtered by the Logger and never reach the Handler

    # Correct: align the two levels
    logger = logging.getLogger('correct')
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)  # matches the Logger level
    logger.addHandler(handler)

# Run the diagnosis
diagnose_logging_levels()
```
```python
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def thread_safe_logging_setup():
    """Thread-safe logging setup"""
    # The logging module itself is thread-safe, but take care to:
    # 1. initialize the configuration in the main thread
    # 2. avoid configuring repeatedly from multiple threads
    logger = logging.getLogger('thread_safe')

    # Guard handler registration with a lock (if it may run concurrently)
    lock = threading.Lock()

    def add_handler_safely(handler):
        with lock:
            if handler not in logger.handlers:
                logger.addHandler(handler)

    # Create the handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    formatter = logging.Formatter(
        '%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(formatter)

    # Register it safely
    add_handler_safely(console_handler)
    logger.setLevel(logging.DEBUG)
    return logger

def multi_thread_logging_test():
    """Multithreaded logging test"""
    logger = thread_safe_logging_setup()

    def worker(worker_id):
        """Worker thread body"""
        for i in range(5):
            logger.info(f"worker {worker_id} - task {i}")
            time.sleep(0.01)  # simulate work

    # Run the workers in a thread pool
    with ThreadPoolExecutor(max_workers=3, thread_name_prefix='Worker') as executor:
        futures = [executor.submit(worker, i) for i in range(3)]
        # Wait for all threads to finish
        for future in futures:
            future.result()

    logger.info("All threads finished")

if __name__ == '__main__':
    multi_thread_logging_test()
```
```python
# dev_logging.py
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
```
```python
# prod_logging.py
import logging
import logging.config
import os

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'json': {
            '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
            'format': '%(asctime)s %(name)s %(levelname)s %(module)s %(funcName)s %(lineno)d %(message)s'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'json'
        },
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'level': 'INFO',
            'formatter': 'json',
            'filename': f'/var/log/{os.getenv("APP_NAME", "app")}/app.log',
            'maxBytes': 10485760,  # 10MB
            'backupCount': 10,
            'encoding': 'utf-8'
        }
    },
    'loggers': {
        '': {  # root Logger
            'handlers': ['console', 'file'],
            'level': 'INFO'
        }
    }
}

logging.config.dictConfig(LOGGING_CONFIG)
```
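One way to switch between the two modules at startup (a sketch; the APP_ENV variable name is an assumption):

```python
# main.py
import os

# Pick the logging setup for the current environment
if os.getenv("APP_ENV", "development") == "production":
    import prod_logging  # noqa: F401  (importing applies dictConfig)
else:
    import dev_logging   # noqa: F401  (importing applies basicConfig)

import logging
logger = logging.getLogger(__name__)
logger.info("Logging configured for %s", os.getenv("APP_ENV", "development"))
```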
- Sensible levels: default to INFO and above in production; DEBUG is for development only
- Modular naming: use __name__ as the Logger name so the hierarchy builds itself
- Structured output: prefer JSON so log-analysis platforms can process records directly
- Rich context: include timestamps, module names, function names, line numbers, and other key metadata
- Asynchronous handling: use QueueHandler under high concurrency to take I/O off the hot path
- Complete errors: every ERROR record must carry the exception traceback and business context
- Performance awareness: on hot paths, check the level first and avoid unnecessary string building (see the sketch after this list)
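A sketch of that last point: isEnabledFor() skips expensive argument construction entirely, and %-style lazy formatting defers string interpolation until a record is actually emitted.

```python
import logging

logger = logging.getLogger(__name__)

def expensive_summary(data):
    # Imagine a costly serialization here
    return ",".join(map(str, sorted(data)))

data = {3, 1, 2}

# Guard the expensive call: it runs only when DEBUG is enabled
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("state summary: %s", expensive_summary(data))

# Lazy %-style formatting: interpolation happens only if the record is emitted
logger.debug("processing %d items", len(data))   # preferred
# logger.debug(f"processing {len(data)} items")  # an f-string always builds the string
```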
```python
import logging
import logging.handlers
import os
import requests
from pythonjsonlogger import jsonlogger

class MonitoringIntegration:
    """Monitoring platform integration"""
    def __init__(self, webhook_url=None):
        self.webhook_url = webhook_url

    def setup_monitoring_logger(self):
        """Configure a logger dedicated to monitoring"""
        logger = logging.getLogger('monitoring')
        logger.setLevel(logging.WARNING)

        # JSON file handler
        json_handler = logging.handlers.RotatingFileHandler(
            'logs/monitoring.json',
            maxBytes=5*1024*1024,
            backupCount=7,
            encoding='utf-8'
        )
        json_formatter = jsonlogger.JsonFormatter(
            '%(asctime)s %(name)s %(levelname)s %(module)s %(funcName)s %(lineno)d %(message)s',
            datefmt='%Y-%m-%dT%H:%M:%S%z'
        )
        json_handler.setFormatter(json_formatter)

        # Optional: webhook notifications
        if self.webhook_url:
            logger.addHandler(self._create_webhook_handler())

        logger.addHandler(json_handler)
        return logger

    def _create_webhook_handler(self):
        """Build a webhook handler"""
        webhook_url = self.webhook_url  # captured by the inner class

        class WebhookHandler(logging.Handler):
            def emit(self, record):
                try:
                    log_entry = {
                        'timestamp': record.created,  # epoch seconds
                        'level': record.levelname,
                        'message': record.getMessage(),
                        'module': record.module,
                        'service': 'python_app'
                    }
                    # Send to the monitoring platform
                    requests.post(webhook_url, json=log_entry, timeout=5)
                except Exception:
                    pass  # a failed notification must never crash the app

        return WebhookHandler()

# Usage
monitoring = MonitoringIntegration(
    webhook_url=os.getenv('MONITORING_WEBHOOK')
)
monitor_logger = monitoring.setup_monitoring_logger()

# Record key metrics
monitor_logger.warning("API response time above threshold", extra={
    'endpoint': '/api/data',
    'response_time_ms': 2500,
    'threshold_ms': 2000,
    'request_count': 1500
})

monitor_logger.error("Database connection pool exhausted", extra={
    'database': 'primary_db',
    'pool_size': 100,
    'active_connections': 100,
    'waiting_requests': 15
})
```
- Python logging handbook: Logging HOWTO
- Dictionary configuration format: Configuration dictionary schema
- python-json-logger: JSON formatter
Mastering the logging module is part of growing from a script writer into a systems builder: a well-built logging system is far more than a debugging tool. Starting today, leave ad-hoc print debugging behind and embrace professional log management. Remember: good logs are a love letter to your future self and your team, delivering exactly the right information at the moment you need it most. Technical growth never ends, but solid fundamentals are what carry you furthest. I hope this deep dive serves you well on your journey with Python logging!
Coming next: "Python asyncio in Depth: A Modern Weapon for High-Concurrency Programming". Follow "Python与AI智能研习社" to learn one Python module a day and keep leveling up your development skills!