java kafka 拉取_java獲取kafka consumer lag

maven依賴

org.apache.kafka

kafka-clients

0.10.1.0

注意:kafka-clients版本需要0.10.1.0以上,因為調用了新增接口endOffsets;

lag=logsize-offset

logsize通過consumer的endOffsets接口獲得;offset通過consumer的committed接口獲得;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import org.apache.kafka.common.PartitionInfo;

import org.apache.kafka.common.TopicPartition;

public class KafkaConsumeLagMonitor {

public static Properties getConsumeProperties(String groupID, String bootstrap_server) {

Properties props = new Properties();

props.put("group.id", groupID);

props.put("bootstrap.servers", bootstrap_server);

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

return props;

}

public static void main(String[] args) {

String bootstrap_server = args[0];

String groupID = args[1];

String topic = args[2];

Map endOffsetMap = new HashMap();

Map commitOffsetMap = new HashMap();

Properties consumeProps = getConsumeProperties(groupID, bootstrap_server);

System.out.println("consumer properties:" + consumeProps);

//查詢topic partitions

KafkaConsumer consumer = new KafkaConsumer(consumeProps);

List topicPartitions = new ArrayList();

List partitionsFor = consumer.partitionsFor(topic);

for (PartitionInfo partitionInfo : partitionsFor) {

TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());

topicPartitions.add(topicPartition);

}

//查詢log size

Map endOffsets = consumer.endOffsets(topicPartitions);

for (TopicPartition partitionInfo : endOffsets.keySet()) {

endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));

}

for (Integer partitionId : endOffsetMap.keySet()) {

System.out.println(String.format("at %s, topic:%s, partition:%s, logSize:%s", System.currentTimeMillis(), topic, partitionId, endOffsetMap.get(partitionId)));

}

//查詢消費offset

for (TopicPartition topicAndPartition : topicPartitions) {

OffsetAndMetadata committed = consumer.committed(topicAndPartition);

commitOffsetMap.put(topicAndPartition.partition(), committed.offset());

}

//累加lag

long lagSum = 0l;

if (endOffsetMap.size() == commitOffsetMap.size()) {

for (Integer partition : endOffsetMap.keySet()) {

long endOffSet = endOffsetMap.get(partition);

long commitOffSet = commitOffsetMap.get(partition);

long diffOffset = endOffSet - commitOffSet;

lagSum += diffOffset;

System.out.println("Topic:" + topic + ", groupID:" + groupID + ", partition:" + partition + ", endOffset:" + endOffSet + ", commitOffset:" + commitOffSet + ", diffOffset:" + diffOffset);

}

System.out.println("Topic:" + topic + ", groupID:" + groupID + ", LAG:" + lagSum);

} else {

System.out.println("this topic partitions lost");

}

consumer.close();

}

}

另外一個思路可參考kafka源碼kafka.tools.ConsumerOffsetChecker實現,offset直接讀取 zk節點內容,logsize通過consumer的getOffsetsBefore方法獲取,整體來說,較麻煩;

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/529682.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/529682.shtml
英文地址,請注明出處:http://en.pswp.cn/news/529682.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

java開源圖像處理ku_83 項開源視覺 SLAM 方案夠你用了嗎?

原標題:83 項開源視覺 SLAM 方案夠你用了嗎?公眾號:3D視覺工坊主要關注:3D視覺算法、SLAM、vSLAM、計算機視覺、深度學習、自動駕駛、圖像處理以及技術干貨分享運營者和嘉賓介紹:運營者來自國內一線大廠的算法工程師&a…

java 方法的拆分_java – 字符串拆分和比較 – 最快的方法

>將輸入讀入byte []數組以將指針保持在代碼的一側.>逐字節讀取,計算整數元素&#xff1a;int b inputBytes[p];int d b - 0;if (0 < d) {if (d < 9) {element element * 10 d;} else {// b :}} else {// b ,// add element to the hash; element 0;...}if (…

