【消息隊列kafka_中間件】一、快速入門分布式消息隊列

????????在當今大數據和分布式系統盛行的時代,消息隊列作為一種關鍵的中間件技術,發揮著舉足輕重的作用。其中,Apache Kafka 以其卓越的性能、高可擴展性和強大的功能,成為眾多企業構建分布式應用的首選消息隊列解決方案。本篇文章將帶你深入了解 Kafka 的基礎概念、架構原理、核心組件,并通過實際代碼示例,讓你快速上手 Kafka,揭開分布式消息隊列的神秘面紗。

一、Kafka 簡介與背景?

Kafka 最初是由 LinkedIn 公司開發,用于處理公司內部大規模的實時數據流。隨著其開源并在社區的不斷發展壯大,Kafka 已成為一款廣泛應用于大數據處理、實時流計算、日志收集與處理、系統解耦等眾多領域的分布式消息隊列系統。?

與傳統消息隊列相比,Kafka 具有顯著的優勢。它能夠支持超高的吞吐量,每秒可以處理數十萬甚至數百萬條消息,這使得它在應對海量數據傳輸時表現出色。同時,Kafka 具備低延遲的特性,消息的生產和消費延遲可以控制在毫秒級,滿足了許多對實時性要求極高的應用場景。此外,Kafka 的分布式架構設計使其具有強大的可擴展性,能夠輕松應對不斷增長的數據處理需求。

二、關鍵概念剖析?

2.1 生產者(Producer)?

生產者是 Kafka 系統中負責發送消息的組件。在實際應用中,生產者通常是由業務系統中的某個模塊或服務擔當,它將業務數據封裝成 Kafka 能夠識別的消息格式,并發送到指定的主題(Topic)中。?

以 Java 語言為例,以下是一個簡單的 Kafka 生產者代碼示例:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {// 設置生產者屬性Properties props = new Properties();// Kafka集群地址,格式為host1:port1,host2:port2,...props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 鍵的序列化方式,這里使用字符串序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 值的序列化方式,同樣使用字符串序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 創建生產者實例KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 要發送的消息內容String messageKey = "key1";String messageValue = "Hello, Kafka!";// 創建消息對象,指定主題和鍵值對ProducerRecord<String, String> record = new ProducerRecord<>("test - topic", messageKey, messageValue);try {// 發送消息并獲取響應RecordMetadata metadata = producer.send(record).get();System.out.println("Message sent successfully to partition " + metadata.partition() +" with offset " + metadata.offset());} catch (Exception e) {e.printStackTrace();} finally {// 關閉生產者,釋放資源producer.close();}}
}

2.2 消費者(Consumer)?

消費者負責從 Kafka 主題中讀取消息并進行處理。Kafka 的消費者是以消費者組(Consumer Group)的形式存在的,同一消費者組內的消費者共同消費主題中的消息,通過負載均衡的方式提高消息處理的效率。不同消費者組之間相互獨立,每個消費者組都可以完整地消費主題中的所有消息。?

以下是一個 Java 語言的 Kafka 消費者代碼示例:

import org.apache.kafka.clients.consumer.*;
import java.util.Arrays;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {// 設置消費者屬性Properties props = new Properties();// Kafka集群地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 消費者組ID,同一組內的消費者共享消費偏移量props.put(ConsumerConfig.GROUP_ID_CONFIG, "test - group");// 鍵的反序列化方式,這里使用字符串反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 值的反序列化方式,同樣使用字符串反序列化props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 自動提交消費偏移量,默認true,建議設置為false,手動管理偏移量以確保數據不丟失props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// 創建消費者實例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 訂閱主題consumer.subscribe(Arrays.asList("test - topic"));try {while (true) {// 拉取消息,設置拉取超時時間為100毫秒ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: " +"topic = " + record.topic() +", partition = " + record.partition() +", offset = " + record.offset() +", key = " + record.key() +", value = " + record.value());}// 手動提交消費偏移量consumer.commitSync();}} catch (Exception e) {e.printStackTrace();} finally {// 關閉消費者,釋放資源consumer.close();}}
}

2.3 主題(Topic)?

