kafka
一,介紹
Kafka 是一個開源的分布式流處理平臺,最初由 LinkedIn 開發并貢獻給 Apache 軟件基金會。它設計用于構建高性能、持久性、可伸縮和容錯的實時數據管道和流處理應用程序。
以下是 Kafka 的一些關鍵特點和概念:
- 發布-訂閱模型:Kafka 使用發布-訂閱模型,其中消息生產者將消息發布到主題(topic),而消息消費者從主題訂閱消息。這種模型可以支持多個消費者同時消費消息,并且具有良好的擴展性。
- 主題(Topic):主題是消息的邏輯分類,相當于一個消息隊列。消息被發布到主題,并且消費者從主題訂閱消息。
- 分區(Partition):每個主題可以劃分為一個或多個分區,每個分區是一個有序的日志。分區可以分布在集群的不同節點上,以提供水平擴展和負載均衡。
- 副本(Replication):Kafka 支持將每個分區的數據復制到多個副本,以提供容錯性和數據可靠性。副本位于集群的不同節點上,確保即使某個節點故障,數據仍然可用。
- 生產者(Producer):生產者負責將消息發布到主題。它們可以選擇將消息發送到特定的分區,也可以根據負載均衡策略將消息均勻分布到不同的分區中。
- 消費者(Consumer):消費者從主題訂閱消息,并按照其偏移量(offset)順序消費消息。消費者組(Consumer Group)是一組共享相同主題的消費者,它們協調以確保每個分區的消息只有一個消費者消費。
- ZooKeeper:Kafka 使用 ZooKeeper 來進行分布式協調和管理,如協調生產者和消費者、維護集群元數據等。
Kafka 提供了高性能、可靠、持久的消息傳遞系統,適用于大規模的實時數據處理和流式處理應用程序。它已經成為許多企業構建實時數據管道和流處理應用程序的首選工具之一。
二,運行原理
Kafka 的運行原理涉及多個組件和過程,主要包括生產者發送消息、消息存儲在代理 (Broker) 中的分區中、消費者從分區中讀取消息等。以下是 Kafka 的基本運行原理:
- 生產者發送消息:
- 生產者將消息發送到 Kafka 的特定主題 (Topic) 中。
- 生產者可以選擇性地將消息發送到特定的分區 (Partition),或者使用 Kafka 的默認分區分配策略,由 Kafka 在發送時決定將消息發送到哪個分區。
- 消息存儲在分區中:
- 主題可以被分成多個分區,每個分區是一個有序的消息序列。
- 每條消息在分區內有一個唯一的偏移量 (Offset),用于標識消息在分區中的位置。
- 消息被持久化在 Kafka 的分區中,直到滿足一定的保留策略(如時間或者大小限制)。
- 消費者從分區中讀取消息:
- 消費者從 Kafka 的特定主題中讀取消息。
- 消費者可以以不同的方式訂閱主題,例如:
- 指定訂閱的主題和分區。
- 加入一個消費者組 (Consumer Group),使得消費者可以以并行的方式消費主題中的消息。
- 每個消費者在消費主題時,會維護自己的消費偏移量 (Offset),用于記錄已經消費的消息位置。
- 分區和副本管理:
- Kafka 使用分區來實現并行處理和水平擴展。
- 每個分區可以有多個副本,其中一個是領導者副本 (Leader),其余的是追隨者副本 (Follower)。
- 領導者副本負責處理讀寫請求,追隨者副本用于備份數據。
- 如果領導者副本失效,Kafka 會從追隨者副本中選舉新的領導者。
- ZooKeeper 協調:
- Kafka 集群依賴 ZooKeeper 來進行集群管理、元數據存儲和領導者選舉等任務。
- ZooKeeper 存儲了 Kafka 集群的元數據,包括主題、分區、消費者組等信息,同時也用于監控和管理集群的健康狀態。
通過這些組件和過程,Kafka 實現了高吞吐量、持久性、分布式和水平擴展等特性,使得它成為處理大規模實時數據流的理想選擇。
三,Spring Boot 項目中整合 Kafka (簡單使用)
1.添加 Maven 依賴
首先,在你的 Spring Boot 項目的 pom.xml
文件中添加 Spring Kafka 的依賴:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
2.配置 Kafka 連接信息:
在 application.properties
或 application.yml
中添加 Kafka 服務器的連接信息:
kafka:bootstrap-servers: 192.168.193.131:9092producer: #生產者序列化器retries: 10 #如果發生故障,生產者將嘗試重新發送消息的次數。key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化生產者消息鍵的類。value-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化生產者消息值的類。ack-mode: manualconsumer: #消費者序列化器group-id: ${spring.application.name}-test # 消費者組的唯一標識符。在消費者組中的所有消費者將共享消費者組的工作負載。key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #用于反序列化消費者消息鍵的類。value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #用于反序列化消費者消息值的類。listener: #配置了監聽器相關的設置。ack-mode: manual #開啟手動確認 設置為手動,表示消費者將等待手動確認來確定是否已成功處理消息。
3.創建kafka配置類:
配置 Kafka 主題(topic)的創建
@Configuration
public class KafkaConfig {/*** @return org.apache.kafka.clients.admin.NewTopic* @date 2024/5/31 14:42* @Description: TODO @Bean 注解* 作用:將方法返回的對象注冊為 Spring 容器中的一個 Bean。* 返回值類型:NewTopic,表示 Kafka 主題的配置信息。*/@Beanpublic NewTopic viewUserTopic(){/*第一個參數:主題名稱,這里是 "viewUserTopic"。第二個參數:分區數量,這里設置為 1。第三個參數:副本數量,這里設置為 1。*/return new NewTopic("viewUserTopic",1,(short) 1);}}
4.注入kafka模板類
@Autowired
private KafkaTemplate kafkaTemplate;
5.發送消息
//定義消息的唯一ID 防止消息重復消費
String msgId = "msg-" + UUID.randomUUID().toString();
//定義消息內容
String msgBody = JSON.toJSONString(tbUser);//將消息唯一表示存入redis緩存 防止消息重復消費
stringRedisTemplate.opsForValue().set(msgId, msgBody);/*組裝消息體 發送消息隊列*/
MessageVO messageVO = new MessageVO();
messageVO.setMsgID(msgId);
messageVO.setMsgBody(msgBody);//向名為 "viewUserTopic" 的 Kafka 主題發送消息。//參數一: 表示目標 Kafka 主題的名稱。 參數二:消息內容kafkaTemplate.send("viewUserTopic", JSON.toJSONString(messageVo))//通過該方法設置回調函數,用于處理消息發送的成功和失敗情況。.addCallback(//成功回調函數,處理消息發送成功的情況。new SuccessCallback() {@Overridepublic void onSuccess(Object o) {// 消息發送成功System.out.println("kafka 消息發送成功了~~~~~~~~~~~~");}},//失敗回調函數,處理消息發送失敗的情況。new FailureCallback() {@Overridepublic void onFailure(Throwable throwable) {// 消息發送失敗了,再次發送System.out.println("kafka 消息發送失敗了,再次發送");kafkaTemplate.send("viewUserTopic", JSON.toJSONString(messageVo));}});
6.消息的接收(監聽)
/*** @param message 表示接收到的消息內容,這里是 JSON 格式的字符串。* @param acknowledgment 用于手動提交消費者偏移量的對象。* @date 2024/5/31 15:13* @Description: TODO* @KafkaListener 通過該注解指定了監聽的 Kafka 主題為 "viewUserTopic"。*/@KafkaListener(topics = "viewUserTopic")public void recvViewUserMessage(String message, Acknowledgment acknowledgment) {//--1 接收消息MessageVO messageVo = JSON.parseObject(message, MessageVO.class);//--2 根據消息的唯一ID,判斷消息是否重復String msgId = messageVo.getMsgID();if (!stringRedisTemplate.hasKey(msgId)) {// 消息重復了System.out.println("kafka 消息重復了");// 使用 acknowledgment.acknowledge() 方法手動確認消費完成,通知 Kafka 服務器該消息已經被處理。acknowledgment.acknowledge();return;}//--3 消費消息(處理消息)String msgBody = messageVo.getMsgBody();TbLog tbLog = JSON.parseObject(msgBody, TbLog.class);tbLog.setCreateTime(new Date());tbLogMapper.insert(tbLog);//--4 手動確認消息//手動確認消費完成,通知 Kafka 服務器該消息已經被處理。acknowledgment.acknowledge();//--5 刪除消息的唯一ID,防止消息重復消費stringRedisTemplate.delete(msgId);}