Crawler 03 - Data Storage for Crawlers
Table of Contents
- Crawler 03 - Data Storage for Crawlers
- Part 1: CSV Data Storage
- 1: Introduction
- 2: Basic Usage
- 3: Advanced Usage
- 4: Worked Example
- Part 2: JSON Data Storage
- 1: Basic JSON Reading and Writing
- 2: Converting Between Strings and Objects
- 3: Special Handling for Dates and Times
- 4: Custom Serialization
- 5: Advanced Usage
- Part 3: Database Storage
- 1: Fundamentals
- 2: Additional Notes
Part 1: CSV Data Storage
1: Introduction
CSV is a lightweight, cross-platform file format widely used for data exchange, logging, and small-to-medium-scale data storage.
- No extra dependencies: handled directly with the csv module from Python's standard library.
- Human readable: plain text that can be opened with Excel or any text editor.
- Efficient and flexible: well suited for quickly importing and exporting tabular data.
The CSV format follows a few simple rules (a small illustrative snippet follows the list):
- Each line is one record; fields are separated by a delimiter (a comma by default).
- Fields containing special characters can be wrapped in quotes (e.g. double quotes).
- The first line may define the column names (the header row).
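For example, a file following these rules might look like this (the rows are made up purely for illustration); note the second data row, where a field is wrapped in double quotes because it contains a comma:
id,name,email
1,張三,zhangsan@example.com
2,"Smith, Anna",anna@example.org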
Use case | Description |
---|---|
Data export and backup | Bulk-export structured data from a database or an API |
Preprocessing for data analysis | Feed the data to pandas for statistics and visualization |
Cross-system data exchange | Compatible with Excel, R, MATLAB, and similar tools |
2: Basic Usage
Python ships with the csv module, so no extra installation is needed.
Basic CSV read/write example
import csv

headers = ["id", "name", "email"]
data = [
    [1, "張三", "zhangsan@example.com"],
    [2, "李四", "lisi@test.org"],
    [3, "王五", "wangwu@demo.net"],
]

# open() takes the file name, the mode (r -> read, w -> write) and the encoding (utf-8 here)
with open("output.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(headers)   # write the header row
    writer.writerows(data)     # write all data rows in one call

with open("output.csv", "r", newline="", encoding="utf-8") as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)
Reading and writing with dictionaries
# Write rows as dictionaries
with open("dict_output.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["id", "name", "email"])
    writer.writeheader()
    writer.writerow({"id": 1, "name": "張三", "email": "zhangsan@example.com"})

# Read rows back as dictionaries
with open("dict_output.csv", "r", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row)
        print(row["id"], row["name"], row["email"])
Custom delimiters and quoting rules
# Use a semicolon as the delimiter and wrap every field in double quotes
with open("custom.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f, delimiter=";", quoting=csv.QUOTE_ALL)
    writer.writerow(["id", "name"])
    writer.writerow([1, "張三"])
3: Advanced Usage
Fields containing special characters
When a field contains characters that would break the CSV structure, such as commas or newlines, wrap the field in quotes and set csv.QUOTE_MINIMAL or csv.QUOTE_NONNUMERIC.
data = [
    [4, "Alice, Smith", "alice@example.com"],
    [5, "Bob\nJohnson", "bob@test.org"],
]

with open("special_chars.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
    writer.writerows(data)
Nested data
import json

data = [
    {"id": 1, "info": '{"age": 30, "city": "北京"}'},
    {"id": 2, "info": '{"age": 25, "city": "上海"}'},
]

# Write the nested JSON strings as a regular column
with open("nested_data.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["id", "info"])
    writer.writeheader()
    writer.writerows(data)

# Read the rows back and parse the JSON column
with open("nested_data.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        info = json.loads(row["info"])
        print(f"ID: {row['id']}, city: {info['city']}")
Handling more complex cases
Large files -> read line by line or in batches (see the sketch below)
Complex data processing -> pandas
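A minimal sketch of both ideas, assuming a hypothetical large_file.csv with a header row (pandas is a third-party dependency; the chunk size is arbitrary):
import csv
import pandas as pd

# Reading line by line keeps memory usage flat regardless of file size
with open("large_file.csv", "r", newline="", encoding="utf-8") as f:
    reader = csv.reader(f)
    header = next(reader)        # skip the header row
    for row in reader:
        pass                     # handle one row at a time here

# pandas can also read in fixed-size chunks; each chunk is a small DataFrame
for chunk in pd.read_csv("large_file.csv", chunksize=10_000):
    print(len(chunk))            # process or aggregate each chunk here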
4: Worked Example
# Scrape the Douban Books Top 250 page and store the results in a CSV file
# so the data can later be analyzed with pandas
import csv
import requests
from bs4 import BeautifulSoup

url = "https://book.douban.com/top250"
headers = {"User-Agent": "Mozilla/5.0"}

response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, "html.parser")

books = []
for item in soup.select("tr.item"):
    title = item.select_one(".pl2 a")["title"]
    score = item.select_one(".rating_nums").text
    books.append({"title": title, "score": score})

# Write the scraped records to CSV
with open("douban_books.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["title", "score"])
    writer.writeheader()
    writer.writerows(books)

print("Data saved to douban_books.csv")
Part 2: JSON Data Storage
Use case | Description |
---|---|
Configuration files | Program parameters, paths, etc. (e.g. config.json) |
API data exchange | Frontend and backend pass requests and responses as JSON |
Structured logging | Record operation logs with metadata for later analysis |
1: Basic JSON Reading and Writing
dump -> write to a JSON file, load -> read from a JSON file
import json

data = {
    "project": "數據分析平臺",
    "version": 2.1,
    "authors": ["張三", "李四"],
    "tags": {"python": 5, "database": 3},
}

with open("data.json", "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=2)  # keep non-ASCII characters as-is, indent by 2 spaces

with open("data.json", "r", encoding="utf-8") as f:
    loaded_data = json.load(f)

print(loaded_data["tags"]["python"])  # output: 5
2: Converting Between Strings and Objects
dumps -> serialize an object to a JSON string, loads -> parse a JSON string back into an object
data_str = json.dumps(data, ensure_ascii=False)
print(type(data_str))  # <class 'str'>

json_str = '{"name": "王五", "age": 30}'
obj = json.loads(json_str)
print(obj["age"])  # output: 30
3: Special Handling for Dates and Times
JSON has no native support for Python's datetime objects, so custom conversion logic is needed.
from datetime import datetime

def datetime_encoder(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()  # convert to an ISO-format string
    raise TypeError("Type is not JSON serializable")

data = {"event": "發布會", "time": datetime.now()}

# Pass the custom encoder function when serializing
json_str = json.dumps(data, default=datetime_encoder, ensure_ascii=False)
print(json_str)  # {"event": "發布會", "time": "2024-07-20T15:30:45.123456"}
4: Custom Serialization
class User:
    def __init__(self, name, level):
        self.name = name
        self.level = level

user = User("趙六", 3)

# Option 1: build a plain dictionary by hand
user_dict = {"name": user.name, "level": user.level}
json.dumps(user_dict)

# Option 2: use __dict__ (every attribute must itself be JSON-serializable)
json.dumps(user.__dict__)
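A third option, not shown above but built into the standard library, is to subclass json.JSONEncoder; a minimal sketch reusing the User class defined above:
class UserEncoder(json.JSONEncoder):
    def default(self, obj):
        # default() is only called for objects json cannot serialize on its own
        if isinstance(obj, User):
            return {"name": obj.name, "level": obj.level}
        return super().default(obj)

json.dumps(user, cls=UserEncoder, ensure_ascii=False)  # '{"name": "趙六", "level": 3}'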
5: Advanced Usage
Skipping specific fields
# Note: the default= hook is only invoked for objects json cannot serialize,
# so it will not strip keys out of an ordinary dict; drop sensitive fields
# before calling dumps instead
def filter_sensitive(obj):
    return {k: v for k, v in obj.items() if k != "password"}

user_data = {"name": "張三", "password": "123456"}
json.dumps(filter_sensitive(user_data), ensure_ascii=False)  # {"name": "張三"}
Dropping indentation and spaces (compact output)
json.dumps(data, separators=(",", ":"))  # most compact output
Using ujson instead (a high-speed JSON library implemented in C whose API largely mirrors the standard library)
import ujson as json  # swap in for the standard library
json.dumps(data)      # roughly 3-10x faster
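If ujson may not be installed everywhere, a common pattern (a sketch, not part of the original snippet) is to fall back to the standard library:
try:
    import ujson as json  # use the faster C implementation when available
except ImportError:
    import json           # fall back to the standard library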
Reading large files
# Read a JSON Lines file one record at a time (each line is an independent JSON object)
with open("large_data.json", "r", encoding="utf-8") as f:
    for line in f:
        item = json.loads(line)
        process(item)  # process() stands in for your own handling logic
Part 3: Database Storage
1: Fundamentals
For data storage with each of the various databases, see here.
2: Additional Notes
DBUtils can be used to manage a MySQL connection pool, as shown below.
import pymysql
from dbutils.pooled_db import PooledDB

pool = PooledDB(
    creator=pymysql,      # module used to create the underlying connections
    maxconnections=5,     # maximum number of connections in the pool (0 / None = unlimited)
    mincached=1,          # idle connections created at startup (0 = create none)
    maxcached=2,          # maximum number of idle connections kept in the pool (0 / None = unlimited)
    maxshared=3,          # maximum number of shared connections (0 / None = share all)
    blocking=True,        # block and wait when the pool has no free connection
    ping=0,               # when to ping the MySQL server to check that it is alive
    host='127.0.0.1',     # database server address
    port=3306,            # database server port
    user='root',          # database user
    password='314159',    # database password
    database='test',      # database name
    charset='utf8'        # connection character set
)

# Take a connection from the pool and create a cursor
conn = pool.connection()
cursor = conn.cursor()

# With a cursor you can run any operation; calls follow the pattern cursor.<method>(sql, args)
sql = "select * from user where id = %s"
cursor.execute(sql, [1])  # executes: select * from user where id = 1

# Fetch the result
print(cursor.fetchone())

# Return the connection to the pool when done
cursor.close()
conn.close()
MongoDB aggregation pipelines
from datetime import datetime

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

# Open a connection (the default connection pool size is 100)
client = MongoClient(
    host="localhost",
    port=27017,
)

try:
    # Heartbeat check
    client.admin.command('ping')
    print("Successfully connected to MongoDB!")
except ConnectionFailure:
    print("Server not available")

# Select the database and collection (created lazily on first write)
db = client["ecommerce"]
products_col = db["products"]

# Insert a document (_id is generated automatically)
product_data = {
    "name": "Wireless Mouse",
    "price": 49.99,
    "tags": ["electronics", "computer"],
    "stock": {"warehouse_A": 100, "warehouse_B": 50},
    "last_modified": datetime.now(),
}
insert_result = products_col.insert_one(product_data)
print(f"Inserted ID: {insert_result.inserted_id}")

# Query documents (nested queries are supported)
query = {"price": {"$lt": 60}, "tags": "electronics"}  # price below 60 AND tags contains "electronics"
projection = {"name": 1, "price": 1}                   # like SQL SELECT: return only name and price
cursor = products_col.find(query, projection).limit(5)
for doc in cursor:
    print(doc)

# Update a document (atomic operators)
update_filter = {"name": "Wireless Mouse"}
update_data = {"$inc": {"stock.warehouse_A": -10}, "$set": {"last_modified": datetime.now()}}
update_result = products_col.update_one(update_filter, update_data)
print(f"Modified count: {update_result.modified_count}")

# Delete documents
delete_result = products_col.delete_many({"price": {"$gt": 200}})
print(f"Deleted count: {delete_result.deleted_count}")

# Aggregation pipeline: total stock per warehouse
# Note: this pipeline assumes stock is stored as an array of
# {"warehouse": ..., "quantity": ...} sub-documents, which differs from the
# embedded-document shape used in product_data above
pipeline = [
    {"$unwind": "$stock"},              # stage 1: unwind the nested array
    {"$group": {                        # stage 2: group and aggregate
        # group by stock.warehouse (the group key becomes _id)
        # and sum stock.quantity within each group
        "_id": "$stock.warehouse",
        "total_stock": {"$sum": "$stock.quantity"},
    }},
    {"$sort": {"total_stock": -1}},     # stage 3: sort groups by total stock, descending
]
results = products_col.aggregate(pipeline)
for res in results:
    print(f"Warehouse {res['_id']}: {res['total_stock']} units")
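Since product_data above actually stores stock as an embedded document rather than an array, a sketch of the same per-warehouse total using $objectToArray (available in MongoDB 3.4+) might look like this:
pipeline = [
    # turn {"warehouse_A": 100, "warehouse_B": 50} into [{"k": "warehouse_A", "v": 100}, ...]
    {"$project": {"stock": {"$objectToArray": "$stock"}}},
    {"$unwind": "$stock"},
    {"$group": {"_id": "$stock.k", "total_stock": {"$sum": "$stock.v"}}},
    {"$sort": {"total_stock": -1}},
]
for res in products_col.aggregate(pipeline):
    print(f"Warehouse {res['_id']}: {res['total_stock']} units")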
Handling large volumes of data in MongoDB
import time

from faker import Faker
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['bigdata']
collection = db['user_profiles']

fake = Faker()
batch_size = 5000  # insert in batches to keep memory usage down

def generate_batch(batch_size):
    return [{
        "name": fake.name(),
        "email": fake.email(),
        "last_login": fake.date_time_this_year(),
    } for _ in range(batch_size)]

start_time = time.time()
for i in range(200):  # 200 batches of 5000 = 1,000,000 documents in total
    batch_data = generate_batch(batch_size)
    collection.insert_many(batch_data, ordered=False)  # unordered inserts are faster
    print(f"Inserted {(i + 1) * batch_size} documents")

print(f"Total time: {time.time() - start_time:.2f} seconds")

# Analyze e-commerce order data (with a nested structure)
pipeline = [
    {"$unwind": "$items"},                  # unwind the array of items in each order
    {"$match": {"status": "completed"}},    # keep only completed orders
    {"$group": {
        "_id": "$items.category",
        "total_sales": {"$sum": "$items.price"},
        "avg_quantity": {"$avg": "$items.quantity"},
        "top_product": {"$max": "$items.name"},
    }},
    {"$sort": {"total_sales": -1}},
    {"$limit": 10},
]

orders_col = db["orders"]
results = orders_col.aggregate(pipeline)
for res in results:
    print(f"Category {res['_id']}: {res['total_sales']} in sales")
Key performance optimizations -> add indexes & use bulk operations
import pymongo

# Create indexes (speeds up queries)
products_col.create_index([("name", pymongo.ASCENDING)], unique=True)
products_col.create_index([("price", pymongo.ASCENDING), ("tags", pymongo.ASCENDING)])

# Batch writes increase throughput
bulk_ops = [
    pymongo.InsertOne({"name": "Keyboard", "price": 89.99}),
    pymongo.UpdateOne({"name": "Mouse"}, {"$set": {"price": 59.99}}),
    pymongo.DeleteOne({"name": "Earphones"}),
]
results = products_col.bulk_write(bulk_ops)
High-availability configuration
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

# MongoDB replica set connection string
# replicaSet=rs0 is the name of the replica set
uri = "mongodb://192.127.1.1:27017,192.127.1.2:27017,192.127.1.3:27017/mydb?replicaSet=rs0"

# Create the MongoClient instance
client = MongoClient(uri)

# Test the connection
try:
    # Run a trivial command to verify that the connection works
    client.admin.command('ping')
    print("Successfully connected to the MongoDB replica set!")
except ConnectionFailure as e:
    print(f"Could not connect to the MongoDB replica set: {e}")