利用事務鉤子函數解決業務異步發送問題
- 一、問題背景
- 二、實現方案
- 1、生產者代碼
- 2、消費者代碼
- 三、測試與驗證
- 1、未開啟事務場景
- 2、開啟事務場景
- 四、項目結構及源碼
一、問題背景
在某項業務中,需要在事務完成后,寫入日志到某數據庫中。需要要么都成功,要么都失敗,而且需要異步實現。在不考慮分布式事務框架下,如何實現這個業務功能呢?
二、實現方案
前提需要啟動kafka_2.12-3.9.1內置的zookeeper和kafka。
在kafka創建好topic
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testTopic
可以利用事務鉤子函數實現異步發送,保證同時成功和失敗。注冊事務鉤子,在事務提交或回滾后執行。
if (!TransactionSynchronizationManager.isSynchronizationActive()) {// 調用異步方法發送日志System.out.println("事務未開啟");// 異步發送日志(解決由于同一個類內部方法調用不會創建代理,所以aop不生效,則@Async注解無作用問題)kafkaSender.send();} else {// 注冊事務鉤子,在事務提交或回滾后執行TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {@Overridepublic void afterCompletion(int status) {System.out.println("事務開啟并執行完畢");// 異步發送日志(解決由于同一個類內部方法調用不會創建代理,所以aop不生效,則@Async注解無作用問題)kafkaSender.send();}});}
1、生產者代碼
KafkaController
@RestController
public class KafkaController {@Autowiredprivate TestService testService;@GetMapping("/send/{type}")public String sendMessageToKafka(@PathVariable int type) {if(type == 1){// 模擬執行事務未開啟的業務邏輯testService.executeServiceNoTranscational(type);}else{// 模擬執行事務開啟的業務邏輯testService.executeService(type);}//模擬還要執行其他的serviceSystem.out.println("執行其他業務邏輯");return "ok";}
}
KafkaSender
@Component
@Slf4j
public class KafkaSender {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;@Asyncpublic void send() {System.out.println("異步發送消息");Map<String, String> messageMap = new HashMap<>();messageMap.put("log", "日志:執行完成");ObjectMapper objectMapper = new ObjectMapper();String data;try {data = objectMapper.writeValueAsString(messageMap);} catch (JsonProcessingException e) {throw new RuntimeException(e);}String key = String.valueOf(UUID.randomUUID());//kakfa的推送消息方法有多種,可以采取帶有任務key的,也可以采取不帶有的(不帶時默認為null)this.send("testTopic", key, data);}public void send(String topic, String key, String data) {//發送消息CompletableFuture<SendResult<String, Object>> completable = kafkaTemplate.send(topic, key, data);completable.whenCompleteAsync((result, ex) -> {if (null == ex) {log.info(topic + "生產者發送消息成功:" + result.toString());} else {log.info(topic + "生產者發送消息失敗:" + ex.getMessage());}});}
}
LogService注冊鉤子函數,異步發送
@Service
public class LogService {@AutowiredKafkaSender kafkaSender;public void sendLogAsync() {if (!TransactionSynchronizationManager.isSynchronizationActive()) {// 調用異步方法發送日志System.out.println("事務未開啟");// 異步發送日志(解決由于同一個類內部方法調用不會創建代理,所以aop不生效,則@Async注解無作用問題)kafkaSender.send();} else {// 注冊事務鉤子,在事務提交或回滾后執行TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {@Overridepublic void afterCompletion(int status) {System.out.println("事務開啟并執行完畢");// 異步發送日志(解決由于同一個類內部方法調用不會創建代理,所以aop不生效,則@Async注解無作用問題)kafkaSender.send();}});}}
}
TestService
@Service
public class TestService {@AutowiredLogService logService;@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class)public void executeService(int type){System.out.println("執行業務邏輯");/*System.out.println("業務執行完成");if (!TransactionSynchronizationManager.isSynchronizationActive()) {// 調用異步方法發送日志System.out.println("事務未開啟");}else{System.out.println("事務已開啟");}*/logService.sendLogAsync();}public void executeServiceNoTranscational(int type){System.out.println("業務執行完成");if (!TransactionSynchronizationManager.isSynchronizationActive()) {// 調用異步方法發送日志System.out.println("事務未開啟");}else{System.out.println("事務已開啟");}logService.sendLogAsync();}
}
2、消費者代碼
KafkaConfig
@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic KafkaReceiver listener() {return new KafkaReceiver();}
}
KafkaReceiver
@Component
@Slf4j
public class KafkaReceiver {/*** 下面的主題是一個數組,可以同時訂閱多主題,只需按數組格式即可,也就是用","隔開*/@KafkaListener(topics = {"testTopic"})public void receive(ConsumerRecord<?, ?> record){log.info("消費者收到的消息key: " + record.key());log.info("消費者收到的消息value: " + record.value().toString());}
}
三、測試與驗證
1、未開啟事務場景
生產者執行結果
消費者執行結果
2、開啟事務場景
生產者執行結果
消費者執行結果
四、項目結構及源碼
源碼下載,歡迎star!