云原生時代 Kafka 深度實踐:02快速上手與環境搭建

2.1 本地開發環境搭建

單機模式安裝

  1. 下載與解壓:前往Apache Kafka 官網,下載最新穩定版本的 Kafka 二進制包(如kafka_2.13-3.6.0.tgz,其中2.13為 Scala 版本)。解壓到本地目錄,例如/opt/kafka
tar -xzf kafka\_2.13-3.6.0.tgz
mv kafka\_2.13-3.6.0 /opt/kafka
  1. 配置文件調整:Kafka 的核心配置文件位于/opt/kafka/config目錄下。
  • server.properties:修改關鍵參數,如listeners=PLAINTEXT://``localhost:9092指定 Broker 監聽地址和端口;log.dirs=/var/lib/kafka-logs設置消息存儲目錄;zookeeper.connect=``localhost:2181(若使用 Zookeeper)配置元數據管理地址。
  • zookeeper.properties(若未單獨安裝 Zookeeper):可保持默認配置,默認數據存儲目錄為/tmp/zookeeper,端口為2181
  1. 啟動服務:依次啟動 Zookeeper 和 Kafka Broker:
# 啟動Zookeeper(若未單獨安裝)
/opt/kafka/bin/zookeeper-server-start.sh 
/opt/kafka/config/zookeeper.properties# 啟動Kafka Broker
/opt/kafka/bin/kafka-server-start.sh 
/opt/kafka/config/server.properties

啟動后,Kafka 將在localhost:9092監聽 Producer 和 Consumer 的請求。

Docker 容器化部署

使用 Docker Compose 可快速搭建多節點 Kafka 集群,并簡化環境管理:

  1. 創建docker-compose.yml文件
version: '3'  # 指定Docker Compose文件版本為3services:zookeeper:image: confluentinc/cp-zookeeper:7.3.0  # 使用Confluent提供的Zookeeper鏡像,版本7.3.0environment:ZOOKEEPER_CLIENT_PORT: 2181  # 設置Zookeeper客戶端連接端口為2181ZOOKEEPER_TICK_TIME: 2000  # 設置Zookeeper的心跳時間(單位:毫秒)ports:- "2181:2181"  # 將容器內的2181端口映射到主機的2181端口kafka:image: confluentinc/cp-kafka:7.3.0  # 使用Confluent提供的Kafka鏡像,版本7.3.0depends_on:- zookeeper  # 指定Kafka服務依賴于Zookeeper服務environment:KAFKA_BROKER_ID: 1  # 設置Kafka broker的唯一IDKAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'  # 指定Kafka連接的Zookeeper地址KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT  # 定義監聽器安全協議映射KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_INTERNAL://localhost:9093  # 定義對外廣播的監聽器地址KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093  # 定義Kafka監聽的地址和端口KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL  # 指定broker間通信使用的監聽器名稱ports:- "9092:9092"  # 將容器內的9092端口映射到主機的9092端口
  1. 啟動集群:在包含docker-compose.yml的目錄下執行:
docker-compose up -d

此配置啟動一個單節點 Zookeeper 和一個 Kafka Broker,通過映射本地端口9092實現外部訪問。如需擴展集群,可增加kafka服務實例并調整配置。

2.2 基礎操作入門

命令行工具實戰

  1. 創建 Topic:使用kafka-topics.sh命令創建一個名為test_topic,包含 3 個分區、2 個副本的 Topic:

/opt/kafka/bin/kafka-topics.sh --create \--topic test_topic \--bootstrap-server localhost:9092 \--partitions 3 \--replication-factor 2
  1. 生產與消費消息
  • 生產者:通過kafka-console-producer.shtest_topic發送消息:
/opt/kafka/bin/kafka-console-producer.sh \--topic test_topic \--bootstrap-server localhost:9092

輸入消息內容(如Hello, Kafka!)并回車發送。

  • 消費者:使用kafka-console-consumer.shtest_topic拉取消息,支持從頭開始消費或從最新位置消費:
