這篇文章我們一起把記賬模塊從單體應用遷移到微服務架構中。記賬模塊的功能想必大家都已經了解了,主要是記錄用戶的收入和支出,以及對這些記錄的刪除修改和查詢等操作。具體的功能可以參考單體應用專欄,在這里就不多講了。我們現在一起開始遷移記賬模塊的代碼吧。
一、小修改
和前面的功能一樣,我們把記賬模塊的代碼從單體應用中抽離出來,放到微服務項目中。但是需要做一些小修改,這些修改包括前面幾篇文章中提到的將接口修改為標準的restful接口、將Controller繼承自ControllerBase
而不是BaseController
,以及調整路由地址等,在記賬模塊中,還要對新增記賬、修改記賬、刪除記賬這三個功能進行調整。我們先來看一下單體應用中這三個功能在Service層中的代碼:
// more code .../// <summary>
/// 收支記錄實現類
/// </summary>
public class IncomeExpenditureRecordImp : IIncomeExpenditureRecordServer
{// more code .../// <summary>/// 新增收支記錄/// </summary>/// <param name="incomeExpenditureRecord"></param>public void Add(IncomeExpenditureRecord incomeExpenditureRecord){//開啟事務using (var transaction = _sporeAccountingDbContext.Database.BeginTransaction()){try{// 查找記錄范圍內的預算var budget = _sporeAccountingDbContext.Budgets.FirstOrDefault(x => x.UserId == incomeExpenditureRecord.UserId&& x.StartTime <= incomeExpenditureRecord.RecordDate &&x.EndTime >= incomeExpenditureRecord.RecordDate&& x.IncomeExpenditureClassificationId ==incomeExpenditureRecord.IncomeExpenditureClassificationId);if (budget != null){// 查詢分類var classification = _sporeAccountingDbContext.IncomeExpenditureClassifications.FirstOrDefault(x => x.Id == incomeExpenditureRecord.IncomeExpenditureClassificationId);if (classification.Type== IncomeExpenditureTypeEnmu.Income){budget.Remaining -= incomeExpenditureRecord.AfterAmount;// 獲取包含支出記錄記錄日期的報表記錄var reports = _sporeAccountingDbContext.Reports.Where(x => x.UserId == incomeExpenditureRecord.UserId&& x.Year <= incomeExpenditureRecord.RecordDate.Year &&x.Month >= incomeExpenditureRecord.RecordDate.Month &&x.ClassificationId ==incomeExpenditureRecord.IncomeExpenditureClassificationId);// 如果沒有就說明程序還未將其寫入報表,那么就不做任何處理for (int i = 0; i < reports.Count(); i++){var report = reports.ElementAt(i);report.Amount += incomeExpenditureRecord.AfterAmount;_sporeAccountingDbContext.Reports.Update(report);}}_sporeAccountingDbContext.Budgets.Update(budget);}_sporeAccountingDbContext.IncomeExpenditureRecords.Add(incomeExpenditureRecord);_sporeAccountingDbContext.SaveChanges();//提交事務transaction.Commit();}catch (Exception e){//回滾事務transaction.Rollback();throw;}}}/// <summary>/// 刪除收支記錄/// </summary>/// <param name="incomeExpenditureRecordId"></param>/// <returns></returns>public void Delete(string incomeExpenditureRecordId){//開啟事務using (var transaction = _sporeAccountingDbContext.Database.BeginTransaction()){try{var incomeExpenditureRecord = _sporeAccountingDbContext.IncomeExpenditureRecords.FirstOrDefault(x => x.Id == incomeExpenditureRecordId);if (incomeExpenditureRecord != null){// 查找記錄范圍內的預算var budget = _sporeAccountingDbContext.Budgets.FirstOrDefault(x => x.UserId == incomeExpenditureRecord.UserId&& x.StartTime <= incomeExpenditureRecord.RecordDate &&x.EndTime >= incomeExpenditureRecord.RecordDate&& x.IncomeExpenditureClassificationId == incomeExpenditureRecord.IncomeExpenditureClassificationId);if (budget != null){// 查詢分類var classification = _sporeAccountingDbContext.IncomeExpenditureClassifications.FirstOrDefault(x => x.Id == incomeExpenditureRecord.IncomeExpenditureClassificationId);if (classification.Type== IncomeExpenditureTypeEnmu.Income){budget.Remaining += incomeExpenditureRecord.AfterAmount;// 獲取包含支出記錄記錄日期的報表記錄var reports = _sporeAccountingDbContext.Reports.Where(x => x.UserId == incomeExpenditureRecord.UserId&& x.Year <= incomeExpenditureRecord.RecordDate.Year &&x.Month >= incomeExpenditureRecord.RecordDate.Month &&x.ClassificationId ==incomeExpenditureRecord.IncomeExpenditureClassificationId);// 如果沒有就說明程序還未將其寫入報表,那么就不做任何處理for (int i = 0; i < reports.Count(); i++){var report = reports.ElementAt(i);report.Amount -= incomeExpenditureRecord.AfterAmount;_sporeAccountingDbContext.Reports.Update(report);}}_sporeAccountingDbContext.Budgets.Update(budget);}_sporeAccountingDbContext.IncomeExpenditureRecords.Remove(incomeExpenditureRecord);_sporeAccountingDbContext.SaveChanges();//提交事務transaction.Commit();}}catch (Exception e){//回滾事務transaction.Rollback();throw;}}}/// <summary>/// 修改收支記錄/// </summary>/// <param name="incomeExpenditureRecord"></param>/// <returns></returns>public void Update(IncomeExpenditureRecord incomeExpenditureRecord){using (var transaction = _sporeAccountingDbContext.Database.BeginTransaction()){try{// 查詢原記錄var oldIncomeExpenditureRecord = _sporeAccountingDbContext.IncomeExpenditureRecords.FirstOrDefault(x => x.Id == incomeExpenditureRecord.Id);// 查找記錄范圍內的預算var budget = _sporeAccountingDbContext.Budgets.FirstOrDefault(x => x.UserId == incomeExpenditureRecord.UserId&& x.StartTime <= incomeExpenditureRecord.RecordDate &&x.EndTime >= incomeExpenditureRecord.RecordDate&& x.IncomeExpenditureClassificationId ==incomeExpenditureRecord.IncomeExpenditureClassificationId);if (budget != null){// 查詢分類var classification = _sporeAccountingDbContext.IncomeExpenditureClassifications.FirstOrDefault(x => x.Id == incomeExpenditureRecord.IncomeExpenditureClassificationId);if (classification.Type== IncomeExpenditureTypeEnmu.Income){//如果是支出,需要減去原來的金額budget.Remaining = (budget.Amount - incomeExpenditureRecord.AfterAmount);// 根據舊的支出記錄判斷是否修改了記錄日期// 如果是修改了記錄日期,那么就將原記錄日期所在的報表對應的分類金額減去,將新記錄日期所在報表對應的分類金額加上if (oldIncomeExpenditureRecord.RecordDate != incomeExpenditureRecord.RecordDate){// 獲取包含支出記錄記錄日期的報表記錄var oldReports = _sporeAccountingDbContext.Reports.Where(x => x.UserId == incomeExpenditureRecord.UserId&& x.Year <= oldIncomeExpenditureRecord.RecordDate.Year &&x.Month >= oldIncomeExpenditureRecord.RecordDate.Month &&x.ClassificationId == oldIncomeExpenditureRecord.IncomeExpenditureClassificationId);// 如果沒有就說明程序還未將其寫入報表,那么就不做任何處理for (int i = 0; i < oldReports.Count(); i++){var oldReport = oldReports.ElementAt(i);oldReport.Amount -= oldIncomeExpenditureRecord.AfterAmount;_sporeAccountingDbContext.Reports.Update(oldReport);}// 獲取包含支出記錄記錄日期的報表記錄var newReport = _sporeAccountingDbContext.Reports.Where(x => x.UserId == incomeExpenditureRecord.UserId&& x.Year <= incomeExpenditureRecord.RecordDate.Year &&x.Month >= incomeExpenditureRecord.RecordDate.Month &&x.ClassificationId ==incomeExpenditureRecord.IncomeExpenditureClassificationId);// 如果沒有就說明程序還未將其寫入報表,那么就不做任何處理for (int i = 0; i < newReport.Count(); i++){var report = newReport.ElementAt(i);report.Amount += incomeExpenditureRecord.AfterAmount;_sporeAccountingDbContext.Reports.Update(report);}}}else{//如果是收入,需要加上原來的金額budget.Remaining = (budget.Amount + incomeExpenditureRecord.AfterAmount);}_sporeAccountingDbContext.Budgets.Update(budget);}oldIncomeExpenditureRecord.AfterAmount = incomeExpenditureRecord.AfterAmount;oldIncomeExpenditureRecord.BeforAmount = incomeExpenditureRecord.BeforAmount;oldIncomeExpenditureRecord.RecordDate = incomeExpenditureRecord.RecordDate;oldIncomeExpenditureRecord.Remark = incomeExpenditureRecord.Remark;oldIncomeExpenditureRecord.CurrencyId = incomeExpenditureRecord.CurrencyId;oldIncomeExpenditureRecord.IncomeExpenditureClassificationId =incomeExpenditureRecord.IncomeExpenditureClassificationId;_sporeAccountingDbContext.IncomeExpenditureRecords.Update(oldIncomeExpenditureRecord);_sporeAccountingDbContext.SaveChanges();//提交事務transaction.Commit();}catch (Exception e){//回滾事務transaction.Rollback();throw;}}}// more code ...
}
在上面的代碼中,我們看到這三個功能,都有兩個共同的操作:一個是對預算的操作,另一個是對報表的操作。在這個單體應用代碼中,我們都是直接調用預算和報表的DbSet
進行操作,并且啟用了事務來保證數據的一致性。這里看似合理,但是實際上并不符合微服務的設計原則,因為微服務應該是獨立的,不能直接操作其他服務的數據,同時我們也要保證方法的單一原則,因此在這三個方法里操作其他數據是不合理的。并且,如果我們存儲記賬記錄時,預算數據或者報表數據沒有存儲成功,就會觸發回滾操作,這樣就會導致記賬記錄沒有存儲成功,對于這種設計來不符合用戶體驗,因此我們需要對這三個方法進行修改。
我們要做的是將預算增刪操作抽離出來,讓新增記賬、修改記賬、刪除記賬這三個功能在每次存儲數據成功后,發送MQ消息,然后預算訂閱MQ消息,進行相應的增刪改操作。這樣就可以保證記賬模塊的獨立性,同時也能保證數據的一致性,即使是預算服務出現問題,也不會影響到記賬模塊的正常運行。修改后的Service層代碼如下:
// more code .../// <summary>
/// 記賬服務實現類
/// </summary>
public class AccountingServerImpl : IAccountingServer
{/// <summary>/// RabbitMQ消息處理/// </summary>private readonly RabbitMqMessage _rabbitMqMessage;/// <summary>/// 新增記賬/// </summary>/// <param name="accountBookId">賬本ID</param>/// <param name="request">記賬添加請求</param>/// <returns></returns>public long Add(long accountBookId, AccountingAddRequest request){// more code ...//通過MQ發送記賬數據到消息隊列,從預算中扣除金額MqPublisher mqPublisher = new MqPublisher(accounting.AfterAmount.ToString("F2"),MqExchange.BudgetExchange,MqRoutingKey.BudgetRoutingKey,MqQueue.BudgetQueue, MessageType.BudgetDeduct, ExchangeType.Direct);_rabbitMqMessage.SendAsync(mqPublisher).Start();// 返回新增的記賬IDreturn accounting.Id;}/// <summary>/// 刪除記賬/// </summary>/// <param name="accountBookId">賬本ID</param>/// <param name="id">記賬ID</param>public void Delete(long accountBookId, long id){// more code ...//通過MQ發送刪除記賬數據到消息隊列,把預算中的金額恢復MqPublisher mqPublisher = new MqPublisher(accounting.AfterAmount.ToString("F2"),MqExchange.BudgetExchange,MqRoutingKey.BudgetRoutingKey,MqQueue.BudgetQueue, MessageType.BudgetAdd, ExchangeType.Direct);_rabbitMqMessage.SendAsync(mqPublisher).Start();}/// <summary>/// 修改記賬/// </summary>/// <param name="accountBookId">賬本ID</param>/// <param name="request">修改請求</param>public void Edit(long accountBookId, AccountingEditRequest request){// more code ...// 通過MQ發送修改記賬數據到消息隊列,更新預算中的金額MqPublisher mqPublisher = new MqPublisher(amountDifference.ToString("F2"),MqExchange.BudgetExchange,MqRoutingKey.BudgetRoutingKey,MqQueue.BudgetQueue, MessageType.BudgetUpdate, ExchangeType.Direct);_rabbitMqMessage.SendAsync(mqPublisher).Start();}// more code ...
}
在上面的代碼中,我們對新增記賬、刪除記賬、修改記賬這三個方法進行了修改。新增記賬時,我們將記賬數據發送到MQ消息隊列中,預算服務會訂閱這個消息,并進行相應的扣款操作。刪除記賬時,我們同樣發送消息到MQ消息隊列中,預算服務會將金額恢復到預算中。修改記賬時,我們計算出原金額與新金額的差額,并發送到MQ消息隊列中,預算服務會根據差額進行相應的更新操作。下面是新增的預算訂閱MQ消息處理類的代碼:
using SP.Common.Message.Model;
using SP.Common.Message.Mq;
using SP.Common.Message.Mq.Model;
using SP.FinanceService.Models.Entity;
using SP.FinanceService.Models.Enumeration;
using SP.FinanceService.Service;namespace SP.FinanceService.Mq;/// <summary>
/// Budget 消息消費者服務
/// </summary>
public class BudgetConsumerService : BackgroundService
{/// <summary>/// RabbitMq 消息/// </summary>private readonly RabbitMqMessage _rabbitMqMessage;/// <summary>/// 日志記錄器/// </summary>private readonly ILogger<BudgetConsumerService> _logger;/// <summary>/// 預算服務/// </summary>private readonly IBudgetServer _budgetService;/// <summary>/// Budget 消息消費者服務/// </summary>/// <param name="rabbitMqMessage"></param>/// <param name="logger"></param>/// <param name="budgetService"></param>public BudgetConsumerService(RabbitMqMessage rabbitMqMessage, ILogger<BudgetConsumerService> logger,IBudgetServer budgetService){_logger = logger;_rabbitMqMessage = rabbitMqMessage;_budgetService = budgetService;}/// <summary>/// RabbitMq 消息/// </summary>/// <param name="stoppingToken"></param>/// <returns></returns>protected override async Task ExecuteAsync(CancellationToken stoppingToken){MqSubscriber subscriber = new MqSubscriber(MqExchange.BudgetExchange,MqRoutingKey.BudgetRoutingKey, MqQueue.BudgetQueue);await _rabbitMqMessage.ReceiveAsync(subscriber, async message =>{// 驗證消息MqMessage mqMessage = ValidateMessage(message);if (mqMessage == null) return;// 獲取當前預算List<Budget> budgets = GetCurrentBudgets();if (budgets == null || budgets.Count == 0) return;decimal amount = decimal.Parse(message.Body);// 根據消息類型處理預算switch (mqMessage.Type){case MessageType.BudgetAdd:_logger.LogInformation("接收到預算增加處理消息, {Message}", mqMessage);UpdateBudgetsByAmount(budgets, amount, true);break;case MessageType.BudgetUpdate:_logger.LogInformation("接收到預算更新消息, {Message}", mqMessage);UpdateBudgetsByAmount(budgets, amount, true);break;case MessageType.BudgetDeduct:_logger.LogInformation("接收到預算扣除消息, {Message}", mqMessage);UpdateBudgetsByAmount(budgets, amount, false);break;default:_logger.LogWarning("未知的消息類型: {Type}", mqMessage.Type);break;}await Task.CompletedTask;});}/// <summary>/// 驗證消息/// </summary>/// <param name="message">原始消息</param>/// <returns>驗證后的 MqMessage,如果驗證失敗返回 null</returns>private MqMessage ValidateMessage(object message){MqMessage mqMessage = message as MqMessage;if (mqMessage == null){_logger.LogError("消息轉換失敗");return null;}return mqMessage;}/// <summary>/// 獲取當前預算/// </summary>/// <returns>當前預算列表,如果沒有可用預算返回空列表</returns>private List<Budget> GetCurrentBudgets(){List<Budget> budgets = _budgetService.QueryCurrentBudgets();if (budgets == null || budgets.Count == 0){_logger.LogInformation("當前沒有可用的預算,不執行處理");return new List<Budget>();}return budgets;}/// <summary>/// 根據金額更新預算/// </summary>/// <param name="budgets">預算列表</param>/// <param name="amount">金額</param>/// <param name="isAdd">是否為增加操作,true為增加,false為扣除</param>private void UpdateBudgetsByAmount(List<Budget> budgets, decimal amount, bool isAdd){string operation = isAdd ? "增加" : "扣除";decimal operationAmount = isAdd ? amount : -amount;// 更新月度預算UpdateBudgetByPeriod(budgets, PeriodEnum.Month, operationAmount, operation);// 更新年度預算UpdateBudgetByPeriod(budgets, PeriodEnum.Year, operationAmount, operation);// 更新季度預算UpdateBudgetByPeriod(budgets, PeriodEnum.Quarter, operationAmount, operation);// 保存更改到數據庫_budgetService.UpdateBudgets(budgets);}/// <summary>/// 根據周期更新預算/// </summary>/// <param name="budgets">預算列表</param>/// <param name="period">預算周期</param>/// <param name="amount">金額變化</param>/// <param name="operation">操作類型描述</param>private void UpdateBudgetByPeriod(List<Budget> budgets, PeriodEnum period, decimal amount, string operation){Budget budget = budgets.FirstOrDefault(b => b.Period == period);if (budget != null){budget.Amount += amount;string periodName = GetPeriodName(period);_logger.LogInformation("{PeriodName}預算{Operation}成功,{Operation}金額: {Amount}", periodName, operation, operation, budget.Amount);}}/// <summary>/// 獲取周期名稱/// </summary>/// <param name="period">預算周期</param>/// <returns>周期名稱</returns>private string GetPeriodName(PeriodEnum period){return period switch{PeriodEnum.Month => "月度",PeriodEnum.Year => "年度",PeriodEnum.Quarter => "季度",_ => "未知"};}
}
在上面的代碼中,我們創建了一個BudgetConsumerService
類,它繼承自BackgroundService
,用于處理預算相關的MQ消息。我們在ExecuteAsync
方法中訂閱了MQ消息,并根據消息類型進行相應的處理。我們還添加了一些日志記錄,以便于調試和監控。
對于調用報表的部分,我們暫時先將這部分代碼刪除掉,因為這個設計其實是不符合業務邏輯的,我們的報表都是定時生成的,而不是實時生成的,因此對報表的修改其實是沒必要的,因此我們在這里不對報表進行任何操作。等到后面我們實現了報表模塊后,再來處理報表相關的邏輯。
二、總結
在這篇文章中,我們將記賬模塊從單體應用遷移到微服務架構中,并對新增記賬、刪除記賬、修改記賬這三個功能進行了調整。我們將預算的增刪改操作抽離出來,讓記賬模塊通過MQ消息與預算服務進行交互,從而實現了模塊的獨立性和數據的一致性。我們還創建了一個BudgetConsumerService
類,用于處理預算相關的MQ消息。
通過這些修改,我們使得記賬模塊能夠更好地適應微服務架構的設計原則,同時也提高了系統的可維護性和可擴展性。接下來,我們將繼續實現記賬模塊的其他功能,并將其與其他模塊進行集成。