1. 安裝Kafka
-
下載Kafka:從Kafka官網下載最新版本的Kafka。
-
解壓并啟動:
-
解壓Kafka文件后,進入
bin
目錄。 -
啟動ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties
。 -
啟動Kafka:
./kafka-server-start.sh ../config/server.properties
。 -
確認啟動成功后,Kafka服務即可使用。
-
2. 創建Spring Boot項目
-
在Spring Initializr創建一個新項目,選擇需要的依賴(如Spring Web和Spring Kafka)。
-
下載并解壓項目,導入到IDE中。
3. 添加Kafka依賴
在pom.xml
中添加以下依賴:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
這個依賴會自動配置Spring Kafka的相關組件。
4. 配置Kafka
在application.yml
中添加Kafka的配置:
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
這里配置了Kafka服務器地址、消費者組、序列化器等。
5. 創建Kafka生產者
-
創建生產者配置類:
@Configuration
public class KafkaProducerConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
-
創建生產者服務類:
@Service
public class KafkaProducerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message).addCallback(success -> System.out.println("Message sent successfully: " + message),failure -> System.err.println("Failed to send message: " + failure.getMessage()));}
}
通過KafkaTemplate
發送消息。
6. 創建Kafka消費者
-
創建消費者服務類:
@Service
public class KafkaConsumerService {@KafkaListener(topics = "my-topic", groupId = "my-group")public void consume(String message) {System.out.println("Received message: " + message);}
}
使用@KafkaListener
注解監聽指定主題并接收消息。
7. 測試應用
-
創建一個控制器,用于發送消息:
@RestController
public class KafkaController {private final KafkaProducerService kafkaProducerService;public KafkaController(KafkaProducerService kafkaProducerService) {this.kafkaProducerService = kafkaProducerService;}@GetMapping("/send")public String sendMessage(@RequestParam String message) {kafkaProducerService.sendMessage("my-topic", message);return "Message sent";}
}
-
啟動Spring Boot應用,通過訪問
http://localhost:8080/send?message=HelloKafka
發送消息。
通過以上步驟,你可以在Spring Boot中成功集成并使用Kafka。
?
?
?
?