分布式爬蟲架構設計

隨著互聯網數據的爆炸式增長,單機爬蟲已經難以滿足大規模數據采集的需求。分布式爬蟲應運而生,它通過多節點協作,實現了數據采集的高效性和容錯性。本文將深入探討分布式爬蟲的架構設計,包括常見的架構模式、關鍵技術組件、完整項目示例以及面臨的挑戰與優化方向。

一、常見架構模式

1. 主從架構(Master - Slave)

架構組成
  • 主節點(Master) :作為整個爬蟲系統的控制中心,負責全局調度工作。

  • 從節點(Slave) :接受主節點分配的任務,執行具體的網頁爬取和數據解析操作。

工作原理
  • 任務調度 :主節點維護一個待爬取 URL 隊列,按照預設策略(如輪詢、權重分配等)將任務分配給從節點。

  • 負載均衡 :主節點實時監控從節點的負載情況(如 CPU 使用率、內存占用、網絡帶寬等),根據監控數據動態調整任務分配策略,確保各從節點的負載相對均衡。

  • 容錯管理 :主節點定期檢測從節點的運行狀態,一旦發現節點故障(如宕機、網絡中斷等),會將該節點未完成的任務重新分配給其他正常節點,保障任務的持續執行。

通信方式
  • HTTP/RPC 調用 :從節點通過調用主節點提供的 HTTP API 或 RPC 接口獲取任務。

  • 消息隊列 :主節點利用 Redis、Kafka 等消息隊列中間件推送任務,從節點訂閱相應的隊列接收任務。

代碼示例(Python + Redis)
  • 主節點:任務分發

# _*_ coding: utf - 8 _*_
import redis
import time
import threadingclass MasterNode:def __init__(self):self.r = redis.Redis(host='master', port=6379, decode_responses=True)self.task_timeout = 20  # 任務超時時間(秒)def task_distribute(self):# 清空之前的任務隊列和任務狀態記錄self.r.delete('task_queue')self.r.delete('task_status')# 初始化待爬取的 URL 列表urls = ['https://example.com/page1', 'https://example.com/page2', 'https://example.com/page3']# 將任務推入 Redis 隊列for url in urls:task_id = self.r.incr('task_id')  # 生成任務 IDtask_info = {'task_id': task_id, 'url': url, 'status': 'waiting', 'assign_time': time.time()}self.r.hset('task_status', task_id, str(task_info))self.r.lpush('task_queue', task_id)print("任務分發完成,共有", len(urls), "個任務")def monitor_tasks(self):while True:# 獲取所有任務狀態task_statuses = self.r.hgetall('task_status')current_time = time.time()for task_id, task_info in task_statuses.items():task_info_dict = eval(task_info)if task_info_dict['status'] == 'processing':# 檢查任務是否超時if current_time - task_info_dict['assign_time'] > self.task_timeout:print(f"任務 {task_id} 超時,重新分配")# 重新分配任務task_info_dict['status'] = 'waiting'self.r.hset('task_status', task_id, str(task_info_dict))self.r.rpush('task_queue', task_id)# 適當間隔檢測time.sleep(5)if __name__ == "__main__":master = MasterNode()master.task_distribute()# 啟動任務監控線程monitor_thread = threading.Thread(target=master.monitor_tasks)monitor_thread.daemon = Truemonitor_thread.start()# 阻止主線程退出monitor_thread.join()
  • 從節點:任務執行
