pymongo功能整理與基礎操作類

以下是 Python 與 PyMongo 的完整功能整理,涵蓋基礎操作、高級功能、性能優化及常見應用場景:


1. 安裝與連接
(1) 安裝 PyMongo

pip install pymongo

(2) 連接 MongoDB

from pymongo import MongoClient# 基礎連接(默認本地,端口27017)
client = MongoClient('mongodb://localhost:27017/')# 帶認證的連接
client = MongoClient('mongodb://username:password@host:27017/dbname?authSource=admin'
)# 連接副本集或分片集群
client = MongoClient('mongodb://node1:27017,node2:27017/?replicaSet=rs0')

2. 數據庫與集合操作
(1) 選擇數據庫和集合

db = client['mydatabase']    # 選擇數據庫(惰性創建)
collection = db['users']     # 選擇集合(惰性創建)

(2) 列出數據庫和集合

# 列出所有數據庫(需權限)
print(client.list_database_names())# 列出數據庫中的集合
print(db.list_collection_names())

(3) 刪除集合或數據庫

db.drop_collection('users')  # 刪除集合
client.drop_database('mydatabase')  # 刪除數據庫

3. 文檔操作(CRUD)
(1) 插入文檔

# 插入單條文檔
user = {'name': 'Alice', 'age': 30, 'tags': ['python', 'dev']}
result = collection.insert_one(user)
print(result.inserted_id)  # 輸出插入的 ObjectId# 批量插入
users = [{'name': 'Bob'}, {'name': 'Charlie'}]
result = collection.insert_many(users)
print(result.inserted_ids)  # 輸出所有插入的 ObjectId 列表

(2) 查詢文檔

# 查詢單條文檔
user = collection.find_one({'name': 'Alice'})
print(user)  # 返回字典或 None# 查詢多條文檔(帶條件、投影和排序)
cursor = collection.find({'age': {'$gt': 25}},           # 條件:年齡大于25{'_id': 0, 'name': 1},          # 投影:僅返回 name 字段
).sort('age', pymongo.ASCENDING)    # 按年齡升序排序for doc in cursor:print(doc)# 統計數量
count = collection.count_documents({'age': {'$gt': 25}})

(3) 更新文檔

# 更新單條文檔
result = collection.update_one({'name': 'Alice'},{'$set': {'age': 31}, '$addToSet': {'tags': 'database'}}  # 更新操作符
)
print(result.modified_count)  # 輸出影響的文檔數# 批量更新
result = collection.update_many({'age': {'$lt': 30}},{'$inc': {'age': 1}}  # 年齡加1
)

(4) 刪除文檔

# 刪除單條文檔
result = collection.delete_one({'name': 'Alice'})# 批量刪除
result = collection.delete_many({'age': {'$gt': 40}})

4. 高級查詢與聚合
(1) 查詢操作符

# 比較:$gt, $gte, $lt, $lte, $ne
collection.find({'age': {'$gt': 20}})# 邏輯:$and, $or, $not
collection.find({'$or': [{'age': 30}, {'name': 'Bob'}]})# 數組:$in, $nin, $all, $elemMatch
collection.find({'tags': {'$in': ['python', 'java']}})

(2) 聚合管道

pipeline = [{'$match': {'age': {'$gt': 25}}},          # 篩選條件{'$group': {'_id': '$city', 'count': {'$sum': 1}}},  # 按城市分組計數{'$sort': {'count': -1}},                  # 按計數降序排序{'$limit': 5}                              # 取前5條
]result = collection.aggregate(pipeline)
for doc in result:print(doc)

(3) 索引管理

# 創建索引
collection.create_index([('name', pymongo.ASCENDING)], unique=True)# 查看索引
print(collection.index_information())# 刪除索引
collection.drop_index('name_1')

5. 事務與原子性操作
(1) 多文檔事務(MongoDB 4.0+)

