通過 API 接口管理 Kafka

文章目錄

  • 前言
  • Topic 管理
  • 配置管理
  • 消費者群組管理
    • 查看消費者群組
    • 修改消費者群組
  • 為主題添加分區
  • 從主題中刪除消息
  • 首領選舉

前言

除了通過命令行和可視化界面對 kafka 進行管理,也可以通過 AdminClient的 API 對 kafka 進行管理。
本文將介紹如何通過 AdminClient 進行 kafka 管理:主題管理、消費者群組管理和配置管理。

創建 AdminClient 對象

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// TODO: 用AdminClient做一些有用的事情
admin.close(Duration.ofSeconds(30));

Topic 管理

列出集群所有 Topic

ListTopicsResult topics = admin.listTopics();
topics.names().get().forEach(System.out::println);

驗證主題是否存在,如果不存在就創建新的主題

DescribeTopicsResult demoTopic = admin.describeTopics(TOPIC_LIST); ?try {topicDescription = demoTopic.values().get(TOPIC_NAME).get(); ?System.out.println("Description of demo topic:" + topicDescription);if (topicDescription.partitions().size() != NUM_PARTITIONS) { ?System.out.println("Topic has wrong number of partitions. Exiting.");System.exit(-1);}
} catch (ExecutionException e) { ?// 對于大部分異常,提前退出if (! (e.getCause() instanceof UnknownTopicOrPartitionException)) {e.printStackTrace();throw e;}// 如果執行到這里,則說明主題不存在System.out.println("Topic " + TOPIC_NAME +" does not exist. Going to create it now");// 需要注意的是,分區和副本數是可選的// 如果沒有指定,那么將使用broker的默認配置CreateTopicsResult newTopic = admin.createTopics(Collections.singletonList(new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REP_FACTOR))); ?// 檢查主題是否已創建成功:if (newTopic.numPartitions(TOPIC_NAME).get() != NUM_PARTITIONS) { ?System.out.println("Topic has wrong number of partitions.");System.exit(-1);}
}

? 為了檢查主題是否配置正確,可以調用describeTopics()方法,并傳入一組想要驗證的主題名字作為參數。它會返回DescribeTopicResult對象,這個對象對map(主題名字到Future的映射)進行了包裝。
? 如果一直等待Future完成,那么可以調用get()得到想要的結果——在這里是一個TopicDescription對象。但服務器也可能無法正確處理請求——如果主題不存在,那么服務器就不會返回我們想要的結果。在這種情況下,服務器將返回一個錯誤,Future將拋出ExecutionException,這個異常是服務器返回的錯誤導致的。因為我們想要處理主題不存在的情況,所以需要處理這些異常。
? 如果主題存在,那么Future將返回一個TopicDescription對象,其中包含了主題的所有分區、分區首領所在的broker、副本清單和同步副本清單。需要注意的是,這個對象并不包含主題的配置信息。本章后面會討論配置管理。
? 如果Kafka返回錯誤,那么所有的AdminClient結果對象都會拋出ExecutionException異常,因為AdminClient結果對Future對象進行了包裝,而Future又對異常進行了包裝,所以需要檢查ExecutionException的嵌套異常才能獲取到Kafka返回的錯誤信息。
? 如果主題不存在,就創建一個新主題。在創建主題時,可以只指定主題名字,其他參數使用默認值。當然,也可以指定分區數量、副本數量和其他配置參數。
? 最后,等待主題創建完成并驗證結果。這里只檢查分區數量。因為在創建主題時指定了分區數量,所以要確保它是對的。如果我們在創建主題時使用了broker默認配置,則更加需要驗證結果。再次調用get()獲取CreateTopic的結果,這個方法也有可能拋出異常,最常見的是TopicExistsException,我們需要處理這個異常。

刪除主題

