【爬蟲】03 - 爬蟲的基本數據存儲

爬蟲03 - 爬蟲的數據存儲

文章目錄

  • 爬蟲03 - 爬蟲的數據存儲
    • 一:CSV數據存儲
      • 1:基本介紹
      • 2:基本使用
      • 3:高級使用
      • 4:使用示例
    • 二:JSON數據存儲
      • 1:基礎json讀寫
      • 2:字符串和對象的轉換
      • 3:日期和時間的特殊處理
      • 4:自定義序列化
      • 5:高級使用
    • 三:數據庫數據存儲
      • 1:基礎部分學習
      • 2:補充說明

一:CSV數據存儲

1:基本介紹

csv是一種輕量級、跨平臺的文件格式,廣泛用于數據交換、日志記錄及中小規模數據存儲

  • 無需依賴?:直接通過Python標準庫csv模塊操作。
  • ?人類可讀?:文本格式可直接用Excel或文本編輯器查看。
  • ?高效靈活?:適合快速導出、導入表格型數據。

csv格式如下:

  • 每行表示一條記錄,字段以特定分隔符(默認為逗號)分隔。
  • 支持文本引用(如雙引號)包裹含特殊字符的字段。
  • 首行可定義列名(表頭)。
適用場景說明
數據導出、備份從數據庫或API批量導出結構化數據
數據分析預處理配合Pandas進行統計與可視化
跨系統數據交換兼容Excel/R/MATLAB等工具

2:基本使用

python內置CSV模塊,無需額外安裝

基本讀寫csv實例

import csvheaders = ["id", "name", "email"]
data = [[1, "張三", "zhangsan@example.com"],[2, "李四", "lisi@test.org"],[3, "王五", "wangwu@demo.net"]
]# 用open函數,第一個參數是文件名,第二個參數是模式(r -> read, w -> wirte),第三個參數是編碼方式encoding -> utf-8
with open("output.csv", "w", newline="", encoding="utf-8") as f:writer = csv.writer(f)writer.writerow(headers)  # 寫入表頭writer.writerows(data)    # 批量寫入數據
import csvwith open ("output.csv", "r", newline="", encoding="utf-8") as f:reader = csv.reader(f)for row in reader:print(row)

字典方式讀寫實例

# 寫入字典數據
with open("dict_output.csv", "w", newline="", encoding="utf-8") as f:writer = csv.DictWriter(f, fieldnames=["id", "name", "email"])writer.writeheader()writer.writerow({"id": 1, "name": "張三", "email": "zhangsan@example.com"})# 讀取為字典數據
with open("dict_output.csv", "r", newline="", encoding="utf-8") as f:reader = csv.DictReader(f)for row in reader:print(row)print(row["id"], row["name"], row["email"])

自定義分割符和引號規則

# 使用分號分隔,雙引號包裹所有字段
with open("custom.csv", "w", newline="", encoding="utf-8") as f:writer = csv.writer(f, delimiter=";", quoting=csv.QUOTE_ALL)writer.writerow(["id", "name"])writer.writerow([1, "張三"])

3:高級使用

含特殊字符的字段?

字段包含逗號、換行符等破壞CSV結構的字符 -> 使用引號包裹字段,并配置csv.QUOTE_MINIMALcsv.QUOTE_NONNUMERIC

data = [[4, "Alice, Smith", "alice@example.com"],[5, "Bob\nJohnson", "bob@test.org"]
]with open("special_chars.csv", "w", newline="", encoding="utf-8") as f:writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)writer.writerows(data)

嵌套數據

import jsondata = [{"id": 1, "info": '{"age": 30, "city": "北京"}'},{"id": 2, "info": '{"age": 25, "city": "上海"}'}
]# 寫入嵌套JSON
with open("nested_data.csv", "w", newline="", encoding="utf-8") as f:writer = csv.DictWriter(f, fieldnames=["id", "info"])writer.writeheader()writer.writerows(data)# 讀取并解析JSON
with open("nested_data.csv", "r", encoding="utf-8") as f:reader = csv.DictReader(f)for row in reader:info = json.loads(row["info"])print(f"ID: {row['id']}, 城市: {info['city']}")

復雜情況的處理

大文件 -> 逐行讀取 & 分批讀取

復雜數據處理 -> pandas

4:使用示例

