在 PyMongo 中使用 compact 命令進行 MongoDB 碎片回收的完整操作指南如下:
一、核心執行方法
import time

from pymongo import MongoClient
from pymongo.errors import PyMongoError

# 1. Connect to the MongoDB instance.
client = MongoClient("mongodb://username:password@host:27017/dbname?authSource=admin")

# 2. Select the target database and collection.
db = client["your_database"]
collection = db["your_collection"]

# 3. Run the compact command.
try:
    # db.command() is synchronous: it returns only after the server has
    # replied to the compact command, so there is no background operation to
    # poll. The reply's "operationTime" field is a cluster timestamp rather
    # than an operation id, and it is not guaranteed to be present (e.g. on a
    # standalone server), so it is deliberately not read here.
    result = db.command("compact", collection.name)
    print("? Compact completed successfully!")

    # Some server versions report the reclaimed space; show it when present.
    bytes_freed = result.get("bytesFreed")
    if bytes_freed is not None:
        print(f"Bytes freed: {bytes_freed}")
except PyMongoError as e:
    print(f"? Compact failed: {str(e)}")
finally:
    # Always release the connection pool, whether or not compact succeeded.
    client.close()
二、關鍵參數配置
# Pass extra options to compact.
# `force` lets the command run on a replica-set member; the exact rule for
# when it is required differs between server versions, so check the manual
# for the version in use.
#
# Only options the compact command actually defines are sent. The padding
# options belonged to the removed MMAPv1 storage engine, and `compression` /
# `tieredStorage` are not compact options, so they are not passed.
result = db.command(
    "compact",
    collection.name,
    force=True,
)
三、集群環境操作方案
1. 副本集自動滾動執行
rs_members = [
    "rs1/mongo1:27017",
    "rs1/mongo2:27017",
    "rs1/mongo3:27017",
]

for member in rs_members:
    # Entries are written as "<replSet>/<host:port>"; only the host part may
    # appear in the URI authority, otherwise the URI is malformed.
    member_host = member.split("/")[-1]

    # directConnection pins the client to this one member. Without it the
    # driver discovers the whole replica set and routes commands by topology,
    # so the compact would not necessarily run on the intended node.
    member_client = MongoClient(
        f"mongodb://user:pass@{member_host}/admin?replicaSet=rs1",
        directConnection=True,
    )
    try:
        # Check whether this node is currently the primary.
        is_primary = member_client.admin.command("isMaster").get("ismaster")

        # Step the primary down (for 300 seconds) before compacting it.
        if is_primary:
            member_client.admin.command("replSetStepDown", 300)

        member_db = member_client["your_db"]
        member_db.command("compact", "your_collection", force=True)
        print(f"? Compact completed on {member}")
    except Exception as e:
        # A failure on one member is reported and the loop moves on to the next.
        print(f"? Failed on {member}: {str(e)}")
    finally:
        member_client.close()
2. 分片集群自動處理
# Fetch the shard list from the config server.
config_client = MongoClient("mongodb://config_server:27019")
try:
    # Materialise the cursor so the config connection can be closed before
    # the (potentially long) per-shard work starts.
    shards = list(config_client.config.shards.find())
finally:
    config_client.close()

for shard in shards:
    shard_name = shard["_id"]
    shard_host = shard["host"].split("/")[-1]  # keep only the host address part
    try:
        # The context manager closes the client on every path, and avoids
        # touching an unbound name if the constructor itself raises.
        with MongoClient(f"mongodb://{shard_host}/admin") as shard_client:
            # Only compact when the connected node reports itself as primary.
            if shard_client.admin.command("isMaster").get("ismaster"):
                shard_db = shard_client["your_db"]
                shard_db.command("compact", "your_collection")
                print(f"? Compact on shard {shard_name} completed")
            else:
                print(f"?? {shard_host} is not primary, skipped")
    except Exception as e:
        # Report the failure and continue with the remaining shards.
        print(f"? Shard {shard_name} failed: {str(e)}")
四、操作結果驗證
# Compare storage statistics before and after compaction.
# PyMongo's Collection has no stats() method (that is a mongo-shell helper),
# so the collStats command is issued through the database instead.
pre_stats = db.command("collStats", collection.name)
# ... run compact here ...
post_stats = db.command("collStats", collection.name)

BYTES_PER_MB = 1024**2


def _fragmentation_pct(stats):
    """Return the share of storageSize not accounted for by size, in percent.

    Returns 0.0 when storageSize is zero so an empty collection does not
    raise ZeroDivisionError.
    NOTE(review): with block compression enabled, size (uncompressed) can
    exceed storageSize, so treat this figure as a rough indicator only.
    """
    storage_size = stats["storageSize"]
    if not storage_size:
        return 0.0
    return 100 * (1 - stats["size"] / storage_size)


saved_bytes = pre_stats["storageSize"] - post_stats["storageSize"]

print("存儲優化報告:")
print(f"- 原始大小: {pre_stats['storageSize'] / BYTES_PER_MB:.2f} MB")
print(f"- 優化后: {post_stats['storageSize'] / BYTES_PER_MB:.2f} MB")
print(f"- 節省空間: {saved_bytes / BYTES_PER_MB:.2f} MB")
print(
    f"- 碎片率: {_fragmentation_pct(pre_stats):.1f}% → "
    f"{_fragmentation_pct(post_stats):.1f}%"
)
五、安全操作注意事項
- 阻塞機制處理
# Abort if another compact is already in progress.
# Database.current_op() no longer exists in PyMongo 4, and its result was a
# dict that is truthy even with no matching operation. The $currentOp
# aggregation stage on the admin database is used instead, and the list of
# matches is tested.
running_compacts = list(
    db.client.admin.aggregate(
        [
            {"$currentOp": {}},
            {"$match": {"command.compact": {"$exists": True}}},
        ]
    )
)
if running_compacts:
    print("?? Another compact already running")
    exit()

