MongoDB分片技術實現
概述
MongoDB分片(Sharding)是MongoDB的水平擴展方案,通過將數據分佈到多個分片(shard)上,來應對大數據量與高吞吐量的需求。
MongoDB分片架構
1. 分片集群組件
# MongoDB sharded cluster topology
#   - configReplSet: 3-member config server replica set (container port 27019)
#   - shard1ReplSet / shard2ReplSet: shard replica sets (container port 27018)
#   - mongos1 / mongos2: query routers (container port 27017)
# --bind_ip_all is passed explicitly because mongod/mongos listen on localhost only by default,
# which would make them unreachable from the other containers.
# NOTE(review): each shard replica set has only two data-bearing members. If one goes down the
# survivor holds 1 of 2 votes (no majority) and cannot become primary. Add a third member (or an
# arbiter) per shard for real failover.
version: '3.8'

services:
  # Config server replica set
  config1:
    image: mongo:5.0
    command: mongod --configsvr --replSet configReplSet --port 27019 --bind_ip_all
    ports:
      - "27019:27019"
    volumes:
      - config1_data:/data/db

  config2:
    image: mongo:5.0
    command: mongod --configsvr --replSet configReplSet --port 27019 --bind_ip_all
    ports:
      - "27020:27019"
    volumes:
      - config2_data:/data/db

  config3:
    image: mongo:5.0
    command: mongod --configsvr --replSet configReplSet --port 27019 --bind_ip_all
    ports:
      - "27021:27019"
    volumes:
      - config3_data:/data/db

  # Shard 1 replica set
  shard1_replica1:
    image: mongo:5.0
    command: mongod --shardsvr --replSet shard1ReplSet --port 27018 --bind_ip_all
    ports:
      - "27022:27018"
    volumes:
      - shard1_replica1_data:/data/db

  shard1_replica2:
    image: mongo:5.0
    command: mongod --shardsvr --replSet shard1ReplSet --port 27018 --bind_ip_all
    ports:
      - "27023:27018"
    volumes:
      - shard1_replica2_data:/data/db

  # Shard 2 replica set
  shard2_replica1:
    image: mongo:5.0
    command: mongod --shardsvr --replSet shard2ReplSet --port 27018 --bind_ip_all
    ports:
      - "27024:27018"
    volumes:
      - shard2_replica1_data:/data/db

  shard2_replica2:
    image: mongo:5.0
    command: mongod --shardsvr --replSet shard2ReplSet --port 27018 --bind_ip_all
    ports:
      - "27025:27018"
    volumes:
      - shard2_replica2_data:/data/db

  # mongos routers (stateless; they read cluster metadata from the config servers)
  mongos1:
    image: mongo:5.0
    command: mongos --configdb configReplSet/config1:27019,config2:27019,config3:27019 --port 27017 --bind_ip_all
    ports:
      - "27017:27017"
    depends_on:
      - config1
      - config2
      - config3

  mongos2:
    image: mongo:5.0
    command: mongos --configdb configReplSet/config1:27019,config2:27019,config3:27019 --port 27017 --bind_ip_all
    ports:
      - "27026:27017"
    depends_on:
      - config1
      - config2
      - config3

volumes:
  config1_data:
  config2_data:
  config3_data:
  shard1_replica1_data:
  shard1_replica2_data:
  shard2_replica1_data:
  shard2_replica2_data:
2. 分片集群初始化腳本
#!/bin/bash
# mongodb-cluster-init.sh
#
# Bootstraps the sharded cluster from the docker-compose file:
#   1. waits until every mongod answers a ping,
#   2. initiates the config-server and shard replica sets,
#   3. waits for each replica set to elect a primary,
#   4. registers both shards through mongos.
# Readiness is polled instead of using fixed sleeps, which either race the startup or waste time.
set -euo pipefail

# The mongo:5.0 image ships the legacy `mongo` shell; set MONGO_SHELL=mongosh for newer images.
MONGO_SHELL="${MONGO_SHELL:-mongo}"
# Number of polling attempts (2 seconds apart) before giving up on a host.
MAX_RETRIES="${MAX_RETRIES:-60}"

# run_js HOST JS
# Evaluates JS on HOST. The legacy shell does not echo --eval results, so scripts must print
# explicitly with print()/printjson().
run_js() {
  "$MONGO_SHELL" --host "$1" --quiet --eval "$2"
}

