目錄
- Python快速上手(三十一)
- Python MongoDB 詳解
- 1. 安裝 pymongo
- 2. 連接 MongoDB
- 3. 創建和刪除集合
- 4. CRUD 操作
- 5. 查詢操作
- 6. 索引
- 7. 聚合
- 8. 其他操作
- 9. 連接池和超時
- 10. 實際應用案例
Python快速上手(三十一)
Python MongoDB 詳解
MongoDB 是一種 NoSQL 數據庫,它使用文檔存儲數據,提供高性能、高可用性和易擴展性。Python 提供了 pymongo 庫來與 MongoDB 進行交互。本文將詳細講解如何使用 Python 與 MongoDB 進行各種操作,包括連接數據庫、CRUD 操作、查詢、索引和聚合。
1. 安裝 pymongo
在使用 MongoDB 前,需要安裝 pymongo 庫。可以使用以下命令安裝:
pip install pymongo
2. 連接 MongoDB
2.1 基本連接
使用 MongoClient 類連接到 MongoDB 服務器:
from pymongo import MongoClientclient = MongoClient('localhost', 27017)
2.2 使用 URI 連接
可以使用 MongoDB URI 連接字符串:
client = MongoClient('mongodb://localhost:27017/')
2.3 訪問數據庫
連接到特定的數據庫:
db = client['mydatabase']
或者:
db = client.mydatabase
2.4 認證
如果 MongoDB 需要認證,使用以下方式連接:
client = MongoClient('mongodb://username:password@localhost:27017/mydatabase')
db = client['mydatabase']
3. 創建和刪除集合
3.1 創建集合
MongoDB 會在第一次插入文檔時自動創建集合:
collection = db['mycollection']
3.2 刪除集合
使用 drop 方法刪除集合:
collection.drop()
4. CRUD 操作
4.1 插入文檔
4.1.1 插入單個文檔
使用 insert_one 方法插入一個文檔:
document = {"name": "John", "age": 25, "city": "New York"}
collection.insert_one(document)
4.1.2 插入多個文檔
使用 insert_many 方法插入多個文檔:
documents = [{"name": "Anna", "age": 28, "city": "London"},{"name": "Mike", "age": 32, "city": "Chicago"}
]
collection.insert_many(documents)
4.2 查詢文檔
4.2.1 查詢單個文檔
使用 find_one 方法查詢單個文檔:
result = collection.find_one({"name": "John"})
print(result)
4.2.2 查詢多個文檔
使用 find 方法查詢多個文檔:
results = collection.find({"age": {"$gt": 20}})
for result in results:print(result)
4.3 更新文檔
4.3.1 更新單個文檔
使用 update_one 方法更新單個文檔:
collection.update_one({"name": "John"}, {"$set": {"age": 26}})
4.3.2 更新多個文檔
使用 update_many 方法更新多個文檔:
collection.update_many({"age": {"$gt": 20}}, {"$set": {"city": "San Francisco"}})
4.4 刪除文檔
4.4.1 刪除單個文檔
使用 delete_one 方法刪除單個文檔:
collection.delete_one({"name": "John"})
4.4.2 刪除多個文檔
使用 delete_many 方法刪除多個文檔:
collection.delete_many({"age": {"$gt": 20}})
5. 查詢操作
5.1 條件查詢
使用查詢操作符進行條件查詢:
results = collection.find({"age": {"$gte": 25}})
常用查詢操作符:
- $eq:等于
- $ne:不等于
- $gt:大于
- $gte:大于等于
- $lt:小于
- $lte:小于等于
- $in:在指定數組內
- $nin:不在指定數組內
5.2 邏輯操作符
使用邏輯操作符進行查詢:
results = collection.find({"$or": [{"age": {"$lt": 25}}, {"city": "London"}]})
常用邏輯操作符:
- $and:與
- $or:或
- $not:非
- $nor:非或
5.3 正則表達式
使用正則表達式進行查詢:
results = collection.find({"name": {"$regex": "^J"}})
5.4 字段選擇
指定返回的字段:
results = collection.find({}, {"_id": 0, "name": 1, "age": 1})
5.5 排序
使用 sort 方法進行排序:
results = collection.find().sort("age", -1)
5.6 限制和跳過
使用 limit 和 skip 方法進行分頁:
results = collection.find().skip(5).limit(10)
6. 索引
6.1 創建索引
使用 create_index 方法創建索引:
collection.create_index([("name", 1)])
6.2 列出索引
使用 list_indexes 方法列出索引:
for index in collection.list_indexes():print(index)
6.3 刪除索引
使用 drop_index 方法刪除索引:
collection.drop_index("name_1")
6.4 刪除所有索引
使用 drop_indexes 方法刪除所有索引:
collection.drop_indexes()
7. 聚合
7.1 基本聚合
使用 aggregate 方法進行聚合:
pipeline = [{"$match": {"age": {"$gte": 25}}},{"$group": {"_id": "$city", "average_age": {"$avg": "$age"}}}
]
results = collection.aggregate(pipeline)
for result in results:print(result)
7.2 聚合操作符
常用聚合操作符:
- $match:過濾數據
- $group:分組并進行計算
- $sort:排序
- $limit:限制結果數量
- $skip:跳過指定數量的結果
- $project:改變輸出文檔的結構
- $unwind:拆分數組字段中的元素
7.3 聚合示例
計算每個城市的平均年齡:
pipeline = [{"$group": {"_id": "$city", "average_age": {"$avg": "$age"}}}
]
results = collection.aggregate(pipeline)
for result in results:print(result)
8. 其他操作
8.1 統計集合文檔數量
使用 count_documents 方法統計文檔數量:
count = collection.count_documents({"age": {"$gte": 25}})
print(count)
8.2 執行命令
使用 command 方法執行數據庫命令:
result = db.command("serverStatus")
print(result)
9. 連接池和超時
9.1 設置連接池
可以設置連接池參數,例如最大連接數:
client = MongoClient('mongodb://localhost:27017/', maxPoolSize=50)
9.2 設置超時
可以設置連接超時和操作超時:
client = MongoClient('mongodb://localhost:27017/', serverSelectionTimeoutMS=5000, socketTimeoutMS=2000)
10. 實際應用案例
10.1 用戶注冊系統
以下示例展示了如何使用 MongoDB 實現一個簡單的用戶注冊系統:
from pymongo import MongoClient
import datetimeclient = MongoClient('mongodb://localhost:27017/')
db = client['user_database']
users = db['users']# 注冊用戶
def register_user(username, password, email):user = {"username": username,"password": password,"email": email,"created_at": datetime.datetime.now()}users.insert_one(user)print(f"User {username} registered successfully")# 查詢用戶
def find_user(username):user = users.find_one({"username": username})if user:print(f"User found: {user}")else:print("User not found")# 更新用戶密碼
def update_password(username, new_password):result = users.update_one({"username": username}, {"$set": {"password": new_password}})if result.matched_count:print(f"Password updated for user {username}")else:print("User not found")# 刪除用戶
def delete_user(username):result = users.delete_one({"username": username})