# Configure timeouts so a stalled operation is interrupted automatically.
client = MongoClient(connectTimeoutMS=30000, socketTimeoutMS=3600000)
- 磁盤空間保障
# Check that enough free disk space is available before compacting.
# The filesystem totals are read from dbStats (fsTotalSize / fsUsedSize).
disk_stats = db.command("dbStats")
# Free space is total minus used; the reverse subtraction is always negative.
free_space = disk_stats["fsTotalSize"] - disk_stats["fsUsedSize"]
# Collection has no stats() method in PyMongo; use the collStats command.
coll_size = db.command("collStats", collection.name)["storageSize"]

# Require 1.5x the collection's on-disk size as headroom.
if free_space < coll_size * 1.5:
    print(f"? Insufficient disk space. Need {coll_size*1.5} bytes, only {free_space} available")
    exit()
- Atlas 雲服務專用
# Atlas needs a dedicated authentication mechanism.
# NOTE(review): connect_string is not defined in this snippet; it must be
# supplied by the surrounding code.
client = MongoClient(connect_string, authMechanism="MONGODB-AWS")

# Options intended for the tiered-storage API.
# NOTE(review): nothing in this snippet consumes compact_opts, and
# "tieredStorage" is not a known option of the compact command. Confirm
# against the Atlas documentation before relying on it.
tiered_storage_settings = {
    "useRecycledSpace": True,
    "reclaimSpace": True,
}
compact_opts = {"tieredStorage": tiered_storage_settings}
六、替代方案實現
無損在線重建方案:
def online_recompact(db_name, coll_name, batch_size=1000):
    """Rebuild a collection by copying it into a fresh one and swapping names.

    Args:
        db_name: Name of the database holding the collection. It is resolved
            through the client of the module-level ``db`` handle.
        coll_name: Name of the collection to rebuild.
        batch_size: Number of documents written per ``insert_many`` call.

    Notes:
        The copy is a single pass over the source. Writes that reach the
        source after a document has been read are not carried over, so pause
        writers (or reconcile afterwards) when that matters.
    """
    database = db.client[db_name]
    source = database[coll_name]

    # 1. Create the temporary collection.
    temp_name = f"{coll_name}_compact_{int(time.time())}"
    database.command("create", temp_name)
    dest = database[temp_name]

    # 2. Copy the data in batches from ONE cursor. Re-querying with
    #    skip()/limit() and no sort has no stable order (documents can be
    #    missed or duplicated) and rescans from the start for every page.
    pending = []
    for doc in source.find():
        pending.append(doc)
        if len(pending) >= batch_size:
            dest.insert_many(pending)
            pending = []
    if pending:  # insert_many() rejects an empty list
        dest.insert_many(pending)

    # 3. Recreate the secondary indexes; a new collection only has _id_.
    #    The legacy "ns" field is dropped because it names the old collection.
    #    NOTE(review): special index types (e.g. text) may need manual handling.
    index_specs = [
        {key: value for key, value in spec.items() if key != "ns"}
        for spec in source.list_indexes()
        if spec["name"] != "_id_"
    ]
    if index_specs:
        database.command("createIndexes", temp_name, indexes=index_specs)

    # 4. Replace the original in one rename. dropTarget removes the old
    #    collection as part of the same operation, so there is no moment at
    #    which coll_name does not exist.
    dest.rename(coll_name, dropTarget=True)
最佳實踐總結
- 執行窗口選擇
# Read the current local hour and decide whether compaction may run.
from datetime import datetime

current_hour = datetime.now().hour

# The maintenance window is the early morning: hours 0 through 4.
if current_hour not in range(5):
    print("?? Operation declined: Not in maintenance window")
else:
    run_compact()
- 定時清理腳本框架
import schedule
import time


def weekly_compact():
    """Run the cluster-wide compaction routine.

    NOTE(review): shard_cluster_compact is not defined in this snippet; it is
    presumably a wrapper around the cluster procedure shown earlier.
    """
    shard_cluster_compact()


# Register the job for every Sunday at 01:00.
sunday_slot = schedule.every().sunday.at("01:00")
sunday_slot.do(weekly_compact)

# Poll the scheduler once a minute, forever.
while True:
    schedule.run_pending()
    time.sleep(60)
- 健康檢查指標
HEALTH_THRESHOLD = 0.8  # fragmentation ratio above which compaction is triggered


def needs_compact(collection):
    """Return True when the collection's fragmentation exceeds HEALTH_THRESHOLD.

    Fragmentation is computed as ``1 - size / storageSize`` from the
    collStats output. A collection whose storageSize is zero is reported as
    not needing compaction rather than raising ZeroDivisionError.
    """
    # Collection has no stats() method in PyMongo; run collStats through the
    # owning database instead.
    stats = collection.database.command("collStats", collection.name)
    storage_size = stats["storageSize"]
    if not storage_size:
        return False
    fragmentation = 1 - (stats["size"] / storage_size)
    return fragmentation > HEALTH_THRESHOLD


# Detect automatically and compact when needed.
if needs_compact(collection):
    run_compact(collection)
關鍵提示:在 MongoDB Atlas 中,建議啟用分層存儲(https://docs.atlas.mongodb.com/tiered-storage/)替代手動 compact。對 10GB 以上的集合操作時,優先採用 online_recompact 方案確保業務連續性。