Kafka的基本使用

目錄

認識Kafka

消息隊列

消息隊列的核心概念

核心價值與解決的問題

Kafka

ZooKeeper

Kafka的基本使用

環境安裝

啟動zookeeper

啟動Kafka

消息主題

創建主題

查詢主題

修改主題

發送數據

命令行操作

?JavaAPI操作

消費數據

?命令行操作

JavaAPI操作


認識Kafka

消息隊列

消息隊列是分布式系統和現代應用架構中至關重要的中間件。它的核心作用是解耦異步削峰填谷,像一個高效的“通信員”和“緩沖池”協調不同組件之間的工作。

消息隊列的核心概念

  1. 生產者:?產生消息(數據、任務請求、事件通知)并發送到隊列的應用程序或服務。

  2. 消息隊列:?一個臨時的、持久化的存儲區域(通常基于內存、磁盤或數據庫),用于存放生產者發送的消息。消息按照先進先出的順序存儲,但很多隊列支持優先級、延遲等特性。

  3. 消費者:?從隊列中獲取消息并進行處理的應用程序或服務。

  4. 消息:?隊列中傳輸的數據單元,通常包含有效載荷(實際數據)和元數據(如ID、時間戳、優先級等)。

核心價值與解決的問題

  1. 解耦:

    • 問題:?系統組件(服務)之間直接調用會導致緊密耦合。一個組件的變更、故障或性能瓶頸會直接影響其他依賴它的組件。擴展也變得困難。

    • 解決:?生產者只需將消息發送到隊列,無需知道誰(消費者)會處理它,消費者只需從隊列訂閱消息,無需知道消息是誰(生產者)發送的。雙方只依賴隊列,不直接依賴對方,大大降低了耦合度。系統更靈活、更易于維護和擴展。

  2. 異步:

    • 問題:?同步調用要求調用方(生產者)必須等待被調用方(消費者)處理完成并返回結果才能繼續執行。如果處理耗時很長,調用方會被阻塞,資源利用率低,用戶體驗差(如網頁卡頓)。

    • 解決:?生產者發送消息到隊列后即可返回,無需等待消費者處理。消費者在后臺異步地從隊列拉取消息進行處理。這顯著提高了系統的吞吐量和響應速度。

  3. 削峰填谷:

    • 問題:?系統流量往往存在高峰和低谷。高峰期如果請求量遠超消費者處理能力,會導致系統過載、崩潰或請求超時。低谷期資源又可能閑置。

    • 解決:?隊列作為緩沖區,在流量高峰時積壓請求,平滑地將大量請求暫存起來。消費者按照自己的穩定處理能力從隊列中拉取消息進行處理,避免了瞬間洪峰壓垮下游系統。在流量低谷時,消費者可以繼續處理隊列中積壓的消息。

  4. 冗余與可靠性:

    • 問題:?直接調用時,如果消費者臨時不可用(故障、重啟、維護),生產者的請求會丟失或失敗。

    • 解決:?消息隊列通常提供消息持久化功能(將消息寫入磁盤)。即使消費者暫時離線,消息也會安全存儲在隊列中,待消費者恢復后繼續處理,確保消息不丟失。許多隊列還提供確認機制(ACK),消費者處理成功后才會從隊列中移除消息。

  5. 可伸縮性:

    • 問題:?單一消費者處理能力有限,難以應對增長的業務量。

    • 解決:?可以很容易地增加消費者的數量(水平擴展),讓多個消費者并行地從同一個隊列中拉取消息進行處理,顯著提高系統的整體吞吐量。隊列本身也可以做成分布式集群來應對高吞吐量需求。

  6. 順序保證:

    • 問題:?在分布式環境中保證消息處理的嚴格順序很困難。

    • 解決:?雖然完全全局有序很難,但許多消息隊列能保證分區有序隊列有序(在單個隊列/分區內,消息按照發送順序被消費)。這對于某些需要保證因果關系的業務場景(如賬戶流水)非常重要。

  7. 緩沖:

    • 問題:?生產者和消費者的處理速度不一致。

    • 解決:?隊列天然提供了緩沖能力,允許生產者和消費者以各自不同的速率工作,不會互相拖累。