# 讀取百度圖書,并將結果放入到csv文件中,方便后面使用pandas進行數據分析
import csv
import requests
from bs4 import BeautifulSoupurl = "https://book.douban.com/top250"
headers = {"User-Agent": "Mozilla/5.0"}response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, "html.parser")books = []
for item in soup.select("tr.item"):title = item.select_one(".pl2 a")["title"]score = item.select_one(".rating_nums").textbooks.append({"title": title, "score": score})# 寫入CSV
with open("douban_books.csv", "w", newline="", encoding="utf-8") as f:writer = csv.DictWriter(f, fieldnames=["title", "score"])writer.writeheader()writer.writerows(books)print("數據已存儲至 douban_books.csv")

二:JSON數據存儲

使用場景說明
配置文件存儲程序參數、路徑配置等(如config.json)
API數據交互前后端通過JSON格式傳遞請求與響應
結構化日志記錄記錄帶元數據的操作日志,便于后續分析

1:基礎json讀寫

dump -> 寫入到json, load -> 從json讀取

import jsondata = {"project": "數據分析平臺","version": 2.1,"authors": ["張三", "李四"],"tags": {"python": 5, "database": 3}
}with open("data.json", "w", encoding="utf-8") as f:json.dump(data, f, ensure_ascii=False, indent=2)  # 禁用ASCII轉義,縮進2空格with open("data.json", "r", encoding="utf-8") as f:loaded_data = json.load(f)
print(loaded_data["tags"]["python"])  # 輸出:5

2:字符串和對象的轉換

dumps -> json轉成字符串,loads -> 字符串轉成json

data_str = json.dumps(data, ensure_ascii=False)
print(type(data_str))  # <class 'str'>json_str = '{"name": "王五", "age": 30}'
obj = json.loads(json_str)
print(obj["age"])  # 輸出:30

3:日期和時間的特殊處理

JSON默認不支持Python的datetime對象,需自定義轉換邏輯

from datetime import datetimedef datetime_encoder(obj):if isinstance(obj, datetime):return obj.isoformat()  # 轉為ISO格式字符串raise TypeError("類型無法序列化")data = {"event": "發布會", "time": datetime.now()}# 序列化時指定自定義編碼函數
json_str = json.dumps(data, default=datetime_encoder, ensure_ascii=False)
print(json_str)  # {"event": "發布會", "time": "2024-07-20T15:30:45.123456"}

4:自定義序列化

class User:def __init__(self, name, level):self.name = nameself.level = leveluser = User("趙六", 3)# 方法1:手動轉換字典
user_dict = {"name": user.name, "level": user.level}
json.dumps(user_dict)# 方法2:使用__dict__(需類屬性均為可序列化類型)
json.dumps(user.__dict__)

5:高級使用

跳過特定的字段

def filter_encoder(obj):if "password" in obj:del obj["password"]return objuser_data = {"name": "張三", "password": "123456"}
json.dumps(user_data, default=filter_encoder)  # {"name": "張三"}

取消縮進和空格(緊湊輸出)

json.dumps(data, separators=(",", ":"))  # 輸出最簡格式

ujson代替(C實現的JSON超高速庫,API完全兼容)

import ujson as json  # 替換標準庫
json.dumps(data)      # 速度提升3-10倍

大文件的讀取

# 逐行讀取JSON數組文件(每行為獨立JSON對象)
with open("large_data.json", "r", encoding="utf-8") as f:for line in f:item = json.loads(line)process(item)

三:數據庫數據存儲

1:基礎部分學習

各種數據庫的數據存儲請看這里

2:補充說明

可以使用dbutils進行mysql的連接池管理

import pymysql
from dbutils.pooled_db import PooledDBpool = PooledDB(creator=pymysql,  # 使用鏈接數據庫的模塊maxconnections=5,  # 連接池允許的最大連接數,0和None表示不限制連接數mincached=1, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建maxcached=2, # 鏈接池中最多閑置的鏈接,0和None不限制maxshared=3, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待ping=0, # ping MySQL服務端,檢查是否服務可用host='127.0.0.1', # 數據庫服務器的IP地址port=3306, # 數據庫服務端口user='root', # 數據庫用戶名password='314159', # 數據庫密碼database='test', # 數據庫名稱charset='utf8' # 數據庫編碼
)# 創建游標對象
conn = pool.connection()
cursor = conn.cursor()# 有了游標對象就能操作了,方式都是 -> cursor.???(sql, args)
sql = "select * from user where id = %s"
cursor.execute(sql, [1]) # 執行SQL語句 -> select * from user where id = 1
# 獲取結果
print(cursor.fetchone())