主題是 Kafka 中消息分類存儲的邏輯概念,類似于數據庫中的表。每個主題可以包含多個分區(Partition),生產者發送的消息會被存儲到指定的主題中,消費者則通過訂閱主題來獲取消息。在實際應用中,通常會根據不同的業務類型或數據類型創建不同的主題。例如,在一個電商系統中,可以創建 “order - topic” 用于存儲訂單相關的消息,“user - behavior - topic” 用于存儲用戶行為數據相關的消息等。?

2.4 分區(Partition)?

分區是 Kafka 實現高吞吐量和可擴展性的關鍵機制。每個主題可以被劃分為多個分區,這些分區分布在 Kafka 集群的不同 Broker 節點上。當生產者發送消息時,Kafka 會根據一定的策略將消息分配到主題的某個分區中。常見的分區策略有按消息鍵的 Hash 值分配(如果消息帶有鍵)和輪詢分配(如果消息沒有鍵)。?

分區的好處主要有以下幾點:首先,通過將數據分散存儲在多個分區上,可以提高數據存儲和讀取的并行度,從而提升整體的吞吐量。例如,在一個擁有多個 Broker 節點的集群中,每個 Broker 可以同時處理不同分區的讀寫請求,大大加快了數據處理速度。其次,分區還可以實現數據的冗余備份。Kafka 會為每個分區創建多個副本,其中一個副本作為領導者(Leader)副本,負責處理讀寫請求,其他副本作為跟隨者(Follower)副本,從領導者副本同步數據。當領導者副本所在的 Broker 節點發生故障時,Kafka 會自動從跟隨者副本中選舉出一個新的領導者副本,確保數據的可用性和一致性。

三、Kafka 集群架構?

Kafka 集群由多個 Broker 節點組成,每個 Broker 節點實際上就是一個 Kafka 服務器進程。這些 Broker 節點共同協作,實現了 Kafka 的分布式存儲和消息處理功能。?

在 Kafka 集群中,Zookeeper 扮演著至關重要的角色。Zookeeper 是一個分布式協調服務,它負責管理 Kafka 集群的元數據信息,包括 Broker 節點的注冊與發現、主題與分區的元數據管理、分區領導者副本的選舉等。具體來說,當一個新的 Broker 節點加入集群時,它會向 Zookeeper 注冊自己的信息,Zookeeper 會將這些信息同步給其他 Broker 節點,使得整個集群能夠感知到新節點的加入。在主題與分區管理方面,Zookeeper 存儲了每個主題的分區信息,包括分區的數量、每個分區的領導者副本和跟隨者副本所在的 Broker 節點等。當某個分區的領導者副本出現故障時,Zookeeper 會觸發領導者選舉過程,從跟隨者副本中選舉出一個新的領導者副本,確保分區的正常工作。

如上圖所示,Kafka 集群中的多個 Broker 節點通過 Zookeeper 進行協調和管理。生產者和消費者通過與 Broker 節點進行通信來發送和接收消息,而 Zookeeper 則在幕后負責維護集群的一致性和穩定性。

四、安裝與環境搭建?

4.1 下載 Kafka?

首先,從 Apache Kafka 官方網站(Apache Kafka)下載 Kafka 的安裝包。目前 Kafka 的最新版本可以在官網上找到,選擇適合自己操作系統的安裝包進行下載。例如,對于 Linux 系統,可以下載.tgz格式的壓縮包。?

4.2 解壓安裝包?

下載完成后,使用解壓命令將安裝包解壓到指定目錄。假設將安裝包下載到了/downloads目錄下,解壓命令如下:

tar -xzf kafka_2.13 - 3.3.1.tgz -C /usr/local/

上述命令將 Kafka 安裝包解壓到了/usr/local/目錄下,解壓后的目錄名稱為kafka_2.13 - 3.3.1,其中2.13是 Scala 的版本號,3.3.1是 Kafka 的版本號。?

4.3 配置環境變量?

為了方便在命令行中使用 Kafka 的命令工具,需要將 Kafka 的bin目錄添加到系統的環境變量中。在 Linux 系統中,可以編輯~/.bashrc文件,在文件末尾添加以下行:

export PATH=$PATH:/usr/local/kafka_2.13 - 3.3.1/bin

然后執行以下命令使環境變量生效:

source ~/.bashrc

4.4 配置 Kafka?