with client.start_session() as session:session.start_transaction()try:collection.update_one({'name': 'Alice'},{'$inc': {'balance': -100}},session=session)collection.update_one({'name': 'Bob'},{'$inc': {'balance': 100}},session=session)session.commit_transaction()except Exception as e:session.abort_transaction()print("事務回滾:", e)

(2) 原子操作符

# 原子更新
collection.find_one_and_update({'name': 'Alice'},{'$inc': {'counter': 1}},return_document=pymongo.ReturnDocument.AFTER  # 返回更新后的文檔
)

6. 性能優化與最佳實踐
(1) 查詢優化
? 使用投影減少返回字段:

collection.find({}, {'_id': 0, 'name': 1})

? 覆蓋查詢(Covered Query):確保查詢字段和投影字段在索引中。

(2) 批量操作

# 批量插入(減少網絡開銷)
bulk_ops = [pymongo.InsertOne({'name': f'User_{i}'}) for i in range(1000)]
collection.bulk_write(bulk_ops)

(3) 連接池管理

client = MongoClient('mongodb://localhost:27017/',maxPoolSize=100,      # 最大連接數minPoolSize=10,       # 最小空閑連接socketTimeoutMS=3000  # 超時時間
)

7. 數據建模與模式設計
(1) 內嵌文檔與引用
? 內嵌文檔:適合頻繁訪問的子數據。

user = {'name': 'Alice','address': {'city': 'New York', 'zip': '10001'}  # 內嵌文檔
}

? 引用關系:適合獨立實體。

# 用戶引用訂單
order = {'user_id': user['_id'], 'product': 'Laptop'}

(2) 分片策略
? 選擇分片鍵:高頻查詢字段(如 user_id)。

? 分片命令(需在 mongos 執行):

sh.shardCollection("mydb.orders", {"user_id": 1})

8. 安全與運維
(1) 認證與權限

# 創建用戶
db.command('createUser', 'admin',pwd='secret',roles=[{'role': 'readWrite', 'db': 'mydb'}]
)

(2) 備份與恢復
? mongodump 備份:

mongodump --uri="mongodb://user:pass@host:27017/mydb" --out=/backup

? mongorestore 恢復:

mongorestore --uri="mongodb://host:27017" /backup/mydb

(3) 監控與日志
? 查看數據庫狀態:

server_status = db.command('serverStatus')
print(server_status['connections']['available'])

? 啟用慢查詢日志:

mongod --setParameter slowMS=100 --profileLevel 2

9. 常見應用場景
(1) 日志存儲與分析

# 插入日志
log_entry = {'timestamp': datetime.now(),'level': 'INFO','message': 'User login success'
}
collection.insert_one(log_entry)# 分析錯誤日志數量
pipeline = [{'$match': {'level': 'ERROR'}},{'$group': {'_id': '$service', 'count': {'$sum': 1}}}
]

(2) 實時排行榜

# 更新分數
collection.update_one({'user_id': 1001},{'$inc': {'score': 10}},upsert=True
)# 獲取前10名
top_players = collection.find().sort('score', -1).limit(10)

10. 擴展工具與庫
(1) 使用 Motor 實現異步操作

from motor.motor_asyncio import AsyncIOMotorClientasync def query_data():client = AsyncIOMotorClient('mongodb://localhost:27017')collection = client.mydb.userscursor = collection.find({'age': {'$gt': 20}})async for doc in cursor:print(doc)

(2) 使用 MongoEngine(ORM)

from mongoengine import Document, StringField, IntFieldclass User(Document):name = StringField(required=True)age = IntField()# 查詢數據
users = User.objects(age__gt=25)

總結

功能PyMongo 方法/操作典型場景
基礎 CRUDinsert_one, find, update_many數據增刪改查
聚合分析aggregate + 管道操作復雜統計、日志分析
事務管理start_session + 事務塊轉賬、訂單處理
性能優化索引、批量操作、連接池高并發讀寫、大數據處理
數據建模內嵌文檔、引用關系、分片電商、社交網絡、IoT 數據存儲