mongodb的管道操作

from datetime import datetimefrom pymongo import MongoClient
from pymongo.errors import ConnectionFailure# 建立連接(默認連接池大小100)
client = MongoClient(host="localhost",port=27017,
)try:# 心跳檢測client.admin.command('ping')print("Successfully connected to MongoDB!")
except ConnectionFailure:print("Server not available")# 選擇數據庫與集合(自動懶創建)
db = client["ecommerce"]
products_col = db["products"]# 插入文檔(自動生成_id)
product_data = {"name": "Wireless Mouse","price": 49.99,"tags": ["electronics", "computer"],"stock": {"warehouse_A": 100, "warehouse_B": 50},"last_modified": datetime.now()
}
insert_result = products_col.insert_one(product_data)
print(f"Inserted ID: {insert_result.inserted_id}")# 查詢文檔(支持嵌套查詢)
query = {"price": {"$lt": 60}, "tags": "electronics"} # 查詢價格小于60 并且 tags是electronics 的文檔
projection = {"name": 1, "price": 1}  # 類似SQL SELECT, 只返回name和price字段
cursor = products_col.find(query, projection).limit(5)
for doc in cursor:print(doc)# 更新文檔(原子操作)
update_filter = {"name": "Wireless Mouse"}
update_data = {"$inc": {"stock.warehouse_A": -10}, "$set": {"last_modified": datetime.now()}}
update_result = products_col.update_one(update_filter, update_data)
print(f"Modified count: {update_result.modified_count}")# 刪除文檔
delete_result = products_col.delete_many({"price": {"$gt": 200}})
print(f"Deleted count: {delete_result.deleted_count}")# 管道操作
# 統計各倉庫庫存總量
pipeline = [{"$unwind": "$stock"},  # 階段一:展開嵌套文檔{"$group": { # 階段二:分組聚合# 分組字段(分段的字段是stock.warehouse)記為_id# 聚合字段為 sum(對每一個分組的stock.quantity進行sum)"_id": "$stock.warehouse","total_stock": {"$sum": "$stock.quantity"}}},# 階段三:排序, 根據分組條件降序排序{"$sort": {"total_stock": -1}}
]
results = products_col.aggregate(pipeline)
for res in results:print(f"Warehouse {res['_id']}: {res['total_stock']} units")

mongodb大數據的處理

from pymongo import MongoClient
from faker import Faker
import timeclient = MongoClient('mongodb://localhost:27017/')
db = client['bigdata']
collection = db['user_profiles']fake = Faker()
batch_size = 5000  # 分批次插入減少內存壓力def generate_batch(batch_size):return [{"name": fake.name(),"email": fake.email(),"last_login": fake.date_time_this_year()} for _ in range(batch_size)]start_time = time.time()
for _ in range(200):  # 總數據量100萬batch_data = generate_batch(batch_size)collection.insert_many(batch_data, ordered=False)  # 無序插入提升速度print(f"已插入 {(i+1)*batch_size} 條數據")print(f"總耗時: {time.time()-start_time:.2f}秒") # 分析電商訂單數據(含嵌套結構)
pipeline = [{"$unwind": "$items"},  # 展開訂單中的商品數組{"$match": {"status": "completed"}},  # 篩選已完成訂單{"$group": {"_id": "$items.category","total_sales": {"$sum": "$items.price"},"avg_quantity": {"$avg": "$items.quantity"},"top_product": {"$max": "$items.name"}}},{"$sort": {"total_sales": -1}},{"$limit": 10}
]orders_col = db["orders"]
results = orders_col.aggregate(pipeline)for res in results:print(f"品類 {res['_id']}: 銷售額{res['total_sales']}元")

性能優化的關鍵措施 -> 添加索引 & bluk操作

# 創建索引(提升查詢速度)
products_col.create_index([("name", pymongo.ASCENDING)], unique=True)
products_col.create_index([("price", pymongo.ASCENDING), ("tags", pymongo.ASCENDING)])# 批量寫入提升吞吐量
bulk_ops = [pymongo.InsertOne({"name": "Keyboard", "price": 89.99}),pymongo.UpdateOne({"name": "Mouse"}, {"$set": {"price": 59.99}}),pymongo.DeleteOne({"name": "Earphones"})
]
results = products_col.bulk_write(bulk_ops)

