文章目錄
- 前言
- Topic 管理
- 配置管理
- 消費者群組管理
- 查看消費者群組
- 修改消費者群組
- 為主題添加分區
- 從主題中刪除消息
- 首領選舉
前言
除了通過命令行和可視化界面對 kafka 進行管理,也可以通過 AdminClient
的 API 對 kafka 進行管理。
本文將介紹如何通過 AdminClient
進行 kafka 管理:主題管理、消費者群組管理和配置管理。
創建 AdminClient 對象
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// TODO: 用AdminClient做一些有用的事情
admin.close(Duration.ofSeconds(30));
Topic 管理
列出集群所有 Topic
ListTopicsResult topics = admin.listTopics();
topics.names().get().forEach(System.out::println);
驗證主題是否存在,如果不存在就創建新的主題
DescribeTopicsResult demoTopic = admin.describeTopics(TOPIC_LIST); ?try {topicDescription = demoTopic.values().get(TOPIC_NAME).get(); ?System.out.println("Description of demo topic:" + topicDescription);if (topicDescription.partitions().size() != NUM_PARTITIONS) { ?System.out.println("Topic has wrong number of partitions. Exiting.");System.exit(-1);}
} catch (ExecutionException e) { ?// 對于大部分異常,提前退出if (! (e.getCause() instanceof UnknownTopicOrPartitionException)) {e.printStackTrace();throw e;}// 如果執行到這里,則說明主題不存在System.out.println("Topic " + TOPIC_NAME +" does not exist. Going to create it now");// 需要注意的是,分區和副本數是可選的// 如果沒有指定,那么將使用broker的默認配置CreateTopicsResult newTopic = admin.createTopics(Collections.singletonList(new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REP_FACTOR))); ?// 檢查主題是否已創建成功:if (newTopic.numPartitions(TOPIC_NAME).get() != NUM_PARTITIONS) { ?System.out.println("Topic has wrong number of partitions.");System.exit(-1);}
}
? 為了檢查主題是否配置正確,可以調用describeTopics()方法,并傳入一組想要驗證的主題名字作為參數。它會返回DescribeTopicResult對象,這個對象對map(主題名字到Future的映射)進行了包裝。
? 如果一直等待Future完成,那么可以調用get()得到想要的結果——在這里是一個TopicDescription對象。但服務器也可能無法正確處理請求——如果主題不存在,那么服務器就不會返回我們想要的結果。在這種情況下,服務器將返回一個錯誤,Future將拋出ExecutionException,這個異常是服務器返回的錯誤導致的。因為我們想要處理主題不存在的情況,所以需要處理這些異常。
? 如果主題存在,那么Future將返回一個TopicDescription對象,其中包含了主題的所有分區、分區首領所在的broker、副本清單和同步副本清單。需要注意的是,這個對象并不包含主題的配置信息。本章后面會討論配置管理。
? 如果Kafka返回錯誤,那么所有的AdminClient結果對象都會拋出ExecutionException異常,因為AdminClient結果對Future對象進行了包裝,而Future又對異常進行了包裝,所以需要檢查ExecutionException的嵌套異常才能獲取到Kafka返回的錯誤信息。
? 如果主題不存在,就創建一個新主題。在創建主題時,可以只指定主題名字,其他參數使用默認值。當然,也可以指定分區數量、副本數量和其他配置參數。
? 最后,等待主題創建完成并驗證結果。這里只檢查分區數量。因為在創建主題時指定了分區數量,所以要確保它是對的。如果我們在創建主題時使用了broker默認配置,則更加需要驗證結果。再次調用get()獲取CreateTopic的結果,這個方法也有可能拋出異常,最常見的是TopicExistsException,我們需要處理這個異常。
刪除主題
admin.deleteTopics(TOPIC_LIST).all().get();// 檢查主題是否已刪除
// 需要注意的是,由于刪除是異步操作,這個時候主題可能還存在
try {topicDescription = demoTopic.values().get(TOPIC_NAME).get();System.out.println("Topic " + TOPIC_NAME + " is still around");
} catch (ExecutionException e) {System.out.println("Topic " + TOPIC_NAME + " is gone");
}
異步回調獲取執行結果
前邊的操作都調用了 get()
獲取執行結果,但是這個是阻塞等待的,有時候我們希望異步回調的方式獲取結果:
vertx.createHttpServer().requestHandler(request -> { ?String topic = request.getParam("topic"); ?String timeout = request.getParam("timeout");int timeoutMs = NumberUtils.toInt(timeout, 1000);DescribeTopicsResult demoTopic = admin.describeTopics( ?Collections.singletonList(topic),new DescribeTopicsOptions().timeoutMs(timeoutMs));demoTopic.values().get(topic).whenComplete( ?new KafkaFuture.BiConsumer<TopicDescription, Throwable>() {@Overridepublic void accept(final TopicDescription topicDescription,final Throwable throwable) {if (throwable != null) {request.response().end("Error trying to describe topic "+ topic + " due to " + throwable.getMessage()); ?} else {request.response().end(topicDescription.toString()); ?}}});
}).listen(8080);
? 用Vert.x創建一個簡單的HTTP服務器。服務器在收到請求時會調用我們定義的requestHandler。
? 請求當中包含了一個主題名字,我們將用這個主題的描述信息作為響應。
? 像往常一樣調用AdminClient.describeTopics,并得到一個包裝好的Future對象。
? 這里沒有調用get()方法,而是構造了一個函數,Future在完成時會調用這個函數。
? 如果Future拋出異常,就將錯誤返回給HTTP客戶端。
? 如果Future順利完成,就將主題描述信息返回給客戶端。
配置管理
配置管理是通過描述和更新一系列配置資源(ConfigResource)來實現的。配置資源可以是broker、broker日志記錄器和主題。我們通常會用kafka-config.sh或其他Kafka管理工具來檢查和修改broker及broker日志配置,但主題配置管理是在應用程序中完成的。
例如,很多應用程序使用了壓實的主題,它們會定期(為安全起見,要比默認的保留期限更加頻繁一些)檢查主題是否被壓實,如果沒有,就采取相應的行動來糾正主題配置:
ConfigResource configResource =new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME); ?
DescribeConfigsResult configsResult =admin.describeConfigs(Collections.singleton(configResource));
Config configs = configsResult.all().get().get(configResource);
// 打印非默認配置
configs.entries().stream().filter(entry -> !entry.isDefault()).forEach(System.out::println); ?// 檢查主題是否被壓實
ConfigEntry compaction = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG,TopicConfig.CLEANUP_POLICY_COMPACT);
if (!configs.entries().contains(compaction)) {// 如果主題沒有被壓實,就將其壓實Collection<AlterConfigOp> configOp = new ArrayList<AlterConfigOp>();configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET)); ?Map<ConfigResource, Collection<AlterConfigOp>> alterConf = new HashMap<>();alterConf.put(configResource, configOp);admin.incrementalAlterConfigs(alterConf).all().get();
} else {System.out.println("Topic " + TOPIC_NAME + " is compacted topic");
}
? 如上所述,ConfigResource有幾種類型,這里檢查的是主題配置。也可以在同一個請求中指定多個不同類型的資源。
? describeConfigs的結果是一個map(從ConfigResource到配置的映射)。每個配置項都有一個isDefault()方法,可以讓我們知道哪些配置被修改了。如果用戶為主題配置了非默認值,或者修改了broker級別的配置,而創建的主題繼承了broker的非默認配置,那么我們便能知道這個配置不是默認的。
? 為了修改配置,這里指定了需要修改的ConfigResource和一組操作。每個修改操作都由一個配置條目(配置的名字和值,此處名字是cleanup.policy,值是compacted)和操作類型組成。Kafka的4種操作類型分別是:SET(用于設置值)、DELETE(用于刪除值并重置為默認值)、APPEND和SUBSTRACT。后兩種只適用于List類型的配置,用于向列表中添加值或從列表中移除值,這樣就不用每次都把整個列表發送給Kafka了。
消費者群組管理
查看消費者群組
列出消費者群組
admin.listConsumerGroups().valid().get().forEach(System.out::println);
- 這里調用valid()方法,可以讓get()返回的消費者群組只包含由集群正常返回的消費者群組。錯誤都將被忽略,不作為異常拋出。
查看更多描述信息
ConsumerGroupDescription groupDescription = admin.describeConsumerGroups(CONSUMER_GRP_LIST).describedGroups().get(CONSUMER_GROUP).get();System.out.println("Description of group " + CONSUMER_GROUP+ ":" + groupDescription);
- 描述信息包括群組成員、它們的標識符和主機地址、分配給它們的分區、分配分區的算法以及群組協調器的主機地址。
獲取分區最近偏移量和最近提交偏移量信息
Map<TopicPartition, OffsetAndMetadata> offsets =admin.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get(); ?Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();for(TopicPartition tp: offsets.keySet()) {requestLatestOffsets.put(tp, OffsetSpec.latest()); ?
}Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =admin.listOffsets(requestLatestOffsets).all().get();for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) { ?String topic = e.getKey().topic();int partition = e.getKey().partition();long committedOffset = e.getValue().offset();long latestOffset = latestOffsets.get(e.getKey()).offset();System.out.println("Consumer group " + CONSUMER_GROUP+ " has committed offset " + committedOffset+ " to topic " + topic + " partition " + partition+ ". The latest offset in the partition is "+ latestOffset + " so consumer group is "+ (latestOffset - committedOffset) + " records behind");
}
- ? 獲取消費者群組讀取的所有主題和分區,以及每個分區最新的提交偏移量。與
describeConsumerGroups
不同,listConsumerGroupOffsets
只接受一個消費者群組而不是一個集合作為參數。 - ? 我們希望獲取到結果集中每一個分區最后一條消息的偏移量。
OffsetSpec
提供了3個非常方便的實現:earliest()
、latest()
和forTimestamp()
,分別用于獲取分區中最早和最近的偏移量,以及在指定時間或緊接在指定時間之后寫入的消息的偏移量。 - ? 最后,遍歷所有分區,將最近提交的偏移量、分區中最近的偏移量以及它們之間的差值打印出來。
修改消費者群組
AdminClient也提供了修改消費者群組的方法:刪除群組、移除成員、刪除提交的偏移量和修改偏移量。
顯式地將提交的偏移量修改為最早的偏移量,可以強制消費者從主題開頭位置開始讀取,實際上就是“重置”消費者。
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets =admin.listOffsets(requestEarliestOffsets).all().get(); ?Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e:earliestOffsets.entrySet()) {resetOffsets.put(e.getKey(), new OffsetAndMetadata(e.getValue().offset())); ?
}try {admin.alterConsumerGroupOffsets(CONSUMER_GROUP, resetOffsets).all().get(); ?
} catch (ExecutionException e) {System.out.println("Failed to update the offsets committed by group "+ CONSUMER_GROUP + " with error " + e.getMessage());if (e.getCause() instanceof UnknownMemberIdException)System.out.println("Check if consumer group is still active."); ?
}
- ? 要重置消費者群組,并讓它從最早的偏移量位置開始消費,需要先獲取最早的偏移量。
- ? 在這個循環中,將
listOffsets
返回的ListOffsetsResultInfo
轉成alterConsumerGroupOffsets
需要的OffsetAndMetadata
。 - ? 調用
alterConsumerGroupOffsets
之后等待Future
完成,這樣便可知道是否執行成功。 - ? 導致
alterConsumerGroupOffsets
執行失敗最常見的一個原因是沒有停止消費者群組(只能直接關閉消費者應用程序,因為沒有可用于關閉消費者群組的命令)。如果消費者群組仍然處于活躍狀態,那么一旦修改了偏移量,群組協調器就會認為有非群組成員正在提交偏移量,并拋出UnknownMemberIdException
異常。
為主題添加分區
可以用createPartitions方法為主題添加分區。需要注意的是,如果一次性為多個主題添加分區,則可能會出現一些主題添加成功一些主題添加失敗的情況。
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS+2)); ?
admin.createPartitions(newPartitions).all().get();
- ? 在添加分區時,需要指定添加分區后主題將擁有的分區總數,而不是要添加的新分區的數量。
從主題中刪除消息
deleteRecords方法會將所有偏移量早于指定偏移量的消息標記為已刪除,使消費者無法讀取這些數據。這個方法將返回被刪除的消息的最大偏移量,這樣我們就可以檢查刪除操作是否按預期執行了。從磁盤上徹底刪除數據是異步進行的。需要注意的是,可以用listOffsets方法獲取在特定時間點或之后寫入的消息的偏移量。也可以組合使用這些方法來刪除早于任意特定時間點的消息。
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> olderOffsets =admin.listOffsets(requestOlderOffsets).all().get();
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e:olderOffsets.entrySet())recordsToDelete.put(e.getKey(),RecordsToDelete.beforeOffset(e.getValue().offset()));
admin.deleteRecords(recordsToDelete).all().get();
首領選舉
首選首領選舉
- 每一個分區都有一個可以被指定為首選首領的副本。如果所有分區的首領都是它們的首選首領副本,那么每個broker上的首領數量應該是均衡的。
- 在默認情況下,Kafka每5分鐘會檢查一次首領是否就是首選首領副本,如果不是,但它有資格成為首領,就會選擇首選首領副本作為首領。
- 如果auto.leader.rebalance.enable被設置為false,或者你想快一點兒執行選舉,則可以調用electLeader()方法。
不徹底的首領選舉
如果一個分區的首領副本變得不可用,而其他副本沒有資格成為首領(通常是因為缺少數據),那么這個分區將沒有首領,也就不可用了。解決這個問題的一種方法是觸發不徹底的首領選舉,也就是選舉一個本來沒有資格成為首領的副本作為首領。這可能導致數據丟失——所有寫入舊首領但未被復制到新首領的消息都將丟失。electLeader()方法也可以用來觸發不徹底的首領選舉。
Set<TopicPartition> electableTopics = new HashSet<>();
electableTopics.add(new TopicPartition(TOPIC_NAME, 0));
try {admin.electLeaders(ElectionType.PREFERRED, electableTopics).all().get(); ?
} catch (ExecutionException e) {if (e.getCause() instanceof ElectionNotNeededException) {System.out.println("All leaders are preferred already"); ?}
}
- ? 這里選舉的是某個特定主題的某個分區的首選首領。我們可以指定任意數量的分區和主題。如果在調用這個方法時傳入null而不是分區列表,則它將觸發所有分區的首領選舉。
- ? 如果集群的狀態是健康的,那么這個方法將不執行任何操作。首選首領選舉和不徹底的首領選舉只在當前首領不是首選首領副本時才有效。