通過合理使用 PyMongo,可以高效操作 MongoDB 應對多樣化的數據存儲需求,結合 Redis 實現緩存加速,構建高性能應用。

以下是一個基于 pymongo 封裝的 MongoDB 基礎操作類,支持連接管理、CRUD、索引操作、聚合查詢、分頁等常用功能:

from typing import Any, Dict, List, Optional, Union
from pymongo import MongoClient, ASCENDING, DESCENDING
from pymongo.errors import PyMongoError
from pymongo.results import InsertOneResult, InsertManyResult, UpdateResult, DeleteResult
from pymongo.collection import Collection
from bson import ObjectIdclass MongoDBClient:"""MongoDB 基礎操作類"""def __init__(self, uri: str = "mongodb://localhost:27017/",db_name: str = "mydatabase", collection_name: str = "default_collection"):"""初始化 MongoDB 客戶端:param uri: MongoDB 連接 URI:param db_name: 數據庫名稱:param collection_name: 集合名稱"""self.client = MongoClient(uri)self.db = self.client[db_name]self.collection: Collection = self.db[collection_name]# ------------------------- 基礎 CRUD 操作 -------------------------def insert_one(self, document: Dict) -> Optional[ObjectId]:"""插入單條文檔"""try:result: InsertOneResult = self.collection.insert_one(document)return result.inserted_idexcept PyMongoError as e:print(f"Insert one error: {e}")return Nonedef insert_many(self, documents: List[Dict]) -> Optional[List[ObjectId]]:"""批量插入文檔"""try:result: InsertManyResult = self.collection.insert_many(documents)return result.inserted_idsexcept PyMongoError as e:print(f"Insert many error: {e}")return Nonedef find_one(self, query: Dict, projection: Optional[Dict] = None) -> Optional[Dict]:"""查詢單條文檔"""try:return self.collection.find_one(query, projection)except PyMongoError as e:print(f"Find one error: {e}")return Nonedef find(self, query: Dict, projection: Optional[Dict] = None,sort: Optional[List[tuple]] = None,limit: int = 0) -> List[Dict]:"""查詢多條文檔"""try:cursor = self.collection.find(query, projection)if sort:cursor = cursor.sort(sort)if limit > 0:cursor = cursor.limit(limit)return list(cursor)except PyMongoError as e:print(f"Find error: {e}")return []def update_one(self, query: Dict, update_data: Dict, upsert: bool = False) -> Optional[UpdateResult]:"""更新單條文檔"""try:result: UpdateResult = self.collection.update_one(query, {'$set': update_data}, upsert=upsert)return resultexcept PyMongoError as e:print(f"Update one error: {e}")return Nonedef delete_one(self, query: Dict) -> Optional[DeleteResult]:"""刪除單條文檔"""try:result: DeleteResult = self.collection.delete_one(query)return resultexcept PyMongoError as e:print(f"Delete one error: {e}")return None# ------------------------- 高級操作 -------------------------def create_index(self, keys: List[tuple], unique: bool = False, background: bool = True) -> Optional[str]:"""創建索引"""try:index_name = self.collection.create_index(keys, unique=unique, background=background)return index_nameexcept PyMongoError as e:print(f"Create index error: {e}")return Nonedef count_documents(self, query: Dict) -> int:"""統計文檔數量"""try:return self.collection.count_documents(query)except PyMongoError as e:print(f"Count documents error: {e}")return 0def aggregate(self, pipeline: List[Dict]) -> List[Dict]:"""聚合查詢"""try:return list(self.collection.aggregate(pipeline))except PyMongoError as e:print(f"Aggregate error: {e}")return []def paginate(self, query: Dict, page: int = 1, per_page: int = 10,sort: Optional[List[tuple]] = None,projection: Optional[Dict] = None) -> Dict:"""分頁查詢"""try:total = self.count_documents(query)skip = (page - 1) * per_pagecursor = self.collection.find(query, projection)if sort:cursor = cursor.sort(sort)documents = cursor.skip(skip).limit(per_page)return {"total": total,"page": page,"per_page": per_page,"data": list(documents)}except PyMongoError as e:print(f"Paginate error: {e}")return {"total": 0, "page": page, "per_page": per_page, "data": []}# ------------------------- 事務支持 -------------------------def execute_transaction(self, operations: callable) -> bool:"""執行事務操作"""session = self.client.start_session()try:with session.start_transaction():operations(self.collection, session)return Trueexcept PyMongoError as e:session.abort_transaction()print(f"Transaction aborted: {e}")return False# ------------------------- 工具方法 -------------------------@staticmethoddef to_objectid(_id: Union[str, ObjectId]) -> ObjectId:"""將字符串轉換為 ObjectId"""return _id if isinstance(_id, ObjectId) else ObjectId(_id)def close(self):"""關閉連接"""self.client.close()# ------------------------- 使用示例 -------------------------
if __name__ == "__main__":# 初始化客戶端mongo_client = MongoDBClient(uri="mongodb://user:pass@localhost:27017/",db_name="test_db",collection_name="users")# 插入數據user_id = mongo_client.insert_one({"name": "Alice","age": 30,"email": "alice@example.com"})print(f"Inserted ID: {user_id}")# 查詢數據user = mongo_client.find_one({"name": "Alice"})print(f"Found user: {user}")# 分頁查詢pagination = mongo_client.paginate(query={"age": {"$gt": 20}},page=1,per_page=10,sort=[("age", ASCENDING)])print(f"Page 1 data: {pagination['data']}")# 關閉連接mongo_client.close()

