【爬蟲】04 - 高級數據存儲

爬蟲04 - 高級數據存儲

文章目錄

  • 爬蟲04 - 高級數據存儲
    • 一:加密數據的存儲
    • 二:JSON Schema校驗
    • 三:云原生NoSQL(了解)
    • 四:Redis Edge近端計算(了解)
    • 五:二進制存儲
      • 1:Pickle
      • 2:Parquet

一:加密數據的存儲

對于用戶的隱私數據,可能需要進行數據的加密才能存儲到文件或者數據庫,因為要遵守《個人信息保護法》

所以對于這些隱私數據處理順序是:明文數據 → AES加密 → 密文字節流 → 序列化(如Base64) → 存儲至文件

from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
import base64
import json# 第一步:生成256位(32字節)密鑰 + 16字節IV(CBC模式必需)
key = get_random_bytes(32)
iv = get_random_bytes(16)# 第二步:加密數據
def encrypt_data(data: str, key: bytes, iv: bytes) -> bytes:cipher = AES.new(key, AES.MODE_CBC, iv)data_bytes = data.encode('utf-8')# PKCS7填充至分組長度倍數pad_len = AES.block_size - (len(data_bytes) % AES.block_size)padded_data = data_bytes + bytes([pad_len] * pad_len)return cipher.encrypt(padded_data)plain_text = "用戶機密數據:張三|13800138000|身份證1101..."
encrypted_bytes = encrypt_data(plain_text, key, iv)# 第三步:轉成base64并寫入json文件中
encrypted_b64 = base64.b64encode(iv + encrypted_bytes).decode('utf-8')# 寫入文件(JSON示例)
with open("encrypted_data.json", "w") as f:json.dump({"encrypted_data": encrypted_b64}, f)# 第四步,讀取文件并解密
def decrypt_data(encrypted_b64: str, key: bytes) -> str:encrypted_full = base64.b64decode(encrypted_b64)iv = encrypted_full[:16]  # 提取IVciphertext = encrypted_full[16:]cipher = AES.new(key, AES.MODE_CBC, iv)decrypted_padded = cipher.decrypt(ciphertext)# 去除PKCS7填充pad_len = decrypted_padded[-1]return decrypted_padded[:-pad_len].decode('utf-8')# 從文件讀取并解密
with open("encrypted_data.json", "r") as f:data = json.load(f)
decrypted_text = decrypt_data(data["encrypted_data"], key)
print(decrypted_text)  # 輸出原始明文

二:JSON Schema校驗

在爬蟲開發中,?JSON?因其輕量、易讀和跨平臺特性,成為數據存儲的主流格式。

然而,面對動態變化的網頁結構或API響應,未經校驗的JSON數據可能導致字段缺失、類型混亂甚至數據污染,進而引發下游分析錯誤或系統崩潰。本文聚焦?JSON Schema校驗?,結合Python的jsonschema庫,詳解如何為爬蟲數據“上保險”,確保存儲的JSON文件結構合法、字段完整,為數據質量筑起第一道防線。

規則場景實例
required確保商品名稱、價格等核心字段必填
enum限定狀態字段為[“已售罄”, “在售”]
pattern驗證手機號、郵箱格式合法性
custom format使用date-time校驗爬取時間戳格式
oneOf/anyOf處理多態結構(如不同店鋪的商品模型)
from jsonschema import validate, ValidationError
import json
from datetime import datetime
import logging# 定義商品數據Schema
product_schema = {"type": "object","required": ["title", "price"], # 必須包含title和price字段"properties": { # 定義字段類型和范圍"title": {"type": "string"}, # title字段類型為字符串"price": {"type": "number", "minimum": 0}, # price字段類型為數字,最小值為0"currency": {"enum": ["CNY", "USD"]}, # currency字段類型為枚舉,可選值為CNY和USD"images": {"type": "array", "items": {"type": "string", "format": "uri"}} # images字段類型為數組,數組元素類型為字符串,格式為URI}
}# 驗證商品數據, 對于傳入的數據進行驗證,返回驗證結果和錯誤信息
def validate_product(data: dict):try:validate(instance=data, schema=product_schema)return True, Noneexcept ValidationError as e:return False, f"校驗失敗:{e.message} (路徑:{e.json_path})"def save_product(data: dict):# 校驗數據合法性is_valid, error = validate_product(data)if not is_valid:logging.error(f"數據丟棄:{error}")return# 添加爬取時間戳data["crawled_time"] = datetime.now().isoformat()# 寫入JSON文件(按行存儲)with open("products.jsonl", "a") as f:f.write(json.dumps(data, ensure_ascii=False) + "\n")