# wait_until HOST JS
# Polls HOST until JS prints exactly "true"; exits the script with an error on timeout.
wait_until() {
  local host="$1" js="$2" attempt=0
  until [ "$(run_js "$host" "$js" 2>/dev/null || true)" = "true" ]; do
    attempt=$((attempt + 1))
    if [ "$attempt" -ge "$MAX_RETRIES" ]; then
      echo "等待 ${host} 超時" >&2
      exit 1
    fi
    sleep 2
  done
}

PING_JS='print(db.adminCommand({ ping: 1 }).ok == 1)'
# rs.status() throws before initiation; that counts as "not ready yet".
HAS_PRIMARY_JS='print(rs.status().members.some(function (m) { return m.stateStr === "PRIMARY"; }))'

echo "初始化MongoDB分片集群..."

# Wait for every mongod to accept connections (rs.initiate needs all members reachable).
for host in config1:27019 config2:27019 config3:27019 \
            shard1_replica1:27018 shard1_replica2:27018 \
            shard2_replica1:27018 shard2_replica2:27018; do
  wait_until "$host" "$PING_JS"
done

# Initiate the config server replica set
echo "初始化Config Server副本集..."
run_js config1:27019 'printjson(rs.initiate({
  _id: "configReplSet",
  configsvr: true,
  members: [
    { _id: 0, host: "config1:27019" },
    { _id: 1, host: "config2:27019" },
    { _id: 2, host: "config3:27019" }
  ]
}))'

# Wait until the config replica set has a primary
wait_until config1:27019 "$HAS_PRIMARY_JS"

# Initiate shard 1 replica set
echo "初始化分片1副本集..."
run_js shard1_replica1:27018 'printjson(rs.initiate({
  _id: "shard1ReplSet",
  members: [
    { _id: 0, host: "shard1_replica1:27018" },
    { _id: 1, host: "shard1_replica2:27018" }
  ]
}))'

# Initiate shard 2 replica set
echo "初始化分片2副本集..."
run_js shard2_replica1:27018 'printjson(rs.initiate({
  _id: "shard2ReplSet",
  members: [
    { _id: 0, host: "shard2_replica1:27018" },
    { _id: 1, host: "shard2_replica2:27018" }
  ]
}))'

# Wait until both shard replica sets have a primary, and mongos is reachable
wait_until shard1_replica1:27018 "$HAS_PRIMARY_JS"
wait_until shard2_replica1:27018 "$HAS_PRIMARY_JS"
wait_until mongos1:27017 "$PING_JS"

# Register the shards with the cluster
echo "添加分片到集群..."
run_js mongos1:27017 '
printjson(sh.addShard("shard1ReplSet/shard1_replica1:27018,shard1_replica2:27018"));
printjson(sh.addShard("shard2ReplSet/shard2_replica1:27018,shard2_replica2:27018"));
'