admin.deleteTopics(TOPIC_LIST).all().get();// 檢查主題是否已刪除
// 需要注意的是,由于刪除是異步操作,這個時候主題可能還存在
try {topicDescription = demoTopic.values().get(TOPIC_NAME).get();System.out.println("Topic " + TOPIC_NAME + " is still around");
} catch (ExecutionException e) {System.out.println("Topic " + TOPIC_NAME + " is gone");
}

異步回調獲取執行結果
前邊的操作都調用了 get()獲取執行結果,但是這個是阻塞等待的,有時候我們希望異步回調的方式獲取結果:

vertx.createHttpServer().requestHandler(request -> { ?String topic = request.getParam("topic"); ?String timeout = request.getParam("timeout");int timeoutMs = NumberUtils.toInt(timeout, 1000);DescribeTopicsResult demoTopic = admin.describeTopics( ?Collections.singletonList(topic),new DescribeTopicsOptions().timeoutMs(timeoutMs));demoTopic.values().get(topic).whenComplete( ?new KafkaFuture.BiConsumer<TopicDescription, Throwable>() {@Overridepublic void accept(final TopicDescription topicDescription,final Throwable throwable) {if (throwable != null) {request.response().end("Error trying to describe topic "+ topic + " due to " + throwable.getMessage()); ?} else {request.response().end(topicDescription.toString()); ?}}});
}).listen(8080);

? 用Vert.x創建一個簡單的HTTP服務器。服務器在收到請求時會調用我們定義的requestHandler。
? 請求當中包含了一個主題名字,我們將用這個主題的描述信息作為響應。
? 像往常一樣調用AdminClient.describeTopics,并得到一個包裝好的Future對象。
? 這里沒有調用get()方法,而是構造了一個函數,Future在完成時會調用這個函數。
? 如果Future拋出異常,就將錯誤返回給HTTP客戶端。
? 如果Future順利完成,就將主題描述信息返回給客戶端。

配置管理

配置管理是通過描述和更新一系列配置資源(ConfigResource)來實現的。配置資源可以是broker、broker日志記錄器和主題。我們通常會用kafka-config.sh或其他Kafka管理工具來檢查和修改broker及broker日志配置,但主題配置管理是在應用程序中完成的。
例如,很多應用程序使用了壓實的主題,它們會定期(為安全起見,要比默認的保留期限更加頻繁一些)檢查主題是否被壓實,如果沒有,就采取相應的行動來糾正主題配置:

ConfigResource configResource =new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME); ?
DescribeConfigsResult configsResult =admin.describeConfigs(Collections.singleton(configResource));
Config configs = configsResult.all().get().get(configResource);
// 打印非默認配置
configs.entries().stream().filter(entry -> !entry.isDefault()).forEach(System.out::println); ?// 檢查主題是否被壓實
ConfigEntry compaction = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG,TopicConfig.CLEANUP_POLICY_COMPACT);
if (!configs.entries().contains(compaction)) {// 如果主題沒有被壓實,就將其壓實Collection<AlterConfigOp> configOp = new ArrayList<AlterConfigOp>();configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET)); ?Map<ConfigResource, Collection<AlterConfigOp>> alterConf = new HashMap<>();alterConf.put(configResource, configOp);admin.incrementalAlterConfigs(alterConf).all().get();
} else {System.out.println("Topic " + TOPIC_NAME + " is compacted topic");
}

? 如上所述,ConfigResource有幾種類型,這里檢查的是主題配置。也可以在同一個請求中指定多個不同類型的資源。
? describeConfigs的結果是一個map(從ConfigResource到配置的映射)。每個配置項都有一個isDefault()方法,可以讓我們知道哪些配置被修改了。如果用戶為主題配置了非默認值,或者修改了broker級別的配置,而創建的主題繼承了broker的非默認配置,那么我們便能知道這個配置不是默認的。
? 為了修改配置,這里指定了需要修改的ConfigResource和一組操作。每個修改操作都由一個配置條目(配置的名字和值,此處名字是cleanup.policy,值是compacted)和操作類型組成。Kafka的4種操作類型分別是:SET(用于設置值)、DELETE(用于刪除值并重置為默認值)、APPEND和SUBSTRACT。后兩種只適用于List類型的配置,用于向列表中添加值或從列表中移除值,這樣就不用每次都把整個列表發送給Kafka了。

