1、Kafka概述
Apache Kafka是由LinkedIn公司于2010年開發的一款分布式消息系統,旨在解決當時傳統消息隊列(如ActiveMQ、RabbitMQ)在高吞吐量和實時性場景下的性能瓶頸。隨著LinkedIn內部對實時日志處理、用戶行為追蹤等需求的激增,Kafka逐漸演化為一個支持水平擴展、持久化存儲的流數據平臺。2011年,Kafka成為Apache基金會頂級開源項目,并在全球范圍內被廣泛應用于大數據、實時計算和微服務架構領域。
Kafka的設計哲學源于發布-訂閱模型,但其創新性地引入了分布式存儲和分區化處理機制,使得系統能夠高效處理每秒百萬級的消息吞吐。這一特性使其迅速成為現代數據管道(Data Pipeline)和流式處理(Stream Processing)的核心組件。
Kafka是一個開源的高吞吐量的分布式消息中間件,對比于其他
1、緩沖和削峰:上游數據時有突發流量,下游可能扛不住,或者下游沒有足夠多的機器來保證冗余,kafka在中間可以起到一個緩沖的作用,把消息暫存在kafka中,下游服務就可以按照自己的節奏進行慢慢處理。
2、解耦和擴展性:項目開始的時候,并不能確定具體需求。消息隊列可以作為一個接口層,解耦重要的業務流程。只需要遵守約定,針對數據編程即可獲取擴展能力。
3、冗余:可以采用一對多的方式,一個生產者發布消息,可以被多個訂閱topic的服務消費到,供多個毫無關聯的業務使用。
4、健壯性:消息隊列可以堆積請求,所以消費端業務即使短時間死掉,也不會影響主要業務的正常進行。
5、異步通信:很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
2、核心組件
組件 | 說明 |
Producer | 消息生產者,向Kafka發送消息 |
Consumer | 消息消費者,從Kafka讀取消息 |
Broker | Kafka服務器節點,組成Kafka集群 |
Topic | 消息類別/主題,生產者發送到特定Topic,消費者訂閱特定Topic |
Partition | Topic的分區,實現并行處理和水平擴展 |
Offset | 消息在分區中的唯一標識(位移) |
ZooKeeper | 管理Kafka集群元數據(新版本已逐步移除ZooKeeper依賴) |
3、Kafka的特點與優勢
1. 高吞吐量與低延遲
Kafka通過批處理、順序磁盤I/O和零拷貝技術(Zero-Copy)優化數據傳輸效率。生產者(Producer)將消息批量發送至Broker,消費者(Consumer)按順序拉取數據,避免了傳統消息系統的頻繁網絡交互。實測中,單臺Broker可輕松支持每秒數十萬條消息的讀寫。
2. 水平擴展與容錯性
Kafka集群由多個Broker(服務器節點)組成,支持動態擴容。每個主題(Topic)被劃分為多個分區(Partition),分區可分布在不同Broker上,通過多副本(Replica)機制實現數據冗余。若某Broker宕機,其他副本會自動接管服務,確保系統的高可用性。
3. 持久化存儲與回溯消費
消息在Kafka中默認保留7天(可配置為永久存儲),消費者可隨時重置偏移量(Offset)以重新消費歷史數據。這一特性在數據重放、故障恢復等場景中至關重要。
4. 生態兼容性
Kafka與主流大數據工具(如Spark、Flink、Hadoop)深度集成,并提供了Connect API和Streams API,支持構建端到端的流處理管道。
4、Kafka 使用場景
- 實時流處理:用戶行為追蹤、實時推薦
- 日志收集:集中式日志系統
- 事件源:微服務間的事件驅動架構
- 消息隊列:系統解耦、削峰填谷
- Metrics收集:監控數據聚合
5、什么是Zookeeper?
Zookeeper是一個高性能、高可靠的分布式協調服務,最初由雅虎開發,是Google Chubby的開源實現。它被廣泛應用于分布式系統中,用于解決分布式應用中的協調問題,如配置管理、服務注冊與發現、分布式鎖等。Zookeeper的設計目標是封裝復雜且容易出錯的關鍵服務,為分布式應用提供簡單易用的接口。
6、Zookeeper的應用場景
Zookeeper在分布式系統中扮演著重要的角色,其典型應用場景包括:
- 配置管理:Zookeeper可以作為分布式系統的配置中心,集中管理配置信息,確保所有節點能夠獲取到最新的配置。
- 服務注冊與發現:分布式系統中的服務可以通過Zookeeper進行注冊,客戶端可以通過查詢Zookeeper來發現所需服務。
- 分布式鎖:Zookeeper提供了一種實現分布式鎖的機制,確保多個節點對共享資源的訪問是互斥的。
- 集群管理:Zookeeper能夠監控集群中節點的狀態,及時發現并處理節點故障。
- 消息隊列:Zookeeper可以用于實現分布式消息隊列中的協調功能。
7、Zookeeper核心特性
Zookeeper具有以下關鍵特性:
- 順序一致性:客戶端的更新操作按照其發送的順序被應用到Zookeeper上,確保了操作的順序性。
- 原子性:所有對Zookeeper的操作都是原子的,要么全部成功,要么全部失敗。
- 單一系統映像:無論Zookeeper集群中有多少節點,客戶端看到的都是一個單一的、一致的視圖。
- 可靠性:Zookeeper通過副本機制和選舉算法確保系統的高可用性。
- 實時性:Zookeeper能夠實時監控節點的狀態變化,并及時通知客戶端。
8、ZooKeeper 的作用
ZooKeeper 是一個分布式協調服務,為 Kafka 提供以下關鍵功能:
功能 | 具體說明 |
集群管理 | 記錄 Kafka Broker 的節點狀態(存活/下線),維護 Broker 列表。 |
Controller 選舉 | Kafka 集群需要一個主控制器(Controller)處理分區和副本管理,ZooKeeper 負責選舉。 |
Topic 元數據存儲 | 存儲 Topic 的分區信息、副本分配、Leader 選舉結果等元數據。 |
消費者組管理 | 記錄消費者組的 offset(Kafka 2.8+ 已支持脫離 ZooKeeper,默認仍依賴)。 |
分布式鎖 | 確保多個 Broker 或客戶端操作時的數據一致性(如分區遷移)。 |
9、Kafka的安裝
1、安裝JAVA環境
yum -y install java-1.8.0-openjdk
2、安裝ZooKeeper
mkdir -p /opt/kafka
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.1-bin.tar.gz
cd apache-zookeeper-3.8.1-bin/conf
cp zoo_sample.cfg zoo.cfg
../bin/zkServer.sh start
3、安裝Kafka
# 解壓
tar xvf kafka_2.13-2.8.2.tgz
cd kafka_2.13-2.8.2
ls
# 啟動
bin/kafka-server-start.sh config/server.properties#后臺啟動
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &查看日志
tail -f kafka.log
單機啟動,并且ZooKeeper也剛好在本機,所以我們默認不需要修改任何配置就可以直接啟動。如果不在一起則修改配置文件:config/server.properties文件。
4、IDEA連接Kafka
cd /opt/kafka/kafka_2.13-2.8.2/configvim server.properties# 修改內容
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.142.131:9092
安裝Kafka插件進行連接
10、SpringBoot整合Kafka
1、導入jar包
<!--kafka-->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.1</version>
</dependency>
2、編寫配置
# Kafka 相關配置kafka:# Kafka 服務器地址bootstrap-servers: 192.168.142.131:9092# 生產者配置producer:# 序列化器,用于將鍵轉換為字節key-serializer: org.apache.kafka.common.serialization.StringSerializer# 序列化器,用于將值轉換為字節value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消費者配置consumer:# 消費者組ID,用于區分不同的消費者組group-id: my-application-group# 自動偏移量重置策略,當沒有初始偏移量時從最早的偏移量開始消費auto-offset-reset: earliest# 禁用自動提交功能enable-auto-commit: false# 反序列化器,用于將字節轉換為鍵key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 反序列化器,用于將字節轉換為值value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 配置監聽器的確認模式為手動立即確認listener:ack-mode: manual_immediate# 主題配置topic:default: default-topicmanual: manual-commit-topic
3、Kafka消息生產者
package com.lw.mqdemo.mq.kafka;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;/*** Kafka消息生產者服務類*/
@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 發送消息到指定主題* @param topic 主題名稱* @param message 消息內容*/public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}/*** 發送消息到指定主題和分區* @param topic 主題名稱* @param partition 分區號* @param key 消息鍵* @param message 消息內容*/public void sendMessage(String topic, Integer partition, String key, String message) {kafkaTemplate.send(topic, partition, key, message);}
}
4、Kafka消息消費者
package com.lw.mqdemo.mq.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;/*** Kafka 消費者服務*/
@Service
public class KafkaConsumerService {/*** 監聽指定主題的消息(自動提交)* @param message 消息內容*/@KafkaListener(topics = "${spring.kafka.topic.default}", groupId = "${spring.kafka.consumer.group-id}")public void consumeAutoCommit(String message) {System.out.println("接收到自動提交消息: " + message);// 業務處理邏輯}/*** 監聽指定主題的消息(手動提交)* 偏移量(Offset) 是 Kafka 為分區(Partition)中的每條消息分配的唯一序號(從 0 開始遞增),用于標識消息在分區中的位置。* @param record 消息記錄(包含元數據)* @param ack 確認對象*/@KafkaListener(topics = "${spring.kafka.topic.manual}", groupId = "${spring.kafka.consumer.group-id}")public void consumeManualCommit(ConsumerRecord<String, String> record, Acknowledgment ack) {try {System.out.println("接收到手動提交消息: key=" + record.key() + ", value=" + record.value() + ", topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset());// 業務處理邏輯// 手動提交偏移量ack.acknowledge();} catch (Exception e) {// 處理異常,可以選擇不提交偏移量以便重試System.err.println("消息處理失敗: " + e.getMessage());}}
}
5、測試類
package com.lw.mqdemo.controller;import com.lw.mqdemo.mq.kafka.KafkaProducerService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** Kafka 控制器*/
@RestController
@RequestMapping("/api/kafka")
public class KafkaController {private final KafkaProducerService producerService;public KafkaController(KafkaProducerService producerService) {this.producerService = producerService;}/*** 發送消息到指定的Kafka主題*/@PostMapping("/send")public String sendMessage(@RequestParam("topic") String topic,@RequestParam("message") String message) {producerService.sendMessage(topic, message);return "消息已發送到主題: " + topic;}
}