java sql異常_java.sql.SQLException: Io 異常: Got minus one from a

java.sql.SQLException: Io 異常: Got minus one from a read callat oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:111)at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:145)at oracle.jdbc.driver.DatabaseError.thro…

java 攔截器ajax_(轉)攔截器深入實踐 - JAVA XML JAVASCRIPT AJAX CSS - BlogJava

Interceptor的定義我們來看一下Interceptor的接口的定義&#xff1a;Java代碼 publicinterfaceInterceptorextendsSerializable {/*** Called to let an interceptor clean up any resources it has allocated.*/voiddestroy();/*** Called after an interceptor is created, b…

php學的是什么意思_為什么要學習PHP?到底什么是PHP?

為什么要學習PHP?到底什么是PHP?PHP可以做什么?相信這樣的問題困擾著很多的人&#xff0c;在我沒工作之前&#xff0c;都沒有聽說過PHP&#xff0c;自從工作后&#xff0c;慢慢接觸到代碼&#xff0c;慢慢知道什么是PHP。PHP是做網站一種語言&#xff0c;很多工程師都使用PH…

php 多數據庫聯合查詢,php如何同時連接多個數據庫_PHP教程

下面是一個函數能夠保證連接多個數據庫的下不同的表的函數&#xff0c;可以收藏一下&#xff0c;比較實用&#xff0c;測試過是有用的。function mysql_oper($oper,$db,$table,$where1,$limit10){$connmysql_connect(localhost,like,admin,true) or mysql_error();mysql_select…

java判斷有沒有修改,java字節碼判斷對象應用是否被修改

原創1 背景在學習并發的時候看到了ConcurrentLinkedQueue隊列的源碼&#xff0c;剛開始的時候是看網上的帖子&#xff0c;然后就到IDE里邊看源碼&#xff0c;發現offer()方法在1.7版的時候有過修改。樓主的問題不是整個方法&#xff0c;而是其中的一截代碼“(t ! (t tail))”&…

php接口 含義,php晉級必備:一文讀懂php接口特點和使用!

PHP接口與類是什么關系&#xff1f;前面提到了php中抽象類和抽象方法&#xff0c;今天給大家談談php中接口技術。在PHP中每個類只能繼承一個父類&#xff0c;如果聲明的新類繼承了抽象類實現了以后&#xff0c;這個新類就不能有其它的父類了。但是在實際中需要繼承多個類實現功…

php獲取不重復的隨機數字,php如何生成不重復的隨機數字

【摘要】PHP即“超文本預處理器”&#xff0c;是一種通用開源腳本語言。PHP是在服務器端執行的腳本語言&#xff0c;與C語言類似&#xff0c;是常用的網站編程語言。PHP獨特的語法混合了C、Java、Perl以及 PHP 自創的語法。下面是php如何生成不重復的隨機數字&#xff0c;讓我們…

java 素數乘積,求助2424379123 = 兩個素數的乘積,求這兩個素數?