常見的消息隊列有RabbitMQ,Kafka,RocketMQ。這里主要介紹Kafka。?

Kafka

Kafka?通常指?Apache Kafka,這是一個開源的、分布式的、高吞吐量、低延遲的流處理平臺。它最初由 LinkedIn 開發,后來捐贈給了 Apache 軟件基金會,并迅速成為大數據和實時數據處理領域的核心基礎設施之一。

? Kafka 不僅僅是一個消息隊列,它是一個高吞吐、低延遲、分布式、持久化、可水平擴展的流數據平臺。它設計之初就是為了處理持續產生、體量巨大、需要實時處理的“數據流”

ZooKeeper是一個開源的分布式應用程序協調軟件,而Kafka是分布式事件處理平臺,底層是使用分布式架構設計,所以Kafka的多個節點之間是采用zookeeper來實現協調調度的。

ZooKeeper

ZooKeeper是一個開源的分布式應用程序協調軟件,而Kafka是分布式事件處理平臺,底層是使用分布式架構設計,所以Kafka的多個節點之間是采用zookeeper來實現協調調度的。

Zookeeper的核心作用

  1. ZooKeeper的數據存儲結構可以簡單地理解為一個Tree結構,而Tree結構上的每一個節點可以用于存儲數據,所以一般情況下,我們可以將分布式系統的元數據(環境信息以及系統配置信息)保存在ZooKeeper節點中。
  2. ZooKeeper創建數據節點時,會根據業務場景創建臨時節點或永久(持久)節點。永久節點就是無論客戶端是否連接上ZooKeeper都一直存在的節點,而臨時節點指的是客戶端連接時創建,斷開連接后刪除的節點。同時,ZooKeeper也提供了Watch(監控)機制用于監控節點的變化,然后通知對應的客戶端進行相應的變化。Kafka軟件中就內置了ZooKeeper的客戶端,用于進行ZooKeeper的連接和通信。

Kafka的基本使用

環境安裝

我們這里先安裝簡單的Windows單機環境。在安裝之前務必先安裝Java8。

下載Kafka:Kafka下載地址Apache Kafka: A Distributed Streaming Platform.https://kafka.apache.org/downloads

選擇版本為2.13-3.8.0

下載完成后進行解壓,解壓目錄放在非系統盤根目錄下。為了訪問方便,可以將解壓后的文件夾名稱修改為Kafka

Kafka的文件目錄

bin

linux系統下可執行腳本文件

bin/windows

windows系統下可執行腳本文件

config

配置文件

libs

依賴類庫

licenses

許可信息

site-docs

文檔

logs

服務日志

啟動zookeeper

當前版本的Kafka軟件仍然依賴Zookeeper,所以啟動Kafka之前,需要先啟動Zookeeper,Kafka軟件內置了Zookeeper,所以無需額外安裝,直接調用啟動腳本即可。

?1.?進入Kafka解壓縮文件夾的config目錄,修改zookeeper.properties配置文件

修改dataDir配置,用于設置ZooKeeper數據存儲位置,該路徑如果不存在會自動創建。

dataDir=D:/kafka/data/zk

在kafka解壓縮后的目錄中創建Zookeeper啟動腳本文件:zk.cmd。

輸入:

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

上述指令就是調用zookeeper啟動命令,同時指定配置文件?

雙擊啟動即可:

?啟動完成。

啟動Kafka

進入Kafka解壓縮文件夾的config目錄,修改server.properties配置文件.

設置Kafka數據的存儲目錄。如果文件目錄不存在,會自動生成。

在kafka解壓縮后的目錄中創建Kafka啟動腳本文件:kfk.cmd。

輸入:

call bin/windows/kafka-server-start.bat config/server.properties

雙擊啟動即可:?

?