消費者群組管理

查看消費者群組

列出消費者群組

admin.listConsumerGroups().valid().get().forEach(System.out::println);
  • 這里調用valid()方法,可以讓get()返回的消費者群組只包含由集群正常返回的消費者群組。錯誤都將被忽略,不作為異常拋出。

查看更多描述信息

ConsumerGroupDescription groupDescription = admin.describeConsumerGroups(CONSUMER_GRP_LIST).describedGroups().get(CONSUMER_GROUP).get();System.out.println("Description of group " + CONSUMER_GROUP+ ":" + groupDescription);
  • 描述信息包括群組成員、它們的標識符和主機地址、分配給它們的分區、分配分區的算法以及群組協調器的主機地址。

獲取分區最近偏移量和最近提交偏移量信息

Map<TopicPartition, OffsetAndMetadata> offsets =admin.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get(); ?Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();for(TopicPartition tp: offsets.keySet()) {requestLatestOffsets.put(tp, OffsetSpec.latest()); ?
}Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =admin.listOffsets(requestLatestOffsets).all().get();for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) { ?String topic = e.getKey().topic();int partition =  e.getKey().partition();long committedOffset = e.getValue().offset();long latestOffset = latestOffsets.get(e.getKey()).offset();System.out.println("Consumer group " + CONSUMER_GROUP+ " has committed offset " + committedOffset+ " to topic " + topic + " partition " + partition+ ". The latest offset in the partition is "+ latestOffset + " so consumer group is "+ (latestOffset - committedOffset) + " records behind");
}
  • ? 獲取消費者群組讀取的所有主題和分區,以及每個分區最新的提交偏移量。與describeConsumerGroups不同,listConsumerGroupOffsets只接受一個消費者群組而不是一個集合作為參數。
  • ? 我們希望獲取到結果集中每一個分區最后一條消息的偏移量。OffsetSpec提供了3個非常方便的實現:earliest()latest()forTimestamp(),分別用于獲取分區中最早和最近的偏移量,以及在指定時間或緊接在指定時間之后寫入的消息的偏移量。
  • ? 最后,遍歷所有分區,將最近提交的偏移量、分區中最近的偏移量以及它們之間的差值打印出來。

修改消費者群組

AdminClient也提供了修改消費者群組的方法:刪除群組、移除成員、刪除提交的偏移量和修改偏移量。
顯式地將提交的偏移量修改為最早的偏移量,可以強制消費者從主題開頭位置開始讀取,實際上就是“重置”消費者。

Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets =admin.listOffsets(requestEarliestOffsets).all().get(); ?Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e:earliestOffsets.entrySet()) {resetOffsets.put(e.getKey(), new OffsetAndMetadata(e.getValue().offset())); ?
}try {admin.alterConsumerGroupOffsets(CONSUMER_GROUP, resetOffsets).all().get(); ?
} catch (ExecutionException e) {System.out.println("Failed to update the offsets committed by group "+ CONSUMER_GROUP + " with error " + e.getMessage());if (e.getCause() instanceof UnknownMemberIdException)System.out.println("Check if consumer group is still active."); ?
}
  • ? 要重置消費者群組,并讓它從最早的偏移量位置開始消費,需要先獲取最早的偏移量。
  • ? 在這個循環中,將listOffsets返回的ListOffsetsResultInfo轉成alterConsumerGroupOffsets需要的OffsetAndMetadata
  • ? 調用alterConsumerGroupOffsets之后等待Future完成,這樣便可知道是否執行成功。
  • ? 導致alterConsumerGroupOffsets執行失敗最常見的一個原因是沒有停止消費者群組(只能直接關閉消費者應用程序,因為沒有可用于關閉消費者群組的命令)。如果消費者群組仍然處于活躍狀態,那么一旦修改了偏移量,群組協調器就會認為有非群組成員正在提交偏移量,并拋出UnknownMemberIdException異常。

