【Kafka基礎】消費者命令行完全指南:從基礎到高級消費

Kafka消費者是消息系統的關鍵組成部分,掌握/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh工具的使用對于調試、測試和監控都至關重要。本文將全面介紹該工具的各種用法,幫助您高效地從Kafka消費消息。

1 基礎消費模式

1.1 從最新位置消費

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic
參數解析
  • --bootstrap-server: 指定Kafka集群地址
  • --topic: 指定消費的主題名稱
特點
  • 從該消費者組最后提交的offset開始消費
  • 如果沒有提交記錄,則從最新消息開始

1.2 從最早位置消費

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--from-beginning
關鍵參數
  • --from-beginning: 從主題最早的消息開始消費
應用場景
  • 數據回溯
  • 新消費者組初始化

2 消息元數據展示

2.1 顯示消息Key

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property print.key=true \--property key.separator=":"
參數說明
  • print.key=true: 顯示消息key
  • key.separator: 指定key/value分隔符

2.2 顯示完整元數據

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property print.key=true \--property print.value=true \--property print.partition=true \--property print.offset=true \--property print.timestamp=true \--property key.separator=":" \--property line.separator="\n"
元數據參數
  • print.partition: 顯示分區號
  • print.offset: 顯示消息offset
  • print.timestamp: 顯示時間戳

3 精準消費控制

3.1 指定分區消費

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--partition 1
參數說明
  • --partition: 指定消費的分區編號

3.2 指定Offset消費

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--partition 0 \--offset 1000
參數說明
  • --offset: 指定開始消費的offset位置

3.3 消費超時設置

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--timeout-ms 10000
參數說明
  • --timeout-ms: 設置無消息時的超時時間(毫秒)

4 消費者組管理

4.1 使用消費者組

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--group mygroup
參數說明
  • --group: 指定消費者組名稱
特點
  • 支持offset自動提交
  • 支持消費者組rebalance

4.2 手動控制offset提交

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--group mygroup \--property enable.auto.commit=false
參數說明
  • enable.auto.commit=false: 關閉自動提交

5 高級消費配置

5.1 限制消費消息數

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--max-messages 100
參數說明
  • --max-messages: 最大消費消息數

5.2 過濾消費

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property filter.key.regex="test.*"
參數說明
  • filter.key.regex: 按key正則過濾

5.3 消費速率控制

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property fetch.min.bytes=1024 \--property fetch.max.wait.ms=500
參數說明
  • fetch.min.bytes: 最小拉取字節數
  • fetch.max.wait.ms: 最大等待時間

6 常用命令擴展

6.1 消費多個主題

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--whitelist "topic1|topic2"

6.2 顯示消息頭信息

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property print.headers=true

6.3 消費特定時間后的消息

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property offsets.storage=kafka \--property timestamp=1625097600000

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/77272.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/77272.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/77272.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

CausalML 基于機器學習算法的因果推理方法

CausalML 是一個 Python 包,它使用基于最新研究的機器學習算法提供一套提升建模和因果推理方法。它提供了一個標準界面,允許用戶從實驗或觀察數據中估計條件平均處理效應 (CATE),也稱為個體治療效應 (ITE&a…

解鎖深度學習激活函數

在深度學習的廣袤天地里,激活函數宛如隱匿于神經網絡架構中的神奇密碼,掌控著模型學習與表達的關鍵力量。今天,就讓我們一同深入探究這些激活函數的奇妙世界,揭開它們神秘的面紗。 一、激活函數為何不可或缺? 想象一…

從零到有的游戲開發(visual studio 2022 + easyx.h)

引言 本文章適用于C語言初學者掌握基本的游戲開發, 我將用詳細的步驟引領大家如何開發屬于自己的游戲。 作者溫馨提示:不要認為開發游戲很難,一些基本的游戲邏輯其實很簡單, 關于游戲的開發環境也不用擔心,我會詳細…

大數據專業學習路線

大數據專業學習路線 目錄 基礎知識核心技術進階技能實戰項目職業發展學習資源學習計劃常見問題 1. 基礎知識 1.1 編程語言 Python:大數據分析的基礎語言 基礎語法和數據類型函數和模塊面向對象編程文件操作和異常處理常用庫:NumPy, Pandas, Matplot…

flink部署使用(flink-connector-jdbc)連接達夢數據庫并寫入讀取數據

flink介紹 1)Apache Flink 是一個框架和分布式處理引擎,用于對無界和有界數據流進行有狀態計算。Flink 被設計在所有常見的集群環境中運行,以內存執行速度和任意規模來執行計算。 2)在實時計算或離線任務中,往往需要…

用swift playground寫個ios應用和大模型或者網站交互