高可用架構配置

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure# MongoDB 副本集連接字符串
# replicaSet=rs0 是副本集的名稱
uri = "mongodb://192.127.1.1:27017,192.127.1.2:27017,192.127.1.3:27017/mydb?replicaSet=rs0"# 創建 MongoClient 實例
client = MongoClient(uri)# 測試連接
try:# 通過執行一個簡單的命令來驗證連接是否成功client.admin.command('ping')print("成功連接到 MongoDB 副本集!")
except ConnectionFailure as e:print(f"無法連接到 MongoDB 副本集: {e}")

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/89959.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/89959.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/89959.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

深入分析計算機網絡數據鏈路層和網絡層面試題

計算機網絡體系結構1. 請簡述 OSI 七層模型和 TCP/IP 四層模型&#xff0c;并比較它們的異同。OSI 七層模型&#xff1a;應用層&#xff1a;直接為用戶的應用進程提供服務&#xff0c;如 HTTP&#xff08;超文本傳輸協議&#xff0c;用于 Web 瀏覽器與服務器通信&#xff09;、…

云服務器新裝的mysql8,無法通過遠程連接,然后本地pymysql也連不上

阿里云服務器&#xff0c;用apt-get新裝的mysql-server&#xff0c;竟然無法通過遠程連接到&#xff0c;竟然是這個原因。不是防火墻&#xff0c;iptables早就關了。也不是安全組&#xff0c;不是人為限制訪問的話&#xff0c;根本沒必要弄安全組 排查過程 netstat -antop|grep…

質量即服務:從測試策略到平臺運營的全鏈路作戰手冊

&#xff08;零&#xff09;為什么需要“質量即服務” 當業務方說“今晚一定要上線”&#xff0c; 當開發說“我只改了兩行代碼”&#xff0c; 當運維說“回滾窗口只有 5 分鐘”&#xff0c; 質量必須像水電一樣隨取隨用&#xff0c;而不是上線前的大壩泄洪。 這篇手冊提供一張…

Java -- 自定義異常--Wrapper類--String類

自定義異常&#xff1a;概念&#xff1a;當程序中出現了某些錯誤&#xff0c;但該錯誤信息并沒有在Throwable子類中描述處理&#xff0c;這個時候可以自己設計異常&#xff0c;用于描述該錯誤信息。步驟&#xff1a;1. 定義類&#xff1a;自定義異常類名&#xff08;程序員自己…

一文速通《線性方程組》

目錄 一、解題必記知識點 二、解題必備技巧 三、非齊次線性方程組求解 四、齊次線性方程組求解 ★五、解析題目信息&#xff0c;獲取暗含條件 一、解題必記知識點 (1) (2)基礎解系線性無關&#xff0c;基礎解系 解空間的一個基&#xff0c;基 一組線性無關的、能夠生…

【Django】DRF API版本和解析器

講解 Python3 下 Django REST Framework (DRF) API 版本控制解析器&#xff08;Parser&#xff09;一、DRF API 版本控制詳解 API 版本控制是構建健壯、可維護的 RESTful API 的關鍵&#xff0c;尤其在項目演進中需要兼容不同版本的客戶端請求。 1.1 API 版本控制的核心原理 AP…

Windows系統暫停更新工具

功能說明 暫停更新至2999年恢復系統更新徹底禁用更新&#xff08;不可逆&#xff09; 使用方法 下載解壓后雙擊運行 .bat 文件 輸入數字選擇功能&#xff1a; 輸入 1&#xff1a;暫停更新至2999年&#xff08;推薦&#xff09;輸入 2&#xff1a;恢復系統更新輸入 3&#xf…

git push新版問題解決

git 好像不能通過username:password的方式來git push了。但我的電腦依然彈出username和password的彈窗。轉戰ssh來git push。由于之前是用git clone克隆的&#xff0c;需要再轉換成ssh的url來git push。

PyCharm + AI 輔助編程

PyCharm AI&#xff1a;初學者友好的 2 個實用場景&#xff08;附操作步驟&#xff09; PyCharm 專業版&#xff08;或通過插件集成&#xff09;支持 AI 輔助編程&#xff08;如 JetBrains AI 或 GitHub Copilot&#xff09;&#xff0c;能根據代碼上下文自動生成代碼、解釋邏…

瘋狂星期四文案網第15天運營日記

