核心方法
- 逐行讀取 - 最常用,內存占用O(1)
- 分塊讀取 - 適合超大文件,可控制內存使用
- 內存映射 - 高性能,虛擬內存映射
- 緩沖讀取 - 平衡性能和內存
特殊場景處理
- CSV文件 - 使用pandas的chunksize參數
- JSON Lines - 逐行解析JSON對象
- 文本分析 - 內存高效的單詞計數示例
關鍵優化技巧
- 使用生成器 - 避免一次性加載所有數據到內存
- 合理設置塊大小 - 平衡內存使用和IO效率
- 進度監控 - 實時顯示處理進度
- 錯誤處理 - 處理編碼錯誤、文件不存在等異常
使用建議
- 小于100MB: 直接讀取到內存
- 100MB-1GB: 使用逐行讀取或小塊讀取
- 大于1GB: 使用內存映射或大塊分批處理
- 結構化數據: 使用pandas的chunksize參數
這些方法可以處理幾GB甚至幾十GB的文件而不會導致內存溢出。根據您的具體需求選擇最適合的方法即可。
示例代碼:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
大文件讀取示例 - 避免內存溢出的多種方法
"""import os
import sys
import mmap
import csv
import json
from typing import Generator, Iterator
import pandas as pdclass LargeFileReader:"""大文件讀取工具類"""def __init__(self, file_path: str, encoding: str = 'utf-8'):self.file_path = file_pathself.encoding = encodingdef get_file_size(self) -> int:"""獲取文件大小(字節)"""return os.path.getsize(self.file_path)def get_file_size_mb(self) -> float:"""獲取文件大小(MB)"""return self.get_file_size() / (1024 * 1024)def method1_line_by_line(file_path: str, encoding: str = 'utf-8') -> Generator[str, None, None]:"""方法1: 逐行讀取 - 最常用的方法內存使用量: O(1) - 每次只加載一行適用場景: 文本文件、日志文件"""try:with open(file_path, 'r', encoding=encoding) as file:for line_num, line in enumerate(file, 1):# 處理每一行yield line.strip() # 去除行尾換行符# 可選:顯示進度if line_num % 10000 == 0:print(f"已處理 {line_num} 行")except FileNotFoundError:print(f"文件未找到: {file_path}")except UnicodeDecodeError as e:print(f"編碼錯誤: {e}")def method2_chunk_reading(file_path: str, chunk_size: int = 1024*1024, encoding: str = 'utf-8') -> Generator[str, None, None]:"""方法2: 按塊讀取 - 適合處理二進制文件或超大文本文件內存使用量: O(chunk_size) - 每次加載指定大小的塊適用場景: 二進制文件、超大文本文件"""try:with open(file_path, 'r', encoding=encoding) as file:while True:chunk = file.read(chunk_size)if not chunk:breakyield chunkexcept FileNotFoundError:print(f"文件未找到: {file_path}")except UnicodeDecodeError as e:print(f"編碼錯誤: {e}")def method3_mmap_reading(file_path: str, encoding: str = 'utf-8') -> Iterator[str]:"""方法3: 內存映射文件 - 高性能讀取內存使用量: 虛擬內存映射,物理內存按需加載適用場景: 需要隨機訪問的大文件、高性能要求"""try:with open(file_path, 'r', encoding=encoding) as file:with mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:# 逐行讀取for line in iter(mmapped_file.readline, b""):yield line.decode(encoding).strip()except FileNotFoundError:print(f"文件未找到: {file_path}")except Exception as e:print(f"內存映射錯誤: {e}")def method4_buffered_reading(file_path: str, buffer_size: int = 8192, encoding: str = 'utf-8') -> Generator[str, None, None]:"""方法4: 緩沖區讀取 - 平衡性能和內存使用內存使用量: O(buffer_size)適用場景: 需要自定義緩沖區大小的場景"""try:with open(file_path, 'r', encoding=encoding, buffering=buffer_size) as file:for line in file:yield line.strip()except FileNotFoundError:print(f"文件未找到: {file_path}")except UnicodeDecodeError as e:print(f"編碼錯誤: {e}")def process_large_csv(file_path: str, chunk_size: int = 10000) -> None:"""處理大型CSV文件的示例使用pandas的chunksize參數分塊讀取"""print(f"開始處理CSV文件: {file_path}")try:# 分塊讀取CSV文件chunk_iter = pd.read_csv(file_path, chunksize=chunk_size)total_rows = 0for chunk_num, chunk in enumerate(chunk_iter, 1):# 處理當前塊print(f"處理第 {chunk_num} 塊,包含 {len(chunk)} 行")# 示例處理:統計每列的基本信息print(f"列名: {list(chunk.columns)}")print(f"數據類型: {chunk.dtypes.to_dict()}")# 這里可以添加你的數據處理邏輯# 例如:數據清洗、計算、轉換等total_rows += len(chunk)# 可選:限制處理的塊數量(用于測試)if chunk_num >= 5: # 只處理前5塊breakprint(f"總共處理了 {total_rows} 行數據")except FileNotFoundError:print(f"CSV文件未找到: {file_path}")except pd.errors.EmptyDataError:print("CSV文件為空")except Exception as e:print(f"處理CSV文件時出錯: {e}")def process_large_json_lines(file_path: str) -> None:"""處理大型JSON Lines文件 (.jsonl)每行是一個獨立的JSON對象"""print(f"開始處理JSON Lines文件: {file_path}")try:with open(file_path, 'r', encoding='utf-8') as file:for line_num, line in enumerate(file, 1):line = line.strip()if not line:continuetry:# 解析JSON對象json_obj = json.loads(line)# 處理JSON對象# 這里添加你的處理邏輯print(f"第 {line_num} 行: {type(json_obj)} - {len(str(json_obj))} 字符")# 示例:提取特定字段if isinstance(json_obj, dict):keys = list(json_obj.keys())[:5] # 只顯示前5個鍵print(f" 鍵: {keys}")except json.JSONDecodeError as e:print(f"第 {line_num} 行JSON解析錯誤: {e}")continue# 顯示進度if line_num % 1000 == 0:print(f"已處理 {line_num} 行")# 可選:限制處理行數(用于測試)if line_num >= 10000:breakexcept FileNotFoundError:print(f"JSON Lines文件未找到: {file_path}")def process_with_progress_callback(file_path: str, callback_interval: int = 10000) -> None:"""帶進度回調的文件處理示例"""reader = LargeFileReader(file_path)file_size_mb = reader.get_file_size_mb()print(f"文件大小: {file_size_mb:.2f} MB")print("開始處理文件...")processed_lines = 0for line in method1_line_by_line(file_path):# 處理每一行# 這里添加你的處理邏輯line_length = len(line)processed_lines += 1# 進度回調if processed_lines % callback_interval == 0:print(f"已處理 {processed_lines:,} 行")# 可選:限制處理行數(用于測試)if processed_lines >= 50000:print("達到處理限制,停止處理")breakprint(f"處理完成,總共處理了 {processed_lines:,} 行")def memory_efficient_word_count(file_path: str) -> dict:"""內存高效的單詞計數示例適用于超大文本文件"""word_count = {}print("開始統計單詞頻率...")for line_num, line in enumerate(method1_line_by_line(file_path), 1):# 簡單的單詞分割(可以根據需要改進)words = line.lower().split()for word in words:# 清理單詞(去除標點符號等)clean_word = ''.join(c for c in word if c.isalnum())if clean_word:word_count[clean_word] = word_count.get(clean_word, 0) + 1# 顯示進度if line_num % 10000 == 0:print(f"已處理 {line_num} 行,當前詞匯量: {len(word_count)}")print(f"統計完成,總詞匯量: {len(word_count)}")# 返回前10個最常用的單詞sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)return dict(sorted_words[:10])def main():"""主函數 - 演示各種大文件讀取方法"""# 注意:請替換為你的實際文件路徑test_file = "large_file.txt" # 替換為實際的大文件路徑csv_file = "large_data.csv" # 替換為實際的CSV文件路徑json_file = "large_data.jsonl" # 替換為實際的JSON Lines文件路徑print("=== 大文件讀取示例 ===\n")# 檢查文件是否存在if not os.path.exists(test_file):print(f"測試文件 {test_file} 不存在")print("請創建一個測試文件或修改文件路徑")return# 創建文件讀取器reader = LargeFileReader(test_file)print(f"文件路徑: {test_file}")print(f"文件大小: {reader.get_file_size_mb():.2f} MB\n")# 示例1: 逐行讀取(推薦用于大多數文本文件)print("=== 方法1: 逐行讀取 ===")line_count = 0for line in method1_line_by_line(test_file):line_count += 1if line_count <= 5: # 只顯示前5行print(f"第{line_count}行: {line[:50]}...")if line_count >= 10000: # 限制處理行數breakprint(f"處理了 {line_count} 行\n")# 示例2: 塊讀取print("=== 方法2: 塊讀取 ===")chunk_count = 0for chunk in method2_chunk_reading(test_file, chunk_size=1024):chunk_count += 1if chunk_count <= 3: # 只顯示前3塊print(f"塊{chunk_count}: {len(chunk)} 字符")if chunk_count >= 10: # 限制處理塊數breakprint(f"處理了 {chunk_count} 個塊\n")# 示例3: 內存映射print("=== 方法3: 內存映射 ===")mmap_count = 0try:for line in method3_mmap_reading(test_file):mmap_count += 1if mmap_count >= 10000: # 限制處理行數breakprint(f"使用內存映射處理了 {mmap_count} 行\n")except Exception as e:print(f"內存映射失敗: {e}\n")# 示例4: 帶進度的處理print("=== 方法4: 帶進度回調的處理 ===")process_with_progress_callback(test_file, callback_interval=5000)print()# 示例5: CSV文件處理if os.path.exists(csv_file):print("=== CSV文件處理 ===")process_large_csv(csv_file, chunk_size=1000)print()# 示例6: JSON Lines文件處理if os.path.exists(json_file):print("=== JSON Lines文件處理 ===")process_large_json_lines(json_file)print()# 示例7: 單詞計數print("=== 內存高效單詞計數 ===")try:top_words = memory_efficient_word_count(test_file)print("前10個最常用單詞:")for word, count in top_words.items():print(f" {word}: {count}")except Exception as e:print(f"單詞統計失敗: {e}")if __name__ == "__main__":main()# 性能優化建議:
"""
1. 選擇合適的方法:- 逐行讀取: 適用于大多數文本文件- 塊讀取: 適用于二進制文件或需要自定義處理塊的場景- 內存映射: 適用于需要隨機訪問或高性能要求的場景- pandas分塊: 適用于結構化數據(CSV)2. 內存優化:- 及時釋放不需要的變量- 使用生成器而不是列表- 避免一次性加載整個文件到內存3. 性能優化:- 合理設置緩沖區大小- 使用適當的編碼- 考慮使用多進程/多線程處理4. 錯誤處理:- 處理文件不存在的情況- 處理編碼錯誤- 處理磁盤空間不足等IO錯誤
"""
Excel大文件讀取方法
1. pandas分塊讀取 (.xlsx, .xls)
- 適合中等大小的Excel文件
- 可以處理多個工作表
- 支持數據類型自動識別
2. openpyxl逐行讀取 (.xlsx) - 推薦
- 內存效率最高的方法
- 使用
read_only=True
模式 - 真正的逐行處理,內存占用O(1)
- 適合處理超大Excel文件
3. xlrd處理 (.xls)
- 專門處理舊版Excel格式
- 分塊讀取支持
- 適合Legacy Excel文件
4. pyxlsb處理 (.xlsb)
- 處理Excel二進制格式
- 讀取速度快,文件小
- 需要額外安裝pyxlsb庫
新增功能特點
? 智能文件信息獲取 - 不加載全部數據就能獲取文件結構信息
? 內存使用對比 - 實時監控不同方法的內存消耗
? 批量數據處理 - 支持批次處理和進度監控
? 多格式支持 - 支持.xlsx、.xls、.xlsb三種格式
? 錯誤處理 - 完善的異常處理機制
安裝依賴
bash
pip install pandas openpyxl xlrd pyxlsb psutil
使用建議
- 小于50MB: 可以使用pandas直接讀取
- 50MB-500MB: 使用pandas分塊讀取
- 大于500MB: 推薦使用openpyxl逐行讀取
- 超大文件: 考慮轉換為CSV或Parquet格式
這套方案可以處理幾GB甚至更大的Excel文件而不會內存溢出,特別是openpyxl的逐行讀取方法,是處理超大Excel文件的最佳選擇!
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
大文件讀取示例 - 避免內存溢出的多種方法
"""import os
import sys
import mmap
import csv
import json
from typing import Generator, Iterator
import pandas as pdclass LargeFileReader:"""大文件讀取工具類"""def __init__(self, file_path: str, encoding: str = 'utf-8'):self.file_path = file_pathself.encoding = encodingdef get_file_size(self) -> int:"""獲取文件大小(字節)"""return os.path.getsize(self.file_path)def get_file_size_mb(self) -> float:"""獲取文件大小(MB)"""return self.get_file_size() / (1024 * 1024)def method1_line_by_line(file_path: str, encoding: str = 'utf-8') -> Generator[str, None, None]:"""方法1: 逐行讀取 - 最常用的方法內存使用量: O(1) - 每次只加載一行適用場景: 文本文件、日志文件"""try:with open(file_path, 'r', encoding=encoding) as file:for line_num, line in enumerate(file, 1):# 處理每一行yield line.strip() # 去除行尾換行符# 可選:顯示進度if line_num % 10000 == 0:print(f"已處理 {line_num} 行")except FileNotFoundError:print(f"文件未找到: {file_path}")except UnicodeDecodeError as e:print(f"編碼錯誤: {e}")def method2_chunk_reading(file_path: str, chunk_size: int = 1024*1024, encoding: str = 'utf-8') -> Generator[str, None, None]:"""方法2: 按塊讀取 - 適合處理二進制文件或超大文本文件內存使用量: O(chunk_size) - 每次加載指定大小的塊適用場景: 二進制文件、超大文本文件"""try:with open(file_path, 'r', encoding=encoding) as file:while True:chunk = file.read(chunk_size)if not chunk:breakyield chunkexcept FileNotFoundError:print(f"文件未找到: {file_path}")except UnicodeDecodeError as e:print(f"編碼錯誤: {e}")def method3_mmap_reading(file_path: str, encoding: str = 'utf-8') -> Iterator[str]:"""方法3: 內存映射文件 - 高性能讀取內存使用量: 虛擬內存映射,物理內存按需加載適用場景: 需要隨機訪問的大文件、高性能要求"""try:with open(file_path, 'r', encoding=encoding) as file:with mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:# 逐行讀取for line in iter(mmapped_file.readline, b""):yield line.decode(encoding).strip()except FileNotFoundError:print(f"文件未找到: {file_path}")except Exception as e:print(f"內存映射錯誤: {e}")def method4_buffered_reading(file_path: str, buffer_size: int = 8192, encoding: str = 'utf-8') -> Generator[str, None, None]:"""方法4: 緩沖區讀取 - 平衡性能和內存使用內存使用量: O(buffer_size)適用場景: 需要自定義緩沖區大小的場景"""try:with open(file_path, 'r', encoding=encoding, buffering=buffer_size) as file:for line in file:yield line.strip()except FileNotFoundError:print(f"文件未找到: {file_path}")except UnicodeDecodeError as e:print(f"編碼錯誤: {e}")def process_large_csv(file_path: str, chunk_size: int = 10000) -> None:"""處理大型CSV文件的示例使用pandas的chunksize參數分塊讀取"""print(f"開始處理CSV文件: {file_path}")try:# 分塊讀取CSV文件chunk_iter = pd.read_csv(file_path, chunksize=chunk_size)total_rows = 0for chunk_num, chunk in enumerate(chunk_iter, 1):# 處理當前塊print(f"處理第 {chunk_num} 塊,包含 {len(chunk)} 行")# 示例處理:統計每列的基本信息print(f"列名: {list(chunk.columns)}")print(f"數據類型: {chunk.dtypes.to_dict()}")# 這里可以添加你的數據處理邏輯# 例如:數據清洗、計算、轉換等total_rows += len(chunk)# 可選:限制處理的塊數量(用于測試)if chunk_num >= 5: # 只處理前5塊breakprint(f"總共處理了 {total_rows} 行數據")except FileNotFoundError:print(f"CSV文件未找到: {file_path}")except pd.errors.EmptyDataError:print("CSV文件為空")except Exception as e:print(f"處理CSV文件時出錯: {e}")def process_large_json_lines(file_path: str) -> None:"""處理大型JSON Lines文件 (.jsonl)每行是一個獨立的JSON對象"""print(f"開始處理JSON Lines文件: {file_path}")try:with open(file_path, 'r', encoding='utf-8') as file:for line_num, line in enumerate(file, 1):line = line.strip()if not line:continuetry:# 解析JSON對象json_obj = json.loads(line)# 處理JSON對象# 這里添加你的處理邏輯print(f"第 {line_num} 行: {type(json_obj)} - {len(str(json_obj))} 字符")# 示例:提取特定字段if isinstance(json_obj, dict):keys = list(json_obj.keys())[:5] # 只顯示前5個鍵print(f" 鍵: {keys}")except json.JSONDecodeError as e:print(f"第 {line_num} 行JSON解析錯誤: {e}")continue# 顯示進度if line_num % 1000 == 0:print(f"已處理 {line_num} 行")# 可選:限制處理行數(用于測試)if line_num >= 10000:breakexcept FileNotFoundError:print(f"JSON Lines文件未找到: {file_path}")def process_with_progress_callback(file_path: str, callback_interval: int = 10000) -> None:"""帶進度回調的文件處理示例"""reader = LargeFileReader(file_path)file_size_mb = reader.get_file_size_mb()print(f"文件大小: {file_size_mb:.2f} MB")print("開始處理文件...")processed_lines = 0for line in method1_line_by_line(file_path):# 處理每一行# 這里添加你的處理邏輯line_length = len(line)processed_lines += 1# 進度回調if processed_lines % callback_interval == 0:print(f"已處理 {processed_lines:,} 行")# 可選:限制處理行數(用于測試)if processed_lines >= 50000:print("達到處理限制,停止處理")breakprint(f"處理完成,總共處理了 {processed_lines:,} 行")def memory_efficient_word_count(file_path: str) -> dict:"""內存高效的單詞計數示例適用于超大文本文件"""word_count = {}print("開始統計單詞頻率...")for line_num, line in enumerate(method1_line_by_line(file_path), 1):# 簡單的單詞分割(可以根據需要改進)words = line.lower().split()for word in words:# 清理單詞(去除標點符號等)clean_word = ''.join(c for c in word if c.isalnum())if clean_word:word_count[clean_word] = word_count.get(clean_word, 0) + 1# 顯示進度if line_num % 10000 == 0:print(f"已處理 {line_num} 行,當前詞匯量: {len(word_count)}")print(f"統計完成,總詞匯量: {len(word_count)}")# 返回前10個最常用的單詞sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)return dict(sorted_words[:10])def main():"""主函數 - 演示各種大文件讀取方法"""# 注意:請替換為你的實際文件路徑test_file = "large_file.txt" # 替換為實際的大文件路徑csv_file = "large_data.csv" # 替換為實際的CSV文件路徑json_file = "large_data.jsonl" # 替換為實際的JSON Lines文件路徑print("=== 大文件讀取示例 ===\n")# 檢查文件是否存在if not os.path.exists(test_file):print(f"測試文件 {test_file} 不存在")print("請創建一個測試文件或修改文件路徑")return# 創建文件讀取器reader = LargeFileReader(test_file)print(f"文件路徑: {test_file}")print(f"文件大小: {reader.get_file_size_mb():.2f} MB\n")# 示例1: 逐行讀取(推薦用于大多數文本文件)print("=== 方法1: 逐行讀取 ===")line_count = 0for line in method1_line_by_line(test_file):line_count += 1if line_count <= 5: # 只顯示前5行print(f"第{line_count}行: {line[:50]}...")if line_count >= 10000: # 限制處理行數breakprint(f"處理了 {line_count} 行\n")# 示例2: 塊讀取print("=== 方法2: 塊讀取 ===")chunk_count = 0for chunk in method2_chunk_reading(test_file, chunk_size=1024):chunk_count += 1if chunk_count <= 3: # 只顯示前3塊print(f"塊{chunk_count}: {len(chunk)} 字符")if chunk_count >= 10: # 限制處理塊數breakprint(f"處理了 {chunk_count} 個塊\n")# 示例3: 內存映射print("=== 方法3: 內存映射 ===")mmap_count = 0try:for line in method3_mmap_reading(test_file):mmap_count += 1if mmap_count >= 10000: # 限制處理行數breakprint(f"使用內存映射處理了 {mmap_count} 行\n")except Exception as e:print(f"內存映射失敗: {e}\n")# 示例4: 帶進度的處理print("=== 方法4: 帶進度回調的處理 ===")process_with_progress_callback(test_file, callback_interval=5000)print()# 示例5: CSV文件處理if os.path.exists(csv_file):print("=== CSV文件處理 ===")process_large_csv(csv_file, chunk_size=1000)print()# 示例6: JSON Lines文件處理if os.path.exists(json_file):print("=== JSON Lines文件處理 ===")process_large_json_lines(json_file)print()# 示例7: 單詞計數print("=== 內存高效單詞計數 ===")try:top_words = memory_efficient_word_count(test_file)print("前10個最常用單詞:")for word, count in top_words.items():print(f" {word}: {count}")except Exception as e:print(f"單詞統計失敗: {e}")if __name__ == "__main__":main()# 性能優化建議:
"""
Excel文件大文件讀取最佳實踐:1. 選擇合適的庫和方法:- openpyxl (read_only=True): 最佳內存效率,支持.xlsx- pandas + chunksize: 易用但內存使用較高- xlrd: 適用于.xls文件- pyxlsb: 適用于.xlsb文件2. Excel特有優化:- 使用read_only=True模式- 設置data_only=True跳過公式計算- 逐行讀取而不是一次性加載整個工作表- 按批次處理數據3. 內存優化策略:- 使用生成器和迭代器- 及時釋放DataFrame和工作簿對象- 避免同時打開多個工作簿- 考慮將數據轉換為更高效的格式(如Parquet)4. 性能建議:- 對于超大Excel文件,考慮先轉換為CSV格式- 使用多進程處理多個工作表- 設置合適的批次大小(1000-10000行)- 在SSD上處理文件以提高IO速度5. 文件格式選擇:- .xlsx: 現代格式,壓縮率高,但讀取較慢- .xls: 舊格式,讀取快但文件大- .xlsb: 二進制格式,讀取快,文件小- .csv: 最快的讀取速度,但丟失Excel特性6. 錯誤處理:- 處理損壞的Excel文件- 處理不同的數據類型和格式- 處理合并單元格- 處理隱藏的行和列安裝所需的依賴:
pip install pandas openpyxl xlrd pyxlsb psutil
"""