Python Async Web Scraping: A Hands-On Guide from Beginner to Advanced 🚀
📚 Table of Contents
- Introduction: Why Learn Async Scraping
- Async Programming Fundamentals
- The Core Async Scraping Tech Stack
- Getting Started: Your First Async Scraper
- Advanced Techniques: Concurrency Control and Resource Management
- Advanced Practice: A Distributed Async Scraper Architecture
- Anti-Scraping Countermeasures
- Performance Optimization and Monitoring
- Common Problems and Solutions
- Best Practices Summary
Introduction: Why Learn Async Scraping? 🤔
Over the past few years of scraper development I have watched the field evolve from single-threaded sequential fetching, to multi-threaded concurrency, to today's async programming model. On one news-collection project, a traditional synchronous scraper needed 2 hours to collect 100,000 news articles; after switching to an async design, the same job finished in about 15 minutes.
The core advantages of async scraping are:
- High concurrency: a single process can handle thousands of concurrent requests
- Efficient resource usage: no thread-switching overhead
- Fast responses: non-blocking I/O keeps the program doing useful work
- Scalability: it is much easier to grow into a large-scale crawling system
Async Programming Fundamentals 📖
What is asynchronous programming?
Asynchronous programming is a paradigm that lets a program keep doing other work while it waits for an operation to finish, instead of blocking. In a scraping scenario, after sending an HTTP request we do not have to sit idle waiting for the server's response; we can fire off other requests in the meantime.
Core concepts
Coroutine: a function whose execution can be suspended and resumed; the basic unit of async programming.
Event loop: the core mechanism that schedules and runs coroutines.
Awaitable: any object that can be awaited with the await keyword, including coroutines, Tasks, and Futures.
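To make these concepts concrete, here is a minimal sketch (the coroutine names and delays are illustrative only): two coroutines run on one event loop, and asyncio.gather awaits both awaitables so the total time is roughly one delay, not two.

```python
import asyncio
import time


async def fake_fetch(name: str, delay: float) -> str:
    # await suspends this coroutine; the event loop runs other work meanwhile
    await asyncio.sleep(delay)  # stands in for a non-blocking HTTP request
    return f"{name} done after {delay}s"


async def main():
    start = time.perf_counter()
    # gather schedules both coroutines as Tasks and waits for them together
    results = await asyncio.gather(fake_fetch("page-1", 1.0), fake_fetch("page-2", 1.0))
    print(results, f"elapsed: {time.perf_counter() - start:.1f}s")  # ~1s, not 2s


if __name__ == "__main__":
    asyncio.run(main())  # asyncio.run creates and manages the event loop
```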
The Core Async Scraping Tech Stack 🛠️
In real projects I usually combine the following components:

| Component | Recommended Library | Purpose |
|---|---|---|
| HTTP client | aiohttp | Async HTTP requests |
| HTML parsing | BeautifulSoup4 | Extracting page content |
| Concurrency control | asyncio.Semaphore | Capping concurrent requests |
| Data storage | aiofiles | Async file I/O |
| Proxy management | aiohttp-proxy | Proxy pool handling |
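A note on the proxy row: the snippets later in this guide pass a proxy straight to session.get() using aiohttp's built-in proxy parameter, which covers plain HTTP proxies; aiohttp-proxy is only needed for SOCKS-style connectors. The sketch below shows the minimal built-in approach; the proxy address is a placeholder you would replace with one from your own pool.

```python
import asyncio
import aiohttp


async def fetch_via_proxy(url: str) -> int:
    proxy_url = "http://127.0.0.1:8080"  # placeholder proxy address
    async with aiohttp.ClientSession() as session:
        # aiohttp supports HTTP proxies natively via the proxy= argument
        async with session.get(url, proxy=proxy_url) as response:
            return response.status


if __name__ == "__main__":
    print(asyncio.run(fetch_via_proxy("https://httpbin.org/ip")))
```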
Getting Started: Your First Async Scraper 🎯
Let's begin with a simple but practical example that fetches the titles of several web pages:
```python
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time


class AsyncSpider:
    def __init__(self, max_concurrent=10):
        """Initialize the async spider.

        :param max_concurrent: maximum number of concurrent requests
        """
        self.max_concurrent = max_concurrent
        self.session = None
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def __aenter__(self):
        """Async context manager entry."""
        # Create the aiohttp session with connection-pool and timeout settings
        connector = aiohttp.TCPConnector(
            limit=100,           # total connection pool size
            limit_per_host=20,   # per-host connection limit
            ttl_dns_cache=300,   # DNS cache TTL in seconds
        )
        timeout = aiohttp.ClientTimeout(total=30, connect=10)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if self.session:
            await self.session.close()

    async def fetch_page(self, url):
        """Fetch a single page.

        :param url: target URL
        :return: (url, title) tuple
        """
        async with self.semaphore:  # cap concurrency
            try:
                async with self.session.get(url) as response:
                    if response.status == 200:
                        html = await response.text()
                        soup = BeautifulSoup(html, 'html.parser')
                        title = soup.find('title')
                        title_text = title.get_text().strip() if title else "No title"
                        print(f"✅ Fetched: {url} - {title_text}")
                        return url, title_text
                    else:
                        print(f"❌ HTTP error {response.status}: {url}")
                        return url, None
            except asyncio.TimeoutError:
                print(f"⏰ Timeout: {url}")
                return url, None
            except Exception as e:
                print(f"🚫 Exception: {url} - {str(e)}")
                return url, None

    async def crawl_urls(self, urls):
        """Crawl a list of URLs concurrently.

        :param urls: list of URLs
        :return: list of results
        """
        print(f"🚀 Crawling {len(urls)} URLs, max concurrency: {self.max_concurrent}")
        start_time = time.time()

        # Create all tasks
        tasks = [self.fetch_page(url) for url in urls]

        # Run every task concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        end_time = time.time()
        print(f"🎉 Crawl finished, elapsed: {end_time - start_time:.2f}s")

        # Filter out failed results
        valid_results = [r for r in results if isinstance(r, tuple) and r[1] is not None]
        print(f"📊 Success rate: {len(valid_results)}/{len(urls)} ({len(valid_results)/len(urls)*100:.1f}%)")

        return valid_results


# Usage example
async def main():
    # Test URL list
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404',
        'https://httpbin.org/json',
    ]

    # The async context manager guarantees resources are released
    async with AsyncSpider(max_concurrent=5) as spider:
        results = await spider.crawl_urls(urls)

        # Print the results
        print("\n📋 Results:")
        for url, title in results:
            print(f"  {url} -> {title}")


if __name__ == "__main__":
    asyncio.run(main())
```
This basic version demonstrates the core ingredients of an async scraper:
- aiohttp for asynchronous HTTP requests
- a Semaphore to cap concurrency
- an async context manager for resource management
- exception handling and timeout control
Advanced Techniques: Concurrency Control and Resource Management ⚡
In real projects, sensible concurrency control and resource management are critical. Here is a more advanced implementation:
```python
import asyncio
import aiohttp
import aiofiles
import json
import random
from dataclasses import dataclass, asdict
from typing import List, Optional, Dict, Any
from urllib.parse import urljoin, urlparse
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


@dataclass
class CrawlResult:
    """Data class for a crawl result."""
    url: str
    status_code: int
    title: Optional[str] = None
    content_length: Optional[int] = None
    response_time: Optional[float] = None
    error: Optional[str] = None


class AdvancedAsyncSpider:
    def __init__(self, config: Dict[str, Any]):
        """Initialize the advanced async spider.

        :param config: configuration dict with the spider parameters
        """
        self.max_concurrent = config.get('max_concurrent', 10)
        self.retry_times = config.get('retry_times', 3)
        self.retry_delay = config.get('retry_delay', 1)
        self.request_delay = config.get('request_delay', 0)
        self.timeout = config.get('timeout', 30)
        self.session = None
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        self.results: List[CrawlResult] = []
        self.failed_urls: List[str] = []
        # User-Agent pool
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
        ]

    async def __aenter__(self):
        """Create the session."""
        connector = aiohttp.TCPConnector(
            limit=200,
            limit_per_host=50,
            ttl_dns_cache=300,
            use_dns_cache=True,
            keepalive_timeout=60,
            enable_cleanup_closed=True
        )
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            trust_env=True  # honor proxy environment variables
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session."""
        if self.session:
            await self.session.close()
            # Give connections a moment to close fully
            await asyncio.sleep(0.1)

    def get_random_headers(self) -> Dict[str, str]:
        """Build randomized request headers."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    async def fetch_with_retry(self, url: str) -> CrawlResult:
        """Fetch a page with retries.

        :param url: target URL
        :return: a CrawlResult
        """
        async with self.semaphore:
            for attempt in range(self.retry_times + 1):
                start_time = asyncio.get_event_loop().time()
                try:
                    # Add a random delay to reduce the chance of being flagged
                    if self.request_delay > 0:
                        await asyncio.sleep(random.uniform(0, self.request_delay))

                    headers = self.get_random_headers()
                    async with self.session.get(url, headers=headers) as response:
                        response_time = asyncio.get_event_loop().time() - start_time
                        content = await response.text()

                        # Parse the title
                        title = None
                        if 'text/html' in response.headers.get('content-type', ''):
                            from bs4 import BeautifulSoup
                            soup = BeautifulSoup(content, 'html.parser')
                            title_tag = soup.find('title')
                            if title_tag:
                                title = title_tag.get_text().strip()

                        result = CrawlResult(
                            url=url,
                            status_code=response.status,
                            title=title,
                            content_length=len(content),
                            response_time=response_time
                        )
                        logger.info(f"✅ Success: {url} (status: {response.status}, time: {response_time:.2f}s)")
                        return result

                except asyncio.TimeoutError:
                    error_msg = f"Timeout (attempt {attempt + 1}/{self.retry_times + 1})"
                    logger.warning(f"⏰ {url} - {error_msg}")
                except aiohttp.ClientError as e:
                    error_msg = f"Client error: {str(e)} (attempt {attempt + 1}/{self.retry_times + 1})"
                    logger.warning(f"🚫 {url} - {error_msg}")
                except Exception as e:
                    error_msg = f"Unexpected error: {str(e)} (attempt {attempt + 1}/{self.retry_times + 1})"
                    logger.error(f"💥 {url} - {error_msg}")

                # If this was not the last attempt, wait before retrying
                if attempt < self.retry_times:
                    await asyncio.sleep(self.retry_delay * (2 ** attempt))  # exponential backoff

            # All retries failed
            result = CrawlResult(
                url=url,
                status_code=0,
                error=f"Still failing after {self.retry_times} retries"
            )
            self.failed_urls.append(url)
            logger.error(f"❌ Final failure: {url}")
            return result

    async def crawl_batch(self, urls: List[str], callback=None) -> List[CrawlResult]:
        """Crawl a batch of URLs.

        :param urls: list of URLs
        :param callback: optional callback invoked for each result
        :return: list of CrawlResult objects
        """
        logger.info(f"🚀 Starting batch crawl of {len(urls)} URLs")
        start_time = asyncio.get_event_loop().time()

        # Create all tasks
        tasks = [self.fetch_with_retry(url) for url in urls]

        # Use as_completed so results can be processed as they arrive
        results = []
        completed = 0
        for coro in asyncio.as_completed(tasks):
            result = await coro
            results.append(result)
            completed += 1

            # Invoke the callback
            if callback:
                await callback(result, completed, len(urls))

            # Show progress
            if completed % 10 == 0 or completed == len(urls):
                logger.info(f"📊 Progress: {completed}/{len(urls)} ({completed/len(urls)*100:.1f}%)")

        total_time = asyncio.get_event_loop().time() - start_time
        success_count = len([r for r in results if r.status_code > 0])

        logger.info(f"🎉 Batch crawl finished")
        logger.info(f"📈 Total time: {total_time:.2f}s")
        logger.info(f"📊 Success rate: {success_count}/{len(urls)} ({success_count/len(urls)*100:.1f}%)")

        self.results.extend(results)
        return results

    async def save_results(self, filename: str):
        """Asynchronously save the results to a JSON file."""
        data = {
            'total_urls': len(self.results),
            'successful': len([r for r in self.results if r.status_code > 0]),
            'failed': len(self.failed_urls),
            'results': [asdict(result) for result in self.results],
            'failed_urls': self.failed_urls
        }
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(data, ensure_ascii=False, indent=2))
        logger.info(f"💾 Results saved to: {filename}")


# Usage example
async def progress_callback(result: CrawlResult, completed: int, total: int):
    """Progress callback."""
    if result.status_code > 0:
        print(f"✅ [{completed}/{total}] {result.url} - {result.title}")
    else:
        print(f"❌ [{completed}/{total}] {result.url} - {result.error}")


async def advanced_demo():
    """Advanced spider demo."""
    config = {
        'max_concurrent': 20,   # concurrency
        'retry_times': 2,       # retries
        'retry_delay': 1,       # delay between retries
        'request_delay': 0.5,   # delay between requests
        'timeout': 15,          # timeout
    }

    # Test URL list
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404',
        'https://httpbin.org/json',
        'https://httpbin.org/xml',
        'https://httpbin.org/html',
        'https://httpbin.org/robots.txt',
    ] * 3  # repeat 3x to simulate more URLs

    async with AdvancedAsyncSpider(config) as spider:
        # Batch crawl
        results = await spider.crawl_batch(urls, callback=progress_callback)

        # Save results
        await spider.save_results('crawl_results.json')

        # Statistics
        successful = [r for r in results if r.status_code > 0]
        avg_response_time = sum(r.response_time or 0 for r in successful) / len(successful)

        print(f"\n📊 Statistics:")
        print(f"  Succeeded: {len(successful)}")
        print(f"  Failed: {len(spider.failed_urls)}")
        print(f"  Average response time: {avg_response_time:.2f}s")


if __name__ == "__main__":
    asyncio.run(advanced_demo())
```
This advanced version adds:
- A complete retry mechanism with exponential backoff
- Randomized request headers and delays to reduce the chance of detection
- Real-time progress callbacks and detailed logging
- Structured result storage
- Better resource management and exception handling
Advanced Practice: A Distributed Async Scraper Architecture 🏗️
For large-scale scraping projects we need a distributed architecture. Here is a Redis-backed distributed scraper:
```python
import asyncio
import aiohttp
import aioredis
import json
import hashlib
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from urllib.parse import urljoin, urlparse
import logging

logger = logging.getLogger(__name__)


@dataclass
class Task:
    """Data structure for a crawl task."""
    url: str
    priority: int = 0
    retry_count: int = 0
    max_retries: int = 3
    metadata: Dict[str, Any] = None

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}


class DistributedSpider:
    """Distributed async spider."""

    def __init__(self, worker_id: str, redis_config: Dict[str, Any], spider_config: Dict[str, Any]):
        self.worker_id = worker_id
        self.redis_config = redis_config
        self.spider_config = spider_config
        # Redis connection
        self.redis = None
        # Queue names
        self.task_queue = "spider:tasks"
        self.result_queue = "spider:results"
        self.failed_queue = "spider:failed"
        self.duplicate_set = "spider:duplicates"
        # HTTP session
        self.session = None
        self.semaphore = asyncio.Semaphore(spider_config.get('max_concurrent', 10))
        # Statistics
        self.stats = {
            'processed': 0,
            'success': 0,
            'failed': 0,
            'start_time': time.time()
        }

    async def __aenter__(self):
        """Initialize connections."""
        # Connect to Redis
        self.redis = aioredis.from_url(
            f"redis://{self.redis_config['host']}:{self.redis_config['port']}",
            password=self.redis_config.get('password'),
            db=self.redis_config.get('db', 0),
            encoding='utf-8',
            decode_responses=True
        )
        # Create the HTTP session
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=20)
        timeout = aiohttp.ClientTimeout(total=30)
        self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)
        logger.info(f"🚀 Worker {self.worker_id} started")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Clean up resources."""
        if self.session:
            await self.session.close()
        if self.redis:
            await self.redis.close()
        logger.info(f"🛑 Worker {self.worker_id} stopped")

    def generate_task_id(self, url: str) -> str:
        """Generate a unique task ID."""
        return hashlib.md5(url.encode()).hexdigest()

    async def add_task(self, task: Task) -> bool:
        """Add a task to the queue."""
        task_id = self.generate_task_id(task.url)

        # Duplicate check
        is_duplicate = await self.redis.sismember(self.duplicate_set, task_id)
        if is_duplicate:
            logger.debug(f"⚠️ Duplicate task: {task.url}")
            return False

        # Add to the dedup set
        await self.redis.sadd(self.duplicate_set, task_id)

        # Push to the task queue (a priority queue backed by a sorted set)
        task_data = json.dumps(asdict(task))
        await self.redis.zadd(self.task_queue, {task_data: task.priority})

        logger.info(f"✅ Task added: {task.url} (priority: {task.priority})")
        return True

    async def get_task(self) -> Optional[Task]:
        """Pop a task from the queue."""
        # BZPOPMAX blocks until the highest-priority task is available
        result = await self.redis.bzpopmax(self.task_queue, timeout=5)
        if not result:
            return None
        task_data = json.loads(result[1])
        return Task(**task_data)

    async def process_task(self, task: Task) -> Dict[str, Any]:
        """Process a single task."""
        async with self.semaphore:
            start_time = time.time()
            try:
                # Send the HTTP request
                headers = {
                    'User-Agent': 'DistributedSpider/1.0',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
                }
                async with self.session.get(task.url, headers=headers) as response:
                    content = await response.text()
                    response_time = time.time() - start_time

                    result = {
                        'task_id': self.generate_task_id(task.url),
                        'url': task.url,
                        'status_code': response.status,
                        'content_length': len(content),
                        'response_time': response_time,
                        'worker_id': self.worker_id,
                        'timestamp': time.time(),
                        'content': content[:1000],  # keep only the first 1000 characters
                        'metadata': task.metadata
                    }

                    # Content-parsing logic can be plugged in here
                    if response.status == 200:
                        result['success'] = True
                        # Parse the page and extract new URLs (example)
                        new_urls = await self.extract_urls(content, task.url)
                        result['extracted_urls'] = new_urls
                    else:
                        result['success'] = False
                        result['error'] = f"HTTP {response.status}"

                    return result

            except Exception as e:
                response_time = time.time() - start_time
                return {
                    'task_id': self.generate_task_id(task.url),
                    'url': task.url,
                    'status_code': 0,
                    'response_time': response_time,
                    'worker_id': self.worker_id,
                    'timestamp': time.time(),
                    'success': False,
                    'error': str(e),
                    'metadata': task.metadata
                }

    async def extract_urls(self, content: str, base_url: str) -> List[str]:
        """Extract URLs from page content (example implementation)."""
        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(content, 'html.parser')
            urls = []
            for link in soup.find_all('a', href=True):
                url = urljoin(base_url, link['href'])
                # Simple filter
                if url.startswith('http') and len(urls) < 10:
                    urls.append(url)
            return urls
        except Exception:
            return []

    async def save_result(self, result: Dict[str, Any]):
        """Persist a processing result."""
        if result['success']:
            # Store the successful result
            await self.redis.lpush(self.result_queue, json.dumps(result))
            self.stats['success'] += 1

            # Add any extracted URLs as new tasks
            if 'extracted_urls' in result:
                for url in result['extracted_urls']:
                    new_task = Task(url=url, priority=0, metadata={'parent_url': result['url']})
                    await self.add_task(new_task)
        else:
            # Handle a failed task
            task_id = result['task_id']

            # Retry logic
            original_task = Task(
                url=result['url'],
                retry_count=result.get('retry_count', 0) + 1,
                metadata=result.get('metadata', {})
            )

            if original_task.retry_count <= original_task.max_retries:
                # Drop the ID from the dedup set first, otherwise add_task
                # would treat the retry as a duplicate and skip it
                await self.redis.srem(self.duplicate_set, task_id)
                # Re-queue with a lower priority
                original_task.priority = -original_task.retry_count
                await self.add_task(original_task)
                logger.info(f"🔄 Retrying task: {original_task.url} (attempt {original_task.retry_count})")
            else:
                # Retries exhausted: push to the failed queue
                await self.redis.lpush(self.failed_queue, json.dumps(result))
                logger.error(f"💀 Task permanently failed: {original_task.url}")

            self.stats['failed'] += 1

    async def run_worker(self, max_tasks: Optional[int] = None):
        """Run the worker loop."""
        logger.info(f"🏃 Worker {self.worker_id} is working")
        processed = 0

        while True:
            if max_tasks and processed >= max_tasks:
                logger.info(f"🎯 Reached the task limit: {max_tasks}")
                break

            # Fetch a task
            task = await self.get_task()
            if not task:
                logger.debug("⏳ No tasks available, waiting...")
                continue

            # Process the task
            logger.info(f"🔧 Processing task: {task.url}")
            result = await self.process_task(task)

            # Persist the result
            await self.save_result(result)

            # Update statistics
            processed += 1
            self.stats['processed'] = processed

            # Periodically print statistics
            if processed % 10 == 0:
                await self.print_stats()

    async def print_stats(self):
        """Print statistics."""
        elapsed = time.time() - self.stats['start_time']
        rate = self.stats['processed'] / elapsed if elapsed > 0 else 0
        logger.info(f"📊 Worker {self.worker_id} stats:")
        logger.info(f"  Processed: {self.stats['processed']}")
        logger.info(f"  Succeeded: {self.stats['success']}")
        logger.info(f"  Failed: {self.stats['failed']}")
        logger.info(f"  Rate: {rate:.2f} tasks/sec")
        logger.info(f"  Uptime: {elapsed:.2f}s")


# Usage example
async def distributed_demo():
    """Distributed spider demo."""
    redis_config = {
        'host': 'localhost',
        'port': 6379,
        'password': None,
        'db': 0
    }
    spider_config = {
        'max_concurrent': 5,
        'request_delay': 1,
    }
    worker_id = f"worker-{int(time.time())}"

    async with DistributedSpider(worker_id, redis_config, spider_config) as spider:
        # Seed a few initial tasks
        initial_urls = [
            'https://httpbin.org/delay/1',
            'https://httpbin.org/delay/2',
            'https://httpbin.org/json',
            'https://httpbin.org/html',
        ]
        for url in initial_urls:
            task = Task(url=url, priority=10)  # high priority
            await spider.add_task(task)

        # Run the worker
        await spider.run_worker(max_tasks=20)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    asyncio.run(distributed_demo())
```
This distributed version implements:
- A Redis-backed task queue and result store
- Priority-based task scheduling
- Automatic deduplication
- Retry on failure with priority demotion
- Cooperation between multiple worker processes
- Real-time statistics and monitoring
Anti-Scraping Countermeasures 🛡️
In real-world scraper development, dealing with anti-bot defenses is unavoidable. Here are some commonly used strategies:
```python
import asyncio
import aiohttp
import random
import time
from typing import List, Dict, Optional
from urllib.parse import urlparse
import json


class AntiAntiSpider:
    """Helpers for working around anti-bot measures."""

    def __init__(self):
        # User-Agent pool
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',
        ]
        # Proxy pool
        self.proxy_pool = [
            # 'http://proxy1:port',
            # 'http://proxy2:port',
            # can be fetched dynamically from a proxy provider's API
        ]
        # Per-domain rate limiting
        self.domain_delays = {}        # delay configuration per domain
        self.last_request_times = {}   # last request time per domain

    def get_random_user_agent(self) -> str:
        """Pick a random User-Agent."""
        return random.choice(self.user_agents)

    def get_proxy(self) -> Optional[str]:
        """Pick a proxy, if any are configured."""
        if self.proxy_pool:
            return random.choice(self.proxy_pool)
        return None

    async def respect_robots_txt(self, session: aiohttp.ClientSession, url: str) -> bool:
        """Check robots.txt (optional)."""
        try:
            parsed_url = urlparse(url)
            robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
            async with session.get(robots_url) as response:
                if response.status == 200:
                    robots_content = await response.text()
                    # Real robots.txt parsing could go here;
                    # simplified version: just look for a blanket Disallow
                    return 'Disallow: /' not in robots_content
        except Exception:
            pass
        return True  # allow by default

    async def rate_limit(self, url: str):
        """Per-domain rate limiting."""
        domain = urlparse(url).netloc

        # Delay configuration for this domain (default 1-3 seconds)
        if domain not in self.domain_delays:
            self.domain_delays[domain] = random.uniform(1, 3)

        # Check when the domain was last requested
        if domain in self.last_request_times:
            elapsed = time.time() - self.last_request_times[domain]
            required_delay = self.domain_delays[domain]
            if elapsed < required_delay:
                sleep_time = required_delay - elapsed
                await asyncio.sleep(sleep_time)

        # Update the last request time
        self.last_request_times[domain] = time.time()

    def get_headers(self, url: str, referer: Optional[str] = None) -> Dict[str, str]:
        """Build request headers."""
        headers = {
            'User-Agent': self.get_random_user_agent(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0',
        }

        # Add a Referer if provided
        if referer:
            headers['Referer'] = referer

        # Domain-specific header tweaks
        domain = urlparse(url).netloc
        if 'github' in domain:
            headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'

        return headers


class StealthSpider:
    """Stealthy spider implementation."""

    def __init__(self, max_concurrent: int = 3):
        self.max_concurrent = max_concurrent
        self.session = None
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.anti_anti = AntiAntiSpider()
        # Session state
        self.cookies = {}
        self.session_data = {}

    async def __aenter__(self):
        # A connector configured to look more like a real browser
        connector = aiohttp.TCPConnector(
            limit=50,
            limit_per_host=10,
            ttl_dns_cache=300,
            use_dns_cache=True,
            keepalive_timeout=30,
            enable_cleanup_closed=True,
            family=0,  # allow both IPv4 and IPv6
        )
        # Reasonable timeouts
        timeout = aiohttp.ClientTimeout(
            total=30,
            connect=10,
            sock_read=20
        )
        # Session with redirects and cookie support
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            cookie_jar=aiohttp.CookieJar(),
            # Decompress responses automatically
            auto_decompress=True,
            # Honor proxy settings from environment variables
            trust_env=True
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
            await asyncio.sleep(0.1)  # let connections close fully

    async def fetch_with_stealth(self, url: str, referer: Optional[str] = None) -> Dict:
        """Fetch a page in stealth mode."""
        async with self.semaphore:
            # Rate limiting
            await self.anti_anti.rate_limit(url)
            # Build headers
            headers = self.anti_anti.get_headers(url, referer)
            # Pick a proxy
            proxy = self.anti_anti.get_proxy()

            try:
                # Send the request
                async with self.session.get(
                    url,
                    headers=headers,
                    proxy=proxy,
                    allow_redirects=True,
                    max_redirects=5
                ) as response:
                    content = await response.text()
                    return {
                        'url': str(response.url),
                        'status': response.status,
                        'headers': dict(response.headers),
                        'content': content,
                        'cookies': {cookie.key: cookie.value for cookie in response.cookies.values()},
                        'final_url': str(response.url),
                        'redirects': len(response.history),
                        'success': True
                    }
            except Exception as e:
                return {
                    'url': url,
                    'status': 0,
                    'error': str(e),
                    'success': False
                }

    async def crawl_with_session_management(self, urls: List[str]) -> List[Dict]:
        """Crawl while maintaining session state."""
        results = []
        for i, url in enumerate(urls):
            # Use the previous URL as the Referer (mimic a real user)
            referer = urls[i - 1] if i > 0 else None
            result = await self.fetch_with_stealth(url, referer)
            results.append(result)

            # Simulate reading time between pages
            if result['success']:
                reading_time = random.uniform(2, 8)
                print(f"📖 Simulated reading time: {reading_time:.1f}s")
                await asyncio.sleep(reading_time)
        return results


# Captcha handling example (requires an OCR service)
class CaptchaHandler:
    """Captcha handler."""

    async def solve_captcha(self, captcha_image_url: str, session: aiohttp.ClientSession) -> Optional[str]:
        """Solve a captcha (example implementation).

        In a real project this might use:
        1. a third-party OCR service
        2. a machine learning model
        3. a human captcha-solving platform
        """
        try:
            # Download the captcha image
            async with session.get(captcha_image_url) as response:
                if response.status == 200:
                    image_data = await response.read()
                    # An OCR service would be called here, e.g.
                    # result = await self.call_ocr_service(image_data)
                    # return result
                    # For now return None to indicate it cannot be handled
                    return None
        except Exception as e:
            print(f"Captcha handling failed: {e}")
            return None

    async def handle_captcha_page(self, session: aiohttp.ClientSession, response_text: str, current_url: str):
        """Handle a page that contains a captcha."""
        # Detect whether the page contains a captcha
        if 'captcha' in response_text.lower() or '驗證碼' in response_text:
            print("🤖 Captcha page detected")

            # Extract the captcha image URL (site-specific; example only)
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(response_text, 'html.parser')
            captcha_img = soup.find('img', {'id': 'captcha'})
            if captcha_img:
                captcha_url = captcha_img.get('src')
                if captcha_url:
                    # Solve the captcha
                    captcha_result = await self.solve_captcha(captcha_url, session)
                    if captcha_result:
                        print(f"✅ Captcha solved: {captcha_result}")
                        return captcha_result
            print("❌ Captcha handling failed")
        return None


# Usage example
async def stealth_demo():
    """Stealth spider demo."""
    urls = [
        'https://httpbin.org/user-agent',
        'https://httpbin.org/headers',
        'https://httpbin.org/cookies',
        'https://httpbin.org/redirect/2',
    ]

    async with StealthSpider(max_concurrent=2) as spider:
        results = await spider.crawl_with_session_management(urls)
        for result in results:
            if result['success']:
                print(f"✅ {result['url']} (status: {result['status']})")
                if result['redirects'] > 0:
                    print(f"  Redirects: {result['redirects']}")
            else:
                print(f"❌ {result['url']} - {result['error']}")


if __name__ == "__main__":
    asyncio.run(stealth_demo())
```
Performance Optimization and Monitoring 📊
Performance tuning is a key part of async scraping. Below is a complete monitoring and optimization setup:
```python
import asyncio
import aiohttp
import time
import psutil
import gc
from dataclasses import dataclass
from typing import Dict, List, Optional
import logging
from collections import deque, defaultdict
import json


@dataclass
class PerformanceMetrics:
    """Data class for performance metrics."""
    timestamp: float
    requests_per_second: float
    average_response_time: float
    memory_usage_mb: float
    cpu_usage_percent: float
    active_connections: int
    queue_size: int
    success_rate: float


class PerformanceMonitor:
    """Performance monitor."""

    def __init__(self, window_size: int = 60):
        self.window_size = window_size  # monitoring window in seconds
        self.metrics_history = deque(maxlen=window_size)
        self.request_times = deque()
        self.response_times = deque()
        self.success_count = 0
        self.total_count = 0
        self.start_time = time.time()
        # Connection pool monitoring
        self.active_connections = 0
        self.queue_size = 0

    def record_request(self, response_time: float, success: bool):
        """Record metrics for one request."""
        current_time = time.time()
        self.request_times.append(current_time)
        self.response_times.append(response_time)
        if success:
            self.success_count += 1
        self.total_count += 1

        # Evict expired samples
        cutoff_time = current_time - self.window_size
        while self.request_times and self.request_times[0] < cutoff_time:
            self.request_times.popleft()
            self.response_times.popleft()

    def get_current_metrics(self) -> PerformanceMetrics:
        """Snapshot the current performance metrics."""
        current_time = time.time()

        # Requests per second
        rps = len(self.request_times) / self.window_size if self.request_times else 0

        # Average response time
        avg_response_time = sum(self.response_times) / len(self.response_times) if self.response_times else 0

        # System resource usage
        memory_usage = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        cpu_usage = psutil.Process().cpu_percent()

        # Success rate
        success_rate = self.success_count / self.total_count if self.total_count > 0 else 0

        metrics = PerformanceMetrics(
            timestamp=current_time,
            requests_per_second=rps,
            average_response_time=avg_response_time,
            memory_usage_mb=memory_usage,
            cpu_usage_percent=cpu_usage,
            active_connections=self.active_connections,
            queue_size=self.queue_size,
            success_rate=success_rate
        )
        self.metrics_history.append(metrics)
        return metrics

    def print_stats(self):
        """Print a statistics report."""
        metrics = self.get_current_metrics()
        uptime = time.time() - self.start_time
        print(f"\n📊 Performance report (uptime: {uptime:.1f}s)")
        print(f"  RPS: {metrics.requests_per_second:.2f}")
        print(f"  Average response time: {metrics.average_response_time:.2f}s")
        print(f"  Memory: {metrics.memory_usage_mb:.1f}MB")
        print(f"  CPU: {metrics.cpu_usage_percent:.1f}%")
        print(f"  Active connections: {metrics.active_connections}")
        print(f"  Queue size: {metrics.queue_size}")
        print(f"  Success rate: {metrics.success_rate:.1%}")
        print(f"  Total requests: {self.total_count}")


class OptimizedAsyncSpider:
    """Optimized async spider."""

    def __init__(self, config: Dict):
        self.config = config
        self.session = None
        self.semaphore = None
        self.monitor = PerformanceMonitor()

        # Connection pool tuning
        # (note: aiohttp.TCPConnector does not accept raw socket options,
        #  so only supported keyword arguments are listed here)
        self.connector_config = {
            'limit': config.get('max_connections', 100),
            'limit_per_host': config.get('max_connections_per_host', 30),
            'ttl_dns_cache': config.get('dns_cache_ttl', 300),
            'use_dns_cache': True,
            'keepalive_timeout': config.get('keepalive_timeout', 60),
            'enable_cleanup_closed': True,
        }

        # Adaptive concurrency control
        self.adaptive_concurrency = config.get('adaptive_concurrency', True)
        self.min_concurrent = config.get('min_concurrent', 5)
        self.max_concurrent = config.get('max_concurrent', 50)
        self.current_concurrent = self.min_concurrent

        # Request queue
        self.request_pool = asyncio.Queue(maxsize=config.get('request_queue_size', 1000))

        # Response time window (used for adaptive tuning)
        self.response_time_window = deque(maxlen=100)

    async def __aenter__(self):
        # Create the tuned connector
        connector = aiohttp.TCPConnector(**self.connector_config)

        # Timeouts
        timeout = aiohttp.ClientTimeout(
            total=self.config.get('total_timeout', 30),
            connect=self.config.get('connect_timeout', 10),
            sock_read=self.config.get('read_timeout', 20)
        )

        # Create the session
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            # Decompress responses automatically
            auto_decompress=True,
            # Read buffer size
            read_bufsize=self.config.get('read_bufsize', 64 * 1024),
        )

        # Initialize the semaphore
        self.semaphore = asyncio.Semaphore(self.current_concurrent)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
        # Force a garbage collection pass
        gc.collect()

    async def adjust_concurrency(self):
        """Adaptive concurrency adjustment."""
        if not self.adaptive_concurrency or len(self.response_time_window) < 10:
            return

        # Compare recent and overall average response times
        recent_avg = sum(list(self.response_time_window)[-10:]) / 10
        overall_avg = sum(self.response_time_window) / len(self.response_time_window)

        # If recent responses are clearly slower, reduce concurrency
        if recent_avg > overall_avg * 1.5 and self.current_concurrent > self.min_concurrent:
            self.current_concurrent = max(self.min_concurrent, self.current_concurrent - 2)
            print(f"📉 Lowering concurrency to: {self.current_concurrent}")
        # If responses are stable and fast, raise concurrency
        elif recent_avg < overall_avg * 0.8 and self.current_concurrent < self.max_concurrent:
            self.current_concurrent = min(self.max_concurrent, self.current_concurrent + 1)
            print(f"📈 Raising concurrency to: {self.current_concurrent}")

        # Replace the semaphore with the new limit
        self.semaphore = asyncio.Semaphore(self.current_concurrent)

    async def fetch_optimized(self, url: str) -> Dict:
        """Optimized request method."""
        async with self.semaphore:
            start_time = time.time()
            try:
                # Track active connections
                self.monitor.active_connections += 1
                async with self.session.get(url) as response:
                    content = await response.text()
                    response_time = time.time() - start_time

                    # Record the response time
                    self.response_time_window.append(response_time)
                    self.monitor.record_request(response_time, response.status == 200)

                    return {
                        'url': url,
                        'status': response.status,
                        'content': content,
                        'response_time': response_time,
                        'content_length': len(content),
                        'success': response.status == 200
                    }
            except Exception as e:
                response_time = time.time() - start_time
                self.monitor.record_request(response_time, False)
                return {
                    'url': url,
                    'status': 0,
                    'error': str(e),
                    'response_time': response_time,
                    'success': False
                }
            finally:
                self.monitor.active_connections -= 1

    async def batch_crawl_optimized(self, urls: List[str]) -> List[Dict]:
        """Optimized batch crawl."""
        print(f"🚀 Starting optimized crawl of {len(urls)} URLs")

        # Process in batches to avoid memory pressure
        batch_size = self.config.get('batch_size', 100)
        all_results = []

        for i in range(0, len(urls), batch_size):
            batch_urls = urls[i:i + batch_size]
            print(f"📦 Batch {i//batch_size + 1}/{(len(urls)-1)//batch_size + 1}")

            # Create tasks
            tasks = [self.fetch_optimized(url) for url in batch_urls]

            # Run the batch
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)

            # Filter out exceptions
            valid_results = [r for r in batch_results if isinstance(r, dict)]
            all_results.extend(valid_results)

            # Adapt concurrency
            await self.adjust_concurrency()

            # Print monitoring info every 2 batches
            if i % (batch_size * 2) == 0:
                self.monitor.print_stats()

            # Short pause between batches to avoid overload
            if i + batch_size < len(urls):
                await asyncio.sleep(0.1)

        return all_results

    async def memory_cleanup(self):
        """Memory cleanup."""
        # Trigger garbage collection manually
        gc.collect()

        # Evict stale monitoring data (keep the last 5 minutes)
        current_time = time.time()
        cutoff_time = current_time - 300
        while (self.monitor.metrics_history and
               self.monitor.metrics_history[0].timestamp < cutoff_time):
            self.monitor.metrics_history.popleft()


# Usage example
async def performance_demo():
    """Performance optimization demo."""
    config = {
        'max_connections': 50,
        'max_connections_per_host': 15,
        'max_concurrent': 20,
        'min_concurrent': 5,
        'adaptive_concurrency': True,
        'batch_size': 50,
        'total_timeout': 15,
        'connect_timeout': 5,
    }

    # Generate a large batch of test URLs
    test_urls = []
    for i in range(200):
        delay = i % 5 + 1  # 1-5 second delays
        test_urls.append(f'https://httpbin.org/delay/{delay}')

    async with OptimizedAsyncSpider(config) as spider:
        # Start the monitoring task
        monitor_task = asyncio.create_task(periodic_monitoring(spider.monitor))
        try:
            # Run the crawl
            results = await spider.batch_crawl_optimized(test_urls)

            # Final statistics
            print(f"\n🎉 Crawl finished!")
            print(f"📊 Total URLs: {len(test_urls)}")
            print(f"📊 Succeeded: {len([r for r in results if r.get('success')])}")
            print(f"📊 Failed: {len([r for r in results if not r.get('success')])}")
            spider.monitor.print_stats()
        finally:
            monitor_task.cancel()


async def periodic_monitoring(monitor: PerformanceMonitor):
    """Periodic monitoring task."""
    while True:
        await asyncio.sleep(10)  # check every 10 seconds
        try:
            metrics = monitor.get_current_metrics()

            # Flag abnormal conditions
            if metrics.memory_usage_mb > 500:  # more than 500MB of memory
                print("⚠️ Memory usage is high!")
            if metrics.requests_per_second < 1 and monitor.total_count > 0:
                print("⚠️ RPS is low, there may be a bottleneck!")
            if metrics.success_rate < 0.8 and monitor.total_count > 10:
                print("⚠️ Success rate is low!")
        except Exception as e:
            print(f"Monitoring error: {e}")


if __name__ == "__main__":
    asyncio.run(performance_demo())
```
Common Problems and Solutions ❓
1. Memory leaks
Problem: memory keeps growing after the scraper has been running for a while.
Solution:
```python
# Proper resource management
async with aiohttp.ClientSession() as session:
    # the session is closed automatically when the block exits
    pass

# Periodic garbage collection
import gc
gc.collect()

# Cap the concurrency
semaphore = asyncio.Semaphore(10)
```
2. Connection pool exhaustion
Problem: a large number of concurrent requests exhausts the connection pool.
Solution:
```python
connector = aiohttp.TCPConnector(
    limit=100,                   # larger connection pool
    limit_per_host=20,           # per-host connection limit
    ttl_dns_cache=300,           # DNS cache TTL
    enable_cleanup_closed=True   # clean up closed connections automatically
)
```
3. Poor timeout handling
Problem: requests that time out cause tasks to pile up.
Solution:
```python
timeout = aiohttp.ClientTimeout(
    total=30,      # overall timeout
    connect=10,    # connection timeout
    sock_read=20   # read timeout
)
```
4. Exception propagation
Problem: a single failing task aborts the whole crawl.
Solution:
```python
# Use return_exceptions=True so one failure does not cancel the rest
results = await asyncio.gather(*tasks, return_exceptions=True)

# Filter out exception results
valid_results = [r for r in results if not isinstance(r, Exception)]
```
Best Practices Summary 🏆
After several years of async scraper development, these are the practices I keep coming back to:
1. Architecture and design principles
- Single responsibility: each component does one specific job
- Scalability: support horizontal scaling and configuration tuning
- Fault tolerance: handle every kind of failure gracefully
- Observability: thorough performance monitoring and logging
2. Performance essentials
- Sensible concurrency: tune it to the target site and your network conditions
- Connection reuse: use connection pools to avoid repeated handshakes
- Memory management: release resources promptly to avoid leaks
- Batching: process large URL sets in batches to avoid overload
3. Anti-scraping countermeasures
- Header spoofing: mimic real browser behavior
- Rate limiting: keep reasonable intervals between requests
- IP rotation: spread requests across a proxy pool
- Session management: maintain login state and cookies
4. Error handling
- Retry strategy: retries with exponential backoff (see the sketch after this list)
- Graceful degradation: fallback handling for failed tasks
- Monitoring and alerting: spot and fix problems early
- Logging: detailed operational logs
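As a compact, standalone illustration of the exponential-backoff idea referenced above (separate from the AdvancedAsyncSpider implementation; the helper name and parameters here are illustrative):

```python
import asyncio
import random

import aiohttp


async def get_with_backoff(session: aiohttp.ClientSession, url: str,
                           retries: int = 3, base_delay: float = 1.0) -> str:
    """Illustrative helper: retry a GET with exponential backoff plus jitter."""
    for attempt in range(retries + 1):
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.text()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if attempt == retries:
                raise  # out of retries, let the caller decide what to do
            # Wait base_delay * 2^attempt seconds, plus jitter to avoid bursts
            await asyncio.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))
```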
5. Data quality assurance
- Deduplication: avoid fetching the same page twice
- Data validation: make sure scraped data is complete
- Incremental updates: only fetch data that has changed (see the sketch after this list)
- Backup and recovery: back up important data
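One simple way to support incremental updates, sketched under the assumption that you persist a fingerprint per URL somewhere (Redis, a file, a database); the function names are illustrative:

```python
import hashlib


def content_fingerprint(html: str) -> str:
    """Hash the page body so unchanged pages can be skipped on the next run."""
    return hashlib.md5(html.encode("utf-8")).hexdigest()


def should_update(url: str, html: str, seen_fingerprints: dict) -> bool:
    # seen_fingerprints maps url -> last stored fingerprint
    fp = content_fingerprint(html)
    if seen_fingerprints.get(url) == fp:
        return False          # content unchanged, skip re-processing
    seen_fingerprints[url] = fp
    return True               # new or changed content, process and store it
```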
By following these practices you can build an async scraping system that is fast, stable, and maintainable. Remember that scraping is not purely a technical problem: legal compliance, the load you put on target sites, and data quality all matter just as much.
Async scraping is very much a hands-on skill, so keep refining your approach in real projects. As Python's async ecosystem matures, even better tools and libraries will appear and make scraper development more efficient and convenient.
I hope this guide helps you go further on your async scraping journey! 🚀