一、同步爬蟲的瓶頸
傳統的同步爬蟲(如requests
+BeautifulSoup
)在請求網頁時,必須等待服務器返回響應后才能繼續下一個請求。這種阻塞式I/O操作在面對大量數據時存在以下問題:
- 速度慢:每個請求必須串行執行,無法充分利用網絡帶寬。
- 易被封禁:高頻請求可能觸發IP限制或驗證碼。
- 資源浪費:CPU在等待I/O時處于空閑狀態。
解決方案:異步爬蟲(Asynchronous Crawling)
Python的asyncio
+aiohttp
庫可以實現非阻塞I/O,允許同時發起多個請求,大幅提升爬取效率。
二、異步爬蟲技術選型
技術方案 | 適用場景 | 優勢 |
---|---|---|
aiohttp | HTTP請求 | 異步HTTP客戶端,支持高并發 |
asyncio | 事件循環 | Python原生異步I/O框架 |
aiofiles | 異步文件存儲 | 避免文件寫入阻塞主線程 |
uvloop | 加速事件循環 | 替換asyncio 默認循環,性能提升2-4倍 |
三、實戰:異步爬取新浪財經股票數據
目標
- 爬取新浪財經A股股票實時行情(代碼、名稱、價格、漲跌幅等)。
- 使用
aiohttp
實現高并發請求。 - 存儲至
CSV
文件,避免數據丟失。
步驟1:分析數據接口
新浪財經的股票數據通常通過API返回,我們可以通過瀏覽器開發者工具(F12)抓包分析:
- 示例接口:
https://finance.sina.com.cn/realstock/company/sh600000/nc.shtml
- 數據格式:部分數據直接渲染在HTML中,部分通過Ajax加載(如分時數據)。
步驟2:安裝依賴庫
步驟3:編寫異步爬蟲代碼
import asyncio
import aiohttp
import aiofiles
from bs4 import BeautifulSoup
import csv
import time# 替換為新浪財經股票列表API(示例)
STOCK_LIST_API = "https://finance.sina.com.cn/stock/sl/stock_list.html"
HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}async def fetch(session, url):"""異步獲取網頁內容"""async with session.get(url, headers=HEADERS) as response:return await response.text()async def parse_stock_data(html):"""解析股票數據(示例:僅提取名稱和價格)"""soup = BeautifulSoup(html, "html.parser")stock_name = soup.select_one(".stock-name").text.strip() if soup.select_one(".stock-name") else "N/A"stock_price = soup.select_one(".price").text.strip() if soup.select_one(".price") else "N/A"return {"name": stock_name, "price": stock_price}async def save_to_csv(data, filename="stocks.csv"):"""異步寫入CSV"""async with aiofiles.open(filename, mode="a", encoding="utf-8", newline="") as f:writer = csv.writer(f)await writer.writerow([data["name"], data["price"]])async def crawl_stock(stock_code, session):"""爬取單只股票數據"""url = f"https://finance.sina.com.cn/realstock/company/{stock_code}/nc.shtml"try:html = await fetch(session, url)data = await parse_stock_data(html)await save_to_csv(data)print(f"爬取成功:{stock_code} - {data['name']}")except Exception as e:print(f"爬取失敗:{stock_code} - {str(e)}")async def main():"""主協程:并發爬取多個股票"""stock_codes = ["sh600000", "sh601318", "sz000001"] # 示例股票代碼(可擴展)# 使用uvloop加速(僅限Unix系統)try:import uvloopasyncio.set_event_loop_policy(uvloop.EventLoopPolicy())except ImportError:pass# 創建aiohttp會話async with aiohttp.ClientSession() as session:tasks = [crawl_stock(code, session) for code in stock_codes]await asyncio.gather(*tasks)if __name__ == "__main__":start_time = time.time()asyncio.run(main())print(f"爬取完成,耗時:{time.time() - start_time:.2f}秒")
四、性能優化策略
1. 控制并發量
新浪財經可能限制高頻請求
2. 使用代理IP
避免IP被封:
3. 隨機User-Agent
減少被識別為爬蟲的概率:
4. 數據存儲優化
- 異步數據庫寫入:如
aiomysql
、asyncpg
。 - 批量寫入:減少I/O次數。
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import pandas as pd
from fake_useragent import UserAgent
import aiomysql# 使用 Semaphore 限制并發數
semaphore = asyncio.Semaphore(10) # 最大并發 10# 代理信息
proxyHost = "www.16yun.cn"
proxyPort = "5445"
proxyUser = "16QMSOML"
proxyPass = "280651"# 構造代理 URL
PROXY = f"http://{proxyUser}:{proxyPass}@{proxyHost}:{proxyPort}"# 隨機 User-Agent
ua = UserAgent()# 數據庫配置
DB_CONFIG = {'host': 'localhost','port': 3306,' user': 'your_username','password': 'your_password','db': 'your_database','charset': 'utf8mb4'
}# 數據存儲優化:異步數據庫寫入
async def save_to_db(data):conn = await aiomysql.connect(**DB_CONFIG)async with conn.cursor() as cur:await cur.executemany("INSERT INTO finance_data (column1, column2, column3) VALUES (%s, %s, %s)", data)await conn.commit()conn.close()# 爬取單個股票數據
async def crawl_stock(stock_code, session):async with semaphore:url = f"https://finance.sina.com.cn/stock/{stock_code}.html"HEADERS = {"User-Agent": ua.random}async with session.get(url, headers=HEADERS, proxy=PROXY) as response:html = await response.text()data = parse(html)return data# 解析網頁內容
def parse(html):soup = BeautifulSoup(html, 'html.parser')# 假設數據在特定的表格中table = soup.find('table', {'class': 'example'})data = []for row in table.find_all('tr'):cols = row.find_all('td')cols = [ele.text.strip() for ele in cols]data.append([ele for ele in cols if ele])return data# 主函數
async def main(stock_codes):async with aiohttp.ClientSession() as session:tasks = [crawl_stock(stock_code, session) for stock_code in stock_codes]all_data = await asyncio.gather(*tasks)# 扁平化數據flat_data = [item for sublist in all_data for item in sublist]# 異步批量寫入數據庫await save_to_db(flat_data)# 示例股票代碼列表
stock_codes = ['000001','000002',# 更多股票代碼
]# 運行爬蟲
asyncio.run(main(stock_codes))
五、對比同步與異步爬蟲性能
指標 | 同步爬蟲(requests) | 異步爬蟲(aiohttp) |
---|---|---|
100次請求耗時 | ~20秒 | ~3秒 |
CPU占用 | 低(大量時間在等待) | 高(并發處理) |
反爬風險 | 高(易觸發封禁) | 較低(可控并發) |