為主題添加分區

可以用createPartitions方法為主題添加分區。需要注意的是,如果一次性為多個主題添加分區,則可能會出現一些主題添加成功一些主題添加失敗的情況。

Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS+2)); ?
admin.createPartitions(newPartitions).all().get();
  • ? 在添加分區時,需要指定添加分區后主題將擁有的分區總數,而不是要添加的新分區的數量。

從主題中刪除消息

deleteRecords方法會將所有偏移量早于指定偏移量的消息標記為已刪除,使消費者無法讀取這些數據。這個方法將返回被刪除的消息的最大偏移量,這樣我們就可以檢查刪除操作是否按預期執行了。從磁盤上徹底刪除數據是異步進行的。需要注意的是,可以用listOffsets方法獲取在特定時間點或之后寫入的消息的偏移量。也可以組合使用這些方法來刪除早于任意特定時間點的消息。

Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> olderOffsets =admin.listOffsets(requestOlderOffsets).all().get();
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>  e:olderOffsets.entrySet())recordsToDelete.put(e.getKey(),RecordsToDelete.beforeOffset(e.getValue().offset()));
admin.deleteRecords(recordsToDelete).all().get();

首領選舉

首選首領選舉

  • 每一個分區都有一個可以被指定為首選首領的副本。如果所有分區的首領都是它們的首選首領副本,那么每個broker上的首領數量應該是均衡的。
  • 在默認情況下,Kafka每5分鐘會檢查一次首領是否就是首選首領副本,如果不是,但它有資格成為首領,就會選擇首選首領副本作為首領。
  • 如果auto.leader.rebalance.enable被設置為false,或者你想快一點兒執行選舉,則可以調用electLeader()方法。

不徹底的首領選舉
如果一個分區的首領副本變得不可用,而其他副本沒有資格成為首領(通常是因為缺少數據),那么這個分區將沒有首領,也就不可用了。解決這個問題的一種方法是觸發不徹底的首領選舉,也就是選舉一個本來沒有資格成為首領的副本作為首領。這可能導致數據丟失——所有寫入舊首領但未被復制到新首領的消息都將丟失。electLeader()方法也可以用來觸發不徹底的首領選舉。

Set<TopicPartition> electableTopics = new HashSet<>();
electableTopics.add(new TopicPartition(TOPIC_NAME, 0));
try {admin.electLeaders(ElectionType.PREFERRED, electableTopics).all().get(); ?
} catch (ExecutionException e) {if (e.getCause() instanceof ElectionNotNeededException) {System.out.println("All leaders are preferred already"); ?}
}
  • ? 這里選舉的是某個特定主題的某個分區的首選首領。我們可以指定任意數量的分區和主題。如果在調用這個方法時傳入null而不是分區列表,則它將觸發所有分區的首領選舉。
  • ? 如果集群的狀態是健康的,那么這個方法將不執行任何操作。首選首領選舉和不徹底的首領選舉只在當前首領不是首選首領副本時才有效。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/40626.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/40626.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/40626.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

[Vue學習]生命周期及其各階段舉例

當我們運行vue項目&#xff0c;看到了屏幕上顯示的界面&#xff0c;看到了界面上顯示的數據和標簽&#xff0c;之后將這個界面叉掉&#xff0c;這一過程其實經歷了一整個vue的生命周期的四個階段&#xff0c;即創建階段、掛載階段、更新階段以及銷毀階段, 而對于每個階段的啟動…

使用 pyecharts 渲染成圖片程序報錯: echarts is not defined問題處理

