項目背景
基于一個 scrapy-redis
搭建的分布式系統,所有item
都通過重寫 pipeline
存儲到 redis
的 list
中。這里我通過代碼演示如何基于線程池 + 協程實現對 item
的中文件下載。
-
Item 結構
目的是為了下載 item 中
attachments
保存的附件內容。{"crawl_time":"20221017 12:00:00","version":"20221017 12:00:00","data": [{"title": "","attachments": [{"ori_url": "https://www.baidu.com", # 文件地址"path": "", # 文件本地保存路徑"filename": "xxx" # 文件名稱}]}] }
一、批量獲取 item
為了能夠提高數據的存儲效率,選擇從 redis
中彈出多個 item
,但當前部署的 redis
版本為 5.0
,lpop
不支持同時彈出多個數據,需要通過 LRANGE
和 LTRIM
命令實現,但是兩個命令執行不是原子操作,在多線程的情況下會導致數據異常,因此通過 lua
腳本執行批量彈出多個 item
。
1.1 lua 腳本
1.2 讀取數據
設定好批量讀取的大小,執行 lua 腳本,獲取數據。
二、并發
2.1 線程池
使用線程池去管理這么多 item
下載任務的原因:
- 減少頻繁創建和銷毀線程的開銷
- 控制并發數量,防止不斷創建線程導致資源耗盡
- 復用線程,減少線程切換開銷
將獲取到的 data
進行分片,分片后的數據交給多個線程去下載,提高并發效率。
2.2 協程任務
每個線程新建一個事件循環對象 loop
,用來管理分片后的 data
協程任務。
為了復用 TCP
連接和 session
,選擇讓分片 data
共享一個 TCPConnector
和 ClientSession
對象。這是基于 data
分片大小大概率是同一個網站的數據設計的,可以降低連接創建會話管理的的資源消耗。
2.3 協程并發
通過 asyncio.gather
實現協程并發。
三、大文件分塊
下載文件時,如果文件比較大,網絡又不穩定的情況下,很容易導致下載失敗,因此這里通過將文件分塊下載優化流程。
3.1 分塊
對文件分塊之前,先要獲取文件大小。向服務器發送一個預請求 head
,來獲取文件長度,這樣可以避免獲取整個文件,減少網絡傳輸耗時。
然后對文件進行分塊處理,在傳輸中,需要平衡 網絡擁塞 和 請求頻次 導致的消耗,這里選擇將文件分為 1024 * 1024
也就是 1 MB
的塊大小。
使用 asyncio.Semaphore
控制 同時進行的下載任務數量,避免過多并發導致服務器崩潰。
3.2 下載
修改 headers
中的 Range
獲取文件指定塊大小的內容。
通過裝飾器實現文件的斷點續傳功能,防止因網絡不穩定導致文件內容缺失。
當文件的某個塊下載失敗,超出重試次數時,取消所有該文件塊的下載任務,暫時放棄該文件,記錄到失敗下載隊列中保存,避免因為問件本就損壞這種情況導致不斷重試。
異步裝飾器的實現
3.3 拼接
result
按順序返回請求的結果,將請求的文件塊拼接完成。