🚀 ABP vNext + MongoDB Data Storage: Multi-Model Support and NoSQL Extension (Production-Grade Practice)
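Contents
- 🚀 ABP vNext + MongoDB Data Storage: Multi-Model Support and NoSQL Extension (Production-Grade Practice)
- 🎯 Introduction
- 🧰 Environment and Dependencies
- appsettings.json
- 🏗️ Architecture Overview
- 🤖 Integration and Configuration
- 📑 Module Registration
- 📘 DbContext Definition
- 📦 Custom Repository Implementation
- 🔐 Transactions and Consistency
- 🔄 UnitOfWork Flow
- 🗺️ Sharding and Model Design
- 🔑 Shard Key Evaluation
- 🏗️ Multi-Model Modeling
- 🚀 Performance Optimization Guide
- 📈 Index Creation
- Batch Writes
- 📊 Monitoring and Observability
- 🐢 Slow Query Detection (CommandSucceededEvent)
- 🌐 Application Insights
- 🛠️ Controller Full CRUD Example
- 🧪 Unit Test Example (xUnit + Mongo2Go + DI)
- 📚 Appendix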
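🎯 Introduction
In high-concurrency, fast-iterating business environments, a traditional RDBMS can struggle because of its rigid schema and transaction overhead. MongoDB, with its flexible document model, high throughput, and distributed capabilities, is a good complement for ABP applications. This article shows how to integrate MongoDB into ABP vNext at production grade, covering configuration, DI, repositories, transactions, multi-model design, and monitoring.
💬 Business pain points
- Frequent iteration makes table schema changes costly
- Transactions and lock contention become clear bottlenecks under large-scale writes
- Multi-tenant isolation requires high scalability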
🧰 Environment and Dependencies
- 🖥️ .NET 8
- 📦 ABP v6+
- 🌐 MongoDB Server 6.x (Replica Set / Sharded Cluster)
- 📦 NuGet packages
  - MongoDB.Driver
  - Volo.Abp.MongoDB
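appsettings.json
{
  "ConnectionStrings": {
    "MongoDb": "mongodb://localhost:27017/MyProjectDb?maxPoolSize=200&minPoolSize=50"
  },
  "MongoDb": {
    "DatabaseName": "MyProjectDb"
  }
}
ABP takes the database name from the connection string, so MyProjectDb is included in the URL.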
🏗️ Architecture Overview
🤖 Integration and Configuration
📑 Module Registration
// The connection string name is bound by [ConnectionStringName("MongoDb")] on the DbContext.
// AbpMongoDbContextOptions has no ConnectionStringName property, so no PreConfigureServices override is needed.
public override void ConfigureServices(ServiceConfigurationContext context)
{
    context.Services.AddMongoDbContext<MyMongoDbContext>(builder =>
    {
        // includeAllEntities: false generates default repositories for aggregate roots only
        builder.AddDefaultRepositories(includeAllEntities: false);
    });
}
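💡 Set includeAllEntities to true or false depending on the project's needs; true also generates default repositories for entities that are not aggregate roots.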
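📘 DbContext Definition
// Tenant isolation comes from IMultiTenant on the entities and per-tenant connection strings,
// not from an attribute on the DbContext.
[ConnectionStringName("MongoDb")]
public class MyMongoDbContext : AbpMongoDbContext
{
    public IMongoCollection<Order> Orders => Collection<Order>();
    public IMongoCollection<Address> Addresses => Collection<Address>();

    protected override void CreateModel(IMongoModelBuilder modelBuilder)
    {
        base.CreateModel(modelBuilder);

        // Keep the collection names used elsewhere in this article
        modelBuilder.Entity<Order>(b => b.CollectionName = "Orders");
        modelBuilder.Entity<Address>(b => b.CollectionName = "Addresses");
    }
}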
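- Recommendation: use ICurrentTenant to control database routing. ABP resolves the connection string for the current tenant at runtime, so services are not injected in the module's PreConfigureServices for this purpose.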
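The routing idea in the recommendation above can be illustrated with a minimal sketch. It is not ABP's actual resolver: the class name `TenantConnectionStringResolver` and its constructor arguments are hypothetical, and the only ABP fact it relies on is that `ICurrentTenant.Id` is a nullable `Guid` that is empty on the host side.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical resolver (not an ABP type): maps a tenant id to a dedicated
// connection string and falls back to the shared default otherwise.
public sealed class TenantConnectionStringResolver
{
    private readonly string _defaultConnectionString;
    private readonly IReadOnlyDictionary<Guid, string> _tenantConnectionStrings;

    public TenantConnectionStringResolver(
        string defaultConnectionString,
        IReadOnlyDictionary<Guid, string> tenantConnectionStrings)
    {
        _defaultConnectionString = defaultConnectionString;
        _tenantConnectionStrings = tenantConnectionStrings;
    }

    // tenantId is null when there is no current tenant (host side).
    public string Resolve(Guid? tenantId)
    {
        if (tenantId.HasValue &&
            _tenantConnectionStrings.TryGetValue(tenantId.Value, out var connectionString))
        {
            return connectionString;
        }

        return _defaultConnectionString;
    }
}
```

A caller would pass the current tenant's id on each request; tenants without a dedicated entry share the default database.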
📦 Custom Repository Implementation
public interface IMongoRepository<TEntity, TKey> : IRepository<TEntity, TKey>
    where TEntity : class, IEntity<TKey>
{
    Task BulkInsertAsync(IEnumerable<TEntity> entities, bool isOrdered = false);

    Task<IEnumerable<TResult>> AggregateLookupAsync<TForeign, TResult>(
        Expression<Func<TEntity, object>> localField,
        Expression<Func<TForeign, object>> foreignField,
        PipelineDefinition<TEntity, TResult> pipeline);
}

public class MongoRepository<TEntity, TKey>
    : MongoDbRepository<MyMongoDbContext, TEntity, TKey>, IMongoRepository<TEntity, TKey>
    where TEntity : class, IEntity<TKey>
{
    // MongoDbRepository expects the MongoDB-specific context provider
    public MongoRepository(IMongoDbContextProvider<MyMongoDbContext> dbContextProvider)
        : base(dbContextProvider)
    {
    }

    public async Task BulkInsertAsync(IEnumerable<TEntity> entities, bool isOrdered = false)
    {
        // Resolve the collection through ABP so the configured collection name is used
        var collection = await GetCollectionAsync();
        var models = entities.Select(e => new InsertOneModel<TEntity>(e));
        await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = isOrdered });
    }

    public async Task<IEnumerable<TResult>> AggregateLookupAsync<TForeign, TResult>(
        Expression<Func<TEntity, object>> localField,
        Expression<Func<TForeign, object>> foreignField,
        PipelineDefinition<TEntity, TResult> pipeline)
    {
        // localField and foreignField are not used here; the supplied pipeline carries the $lookup stage
        var collection = await GetCollectionAsync();
        return await collection.Aggregate(pipeline).ToListAsync();
    }
}
🔐 Transactions and Consistency
🔄 UnitOfWork Flow
public class OrderAppService : ApplicationService, IOrderAppService
{
    private readonly IMongoRepository<Order, Guid> _orderRepository;

    public OrderAppService(IMongoRepository<Order, Guid> orderRepository)
        => _orderRepository = orderRepository;

    // Runs the method inside an ABP unit of work
    [UnitOfWork]
    public async Task<OrderDto> CreateAsync(CreateOrderDto input)
    {
        var order = ObjectMapper.Map<Order>(input);
        await _orderRepository.InsertAsync(order);
        return ObjectMapper.Map<OrderDto>(order);
    }
}
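🗺️ Sharding and Model Design
🔑 Shard Key Evaluation
sh.shardCollection("MyProjectDb.Orders", { CustomerId: 1, CreatedAt: 1 });
Compound key example: leading with CustomerId spreads writes across customers, which helps avoid the single hot shard that a monotonically increasing key such as CreatedAt alone would create.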
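To make the hot-spot argument concrete, here is a small in-memory sketch that counts how many writes land in each range chunk for a given key. It is only a model of range-based chunking, not MongoDB's balancer; `ShardKeySketch` and `CountWritesPerChunk` are hypothetical names, and the split points are assumed values.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ShardKeySketch
{
    // Counts keys per range chunk. Chunk i holds keys k with
    // splitPoints[i-1] <= k < splitPoints[i]; the last chunk is unbounded above.
    public static int[] CountWritesPerChunk<TKey>(
        IEnumerable<TKey> keys,
        IReadOnlyList<TKey> sortedSplitPoints)
        where TKey : IComparable<TKey>
    {
        var counts = new int[sortedSplitPoints.Count + 1];
        foreach (var key in keys)
        {
            // The chunk index equals the number of split points at or below the key.
            var chunk = sortedSplitPoints.Count(p => p.CompareTo(key) <= 0);
            counts[chunk]++;
        }

        return counts;
    }
}
```

With a key of CreatedAt alone, every new timestamp exceeds all existing split points, so all new writes fall into the last chunk. With a (CustomerId, CreatedAt) tuple as the key, the same writes are distributed by customer.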
🏗️ Multi-Model Modeling
// Example $lookup aggregation
var results = await _context.Orders
    .Aggregate()
    .Lookup<Order, Address, LookupResult>(
        _context.Addresses,
        o => o.AddressId,
        a => a.Id,
        result => result.Addresses)
    .ToListAsync();
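The shape of a $lookup result can be mimicked in memory with LINQ's GroupJoin, which may help when reasoning about the result type. This is only an analogy under assumed types: `OrderDoc`, `AddressDoc`, and `OrderWithAddresses` are hypothetical stand-ins for the article's Order, Address, and LookupResult.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the article's entities.
public sealed record OrderDoc(Guid Id, Guid AddressId);
public sealed record AddressDoc(Guid Id, string City);
public sealed record OrderWithAddresses(OrderDoc Order, IReadOnlyList<AddressDoc> Addresses);

public static class LookupSketch
{
    // Like $lookup, this is a left outer join: every order is kept, and
    // orders without a matching address get an empty Addresses list.
    public static List<OrderWithAddresses> Lookup(
        IEnumerable<OrderDoc> orders,
        IEnumerable<AddressDoc> addresses)
    {
        return orders
            .GroupJoin(
                addresses,
                o => o.AddressId,
                a => a.Id,
                (o, matches) => new OrderWithAddresses(o, matches.ToList()))
            .ToList();
    }
}
```

The server-side aggregation is preferable for real data because it avoids loading both collections into application memory.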
🚀 Performance Optimization Guide
📈 Index Creation
var orderCollection = context.Database.GetCollection<Order>("Orders");
await orderCollection.Indexes.CreateManyAsync(new[]
{
    new CreateIndexModel<Order>(Builders<Order>.IndexKeys.Ascending(o => o.CustomerId)),
    new CreateIndexModel<Order>(Builders<Order>.IndexKeys.Descending(o => o.CreatedAt))
});
Batch Writes
await repository.BulkInsertAsync(largeOrderList, isOrdered: false);
🛠️ Catch MongoBulkWriteException<Order> and retry or apply compensating handling.
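One way to structure the retry advice above is to split the input into batches and retry each failed batch with exponential backoff. The sketch below is driver-agnostic: `BulkWriteSketch`, `WriteInBatchesAsync`, and the `writeBatch` delegate are hypothetical, and the default batch size, attempt count, and delay are assumptions. It catches every exception for brevity; real code would likely filter on the driver's bulk write exception. Retrying a whole batch is only safe when the write is idempotent, since an unordered insert that partially succeeded will raise duplicate-key errors on the second attempt.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BulkWriteSketch
{
    // Writes items in batches; a failed batch is retried up to maxAttempts times,
    // doubling the delay between attempts. The last failure is rethrown.
    public static async Task WriteInBatchesAsync<T>(
        IEnumerable<T> items,
        Func<IReadOnlyList<T>, Task> writeBatch,
        int batchSize = 1000,
        int maxAttempts = 3,
        TimeSpan? initialDelay = null)
    {
        var firstDelay = initialDelay ?? TimeSpan.FromMilliseconds(200);

        foreach (var batch in items.Chunk(batchSize))
        {
            var delay = firstDelay;
            for (var attempt = 1; ; attempt++)
            {
                try
                {
                    await writeBatch(batch);
                    break;
                }
                catch (Exception) when (attempt < maxAttempts)
                {
                    // Back off before the next attempt on this batch.
                    await Task.Delay(delay);
                    delay *= 2;
                }
            }
        }
    }
}
```

A caller could pass `batch => repository.BulkInsertAsync(batch, isOrdered: false)` as the delegate.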
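📊 Monitoring and Observability
🐢 Slow Query Detection (CommandSucceededEvent)
// Assumes an ABP version whose AbpMongoDbOptions exposes MongoClientSettingsConfigurer
Configure<AbpMongoDbOptions>(options =>
{
    options.MongoClientSettingsConfigurer = settings =>
    {
        settings.ClusterConfigurator = cb =>
        {
            cb.Subscribe<CommandSucceededEvent>(e =>
            {
                if (e.CommandName == "find" && e.Duration > TimeSpan.FromMilliseconds(100))
                {
                    // CommandSucceededEvent carries no command body; log the name, duration, and request id
                    Logger.LogWarning(
                        "🐢 Slow MongoDB query: {CommandName} took {Duration} (request {RequestId})",
                        e.CommandName, e.Duration, e.RequestId);
                }
            });
        };
    };
});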
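The check above only looks at find. Since aggregate, update, and other commands can be slow as well, the threshold test could be pulled into a small helper that accepts a set of command names. This is a sketch with hypothetical names (`SlowCommandDetector`, `IsSlow`); it contains only the decision logic and no driver wiring.

```csharp
using System;
using System.Collections.Generic;

public sealed class SlowCommandDetector
{
    private readonly TimeSpan _threshold;
    private readonly HashSet<string> _commandNames;

    // commandNames: the commands to watch, compared case-insensitively.
    public SlowCommandDetector(TimeSpan threshold, params string[] commandNames)
    {
        _threshold = threshold;
        _commandNames = new HashSet<string>(commandNames, StringComparer.OrdinalIgnoreCase);
    }

    // True when the command is watched and took strictly longer than the threshold.
    public bool IsSlow(string commandName, TimeSpan duration) =>
        _commandNames.Contains(commandName) && duration > _threshold;
}
```

Inside the event handler, the condition would then read `detector.IsSlow(e.CommandName, e.Duration)`.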
🌐 Application Insights
services.AddApplicationInsightsTelemetry();
var telemetryClient = serviceProvider.GetRequiredService<TelemetryClient>();
// Example: report connection pool usage
var poolUsage = /* read the connection pool state */;
telemetryClient.TrackMetric("mongo.connectionPoolUsage", poolUsage);
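The article leaves open how the pool state is read. One possible approach, shown here only as a sketch, is to count connection check-outs and check-ins and divide by the configured maxPoolSize. `PoolUsageTracker` is a hypothetical class; it assumes its two methods are wired to the driver's connection pool check-out and check-in events, and that wiring is not shown.

```csharp
using System;
using System.Threading;

public sealed class PoolUsageTracker
{
    private readonly int _maxPoolSize;
    private int _inUse;

    public PoolUsageTracker(int maxPoolSize)
    {
        if (maxPoolSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxPoolSize));
        }

        _maxPoolSize = maxPoolSize;
    }

    // Call when a connection is checked out of the pool.
    public void OnCheckedOut() => Interlocked.Increment(ref _inUse);

    // Call when a connection is returned to the pool.
    public void OnCheckedIn() => Interlocked.Decrement(ref _inUse);

    // Fraction of the pool currently checked out.
    public double Usage => (double)Volatile.Read(ref _inUse) / _maxPoolSize;
}
```

The value of `Usage` could then be passed to `TrackMetric` on a timer rather than on every event, to keep telemetry volume low.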
🛠️ Controller Full CRUD Example
[ApiController]
[Route("api/orders")]
public class OrdersController : AbpController
{
    private readonly IOrderAppService _service;

    public OrdersController(IOrderAppService service) => _service = service;

    [HttpPost]
    [ProducesResponseType(typeof(OrderDto), 201)]
    public async Task<OrderDto> Create(CreateOrderDto input)
    {
        return await _service.CreateAsync(input);
    }

    [HttpGet("{id}")]
    [ProducesResponseType(typeof(OrderDto), 200)]
    [ProducesResponseType(404)]
    public Task<OrderDto> Get(Guid id) => _service.GetAsync(id);

    [HttpPut("{id}")]
    [ProducesResponseType(typeof(OrderDto), 200)]
    public Task<OrderDto> Update(Guid id, UpdateOrderDto input)
    {
        input.Id = id;
        return _service.UpdateAsync(input);
    }

    [HttpDelete("{id}")]
    [ProducesResponseType(204)]
    public Task Delete(Guid id) => _service.DeleteAsync(id);
}
🧪 Unit Test Example (xUnit + Mongo2Go + DI)
public class OrderRepositoryTests : IClassFixture<ServiceFixture>
{
    private readonly IMongoRepository<Order, Guid> _repository;

    public OrderRepositoryTests(ServiceFixture fixture)
    {
        _repository = fixture.ServiceProvider.GetRequiredService<IMongoRepository<Order, Guid>>();
    }

    [Fact]
    public async Task BulkInsert_Should_Insert_All_Orders()
    {
        // The fixture is shared across tests, so compare against the starting count
        var before = await _repository.GetCountAsync();
        var orders = Enumerable.Range(1, 10)
            .Select(i => new Order { Id = Guid.NewGuid(), CustomerId = $"C{i}" })
            .ToList();

        await _repository.BulkInsertAsync(orders);

        var after = await _repository.GetCountAsync();
        Assert.Equal(before + 10, after);
    }

    [Fact]
    public async Task Update_Should_Modify_Order()
    {
        var order = await _repository.InsertAsync(new Order { Id = Guid.NewGuid(), CustomerId = "C0" });
        order.CustomerId = "C0-Updated";
        await _repository.UpdateAsync(order);

        var fetched = await _repository.GetAsync(order.Id);
        Assert.Equal("C0-Updated", fetched.CustomerId);
    }

    [Fact]
    public async Task Delete_Should_Remove_Order()
    {
        var order = await _repository.InsertAsync(new Order { Id = Guid.NewGuid(), CustomerId = "C1" });
        await _repository.DeleteAsync(order.Id);

        await Assert.ThrowsAsync<EntityNotFoundException>(() => _repository.GetAsync(order.Id));
    }
}

public class ServiceFixture : IDisposable
{
    private readonly MongoDbRunner _runner;

    public ServiceProvider ServiceProvider { get; }

    public ServiceFixture()
    {
        _runner = MongoDbRunner.Start();

        var services = new ServiceCollection();

        // Point the "MongoDb" connection string at the Mongo2Go instance.
        // A full ABP test would bootstrap a test module rather than a bare ServiceCollection.
        services.Configure<AbpDbConnectionOptions>(options =>
        {
            options.ConnectionStrings["MongoDb"] = _runner.ConnectionString + "MyProjectDb";
        });
        services.AddMongoDbContext<MyMongoDbContext>(builder =>
        {
            builder.AddDefaultRepositories(includeAllEntities: false);
        });
        services.AddTransient(typeof(IMongoRepository<,>), typeof(MongoRepository<,>));

        ServiceProvider = services.BuildServiceProvider();
    }

    public void Dispose()
    {
        ServiceProvider.Dispose();
        _runner.Dispose();
    }
}
📚 Appendix
- ABP official documentation: https://docs.abp.io
- MongoDB index guide: https://www.mongodb.com/docs/manual/indexes/
- Mongo2Go: https://github.com/Mongo2Go/Mongo2Go