1. @GlobalTransactional
和 @Transactional
是否沖突?
答:不沖突,它們可以協同工作,但作用域不同。
@Transactional
: 這是 Spring 提供的注解,用于管理單個數據源內的本地事務。在你當前的register
方法中,它確保了userRepository.save(user)
操作要么成功提交到auth_service_new
的數據庫 (mall_auth_new
),要么在發生異常時回滾(比如數據庫連接失敗、約束沖突等)。它只關心本服務內的數據庫操作原子性。@GlobalTransactional
: 這是 Seata 提供的注解,用于開啟一個分布式全局事務。它的目的是協調跨多個服務、多個數據源的操作,保證這些操作要么全部成功,要么全部回滾。Seata 的 AT 模式(你目前可能使用的模式,因其最簡單)通過代理數據源 (DataSourceProxy
) 自動記錄 SQL 執行前后的鏡像,并在需要時生成反向 SQL 來實現回滾。
如何協同? Seata AT 模式下的分支事務實際上是基于本地事務的。當 @GlobalTransactional
存在時,Seata 會攔截 @Transactional
管理的本地事務的提交/回滾。
- 本地事務提交時:Seata RM (Resource Manager) 會先向 TC (Transaction Coordinator) 注冊分支,報告本地事務執行成功(Phase 1),但不會立即真正提交物理數據庫連接。它會等待 TC 的統一指令。
- 全局事務提交時:TC 通知所有 RM 提交分支事務(Phase 2),這時 RM 才提交本地事務對應的物理數據庫連接。
- 全局事務回滾時:TC 通知所有 RM 回滾分支事務(Phase 2),RM 會根據之前記錄的 Undo Log 生成反向 SQL 來回滾本地數據庫的更改。
結論:同時使用兩者是常見且必要的。@Transactional
保證本地操作的原子性,而 @GlobalTransactional
則將這種原子性擴展到分布式環境下的多個參與者。
2. Seata (AT 模式) 和 RabbitMQ 是否沖突?
答:沖突!在期望跨服務數據庫原子性的場景下,同步調用 Seata AT 模式和異步發送 RabbitMQ 消息是矛盾的。
-
Seata AT 模式的局限性: Seata AT 模式主要設計用于同步調用場景下的數據庫操作。它無法管理消息隊列(如 RabbitMQ)的操作。也就是說,Seata 不能:
- 保證消息發送成功后,如果后續全局事務需要回滾,能把消息“撤回”。
- 保證消息被消費者成功處理后,如果全局事務需要回滾,能讓消費者的操作也回滾。
-
你當前代碼的問題:
AuthServiceImpl.register
方法在@GlobalTransactional
內執行userRepository.save(user)
。這個操作被 Seata 納入了全局事務分支。- 緊接著,它調用
messageService.sendUserCreatedEvent(savedUser)
發送 RabbitMQ 消息。這個發送操作本身不受 Seata 全局事務的管理。 user_moudle
中的UserEventListener
會異步地消費這個消息,并執行userService.createUserFromEvent
來寫入user_moudle
的數據庫 (mall_users
)。這個數據庫寫入操作也不在AuthServiceImpl.register
發起的那個 Seata 全局事務的范圍內。
-
后果:
- 如果在發送消息之后,
AuthServiceImpl.register
方法內部(或其調用的其他同步下游服務)發生了需要全局回滾的異常,auth_service_new
數據庫的User
記錄會被 Seata 回滾,但 RabbitMQ 消息已經發出去了,user_moudle
仍然會收到消息并嘗試創建用戶,導致數據不一致(user_moudle
有用戶,auth_service_new
沒有)。 - 如果在
user_moudle
消費消息并寫入數據庫時失敗,auth_service_new
的事務早已提交(因為消息是異步的),也無法回滾,同樣導致數據不一致。
- 如果在發送消息之后,
結論:如果你希望 auth_service_new
寫入 auth_user
表 和 user_moudle
寫入 ums_user
表這兩個數據庫操作具有原子性(要么都成功,要么都失敗),那么在 @GlobalTransactional
方法內部使用 RabbitMQ 進行跨服務通信是錯誤的設計。
3. 應該怎么做?
為了實現 auth_service_new
和 user_moudle
在用戶注冊時的數據庫寫入原子性,最佳實踐是使用同步調用,讓 user_moudle
的數據庫操作也成為 Seata 全局事務的一個分支。
修改步驟:
-
在
auth_service_new
中定義 Feign 客戶端調用user_moudle
:- 創建一個接口,例如
UserModuleClient.java
:
package com.mall.auth.client;import com.mall.auth.dto.UserSyncDTO; // 需要創建一個簡單的DTO傳遞必要信息 import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody;// name 指向 user_moudle 的服務名 (spring.application.name) @FeignClient(name = "userservice", path = "/api/internal/users") // 使用內部API路徑 public interface UserModuleClient {@PostMapping("/sync-create")void syncCreateUser(@RequestBody UserSyncDTO userSyncDTO); }
- 創建
UserSyncDTO.java
(可以簡化RegisterRequest
或UserCreatedEvent
的字段):
package com.mall.auth.dto;import lombok.Builder; import lombok.Data;@Data @Builder public class UserSyncDTO {private Long authUserId;private String username;private String email;private String phone;// 注意:不需要傳遞密碼,user_moudle 只存占位符 }
- 創建一個接口,例如
-
修改
AuthServiceImpl.register
方法:- 移除
messageService.sendUserCreatedEvent
調用。 - 注入并使用
UserModuleClient
進行同步調用。
// ... 其他注入 ... import com.mall.auth.client.UserModuleClient; import com.mall.auth.dto.UserSyncDTO; // ...@Service public class AuthServiceImpl implements AuthService {// ... 其他字段和構造函數 ...private final UserModuleClient userModuleClient;public AuthServiceImpl(// ... 其他參數 ...UserModuleClient userModuleClient, // 添加注入MessageService messageService) { // MessageService 仍然可以注入,但注冊時不在此調用// ... 其他賦值 ...this.userModuleClient = userModuleClient;this.messageService = messageService; // 保留注入}@GlobalTransactional(name = "user-register-tx", rollbackFor = Exception.class)@Override@Transactional // 本地事務仍然需要public User register(RegisterRequest registerRequest) {log.info("開始用戶注冊流程 (同步事務): {}", registerRequest.getUsername());// ... (省略之前的檢查邏輯) ...// 創建新用戶User user = User.builder()// ... (省略屬性設置) ....build();// 1. 保存到 auth_service_new 數據庫 (參與 Seata 分支事務)User savedUser = userRepository.save(user);log.info("AuthService: 用戶基礎信息保存成功: {}", savedUser.getUsername());// 2. 同步調用 user_moudle 保存用戶信息 (參與 Seata 分支事務)try {UserSyncDTO syncDTO = UserSyncDTO.builder().authUserId(savedUser.getId()).username(savedUser.getUsername()).email(savedUser.getEmail()).phone(savedUser.getPhone()).build();log.info("AuthService: 準備同步調用 UserModule 創建用戶...");userModuleClient.syncCreateUser(syncDTO); // 通過 Feign 調用log.info("AuthService: UserModule 同步調用成功");} catch (Exception e) {log.error("AuthService: 同步調用 UserModule 失敗: {}", e.getMessage(), e);// 拋出異常,觸發 @GlobalTransactional 回滾// 注意:需要確保 Feign 客戶端在調用失敗時能正確拋出異常被 Seata 捕獲// 可能需要配置 Feign 的 ErrorDecoderthrow new RuntimeException("同步用戶模塊失敗,觸發全局回滾", e);}// 發送消息的操作可以移到事務成功提交之后 (如果還需要的話)// 例如使用 Spring 的 @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)// messageService.sendUserCreatedEvent(savedUser); // 從事務中移除return savedUser;}// ... 其他方法 ... }
- 移除
-
在
user_moudle
中添加對應的 Controller Endpoint:- 創建一個新的 Controller 或在
UserController
中添加一個內部接口(路徑建議與 Feign Client 對應,如/api/internal/users
)。
package com.user.controler;import com.user.dto.UserSyncDTO; // 引入對應的 DTO import com.user.service.UserService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;@Slf4j @RestController @RequestMapping("/api/internal/users") // 內部調用路徑 @RequiredArgsConstructor public class UserInternalController {private final UserService userService;@PostMapping("/sync-create")public ResponseEntity<Void> syncCreateUser(@RequestBody UserSyncDTO userSyncDTO) {log.info("UserModule: 收到同步創建用戶請求: authUserId={}", userSyncDTO.getAuthUserId());try {// 這里需要一個類似 createUserFromEvent 的方法,但參數是 UserSyncDTO// 或者直接調用現有的 createUserFromEvent,但需要適配 DTOuserService.createUserFromSync(userSyncDTO); // 假設有這個方法log.info("UserModule: 同步創建用戶成功: authUserId={}", userSyncDTO.getAuthUserId());return ResponseEntity.ok().build();} catch (Exception e) {log.error("UserModule: 同步創建用戶失敗: {}", e.getMessage(), e);// 拋出異常,讓 Seata 感知到錯誤,觸發全局回滾// Spring MVC 默認會將未捕獲的異常轉換為 500 錯誤,// Feign 客戶端默認會將 4xx/5xx 視為異常throw new RuntimeException("創建用戶記錄失敗", e);}} }
- 確保
user_moudle
的UserService
有一個處理UserSyncDTO
的方法,并且這個方法的數據庫操作會因為配置了DataSourceProxy
而自動加入到 Seata 的全局事務中。
- 創建一個新的 Controller 或在
總結:
@GlobalTransactional
和@Transactional
不沖突,前者依賴后者。- 在期望數據庫原子性的場景下,Seata AT 與異步消息(RabbitMQ)沖突。
- 最佳實踐:將跨服務需要原子性的數據庫操作改為同步 RPC 調用 (如 Feign),并在調用發起方的方法上使用
@GlobalTransactional
。移除事務內的異步消息發送。
這樣修改后,用戶注冊時對兩個數據庫的寫入就能真正實現分布式事務的原子性了。