該樓層疑似違規已被系統折疊 隱藏此樓查看此樓import java.util.ArrayList;import java.util.Date;public class Test {static ArrayList list new ArrayList();/*** 初始化素數表* return*/public static ArrayList initArrayList() {list.add(2);list.add(3);list.add(5);li…

php header什么意思,php header是什么意思

header函數在PHP中是發送一些頭部信息的, 我們可以直接使用它來做301跳轉等&#xff0c;下面我來總結關于header函數用法與一些常用見問題解決方法。發送一個原始 HTTP 標頭[Http Header]到客戶端。標頭 (header) 是服務器以 HTTP 協義傳 HTML 資料到瀏覽器前所送出的字串&…

matlab dct稀疏系數,Matlab DCT詳解

轉自&#xff1a;http://blog.csdn.net/ahafg/article/details/48808443DCT變換DCT又稱離散余弦變換&#xff0c;是一種塊變換方式&#xff0c;只使用余弦函數來表達信號&#xff0c;與傅里葉變換緊密相關。常用于圖像數據的壓縮&#xff0c;通過將圖像分成大小相等(一般為8*8)…

matlab驗潮站,驗潮站的作用是什么

驗潮站的作用是什么?驗潮站的建成投入使用&#xff0c;可實時觀測沿海潮汐等觀測要素&#xff0c;為潮汐預報、赤潮的發生、風暴潮預警報、海嘯預警及海平面變化提供基礎數據保障以及預測&#xff0c;同時為科學開發海洋提供有力的支持&#xff0c;為海洋經濟健康發展保駕護航…

答題闖關php,成語答題闖關紅包流量主小程序源碼

修復紅包頁面提現提示文字得疊的問題限制過關紅包每天領取個數左側影響美觀的小程序鏈接的文字去掉了增加版本號沒有問題的可以暫不更新此版本修復前一版本客服提現沒有授權的問題管理后臺增加主動推送客服消息(紅包)給用戶的功能&#xff0c;喚醒用戶使用自定義分享的配置增加…

php是音頻嗎,只要是用PHP和JS發布的HTML5是否可以播放音頻?

我正在嘗試創建一個可以上傳播客的頁面。我想擁有“發布”或“取消發布”的能力。我讓每個播客添加到一個數據庫中,包含它的信息和發布列,可以是真是假。目前我使用以下代碼PHP:if(isPublished()){header(Cache-Control: max-age100000);header(Content-Transfer-Encoding: bin…

php收購,php中文網收購全國用戶量最大的phpstudy集成開發環境揭秘

phpstudy介紹2008年第一個版本誕生&#xff0c;至今已有&#xff19;年,該程序包集成最新的ApachePHPMySQLphpMyAdminZendOptimizer,一次性安裝,無須配置即可使用,是非常方便、好用的PHP調試環境.該程序不僅包括PHP調試環境,還包括了開發工具、開發手冊等.總之學習PHP只需一個包…

復雜電網三相短路計算的matlab仿真,復雜電網三相短路計算的MATLAB仿真電力系統分析課設報告 - 圖文...

XG?XT**35.3100??0.11003000.856100???0.05100120發電廠B&#xff1a;XG?XT**17.65100 ??0.051003000.853100???0.025100120發電廠H&#xff1a;XG?XT**17.65100??0.051003000.8512100???0.1100120變電站C&#xff1a;3.6100*XT???0.03100120 線路&#x…

php 將多個數組 相同的鍵重組,PHP – 合并兩個類似于array_combine但具有重復鍵的數組...

你可以使用array_map&#xff1a;$arrKeys array(str, str, otherStr);$arrVals array(1.22, 1.99, 5.17);function foo($key, $val) {return array($key>$val);}$arrResult array_map(foo, $arrKeys, $arrVals);print_r($arrResult);Array([0] > Array([str] > 1.…

C php反序列化,php反序列化漏洞 - anansec的個人空間 - OSCHINA - 中文開源技術交流社區...

反序列化本身是沒有漏洞的&#xff0c;但是當反序列化和一些魔術方法結合使用時就可能會產生安全風險。常用的魔術方法__wakeup反序列化漏洞示例(__wekeup)class A{var $test "demo";function __wakeup(){eval($this->test);}}$b new A();$c serialize($b);$a …

oracle lob值是什么,關于Oracle數據庫LOB大字段總結

概述在ORACLE數據庫中&#xff0c;DBA_OBJECTS視圖中OBJECT_TYPE為LOB的對象是什么東西呢&#xff1f;其實OBJECT_TYPE為LOB就是大對象(LOB)&#xff0c;它指那些用來存儲大量數據的數據庫字段。Oracle 11gR2 文檔&#xff1a;http://download.oracle.com/docs/cd/E11882_01/Ap…