DOS窗口中,輸入jps指令,查看當前啟動的軟件進程:

    這里名稱為QuorumPeerMain的就是ZooKeeper軟件進程,名稱為Kafka的就是Kafka系統進程。此時,說明Kafka已經可以正常使用了。?

    消息主題

    ? 在發布訂閱模型中,為了讓消費者對感興趣的消息進行消費,而不是消費所有消息,所以就定義了主題(Topic),也就是說將不同的消息進行分類,分成不同的主題(Topic),然后消息生產者在生成消息時,就會向指定的主題(Topic)中發送,而消息消費者也可以訂閱自己感興趣的主題(Topic)并從中獲取消息。

    有很多種方式都可以操作Kafka消息中的主題(Topic):命令行、第三方工具、Java API、自動創建。而對于初學者來講,掌握基本的命令行操作是必要的。所以接下來,我們采用命令行進行操作。

    創建主題

    使用命令行方式創建主題test

    打開DOS窗口,在確保Zookeeper和Kafkfa啟動的情況下,進入Kafkfa解壓目錄下的bin/windows目錄。

    輸入如下命令創建主題test: kafka-topics.bat?--bootstrap-server localhost:9092 --create --topic test

    test主題創建完成。

    查詢主題

    輸入如下命令進行主題查詢:kafka-topics.bat?--bootstrap-server localhost:9092 --list

    修改主題

    kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2

    上述命令將test主題的分區數量設置為2.關于分區的信息,后面會詳細介紹。

    發送數據

    命令行操作

    使用命令行方式發送:

    kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

    上述操作就是在控制臺生成數據,hello kafka 這里的數據需要回車,才會發送到Kafka服務器。

    ?JavaAPI操作

    引入依賴

    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.8.0</version>
    </dependency>

    ?編寫生產者

    public class ProducerTest {public static void main(String[] args) {//  配置屬性集合Map<String, Object> configMap = new HashMap<>();//  配置屬性:Kafka服務器集群地址configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//  配置屬性:Kafka生產的數據為KV對,所以在生產數據進行傳輸前需要分別對K,V進行對應的序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//  創建Kafka生產者對象,建立Kafka連接//      構造對象時,需要傳遞配置參數KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);//  準備數據,定義泛型//      構造對象時需要傳遞 【Topic主題名稱】,【Key】,【Value】三個參數for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// 生產(發送)數據producer.send(record);}//  關閉生產者連接producer.close();}
    }

    消費數據

    ?命令行操作

    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

    JavaAPI操作

    public class ConsumerTest {public static void main(String[] args) {
    //         創建配置對象Map<String, Object> configMap = new HashMap<>();configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    //         反序列化類配置configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    //         組ID配置configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//  創建消費者對象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configMap);//  從kafka主題中獲取對象 訂閱主題consumer.subscribe(Collections.singleton("test"));// 消費者從Kafka主題中拉取數據while (true) {ConsumerRecords<String, String> datas = consumer.poll(100);for (ConsumerRecord<String, String> data : datas) {System.out.println(data);}}//  關閉消費者對象// consumer.close();}
    }

    本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
    如若轉載,請注明出處:http://www.pswp.cn/diannao/92035.shtml
    繁體地址,請注明出處:http://hk.pswp.cn/diannao/92035.shtml
    英文地址,請注明出處:http://en.pswp.cn/diannao/92035.shtml

    如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

    相關文章

    Flink2.0學習筆記:Table API SQL

    stevensu1/EC0720 表 API 和 SQL# 表 API 和 SQL——用于統一流和批處理 加工。表 API 是適用于 Java、Scala 和 Python 的語言集成查詢 API&#xff0c;它 允許組合來自關系運算符的查詢&#xff0c;例如 selection、filter 和 join in 一種非常直觀的方式。Flink 的 SQL 支…

    【 SpringAI核心特性 | Prompt工程 】

    1. Prompt 工程 基本概念&#xff1a;Prompt ?工程又叫提示?詞工程&#xff0c;簡單來說&#xff0c;就是輸入?給 AI 的指令。 比如下面?這段內容&#xff0c;就是提示詞&#xff1a; 請問桂林電子科技大學是一個怎么樣的學校&#xff1f;1.1 Prompt分類 在 AI ?對話中…

    windows wsl2-06-docker hello world

    hello-world 例子 就像其他任何一門語言一樣&#xff0c;我們來體驗 docker 的 hello world $ docker run hello-world但是報錯 :~$ docker run hello-world Unable to find image hello-world:latest locally docker: Error response from daemon: Get "https://registry…

    Python知識點4-嵌套循環break和continue使用死循環

    一、循環【重點掌握】 1.嵌套循環類似于嵌套if語句 語法&#xff1a; while 表達式1&#xff1a;while 表達式2&#xff1a;語句# 1. # 循環5次&#xff0c;打印0~4 m 0 while m < 5:print(m)m 1 # 循環3次&#xff0c;打印0~2 n 0 while n < 3:print(n)n 1print(&qu…

    將HTML+JS+CSS數獨游戲包裝為安卓App

    HTMLJSCSS制作一個數獨游戲-CSDN博客 中開發了一個數獨游戲&#xff0c;這個數獨游戲提供了一次性回退到指定步驟的輔助功能&#xff0c;在解決復雜數獨問題時十分有幫助&#xff0c;可作為玩數獨游戲的輔助工具&#xff0c;因此&#xff0c;考慮將它改裝成安卓App安裝在手機上…

    編程語言Java入門——核心技術篇(一)封裝、繼承和多態

    同專欄基礎知識篇寫在這里&#xff0c;有興趣的可以去看看&#xff1a; 編程語言Java入門——基礎知識篇&#xff08;一&#xff09;-CSDN博客 編程語言Java入門——基礎知識篇&#xff08;二&#xff09;-CSDN博客 編程語言Java入門——基礎知識篇&#xff08;三&#xff0…

    【39】MFC入門到精通——C++ /MFC操作文件行(讀取,刪除,修改指定行)

    文章目錄1 通過關鍵詞&#xff0c;讀取某一行 &#xff08;3種方法&#xff09;2 刪除 指定行3 修改 指定行1 通過關鍵詞&#xff0c;讀取某一行 &#xff08;3種方法&#xff09; 通過定位關鍵詞&#xff0c;讀取某一行信息,返回CString //通過定位關鍵詞&#xff0c;讀取某…

    5 種可行的方法:如何將 Redmi 聯系人備份到 Mac

    將 Redmi 聯系人備份到 Mac 是防止因手機損壞、丟失或更換設備而導致數據丟失的重要措施。雖然云服務提供了便利性&#xff0c;但擁有離線備份可以提供額外的安全性&#xff0c;而無需完全依賴互聯網。如果您想知道如何將 Redmi 聯系人備份到 Mac&#xff0c;本文將為您介紹 5 …

    LeRobot 具身智能機械臂 SO-ARM100 從搭建到訓練全流程

    今天給大家分享一下 LeRobot 具身智能機械臂 SO-ARM100 的完整使用流程&#xff0c;包括設備組裝、環境配置、遠程控制、數據錄制到模型訓練的全過程。適合剛入門具身智能的小伙伴參考學習。 一、前期準備與資源獲取 在開始之前&#xff0c;我們需要準備好相關的資源和工具&a…

    LINUX720 SWAP擴容;新增邏輯卷;邏輯卷擴容;數據庫遷移;gdisk

    SWAP空間擴展 方法一 增加硬盤或分區擴展 swap -s mkswap /dev/sdd6 blkid /dev/sdd6 swapon /dev/sdd6 swapon -s vim /etc/fstab /dev/sdd6 swap swap defaults 0 0 開機自動擴容 swap -s [rootweb ~]# lsblk NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT sd…

    Python 進程間通信:TCP安全加密數據傳輸

    最近在寫安全方面的程序&#xff0c;有需求&#xff0c;就做了這些TCP加密數據傳輸類。 utils.safeUtils的內容詳見&#xff1a; SafeObj&#xff1a;Python 高安全性加密數據容器類-CSDN博客SafeKey&#xff1a;Python 高安全性加密密碼容器類-CSDN博客 如有任何問題或漏洞歡迎…

    Windows批量修改文件屬性方法

    標題使用icacls命令&#xff08;推薦批量操作&#xff09;打開管理員權限的命令提示符&#xff08;CMD&#xff09;執行以下命令&#xff1a;cmd icacls "文件夾路徑" /grant 用戶名:(OI)(CI)F /T /C 參數說明&#xff1a;(OI)&#xff1a;對象繼承 - 適用于文件夾(C…

    Entity Component System架構

    ECS架構 1 簡介 在當今快速發展的軟件開發領域&#xff0c;游戲開發、實時模擬等場景對系統的性能、靈活性和可擴展性提出了極高的要求。傳統的面向對象架構在面對復雜且動態變化的實體時&#xff0c;往往會出現代碼耦合度高、擴展性差等問題。? ECS&#xff08;Entity - Com…

    .vscode 擴展配置

    一、vue快捷鍵配置 在項目.vscode下新建vue3.0.code-snippets 每當輸入vue3.0后自動生成代碼片段 {"Vue3.0快速生成模板": {"scope": "vue","prefix": "Vue3.0","body": ["<template>"," &…

    一個基于阿里云的C端Java服務的整體項目架構

    1.背景介紹 總結一下工作使用到的基于通常的公有云的項目整體架構&#xff0c;如何基于公有云建設安全可靠的服務&#xff0c;以阿里云為例的整體架構&#xff1b;1. 全局流量治理層&#xff08;用戶請求入口&#xff09;1.1 域名與 DNS 解析域名注冊與備案&#xff1a;通過阿里…

    《剝開洋蔥看中間件:Node.js請求處理效率與錯誤控制的深層邏輯》

    在Node.js的運行時環境中&#xff0c;中間件如同一系列精密咬合的齒輪&#xff0c;驅動著請求從進入到響應的完整旅程&#xff0c;而洋蔥模型則是這組齒輪的傳動系統。它以一種看似矛盾的方式融合了順序與逆序、分離與協作——讓每個處理環節既能獨立工作&#xff0c;又能感知全…

    GaussDB union 的用法

    1 union 的作用union 運算符用于組合兩個或更多 select 語句的結果集。2 union 使用前提union 中的每個 select 語句必須具有相同的列數這些列也必須具有相似的數據類型每個 select 語句中的列也必須以相同的順序排列3 union 語法select column_name(s) from table1 union sele…

    構建足球實時比分APP:REST API與WebSocket接入方案詳解

    在開發足球實時比分應用時&#xff0c;數據接入方式的選擇直接影響用戶體驗和系統性能。本文將客觀分析REST API和WebSocket兩種主流接入方案的技術特點、適用場景和實現策略&#xff0c;幫助開發者做出合理選擇。一、REST API&#xff1a;靈活的數據獲取方案核心優勢標準化接口…

    Linux文件系統三要素:塊劃分、分區管理與inode結構解析

    理解文件系統 我們知道文件可以分為磁盤文件和內存文件&#xff0c;內存文件前面我們已經談過了&#xff0c;下面我們來談談磁盤文件。 目錄 一、引入"塊"概念 解析 stat demo.c 命令輸出 基本信息 設備信息 索引節點信息 權限信息 時間戳 二、引入"分區…

    基于paddleDetect的半監督目標檢測實戰

    基于paddleDetect的半監督目標檢測實戰前言相關介紹前提條件實驗環境安裝環境項目地址使用paddleDetect的半監督方法訓練自己的數據集準備數據分割數據集配置參數文件PaddleDetection-2.7.0/configs/semi_det/denseteacher/denseteacher_ppyoloe_plus_crn_l_coco_semi010.ymlPa…