異步框架使用loguru和contextvars實現日志按Id輸出

需求闡述

工作中使用fastStream模塊消費kafka事件的時候,遇到以下問題。1. 由于main.py模塊要用到許多其他模塊 ,main模塊,和其他模塊都有日志輸出。2. 要實現異步消費事件,但是又不希望日志打印的很亂。3. 想著在每次消費事件的時候,生成一個唯一的id,然后根據id生成一個log處理器。4. 這個獨一無二的log處理器可以透傳到其他模塊。但是我又不想通過參數的方式傳遞,那樣太麻煩了。5. 嘗試借助上下文變量的方式。

詢問deepseek

我的描述如下

我有一個需求幫我設計以日志框架 1.我使用fastStream模塊,消費kafka事件。首先框架里至少有多個自定義模塊,log.py, main.py, git.py, change.py等。 2. 這四個模塊,分別對應日志處理,主進程,git命令,change信息的處理。每個模塊都需要打印日志信息。 3. 我希望當每個事件進入到主進程后,能生成一id,這個id將生成一log處理器。這個log處理在主進程調度相應的git,change.py模塊的時候,能通過全局的方式對應使用這個事件對應id生成的log對象。有肯能是需要使用到上下文的方式來傳遞,但我不希望每次調度一個git或者change的函數或者方法的時候都將這個log處理器作為參數。我希望在主進程設置,在子進程直接使用。 4. 這個框架能夠適用于異步框架。5. log使用loguru模塊,并且需要實現每個id的日志寫到id.log文件中,并且20天自動清理一次。

# log.py
from loguru import logger
from contextvars import ContextVar
import os
from pathlib import Path
from datetime import timedeltaLOG_DIR = Path("logs")
RETENTION_DAYS = 20
current_event_id = ContextVar('event_id', default=None)def configure_logger():"""全局日志配置"""logger.remove()# 添加控制臺輸出logger.add(sink=lambda msg: print(msg, end=""),format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level.icon} | {message}",level="INFO",colorize=True,enqueue=True  # 確保異步安全)def add_event_logger(event_id: str):"""為事件添加文件日志"""log_file = LOG_DIR / f"{event_id}.log"sink_id = logger.add(sink=log_file,format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",retention=timedelta(days=RETENTION_DAYS),filter=lambda record: record["extra"].get("event_id") == event_id,enqueue=True)return sink_idclass EventLoggerContext:"""事件日志上下文管理器"""def __init__(self, event_id: str):self.event_id = event_idself.sink_id = Nonedef __enter__(self):self.sink_id = add_event_logger(self.event_id)token = current_event_id.set(self.event_id)logger.configure(extra={"event_id": self.event_id})return selfdef __exit__(self, exc_type, exc_val, exc_tb):logger.remove(self.sink_id)current_event_id.set(None)logger.configure(extra={"event_id": None})# git.py
from loguru import logger
from .log import current_event_idasync def git_operation(repo_url: str):event_id = current_event_id.get()logger.bind(event_id=event_id).info(f"Cloning repository: {repo_url}")# ... git操作邏輯logger.bind(event_id=event_id).success("Git operation completed")# change.py
from loguru import logger
from .log import current_event_idasync def process_changes(changes: list):event_id = current_event_id.get()logger.bind(event_id=event_id).debug(f"Processing {len(changes)} changes")# ... change處理邏輯logger.bind(event_id=event_id).warning("Found potential conflicts")# main.py
from faststream import Context
from loguru import logger
from .log import EventLoggerContext, configure_logger, LOG_DIR
from .git import git_operation
from .change import process_changes
import uuidasync def handle_event(event):event_id = str(uuid.uuid4())  # 生成唯一事件IDwith EventLoggerContext(event_id):logger.info(f"Start processing event {event_id}")# 主處理流程await git_operation(event["repo_url"])await process_changes(event["changes"])logger.success(f"Event {event_id} processed")def setup():"""初始化配置"""LOG_DIR.mkdir(exist_ok=True)configure_logger()# 啟動FastStream消費者
from faststream.kafka import KafkaBrokerbroker = KafkaBroker("localhost:9092")
@broker.subscriber("events-topic")
async def on_event(event):await handle_event(event)

