以下是 Python 與 PyMongo 的完整功能整理,涵蓋基礎操作、高級功能、性能優化及常見應用場景:
1. 安裝與連接
(1) 安裝 PyMongo
pip install pymongo
(2) 連接 MongoDB
from pymongo import MongoClient# 基礎連接(默認本地,端口27017)
client = MongoClient('mongodb://localhost:27017/')# 帶認證的連接
client = MongoClient('mongodb://username:password@host:27017/dbname?authSource=admin'
)# 連接副本集或分片集群
client = MongoClient('mongodb://node1:27017,node2:27017/?replicaSet=rs0')
2. 數據庫與集合操作
(1) 選擇數據庫和集合
db = client['mydatabase'] # 選擇數據庫(惰性創建)
collection = db['users'] # 選擇集合(惰性創建)
(2) 列出數據庫和集合
# 列出所有數據庫(需權限)
print(client.list_database_names())# 列出數據庫中的集合
print(db.list_collection_names())
(3) 刪除集合或數據庫
db.drop_collection('users') # 刪除集合
client.drop_database('mydatabase') # 刪除數據庫
3. 文檔操作(CRUD)
(1) 插入文檔
# 插入單條文檔
user = {'name': 'Alice', 'age': 30, 'tags': ['python', 'dev']}
result = collection.insert_one(user)
print(result.inserted_id) # 輸出插入的 ObjectId# 批量插入
users = [{'name': 'Bob'}, {'name': 'Charlie'}]
result = collection.insert_many(users)
print(result.inserted_ids) # 輸出所有插入的 ObjectId 列表
(2) 查詢文檔
# 查詢單條文檔
user = collection.find_one({'name': 'Alice'})
print(user) # 返回字典或 None# 查詢多條文檔(帶條件、投影和排序)
cursor = collection.find({'age': {'$gt': 25}}, # 條件:年齡大于25{'_id': 0, 'name': 1}, # 投影:僅返回 name 字段
).sort('age', pymongo.ASCENDING) # 按年齡升序排序for doc in cursor:print(doc)# 統計數量
count = collection.count_documents({'age': {'$gt': 25}})
(3) 更新文檔
# 更新單條文檔
result = collection.update_one({'name': 'Alice'},{'$set': {'age': 31}, '$addToSet': {'tags': 'database'}} # 更新操作符
)
print(result.modified_count) # 輸出影響的文檔數# 批量更新
result = collection.update_many({'age': {'$lt': 30}},{'$inc': {'age': 1}} # 年齡加1
)
(4) 刪除文檔
# 刪除單條文檔
result = collection.delete_one({'name': 'Alice'})# 批量刪除
result = collection.delete_many({'age': {'$gt': 40}})
4. 高級查詢與聚合
(1) 查詢操作符
# 比較:$gt, $gte, $lt, $lte, $ne
collection.find({'age': {'$gt': 20}})# 邏輯:$and, $or, $not
collection.find({'$or': [{'age': 30}, {'name': 'Bob'}]})# 數組:$in, $nin, $all, $elemMatch
collection.find({'tags': {'$in': ['python', 'java']}})
(2) 聚合管道
pipeline = [{'$match': {'age': {'$gt': 25}}}, # 篩選條件{'$group': {'_id': '$city', 'count': {'$sum': 1}}}, # 按城市分組計數{'$sort': {'count': -1}}, # 按計數降序排序{'$limit': 5} # 取前5條
]result = collection.aggregate(pipeline)
for doc in result:print(doc)
(3) 索引管理
# 創建索引
collection.create_index([('name', pymongo.ASCENDING)], unique=True)# 查看索引
print(collection.index_information())# 刪除索引
collection.drop_index('name_1')
5. 事務與原子性操作
(1) 多文檔事務(MongoDB 4.0+)
with client.start_session() as session:session.start_transaction()try:collection.update_one({'name': 'Alice'},{'$inc': {'balance': -100}},session=session)collection.update_one({'name': 'Bob'},{'$inc': {'balance': 100}},session=session)session.commit_transaction()except Exception as e:session.abort_transaction()print("事務回滾:", e)
(2) 原子操作符
# 原子更新
collection.find_one_and_update({'name': 'Alice'},{'$inc': {'counter': 1}},return_document=pymongo.ReturnDocument.AFTER # 返回更新后的文檔
)
6. 性能優化與最佳實踐
(1) 查詢優化
? 使用投影減少返回字段:
collection.find({}, {'_id': 0, 'name': 1})
? 覆蓋查詢(Covered Query):確保查詢字段和投影字段在索引中。
(2) 批量操作
# 批量插入(減少網絡開銷)
bulk_ops = [pymongo.InsertOne({'name': f'User_{i}'}) for i in range(1000)]
collection.bulk_write(bulk_ops)
(3) 連接池管理
client = MongoClient('mongodb://localhost:27017/',maxPoolSize=100, # 最大連接數minPoolSize=10, # 最小空閑連接socketTimeoutMS=3000 # 超時時間
)
7. 數據建模與模式設計
(1) 內嵌文檔與引用
? 內嵌文檔:適合頻繁訪問的子數據。
user = {'name': 'Alice','address': {'city': 'New York', 'zip': '10001'} # 內嵌文檔
}
? 引用關系:適合獨立實體。
# 用戶引用訂單
order = {'user_id': user['_id'], 'product': 'Laptop'}
(2) 分片策略
? 選擇分片鍵:高頻查詢字段(如 user_id
)。
? 分片命令(需在 mongos
執行):
sh.shardCollection("mydb.orders", {"user_id": 1})
8. 安全與運維
(1) 認證與權限
# 創建用戶
db.command('createUser', 'admin',pwd='secret',roles=[{'role': 'readWrite', 'db': 'mydb'}]
)
(2) 備份與恢復
? mongodump
備份:
mongodump --uri="mongodb://user:pass@host:27017/mydb" --out=/backup
? mongorestore
恢復:
mongorestore --uri="mongodb://host:27017" /backup/mydb
(3) 監控與日志
? 查看數據庫狀態:
server_status = db.command('serverStatus')
print(server_status['connections']['available'])
? 啟用慢查詢日志:
mongod --setParameter slowMS=100 --profileLevel 2
9. 常見應用場景
(1) 日志存儲與分析
# 插入日志
log_entry = {'timestamp': datetime.now(),'level': 'INFO','message': 'User login success'
}
collection.insert_one(log_entry)# 分析錯誤日志數量
pipeline = [{'$match': {'level': 'ERROR'}},{'$group': {'_id': '$service', 'count': {'$sum': 1}}}
]
(2) 實時排行榜
# 更新分數
collection.update_one({'user_id': 1001},{'$inc': {'score': 10}},upsert=True
)# 獲取前10名
top_players = collection.find().sort('score', -1).limit(10)
10. 擴展工具與庫
(1) 使用 Motor 實現異步操作
from motor.motor_asyncio import AsyncIOMotorClientasync def query_data():client = AsyncIOMotorClient('mongodb://localhost:27017')collection = client.mydb.userscursor = collection.find({'age': {'$gt': 20}})async for doc in cursor:print(doc)
(2) 使用 MongoEngine(ORM)
from mongoengine import Document, StringField, IntFieldclass User(Document):name = StringField(required=True)age = IntField()# 查詢數據
users = User.objects(age__gt=25)
總結
功能 | PyMongo 方法/操作 | 典型場景 |
---|---|---|
基礎 CRUD | insert_one , find , update_many | 數據增刪改查 |
聚合分析 | aggregate + 管道操作 | 復雜統計、日志分析 |
事務管理 | start_session + 事務塊 | 轉賬、訂單處理 |
性能優化 | 索引、批量操作、連接池 | 高并發讀寫、大數據處理 |
數據建模 | 內嵌文檔、引用關系、分片 | 電商、社交網絡、IoT 數據存儲 |
通過合理使用 PyMongo,可以高效操作 MongoDB 應對多樣化的數據存儲需求,結合 Redis 實現緩存加速,構建高性能應用。
以下是一個基于 pymongo
封裝的 MongoDB 基礎操作類,支持連接管理、CRUD、索引操作、聚合查詢、分頁等常用功能:
from typing import Any, Dict, List, Optional, Union
from pymongo import MongoClient, ASCENDING, DESCENDING
from pymongo.errors import PyMongoError
from pymongo.results import InsertOneResult, InsertManyResult, UpdateResult, DeleteResult
from pymongo.collection import Collection
from bson import ObjectIdclass MongoDBClient:"""MongoDB 基礎操作類"""def __init__(self, uri: str = "mongodb://localhost:27017/",db_name: str = "mydatabase", collection_name: str = "default_collection"):"""初始化 MongoDB 客戶端:param uri: MongoDB 連接 URI:param db_name: 數據庫名稱:param collection_name: 集合名稱"""self.client = MongoClient(uri)self.db = self.client[db_name]self.collection: Collection = self.db[collection_name]# ------------------------- 基礎 CRUD 操作 -------------------------def insert_one(self, document: Dict) -> Optional[ObjectId]:"""插入單條文檔"""try:result: InsertOneResult = self.collection.insert_one(document)return result.inserted_idexcept PyMongoError as e:print(f"Insert one error: {e}")return Nonedef insert_many(self, documents: List[Dict]) -> Optional[List[ObjectId]]:"""批量插入文檔"""try:result: InsertManyResult = self.collection.insert_many(documents)return result.inserted_idsexcept PyMongoError as e:print(f"Insert many error: {e}")return Nonedef find_one(self, query: Dict, projection: Optional[Dict] = None) -> Optional[Dict]:"""查詢單條文檔"""try:return self.collection.find_one(query, projection)except PyMongoError as e:print(f"Find one error: {e}")return Nonedef find(self, query: Dict, projection: Optional[Dict] = None,sort: Optional[List[tuple]] = None,limit: int = 0) -> List[Dict]:"""查詢多條文檔"""try:cursor = self.collection.find(query, projection)if sort:cursor = cursor.sort(sort)if limit > 0:cursor = cursor.limit(limit)return list(cursor)except PyMongoError as e:print(f"Find error: {e}")return []def update_one(self, query: Dict, update_data: Dict, upsert: bool = False) -> Optional[UpdateResult]:"""更新單條文檔"""try:result: UpdateResult = self.collection.update_one(query, {'$set': update_data}, upsert=upsert)return resultexcept PyMongoError as e:print(f"Update one error: {e}")return Nonedef delete_one(self, query: Dict) -> Optional[DeleteResult]:"""刪除單條文檔"""try:result: DeleteResult = self.collection.delete_one(query)return resultexcept PyMongoError as e:print(f"Delete one error: {e}")return None# ------------------------- 高級操作 -------------------------def create_index(self, keys: List[tuple], unique: bool = False, background: bool = True) -> Optional[str]:"""創建索引"""try:index_name = self.collection.create_index(keys, unique=unique, background=background)return index_nameexcept PyMongoError as e:print(f"Create index error: {e}")return Nonedef count_documents(self, query: Dict) -> int:"""統計文檔數量"""try:return self.collection.count_documents(query)except PyMongoError as e:print(f"Count documents error: {e}")return 0def aggregate(self, pipeline: List[Dict]) -> List[Dict]:"""聚合查詢"""try:return list(self.collection.aggregate(pipeline))except PyMongoError as e:print(f"Aggregate error: {e}")return []def paginate(self, query: Dict, page: int = 1, per_page: int = 10,sort: Optional[List[tuple]] = None,projection: Optional[Dict] = None) -> Dict:"""分頁查詢"""try:total = self.count_documents(query)skip = (page - 1) * per_pagecursor = self.collection.find(query, projection)if sort:cursor = cursor.sort(sort)documents = cursor.skip(skip).limit(per_page)return {"total": total,"page": page,"per_page": per_page,"data": list(documents)}except PyMongoError as e:print(f"Paginate error: {e}")return {"total": 0, "page": page, "per_page": per_page, "data": []}# ------------------------- 事務支持 -------------------------def execute_transaction(self, operations: callable) -> bool:"""執行事務操作"""session = self.client.start_session()try:with session.start_transaction():operations(self.collection, session)return Trueexcept PyMongoError as e:session.abort_transaction()print(f"Transaction aborted: {e}")return False# ------------------------- 工具方法 -------------------------@staticmethoddef to_objectid(_id: Union[str, ObjectId]) -> ObjectId:"""將字符串轉換為 ObjectId"""return _id if isinstance(_id, ObjectId) else ObjectId(_id)def close(self):"""關閉連接"""self.client.close()# ------------------------- 使用示例 -------------------------
if __name__ == "__main__":# 初始化客戶端mongo_client = MongoDBClient(uri="mongodb://user:pass@localhost:27017/",db_name="test_db",collection_name="users")# 插入數據user_id = mongo_client.insert_one({"name": "Alice","age": 30,"email": "alice@example.com"})print(f"Inserted ID: {user_id}")# 查詢數據user = mongo_client.find_one({"name": "Alice"})print(f"Found user: {user}")# 分頁查詢pagination = mongo_client.paginate(query={"age": {"$gt": 20}},page=1,per_page=10,sort=[("age", ASCENDING)])print(f"Page 1 data: {pagination['data']}")# 關閉連接mongo_client.close()
核心功能說明
功能 | 方法 | 說明 |
---|---|---|
連接管理 | __init__ , close | 支持自定義 URI、數據庫和集合名稱 |
CRUD 操作 | insert_one , find 等 | 提供單條/批量操作,支持投影和排序 |
索引管理 | create_index | 可創建唯一索引、后臺索引 |
聚合查詢 | aggregate | 支持完整的聚合管道操作 |
分頁查詢 | paginate | 返回分頁數據和總記錄數 |
事務支持 | execute_transaction | 封裝多文檔事務操作 |
類型轉換 | to_objectid | 字符串與 ObjectId 互轉 |
使用場景
- 快速開發:直接繼承或實例化類,無需重復編寫 CRUD 代碼。
- Web 后端:集成到 FastAPI/Django 服務中,處理用戶數據。
- 數據分析:通過聚合方法實現復雜統計。
- 定時任務:封裝數據清洗、日志處理等操作。
優化建議
- 連接池配置:在初始化時添加
maxPoolSize
、minPoolSize
參數。 - 日志記錄:將
print
替換為 logging 模塊記錄錯誤信息。 - 異步支持:使用
motor
庫實現異步版本(適合 FastAPI 等異步框架)。 - 數據校驗:集成
pydantic
對輸入數據進行模式驗證。 - 緩存集成:結合 Redis 實現高頻查詢緩存。