爬蟲04 - 高級數據存儲
文章目錄
- 爬蟲04 - 高級數據存儲
- 一:加密數據的存儲
- 二:JSON Schema校驗
- 三:云原生NoSQL(了解)
- 四:Redis Edge近端計算(了解)
- 五:二進制存儲
- 1:Pickle
- 2:Parquet
一:加密數據的存儲
對于用戶的隱私數據,可能需要進行數據的加密才能存儲到文件或者數據庫,因為要遵守《個人信息保護法》
所以對于這些隱私數據處理順序是:明文數據 → AES加密 → 密文字節流 → 序列化(如Base64) → 存儲至文件
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
import base64
import json# 第一步:生成256位(32字節)密鑰 + 16字節IV(CBC模式必需)
key = get_random_bytes(32)
iv = get_random_bytes(16)# 第二步:加密數據
def encrypt_data(data: str, key: bytes, iv: bytes) -> bytes:cipher = AES.new(key, AES.MODE_CBC, iv)data_bytes = data.encode('utf-8')# PKCS7填充至分組長度倍數pad_len = AES.block_size - (len(data_bytes) % AES.block_size)padded_data = data_bytes + bytes([pad_len] * pad_len)return cipher.encrypt(padded_data)plain_text = "用戶機密數據:張三|13800138000|身份證1101..."
encrypted_bytes = encrypt_data(plain_text, key, iv)# 第三步:轉成base64并寫入json文件中
encrypted_b64 = base64.b64encode(iv + encrypted_bytes).decode('utf-8')# 寫入文件(JSON示例)
with open("encrypted_data.json", "w") as f:json.dump({"encrypted_data": encrypted_b64}, f)# 第四步,讀取文件并解密
def decrypt_data(encrypted_b64: str, key: bytes) -> str:encrypted_full = base64.b64decode(encrypted_b64)iv = encrypted_full[:16] # 提取IVciphertext = encrypted_full[16:]cipher = AES.new(key, AES.MODE_CBC, iv)decrypted_padded = cipher.decrypt(ciphertext)# 去除PKCS7填充pad_len = decrypted_padded[-1]return decrypted_padded[:-pad_len].decode('utf-8')# 從文件讀取并解密
with open("encrypted_data.json", "r") as f:data = json.load(f)
decrypted_text = decrypt_data(data["encrypted_data"], key)
print(decrypted_text) # 輸出原始明文
二:JSON Schema校驗
在爬蟲開發中,?JSON?因其輕量、易讀和跨平臺特性,成為數據存儲的主流格式。
然而,面對動態變化的網頁結構或API響應,未經校驗的JSON數據可能導致字段缺失、類型混亂甚至數據污染,進而引發下游分析錯誤或系統崩潰。本文聚焦?JSON Schema校驗?,結合Python的jsonschema庫,詳解如何為爬蟲數據“上保險”,確保存儲的JSON文件結構合法、字段完整,為數據質量筑起第一道防線。
規則 | 場景實例 |
---|---|
required | 確保商品名稱、價格等核心字段必填 |
enum | 限定狀態字段為[“已售罄”, “在售”] |
pattern | 驗證手機號、郵箱格式合法性 |
custom format | 使用date-time校驗爬取時間戳格式 |
oneOf/anyOf | 處理多態結構(如不同店鋪的商品模型) |
from jsonschema import validate, ValidationError
import json
from datetime import datetime
import logging# 定義商品數據Schema
product_schema = {"type": "object","required": ["title", "price"], # 必須包含title和price字段"properties": { # 定義字段類型和范圍"title": {"type": "string"}, # title字段類型為字符串"price": {"type": "number", "minimum": 0}, # price字段類型為數字,最小值為0"currency": {"enum": ["CNY", "USD"]}, # currency字段類型為枚舉,可選值為CNY和USD"images": {"type": "array", "items": {"type": "string", "format": "uri"}} # images字段類型為數組,數組元素類型為字符串,格式為URI}
}# 驗證商品數據, 對于傳入的數據進行驗證,返回驗證結果和錯誤信息
def validate_product(data: dict):try:validate(instance=data, schema=product_schema)return True, Noneexcept ValidationError as e:return False, f"校驗失敗:{e.message} (路徑:{e.json_path})"def save_product(data: dict):# 校驗數據合法性is_valid, error = validate_product(data)if not is_valid:logging.error(f"數據丟棄:{error}")return# 添加爬取時間戳data["crawled_time"] = datetime.now().isoformat()# 寫入JSON文件(按行存儲)with open("products.jsonl", "a") as f:f.write(json.dumps(data, ensure_ascii=False) + "\n")
三:云原生NoSQL(了解)
傳統自建的noSQL有如下的問題:
- ?運維黑洞?:分片配置、版本升級、備份恢復消耗30%以上開發精力。
- ?擴展滯后?:突發流量導致集群擴容不及時,引發數據丟失或性能瓶- 頸。
- ?容災脆弱?:自建多機房方案成本高昂,且故障切換延遲高。
- ?安全風險?:未及時修補漏洞導致數據泄露,合規審計難度大。
云原生的NoSQL有如下的優勢:
特性 | 價值 | 典型的場景 |
---|---|---|
全托管架構 | 開發者聚焦業務邏輯,無需管理服務器 | 中小團隊快速構建爬蟲存儲系統 |
自動彈性伸縮 | 根據負載動態調整資源,成本降低40%+ | 應對“雙十一”級數據洪峰 |
全球多活 | 數據就近寫入,延遲低于50ms | 跨國爬蟲數據本地化存儲 |
內置安全 | 自動加密、漏洞掃描、合規認證(如GDPR) | 用戶隱私數據安全存儲 |
AWS DynamoDB
高并發寫入、固定模式查詢(如URL去重、狀態記錄)。
- 使用自適應容量(Adaptive Capacity)避免熱點分片 throttling。
- 對歷史數據啟用TTL自動刪除(節省存儲費用)。
- 通過IAM策略限制爬蟲節點的最小權限(如只允許PutItem)。
- 啟用KMS加密靜態數據。
import boto3
from boto3.dynamodb.types import Binary# 創建DynamoDB資源(密鑰從環境變量注入)
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('CrawlerData')# 動態創建表(按需計費模式)
table = dynamodb.create_table(TableName='CrawlerData',KeySchema=[{'AttributeName': 'data_id', 'KeyType': 'HASH'}, # 分區鍵{'AttributeName': 'crawled_time', 'KeyType': 'RANGE'} # 排序鍵],AttributeDefinitions=[{'AttributeName': 'data_id', 'AttributeType': 'S'},{'AttributeName': 'crawled_time', 'AttributeType': 'N'}],BillingMode='PAY_PER_REQUEST' # 按請求量計費,無預置容量
)# 寫入加密數據(結合前文AES加密)
encrypted_data = aes_encrypt("敏感數據")
table.put_item(Item={'data_id': 'page_123','crawled_time': 1633027200,'content': Binary(encrypted_data),'source_site': 'example.com'
})# 查詢特定時間段數據
response = table.query(KeyConditionExpression=Key('data_id').eq('page_123') & Key('crawled_time').between(1633027000, 1633027400)
)
MongoDB Atlas
動態結構數據存儲(如商品詳情異構字段)、復雜聚合分析
- 選擇Serverless實例應對突發流量(費用=請求數×數據量)。
- 啟用壓縮(Snappy)減少存儲開銷。
- 使用字段級加密(Client-Side Field Level Encryption)。
- 配置網絡訪問規則(僅允許爬蟲服務器IP段)。
from pymongo import MongoClient
from pymongo.encryption import ClientEncryption# 連接Atlas集群(SRV連接串自動分片)
uri = "mongodb+srv://user:password@cluster0.abcd.mongodb.net/?retryWrites=true&w=majority"
client = MongoClient(uri)
db = client['crawler']
collection = db['dynamic_data']# 寫入動態結構數據(無預定義Schema)
product_data = {"title": "智能手表","price": 599.99,"attributes": {"防水等級": "IP68", "電池容量": "200mAh"},"extracted_time": "2023-10-05T14:30:00Z" # ISO 8601格式
}
collection.insert_one(product_data)# 執行聚合查詢(統計各價格區間商品數)
pipeline = [{"$match": {"price": {"$exists": True}}},{"$bucket": {"groupBy": "$price","boundaries": [0, 100, 500, 1000],"default": "Other","output": {"count": {"$sum": 1}}}}
]
results = collection.aggregate(pipeline)
四:Redis Edge近端計算(了解)
當爬蟲節點遍布全球邊緣網絡時,傳統“端側采集-中心存儲-云端計算”的鏈路過長,導致?高延遲?、?帶寬成本激增?與?實時性缺失?。?Redis Edge Module?通過將數據處理能力下沉至爬蟲節點,實現?數據去重?、?實時聚合?與?規則過濾?的近端執行,重構了爬蟲存儲架構的邊界
而中心化計算有如下的三個問題:
- ?延遲敏感場景失效?:跨國爬蟲數據回傳延遲高達200ms+,無法滿足實時監控需求。
- ?帶寬成本失控?:重復數據(如相似頁面內容)占用80%以上傳輸資源。
- ?數據處理滯后?:中心服務器批量處理無法觸發即時響應(如突發輿情告警)。
模塊 | 功能 | 爬蟲場景價值 |
---|---|---|
RedisTimeSeries | 毫秒級時序數據處理 | 實時統計爬蟲吞吐量/成功率 |
RedisBloom | 布隆過濾器實現去重 | 近端URL去重,節省90%帶寬 |
RedisGears | 邊緣側執行Python函數 | 數據清洗/格式化前置 |
RedisAI | 部署輕量ML模型 | 實時敏感內容識別 |
傳統架構中:爬蟲節點 -> 原始數據上傳 -> 中心數據庫 -> 批量處理
邊緣架構中:爬蟲節點 -> Redis Edge -> 規則執行數據過濾和聚合 -> 壓縮有效數據同步到中心數據庫
# 下載Redis Edge鏡像(集成所有模塊)
docker run -p 6379:6379 redislabs/redisedge:latest # 啟用模塊(示例啟用Bloom和TimeSeries)
redis-cli module load /usr/lib/redis/modules/redisbloom.so
redis-cli module load /usr/lib/redis/modules/redistimeseries.so
# 使用布隆過濾器去重
import redis
from redisbloom.client import Client # 連接邊緣Redis
r = redis.Redis(host='edge-node-ip', port=6379)
rb = Client(connection_pool=r.connection_pool) def url_deduplicate(url: str) -> bool: if rb.bfExists('crawler:urls', url): return False rb.bfAdd('crawler:urls', url) return True # 在爬蟲循環中調用
if url_deduplicate(target_url): data = crawl(target_url) process_data(data)
else: print(f"URL已存在:{target_url}")
# 時序數據實時統計
# 創建時序數據集
import redis
from redisbloom.client import Client # 連接邊緣Redis
r = redis.Redis(host='edge-node-ip', port=6379)
rb = Client(connection_pool=r.connection_pool) r.ts().create('crawl:latency', retention_msec=86400000) # 記錄每次請求延遲
def log_latency(latency_ms: float): r.ts().add('crawl:latency', '*', latency_ms, duplicate_policy='first') # 每5秒聚合平均延遲
avg_latency = r.ts().range('crawl:latency', '-', '+', aggregation_type='avg', bucket_size_msec=5000)
print(f"近5秒平均延遲:{avg_latency[-1][1]} ms")
五:二進制存儲
傳統的文本格式(如CSV、JSON)雖然易于閱讀和解析,但在處理大規模數據時存在讀寫速度慢、存儲空間占用高等問題。
而二進制格式憑借其緊湊的存儲方式和高效的序列化機制,成為優化性能的重要選擇。
和文本文件存儲相比,二進制文件有如下的優勢:
- 更快的讀寫速度?:無需文本編碼/解碼,直接操作二進制流。
- 更小的存儲體積?:二進制數據壓縮效率更高,節省磁盤空間。
- 支持復雜數據類型?:可序列化自定義對象、多維數組等非結構化數據。
1:Pickle
Pickle是Python內置的序列化模塊,可將任意Python對象轉換為二進制數據并保存到文件,適用于臨時緩存或中間數據存儲。
- 支持所有Python原生數據類型。
- 序列化/反序列化速度快,代碼簡潔。
import pickle# 保存數據
data = {"name": "Alice", "age": 30, "tags": ["Python", "Web"]}
with open("data.pkl", "wb") as f:pickle.dump(data, f)# 讀取數據
with open("data.pkl", "rb") as f:loaded_data = pickle.load(f)
print(loaded_data) # 輸出: {'name': 'Alice', 'age': 30, 'tags': ['Python', 'Web']}
2:Parquet
Parquet是一種面向列的二進制存儲格式,專為大數據場景設計,支持高效壓縮和快速查詢,廣泛應用于Hadoop、Spark等分布式系統。
- 列式存儲?:按列壓縮和讀取,減少I/O開銷,適合聚合查詢。
- ?高壓縮率?:默認使用Snappy壓縮算法,體積比CSV減少70%以上。
- ?跨平臺兼容?:支持Java、Python、Spark等多種語言和框架。
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd# 創建示例數據
df = pd.DataFrame({"id": [1, 2, 3],"content": ["text1", "text2", "text3"]
})# 保存為Parquet文件
table = pa.Table.from_pandas(df)
pq.write_table(table, "data.parquet")# 讀取Parquet文件
parquet_table = pq.read_table("data.parquet")
print(parquet_table.to_pandas())
指標 | Pickle | Parquet |
---|---|---|
讀寫速度 | 快(Python專用) | 快(大數據優化) |
存儲體積 | 中等 | 極小(高壓縮) |
適用場景 | 臨時緩存、復雜對象 | 結構化數據、分析查詢 |