三:云原生NoSQL(了解)

傳統自建的noSQL有如下的問題:

  • ?運維黑洞?:分片配置、版本升級、備份恢復消耗30%以上開發精力。
  • ?擴展滯后?:突發流量導致集群擴容不及時,引發數據丟失或性能瓶- 頸。
  • ?容災脆弱?:自建多機房方案成本高昂,且故障切換延遲高。
  • ?安全風險?:未及時修補漏洞導致數據泄露,合規審計難度大。

云原生的NoSQL有如下的優勢:

特性價值典型的場景
全托管架構開發者聚焦業務邏輯,無需管理服務器中小團隊快速構建爬蟲存儲系統
自動彈性伸縮根據負載動態調整資源,成本降低40%+應對“雙十一”級數據洪峰
全球多活數據就近寫入,延遲低于50ms跨國爬蟲數據本地化存儲
內置安全自動加密、漏洞掃描、合規認證(如GDPR)用戶隱私數據安全存儲

AWS DynamoDB

高并發寫入、固定模式查詢(如URL去重、狀態記錄)。

  • 使用自適應容量(Adaptive Capacity)避免熱點分片 throttling。
  • 對歷史數據啟用TTL自動刪除(節省存儲費用)。
  • 通過IAM策略限制爬蟲節點的最小權限(如只允許PutItem)。
  • 啟用KMS加密靜態數據。
