Apache Kafka 簡介
一、什么是 Kafka?
Apache Kafka 是一個高吞吐量、分布式、可擴展的流處理平臺,用于構建實時數據管道和流應用程序。它最初由 LinkedIn 開發,并于 2011 年開源,目前由 Apache 軟件基金會進行維護。
Kafka 具備以下核心特性:
- 發布-訂閱消息系統:支持生產者向主題(Topic)發送消息,消費者從主題中讀取消息。
- 高吞吐量與低延遲:可處理百萬級每秒消息,延遲低于幾毫秒。
- 持久化存儲:消息以日志形式存儲在磁盤上,可設定保留時間。
- 可水平擴展:通過分區(Partition)機制輕松擴展讀寫能力。
- 高容錯性:副本機制保障在節點故障時依舊能夠正常運行。
Kafka 不僅是一個消息隊列,更是一個用于流數據處理的統一平臺。
二、Kafka 的應用場景
Kafka 在大數據和分布式系統領域具有廣泛應用,主要包括:
1. 日志收集與傳輸
Kafka 可統一收集來自不同服務或服務器的日志,作為日志系統的核心組件,將數據傳輸至后端處理系統(如 Hadoop、Elasticsearch 等)。
2. 實時數據分析
結合 Apache Flink、Spark Streaming 等流處理框架,Kafka 可用于構建實時分析平臺,實現實時用戶行為分析、實時監控等。
3. 事件驅動架構(EDA)
Kafka 作為微服務架構中的事件總線,使服務之間通過事件解耦,從而提高系統靈活性與可維護性。
4. 數據管道(Data Pipeline)
Kafka 能將數據從數據庫、日志系統等源系統傳輸到數據倉庫或數據湖,是構建高效可靠數據管道的核心工具。
5. 替代傳統消息隊列
在對吞吐量、可擴展性有更高要求的系統中,Kafka 可替代傳統消息中間件(如 RabbitMQ、ActiveMQ)作為消息傳遞通道。
三、Kafka 的誕生背景
Kafka 的誕生源于 LinkedIn 內部對于日志處理和數據傳輸系統的性能瓶頸:
- LinkedIn 的業務快速增長,系統需要處理海量的用戶行為數據與日志。
- 傳統的消息隊列系統難以滿足高吞吐量與高可用性的要求。
- 工程團隊設計了一種新的架構,將“分布式日志”作為核心思想,構建出一個同時支持日志收集、傳輸與處理的統一平臺。
Kafka 在設計上融合了以下理念:
- 以持久化日志為核心數據結構:每條消息即為一條日志記錄,可重復讀取。
- 分布式架構支持高可用性與高擴展性:通過集群部署和分區副本機制實現。
- 支持批處理和流處理雙模式:既可用于數據采集與離線分析,也適合實時流處理。
這一創新架構為 Kafka 后來的廣泛應用打下了堅實基礎,也推動了現代數據架構的演進。
好的!以下是 Kafka Java 快速入門指南,適合初學者快速了解如何在 Java 程序中使用 Kafka 實現消息的生產與消費。
明白了!以下是使用 Mermaid 格式圖形 重新整理的 Kafka 集群搭建指南,清晰展示了 Kafka + ZooKeeper 的集群結構和搭建步驟。
Kafka 集群搭建指南(ZooKeeper 模式)
一、Kafka 集群架構圖(Mermaid 格式)
二、準備工作
1. 系統要求
- Linux/CentOS/Ubuntu(或容器)
- Java 8+(推薦 Java 11)
- 至少 3 臺機器或虛擬節點
2. 下載 Kafka 安裝包(每臺機器)
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0
三、配置 ZooKeeper 集群
1. 修改 config/zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888
2. 設置每個節點的 myid
echo 1 > /tmp/zookeeper/myid # 節點1
echo 2 > /tmp/zookeeper/myid # 節點2
echo 3 > /tmp/zookeeper/myid # 節點3
3. 啟動 ZooKeeper(每臺執行)
bin/zookeeper-server-start.sh config/zookeeper.properties
四、配置 Kafka Broker
每臺機器修改 config/server.properties
,示例:
broker.id=1 # 每個節點唯一(如 1、2、3)
listeners=PLAINTEXT://192.168.1.101:9092
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
啟動 Kafka:
bin/kafka-server-start.sh config/server.properties
五、驗證集群
1. 創建主題
bin/kafka-topics.sh --create \
--bootstrap-server 192.168.1.101:9092 \
--replication-factor 3 --partitions 3 \
--topic test-topic
2. 查看主題分布
bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server 192.168.1.101:9092
六、發送與消費消息
生產者:
bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 192.168.1.101:9092
消費者:
bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server 192.168.1.102:9092 --from-beginning
七、常見問題
問題 | 原因 |
---|---|
Kafka 啟動失敗 | broker.id 重復或端口沖突 |
消息無法消費 | ZooKeeper 未正常連接,主題未正確創建 |
節點日志混亂或沖突 | log.dirs 配置重復,broker.id 沒有區分 |
ZooKeeper 單點故障 | 節點不足,推薦部署奇數個節點(3/5) |
Kafka Java 快速入門指南
一、準備工作
1. 添加 Maven 依賴
在你的項目的 pom.xml
中添加以下依賴:
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency>
</dependencies>
二、Kafka Producer 示例(生產者)
1. 編寫 KafkaProducerDemo.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaProducerDemo {public static void main(String[] args) {// 配置Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka 地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 創建生產者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 發送消息for (int i = 0; i < 5; i++) {String message = "Hello Kafka " + i;producer.send(new ProducerRecord<>("test-topic", message));System.out.println("Sent: " + message);}producer.close();}
}
三、Kafka Consumer 示例(消費者)
1. 編寫 KafkaConsumerDemo.java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerDemo {public static void main(String[] args) {// 配置Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group"); // 消費組props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest"); // 從頭開始消費// 創建消費者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 訂閱主題consumer.subscribe(Collections.singletonList("test-topic"));// 消費消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: key=%s, value=%s, offset=%d%n",record.key(), record.value(), record.offset());}}}
}
四、運行順序建議
- 啟動 Kafka 服務(本地或遠程)
- 先運行消費者
KafkaConsumerDemo
(等待監聽) - 再運行生產者
KafkaProducerDemo
(發送消息)
五、調試小技巧
- 確保 Kafka 服務正常運行,端口默認為
9092
。 - 主題
test-topic
必須提前創建,或在 Kafka 開啟auto.create.topics.enable=true
的情況下自動創建。 - 消費者默認是“只消費一次”,再次運行需更改 group.id 或開啟重復讀取邏輯。