# _*_ coding: utf - 8 _*_
import redis
import requests
from bs4 import BeautifulSoup
import time
import threadingclass SlaveNode:def __init__(self):self.r = redis.Redis(host='master', port=6379, decode_responses=True)def task_execute(self):while True:# 從 Redis 隊列中獲取任務 IDtask_id = self.r.brpop('task_queue', 10)if task_id:task_id = task_id[1]# 更新任務狀態為處理中task_info = eval(self.r.hget('task_status', task_id))task_info['status'] = 'processing'self.r.hset('task_status', task_id, str(task_info))print(f"從節點獲取到任務 {task_id}:", task_info['url'])try:response = requests.get(task_info['url'], timeout=5)if response.status_code == 200:# 解析網頁數據soup = BeautifulSoup(response.text, 'html.parser')title = soup.find('title').get_text() if soup.find('title') else '無標題'links = [link.get('href') for link in soup.find_all('a') if link.get('href')]# 更新任務狀態為完成task_info['status'] = 'completed'task_info['result'] = {'url': task_info['url'], 'title': title, 'links': links}self.r.hset('task_status', task_id, str(task_info))print(f"任務 {task_id} 處理完成,結果已存儲")else:# 更新任務狀態為失敗task_info['status'] = 'failed'task_info['error'] = f"請求失敗,狀態碼:{response.status_code}"self.r.hset('task_status', task_id, str(task_info))print(f"任務 {task_id} 請求失敗,狀態碼:{response.status_code}")except Exception as e:# 更新任務狀態為失敗task_info['status'] = 'failed'task_info['error'] = f"請求或解析過程中出現錯誤:{str(e)}"self.r.hset('task_status', task_id, str(task_info))print(f"任務 {task_id} 處理過程中出現錯誤:", str(e))else:print("任務隊列為空,等待新任務...")# 為避免頻繁輪詢,設置一個短暫的休眠時間time.sleep(1)if __name__ == "__main__":slave = SlaveNode()slave.task_execute()

?二、對等架構(Peer - to - Peer)

架構特點
  • 節點平等 :所有節點地位平等,不存在中心化的控制節點,每個節點既是任務的執行者,也是任務的協調者。

  • 自主協調 :節點通過分布式算法自主協調任務,實現任務的自分配和數據去重。

工作原理
  • 任務自分配 :每個節點根據一定的規則(如 URL 哈希值)自主決定要爬取的任務。例如,采用一致性哈希算法將 URL 映射到特定的節點上。

  • 數據去重 :利用布隆過濾器或分布式哈希表(DHT)等技術避免重復爬取,確保每個 URL 只被一個節點處理。

通信方式
  • 哈希算法 :一致性哈希算法將 URL 映射到一個哈希環上,根據哈希值確定對應的節點。

  • P2P 協議 :節點之間通過 P2P 協議直接通信,傳遞任務信息、數據以及節點狀態等。

代碼示例(Node.js + RabbitMQ)
  • 任務分配與發送

