Kafka入門4.0.0版本(基于Java、SpringBoot操作)
一、kafka概述
Kafka最初是由LinkedIn公司開發的,是一個高可靠、高吞吐量、低延遲的分布式發布訂閱消息系統,它使用Scala語言編寫,并于2010年被貢獻給了Apache基金會,隨后成為Apache的頂級開源項目。主要特點有:
- 為發布和訂閱提供高吞吐量
- 消息持久化
- 分布式
- 消費消息采用Pull模式
- 支持在線和離線場景
本次采用最新的kafka版本4.0.0,Kafka 4.0 最引人矚目的變化之一,當屬其默認運行在 KRaft(Kafka Raft)模式下,徹底擺脫了對 Apache ZooKeeper 的依賴。在 Kafka 的發展歷程中,ZooKeeper 曾是其核心組件,負責協調分布式系統中的元數據管理、Broker 注冊、主題分區分配等關鍵任務。然而,隨著 Kafka 功能的不斷豐富與用戶規模的持續擴大,ZooKeeper 逐漸成為系統部署和運維中的一個復雜性來源,增加了運營成本與管理難度。
KRaft 模式的引入,標志著 Kafka 在架構上的自我進化達到了一個新高度。通過采用基于 Raft 一致性算法的共識機制,Kafka 將元數據管理內嵌于自身體系,實現了對 ZooKeeper 的無縫替代。這一轉變帶來了多方面的顯著優勢:
簡化部署與運維:運維人員無需再為維護 ZooKeeper 集群投入額外精力,降低了整體運營開銷。新架構減少了系統的復雜性,使得 Kafka 的安裝、配置和日常管理變得更加直觀和高效。
增強可擴展性:KRaft 模式下,Kafka 集群的擴展性得到了進一步提升。新增 Broker 節點的加入流程更加簡便,能夠更好地適應大規模數據處理場景下對系統資源動態調整的需求。
提升系統性能與穩定性:去除 ZooKeeper 這一外部依賴后,Kafka 在元數據操作的響應速度和一致性方面表現出色。尤其是在高并發寫入和讀取場景下,系統的穩定性和可靠性得到了增強,減少了因外部組件故障可能導致的單點問題。
- 之前的架構
- 現在的架構
kafka消費模型
不同消費者組可以消費全量的消息,相同消費者組內的消費者只能消費一部分。
kafka基本概念
Producer(生產者)
消息的生產者,負責將消息發送到Kafka集群中。
Consumer(消費者)
消息的消費者,負責從Kafka集群中讀取并處理消息
Broker(服務代理節點)
Kafka集群中的一個或多個服務器,負責存儲和轉發消息。
Topic(主題)
Kafka中的消息以主題為單位進行歸類,生產者發送消息到特定主題,消費者訂閱并消費這些主題的消息。
Partition(分區)
每個主題可以細分為多個分區,分區是Kafka存儲消息的物理單位,每個分區可以看作是一個有序的、不可變的消息序列。
Replica(副本)
Kafka為每個分區引入了多副本機制,以提高數據的安全性和可靠性。副本分為leader和follower,其中leader負責處理讀寫請求,follower負責從leader同步數據。
Consumer Group(消費者組)
由多個消費者組成,消費者組內的消費者共同消費同一個主題的消息,但每個消費者只負責消費該主題的一個或多個分區,避免消息重復消費。
kraft
通過采用基于 Raft 一致性算法的共識機制,Kafka 將元數據管理內嵌于自身體系,實現了對 ZooKeeper 的無縫替代
kafka發送端采用push模式
kafka消費端采用pull模式訂閱并消費消息
Kafka的工作原理
可以概括為以下幾個步驟:
-
消息發布: 生產者將消息發送到Kafka集群的特定主題,并可以選擇發送到該主題的哪個分區。如果未指定分區,Kafka會根據負載均衡策略自動選擇分區。
-
消息存儲: Kafka將接收到的消息存儲在磁盤上的分區中,每個分區都是一個有序的消息序列。Kafka使用順序寫入和零拷貝技術來提高寫入性能,并通過多副本機制確保數據的安全性和可靠性。
- 消息消費: 消費者組中的消費者從Kafka集群中訂閱并消費消息。每個消費者負責消費一個或多個分區中的消息,并確保消息至少被消費一次。消費者可以使用拉(Pull)模式或推(Push)模式從Kafka中拉取消息。
-
負載均衡: Kafka通過ZooKeeper維護集群的元數據信息,包括分區和消費者的對應關系。當消費者數量或分區數量發生變化時,Kafka會重新分配分區給消費者,以實現負載均衡。
-
容錯機制: Kafka通過多副本機制實現容錯。當leader副本出現故障時,Kafka會從ISR(In-Sync Replicas)集合中選擇一個新的leader副本繼續對外提供服務。同時,Kafka還提供了多種可靠性級別供用戶選擇,以滿足不同的業務需求。
kafka特點
一、Kafka的持久化機制
Kafka的持久化機制主要涉及消息的存儲和復制。Kafka以日志的形式存儲消息,每個主題(Topic)被劃分為多個分區(Partition),每個分區中的消息按照順序進行存儲。Kafka使用多個副本(Replica)來保證消息的持久性和可靠性,每個分區的消息會被復制到多個副本中,以防止數據丟失。此外,Kafka還允許根據配置的保留策略來保留已消費的消息一段時間,以便在需要時進行檢索和恢復。
Kafka的副本機制是其實現高可用性和數據持久性的重要基石。每個主題的每個分區都配置有多個副本,這些副本分散保存在不同的Broker上,從而能夠對抗部分Broker宕機帶來的數據不可用問題。Kafka的副本機制包括領導者副本(Leader Replica)和追隨者副本(Follower Replica):
領導者副本:負責處理所有的讀寫請求,包括生產者的消息寫入和消費者的消息讀取。
追隨者副本:從領導者副本異步拉取消息,并寫入到自己的提交日志中,從而實現與領導者副本的同步。追隨者副本不對外提供服務,只作為數據的冗余備份。
Kafka還引入了ISR(In-Sync Replicas)機制,即與領導者副本保持同步的副本集合。只有處于ISR中的副本才能參與到消息的寫入和讀取過程中,以確保數據的一致性和可靠性。當某個副本與領導者副本的同步延遲超過一定的閾值時,它會被踢出ISR,直到同步恢復正常。
二、Kafka的數據一致性
Kafka通過多個機制來確保數據的一致性,包括副本同步、ISR機制、生產者事務和消費者事務等:
副本同步:確保主副本將數據同步到所有副本的過程,在副本同步完成之前,生產者才會認為消息已經被成功寫入。
ISR機制:通過動態調整ISR列表中的副本,確保只有可靠的副本參與到數據的讀寫操作,從而提高數據的一致性和可靠性。
生產者事務:Kafka的生產者事務機制可以確保消息的Exactly-Once語義,即消息不會被重復寫入或丟失。生產者事務將消息的發送和位移提交等操作放在同一個事務中,一旦事務提交成功,就意味著消息已經被成功寫入,并且對應的位移也已經提交。
消費者事務:雖然Kafka的消費者通常不直接支持事務但消費者可以通過提交位移(Offset)來確保消息的正確消費。消費者事務將消息的拉取和位移提交等操作放在同一個事務中,以確保消息不會被重復消費或丟失。
二、kafka應用
2.1 win11安裝kafka4.0.0
下載地址:https://kafka.apache.org/downloads 下載最后一個kafka-2.13-4.0.0.tgz
下載好之后,把這個壓縮包解壓就行了,然后找到config下面的server.properties
找到log.dirs改成自己電腦上的目錄
log.dirs=E:\\runSoft\\kafka\\data
先打開命令行,進入到bin下面的windows目錄下
命令
kafka-storage.bat random-uuid
先獲取uuid,我的uuid為ANVnC_s-QYGJF1C7wu9Aww
命令:
kafka-storage.bat format --standalone -t PPEZ2LW8T8yjZNWnfNHorQ -c ../../config/server.properties
打開命令行,進入到bin下面的windows目錄下 啟動命令
kafka-server-start.bat ../../config/server.properties
創建topic
kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092
啟動一個消費端
kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
啟動一個生產端
kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
問題
1、如果提示如下
命令行 輸入行太長。 命令語法不正確。
則需要把目錄變短,目錄太長,win11不讓輸入。
2,tgz需要解壓兩次
只解壓一次是不行的,tgz是打包之后壓縮的。
3、如果啟動失敗,需要重新配置
重新配置時。把log.dirs的路徑下面的東西清空
2.2 java開發kafka
第一步,引入依賴
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version>
</dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.17</version>
</dependency>
第二步,建立生產者
public class Producer {public static void main(String[] args) {Map<String,Object> props = new HashMap<>();// kafka 集群 節點props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");String topic = "test";KafkaProducer<String, String> producer = new KafkaProducer(props);producer.send(new ProducerRecord<String, String>(topic, "key", "value-1"));producer.send(new ProducerRecord<String, String>(topic, "key", "value-2"));producer.send(new ProducerRecord<String, String>(topic, "key", "value-3"));producer.close();}}
ProducerRecord 是 Kafka 中的一個核心類,它代表了一組 Kafka 需要發送的 key/value 鍵值對,它由記錄要發送到的主題名稱(Topic Name),**可選的分區號(Partition Number)**以及可選的鍵值對構成。
第三步、建立消費者類
public class Consumer {public static void main(String[] args){Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String , String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test"));while (true) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofDays(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("partition = %d ,offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());}}}
}
運行效果
2.3 spring boot整合kafka
第一步,引入依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.4.1</version><relativePath/> <!-- lookup parent in repository --></parent><artifactId>spring_boot_kafka_demo</artifactId><packaging>jar</packaging><name>spring_boot_kafka_demo Maven Webapp</name><url>http://maven.apache.org</url><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
第二步,編寫配置文件
編寫resources下的application.yml
spring:kafka:bootstrap-servers: localhost:9092consumer:auto-offset-reset: earliest
第三步,編寫生產者
@Service
public class Producer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("topic1", message);}@PostConstructpublic void init() {sendMessage("Hello, Kafka!");}
}
第四步,編寫消費者
@Component
public class Consumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}
}
第五步,編寫啟動類
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}}
運行效果
2.4 記錄日志到kafka中
第一步,在2.3的基礎上,添加依賴
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.12</version> <!-- Spring Boot 3.x 推薦版本 -->
</dependency>
第二步,添加kafka的日志appender類
public class KafkaLogbackAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {private String topic = "application-logs";private String bootstrapServers = "localhost:9092";private KafkaProducer<String, String> producer;@Overridepublic void start() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());this.producer = new KafkaProducer<>(props);super.start();}@Overrideprotected void append(ILoggingEvent eventObject) {String msg = eventObject.getFormattedMessage();producer.send(new ProducerRecord<>(topic, msg));}@Overridepublic void stop() {if (producer != null) {producer.close();}super.stop();}// Getter and Setter for XML configpublic void setTopic(String topic) {this.topic = topic;}public void setBootstrapServers(String bootstrapServers) {this.bootstrapServers = bootstrapServers;}
}
第三步,在resources下添加logback-spring.xml文件
<configuration debug="false" scan="true" scanPeriod="30 seconds"><!-- 定義日志格式 --><property name="PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"/><!-- 控制臺輸出 --><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>${PATTERN}</pattern></encoder></appender><!-- Kafka Appender --><appender name="KAFKA" class="com.demo.KafkaLogbackAppender"><bootstrapServers>localhost:9092</bootstrapServers><topic>application-logs</topic></appender><!-- 根日志輸出 --><root level="info"><appender-ref ref="STDOUT"/><appender-ref ref="KAFKA"/></root></configuration>
第四步,修改Producer類
@Service
public class Producer {private static final Logger logger = LoggerFactory.getLogger(Producer.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("topic1", message);}@PostConstructpublic void init() {sendMessage("Hello, Kafka!");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");}
}
第五步,修改Consumer類
@Component
public class Consumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}@KafkaListener(id = "myId2", topics = "application-logs")public void listen2(String in) {System.out.println("resinfo:"+in);}
}