一丶IDEA創建一個空項目
????????
二丶添加相關依賴
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.9.13</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.28</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.12</version></dependency></dependencies>
三丶編寫簡單生產者
/*** 簡單的生產者消費者* @param message*/@GetMapping("/kafka/normal/message")public void sendNormalMessage(@RequestParam("message") String message) {log.info("======================="+message);kafkaTemplate.send("sb_topic", 0, System.currentTimeMillis(), "key1", message);}
四丶編寫簡單消費者
@Component
public class KafkaConsumer {//監聽消費//@KafkaListener(topics = {"sb_topic"})@KafkaListener(topics = {"sb_topic","callbackOne_topic"}, groupId = "testGroup")public void onNormalMessage(ConsumerRecords<String,Object> records) {for (ConsumerRecord<String, Object> record : records) {System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());}}}
這里有個坑,ConsumerRecord如果不加s會報錯,我之間在借鑒他人代碼的時候出現的,不知道是不是版本問題。我也剛用kafka,正在研究哈哈,見諒見諒;
postman請求:
成功:
結尾:目前只是一個簡單的demo,后續我在完善,我也正在學習這玩意兒,哈哈,喜歡的朋友點個贊收藏吧;