2.1 本地開發環境搭建
單機模式安裝
- 下載與解壓:前往Apache Kafka 官網,下載最新穩定版本的 Kafka 二進制包(如
kafka_2.13-3.6.0.tgz
,其中2.13
為 Scala 版本)。解壓到本地目錄,例如/opt/kafka
:
tar -xzf kafka\_2.13-3.6.0.tgz
mv kafka\_2.13-3.6.0 /opt/kafka
- 配置文件調整:Kafka 的核心配置文件位于
/opt/kafka/config
目錄下。
server.properties
:修改關鍵參數,如listeners=PLAINTEXT://``localhost:9092
指定 Broker 監聽地址和端口;log.dirs=/var/lib/kafka-logs
設置消息存儲目錄;zookeeper.connect=``localhost:2181
(若使用 Zookeeper)配置元數據管理地址。zookeeper.properties
(若未單獨安裝 Zookeeper):可保持默認配置,默認數據存儲目錄為/tmp/zookeeper
,端口為2181
。
- 啟動服務:依次啟動 Zookeeper 和 Kafka Broker:
# 啟動Zookeeper(若未單獨安裝)
/opt/kafka/bin/zookeeper-server-start.sh
/opt/kafka/config/zookeeper.properties# 啟動Kafka Broker
/opt/kafka/bin/kafka-server-start.sh
/opt/kafka/config/server.properties
啟動后,Kafka 將在localhost:9092
監聽 Producer 和 Consumer 的請求。
Docker 容器化部署
使用 Docker Compose 可快速搭建多節點 Kafka 集群,并簡化環境管理:
- 創建
docker-compose.yml
文件:
version: '3' # 指定Docker Compose文件版本為3services:zookeeper:image: confluentinc/cp-zookeeper:7.3.0 # 使用Confluent提供的Zookeeper鏡像,版本7.3.0environment:ZOOKEEPER_CLIENT_PORT: 2181 # 設置Zookeeper客戶端連接端口為2181ZOOKEEPER_TICK_TIME: 2000 # 設置Zookeeper的心跳時間(單位:毫秒)ports:- "2181:2181" # 將容器內的2181端口映射到主機的2181端口kafka:image: confluentinc/cp-kafka:7.3.0 # 使用Confluent提供的Kafka鏡像,版本7.3.0depends_on:- zookeeper # 指定Kafka服務依賴于Zookeeper服務environment:KAFKA_BROKER_ID: 1 # 設置Kafka broker的唯一IDKAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' # 指定Kafka連接的Zookeeper地址KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT # 定義監聽器安全協議映射KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_INTERNAL://localhost:9093 # 定義對外廣播的監聽器地址KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093 # 定義Kafka監聽的地址和端口KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL # 指定broker間通信使用的監聽器名稱ports:- "9092:9092" # 將容器內的9092端口映射到主機的9092端口
- 啟動集群:在包含
docker-compose.yml
的目錄下執行:
docker-compose up -d
此配置啟動一個單節點 Zookeeper 和一個 Kafka Broker,通過映射本地端口9092
實現外部訪問。如需擴展集群,可增加kafka
服務實例并調整配置。
2.2 基礎操作入門
命令行工具實戰
- 創建 Topic:使用
kafka-topics.sh
命令創建一個名為test_topic
,包含 3 個分區、2 個副本的 Topic:
/opt/kafka/bin/kafka-topics.sh --create \--topic test_topic \--bootstrap-server localhost:9092 \--partitions 3 \--replication-factor 2
- 生產與消費消息:
- 生產者:通過
kafka-console-producer.sh
向test_topic
發送消息:
/opt/kafka/bin/kafka-console-producer.sh \--topic test_topic \--bootstrap-server localhost:9092
輸入消息內容(如Hello, Kafka!
)并回車發送。
- 消費者:使用
kafka-console-consumer.sh
從test_topic
拉取消息,支持從頭開始消費或從最新位置消費:
# 從頭開始消費
/opt/kafka/bin/kafka-console-consumer.sh \--topic test_topic \--from-beginning \--bootstrap-server localhost:9092# 從最新位置消費
/opt/kafka/bin/kafka-console-consumer.sh \--topic test_topic \--bootstrap-server localhost:9092
- 查看 Topic 元數據:使用
--describe
參數查看test_topic
的分區分布、Leader 副本等信息:
/opt/kafka/bin/kafka-topics.sh --describe \--topic test_topic \--bootstrap-server localhost:9092
- 消費位移管理:默認情況下,Consumer 自動提交 Offset。如需手動提交,可在消費時添加
--enable-auto-commit=false
參數,并通過commitSync()
或commitAsync()
方法控制提交時機。
2.3 首個 Java 程序:Producer & Consumer
Maven 依賴配置
在pom.xml
中添加 Kafka 客戶端依賴:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
Producer 代碼示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException; public class KafkaProducerExample {public static void main(String[] args) {// 1. 配置Kafka生產者屬性Properties props = new Properties();// 設置Kafka集群地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 設置鍵的序列化類props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 設置值的序列化類props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 設置消息確認機制:等待所有副本確認(最可靠但最慢)props.put(ProducerConfig.ACKS_CONFIG, "all");// 設置發送失敗時的重試次數props.put(ProducerConfig.RETRIES_CONFIG, 3);// 2. 創建Kafka生產者實例Producer<String, String> producer = new KafkaProducer<>(props);// 3. 創建要發送的消息記錄// 參數:topic名稱,消息key,消息valueProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key1", "message1");try {// 4. 發送消息(同步方式)// send()返回Future,get()會阻塞直到收到響應RecordMetadata metadata = producer.send(record).get();// 5. 打印消息發送成功的元數據System.out.println("Message sent to partition " + metadata.partition() + " with offset " + metadata.offset());} catch (InterruptedException | ExecutionException e) {// 6. 處理發送過程中可能出現的異常e.printStackTrace();} finally {// 7. 關閉生產者(重要!避免資源泄漏)producer.close();}}
}
Consumer 代碼示例
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties; public class KafkaConsumerExample {public static void main(String[] args) {// 1. 配置Kafka消費者屬性Properties props = new Properties();// 設置Kafka集群地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 設置消費者組ID(同一組內的消費者共享消息)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");// 設置鍵的反序列化類props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 設置值的反序列化類props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 注意:默認是自動提交offset,這里我們改為手動提交(見下方commitSync())// 2. 創建Kafka消費者實例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 訂閱主題(可以訂閱多個主題,這里用單例集合訂閱單個主題)consumer.subscribe(Collections.singletonList("test_topic"));// 4. 持續輪詢消息while (true) {// poll()方法獲取消息,參數是等待時間(避免CPU空轉)// 返回一批記錄(可能包含0到多條消息)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 5. 處理收到的每條消息for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",record.key(), record.value(), record.partition(), record.offset());}// 6. 手動同步提交offset(確保消息被成功處理后再提交)// 注意:生產環境應考慮錯誤處理和異步提交(commitAsync)consumer.commitSync(); }// 實際應用中應該添加關閉邏輯(如通過ShutdownHook)// consumer.close();}
}
上述 Java 程序分別實現了消息的生產與消費,通過配置 Producer 和 Consumer 的參數,可靈活控制消息發送策略與消費行為。