寫python的時候突然想著能不能用注解于是就寫了個這個
文章目錄
- 原始版
- 改進點
原始版
import os
import pickle
import hashlib
import inspect
import functoolsdef _generate_cache_filename(func, *args, **kwargs):"""生成緩存文件名的內部函數"""# 獲取調用來源文件的絕對路徑caller_frame = inspect.stack()[2] # 注意調整為2,跳過當前函數和調用者caller_file = os.path.abspath(caller_frame.filename)# 生成調用文件路徑的短哈希file_hash = hashlib.md5(caller_file.encode()).hexdigest()[:8]# 生成參數簽名args_repr = "_".join([repr(arg) for arg in args])kwargs_repr = "_".join([f"{k}={repr(v)}" for k, v in kwargs.items()])# 處理無參數情況param_repr = f"{args_repr}_{kwargs_repr}" if (args or kwargs) else "no_params"# 組合最終緩存文件名return f"{func.__name__}_{param_repr}_{file_hash}.pkl"def cache(func):@functools.wraps(func)def wrapper(*args, **kwargs):# 使用共享函數生成緩存文件名cache_file = _generate_cache_filename(func, *args, **kwargs)# 緩存邏輯if os.path.exists(cache_file):with open(cache_file, 'rb') as f:return pickle.load(f)result = func(*args, **kwargs)with open(cache_file, 'wb') as f:pickle.dump(result, f)print(f'緩存已保存:{cache_file}')return resultreturn wrapperdef clear_cache(func, *args, **kwargs):"""手動清除緩存文件"""# 使用共享函數生成緩存文件名cache_file = _generate_cache_filename(func, *args, **kwargs)# 刪除緩存文件if os.path.exists(cache_file):os.remove(cache_file)print(f"緩存已刪除: {cache_file}")else:print(f"緩存文件不存在: {cache_file}")# 測試用例
@cache
def get_data(a, b):print("計算數據")return a + bif __name__ == "__main__":# 第一次調用(創建緩存)print(get_data(1, 2)) # 輸出: 計算數據 和 3# 第二次調用(讀取緩存)print(get_data(1, 2)) # 無"計算數據"輸出# 清除緩存clear_cache(get_data, 1, 2) # 成功刪除# 再次調用(重新計算)print(get_data(1, 2)) # 再次輸出"計算數據"
1._generate_cache_filename用于生成緩存文件名字,inspect.stack()[2]獲取調用棧中的當前使用的文件名字,提取調用文件的絕對路徑并轉換為8位MD5哈希值。
*args和**kwargs分別轉換為字符串表示,用于區分不同參數的同名函數,當函數無參數時,使用"no_params"。
【這里需要todo一下:傳入的參數判斷是否能做為合法的文件名字】
最終生成"函數名_參數簽名_調用文件哈希.pkl"。
【todo:最終的文件名稱不能超過系統保存的最大長度】
需要確保_generate_cache_filename函數能生成唯一且合法的文件名
2.def cache(func)
簡單的緩存裝飾器,將函數的計算結果持久化到文件中
使用裝飾器模式,外層函數接受被裝飾函數作為參數
functools.wraps保留原函數的元信息
內層wrapper函數處理實際調用邏輯
通過_generate_cache_filename函數生成唯一的緩存文件名
檢查緩存文件是否存在,存在則直接讀取并返回緩存結果
否則調用原始函數獲取計算結果,使用pickle模塊序列化結果到文件,打印緩存保存信息,返回計算結果
注意:
被緩存函數的返回值必須可被pickle序列化
在多進程環境中使用時需注意文件鎖問題
緩存文件需要定期清理以避免存儲空間占用
需要todo改進:
添加緩存過期機制
支持自定義序列化方法 todo
添加緩存命中率統計
支持分布式緩存存儲 todo
3.def clear_cache(func, *args, **kwargs)
用于手動清除特定函數的緩存文件。
檢查緩存文件是否存在,若存在則刪除并打印確認信息;若不存在則提示文件不存在的狀態。
文件刪除操作不可逆,需謹慎調用。
改進點
1、合法文件名處理:
使用正則表達式移除非法字符:re.sub(r’[<>:"/\|?*\x00-\x1F]', ‘_’, name)
處理特殊字符和不可打印字符
2、文件名長度截斷:
限制文件名最大長度(255字符)
對長文件名進行智能截斷(保留首尾部分)
3、緩存過期機制:
添加expire_seconds參數控制緩存有效期
基于文件修改時間檢查過期狀態
默認過期時間為24小時
4、日志系統:
使用Python標準logging模塊
不同級別的日志(DEBUG、INFO、WARNING、ERROR)
格式化的日志輸出
5、異常處理:
捕獲并記錄文件操作中的所有異常
提供有意義的錯誤信息
緩存失敗時不影響主程序運行
6、自定義緩存目錄:
可配置的緩存目錄參數
自動創建不存在的目錄
默認目錄為./.cache
7、緩存統計:
跟蹤命中、未命中和過期次數
計算命中率
線程安全的統計計數器
按函數名查看統計信息
7、多線程/進程安全:
使用filelock庫實現跨進程文件鎖
為每個緩存文件創建對應的鎖文件
設置鎖超時時間(10秒)
8、增強的緩存清除:
清除特定參數的緩存
清除函數的所有緩存
批量刪除操作
9、附加功能:
添加了清理緩存的方法(clear_cache和clear_all_cache)
統計信息查看函數(get_cache_stats和print_cache_stats)
智能緩存路徑管理
import os
import pickle
import hashlib
import inspect
import functools
import time
import re
import logging
import threading
from collections import defaultdict
from filelock import FileLock# 設置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('cache_decorator')# 緩存統計
cache_stats = defaultdict(lambda: {'hits': 0, 'misses': 0, 'expired': 0, 'deleted': 0})
stats_lock = threading.Lock()# 默認配置
DEFAULT_CACHE_DIR = os.path.join(os.getcwd(), '.cache')
DEFAULT_EXPIRE_SECONDS = 24 * 60 * 60 # 默認過期時間: 24小時
MAX_FILENAME_LENGTH = 200 # 最大文件名長度def _sanitize_filename(name):"""移除文件名中的非法字符并截斷長度"""# 替換非法字符sanitized = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', name)# 截斷文件名if len(sanitized) > MAX_FILENAME_LENGTH:prefix = sanitized[:MAX_FILENAME_LENGTH // 2]suffix = sanitized[-MAX_FILENAME_LENGTH // 2:]sanitized = prefix + '...' + suffix# 確保最終長度不超過限制sanitized = sanitized[:MAX_FILENAME_LENGTH]return sanitizeddef _generate_cache_filename(func, *args, **kwargs):"""生成緩存文件名的內部函數"""# 獲取調用來源文件的絕對路徑caller_frame = inspect.stack()[2] # 調整堆棧深度caller_file = os.path.abspath(caller_frame.filename)# 生成調用文件路徑的短哈希file_hash = hashlib.md5(caller_file.encode()).hexdigest()[:8]# 生成參數簽名args_repr = "_".join([repr(arg) for arg in args])kwargs_repr = "_".join([f"{k}={repr(v)}" for k, v in sorted(kwargs.items())])# 處理無參數情況param_repr = f"{args_repr}_{kwargs_repr}" if (args or kwargs) else "no_params"# 組合并清理文件名raw_filename = f"{func.__name__}_{param_repr}_{file_hash}"return _sanitize_filename(raw_filename) + ".pkl"def _get_cache_file_path(cache_dir, cache_file):"""獲取緩存文件完整路徑,確保目錄存在"""# 創建緩存目錄(如果不存在)os.makedirs(cache_dir, exist_ok=True)return os.path.join(cache_dir, cache_file)def cache(expire_seconds=DEFAULT_EXPIRE_SECONDS, cache_dir=DEFAULT_CACHE_DIR):"""帶參數的緩存裝飾器Args:expire_seconds (int): 緩存過期時間(秒)cache_dir (str): 緩存文件存儲目錄"""def decorator(func):@functools.wraps(func)def wrapper(*args, **kwargs):# 生成緩存文件名cache_file = _generate_cache_filename(func, *args, **kwargs)cache_path = _get_cache_file_path(cache_dir, cache_file)lock_path = cache_path + ".lock"# 使用文件鎖確保線程/進程安全with FileLock(lock_path, timeout=10):# 檢查緩存是否存在且未過期if os.path.exists(cache_path):file_age = time.time() - os.path.getmtime(cache_path)if expire_seconds is None or file_age < expire_seconds:# 緩存命中try:with open(cache_path, 'rb') as f:result = pickle.load(f)with stats_lock:cache_stats[func.__name__]['hits'] += 1logger.debug(f'緩存命中: {cache_path}')return resultexcept Exception as e:logger.warning(f'加載緩存失敗: {e}')# 緩存過期with stats_lock:cache_stats[func.__name__]['expired'] += 1logger.debug(f'緩存已過期: {cache_path}')# 緩存未命中或過期,重新計算with stats_lock:cache_stats[func.__name__]['misses'] += 1result = func(*args, **kwargs)# 保存結果到緩存try:with open(cache_path, 'wb') as f:pickle.dump(result, f)logger.debug(f'緩存已保存: {cache_path}')except Exception as e:logger.error(f'保存緩存失敗: {e}')return result# 為包裝的函數添加清除緩存的方法def clear_cache(*args, **kwargs):"""清除特定參數的緩存"""cache_file = _generate_cache_filename(func, *args, **kwargs)cache_path = _get_cache_file_path(cache_dir, cache_file)if os.path.exists(cache_path):try:os.remove(cache_path)logger.info(f'緩存已刪除: {cache_path}')with stats_lock:cache_stats[func.__name__]['deleted'] += 1return Trueexcept Exception as e:logger.error(f'刪除緩存失敗: {e}')return Falseelse:logger.warning(f'緩存文件不存在: {cache_path}')return Falsedef clear_all_cache():"""清除該函數的所有緩存"""pattern = re.compile(f"^{func.__name__}_.*\\.pkl$")cleared = 0total = 0for filename in os.listdir(cache_dir):if pattern.match(filename):total += 1file_path = os.path.join(cache_dir, filename)try:os.remove(file_path)cleared += 1with stats_lock:cache_stats[func.__name__]['deleted'] += 1except Exception as e:logger.error(f'刪除緩存失敗 {filename}: {e}')logger.info(f'已清除 {cleared}/{total} 個緩存文件')return cleareddef clear_expired_cache(expire_seconds=expire_seconds):"""清除該函數的所有過期緩存"""pattern = re.compile(f"^{func.__name__}_.*\\.pkl$")current_time = time.time()removed = 0total = 0for filename in os.listdir(cache_dir):if pattern.match(filename):total += 1file_path = os.path.join(cache_dir, filename)try:# 檢查文件是否過期mtime = os.path.getmtime(file_path)if current_time - mtime > expire_seconds:os.remove(file_path)removed += 1with stats_lock:cache_stats[func.__name__]['deleted'] += 1logger.debug(f'已刪除過期緩存: {filename}')except Exception as e:logger.error(f'處理緩存文件 {filename} 失敗: {e}')logger.info(f'已刪除 {removed}/{total} 個過期緩存文件')return removedwrapper.clear_cache = clear_cachewrapper.clear_all_cache = clear_all_cachewrapper.clear_expired_cache = clear_expired_cachereturn wrapperreturn decoratordef get_cache_stats(func_name=None):"""獲取緩存統計信息Args:func_name (str): 函數名,None 表示所有函數Returns:dict: 緩存統計信息"""with stats_lock:if func_name:return cache_stats.get(func_name, {'hits': 0, 'misses': 0, 'expired': 0, 'deleted': 0})# 計算總命中率total_stats = {'hits': 0, 'misses': 0, 'expired': 0, 'deleted': 0}for stats in cache_stats.values():for k in total_stats:total_stats[k] += stats[k]# 添加命中率百分比total = total_stats['hits'] + total_stats['misses'] + total_stats['expired']if total > 0:total_stats['hit_rate'] = total_stats['hits'] / total * 100else:total_stats['hit_rate'] = 0.0return total_statsdef print_cache_stats(func_name=None):"""打印緩存統計信息"""stats = get_cache_stats(func_name)if func_name:print(f"\n緩存統計 - {func_name}:")else:print("\n全局緩存統計:")print(f"命中次數: {stats['hits']}")print(f"未命中次數: {stats['misses']}")print(f"過期次數: {stats['expired']}")print(f"刪除次數: {stats['deleted']}")if 'hit_rate' in stats:print(f"命中率: {stats['hit_rate']:.2f}%")else:total = stats['hits'] + stats['misses'] + stats['expired']if total > 0:hit_rate = stats['hits'] / total * 100print(f"命中率: {hit_rate:.2f}%")def clear_all_expired_cache(cache_dir=DEFAULT_CACHE_DIR, expire_seconds=DEFAULT_EXPIRE_SECONDS):"""清除緩存目錄中所有過期的緩存文件Args:cache_dir (str): 緩存目錄expire_seconds (int): 過期時間(秒)"""current_time = time.time()removed = 0total = 0if not os.path.exists(cache_dir):logger.warning(f"緩存目錄不存在: {cache_dir}")return 0for filename in os.listdir(cache_dir):if filename.endswith('.pkl'):total += 1file_path = os.path.join(cache_dir, filename)try:# 檢查文件是否過期mtime = os.path.getmtime(file_path)if current_time - mtime > expire_seconds:os.remove(file_path)removed += 1with stats_lock:# 嘗試找出對應的函數名func_name = filename.split('_')[0]if func_name in cache_stats:cache_stats[func_name]['deleted'] += 1logger.debug(f'已刪除過期緩存: {filename}')except Exception as e:logger.error(f'處理緩存文件 {filename} 失敗: {e}')logger.info(f'已刪除 {removed}/{total} 個過期緩存文件')return removed# 測試用例
@cache(expire_seconds=2, cache_dir="./test_cache")
def get_data(a, b):print("計算數據")return a + bif __name__ == "__main__":# 確保測試緩存目錄存在os.makedirs("./test_cache", exist_ok=True)# 第一次調用(創建緩存)print(get_data(1, 2)) # 輸出: 計算數據 和 3# 第二次調用(讀取緩存)print(get_data(1, 2)) # 無"計算數據"輸出# 等待緩存過期time.sleep(3)# 第三次調用(緩存過期后重新計算)print(get_data(1, 2)) # 再次輸出"計算數據"# 清除特定參數緩存get_data.clear_cache(1, 2)# 第四次調用(清除后重新計算)print(get_data(1, 2)) # 輸出"計算數據"# 創建另一個緩存print(get_data(3, 4))# 等待緩存過期time.sleep(3)# 清除過期緩存(僅限get_data函數)get_data.clear_expired_cache()# 清除整個緩存目錄中的過期緩存clear_all_expired_cache("./test_cache", expire_seconds=1)# 清除所有緩存get_data.clear_all_cache()# 打印緩存統計print_cache_stats()