import SwiftUIstruct ContentView: View {State private var textFieldText: String ""State private var outputText: String "輸出將會顯示在這里"private let tip:String "消息已發送,請等待"State private var history:[Stri…

springboot+vue2集成JWT token實現權限驗證

前端項目搭建參考: Vue項目的搭建和啟動_vue項目啟動 csdn-CSDN博客 Vue ElementUI 登錄頁面_vue用戶登錄頁面-CSDN博客 跨域問題前端解決-CSDN博客 實現思路: 1. 實現的目的:為了保護網站安全信息,使用jwt進行權限驗證&#xf…

Cursor編程-從入門到精通__0409

早期的Github Copilot 最近更新了,支持Agent編程,字節跳動Trae使用(免費),但成熟程度不如Cursor,Cursor前50次免費 Copilot VS Cursor*** 1,Cursor VSCode 二次開發,IDE級別 2&…

MyBatis 詳解及代碼示例

MyBatis 是一個 半自動 ORM 框架,主要用于 Java 與數據庫之間的持久化操作,它本質是對 JDBC 的封裝 全名:MyBatis(前身 iBATIS)核心作用:自動將 SQL 執行結果映射為 Java 對象;也可以將 Java 對…

1.6-抓包技術(Burp Suite\Yakit抓包\Web、APP、小程序)

1.6-抓包技術(Burp Suite\Yakit抓包\Web、APP、小程序) 如果要使用抓包軟件,基本上第一步都是要安裝證書的。原因如下: 客戶端(瀏覽器或應用)會檢測到證書不受信任,并彈出 證書錯誤&#xff0…

Java 大視界 -- 基于 Java 的大數據隱私保護在金融客戶信息管理中的實踐與挑戰(178)

💖親愛的朋友們,熱烈歡迎來到 青云交的博客!能與諸位在此相逢,我倍感榮幸。在這飛速更迭的時代,我們都渴望一方心靈凈土,而 我的博客 正是這樣溫暖的所在。這里為你呈上趣味與實用兼具的知識,也…

第十屆 藍橋杯 嵌入式 省賽

一、分析 這屆的真題,有點像第七屆的液位檢測。 這屆的題目開始,貌似比賽描述的功能,邏輯上變得更好梳理了。一開始就把大致的功能給你說明一遍,不像之前都是一塊一塊的說明。 1. 基本功能 1)測量競賽板上電位器 R…

實現usb的MTP功能

前言:最終結果根據用戶自主選擇可實現host和device功能的切換。 效果展示: 當插入usb時設備會彈窗 當用戶選擇設備模式時pc端就會出現mtp設備盤符 實現mtp設備 ubuntu架構根文件系統通過uMTP-Responder實現usb的MTP功能 添加服務 /home/flynn/firfly_rootfs/lib/system…

React-05React中props屬性(傳遞數據),propTypes校驗,類式與函數式組件props的使用

1.類式組件props基本數據讀取與解構運算符傳遞 <script type"text/babel">// 創建組件class PersonalInfo extends React.Component {render() {// 讀取props屬性 并讀取值console.log(props,this.props);return(<ul><li>姓名&#xff1a;{this.p…

PCI認證 密鑰注入 ECC算法工具 NID_secp521r1 國密算法 openssl 全套證書生成,從證書提取公私鑰數組 x,y等

步驟 1.全套證書已經生成。OK 2.找國芯要ECC加密解密簽名驗簽代碼。給的邏輯說明沒有示例代碼很難的上。 3.集成到工具 與SP聯調。 1.用openssl全套證書生成及驗證 注意&#xff1a;這里CA 簽發 KLD 證書用的是SHA256。因為芯片只支持SHA256算法,不支持SHA512。改成統一。…

藍橋杯每日刷題c++

目錄 P9240 [藍橋杯 2023 省 B] 冶煉金屬 - 洛谷 (luogu.com.cn) P8748 [藍橋杯 2021 省 B] 時間顯示 - 洛谷 (luogu.com.cn) P10900 [藍橋杯 2024 省 C] 數字詩意 - 洛谷 (luogu.com.cn) P10424 [藍橋杯 2024 省 B] 好數 - 洛谷 (luogu.com.cn) P8754 [藍橋杯 2021 省 AB2…

oracle 數據庫字段類型為NUMBER(5,2)時,并且數據庫值為0.1,為什么Java執行SQL查出來時為“.1“?

在 Oracle 數據庫中&#xff0c;當字段類型為 NUMBER(5,2) 且存儲的值為 0.1 時&#xff0c;Java 程序查詢結果可能顯示為 ".1"&#xff08;省略前導零&#xff09;&#xff0c;這是由 Oracle JDBC 驅動默認的數字格式化行為 導致的。以下是原因分析和解決方案&#…

3月AI論文精選十篇

1. Feature-Level Insights into Artificial Text Detection with Sparse Autoencoders[1] 核心貢獻&#xff1a;通過稀疏自編碼器揭示AI生成文本的檢測特征&#xff0c;提出基于特征分布的鑒別方法。研究發現&#xff0c;AI文本在稀疏編碼空間中呈現獨特的"高頻低幅"…

STM32在裸機(無RTOS)環境下,需要手動實現隊列機制來替代FreeRTOS的CAN發送接收函數

xQueueSendToBackFromISR(ecuCanRxQueue, hcan->pRxMsg, &xHigherPriorityTaskWoken)&#xff0c;xQueueReceive(mscCanRxQueue,&mscRxMsg,0)和xQueueSendToBack(mscCanTxQueue, &TxMessageTemp, 0 )這3個函數&#xff0c;在裸機下實現&#xff1a; 在裸機&…

使用PX4,gazebo,mavros為旋翼添加下視的相機(仿真采集openrealm數據集-第一步)

目錄 一.方法一&#xff08;沒成功&#xff09; 1.運行PX4 2.運行mavros通訊 3.啟動仿真世界和無人機 &#xff08;1&#xff09;單獨測試相機 &#xff08;2&#xff09;make px4_sitl gazebo啟動四旋翼iris無人機 二.方法二&#xff08;成功&#xff09; 1.通過 rosl…