# 從頭開始消費
/opt/kafka/bin/kafka-console-consumer.sh \--topic test_topic \--from-beginning \--bootstrap-server localhost:9092# 從最新位置消費
/opt/kafka/bin/kafka-console-consumer.sh \--topic test_topic \--bootstrap-server localhost:9092
  1. 查看 Topic 元數據:使用--describe參數查看test_topic的分區分布、Leader 副本等信息:
/opt/kafka/bin/kafka-topics.sh --describe \--topic test_topic \--bootstrap-server localhost:9092
  1. 消費位移管理:默認情況下,Consumer 自動提交 Offset。如需手動提交,可在消費時添加--enable-auto-commit=false參數,并通過commitSync()commitAsync()方法控制提交時機。

2.3 首個 Java 程序:Producer & Consumer

Maven 依賴配置

pom.xml中添加 Kafka 客戶端依賴:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

Producer 代碼示例

import org.apache.kafka.clients.producer.*;  
import java.util.Properties;  
import java.util.concurrent.ExecutionException;  public class KafkaProducerExample {public static void main(String[] args) {// 1. 配置Kafka生產者屬性Properties props = new Properties();// 設置Kafka集群地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 設置鍵的序列化類props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 設置值的序列化類props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 設置消息確認機制:等待所有副本確認(最可靠但最慢)props.put(ProducerConfig.ACKS_CONFIG, "all");// 設置發送失敗時的重試次數props.put(ProducerConfig.RETRIES_CONFIG, 3);// 2. 創建Kafka生產者實例Producer<String, String> producer = new KafkaProducer<>(props);// 3. 創建要發送的消息記錄// 參數:topic名稱,消息key,消息valueProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key1", "message1");try {// 4. 發送消息(同步方式)// send()返回Future,get()會阻塞直到收到響應RecordMetadata metadata = producer.send(record).get();// 5. 打印消息發送成功的元數據System.out.println("Message sent to partition " + metadata.partition() + " with offset " + metadata.offset());} catch (InterruptedException | ExecutionException e) {// 6. 處理發送過程中可能出現的異常e.printStackTrace();} finally {// 7. 關閉生產者(重要!避免資源泄漏)producer.close();}}
}

Consumer 代碼示例

import org.apache.kafka.clients.consumer.*; 
import java.time.Duration;                  
import java.util.Collections;              
import java.util.Properties;               public class KafkaConsumerExample {public static void main(String[] args) {// 1. 配置Kafka消費者屬性Properties props = new Properties();// 設置Kafka集群地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 設置消費者組ID(同一組內的消費者共享消息)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");// 設置鍵的反序列化類props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 設置值的反序列化類props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 注意:默認是自動提交offset,這里我們改為手動提交(見下方commitSync())// 2. 創建Kafka消費者實例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 訂閱主題(可以訂閱多個主題,這里用單例集合訂閱單個主題)consumer.subscribe(Collections.singletonList("test_topic"));// 4. 持續輪詢消息while (true) {// poll()方法獲取消息,參數是等待時間(避免CPU空轉)// 返回一批記錄(可能包含0到多條消息)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 5. 處理收到的每條消息for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",record.key(), record.value(), record.partition(), record.offset());}// 6. 手動同步提交offset(確保消息被成功處理后再提交)// 注意:生產環境應考慮錯誤處理和異步提交(commitAsync)consumer.commitSync(); }// 實際應用中應該添加關閉邏輯(如通過ShutdownHook)// consumer.close();}
}

上述 Java 程序分別實現了消息的生產與消費,通過配置 Producer 和 Consumer 的參數,可靈活控制消息發送策略與消費行為。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/85014.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/85014.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/85014.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Vue Hook Store 設計模式最佳實踐指南

Vue Hook Store 設計模式最佳實踐指南 一、引言 在 Vue 3 組合式 API 與 TypeScript 普及的背景下&#xff0c;Hook Store 設計模式應運而生&#xff0c;它結合了 Vue 組合式 API 的靈活性與狀態管理的最佳實踐&#xff0c;為開發者提供了一種輕量級、可測試且易于維護的狀態…

無人機多人協同控制技術解析

一、運行方式 無人機多人點對點控制通常采用以下兩種模式&#xff1a; 1. 主從控制模式 指定一個主控用戶擁有最高優先級&#xff0c;負責飛行路徑規劃、緊急操作等關鍵指令&#xff1b;其他用戶作為觀察者&#xff0c;僅能查看實時畫面或提交輔助指令&#xff0c;需經主…

樹型表查詢方法 —— SQL遞歸

目錄 引言&#xff1a; 自鏈接查詢&#xff1a; 遞歸查詢&#xff1a; 編寫service接口實現&#xff1a; 引言&#xff1a; 看下圖&#xff0c;這是 course_category 課程分類表的結構&#xff1a; 這張表是一個樹型結構&#xff0c;通過父結點id將各元素組成一個樹。 我…

微服務難題?Nacos服務發現來救場

文章目錄 前言1.什么是服務發現2.Nacos 閃亮登場2.1 服務注冊2.2 服務發現 3.Nacos 的優勢3.1 簡單易用3.2 高可用3.3 動態配置 4.實戰演練4.1安裝 Nacos4.2 服務注冊與發現示例代碼&#xff08;以 Spring Boot 為例&#xff09; 總結 前言 大家好&#xff0c;我是沛哥兒。今天…

AStar低代碼平臺-腳本調用C#方法

修改報工表表單&#xff0c;右鍵定義彈出菜單&#xff0c;新增一個菜單項&#xff0c;并在點擊事件腳本中編寫調用腳本。 編譯腳本&#xff0c;然后在模塊代碼里面定義這個方法&#xff1a; public async Task<int> on_call_import(DataRow curRow) {PrintDataRow(cur…

python調用langchain實現RAG

一、安裝langchain 安裝依賴 python -m venv env.\env\Scripts\activatepip3 install langchainpip3 install langchain-corepip3 install langchain-openaipip3 install langchain-communitypip3 install dashscopepip3 install langchain_postgrespip3 install "psyc…

大學大模型教學:基于NC數據的全球氣象可視化解決方案

引言 氣象數據通常以NetCDF(Network Common Data Form)格式存儲,這是一種廣泛應用于科學數據存儲的二進制文件格式。在大學氣象學及相關專業的教學中,掌握如何讀取、處理和可視化NC數據是一項重要技能。本文將詳細介紹基于Python的NC數據處理與可視化解決方案,包含完整的代…

ORB-SLAM2學習筆記:ComputeKeyPointsOctTree分析過程記錄

ComputeKeyPointsOctTree是ORB特征提取器中計算關鍵點的部分&#xff0c;特別是使用八叉樹&#xff08;OctTree&#xff09;方法進行關鍵點分布。 首先&#xff0c;函數參數是vector<vector的引用allKeypoints&#xff0c;用來存儲各層的關鍵點。代碼開頭調整了allKeypoint…

LeetCode Hot100(多維動態規劃)

62. 不同路徑 比較板子的dp&#xff0c;實際上就是到達一個點有兩種方式&#xff0c;從上面來或者是左邊&#xff0c;加起來就可以了 class Solution {public int uniquePaths(int m, int n) {int [][]arr new int[m2][n2];arr[1][1]1;for(int i1;i<m;i){for(int j1;j<…

Oracle MOVE ONLINE 實現原理

Oracle MOVE ONLINE 實現原理 Oracle 的 MOVE ONLINE 操作是一種在線重組表的技術&#xff0c;允許在不中斷業務的情況下重新組織表數據。以下是其實現原理的詳細分析&#xff1a; 基本概念 MOVE ONLINE 是 Oracle 12c 引入的特性&#xff0c;用于替代傳統的 ALTER TABLE ..…

工作流長任務處置方案

以下是前后端協作處理長任務工作流的完整實現方案&#xff0c;結合技術選型與設計要點&#xff0c;以清晰結構呈現&#xff1a; 一、后端實現方案 異步任務隊列架構 ? 技術選型&#xff1a; ? 消息隊列&#xff1a;NATS&#xff08;輕量級&#xff09;或 RabbitMQ&#xf…

RabbitMQ仲裁隊列高可用架構解析

#作者&#xff1a;閆乾苓 文章目錄 概述工作原理1.節點之間的交互2.消息復制3.共識機制4.選舉領導者5.消息持久化6.自動故障轉移 集群環境節點管理仲裁隊列增加集群節點重新平衡仲裁隊列leader所在節點仲裁隊列減少集群節點 副本管理add_member 在給定節點上添加仲裁隊列成員&…

fingerprint2瀏覽器指紋使用記錄

我在uniapp-vue3-H5端使用的&#xff0c;記錄一下 抄的這里前端使用fingerprintjs2獲取瀏覽器指紋fingerprintjs2是通過設備瀏覽器信息獲取瀏覽器指紋的插件&#xff08; - 掘金 1、安裝依賴 npm i fingerprintjs2 -S2、抽成模塊文件&#xff0c;/utils/Fingerprint2.js 生成指…

深度學習面試八股簡略速覽

在準備深度學習面試時&#xff0c;你可能會感到有些不知所措。畢竟&#xff0c;深度學習是一個龐大且不斷發展的領域&#xff0c;涉及眾多復雜的技術和概念。但別擔心&#xff0c;本文將為你提供一份全面的指南&#xff0c;從基礎理論到實際應用&#xff0c;幫助你在面試中脫穎…

使用 Redis 作為向量數據庫

一、什么是向量數據庫&#xff1f; 向量&#xff08;Vector&#xff09;&#xff1a;在機器學習和 AI 中&#xff0c;向量是由一系列數字組成的序列&#xff0c;用于數值化地描述數據的特征或語義。文本、圖像、音頻等非結構化數據可以通過模型轉換成固定長度的向量。 向量數據…

變量的計算

不同類型變量之間的計算 數字型變量可以直接計算 在python中&#xff0c;數字型變量可以直接通過算術運算符計算bool型變量&#xff1a;True 對應數字1 &#xff1b;False 對應數字0、 字符串變量 使用 拼接字符串 使用 * 拼接指定倍數的相同字符串 變量的輸入&#xff1a;&…

PostgreSQL學會如何建表

開始使用PostgreSQL之前&#xff0c; 上一節我們說了怎樣安裝它。 PostgreSQL可能已經安裝到你的電腦上了,安裝后postgre服務默認在電腦開機時運行啟動。 一.了解PostgreSQL的運行 PostgreSQL使用一種客戶端/服務器&#xff08;C/S&#xff09;模型。 和其他典型的客戶端/服務…

Linux驅動學習筆記(十)

熱插拔 1.熱插拔&#xff1a;就是帶電插拔&#xff0c;即允許用戶在不關閉系統&#xff0c;不切斷電源的情況下拆卸或安裝硬盤&#xff0c;板卡等設備。熱插拔是內核和用戶空間之間&#xff0c;通過調用用戶空間程序實現交互來實現的&#xff0c;當內核發生了某種熱拔插事件時…

大模型應用開發第五講:成熟度模型:從ChatGPT(L2)到未來自主Agent(L4)

大模型應用開發第五講&#xff1a;成熟度模型&#xff1a;從ChatGPT&#xff08;L2&#xff09;到未來自主Agent&#xff08;L4&#xff09; 資料取自《大模型應用開發&#xff1a;動手做AI Agent 》。 查看總目錄&#xff1a;學習大綱 關于DeepSeek本地部署指南可以看下我之…

Delphi 導入excel

Delphi導入Excel的常見方法可分為兩種主流方案&#xff1a;基于OLE自動化操作Excel原生接口和利用第三方組件庫。以下為具體實現流程及注意事項&#xff1a; ?一、OLE自動化方案&#xff08;推薦基礎場景&#xff09;? 該方法通過COM接口調用本地安裝的Excel程序&#xff0c…