? ? ? ?
目錄
一、引入Kafka的依賴
二、配置Kafka
三、創建主題
1、自動創建(不推薦)
2、手動動創建
四、生產者代碼
五、消費者代碼 ?
六、常用的KafKa的命令
? ? ? ? Kafka是一個高性能、分布式的消息發布-訂閱系統,被廣泛應用于大數據處理、實時日志分析等場景。Spring Boot作為目前最流行的Java開發框架之一,其簡潔的配置和豐富的工具使得與Kafka的集成變得更加容易。本文將介紹如何使用Spring Boot整合Kafka,實現高效的數據處理和消息傳遞。
一、引入Kafka的依賴
? ? ? ?<dependency>
? ? ? ? ? ? <groupId>org.springframework.cloud</groupId>
? ? ? ? ? ? <artifactId>spring-cloud-starter-stream-kafka</artifactId>
? ? ? ? </dependency>
二、配置Kafka
spring:kafka:bootstrap-servers: 156.65.20.76:9092,156.65.20.77:9092,156.65.20.78:9092 #指定Kafka集群的地址,這里有三個地址,用逗號分隔。listener:ack-mode: manual_immediate #設置消費者的確認模式為manual_immediate,表示消費者在接收到消息后立即手動確認。concurrency: 3 #設置消費者的并發數為3missing-topics-fatal: false #設置為false,表示如果消費者訂閱的主題不存在,不會拋出異常。producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer # 設置消息鍵的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer #設置消息值的序列化器acks: 1 #一般就是選擇1,兼顧可靠性和吞吐量 ,如果想要更高的吞吐量設置為0,如果要求更高的可靠性就設置為-1consumer:auto-offset-reset: earliest #設置為"earliest"表示將從最早的可用消息開始消費,即從分區的起始位置開始讀取消息。enable-auto-commit: false #禁用了自動提交偏移量的功能,為了避免出現重復數據和數據丟失,一般都是手動提交key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 設置消息鍵的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #設置消息值的反序列化器
注:kafka的acks有三個值,可以根據實際情況和需求平衡消息系統的吞吐量和數據安全性,來選擇對應的值。
acks=0
:這是最不可靠的模式。當設置為acks=0
時,生產者在發送消息后不會等待任何服務器端的確認響應。這種模式下,生產者可以迅速繼續發送下一批消息,效率最高,但風險也最大。如果在此模式下發生網絡問題或broker故障,發送的消息可能會永久丟失,生產者無法得知消息是否成功到達Kafka broker。因此,這種配置適合于能夠容忍少量數據丟失的場景,例如實時數據分析或生成非關鍵的實時報表。acks=1
:這是默認的配置模式,也是一種折衷方案。在這種模式下,生產者會等待分區的領導者節點(leader)確認消息已經成功寫入磁盤,才會發送確認信息給生產者。這提高了數據的安全性,因為只要領導者節點保存了消息,即使跟隨者(replicas)沒有及時同步,消息也不會丟失。然而,如果領導者在同步給所有追隨者之前崩潰,那么尚未同步的副本將無法獲取該消息,仍然存在消息丟失的風險。acks=all或-1
:這是最可靠的模式。在這個模式下,生產者不僅需要領導者節點確認,還會等待所有同步副本(In-sync replicas, ISR)都確認寫入消息后才會收到確認。這極大地增強了數據的持久性保證,確保了即使在多個節點故障的情況下,消息也不會丟失。此模式適用于數據可靠性要求非常高的場景,如金融交易系統或重要的日志記錄
三、創建主題
? ? 1、自動創建(不推薦)
? ? ? ? ?不存在的主題,會自動創建,分區數和副本數均為默認值。而默認值可能會不符合某些場景的要求。
在kafka的安裝目錄conf目錄下找到該配置文件server.properties,添加如下配置:
num.partitions=3 #默認3個分區
auto.create.topics.enable=true #開啟自動創建主題
default.replication.factor=3 #默認3個副本
? ? 2、手動動創建
? ? ? ? ?在kafka的安裝目錄bin目錄下,執行如下命令:?
//創建一個有三個分區和三個副本,名為zhuoye的主題
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 ?--topic zhuoye?
四、生產者代碼
@Slf4j
@Component
public class ALiYunServiceImpl implents IALiYunService {@Autowiredprivate KafkaTemplate kafkaTemplate;@Autowiredprivate ExecutorService executorService;String topicName = "zhuoye";@Overridepublic void queryECSMetricInfo() {//發送到kafka的消息集合,因為使用了多線程,并且在多線程中往該集合進行添加操作,所以需要線程安全的List<Message> messages = Collections.synchronizedList(new ArrayList<>());boolean flag = true;//獲取上次查詢時間Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;Long endTime = System.currentTimeMillis();try {//查詢出所有的運行中的實例List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");if (CollectionUtils.isEmpty(cloudInstances)) {return;}//定義計數器CountDownLatch latch = new CountDownLatch(cloudInstances.size());//遍歷查詢for (CloudInstanceAssetDto instance : cloudInstances) {executorService.submit(() -> {try {//獲取內網流出帶寬,并將結果封裝到消息集合中dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,startTime, endTime, instance, messages);} catch (Exception e) {log.error("獲取ECS的指標數據-多線程處理任務異常!", e);} finally {latch.countDown();}});}//等待任務執行完畢latch.await();//將最終的消息集合發送到kafkaif (CollectionUtils.isNotEmpty(messages)) {for (int i = 0; i < messages.size(); i++) {if (StringUtils.isNotBlank(messages.get(i).getValue())&& "noSuchInstance".equals(messages.get(i).getValue())) {continue;}kafkaTemplate.send(topicName, messages.get(i));}}} catch (Exception e) {flag = false;log.error("獲取ECS的指標數據失敗", e);}//更新記錄上次查詢時間if (flag) {QueryTimeRecord queryTimeRecord = new QueryTimeRecord();queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //開始時間往前推1分鐘queryTimeRecordMapper.updateByBelongId(queryTimeRecord);}}
這個時候,如果你想看有沒有把消息發送到kafka的指定主題可以使用如下命令:
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye?
五、消費者代碼 ?
@Slf4j
@Component
public class KafkaConsumer {// 消費監聽@KafkaListener(topics = "zhuoye",groupId ="zhuoye-aliyunmetric")public void consumeExtractorChangeMessage(ConsumerRecord<String, String> record, Acknowledgment ack){try {String value = record.value();//處理數據,存入openTsDb.................................ack.acknowledge();//手動提交}catch (Exception e){log.error("kafa-topic【zhuoye】消費阿里云指標源消息【失敗】");log.error(e.getMessage());}}
}
六、常用的KafKa的命令
//創建主題
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 ?--topic zhuoye
//查看kafka是否接收對應的消息
?kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye
// 修改kafka-topic分區數
./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 6 --topic zhuoye
// 查看topic分區數
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic zhuoye
// 查看用戶組消費情況
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group zhuoye-aliyunmetric --describe