Kafka 的主要配置文件位于其安裝目錄下的config文件夾中,其中最重要的配置文件是server.properties。在這個文件中,可以配置 Kafka 的各種參數,如 Kafka 監聽的端口、日志存儲路徑、連接 Zookeeper 的地址等。?

以下是一些常見的配置參數及說明:

# Kafka監聽的端口,默認9092
listeners=PLAINTEXT://localhost:9092
# 日志存儲路徑,可以配置多個路徑,用逗號分隔
log.dirs=/tmp/kafka - logs
# 連接Zookeeper的地址,格式為host1:port1,host2:port2,...
zookeeper.connect=localhost:2181
# 每個分區的副本因子,即每個分區有多少個副本,建議設置為大于1的奇數,以確保容錯
num.partitions=1
replica.fetch.max.bytes=1048576

4.5 啟動 Zookeeper 與 Kafka?

在完成 Kafka 配置后,需要先啟動 Zookeeper,因為 Kafka 依賴 Zookeeper 進行集群管理。在 Kafka 安裝目錄下,執行以下命令啟動 Zookeeper:

bin/zookeeper - server - start.sh config/zookeeper.properties

上述命令會使用config/zookeeper.properties配置文件啟動 Zookeeper 服務。啟動成功后,終端會輸出一些啟動日志信息,顯示 Zookeeper 已正常運行并監聽在指定端口(默認為 2181)。?

接著,啟動 Kafka 服務,執行命令:

bin/kafka - server - start.sh config/server.properties

此命令通過config/server.properties配置文件啟動 Kafka 服務器進程。啟動過程中,日志會顯示 Kafka 加載配置、注冊到 Zookeeper 等信息。當看到類似 “[KafkaServer id=0] started” 的日志時,表明 Kafka 已成功啟動。

4.6 使用 Kafka 命令行工具驗證安裝?

Kafka 提供了豐富的命令行工具,方便我們進行各種操作與驗證。例如,使用以下命令創建一個新的主題:

bin/kafka - topics.sh --create --bootstrap - servers localhost:9092 --replication - factor 1 --partitions 1 --topic new - topic

參數說明:?

  • --create:表示執行創建主題操作。?
  • --bootstrap - servers localhost:9092:指定 Kafka 集群地址,這里是本地的 9092 端口。?
  • --replication - factor 1:設置主題的副本因子為 1,即每個分區只有一個副本。在生產環境中,為了數據冗余與容錯,通常設置為大于 1 的奇數,如 3 或 5。?
  • --partitions 1:指定主題的分區數量為 1。根據業務需求可調整,若業務數據量較大且對并行處理要求高,可設置多個分區。?
  • --topic new - topic:指定要創建的主題名稱為new - topic。

創建成功后,可使用以下命令查看當前 Kafka 集群中的所有主題:

bin/kafka - topics.sh --list --bootstrap - servers localhost:9092

該命令會列出所有已創建的主題名稱,若能看到剛剛創建的new - topic,則說明主題創建成功。?

還可以使用生產者和消費者命令行工具進行消息的發送與接收測試。首先,啟動一個生產者終端,執行命令:

bin/kafka - console - producer.sh --bootstrap - servers localhost:9092 --topic new - topic

啟動后,終端會等待輸入消息。此時,輸入任意消息內容并回車,消息就會被發送到new - topic主題中。?

然后,在另一個終端啟動消費者,執行命令:

bin/kafka - console - consumer.sh --bootstrap - servers localhost:9092 --topic new - topic --from - beginning

--from - beginning參數表示從主題的起始位置開始消費消息。

啟動消費者后,就能看到之前生產者發送的消息,這表明 Kafka 的基本功能正常,安裝與環境搭建成功。

通過這些命令行工具的操作,不僅驗證了安裝,也進一步熟悉了 Kafka 的基本使用方式。你將發現其在分布式消息處理領域的強大功能,無論是構建大規模數據處理系統,還是實現復雜業務系統的解耦與異步通信,Kafka 都能成為有力的技術支撐。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/75628.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/75628.shtml
英文地址,請注明出處:http://en.pswp.cn/web/75628.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

在線地圖支持天地圖和騰訊地圖,儀表板和數據大屏支持發布功能,DataEase開源BI工具v2.10.7 LTS版本發布