import boto3
from boto3.dynamodb.types import Binary# 創建DynamoDB資源(密鑰從環境變量注入)
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('CrawlerData')# 動態創建表(按需計費模式)
table = dynamodb.create_table(TableName='CrawlerData',KeySchema=[{'AttributeName': 'data_id', 'KeyType': 'HASH'},  # 分區鍵{'AttributeName': 'crawled_time', 'KeyType': 'RANGE'}  # 排序鍵],AttributeDefinitions=[{'AttributeName': 'data_id', 'AttributeType': 'S'},{'AttributeName': 'crawled_time', 'AttributeType': 'N'}],BillingMode='PAY_PER_REQUEST'  # 按請求量計費,無預置容量
)# 寫入加密數據(結合前文AES加密)
encrypted_data = aes_encrypt("敏感數據")
table.put_item(Item={'data_id': 'page_123','crawled_time': 1633027200,'content': Binary(encrypted_data),'source_site': 'example.com'
})# 查詢特定時間段數據
response = table.query(KeyConditionExpression=Key('data_id').eq('page_123') & Key('crawled_time').between(1633027000, 1633027400)
)

MongoDB Atlas

動態結構數據存儲(如商品詳情異構字段)、復雜聚合分析

  • 選擇Serverless實例應對突發流量(費用=請求數×數據量)。
  • 啟用壓縮(Snappy)減少存儲開銷。
  • 使用字段級加密(Client-Side Field Level Encryption)。
  • 配置網絡訪問規則(僅允許爬蟲服務器IP段)。
from pymongo import MongoClient
from pymongo.encryption import ClientEncryption# 連接Atlas集群(SRV連接串自動分片)
uri = "mongodb+srv://user:password@cluster0.abcd.mongodb.net/?retryWrites=true&w=majority"
client = MongoClient(uri)
db = client['crawler']
collection = db['dynamic_data']# 寫入動態結構數據(無預定義Schema)
product_data = {"title": "智能手表","price": 599.99,"attributes": {"防水等級": "IP68", "電池容量": "200mAh"},"extracted_time": "2023-10-05T14:30:00Z"  # ISO 8601格式
}
collection.insert_one(product_data)# 執行聚合查詢(統計各價格區間商品數)
pipeline = [{"$match": {"price": {"$exists": True}}},{"$bucket": {"groupBy": "$price","boundaries": [0, 100, 500, 1000],"default": "Other","output": {"count": {"$sum": 1}}}}
]
results = collection.aggregate(pipeline)

四:Redis Edge近端計算(了解)

當爬蟲節點遍布全球邊緣網絡時,傳統“端側采集-中心存儲-云端計算”的鏈路過長,導致?高延遲?、?帶寬成本激增?與?實時性缺失?。?Redis Edge Module?通過將數據處理能力下沉至爬蟲節點,實現?數據去重?、?實時聚合?與?規則過濾?的近端執行,重構了爬蟲存儲架構的邊界

在這里插入圖片描述

而中心化計算有如下的三個問題:

  • ?延遲敏感場景失效?:跨國爬蟲數據回傳延遲高達200ms+,無法滿足實時監控需求。
  • ?帶寬成本失控?:重復數據(如相似頁面內容)占用80%以上傳輸資源。
  • ?數據處理滯后?:中心服務器批量處理無法觸發即時響應(如突發輿情告警)。
模塊功能爬蟲場景價值
RedisTimeSeries毫秒級時序數據處理實時統計爬蟲吞吐量/成功率
RedisBloom布隆過濾器實現去重近端URL去重,節省90%帶寬
RedisGears邊緣側執行Python函數數據清洗/格式化前置
RedisAI部署輕量ML模型實時敏感內容識別

傳統架構中:爬蟲節點 -> 原始數據上傳 -> 中心數據庫 -> 批量處理

邊緣架構中:爬蟲節點 -> Redis Edge -> 規則執行數據過濾和聚合 -> 壓縮有效數據同步到中心數據庫

# 下載Redis Edge鏡像(集成所有模塊)  
docker run -p 6379:6379 redislabs/redisedge:latest  # 啟用模塊(示例啟用Bloom和TimeSeries)  
redis-cli module load /usr/lib/redis/modules/redisbloom.so  
redis-cli module load /usr/lib/redis/modules/redistimeseries.so  
# 使用布隆過濾器去重
import redis  
from redisbloom.client import Client  # 連接邊緣Redis  
r = redis.Redis(host='edge-node-ip', port=6379)  
rb = Client(connection_pool=r.connection_pool)  def url_deduplicate(url: str) -> bool:  if rb.bfExists('crawler:urls', url):  return False  rb.bfAdd('crawler:urls', url)  return True  # 在爬蟲循環中調用  
if url_deduplicate(target_url):  data = crawl(target_url)  process_data(data)  
else:  print(f"URL已存在:{target_url}")  
# 時序數據實時統計
# 創建時序數據集  
import redis
from redisbloom.client import Client  # 連接邊緣Redis  
r = redis.Redis(host='edge-node-ip', port=6379)  
rb = Client(connection_pool=r.connection_pool)  r.ts().create('crawl:latency', retention_msec=86400000)  # 記錄每次請求延遲  
def log_latency(latency_ms: float):  r.ts().add('crawl:latency', '*', latency_ms, duplicate_policy='first')  # 每5秒聚合平均延遲  
avg_latency = r.ts().range('crawl:latency', '-', '+', aggregation_type='avg', bucket_size_msec=5000)  
print(f"近5秒平均延遲:{avg_latency[-1][1]} ms")  

五:二進制存儲

傳統的文本格式(如CSV、JSON)雖然易于閱讀和解析,但在處理大規模數據時存在讀寫速度慢、存儲空間占用高等問題。

而二進制格式憑借其緊湊的存儲方式和高效的序列化機制,成為優化性能的重要選擇。

和文本文件存儲相比,二進制文件有如下的優勢:

  1. 更快的讀寫速度?:無需文本編碼/解碼,直接操作二進制流。
  2. 更小的存儲體積?:二進制數據壓縮效率更高,節省磁盤空間。
  3. 支持復雜數據類型?:可序列化自定義對象、多維數組等非結構化數據。

1:Pickle

Pickle是Python內置的序列化模塊,可將任意Python對象轉換為二進制數據并保存到文件,適用于臨時緩存或中間數據存儲。

  • 支持所有Python原生數據類型。
  • 序列化/反序列化速度快,代碼簡潔。
import pickle# 保存數據
data = {"name": "Alice", "age": 30, "tags": ["Python", "Web"]}
with open("data.pkl", "wb") as f:pickle.dump(data, f)# 讀取數據
with open("data.pkl", "rb") as f:loaded_data = pickle.load(f)
print(loaded_data)  # 輸出: {'name': 'Alice', 'age': 30, 'tags': ['Python', 'Web']}

2:Parquet

Parquet是一種面向列的二進制存儲格式,專為大數據場景設計,支持高效壓縮和快速查詢,廣泛應用于Hadoop、Spark等分布式系統。

  • 列式存儲?:按列壓縮和讀取,減少I/O開銷,適合聚合查詢。
  • ?高壓縮率?:默認使用Snappy壓縮算法,體積比CSV減少70%以上。
  • ?跨平臺兼容?:支持Java、Python、Spark等多種語言和框架。
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd# 創建示例數據
df = pd.DataFrame({"id": [1, 2, 3],"content": ["text1", "text2", "text3"]
})# 保存為Parquet文件
table = pa.Table.from_pandas(df)
pq.write_table(table, "data.parquet")# 讀取Parquet文件
parquet_table = pq.read_table("data.parquet")
print(parquet_table.to_pandas())
指標PickleParquet
讀寫速度快(Python專用)快(大數據優化)
存儲體積中等極小(高壓縮)
適用場景臨時緩存、復雜對象結構化數據、分析查詢

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/89608.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/89608.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/89608.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

UDP和TCP的主要區別是什么?

在網絡通信中,TCP(傳輸控制協議)和UDP(用戶數據報協議)是兩種核心的傳輸層協議。它們各自的特點和應用場景截然不同,理解兩者的區別對于選擇合適的通信方式至關重要。本文將通過幾個關鍵點,用簡…

Softhub軟件下載站實戰開發(十八):軟件分類展示

Softhub軟件下載站實戰開發(十八):軟件分類展示 🖥? 在之前文章中,我們實現了后臺管理相關部分,本篇文章開始我們來實現用戶端頁面,由于內網使用,不需要sso優化等特性,我…

linux--------------------BlockQueue的生產者消費模型

1.基礎BlockingQueue的生產者消費模型 1.1 BlockQueue 在多線程編程中阻塞隊列是一種常用于實現生產者和消費者模型的數據結構,它與普通的隊列區別在于,當隊列為空時,從隊列獲取元素的操作將被阻塞,直到隊列中放入了新的數據。當…

堆排序算法詳解:原理、實現與C語言代碼

堆排序(Heap Sort)是一種高效的排序算法,利用二叉堆數據結構實現。其核心思想是將待排序序列構造成一個大頂堆(或小頂堆),通過反復調整堆結構完成排序。下面從原理到實現進行詳細解析。一、核心概念&#x…

SSM框架——注入類型

引用類型的注入:Setter方法簡單類型的注入:定義簡單實例和方法在配置文件中對bean進行配置,使用porperty屬性 值用value(ref是用來獲取bean的)構造器方法:構造器方法中需要寫name,這樣程序就會耦…

信息學奧賽一本通 1552:【例 1】點的距離

【題目鏈接】 ybt 1552:【例 1】點的距離 【題目考點】 1. 最近公共祖先(LCA):倍增求LCA 知識點講解見:洛谷 P3379 【模板】最近公共祖先(LCA) 【解題思路】 首先用鄰接表保存輸入的無權圖…

1Panel中的OpenResty使用alias

問題 在服務器上使用了1Panel的OpenResty來管理網站服務,當作是一個Nginx用,想做一個alias來直接管理某個文件夾的文件,于是直接在其中一個網站中使用了alias配置。 location /upload {alias /root/upload;autoindex on;charset utf-8;charse…

小明記賬簿煥新記:從單色到多彩的主題進化之路

【從冷靜藍到多彩世界,這一次我們重新定義記賬美學】 曾經,打開“小明記賬簿”是一片沉穩的藍色海洋,它像一位理性的財務管家,默默守護著你的每一筆收支。但總有人悄悄問:“能不能多一些顏色?”今天&#x…

Apache IoTDB(1):時序數據庫介紹與單機版安裝部署指南

目錄一、Apache IoTDB 是什么?1.1 產品介紹1.2 產品體系1.3 產品架構二、IoTDB 環境配置2.1 Linux系統需準備環境2.2 Windows系統需準備環境2.3 網絡配置2.3.1 關閉防火墻2.3.2 查看端口是否占用2.3.3 避雷經驗三、IoTDB 單機版系統部署安裝指南3.1 產品下載3.2 注意…

Python 圖片爬取入門:從手動下載到自動批量獲取

前言 想批量下載網頁圖片卻嫌手動保存太麻煩?本文用 Python 帶你實現自動爬取,從分析網站到代碼運行,步驟清晰,新手也能快速上手,輕松搞定圖片批量獲取。 1.安裝模塊 在開始爬取圖片前,我們需要準備好工具…

aspect-ratio: 1 / 1樣式在部分手機瀏覽器中失效的問題怎么解決?

最近在uniapp開發時又遇到了安卓手機不兼容問題&#xff0c;ios系統無影響。開發背景&#xff1a;小編想通過網格布局來實現答題卡的布局&#xff0c;實現五列多行的形式。代碼片段&#xff1a;<view class"question-grid"><viewv-for"(question, inde…

RecyclerView與ListView深度對比分析

1. 使用流程對比ListView: 布局XML&#xff1a; 在布局文件中放置 <ListView> 控件&#xff0c;指定 id (如 android:id"id/listView")。數據適配器 (Adapter)&#xff1a; 繼承 BaseAdapter 或 ArrayAdapter / CursorAdapter / SimpleAdapter。 重寫 getCount…

deepseekAI對接大模型的網頁PHP源碼帶管理后臺(可實現上傳分析文件)

前端后端都已進行優化&#xff0c;新增可上傳文件功能&#xff08;拖拽進去也可以&#xff09;&#xff0c;后端進行風格主題設置&#xff0c;優化數據結構&#xff01;依舊測試網站&#xff1a;iEPMS我的工具箱&#xff0c;你的智慧助手&#xff01;還是那句話兄弟們輕點搞我的…

NJU 凸優化導論(9) 對偶(II)KKT條件+變形重構

https://www.lamda.nju.edu.cn/chengq/optfall24/slides/Lecture_9.pdf 目錄 關于對偶的一些解釋 1. Max-min characterization 最大最小準則 2. Saddle-point Interpretation 鞍點解釋 3. Game interpretation 博弈論里的對偶 Optimality Conditions 最優條件 1. Certi…

Vue Swiper組件

Vue 漸進式JavaScript 框架 基于Vue2的學習筆記 - Vue Swiper組件實現筆記 目錄 Swiper組件 下載swiper 創建swiper組件 保存時修復 編寫swiper內容 引入swiper 使用swiper Swiper子組件 創建Swiper列表組件 使用子組件 增加生命周期 增加圖片顯示 加載數據 渲染…

Linux:lvs集群技術

一.集群和分布式1.1 集群集群是為了解決某個特定問題將多臺計算機組合起來形成的單個系統。即當單獨一臺主機無法承載現有的用戶請求量&#xff1b;或者一臺主機因為單一故障導致業務中斷的時候&#xff0c;就可以增加服務主機數&#xff0c;這些主機在一起提供服務&#xff0c…

【管理】持續交付2.0:業務引領的DevOps-精要增訂本,讀書筆記(理論模型,技術架構,業務價值)

【管理】持續交付2.0&#xff1a;業務引領的DevOps-精要增訂本&#xff0c;讀書筆記&#xff08;理論模型&#xff0c;技術架構&#xff0c;業務價值&#xff09; 文章目錄1、持續交付的理論模型&#xff08;第1-3章&#xff09;1.1 結構圖1.2 持續交付的演進1.3 雙環模型理論體…

Wilcox檢驗的星星怎么規定的?

在 R 里&#xff0c;常見的把 p 值映射為“星號”標記&#xff08;顯著性水平&#xff09;的規則通常是&#xff1a;p 值范圍標記p ≤ 0.0001“****”0.0001 < p ≤ 0.001“***”0.001 < p ≤ 0.01“**”0.01 < p ≤ 0.05“*”0.05 < p ≤ 0.1“.”p > 0.1…

https與DNS的運行流程

HTTPS流程&#xff1a;HTTPS核心:加了TLS層&#xff0c;加密傳輸身份認證TLS:信息加密、校驗機制、身份證書TLS&#xff08;Transport Layer Security&#xff09;握手是建立安全通信通道的關鍵過程&#xff0c;發生在客戶端&#xff08;如瀏覽器&#xff09;和服務器之間。其主…

板子 5.29--7.19

板子 5.29–7.19 目錄 1. 樹狀數組 2. KMP 3. 矩陣快速冪 4. 數位DP 5. 狀壓枚舉子集 6. 快速冪&#xff08;新版 7. priority_queue 8. dijkstra 9. 單調棧 10. debug內容 1. 樹狀數組 // 樹狀數組 快速求前綴和 / 前綴最大值 // 維護位置數量(離散化)...// (區間加 區間求和…