調試代碼

  • 項目結構
    在這里插入圖片描述

  • main.py中的代碼

# main.py
import asynciofrom faststream import Context
from loguru import logger
from log import EventLoggerContext, configure_logger, LOG_DIR
from git import git_operation
from change import process_changes
import uuidasync def handle_event(event):event_id = str(uuid.uuid4())  # 生成唯一事件IDwith EventLoggerContext(event_id):logger.info(f"Start processing event {event_id}")# 主處理流程await git_operation(event["repo_url"])await process_changes(event["changes"])logger.success(f"Event {event_id} processed")def setup():"""初始化配置"""LOG_DIR.mkdir(exist_ok=True)configure_logger()# 啟動FastStream消費者
# from faststream.kafka import KafkaBroker
#
# broker = KafkaBroker("localhost:9092")
#
#
# @broker.subscriber("events-topic")
# async def on_event(event):
#     await handle_event(event)async def stress_test():events = [{"repo_url": f"https://github.com/user/repo_{i}.git","changes": "#"*i} for i in range(100)]await asyncio.gather(*[handle_event(e) for e in events])# 測試運行(在入口文件添加)
if __name__ == "__main__":setup()asyncio.run(stress_test())
  • log.py中的代碼
# log.py
from loguru import logger
from contextvars import ContextVar
import os
from pathlib import Path
from datetime import timedeltaLOG_DIR = Path("logs")
RETENTION_DAYS = 20
current_event_id = ContextVar('event_id', default=None)def configure_logger():"""全局日志配置"""logger.remove()# 添加控制臺輸出logger.add(sink=lambda msg: print(msg, end=""),format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level.icon} | {message}",level="INFO",colorize=True,enqueue=True  # 確保異步安全)def add_event_logger(event_id: str):"""為事件添加文件日志"""log_file = LOG_DIR / f"{event_id}.log"sink_id = logger.add(sink=log_file,format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",retention=timedelta(days=RETENTION_DAYS),filter=lambda record: record["extra"].get("event_id") == event_id,enqueue=True)return sink_idclass EventLoggerContext:"""事件日志上下文管理器"""def __init__(self, event_id: str):self.event_id = event_idself.sink_id = Nonedef __enter__(self):self.sink_id = add_event_logger(self.event_id)token = current_event_id.set(self.event_id)logger.configure(extra={"event_id": self.event_id})return selfdef __exit__(self, exc_type, exc_val, exc_tb):logger.remove(self.sink_id)current_event_id.set(None)logger.configure(extra={"event_id": None})
  • git.py中的代碼
# git.py
from loguru import logger
from log import current_event_idasync def git_operation(repo_url: str):event_id = current_event_id.get()logger.bind(event_id=event_id).info(f"Cloning repository: {repo_url}")# ... git操作邏輯logger.bind(event_id=event_id).success("Git operation completed")

chang.py中的代碼

# change.py
from loguru import logger
from log import current_event_idasync def process_changes(changes: list):event_id = current_event_id.get()logger.bind(event_id=event_id).debug(f"Processing {len(changes)} changes")# ... change處理邏輯logger.bind(event_id=event_id).warning("Found potential conflicts")

驗證結果

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/75654.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/75654.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/75654.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【HTTPS協議】

文章目錄 一、HTTPS二、HTTPS協議五種加密方案1.只使用對稱加密2.只使用非對稱加密3.雙方都使用非對稱加密4.對稱加密非對稱加密中間人攻擊理解數字簽名CA機構和證書 5. 對稱加密非對稱加密證書認證中間人篡改證書&#xff1f;中間人調包整個證書&#xff1f; 常見問題總結 一、…

算法設計學習8