// _*_ coding: utf - 8 _*_
const amqplib = require('amqplib');
const crypto = require('crypto');async function sendTask(url) {try {// 連接 RabbitMQ 服務器const conn = await amqplib.connect('amqp://localhost');const ch = await conn.createChannel();// 定義任務隊列const queueName = 'task_queue';await ch.assertQueue(queueName, { durable: true });// 計算 URL 哈希值并確定節點const hash = crypto.createHash('md5').update(url).digest('hex');const nodeId = hash % 3; // 假設有 3 個節點console.log(`URL ${url} 分配給節點 node_${nodeId}`);// 將任務發送到對應節點的隊列const nodeQueue = `node_${nodeId}`;await ch.assertQueue(nodeQueue, { durable: true });ch.sendToQueue(nodeQueue, Buffer.from(url), { persistent: true });await ch.close();await conn.close();} catch (err) {console.error('發送任務出錯:', err);}
}// 測試發送多個任務
const urls = ['https://example.com/page1', 'https://example.com/page2', 'https://example.com/page3', 'https://example.com/page4'];
urls.forEach(url => {sendTask(url);
});
  • 節點接收與處理任務
// _*_ coding: utf - 8 _*_
const amqplib = require('amqplib');
const axios = require('axios');async function receiveTask(nodeId) {try {// 連接 RabbitMQ 服務器const conn = await amqplib.connect('amqp://localhost');const ch = await conn.createChannel();// 定義節點對應的隊列const queueName = `node_${nodeId}`;await ch.assertQueue(queueName, { durable: true });console.log(`節點 node_${nodeId} 開始接收任務...`);// 從隊列中接收任務并處理ch.consume(queueName, async (msg) => {if (msg !== null) {const url = msg.content.toString();console.log(`節點 node_${nodeId} 獲取到任務:${url}`);try {// 發送 HTTP 請求獲取網頁內容const response = await axios.get(url, { timeout: 5000 });if (response.status === 200) {// 解析網頁數據(此處僅為簡單示例,實際解析邏輯可根據需求定制)const data = {url: url,status: response.status,contentLength: response.headers['content-length']};console.log(`節點 node_${nodeId} 處理任務完成:`, data);// 這里可以將結果存儲到分布式數據庫等} else {console.log(`請求失敗,狀態碼:${response.status}`);}} catch (error) {console.error(`請求或解析過程中出錯:${error.message}`);} finally {// 確認任務已處理,可以從隊列中移除ch.ack(msg);}}}, { noAck: false });} catch (err) {console.error('接收任務出錯:', err);}
}// 啟動多個節點接收任務(實際應用中每個節點運行獨立的進程)
receiveTask(0);
receiveTask(1);
receiveTask(2);

三、關鍵技術組件

(1)任務調度與隊列

  1. Redis 隊列 :提供簡單列表結構,實現任務緩沖與分發。主節點用 LPUSH 推任務,從節點用 BRPOP 獲取任務,適用于小規模爬蟲。

  2. Kafka/RabbitMQ :適合大規模場景,支持高吞吐量任務流。Kafka 的分布式架構可將任務分配到多分區,實現并行消費,提升處理效率。

(2)數據存儲

  1. 分布式數據庫 :MongoDB 分片功能實現數據水平擴展,按業務需求設計分片策略,提升存儲容量與讀寫速度。

  2. 分布式文件系統 :HDFS 存儲大規模非結構化數據,采用冗余存儲機制,保障數據安全可靠,便于后續解析處理。

(3)負載均衡策略

  1. 輪詢調度 :主節點按固定順序分配任務,實現簡單但不適用節點性能差異大場景。

  2. 動態權重 :主節點依從節點性能動態調任務分配權重,充分利用資源,但需準確獲取性能信息且算法復雜。

四、完整項目示例(Scrapy - Redis)

(1)settings.py

# _*_ coding: utf - 8 _*_
# Scrapy settings for scrapy_redis_example project
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
REDIS_URL = 'redis://master:6379/0'
SCHEDULER_PERSIST = True
DOWNLOAD_DELAY = 1
RANDOMIZE_DOWNLOAD_DELAY = True
CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 8

(2)items.py

# _*_ coding: utf - 8 _*_
import scrapyclass ScrapyRedisExampleItem(scrapy.Item):title = scrapy.Field()url = scrapy.Field()content = scrapy.Field()

(3)spiders/distributed_spider.py

# _*_ coding: utf - 8 _*_
import scrapy
from scrapy_redis.spiders import RedisSpider
from scrapy_redis_example.items import ScrapyRedisExampleItemclass DistributedSpider(RedisSpider):name = 'distributed_spider'redis_key = 'distributed_spider:start_urls'def __init__(self, *args, **kwargs):super(DistributedSpider, self).__init__(*args, **kwargs)def parse(self, response):# 解析網頁數據item = ScrapyRedisExampleItem()item['title'] = response.css('h1::text').get()item['url'] = response.urlitem['content'] = response.css('div.content::text').get()yield item# 提取新的 URL 并生成請求new_urls = response.css('a::attr(href)').getall()for url in new_urls:# 過濾掉非絕對 URLif url.startswith('http'):yield scrapy.Request(url, callback=self.parse)

(4)啟動爬蟲

在主節點上啟動 Redis 服務器,然后運行以下命令啟動爬蟲。在從節點上,只需安裝相同的 Scrapy - Redis 項目,并連接到同一 Redis 服務器即可。

scrapy crawl distributed_spider

五、挑戰與優化方向

(1)反爬對抗

  1. 動態 IP 代理池 :構建動態 IP 代理池,從節點用不同代理 IP 發請求,防被目標網站封禁,可參考 proxy_pool 開源項目。

  2. 請求頻率偽裝 :隨機延遲請求發送時間,輪換 User - Agent,打亂請求模式,降低被識別風險。

(2)容錯機制

  1. 任務超時重試 :設任務超時時間,從節點未按時完成,主節點重試或轉交其他節點,借鑒 Celery retry 機制。

  2. 節點心跳檢測 :用 ZooKeeper 等服務監控節點存活,節點定期發心跳信號,主節點監聽判斷故障,及時重分配任務。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/82471.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/82471.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/82471.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

[java]eclipse中windowbuilder插件在線安裝

目錄 一、打開eclipse 二、打開插件市場 三、輸入windowbuilder,點擊install 四、進入安裝界面 五、勾選我同意... 重啟即可 一、打開eclipse 二、打開插件市場 三、輸入windowbuilder,點擊install 四、進入安裝界面 五、勾選我同意... 重啟即可

sass,less是什么?為什么要使用他們?

理解 他們都是css的預處理器,允許開發者通過更高級的語法編寫css代碼(支持變量,嵌套),然后通過編譯成css文件 使用原因 結構清晰,便于擴展提高開發效率,便于后期開發維護

Java設計模式之模板方法模式:從基礎到高級的全面解析(最詳解)

文章目錄 一、模板方法模式基礎概念1.1 什么是模板方法模式1.2 模板方法模式的核心結構1.3 模板方法模式中的方法分類1.4 模板方法模式的簡單示例二、模板方法模式的深入解析2.1 模板方法模式的核心原理2.2 模板方法模式的優勢與適用場景優勢分析適用場景2.3 模板方法模式與其他…

【C/C++】如何在一個事件驅動的生產者-消費者模型中使用觀察者進行通知與解耦

文章目錄 如何在一個事件驅動的生產者-消費者模型中使用觀察者進行通知與解耦?1 假設場景設計2 Codes3 流程圖4 優劣勢5 風險可能 如何在一個事件驅動的生產者-消費者模型中使用觀察者進行通知與解耦? 1 假設場景設計 Producer(生產者):生…

MVC和MVVM架構的區別

MVC和MVVM都是前端開發中常用的設計模式,都是為了解決前端開發中的復雜性而設計的,而MVVM模式則是一種基于MVC模式的新模式。 MVC(Model-View-Controller)的三個核心部分:模型、視圖、控制器相較于MVVM(Model-View-ViewModel)的三個核心部分…

蘭亭妙微 | 圖標設計公司 | UI設計案例復盤

在「33」「312」新高考模式下,選科決策成為高中生和家長的「頭等大事」。蘭亭妙微公司受委托優化高考選科決策平臺個人診斷報告界面,核心挑戰是:如何將復雜的測評數據(如學習能力傾向、學科報考機會、職業興趣等)轉化為…

有銅半孔的設計規范與材料創新

設計關鍵參數 孔徑與間距限制 最小孔徑需≥0.6mm,孔邊距≥0.5mm,避免銅層脫落;拼版時半孔區域需預留2mm間距防止撕裂。 阻焊橋設計 必須保留阻焊橋(寬度≥0.1mm),防止焊錫流入孔內造成短路。 獵板的材料…

Engineering a direct k-way Hypergraph Partitioning Algorithm【2017 ALENEX】

文章目錄 一、作者二、摘要三、相關工作四、算法概述五、實驗結果六、主要貢獻 一、作者 Yaroslav Akhremtsev, Tobias Heuer, Peter Sanders, Sebastian Schlag 二、摘要 我們開發了一種快速且高質量的多層算法,能夠直接將超圖劃分為 k 個平衡的塊 —— 無需借助遞…

視頻問答功能播放器(視頻問答)視頻彈題功能實例

視頻問答播放器是一種互動教學工具,在視頻播放過程中彈出題目卡,學員答題后才能繼續觀看,提升學習參與度。視頻問答功能播放器(視頻問答)視頻彈題功能實例: 視頻播放器的視頻問答功能(也叫問答播放器、視頻彈題、視頻問…

2025年AI代理演進全景:從技術成熟度曲線到產業重構

2025年AI代理演進全景:從技術成熟度曲線到產業重構 一、技術成熟度曲線定位:AI代理的“期望膨脹期” 根據Gartner技術成熟度曲線(Hype Cycle?),AI代理(Agentic AI)當前正處于期望膨脹期向泡沫…

基于python的機器學習(八)—— 評估算法(一)

目錄 一、機器學習評估的基本概念 1.1 評估的定義與目標 1.2 常見評估指標 1.3 訓練集、驗證集與測試集的劃分 二、分離數據集 2.1 分離訓練數據集和評估數據集 2.2 k折交叉驗證分離 2.3 棄一交叉驗證分離 2.4 重復隨機評估和訓練數據集分離 三、交叉驗證技術 3.…

Win11 系統登入時綁定微軟郵箱導致用戶名欠缺

Win11 系統登入時綁定微軟郵箱導致用戶名欠缺 解決思路 -> 解綁當前微軟郵箱和用戶名 -> 斷網離線建立本地賬戶 -> 設置本地賬戶為Admin權限 -> 注銷當前賬戶,登入新建的用戶 -> 聯網綁定微軟郵箱 -> 刪除舊的用戶命令步驟 管理員權限打開…

Mac系統-最方便的一鍵環境部署軟件ServBay(支持php,java,python,node,go,mysql等)沒有之一,已親自使用!

自從換成Mac電腦以后,做開發有時候要部署各種環境,如php,mysql,nginx,pgsql,java,node,python,go時,嘗試過原生環境部署,各種第三方軟件部署&…

Flink中Kafka連接器的基本應用

文章目錄 前言Kafka連接器基礎案例演示前置說明和環境準備步驟Kafka連接器基本配置關聯數據源映射轉換案例效果演示基于Kafka連接器同步數據到MySQL案例說明前置準備Kafka連接器消費位點調整映射轉換與數據投遞MysqlSlink持久化收集器數據最終效果演示小結參考前言 本文將基于…

Leetcode 刷題記錄 11 —— 二叉樹第二彈

本系列為筆者的 Leetcode 刷題記錄,順序為 Hot 100 題官方順序,根據標簽命名,記錄筆者總結的做題思路,附部分代碼解釋和疑問解答,01~07為C語言,08及以后為Java語言。 01 二叉樹的層序遍歷 /*** Definition…

【R語言科研繪圖】

R語言在繪制SCI期刊圖像時具有顯著優勢,以下從功能、靈活性和學術適配性三個方面分析其適用性: 數據可視化庫豐富 R語言擁有ggplot2、lattice、ggpubr等專業繪圖包,支持生成符合SCI期刊要求的高分辨率圖像(如TIFF/PDF格式&#…

【Node.js】Web開發框架

個人主頁:Guiat 歸屬專欄:node.js 文章目錄 1. Node.js Web框架概述1.1 Web框架的作用1.2 Node.js主要Web框架生態1.3 框架選擇考慮因素 2. Express.js2.1 Express.js概述2.2 基本用法2.2.1 安裝Express2.2.2 創建基本服務器 2.3 路由2.4 中間件2.5 請求…

PDF 轉 JPG 圖片小工具:CodeBuddy 助力解決轉換痛點

本文所使用的 CodeBuddy 免費下載鏈接:騰訊云代碼助手 CodeBuddy - AI 時代的智能編程伙伴 前言 在數字化辦公與內容創作的浪潮中,將 PDF 文件轉換為 JPG 圖片格式的需求日益頻繁。無論是學術文獻中的圖表提取,還是宣傳資料的視覺化呈現&am…

Linux 文件系統層次結構

Linux 的文件系統遵循 Filesystem Hierarchy Standard (FHS) 標準,其目錄結構是層次化的,每個目錄都有明確的用途。以下是 Linux 中部分目錄的作用解析: 1. 根目錄 / 作用:根目錄是整個文件系統的頂層目錄,所有其他目…

密碼學標準(Cryptography Standards)介紹

密碼學標準(Cryptography Standards)是為確保信息安全傳輸、存儲和處理而制定的一系列技術規范和協議,廣泛應用于通信、金融、互聯網等領域。以下從分類、主流標準、應用場景和發展趨勢四個方面進行詳細介紹: 一、密碼學標準的分類 密碼學標準可根據技術原理和應用場景分…