Requirement description
While using the FastStream module at work to consume Kafka events, I ran into the following situation:
1. main.py pulls in many other modules, and both the main module and those modules write log output.
2. Events have to be consumed asynchronously, but I don't want the interleaved output to turn the logs into a mess.
3. My idea: generate a unique id each time an event is consumed, then derive a dedicated log handler from that id.
4. That per-event log handler should be transparently available to the other modules, but without threading it through as a function parameter everywhere -- that is too cumbersome.
5. So I tried propagating it with context variables (see the sketch below).
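To confirm that idea 5 actually holds, here is a minimal, self-contained sketch (the names request_id, handle, and helper are illustrative only, not part of the project): a ContextVar set inside a task is visible to every coroutine that task awaits, and asyncio gives each task its own copy of the context, so concurrent consumers cannot clobber one another.

import asyncio
from contextvars import ContextVar

# Hypothetical variable, analogous to the per-event id used later.
request_id: ContextVar[str] = ContextVar("request_id", default="-")

async def helper():
    # No parameter passing: the helper reads the id from the context.
    print(f"[{request_id.get()}] helper running")

async def handle(n: int):
    request_id.set(f"event-{n}")  # set once in the "main" coroutine
    await helper()                # visible in everything awaited below

async def main():
    # Each task created by gather() gets its own copy of the context,
    # so concurrent handlers cannot overwrite each other's id.
    await asyncio.gather(*(handle(i) for i in range(3)))

asyncio.run(main())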
Asking DeepSeek
My prompt was as follows:
Help me design a logging framework with these requirements:
1. I use the FastStream module to consume Kafka events. The framework contains at least these custom modules: log.py, main.py, git.py, change.py.
2. The four modules handle logging, the main process, git commands, and change-information processing respectively; every module needs to emit log messages.
3. When an event enters the main process, an id is generated and a log handler is built from it. When the main process dispatches into git.py or change.py, those modules should pick up the log object created for that event's id in a global way -- probably via some context mechanism -- because I don't want to pass the log handler as a parameter into every git or change function or method. Set it once in the main flow, use it directly in the callees.
4. The design must work inside an async framework.
5. Use the loguru module; each id's output goes to <id>.log, with automatic cleanup every 20 days.
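One loguru detail worth keeping in mind before reading the generated answer: retention is not a background timer -- loguru applies it when a rotation occurs or when the sink is stopped. A standalone illustration (file name and thresholds are arbitrary):

from loguru import logger

# Old files matching the sink pattern are cleaned up when the log rotates
# or when the sink is removed, not on a schedule of their own.
logger.add(
    "demo_{time}.log",
    rotation="1 day",     # start a new file daily ...
    retention="20 days",  # ... and drop files older than 20 days at that moment
)
logger.info("hello")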
# log.py
from loguru import logger
from contextvars import ContextVar
from pathlib import Path
from datetime import timedelta

LOG_DIR = Path("logs")
RETENTION_DAYS = 20

current_event_id = ContextVar('event_id', default=None)


def configure_logger():
    """Global logger configuration."""
    logger.remove()
    # Console output
    logger.add(
        sink=lambda msg: print(msg, end=""),
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level.icon} | {message}",
        level="INFO",
        colorize=True,
        enqueue=True,  # keep the sink async-safe
    )


def add_event_logger(event_id: str):
    """Add a file sink dedicated to one event."""
    log_file = LOG_DIR / f"{event_id}.log"
    sink_id = logger.add(
        sink=log_file,
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
        retention=timedelta(days=RETENTION_DAYS),
        # Only records bound to this event's id reach this file.
        filter=lambda record: record["extra"].get("event_id") == event_id,
        enqueue=True,
    )
    return sink_id


class EventLoggerContext:
    """Context manager scoping a per-event logger."""

    def __init__(self, event_id: str):
        self.event_id = event_id
        self.sink_id = None
        self.token = None

    def __enter__(self):
        self.sink_id = add_event_logger(self.event_id)
        self.token = current_event_id.set(self.event_id)
        # Note: configure() sets the *global* default extra, so concurrent
        # events may overwrite each other's id (see the stress test below).
        logger.configure(extra={"event_id": self.event_id})
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logger.remove(self.sink_id)
        current_event_id.reset(self.token)
        logger.configure(extra={"event_id": None})
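The load-bearing trick in add_event_logger is the per-sink filter: every event gets its own file sink, and only records whose bound extra["event_id"] matches reach that file. A standalone sketch of the routing (the ids and file names here are illustrative):

from loguru import logger

logger.remove()

def make_sink(event_id: str):
    # One sink per event; the closure captures the id it filters on.
    return logger.add(
        f"{event_id}.log",
        filter=lambda record: record["extra"].get("event_id") == event_id,
    )

a = make_sink("evt-a")
b = make_sink("evt-b")

logger.bind(event_id="evt-a").info("goes to evt-a.log only")
logger.bind(event_id="evt-b").info("goes to evt-b.log only")
logger.info("no event_id bound -> matches neither sink")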
# git.py
from loguru import logger
from .log import current_event_id


async def git_operation(repo_url: str):
    # Read the event id from the context; no parameter was passed in.
    event_id = current_event_id.get()
    logger.bind(event_id=event_id).info(f"Cloning repository: {repo_url}")
    # ... git operation logic ...
    logger.bind(event_id=event_id).success("Git operation completed")
# change.py
from loguru import logger
from .log import current_event_id


async def process_changes(changes: list):
    event_id = current_event_id.get()
    logger.bind(event_id=event_id).debug(f"Processing {len(changes)} changes")
    # ... change-handling logic ...
    logger.bind(event_id=event_id).warning("Found potential conflicts")
# main.py
import uuid

from loguru import logger
from .log import EventLoggerContext, configure_logger, LOG_DIR
from .git import git_operation
from .change import process_changes


async def handle_event(event):
    event_id = str(uuid.uuid4())  # unique id for this event
    with EventLoggerContext(event_id):
        logger.info(f"Start processing event {event_id}")
        # Main processing flow
        await git_operation(event["repo_url"])
        await process_changes(event["changes"])
        logger.success(f"Event {event_id} processed")


def setup():
    """Initialize configuration."""
    LOG_DIR.mkdir(exist_ok=True)
    configure_logger()


# Start the FastStream consumer
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")


@broker.subscriber("events-topic")
async def on_event(event):
    await handle_event(event)
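The generated answer stops at the subscriber. To actually run it, the broker would still need to be wrapped in a FastStream application, with setup() attached to startup -- a sketch of plausible wiring (my addition, not part of the answer; run with faststream run main:app):

# Possible entry-point wiring (assumption: flat module layout, as below)
from faststream import FastStream
from faststream.kafka import KafkaBroker

from log import configure_logger, LOG_DIR

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@app.on_startup
async def startup():
    # Same job as setup(): create the log directory and install the console sink.
    LOG_DIR.mkdir(exist_ok=True)
    configure_logger()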
Debugging the code
- Project structure (the original screenshot did not survive; see the tree below)
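Reconstructed from the absolute imports in the debug code (from log import ..., from git import ...), the layout is presumably one flat directory:

project/
├── main.py
├── log.py
├── git.py
├── change.py
└── logs/        # created by setup(); one <id>.log per event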
- Code in main.py
# main.py
import asyncio
import uuid

from loguru import logger
from log import EventLoggerContext, configure_logger, LOG_DIR
from git import git_operation
from change import process_changes


async def handle_event(event):
    event_id = str(uuid.uuid4())  # unique id for this event
    with EventLoggerContext(event_id):
        logger.info(f"Start processing event {event_id}")
        # Main processing flow
        await git_operation(event["repo_url"])
        await process_changes(event["changes"])
        logger.success(f"Event {event_id} processed")


def setup():
    """Initialize configuration."""
    LOG_DIR.mkdir(exist_ok=True)
    configure_logger()


# FastStream consumer, disabled while stress-testing locally
# from faststream.kafka import KafkaBroker
#
# broker = KafkaBroker("localhost:9092")
#
#
# @broker.subscriber("events-topic")
# async def on_event(event):
#     await handle_event(event)


async def stress_test():
    # 100 fake events processed concurrently; each should end up
    # in its own <id>.log under logs/.
    events = [
        {
            "repo_url": f"https://github.com/user/repo_{i}.git",
            "changes": "#" * i,
        }
        for i in range(100)
    ]
    await asyncio.gather(*[handle_event(e) for e in events])


# Test run (entry point)
if __name__ == "__main__":
    setup()
    asyncio.run(stress_test())
- Code in log.py
# log.py
from loguru import logger
from contextvars import ContextVar
from pathlib import Path
from datetime import timedelta

LOG_DIR = Path("logs")
RETENTION_DAYS = 20

current_event_id = ContextVar('event_id', default=None)


def configure_logger():
    """Global logger configuration."""
    logger.remove()
    # Console output
    logger.add(
        sink=lambda msg: print(msg, end=""),
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level.icon} | {message}",
        level="INFO",
        colorize=True,
        enqueue=True,  # keep the sink async-safe
    )


def add_event_logger(event_id: str):
    """Add a file sink dedicated to one event."""
    log_file = LOG_DIR / f"{event_id}.log"
    sink_id = logger.add(
        sink=log_file,
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
        retention=timedelta(days=RETENTION_DAYS),
        # Only records bound to this event's id reach this file.
        filter=lambda record: record["extra"].get("event_id") == event_id,
        enqueue=True,
    )
    return sink_id


class EventLoggerContext:
    """Context manager scoping a per-event logger."""

    def __init__(self, event_id: str):
        self.event_id = event_id
        self.sink_id = None
        self.token = None

    def __enter__(self):
        self.sink_id = add_event_logger(self.event_id)
        self.token = current_event_id.set(self.event_id)
        # Note: configure() sets the *global* default extra, so the 100
        # concurrent events in the stress test can overwrite each other's id.
        logger.configure(extra={"event_id": self.event_id})
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logger.remove(self.sink_id)
        current_event_id.reset(self.token)
        logger.configure(extra={"event_id": None})
- Code in git.py
# git.py
from loguru import logger
from log import current_event_id


async def git_operation(repo_url: str):
    # Read the event id from the context; no parameter was passed in.
    event_id = current_event_id.get()
    logger.bind(event_id=event_id).info(f"Cloning repository: {repo_url}")
    # ... git operation logic ...
    logger.bind(event_id=event_id).success("Git operation completed")
- Code in change.py
# change.py
from loguru import logger
from log import current_event_id


async def process_changes(changes: list):
    event_id = current_event_id.get()
    logger.bind(event_id=event_id).debug(f"Processing {len(changes)} changes")
    # ... change-handling logic ...
    logger.bind(event_id=event_id).warning("Found potential conflicts")
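A closing caveat the stress test exposes: EventLoggerContext calls logger.configure(extra=...), which mutates a process-global default, so 100 concurrent handle_event tasks can overwrite each other's event_id between the un-bound logger.info(...) calls in main.py. loguru's built-in logger.contextualize() stores the extra in a contextvar instead, scoping it to the current task. A sketch of how handle_event could use it (an alternative I am suggesting, not the original design):

# Sketch: task-safe variant of handle_event using logger.contextualize()
# instead of the global logger.configure(extra=...).
import uuid

from loguru import logger
from log import add_event_logger, current_event_id


async def handle_event_safe(event):
    event_id = str(uuid.uuid4())
    sink_id = add_event_logger(event_id)     # per-event file sink, as before
    token = current_event_id.set(event_id)   # still visible to git.py / change.py
    try:
        # contextualize() binds extra["event_id"] via a ContextVar, so each
        # asyncio task sees only its own id -- no cross-event overwrites.
        with logger.contextualize(event_id=event_id):
            logger.info(f"Start processing event {event_id}")
            # ... await git_operation(...) / process_changes(...) as before ...
            logger.success(f"Event {event_id} processed")
    finally:
        current_event_id.reset(token)
        logger.remove(sink_id)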