2025年4月11日&#xff0c;人人可用的開源BI工具DataEase正式發布v2.10.7 LTS版本。 這一版本的功能變動包括&#xff1a;數據源方面&#xff0c;Oracle數據源支持獲取和查詢物化視圖&#xff1b;圖表方面&#xff0c;在線地圖支持天地圖、騰訊地圖&#xff1b;新增子彈圖&…

【Linux實踐系列】:匿名管道收尾+完善shell外殼程序

&#x1f525; 本文專欄&#xff1a;Linux Linux實踐項目 &#x1f338;作者主頁&#xff1a;努力努力再努力wz &#x1f4aa; 今日博客勵志語錄&#xff1a; 人生總會有自己能力所不及的范圍&#xff0c;但是如果你在你能力所及的范圍盡了全部的努力&#xff0c;那你還有什么遺…

【C++初學】課后作業匯總復習(七) 指針-深淺copy

1、 HugeInt類:構造、、cout Description: 32位整數的計算機可以表示整數的范圍近似為&#xff0d;20億到&#xff0b;20億。在這個范圍內操作一般不會出現問題&#xff0c;但是有的應用程序可能需要使用超出上述范圍的整數。C可以滿足這個需求&#xff0c;創建功能強大的新的…

【C++】 —— 筆試刷題day_16

刷題_day16&#xff0c;繼續加油啊 一、字符串替換 題目解析 這道題是一道簡單的字符題目&#xff0c;題目給我們一個字符串A&#xff0c;和n表示A字符串的長度&#xff0c;再給出一個字符數組arg&#xff0c;m表示arg中是數據個數。 然我們在字符串A中找到%s然后替換成arg中的…

n8n 本地部署及實踐應用,實現零成本自動化運營 Telegram 頻道(保證好使)

n8n 本地部署及實踐應用&#xff0c;實現零成本自動化運營 Telegram 頻道&#xff08;保證好使&#xff09; 簡介 n8n 介紹 一、高度可定制性 二、豐富的連接器生態 三、自托管部署&#xff08;本地部署&#xff09; 四、社區驅動 n8n 的部署 一、前期準備 二、部署步…

flutter 桌面應用之系統托盤

系統托盤(Tray) 系統托盤就是狀態欄里面對應的圖標點擊菜單 主要有兩款框架 框架一句話評價tray_manager輕量、簡單、易用&#xff0c;適合常規托盤功能system_tray更底層、更強大、支持圖標/菜單/消息彈窗等更多功能&#xff0c;但復雜度更高 &#x1f9f1; 基礎能力對比 …

修改idea/android studio等編輯器快捷注釋從當前行開頭的反人類行為

不知道什么時候開始&#xff0c;idea編輯的快捷注釋開始從當前行開頭出現了&#xff0c;顯得實在是難受&#xff0c;我只想讓在當前行代碼的部份開始縮進兩個字符開始&#xff0c;這樣才會顯得更舒服。不知道有沒有強迫癥的猴子和我一樣&#xff0c;就像下面的效果&#xff1a;…

MySQL慢查詢全攻略:定位、分析與優化實戰

&#x1f680; MySQL慢查詢全攻略&#xff1a;定位、分析與優化實戰 #數據庫優化 #性能調優 #SQL優化 #MySQL實戰 一、慢查詢定位&#xff1a;找到性能瓶頸 1.1 開啟慢查詢日志 -- 查看當前配置 SHOW VARIABLES LIKE %slow_query%; -- 動態開啟&#xff08;重啟失效&…

當原型圖與文字說明完全不同時,測試要怎么做?

當測試遇上左右手互搏的需求&#xff0c;怎么辦&#xff1f; "這個彈窗樣式怎么和文檔寫的不一樣&#xff1f;"、"按鈕位置怎么跑到左邊去了&#xff1f;"——根據Deloitte的調查&#xff0c;62%的項目存在原型圖與需求文檔不一致的情況。這種"精神分…

關于量化交易在拉盤砸盤方面應用的部分思考

關于“砸盤”的深層解析與操盤邏輯 ??一、砸盤的本質與市場含義?? ??砸盤??指通過集中拋售大量籌碼導致價格快速下跌的行為&#xff0c;其核心目標是??制造恐慌、清洗浮籌或實現利益再分配??。不同場景下的砸盤含義不同&#xff1a; ??主動砸盤&#xff08;操控…