背景 之前寫的使用 snapshot_selenium 來保存pyeacharts渲染成的網頁截圖&#xff0c;可以正常運行。程序擱置了半年&#xff0c;不知道動了電腦哪里&#xff0c;再次運行程序時&#xff0c;程序開始報錯&#xff1a;JavascriptException: javascript error: echarts is not d…

【SQL】已解決:SQL分組去重并合并相同數據

文章目錄 一、分析問題背景二、可能出錯的原因三、錯誤代碼示例四、正確代碼示例五、注意事項 已解決&#xff1a;SQL分組去重并合并相同數據 在數據庫操作中&#xff0c;數據的分組、去重以及合并是常見需求。然而&#xff0c;初學者在編寫SQL語句時&#xff0c;可能會遇到一…

正弦波與單位圓關系的可視化 包括源碼

正弦波與單位圓關系的可視化 包括源碼 flyfish 正弦波與單位圓的關系 正弦波可以通過單位圓上的點在直線&#xff08;通常是 y 軸&#xff09;上的投影來表示。具體來說&#xff0c;考慮一個單位圓&#xff0c;其半徑為 1&#xff0c;圓心在原點。我們可以通過旋轉一個角度 …

每日一道算法題 判斷子序列

題目 判斷子序列_牛客題霸_牛客網 (nowcoder.com) Python # # 代碼中的類名、方法名、參數名已經指定&#xff0c;請勿修改&#xff0c;直接返回方法規定的值即可 # # # param S string字符串 # param T string字符串 # return bool布爾型 # class Solution:def isSubseq…

【全網最全流程+所有代碼】企業微信回調聯調,開通企微回調和收到企微回調

流程圖: 只是這里的消息回調,僅作為提示,群內有消息了。不是具體的消息,而是類似這樣的結構,: 如果需要獲取消息,還需要拉取企微群內消息方法,這個后續再更新。 好了,我們開始吧。 開啟消息回調和接收消息回調,地址是一樣的,只是 開啟消息回調,get請求, 接受消…

人工智能在日常生活中的十大應用:從醫療到智能家居

人工智能已成為當今人類日常生活的重要組成部分&#xff0c;無論您是否意識到&#xff0c;它幾乎在所有場景中都能提供幫助。每次您進行網絡搜索、在線預訂旅行、接收來自京東等購物平臺的產品推薦又或是打開您的新浪、抖音時&#xff0c;都能看到影子&#xff0c;這些只是一些…

代碼隨想錄算法訓練營第51天 [115.不同的子序列 583. 兩個字符串的刪除操作 72. 編輯距離 ]

代碼隨想錄算法訓練營第51天 [115.不同的子序列 583. 兩個字符串的刪除操作 72. 編輯距離 ] 一、115.不同的子序列 鏈接: 代碼隨想錄. 思路&#xff1a;dp[i][j] 以t[j-1]為結尾的字符串在 以s[i-1]為結尾的字 符串出現個數 相等的時候 dp[i][j] dp[i - 1][j - 1] dp[i - 1][…

JAVA案例模擬電影信息系統

一案例要求&#xff1a; 二具體代碼(需要在同一個包下創建三個類) Ⅰ&#xff1a;實現類 package 重修;import java.util.Random; import java.util.Scanner;public class first {public static void main(String[] args) {javabean[]moviesnew javabean[4];movies[0] new ja…

加密與安全_ Jasypt (Java Simplified Encryption)不完全指北

文章目錄 官網功能概述Code附 官網 http://www.jasypt.org/ 功能概述 Jasypt 是一個 Java 庫&#xff0c;它允許開發人員以最小的努力添加基本的加密功能&#xff0c;并且不需要深入了解密碼學的工作原理。 高安全性、基于標準的加密技術&#xff0c;適用于單向和雙向加密。…

AIGC對設計師積極性的影響