實驗目的及要求&#xff1a; 通過深入學習樹&#xff08;Tree&#xff09;和二叉樹&#xff08;Binary Tree&#xff09;這兩種重要的數據結構&#xff0c;掌握它們的基本概念、性質和操作&#xff0c;提高對樹形結構的理解和應用能力。通過本實驗&#xff0c;學生將深化對樹和…

P17_ResNeXt-50

&#x1f368; 本文為&#x1f517;365天深度學習訓練營 中的學習記錄博客&#x1f356; 原作者&#xff1a;K同學啊 一、模型結構 ResNeXt-50由多個殘差塊&#xff08;Residual Block&#xff09;組成&#xff0c;每個殘差塊包含三個卷積層。以下是模型的主要結構&#xff1…

【YOLO系列(V5-V12)通用數據集-剪刀石頭布手勢檢測數據集】

YOLO格式的剪刀石頭布手勢檢測數據集&#xff0c;適用于YOLOv5-v11所有版本&#xff0c;可以用于本科畢設、發paper、做課設等等&#xff0c;有需要的在這里獲取&#xff1a; 【YOLO系列&#xff08;V5-V12&#xff09;通用數據集-剪刀石頭布手勢檢測數據集】 數據集專欄地址&a…

基于連接池與重試機制的高效TDengine寫入方案

摘要 在時序數據庫應用場景中,如何構建穩定高效的寫入機制是核心挑戰。本文基于提供的Python代碼實現,解析一種結合連接池管理、智能重試策略和事務控制的TDengine寫入方案,并分析其技術優勢與優化方向。 一、代碼 from dbutils.pooled_db import PooledDB import timede…

抖音熱點視頻識別與分片處理機制解析

抖音作為日活數億的短視頻平臺,其熱點視頻識別和分片處理機制是支撐高并發訪問的核心技術。以下是抖音熱點視頻識別與分片的實現方案: 熱點視頻識別機制 1. 實時行為監控系統 用戶行為聚合:監控點贊、評論、分享、完播率等指標的異常增長曲線內容特征分析:通過AI識別視頻…

基于RDK X3的“校史通“機器人:SLAM導航+智能交互,讓校史館活起來!

視頻標題&#xff1a; 【校史館の新晉頂流】RDK X3機器人&#xff1a;導覽員看了直呼內卷 視頻文案&#xff1a; 跑得賊穩團隊用RDK X3整了個大活——給校史館造了個"社牛"機器人&#xff01; 基于RDK X3開發板實現智能導航與語音交互SLAM技術讓機器人自主避障不…

Metal學習筆記十三:陰影

在本章中&#xff0c;您將了解陰影。陰影表示表面上沒有光。當另一個表面或對象使對象與光線相遮擋時&#xff0c;您會看到對象上的陰影。在項目中添加陰影可使您的場景看起來更逼真&#xff0c;并提供深度感。 陰影貼圖 陰影貼圖是包含場景陰影信息的紋理。當光線照射到物體…

Matplotlib:數據可視化的藝術與科學

引言&#xff1a;讓數據開口說話 在數據分析與機器學習領域&#xff0c;可視化是理解數據的重要橋梁。Matplotlib 作為 Python 最流行的繪圖庫&#xff0c;提供了從簡單折線圖到復雜 3D 圖表的完整解決方案。本文將通過實際案例&#xff0c;帶您從基礎繪圖到高級定制全面掌握 …

Python數據可視化-第4章-圖表樣式的美化

環境 開發工具 VSCode庫的版本 numpy1.26.4 matplotlib3.10.1 ipympl0.9.7教材 本書為《Python數據可視化》一書的配套內容&#xff0c;本章為第4章 圖表樣式的美化 本章主要介紹了圖表樣式的美化&#xff0c;包括圖表樣式概述、使用顏色、選擇線型、添加數據標記、設置字體…

嵌入式海思Hi3861連接華為物聯網平臺操作方法