【項目管理】第12章 項目質量管理-- 知識點整理

項目管理-相關文檔,希望互相學習,共同進步 風123456789~-CSDN博客 (一)知識總覽 項目管理知識域 知識點: (項目管理概論、立項管理、十大知識域、配置與變更管理、績效域) 對應:第6章-第19章 第6章 項目管理概論 4分第13章 項目資源管理 3-4分第7章 項目…

一個好看的圖集展示html頁面源碼

源碼介紹 一個好看的圖集展示html頁面源碼&#xff0c;適合展示自己的作品&#xff0c;頁面美觀大氣&#xff0c;也可以作為產品展示或者個人引導頁等等 源碼由HTMLCSSJS組成&#xff0c;記事本打開源碼文件可以進行內容文字之類的修改&#xff0c; 雙擊html文件可以本地運行…

2021第十二屆藍橋杯大賽軟件賽省賽C/C++ 大學 B 組

記錄刷題的過程、感悟、題解。 希望能幫到&#xff0c;那些與我一同前行的&#xff0c;來自遠方的朋友&#x1f609; 大綱&#xff1a; 1、空間-&#xff08;題解&#xff09;-字節單位轉換 2、卡片-&#xff08;題解&#xff09;-可以不用當組合來寫&#xff0c;思維題 3、直…

LabVIEW 中 JSON 數據與簇的轉換

在 LabVIEW 編程中&#xff0c;數據格式的處理與轉換是極為關鍵的環節。其中&#xff0c;將數據在 JSON 格式與 LabVIEW 的簇結構之間進行轉換是一項常見且重要的操作。這里展示的程序片段就涉及到這一關鍵功能&#xff0c;以下將詳細介紹。 一、JSON 數據與簇的轉換功能 &am…

藍橋杯大模板

init.c void System_Init() {P0 0x00; //關閉蜂鳴器和繼電器P2 P2 & 0x1f | 0xa0;P2 & 0x1f;P0 0x00; //關閉LEDP2 P2 & 0x1f | 0x80;P2 & 0x1f; } led.c #include <LED.H>idata unsigned char temp_1 0x00; idata unsigned char temp_old…

通過HTTP協議實現Git免密操作的解決方案

工作中會遇到這樣的問題的。 通過HTTP協議實現Git免密操作的解決方案 方法一&#xff1a;啟用全局憑據存儲&#xff08;推薦&#xff09; 配置憑證存儲? 執行以下命令&#xff0c;讓Git永久保存賬號密碼&#xff08;首次操作后生效&#xff09;&#xff1a; git config --g…

Java常見面試問題

一.Liunx 二.Java基礎 1.final 2.static 3.與equals 三.Collection 1.LIst 2.Map 3.Stream 四、多線程 1.實現方法 2.線程池核心參數 3.應用場景 五、JVM 1.堆 2.棧 六、Spring 1.面向對象 2.IOC 3.AOP 七、Springboot 1.自動裝配 八、SpringCloud 1.Nacos 2.seata 3.ga…

【藍橋杯】第十六屆藍橋杯 JAVA B組記錄

試題 A: 逃離高塔 很簡單&#xff0c;簽到題&#xff0c;但是需要注意精度&#xff0c;用int會有溢出風險 答案&#xff1a;202 package lanqiao.t1;import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWrit…

PyTorch Tensor維度變換實戰:view/squeeze/expand/repeat全解析

本文從圖像數據處理、模型輸入適配等實際場景出發&#xff0c;系統講解PyTorch中view、squeeze、expand和repeat四大維度變換方法。通過代碼演示對比不同方法的適用性&#xff0c;助您掌握數據維度調整的核心技巧。 一、基礎維度操作方法 1. view&#xff1a;內存連續的形狀重…

Kubernetes nodeName Manual Scheduling practice (K8S節點名稱綁定以及手工調度)

Manual Scheduling 在 Kubernetes 中&#xff0c;手動調度框架允許您將 Pod 分配到特定節點&#xff0c;而無需依賴默認調度器。這對于測試、調試或處理特定工作負載非常有用。您可以通過在 Pod 的規范中設置 nodeName 字段來實現手動調度。以下是一個示例&#xff1a; apiVe…