文章概要
本文詳細介紹 Pandas 的進階主題,包括:
- 自定義函數
- 高級索引
- 數據導出
- 實際應用示例
自定義函數
函數應用
# 基本函數應用
def calculate_bonus(salary, performance):"""計算獎金Args:salary (float): 基本工資performance (float): 績效分數 (0-1)Returns:float: 獎金金額"""return salary * performance * 0.1# 應用到 DataFrame
df['bonus'] = df.apply(lambda x: calculate_bonus(x['salary'], x['performance']),axis=1
)# 使用 applymap 應用到所有元素
def format_currency(x):"""格式化貨幣Args:x (float): 金額Returns:str: 格式化后的金額"""return f"${x:,.2f}"df = df.applymap(format_currency)# 使用 transform 保持索引
def normalize_column(x):"""標準化列Args:x (pd.Series): 輸入序列Returns:pd.Series: 標準化后的序列"""return (x - x.mean()) / x.std()df['normalized_value'] = df.groupby('category')['value'].transform(normalize_column)
函數映射
# 使用 map 進行值映射
category_map = {'A': 'High','B': 'Medium','C': 'Low'
}df['category_level'] = df['category'].map(category_map)# 使用 replace 進行值替換
df['status'] = df['status'].replace({'active': 1,'inactive': 0,'pending': -1
})# 使用 apply 進行復雜映射
def map_complex_value(row):"""復雜值映射Args:row (pd.Series): 行數據Returns:str: 映射后的值"""if row['age'] < 18:return 'Minor'elif row['age'] < 65:return 'Adult'else:return 'Senior'df['age_group'] = df.apply(map_complex_value, axis=1)
函數優化
# 使用 numba 優化函數
from numba import jit@jit(nopython=True)
def calculate_statistics(values):"""計算統計量Args:values (np.array): 數值數組Returns:tuple: (均值, 標準差)"""mean = 0.0for x in values:mean += xmean /= len(values)std = 0.0for x in values:std += (x - mean) ** 2std = (std / len(values)) ** 0.5return mean, std# 使用向量化操作
def vectorized_calculation(df):"""向量化計算Args:df (pd.DataFrame): 輸入數據Returns:pd.DataFrame: 計算結果"""# 使用 numpy 的向量化操作df['result'] = np.where(df['value'] > df['threshold'],df['value'] * 1.1,df['value'] * 0.9)return df# 使用并行處理
from multiprocessing import Pooldef parallel_process(df, func, n_cores=4):"""并行處理Args:df (pd.DataFrame): 輸入數據func (function): 處理函數n_cores (int): 核心數Returns:pd.DataFrame: 處理結果"""# 分割數據chunks = np.array_split(df, n_cores)# 創建進程池pool = Pool(n_cores)# 并行處理results = pool.map(func, chunks)# 合并結果return pd.concat(results)
高級索引
多級索引
# 創建多級索引
df = pd.DataFrame({'value': np.random.randn(100),'category': np.random.choice(['A', 'B', 'C'], 100),'subcategory': np.random.choice(['X', 'Y', 'Z'], 100)
})# 設置多級索引
df = df.set_index(['category', 'subcategory'])# 使用多級索引
# 選擇特定類別
df.loc['A']# 選擇特定子類別
df.loc[('A', 'X')]# 使用 xs 進行交叉選擇
df.xs('X', level='subcategory')# 重置索引
df = df.reset_index()# 使用 stack 和 unstack
df_stacked = df.stack()
df_unstacked = df.unstack()
索引操作
# 設置索引
df = df.set_index('date')# 重置索引
df = df.reset_index()# 重命名索引
df.index.name = 'date'
df.index = df.index.rename('date')# 索引排序
df = df.sort_index()# 索引對齊
df1 = pd.DataFrame({'A': [1, 2, 3]}, index=['a', 'b', 'c'])
df2 = pd.DataFrame({'B': [4, 5, 6]}, index=['b', 'c', 'd'])
df_aligned = df1.align(df2, join='outer')# 索引轉換
df.index = pd.to_datetime(df.index)
索引優化
# 檢查索引是否唯一
is_unique = df.index.is_unique# 檢查索引是否單調
is_monotonic = df.index.is_monotonic# 檢查索引是否已排序
is_sorted = df.index.is_monotonic_increasing# 優化索引
def optimize_index(df):"""優化索引Args:df (pd.DataFrame): 輸入數據Returns:pd.DataFrame: 優化后的數據"""# 檢查索引類型if isinstance(df.index, pd.DatetimeIndex):# 確保索引已排序if not df.index.is_monotonic:df = df.sort_index()# 檢查索引是否連續if not df.index.is_monotonic_increasing:df = df.reindex(pd.date_range(df.index.min(),df.index.max(),freq='D'))return df# 使用示例
df = pd.DataFrame({'value': np.random.randn(100)
}, index=pd.date_range('2023-01-01', periods=100))# 優化索引
df = optimize_index(df)
數據導出
格式轉換
# 導出為 CSV
df.to_csv('output.csv', index=False)# 導出為 Excel
df.to_excel('output.xlsx', sheet_name='Sheet1', index=False)# 導出為 JSON
df.to_json('output.json', orient='records')# 導出為 SQL
from sqlalchemy import create_engine
engine = create_engine('sqlite:///database.db')
df.to_sql('table_name', engine, if_exists='replace')# 導出為 HTML
df.to_html('output.html')# 導出為 Markdown
df.to_markdown('output.md')
數據壓縮
# 使用 gzip 壓縮
df.to_csv('output.csv.gz', compression='gzip', index=False)# 使用 zip 壓縮
df.to_csv('output.csv.zip', compression='zip', index=False)# 使用 bz2 壓縮
df.to_csv('output.csv.bz2', compression='bz2', index=False)# 使用 xz 壓縮
df.to_csv('output.csv.xz', compression='xz', index=False)# 自定義壓縮函數
def compress_data(df, output_file, compression='gzip'):"""壓縮數據Args:df (pd.DataFrame): 輸入數據output_file (str): 輸出文件路徑compression (str): 壓縮方式"""df.to_csv(output_file, compression=compression, index=False)
批量處理
# 批量導出
def batch_export(df, output_dir, chunk_size=10000):"""批量導出數據Args:df (pd.DataFrame): 輸入數據output_dir (str): 輸出目錄chunk_size (int): 塊大小"""# 創建輸出目錄import osos.makedirs(output_dir, exist_ok=True)# 分塊導出for i, chunk in enumerate(np.array_split(df, len(df) // chunk_size + 1)):output_file = os.path.join(output_dir, f'chunk_{i}.csv')chunk.to_csv(output_file, index=False)# 批量轉換格式
def batch_convert(input_dir, output_dir, input_format='csv', output_format='excel'):"""批量轉換格式Args:input_dir (str): 輸入目錄output_dir (str): 輸出目錄input_format (str): 輸入格式output_format (str): 輸出格式"""# 創建輸出目錄import osos.makedirs(output_dir, exist_ok=True)# 獲取輸入文件列表input_files = [f for f in os.listdir(input_dir) if f.endswith(f'.{input_format}')]# 轉換每個文件for input_file in input_files:# 讀取輸入文件input_path = os.path.join(input_dir, input_file)df = pd.read_csv(input_path)# 生成輸出文件名output_file = os.path.splitext(input_file)[0] + f'.{output_format}'output_path = os.path.join(output_dir, output_file)# 導出文件if output_format == 'excel':df.to_excel(output_path, index=False)elif output_format == 'json':df.to_json(output_path, orient='records')elif output_format == 'csv':df.to_csv(output_path, index=False)
總結
進階主題部分涵蓋了:
- 自定義函數(函數應用、函數映射、函數優化)
- 高級索引(多級索引、索引操作、索引優化)
- 數據導出(格式轉換、數據壓縮、批量處理)
- 實際應用示例
掌握這些進階主題對于提升 Pandas 使用水平至關重要,它可以幫助我們:
- 提高代碼效率
- 優化數據處理
- 增強數據導出能力
- 提升代碼質量
建議在實際項目中注意:
- 合理使用自定義函數
- 優化索引操作
- 選擇合適的導出格式
- 注意數據壓縮
- 考慮批量處理
- 保持代碼可維護性
- 持續學習新特性