以下是一個重構后的高可用、可配置、低耦合的專利CSV處理函數,包含清晰的注釋和結構:
import csv
import pandas as pd
from datetime import datetime
import os
from typing import List, Dict, Any, Optional, Tuple
import logging# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)class PatentProcessor:"""專利數據處理器"""def __init__(self, config: Optional[Dict[str, Any]] = None):"""初始化專利處理器Args:config: 配置字典,包含處理選項"""self.config = config or self.get_default_config()@staticmethoddef get_default_config() -> Dict[str, Any]:"""獲取默認配置"""return {'required_columns': ['id', 'inventor/author', 'title', 'priority date'],'author_column': 'inventor/author','split_separator': ',','filter_condition': None, # 例如: {'column': 'assignee', 'value': '百度', 'case_sensitive': False}'output_columns': ['author', 'value', 'id', 'title_list', 'priority_date_list', 'start_year', 'end_year'],'encoding': 'utf-8'}def load_csv_data(self, csv_filepath: str) -> Optional[pd.DataFrame]:"""加載CSV文件數據Args:csv_filepath: CSV文件路徑Returns:pandas DataFrame 或 None(如果加載失敗)"""try:data = []with open(csv_filepath, encoding=self.config['encoding']) as f:reader = csv.reader(f)for row in reader:data.append(row)if len(data) < 2:logger.warning(f"File {csv_filepath} has insufficient data rows")return None# 使用第二行作為列名,第三行開始作為數據df = pd.DataFrame(data[2:], columns=data[1])logger.info(f"Successfully loaded CSV file: {csv_filepath}")return dfexcept FileNotFoundError:logger.error(f"Error: The file {csv_filepath} does not exist.")except Exception as e:logger.error(f"An unexpected error occurred while loading {csv_filepath}: {e}")return Nonedef apply_filters(self, df: pd.DataFrame) -> pd.DataFrame:"""應用數據過濾條件Args:df: 輸入DataFrameReturns:過濾后的DataFrame"""filter_condition = self.config.get('filter_condition')if filter_condition:column = filter_condition['column']value = filter_condition['value']case_sensitive = filter_condition.get('case_sensitive', False)if column in df.columns:if case_sensitive:df = df[df[column].str.contains(value, na=False)]else:df = df[df[column].str.contains(value, case=False, na=False)]logger.info(f"Applied filter: {filter_condition}")return dfdef convert_to_excel(self, df: pd.DataFrame, csv_filepath: str) -> str:"""將DataFrame轉換為Excel文件Args:df: 輸入DataFramecsv_filepath: 原始CSV文件路徑(用于生成輸出路徑)Returns:生成的Excel文件路徑"""try:excel_filepath = csv_filepath.replace('.csv', '.xlsx')df.to_excel(excel_filepath, index=False)logger.info(f"CSV file has been converted to Excel: {excel_filepath}")return excel_filepathexcept Exception as e:logger.error(f"Error converting to Excel: {e}")return ""def process_authors(self, df: pd.DataFrame) -> pd.DataFrame:"""處理作者數據,進行統計和分析Args:df: 包含專利數據的DataFrameReturns:作者統計DataFrame"""# 選擇需要的列required_cols = self.config['required_columns']missing_cols = [col for col in required_cols if col not in df.columns]if missing_cols:logger.warning(f"Missing columns: {missing_cols}. Available columns: {list(df.columns)}")required_cols = [col for col in required_cols if col in df.columns]df = df[required_cols].copy()# 分割作者列author_col = self.config['author_column']if author_col in df.columns:df[author_col] = df[author_col].str.split(self.config['split_separator'])df = df.explode(author_col)df[author_col] = df[author_col].str.strip()# 統計作者出現次數author_counts = df[author_col].value_counts()new_df = pd.DataFrame({'author': author_counts.index,'value': author_counts.values})# 修復:使用列表推導式而不是直接賦值new_df['id'] = [df[df[author_col] == author]['id'].tolist() for author in new_df['author']]new_df['title_list'] = [df[df[author_col] == author]['title'].tolist() for author in new_df['author']]if 'priority date' in df.columns:new_df['priority_date_list'] = [df[df[author_col] == author]['priority date'].tolist() for author in new_df['author']]# 計算開始和結束年份date_ranges = [self.calculate_date_range(dates) for dates in new_df['priority_date_list']]new_df['start_year'] = [start for start, _ in date_ranges]new_df['end_year'] = [end for _, end in date_ranges]return new_df@staticmethoddef calculate_date_range(date_list: List[str]) -> Tuple[Optional[int], Optional[int]]:"""計算日期列表的開始和結束年份Args:date_list: 日期字符串列表Returns:(開始年份, 結束年份) 元組"""valid_dates = []for date_str in date_list:if pd.notna(date_str) and date_str.strip():try:# 嘗試多種日期格式date_obj = datetime.strptime(date_str.strip(), '%Y-%m-%d')valid_dates.append(date_obj)except ValueError:try:date_obj = datetime.strptime(date_str.strip(), '%Y/%m/%d')valid_dates.append(date_obj)except ValueError:# 如果無法解析日期,跳過continueif not valid_dates:return None, Nonemin_date = min(valid_dates)max_date = max(valid_dates)return min_date.year, max_date.yeardef save_author_stats(self, author_df: pd.DataFrame, csv_filepath: str) -> str:"""保存作者統計結果Args:author_df: 作者統計DataFramecsv_filepath: 原始CSV文件路徑(用于生成輸出路徑)Returns:生成的統計文件路徑"""try:rank_excel_filepath = csv_filepath.replace('.csv', '_rank.xlsx')# 只保存配置中指定的列output_cols = [col for col in self.config['output_columns'] if col in author_df.columns]author_df[output_cols].to_excel(rank_excel_filepath, index=False)logger.info(f"Author statistics saved: {rank_excel_filepath}")return rank_excel_filepathexcept Exception as e:logger.error(f"Error saving author statistics: {e}")return ""def process_patent_file(self, csv_filepath: str) -> Dict[str, str]:"""處理單個專利CSV文件Args:csv_filepath: CSV文件路徑Returns:包含輸出文件路徑的字典"""results = {'original_file': csv_filepath}# 1. 加載數據df = self.load_csv_data(csv_filepath)if df is None or df.empty:return results# 2. 應用過濾條件df = self.apply_filters(df)# 3. 轉換為Excelexcel_path = self.convert_to_excel(df, csv_filepath)results['excel_file'] = excel_path# 4. 處理作者數據author_df = self.process_authors(df)# 5. 保存作者統計stats_path = self.save_author_stats(author_df, csv_filepath)results['stats_file'] = stats_pathreturn results# 使用示例
def process_patents_folder(folder_path: str, file_list: List[str], config: Optional[Dict[str, Any]] = None):"""處理文件夾中的多個專利CSV文件Args:folder_path: 文件夾路徑file_list: 要處理的文件列表config: 處理配置"""processor = PatentProcessor(config)for filename in file_list:file_path = os.path.join(folder_path, filename)logger.info(f"Processing file: {filename}")try:results = processor.process_patent_file(file_path)logger.info(f"Completed processing: {filename}")logger.info(f"Results: {results}")except Exception as e:logger.error(f"Error processing {filename}: {e}")# 配置示例
CUSTOM_CONFIG = {'required_columns': ['id', 'inventor/author', 'title', 'priority date'],'author_column': 'inventor/author','split_separator': ',','filter_condition': None, # 例如: {'column': 'assignee', 'value': 'Google', 'case_sensitive': False}'output_columns': ['author', 'value', 'id', 'title_list', 'priority_date_list', 'start_year', 'end_year'],'encoding': 'utf-8'}if __name__ == "__main__":# 使用示例folder = r'E:files\patents'gp_files = ["gp-search-google.csv",]# 使用默認配置處理文件process_patents_folder(folder, gp_files)# 使用自定義配置處理文件# process_patents_folder(folder, gp_files, CUSTOM_CONFIG)
這個重構版本具有以下優點:
- 模塊化設計:將功能拆分為多個獨立的方法,每個方法只負責一個明確的任務
- 可配置性:通過配置字典可以靈活調整處理參數
- 錯誤處理:完善的異常處理和日志記錄
- 類型提示:添加了類型提示,提高代碼可讀性和可維護性
- 靈活性:支持自定義過濾條件、輸出列等
- 可擴展性:易于添加新的處理功能或修改現有邏輯
- 清晰的文檔:每個方法都有詳細的文檔字符串說明參數和返回值
使用方法:
# 簡單使用
processor = PatentProcessor()
results = processor.process_patent_file('path/to/patents.csv')# 自定義配置使用
config = {'filter_condition': {'column': 'assignee', 'value': 'Google', 'case_sensitive': False},'required_columns': ['id', 'inventor/author', 'title', 'priority date']
}
processor = PatentProcessor(config)
results = processor.process_patent_file('path/to/patents.csv')