網站運營第15天&#xff0c;點擊觀站&#xff1a; 瘋狂星期四 crazy-thursday.com 全網最全的瘋狂星期四文案網站 運營報告 昨日訪問量 昨天只有20來ip, 太慘了&#xff0c;感覺和最近沒有發新段子有關&#xff0c;也沒有發新的外鏈&#xff0c;不知道這周四會怎么樣 昨日搜…

如何解決pip安裝報錯ModuleNotFoundError: No module named ‘Cython’問題

【Python系列Bug修復PyCharm控制臺pip install報錯】如何解決pip安裝報錯ModuleNotFoundError: No module named ‘Cython’問題 摘要 在使用 PyCharm 控制臺或命令行執行 pip install Cython 時&#xff0c;常會遇到 ModuleNotFoundError: No module named Cython 的報錯。本…

freertos任務調度關鍵函數理解 vTaskSwitchContext

void vTaskSwitchContext(void) {//my_printf( "uxSchedulerSuspended %d\n", uxSchedulerSuspended );/* 調度器處于掛起狀態 */if (uxSchedulerSuspended ! (UBaseType_t)pdFALSE) {/*** The scheduler is currently suspended - do not allow a context* switch.…

CPU 密集型 和 I/O 密集型 任務

文章目錄**CPU 密集型任務&#xff08;CPU-bound&#xff09;**定義&#xff1a;特點&#xff1a;常見場景&#xff1a;如何優化 CPU 密集型任務&#xff1a;**I/O 密集型任務&#xff08;I/O-bound&#xff09;**定義&#xff1a;特點&#xff1a;常見場景&#xff1a;如何優化…

[2025CVPR-小目標檢測方向]基于特征信息驅動位置高斯分布估計微小目標檢測模型

核心問題 ?小目標檢測性能差&#xff1a;?? 盡管通用目標檢測器&#xff08;如 Faster R-CNN, YOLO, SSD&#xff09;在常規目標上表現出色&#xff0c;但在檢測微小目標&#xff08;如 AI-TOD 基準定義的&#xff1a;非常小目標 2-8 像素&#xff0c;小目標 8-16 像素&…

三大工廠設計模式

1.簡單工廠模式1.1需求入手從需求進行入手&#xff0c;可以更深入的理解什么是設計模式。有一個制作披薩的需求&#xff1a;需要便于擴展披薩的種類&#xff0c;便于維護。1.披薩的種類有很多&#xff1a;GreekPizz&#xff0c;CheesePizz等2.披薩的制作流程&#xff1a;prepar…

SpringBoot--Mapper XML 和 Mapper 接口在不同包

&#x1f9e9; 背景說明在 Spring Boot 中&#xff0c;MyBatis 默認要求 Mapper 接口和 XML 文件位于相同包路徑。 但在實際項目中&#xff0c;為了模塊化或結構清晰&#xff0c;常將 XML 放在 resources/mybatis/... 下&#xff0c;這種做法就必須進行額外配置。&#x1f4c1;…

公交車客流人數統計管理解決方案:智能化技術與高效運營實踐

1. 引言公交車作為城市公共交通的核心組成部分&#xff0c;其客流數據的精準統計與管理直接影響運營效率、調度優化和乘客體驗。傳統的人工統計方式效率低、誤差大&#xff0c;難以滿足現代智慧交通的需求。隨著人工智能&#xff08;AI&#xff09;、物聯網&#xff08;IoT&…

正則表達式完全指南:從入門到實戰

目錄 一、什么是正則表達式&#xff1f; 二、基礎語法速查表 三、進階特性 1.分組與捕獲 2.非捕獲分組 3.前瞻與后顧 4.貪婪與懶惰匹配 四、實戰案例 案例1&#xff1a;驗證手機號 案例2&#xff1a;提取網頁中所有鏈接 案例3&#xff1a;密碼強度驗證 一、什么是正…

SmartETL循環流程的設計與應用

1. 引言 **檢索增強生成&#xff08;RAG&#xff09;**是指通過檢索對大模型生成進行增強的技術&#xff0c;通過充分利用信息檢索&#xff08;尤其是語義檢索&#xff09;相關技術&#xff0c;實現大模型快速擴展最新知識、有效減少幻覺的能力。主流RAG框架包括問題理解、知識…

uni-app開發小程序,根據圖片提取主題色值

需求&#xff0c;在頁面根據傳入的圖片提取圖片主色值并用來設置區塊背景色<template><view class"icon-container"><view class"sport-icon" :style"{ backgroundColor: mainColor }"><image :src"/static/images/sp…