echo "MongoDB分片集群初始化完成!"
Java應用集成
1. Spring Boot配置
@Configuration
public?class?MongoShardingConfig?{@Value("${spring.data.mongodb.uri}")private?String mongoUri;@Beanpublic?MongoClient?mongoClient()?{// 連接到mongos路由服務ConnectionString connectionString =?new?ConnectionString(mongoUri);MongoClientSettings settings = MongoClientSettings.builder().applyConnectionString(connectionString).readPreference(ReadPreference.secondaryPreferred())?// 讀寫分離.writeConcern(WriteConcern.MAJORITY)?// 寫關注.readConcern(ReadConcern.MAJORITY)?// 讀關注.retryWrites(true)?// 重試寫入.retryReads(true)?// 重試讀取.applyToConnectionPoolSettings(builder -> {builder.maxSize(100)?// 最大連接數.minSize(10)?// 最小連接數.maxWaitTime(30, TimeUnit.SECONDS)?// 最大等待時間.maxConnectionIdleTime(60, TimeUnit.SECONDS);?// 連接空閑時間}).build();return?MongoClients.create(settings);}@Beanpublic?MongoTemplate?mongoTemplate()?{return?new?MongoTemplate(mongoClient(),?"sharded_database");}
}
2. 分片鍵設計
@Document(collection =?"users")
public?class?User?{@Idprivate?String id;@Indexedprivate?String userId;?// 分片鍵private?String username;private?String email;private?Date createTime;private?String region;?// 地理位置// 構造函數、getter、setter
}@Document(collection =?"orders")
public?class?Order?{@Idprivate?String id;@Indexedprivate?String customerId;?// 分片鍵private?String orderId;private?BigDecimal amount;private?Date orderTime;private?String status;// 構造函數、getter、setter
}@Document(collection =?"products")
public?class?Product?{@Idprivate?String id;@Indexedprivate?String categoryId;?// 分片鍵private?String productName;private?BigDecimal price;private?String description;// 構造函數、getter、setter
}
3. 分片管理服務
@Service
public?class?MongoShardingService?{@Autowiredprivate?MongoTemplate mongoTemplate;/*** 啟用數據庫分片*/public?void?enableSharding(String database)?{Document command =?new?Document("enableSharding", database);mongoTemplate.getDb().runCommand(command);log.info("已啟用數據庫分片: {}", database);}/*** 對集合進行分片*/public?void?shardCollection(String database, String collection, String shardKey)?{Document command =?new?Document("shardCollection", database +?"."?+ collection).append("key",?new?Document(shardKey,?1));mongoTemplate.getDb().runCommand(command);log.info("已對集合進行分片: {}.{}, 分片鍵: {}", database, collection, shardKey);}/*** 創建哈希分片*/public?void?createHashedSharding(String database, String collection, String shardKey)?{Document command =?new?Document("shardCollection", database +?"."?+ collection).append("key",?new?Document(shardKey,?"hashed"));mongoTemplate.getDb().runCommand(command);log.info("已創建哈希分片: {}.{}, 分片鍵: {}", database, collection, shardKey);}/*** 創建范圍分片*/public?void?createRangeSharding(String database, String collection, String shardKey)?{Document command =?new?Document("shardCollection", database +?"."?+ collection).append("key",?new?Document(shardKey,?1));mongoTemplate.getDb().runCommand(command);log.info("已創建范圍分片: {}.{}, 分片鍵: {}", database, collection, shardKey);}/*** 創建復合分片鍵*/public?void?createCompoundSharding(String database, String collection,?Map<String, Object> shardKeys)?{Document keyDoc =?new?Document();shardKeys.forEach(keyDoc::append);Document command =?new?Document("shardCollection", database +?"."?+ collection).append("key", keyDoc);mongoTemplate.getDb().runCommand(command);log.info("已創建復合分片: {}.{}, 分片鍵: {}", database, collection, shardKeys);}/*** 查看分片狀態*/public?Document?getShardingStatus()?{return?mongoTemplate.getDb().runCommand(new?Document("sh.status",?1));}/*** 查看集合分片信息*/public?Document?getCollectionShardInfo(String database, String collection)?{Document command =?new?Document("collStats", 
collection).append("verbose",?true);return?mongoTemplate.getDb(database).runCommand(command);}
}
4. 分片初始化配置
@Component
public?class?ShardingInitializer?{@Autowiredprivate?MongoShardingService shardingService;@EventListener(ApplicationReadyEvent.class)public?void?initializeSharding()?{try?{// 啟用數據庫分片shardingService.enableSharding("sharded_database");// 用戶集合 - 使用userId哈希分片shardingService.createHashedSharding("sharded_database",?"users",?"userId");// 訂單集合 - 使用customerId范圍分片shardingService.createRangeSharding("sharded_database",?"orders",?"customerId");// 產品集合 - 使用復合分片鍵Map<String, Object> productShardKeys =?new?HashMap<>();productShardKeys.put("categoryId",?1);productShardKeys.put("productId",?1);shardingService.createCompoundSharding("sharded_database",?"products", productShardKeys);log.info("MongoDB分片初始化完成");}?catch?(Exception e) {log.error("MongoDB分片初始化失敗", e);}}
}
分片策略優化
1. 智能分片鍵選擇
@Service
public?class?ShardKeyOptimizer?{@Autowiredprivate?MongoTemplate mongoTemplate;/*** 分析集合的查詢模式*/public?ShardKeyRecommendation?analyzeQueryPatterns(String collection)?{// 分析查詢日志List<Document> queryLogs = getQueryLogs(collection);Map<String, Integer> fieldUsageCount =?new?HashMap<>();Map<String, Double> fieldSelectivity =?new?HashMap<>();for?(Document log : queryLogs) {Document query = log.get("command", Document.class);if?(query !=?null?&& query.containsKey("find")) {Document filter = query.get("filter", Document.class);if?(filter !=?null) {analyzeFilterFields(filter, fieldUsageCount);}}}// 計算字段選擇性for?(String field : fieldUsageCount.keySet()) {double?selectivity = calculateFieldSelectivity(collection, field);fieldSelectivity.put(field, selectivity);}return?recommendShardKey(fieldUsageCount, fieldSelectivity);}private?void?analyzeFilterFields(Document filter, Map<String, Integer> fieldUsageCount)?{for?(String field : filter.keySet()) {fieldUsageCount.merge(field,?1, Integer::sum);}}private?double?calculateFieldSelectivity(String collection, String field)?{// 計算字段的選擇性(不重復值的比例)Aggregation aggregation = Aggregation.newAggregation(Aggregation.group(field),Aggregation.count().as("distinctCount"));AggregationResults<Document> results = mongoTemplate.aggregate(aggregation, collection, Document.class);long?distinctCount = results.getMappedResults().size();long?totalCount = mongoTemplate.count(new?Query(), collection);return?totalCount >?0?? 
(double) distinctCount / totalCount :?0;}private?ShardKeyRecommendation?recommendShardKey(Map<String, Integer> fieldUsageCount,?Map<String, Double> fieldSelectivity)?{// 綜合考慮使用頻率和選擇性String recommendedField = fieldUsageCount.entrySet().stream().max((e1, e2) -> {double?score1 = e1.getValue() * fieldSelectivity.getOrDefault(e1.getKey(),?0.0);double?score2 = e2.getValue() * fieldSelectivity.getOrDefault(e2.getKey(),?0.0);return?Double.compare(score1, score2);}).map(Map.Entry::getKey).orElse("_id");return?new?ShardKeyRecommendation(recommendedField,?fieldSelectivity.getOrDefault(recommendedField,?0.0));}private?List<Document>?getQueryLogs(String collection)?{// 從MongoDB profiler獲取查詢日志Query query =?new?Query(Criteria.where("ns").is("sharded_database."?+ collection).and("ts").gte(new?Date(System.currentTimeMillis() -?24?*?60?*?60?*?1000)));?// 最近24小時return?mongoTemplate.find(query, Document.class, "system.profile");}public?static?class?ShardKeyRecommendation?{private?String field;private?double?selectivity;public?ShardKeyRecommendation(String field,?double?selectivity)?{this.field = field;this.selectivity = selectivity;}// getter和setter}
}
2. 數據平衡監控
@Service
public?class?ShardBalanceMonitor?{@Autowiredprivate?MongoTemplate mongoTemplate;@Autowiredprivate?MeterRegistry meterRegistry;/*** 監控分片數據分布*/@Scheduled(fixedRate =?300000)?// 5分鐘檢查一次public?void?monitorShardDistribution()?{try?{Document shardStats = mongoTemplate.getDb().runCommand(new?Document("shardDistribution",?1));analyzeShardBalance(shardStats);}?catch?(Exception e) {log.error("分片分布監控失敗", e);}}private?void?analyzeShardBalance(Document shardStats)?{Document shards = shardStats.get("shards", Document.class);if?(shards ==?null)?return;Map<String, Long> shardSizes =?new?HashMap<>();long?totalSize =?0;for?(String shardName : shards.keySet()) {Document shardInfo = shards.get(shardName, Document.class);long?size = shardInfo.getLong("size");shardSizes.put(shardName, size);totalSize += size;}// 計算分布不均衡度double?imbalanceRatio = calculateImbalanceRatio(shardSizes, totalSize);// 記錄指標Gauge.builder("mongodb.shard.imbalance.ratio").register(meterRegistry, imbalanceRatio);// 如果不均衡度超過閾值,觸發重新平衡if?(imbalanceRatio >?0.3) {?// 30%的不均衡度log.warn("檢測到分片數據不均衡,不均衡度: {:.2f}", imbalanceRatio);triggerRebalance();}}private?double?calculateImbalanceRatio(Map<String, Long> shardSizes,?long?totalSize)?{if?(shardSizes.isEmpty() || totalSize ==?0)?return?0;double?avgSize = (double) totalSize / shardSizes.size();double?maxDeviation = shardSizes.values().stream().mapToDouble(size -> Math.abs(size - avgSize) / avgSize).max().orElse(0);return?maxDeviation;}private?void?triggerRebalance()?{try?{// 啟動平衡器mongoTemplate.getDb().runCommand(new?Document("balancerStart",?1));log.info("已啟動分片重新平衡");}?catch?(Exception e) {log.error("啟動分片重新平衡失敗", e);}}/*** 監控chunk分布*/@Scheduled(fixedRate =?600000)?// 10分鐘檢查一次public?void?monitorChunkDistribution()?{try?{// 查詢chunks集合Query query =?new?Query();List<Document> chunks = mongoTemplate.find(query, Document.class, "chunks");Map<String, Integer> shardChunkCount =?new?HashMap<>();for?(Document chunk : chunks) {String shard = 
chunk.getString("shard");shardChunkCount.merge(shard,?1, Integer::sum);}// 記錄每個分片的chunk數量for?(Map.Entry<String, Integer> entry : shardChunkCount.entrySet()) {Gauge.builder("mongodb.shard.chunk.count").tag("shard", entry.getKey()).register(meterRegistry, entry.getValue());}}?catch?(Exception e) {log.error("Chunk分布監控失敗", e);}}
}
3. 查詢路由優化
@Service
public?class?QueryRoutingOptimizer?{@Autowiredprivate?MongoTemplate mongoTemplate;/*** 優化查詢以避免跨分片操作*/public?<T>?List<T>?optimizedFind(Query query, Class<T> entityClass, String collection)?{// 分析查詢是否包含分片鍵if?(containsShardKey(query, collection)) {// 包含分片鍵,可以路由到特定分片return?mongoTemplate.find(query, entityClass, collection);}?else?{// 不包含分片鍵,需要廣播查詢log.warn("查詢不包含分片鍵,將執行跨分片查詢: {}", query);return?mongoTemplate.find(query, entityClass, collection);}}/*** 批量查詢優化*/public?<T>?List<T>?optimizedBatchFind(List<String> shardKeyValues,?String shardKeyField,?Class<T> entityClass,?String collection)?{// 按分片鍵分組Map<String, List<String>> shardGroups = groupByShardKey(shardKeyValues, shardKeyField);List<T> results =?new?ArrayList<>();// 并行查詢各分片shardGroups.entrySet().parallelStream().forEach(entry -> {Query query =?new?Query(Criteria.where(shardKeyField).in(entry.getValue()));List<T> shardResults = mongoTemplate.find(query, entityClass, collection);synchronized?(results) {results.addAll(shardResults);}});return?results;}/*** 聚合查詢優化*/public?<T>?AggregationResults<T>?optimizedAggregate(Aggregation aggregation,?String collection,?Class<T> outputType)?{// 檢查聚合管道是否可以下推到分片if?(canPushDownToShards(aggregation)) {return?mongoTemplate.aggregate(aggregation, collection, outputType);}?else?{// 需要在mongos層進行聚合log.warn("聚合操作需要在mongos層執行,可能影響性能");return?mongoTemplate.aggregate(aggregation, collection, outputType);}}private?boolean?containsShardKey(Query query, String collection)?{// 獲取集合的分片鍵信息String shardKey = getShardKey(collection);if?(shardKey ==?null)?return?false;// 檢查查詢條件是否包含分片鍵Document queryDoc = query.getQueryObject();return?queryDoc.containsKey(shardKey);}private?String?getShardKey(String collection)?{try?{// 從config.collections獲取分片鍵信息Query query =?new?Query(Criteria.where("_id").is("sharded_database."?+ collection));Document collectionInfo = mongoTemplate.findOne(query, Document.class, "collections");if?(collectionInfo !=?null) {Document key = collectionInfo.get("key", Document.class);if?(key 
!=?null?&& !key.isEmpty()) {return?key.keySet().iterator().next();}}}?catch?(Exception e) {log.error("獲取分片鍵失敗", e);}return?null;}private?Map<String, List<String>> groupByShardKey(List<String> values, String shardKeyField) {// 根據分片鍵值計算目標分片Map<String, List<String>> groups =?new?HashMap<>();for?(String value : values) {String targetShard = calculateTargetShard(value, shardKeyField);groups.computeIfAbsent(targetShard, k ->?new?ArrayList<>()).add(value);}return?groups;}private?String?calculateTargetShard(String shardKeyValue, String shardKeyField)?{// 簡化的分片計算邏輯int?hash = shardKeyValue.hashCode();int?shardCount = getShardCount();int?shardIndex = Math.abs(hash) % shardCount;return?"shard"?+ shardIndex;}private?int?getShardCount()?{try?{Document listShards = mongoTemplate.getDb().runCommand(new?Document("listShards",?1));List<Document> shards = listShards.getList("shards", Document.class);return?shards !=?null?? shards.size() :?1;}?catch?(Exception e) {log.error("獲取分片數量失敗", e);return?1;}}private?boolean?canPushDownToShards(Aggregation aggregation)?{// 檢查聚合管道是否包含可以下推到分片的操作List<AggregationOperation> operations = aggregation.getOperations();for?(AggregationOperation operation : operations) {if?(operation?instanceof?GroupOperation ||?operation?instanceof?SortOperation ||operation?instanceof?LimitOperation) {// 這些操作通常需要在mongos層執行return?false;}}return?true;}
}
性能優化
1. 連接池優化
@Configuration
public?class?MongoConnectionOptimization?{@Beanpublic?MongoClientSettings?mongoClientSettings()?{return?MongoClientSettings.builder().applyToConnectionPoolSettings(builder -> {builder.maxSize(200) ? ? ? ? ? ? ? ? ? ?// 最大連接數.minSize(20) ? ? ? ? ? ? ? ? ? ? ? ?// 最小連接數.maxWaitTime(30, TimeUnit.SECONDS) ?// 最大等待時間.maxConnectionLifeTime(60, TimeUnit.MINUTES)?// 連接最大生存時間.maxConnectionIdleTime(30, TimeUnit.MINUTES)?// 連接最大空閑時間.maintenanceInitialDelay(0, TimeUnit.SECONDS).maintenanceFrequency(30, TimeUnit.SECONDS);?// 維護頻率}).applyToSocketSettings(builder -> {builder.connectTimeout(10, TimeUnit.SECONDS) ? ?// 連接超時.readTimeout(30, TimeUnit.SECONDS); ? ? ? ??// 讀取超時}).applyToServerSettings(builder -> {builder.heartbeatFrequency(10, TimeUnit.SECONDS) ??// 心跳頻率.minHeartbeatFrequency(500, TimeUnit.MILLISECONDS);?// 最小心跳頻率}).build();}
}
2. 批量操作優化
@Service
public?class?MongoBatchOptimization?{@Autowiredprivate?MongoTemplate mongoTemplate;/*** 批量插入優化*/public?<T>?void?optimizedBatchInsert(List<T> documents, String collection)?{if?(documents.isEmpty())?return;// 按分片鍵分組Map<String, List<T>> shardGroups = groupDocumentsByShardKey(documents, collection);// 并行插入各分片shardGroups.entrySet().parallelStream().forEach(entry -> {List<T> shardDocuments = entry.getValue();// 分批插入,避免單次操作過大int?batchSize =?1000;for?(int?i =?0; i < shardDocuments.size(); i += batchSize) {int?endIndex = Math.min(i + batchSize, shardDocuments.size());List<T> batch = shardDocuments.subList(i, endIndex);mongoTemplate.insert(batch, collection);}});}/*** 批量更新優化*/public?void?optimizedBatchUpdate(List<UpdateRequest> updateRequests, String collection)?{BulkOperations bulkOps = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, collection);for?(UpdateRequest request : updateRequests) {bulkOps.updateOne(request.getQuery(), request.getUpdate());}BulkWriteResult result = bulkOps.execute();log.info("批量更新完成,匹配: {}, 修改: {}",?result.getMatchedCount(), result.getModifiedCount());}/*** 批量刪除優化*/public?void?optimizedBatchDelete(List<Query> deleteQueries, String collection)?{BulkOperations bulkOps = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, collection);for?(Query query : deleteQueries) {bulkOps.remove(query);}BulkWriteResult result = bulkOps.execute();log.info("批量刪除完成,刪除數量: {}", result.getDeletedCount());}private?<T> Map<String, List<T>> groupDocumentsByShardKey(List<T> documents, String collection) {// 根據分片鍵對文檔進行分組Map<String, List<T>> groups =?new?HashMap<>();String shardKey = getShardKey(collection);if?(shardKey ==?null) {groups.put("default", documents);return?groups;}for?(T document : documents) {String shardKeyValue = extractShardKeyValue(document, shardKey);String targetShard = calculateTargetShard(shardKeyValue);groups.computeIfAbsent(targetShard, k ->?new?ArrayList<>()).add(document);}return?groups;}private?String?extractShardKeyValue(Object 
document, String shardKey)?{// 使用反射或其他方式提取分片鍵值try?{Field field = document.getClass().getDeclaredField(shardKey);field.setAccessible(true);Object value = field.get(document);return?value !=?null?? value.toString() :?"";}?catch?(Exception e) {log.error("提取分片鍵值失敗", e);return?"";}}private?String?getShardKey(String collection)?{// 獲取集合的分片鍵return?"userId";?// 簡化實現}private?String?calculateTargetShard(String shardKeyValue)?{// 計算目標分片return?"shard0";?// 簡化實現}public?static?class?UpdateRequest?{private?Query query;private?Update update;public?UpdateRequest(Query query, Update update)?{this.query = query;this.update = update;}// getter和setter}
}
3. 索引優化
@Service
public?class?MongoIndexOptimization?{@Autowiredprivate?MongoTemplate mongoTemplate;/*** 創建分片友好的索引*/public?void?createShardFriendlyIndexes(String collection)?{// 1. 分片鍵索引(自動創建)// 2. 復合索引(包含分片鍵)Index compoundIndex =?new?Index().on("userId", Sort.Direction.ASC) ?// 分片鍵.on("createTime", Sort.Direction.DESC).on("status", Sort.Direction.ASC);mongoTemplate.indexOps(collection).ensureIndex(compoundIndex);// 3. 查詢優化索引Index queryIndex =?new?Index().on("userId", Sort.Direction.ASC) ?// 分片鍵.on("email", Sort.Direction.ASC).sparse();?// 稀疏索引mongoTemplate.indexOps(collection).ensureIndex(queryIndex);// 4. 地理位置索引Index geoIndex =?new?Index().on("userId", Sort.Direction.ASC) ?// 分片鍵.on("location",?"2dsphere");mongoTemplate.indexOps(collection).ensureIndex(geoIndex);}/*** 監控索引使用情況*/@Scheduled(fixedRate =?3600000)?// 1小時檢查一次public?void?monitorIndexUsage()?{List<String> collections = Arrays.asList("users",?"orders",?"products");for?(String collection : collections) {try?{// 獲取索引統計信息Document indexStats = mongoTemplate.getDb().getCollection(collection).aggregate(Arrays.asList(new?Document("$indexStats",?new?Document()))).first();if?(indexStats !=?null) {analyzeIndexUsage(collection, indexStats);}}?catch?(Exception e) {log.error("監控索引使用情況失敗: {}", collection, e);}}}private?void?analyzeIndexUsage(String collection, Document indexStats)?{Document accesses = indexStats.get("accesses", Document.class);if?(accesses !=?null) {long?ops = accesses.getLong("ops");Date since = accesses.getDate("since");if?(ops ==?0?&& since !=?null) {long?daysSinceLastUse = (System.currentTimeMillis() - since.getTime()) / (24?*?60?*?60?*?1000);if?(daysSinceLastUse >?30) {log.warn("索引 {} 在集合 {} 中超過30天未使用,考慮刪除",?indexStats.getString("name"), collection);}}}}/*** 自動創建查詢優化索引*/public?void?autoCreateQueryIndexes(String collection, List<Document> queryPatterns)?{Map<String, Integer> fieldUsageCount =?new?HashMap<>();// 分析查詢模式for?(Document query : queryPatterns) {analyzeQueryFields(query, fieldUsageCount);}// 
創建高頻查詢字段的索引fieldUsageCount.entrySet().stream().filter(entry -> entry.getValue() >?100)?// 使用次數超過100.forEach(entry -> {String field = entry.getKey();if?(!field.equals("_id")) {?// 跳過默認索引Index index =?new?Index().on(field, Sort.Direction.ASC);mongoTemplate.indexOps(collection).ensureIndex(index);log.info("為字段 {} 創建索引,使用頻率: {}", field, entry.getValue());}});}private?void?analyzeQueryFields(Document query, Map<String, Integer> fieldUsageCount)?{for?(String field : query.keySet()) {if?(!field.startsWith("$")) {?// 跳過操作符fieldUsageCount.merge(field,?1, Integer::sum);}}}
}
監控與運維
1. 分片集群監控
@Component
public?class?MongoShardingMonitor?{@Autowiredprivate?MongoTemplate mongoTemplate;@Autowiredprivate?MeterRegistry meterRegistry;/*** 監控分片集群健康狀態*/@Scheduled(fixedRate =?30000)public?void?monitorClusterHealth()?{try?{// 檢查mongos狀態Document isMaster = mongoTemplate.getDb().runCommand(new?Document("isMaster",?1));boolean?isMongos = isMaster.getBoolean("ismaster",?false);// 檢查分片狀態Document listShards = mongoTemplate.getDb().runCommand(new?Document("listShards",?1));List<Document> shards = listShards.getList("shards", Document.class);int?healthyShards =?0;int?totalShards = shards.size();for?(Document shard : shards) {String state = shard.getString("state");if?("1".equals(state)) {healthyShards++;}}// 記錄指標Gauge.builder("mongodb.cluster.shards.total").register(meterRegistry, totalShards);Gauge.builder("mongodb.cluster.shards.healthy").register(meterRegistry, healthyShards);// 檢查平衡器狀態Document balancerStatus = mongoTemplate.getDb().runCommand(new?Document("balancerStatus",?1));boolean?balancerEnabled = balancerStatus.getBoolean("mode",?false);Gauge.builder("mongodb.cluster.balancer.enabled").register(meterRegistry, balancerEnabled ??1?:?0);}?catch?(Exception e) {log.error("MongoDB集群健康監控失敗", e);}}/*** 監控分片性能指標*/@Scheduled(fixedRate =?60000)public?void?monitorShardPerformance()?{try?{Document serverStatus = mongoTemplate.getDb().runCommand(new?Document("serverStatus",?1));// 連接數Document connections = serverStatus.get("connections", Document.class);if?(connections !=?null) {int?current = connections.getInteger("current",?0);int?available = connections.getInteger("available",?0);Gauge.builder("mongodb.connections.current").register(meterRegistry, current);Gauge.builder("mongodb.connections.available").register(meterRegistry, available);}// 操作計數Document opcounters = serverStatus.get("opcounters", Document.class);if?(opcounters !=?null) {long?insert = opcounters.getLong("insert");long?query = opcounters.getLong("query");long?update = opcounters.getLong("update");long?delete = 
opcounters.getLong("delete");Counter.builder("mongodb.operations.insert").register(meterRegistry).increment(insert);Counter.builder("mongodb.operations.query").register(meterRegistry).increment(query);Counter.builder("mongodb.operations.update").register(meterRegistry).increment(update);Counter.builder("mongodb.operations.delete").register(meterRegistry).increment(delete);}}?catch?(Exception e) {log.error("MongoDB性能監控失敗", e);}}
}
2. 自動故障恢復
@Service
public?class?MongoFailoverService?{@Autowiredprivate?MongoTemplate mongoTemplate;@Autowiredprivate?NotificationService notificationService;/*** 檢測并處理分片故障*/@Scheduled(fixedRate =?15000)public?void?detectAndHandleFailures()?{try?{Document listShards = mongoTemplate.getDb().runCommand(new?Document("listShards",?1));List<Document> shards = listShards.getList("shards", Document.class);for?(Document shard : shards) {String shardId = shard.getString("_id");String host = shard.getString("host");String state = shard.getString("state");if?(!"1".equals(state)) {handleShardFailure(shardId, host);}}}?catch?(Exception e) {log.error("分片故障檢測失敗", e);}}private?void?handleShardFailure(String shardId, String host)?{log.error("檢測到分片故障: {} ({})", shardId, host);// 發送告警notificationService.sendAlert("MongoDB分片故障",String.format("分片 %s (%s) 發生故障,請及時處理", shardId, host));// 嘗試自動恢復attemptAutoRecovery(shardId, host);}private?void?attemptAutoRecovery(String shardId, String host)?{try?{// 檢查副本集狀態if?(host.contains("/")) {String[] parts = host.split("/");String replSetName = parts[0];String[] hosts = parts[1].split(",");// 嘗試連接副本集的其他成員for?(String memberHost : hosts) {if?(testConnection(memberHost)) {log.info("副本集 {} 的成員 {} 仍然可用", replSetName, memberHost);return;}}}// 如果所有成員都不可用,嘗試重啟服務log.warn("分片 {} 的所有成員都不可用,需要手動干預", shardId);}?catch?(Exception e) {log.error("自動恢復失敗", e);}}private?boolean?testConnection(String host)?{try?{MongoClient testClient = MongoClients.create("mongodb://"?+ host);testClient.getDatabase("admin").runCommand(new?Document("ping",?1));testClient.close();return?true;}?catch?(Exception e) {return?false;}}
}
總結
MongoDB分片技術是處理大規模數據的重要解決方案。成功實施需要考慮:
分片鍵設計:選擇合適的分片鍵是關鍵,需要平衡查詢性能和數據分布
架構規劃:合理規劃Config Server、分片和mongos的部署;每個副本集應至少包含三個成員(或兩個數據節點加一個仲裁節點),否則單個節點故障後無法選出主節點
查詢優化:盡量包含分片鍵以避免跨分片查詢
監控運維:建立完善的監控體系,及時發現和處理問題
最佳實踐:
選擇高基數、值分佈均勻且非單調遞增、並經常出現在查詢條件中的字段作為分片鍵(單調遞增的鍵會使新寫入集中在同一個分片上)
當單一字段基數不足時,使用複合分片鍵(例如 categoryId + productId)提高基數,同時保持以前綴字段查詢時的定向路由
定期監控數據分布和性能指標
建立自動化的故障檢測和恢復機制
通過合理的設計和實施,MongoDB分片可以為應用提供優秀的水平擴展能力。