1.1 實驗目的 快速演示 1、認識輕量級HarmonyOS——LiteOS-M 2、初步掌握華為云物聯網平臺的使用 3、快速驅動海思Hi3861 WIFI芯片,連接互聯網并登錄物聯網平臺

如何在Redis容量限制下保持熱點數據

如何在Redis容量限制下保持熱點數據 當數據庫有100萬條數據但Redis只能保存10萬條時,需要智能的策略來確保Redis中存儲的都是最常訪問的熱點數據。以下是幾種有效的解決方案: 一、內存淘汰策略 Redis提供了多種內存淘汰機制,當內存不足時會自動刪除部分數據: 策略命令/配…

cv2.fillPoly()和cv2.polylines()

參數解釋 cv2.fillPoly() 和 cv2.polylines() 都是 OpenCV 的函數。功能是繪制多邊形&#xff0c;cv2.fillPoly()可繪制實心多邊形&#xff0c; cv2.polylines() 可繪制空心多邊形 cv2.fillPoly()用途&#xff1a;提取ROI 可在黑色圖像上&#xff0c;填充白色&#xff0c;作為…

數據庫--SQL

SQL&#xff1a;Structured Query Language&#xff0c;結構化查詢語言 SQL是用于管理關系型數據庫并對其中的數據進行一系列操作&#xff08;包括數據插入、查詢、修改刪除&#xff09;的一種語言 分類&#xff1a;數據定義語言DDL、數據操縱語言DML、數據控制語言DCL、事務處…

【python】速通筆記

Python學習路徑 - 從零基礎到入門 環境搭建 安裝Python Windows: 從官網下載安裝包 https://www.python.org/downloads/Mac/Linux: 通常已預裝&#xff0c;可通過終端輸入python3 --version檢查 配置開發環境 推薦使用VS Code或PyCharm作為代碼編輯器安裝Python擴展插件創建第…

批量刪除git本地分支和遠程分支命令

1、按照關鍵詞開頭匹配刪除遠程分支 git branch -r | grep "origin/feature/develop-1"| sed s/origin\///g | xargs -n 1 git push origin --delete git branch -r 列出所有遠端分支。 grep "origin/feature/develop-1" 模糊匹配分支名稱包含"orig…

上市電子制造企業如何實現合規的質量文件管理?

浙江潔美電子科技股份有限公司成立于2001年&#xff0c;是一家專業為片式電子元器件(被動元件、分立器件、集成電路及LED)配套生產電子薄型載帶、上下膠帶、離型膜、流延膜等產品的國家高新技術企業&#xff0c;主要產品有分切紙帶、打孔經帶、壓孔紙帶、上下膠帶、塑料載帶及其…

leetcode數組-有序數組的平方

題目 題目鏈接&#xff1a;https://leetcode.cn/problems/squares-of-a-sorted-array/ 給你一個按 非遞減順序 排序的整數數組 nums&#xff0c;返回 每個數字的平方 組成的新數組&#xff0c;要求也按 非遞減順序 排序。 輸入&#xff1a;nums [-4,-1,0,3,10] 輸出&#xff…

基于微信小程序的醫院掛號預約系統設計與實現

摘 要 現代經濟快節奏發展以及不斷完善升級的信息化技術&#xff0c;讓傳統數據信息的管理升級為軟件存儲&#xff0c;歸納&#xff0c;集中處理數據信息的管理方式。本微信小程序醫院掛號預約系統就是在這樣的大環境下誕生&#xff0c;其可以幫助管理者在短時間內處理完畢龐大…

密碼學基礎——DES算法

前面的密碼學基礎——密碼學文章中介紹了密碼學相關的概念&#xff0c;其中簡要地對稱密碼體制(也叫單鑰密碼體制、秘密密鑰體制&#xff09;進行了解釋&#xff0c;我們可以知道單鑰體制的加密密鑰和解密密鑰相同&#xff0c;單鑰密碼分為流密碼和分組密碼。 流密碼&#xff0…