隨著科技的迅猛發展&#xff0c;生成式人工智能&#xff08;AIGC&#xff09;工具正逐漸深入設計的每個角落&#xff0c;對設計師的工作方式和思維模式產生了深遠的影響。AIGC不僅極大提升了設計師的工作效率&#xff0c;更激發了他們的創新思維&#xff0c;為設計行業帶來了翻…

Spring Boot在java領域中有哪些優勢

哈嘍&#xff0c;大家好呀&#xff0c;淼淼又來和大家見面啦&#xff0c;隨著云計算、微服務架構的興起&#xff0c;Java開發領域迫切需要一套高效、靈活且易于上手的框架來應對日益復雜的業務需求。正是在這樣的背景下&#xff0c;Spring Boot應運而生&#xff0c;以其獨特的魅…

Dungeonborne聯機失敗、延遲高、卡頓的解決方法

Dungeonborne將第一人稱動作的即時性與經典的西幻RPG職業設計巧妙融合&#xff0c;為玩家帶來了一場前所未有的游戲體驗。在這款沉浸式第一人稱PvPvE地下城探險游戲中&#xff0c;我們可以獨自深入探索&#xff0c;也可以與值得信賴的伙伴并肩作戰&#xff0c;共同揭開地下城的…

移動端UI風格營造舒適氛圍

移動端UI風格營造舒適氛圍

中服云數字孿生平臺引領工業物聯仿真新紀元!

中服云數字孿生平臺3.0是基于中服云物聯網平臺和數據中臺打造的一款實時數據2D/3D集成展示監控平臺。 旨在解決工業物聯網數據的直觀展示、實虛互動、仿真模擬、故障診斷、告警、預警、預測、實時觀測、實時監控等問題。提供了數據采集、數據底座、監控邏輯、建模工具、展示互…

android 國內下載Gradle源

在中國使用 Gradle 時&#xff0c;可以配置使用一些國內的鏡像源&#xff0c;以提高下載速度和穩定性。以下是幾個常用的 Gradle 鏡像源地址&#xff1a; 配置 gradle-wrapper.properties 文件: 阿里云: distributionUrlhttps\://services.gradle.org/distributions/gradle-7.…

數據結構 —— 圖的遍歷

數據結構 —— 圖的遍歷 BFS&#xff08;廣度遍歷&#xff09;一道美團題DFS&#xff08;深度遍歷&#xff09; 我們今天來看圖的遍歷&#xff0c;其實都是之前在二叉樹中提過的方法&#xff0c;深度和廣度遍歷。 在這之前&#xff0c;我們先用一個鄰接矩陣來表示一個圖&#…

220千伏變電站輔助設備智能監控平臺 無人化與自動化升級改造工程

220千伏變電站特點 高電壓等級&#xff1a;220千伏變電站的最大特點是其高壓傳輸能力&#xff0c;能夠將發電廠產生的電能高效地傳輸到較遠的地區&#xff0c;滿足大型城市及工業區域的用電需求。 輸電能力大&#xff1a;220千伏變電站在輸電能力上遠大于普通的110千伏或更低…

Mybatis框架的集成使用

1_框架概述 框架是一個半成品&#xff0c;已經對基礎的代碼進行了封裝并提供相應的API&#xff0c;開發者在使用框架時直接調用封裝好的api可以省去很多代碼編寫&#xff0c;從而提高工作效率和開發速度,框架是一種經過校驗、具有一定功能的半成品軟件. 經過校驗&#xff1a;指…

【超萬卡GPU集群關鍵技術深度分析 2024】

文末有福利&#xff01; 1. 集群高能效計算技術 隨著大模型從千億參數的自然語言模型向萬億參數的多模態模型升級演進&#xff0c;超萬卡集群吸需全面提升底層計算能力。 具體而言&#xff0c;包括增強單芯片能力、提升超節點計算能力、基于 DPU (Data Processing Unit) 實現…