核心功能說明

功能方法說明
連接管理__init__, close支持自定義 URI、數據庫和集合名稱
CRUD 操作insert_one, find提供單條/批量操作,支持投影和排序
索引管理create_index可創建唯一索引、后臺索引
聚合查詢aggregate支持完整的聚合管道操作
分頁查詢paginate返回分頁數據和總記錄數
事務支持execute_transaction封裝多文檔事務操作
類型轉換to_objectid字符串與 ObjectId 互轉

使用場景

  1. 快速開發:直接繼承或實例化類,無需重復編寫 CRUD 代碼。
  2. Web 后端:集成到 FastAPI/Django 服務中,處理用戶數據。
  3. 數據分析:通過聚合方法實現復雜統計。
  4. 定時任務:封裝數據清洗、日志處理等操作。

優化建議

  1. 連接池配置:在初始化時添加 maxPoolSizeminPoolSize 參數。
  2. 日志記錄:將 print 替換為 logging 模塊記錄錯誤信息。
  3. 異步支持:使用 motor 庫實現異步版本(適合 FastAPI 等異步框架)。
  4. 數據校驗:集成 pydantic 對輸入數據進行模式驗證。
  5. 緩存集成:結合 Redis 實現高頻查詢緩存。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/77338.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/77338.shtml
英文地址,請注明出處:http://en.pswp.cn/web/77338.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Trae+DeepSeek學習Python開發MVC框架程序筆記(四):使用sqlite存儲查詢并驗證用戶名和密碼

繼續通過Trae向DeepSeek發問并修改程序,實現程序運行時生成數據庫,用戶在系統登錄頁面輸入用戶名和密碼后,控制器通過模型查詢用戶數據庫表來驗證用戶名和密碼,驗證通過后顯示登錄成功頁面,驗證失敗則顯示登錄失敗頁面…

如何識別金融欺詐行為并進行分析預警

金融行業以其高效便捷的服務深刻改變了人們的生活方式。然而,伴隨技術進步而來的,是金融欺詐行為的日益猖獗。從信用卡盜刷到復雜的龐氏騙局,再到網絡釣魚和洗錢活動,金融欺詐的形式層出不窮,其規模和影響也在不斷擴大。根據全球反欺詐組織(ACFE)的最新報告,僅2022年,…

紛析云:開源財務管理軟件的創新與價值

在企業數字化轉型中,紛析云作為一款優秀的開源財務管理軟件,正為企業財務管理帶來新變革,以下是其核心要點。 一、產品概述與技術架構 紛析云采用微服務架構,功能組件高內聚低耦合,可靈活擴展和定制。前端基于現代框…

蛋白質大語言模型ESM介紹

ESM(Evolutionary Scale Modeling)是 Meta AI Research 團隊開發的一系列用于蛋白質的預訓練語言模型。這些模型在蛋白質結構預測、功能預測和蛋白質設計等領域展現出了強大的能力。以下是對 ESM 的詳細介紹: 核心特點 大規模預訓練:基于大規模蛋白質序列數據進行無監督學…

OpenCv高階(七)——圖像拼接

目錄 一、圖像拼接的原理過程 1. 特征檢測與描述(Feature Detection & Description) 2. 特征匹配(Feature Matching) 3. 圖像配準(Image Registration) 4. 圖像變換與投影(Warping&…

Native層Trace監控性能

一、基礎實現方法 1.1 頭文件引用 #include <utils/Trace.h> // 基礎版本 #include <cutils/trace.h> // 兼容舊版本1.2 核心宏定義 // 區間追蹤&#xff08;推薦&#xff09; ATRACE_BEGIN("TraceTag"); ...被監控代碼... ATRACE_END();// 函數級自…

金融行業微服務架構設計與挑戰 - Java架構師面試實戰

金融行業微服務架構設計與挑戰 - Java架構師面試實戰 本文通過模擬一位擁有十年Java研發經驗的資深架構師馬架構與面試官之間的對話&#xff0c;深入探討了金融行業項目在微服務架構下的技術挑戰與解決方案。 第一輪提問 面試官&#xff1a; 馬架構&#xff0c;請介紹一下您…

服務器虛擬化:技術解析與實踐指南

在信息技術飛速發展的今天,企業對服務器資源的需求日益增長,傳統物理服務器存在資源利用率低、部署周期長、管理成本高等問題。服務器虛擬化技術應運而生,它通過將物理服務器的計算、存儲、網絡等資源進行抽象和整合,劃分成多個相互隔離的虛擬服務器,從而提高資源利用率、…

OpenCV 圖形API(54)顏色空間轉換-----將圖像從 RGB 色彩空間轉換到 HSV色彩空間RGB2HSV()

操作系統&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 編程語言&#xff1a;C11 算法描述 將圖像從 RGB 色彩空間轉換為 HSV。該函數將輸入圖像從 RGB 色彩空間轉換到 HSV。R、G 和 B 通道值的常規范圍是 0 到 255。 輸出圖像必須是 8 位…

Spring Boot的優點:賦能現代Java開發的利器

Spring Boot 是基于 Spring 框架的快速開發框架&#xff0c;自 2014 年發布以來&#xff0c;憑借其簡潔性、靈活性和強大的生態系統&#xff0c;成為 Java 后端開發的首選工具。尤其在 2025 年&#xff0c;隨著微服務、云原生和 DevOps 的普及&#xff0c;Spring Boot 的優勢更…

基于強化學習的智能交通控制系統設計

標題:基于強化學習的智能交通控制系統設計 內容:1.摘要 隨著城市交通流量的不斷增長&#xff0c;傳統交通控制方法在應對復雜多變的交通狀況時逐漸顯現出局限性。本文旨在設計一種基于強化學習的智能交通控制系統&#xff0c;以提高交通運行效率、減少擁堵。通過構建強化學習模…

數據挖掘技術與應用課程論文——數據挖掘中的聚類分析方法及其應用研究

數據挖掘中的聚類分析方法及其應用研究 摘要 聚類分析是數據挖掘技術中的一個重要組成部分,它通過將數據集中的對象劃分為多個組或簇,使得同一簇內的對象具有較高的相似性,而不同簇之間的對象具有較低的相似性。 本文系統地研究了數據挖掘中的多種聚類分析方法及其應用。首先…

Java基礎語法10分鐘速成

Java基礎語法10分鐘速成&#xff0c;記筆記版 JDKhello world變量字符串 類&#xff0c;繼承&#xff0c;多態&#xff0c;重載 JDK JDK即Java development key&#xff0c;Java環境依賴包 在jdk中 編譯器javac將代碼的Java源文件編譯為字節碼文件&#xff08;.class&#xff…

在WSL2+Ubuntu22.04中通過conda pack導出一個conda環境包,然后嘗試導入該環境包

如何導出一個離線conda環境&#xff1f;有兩種方式&#xff0c;一種是導出env.yml即環境配置&#xff0c;一種是通過conda pack導出為一個環境包&#xff0c;前者只是導出配置&#xff08;包括包名、版本等&#xff09;&#xff0c;而后者是直接將環境中所有的內容打包&#xf…

盈達科技:登頂GEO優化全球制高點,以AICC定義AI時代內容智能優化新標桿

一、技術制高點——全球獨創AICC系統架構&#xff0c;構建AI內容優化新范式 作為全球首個實現AI內容全鏈路優化的技術供應商&#xff0c;盈達科技憑借AICC智能協同中心&#xff08;自適應內容改造、智能數據投喂、認知權重博弈、風險動態響應四大引擎&#xff09;&#…

設計看似完美卻測不過? Intra-Pair Skew 是「訊號完整性(Signal Integrity)」里最隱形的殺手

各位不知道有沒有遇過&#xff0c;一對很長的差分走線&#xff0c;看起來很正常&#xff0c;但是測試結果偶爾會fail偶爾會pass&#xff0c;不像是軟件問題&#xff0c;也不像是制程問題。 看了一下Layout&#xff0c;發現阻抗匹配控制的非常好&#xff0c;TDR測試也顯示阻抗好…

介紹常用的退燒與消炎藥

每年春夏交替之季&#xff0c;是感冒發燒、咳嗽、咽喉腫痛、支氣管炎、扁桃體炎的高發期。在家里或公司&#xff0c;常備幾種預防感冒發燒、咳嗽、流鼻涕、咽喉發炎的藥品&#xff0c;是非常必要的。下面介紹幾款效果非常明顯的中成藥、西藥&#xff0c;具體如下。 1 蓮芝消炎…

Redis為什么不直接使用C語言中的字符串?

因為C語言字符串存在問題&#xff1a; 獲取字符串長度需要進行運算(獲取字符串長度需要遍歷整個字符串&#xff0c;直到遇到終止符 \0&#xff0c;時間復雜度為 O(n))非二進制安全&#xff08;結束標識符\0可能在一些二進制格式的數據處理時字符串時產生錯誤&#xff09;不可修…

直線模組精度測試的標準是什么?

直線模組的精度測試是確保其性能和穩定性的重要環節。那么&#xff0c;大家知道直線模組精度測試的標準是什么嗎&#xff1f; 1、定位精度&#xff1a;以最大行程為基準長度&#xff0c;用從基準位置開始實際移動的距離與指令值之間的最大誤差的絕對值來表示。一般來說&#xf…

開源AI視頻FramePack發布:6GB顯卡本地運行

您現在可以在自己的筆記本電腦上免費生成完整的離線AI視頻。 只有GPU和純粹的創造力。 這到底是什么? 一個名為FramePack的新型離線AI視頻生成器幾天前在GitHub上發布 — 幾乎沒人在談論它。這很奇怪,因為這個工具真的很厲害。 它允許您從靜態圖像和提示詞在自己的機器上…