文章目錄
- 概要
- 代碼示例
- 小結
概要
kafka生產者發送消息默認根據總分區數和設置的key計算哈希取余數,key不變就默認存放在一個分區,沒有key則隨機數分區,明顯默認的是最不好用的,那kafka也提供了一個輪詢分區策略,我自己使用的是一言難盡,具體我也沒有深究下去,那么針對業務硬性要求消息按照升序或降序輪詢分區,就需要我們自己定義分區策略了。
有多少小伙伴第一次配置自定義分區策略時,發現分區總是按照倍數分區,并沒有按照指定的規則去分區呢?嘿嘿,相信沒閱讀過源碼的都應該踩過這一個坑,原因在于生產者發送消息時,kafka會先去分區策略那里逛一圈,拿到本次分區值,再去執行下一步流程,而在真正執行發送消息之前,kafka會再次進入分區策略內拿取本次的分區值,那么輪詢策略一般按照依次遞增或遞減,致使發送消息時都會拿到自增兩次后的分區值。
好,知道了問題所在,那就簡單了,修改邏輯就行了唄,這一塊考慮到使用分區策略一般是應對多個消息的產生同時發送,所以就涉及到并發了,那么并發就要考慮線程安全,這里推薦使用原子自增類和原子Boolean(非必要),能不使用鎖就不使用鎖,具體根據各位的業務而定吧,那話不多說,上代碼。
代碼示例
package org.example.springkafkademo.config;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;public class CustomerPartitioner implements Partitioner {//針對并發設計,使分區數量原子自增private static AtomicInteger nextPartition = new AtomicInteger(0);//二次進入判斷機制private static AtomicBoolean flag = new AtomicBoolean(false);@Overridepublic int partition(String topic, Object key, byte[] bytes, Object o1, byte[] keyBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);//最大自增值int numPartitions = partitions.size();if (key == null) {//二次判斷機制為true則說明自增過一次,需要返回自增之前的值if (flag.get()){flag.set(false);return nextPartition.get()-1;}//原子類將舊值返回再自增int next = nextPartition.getAndIncrement();//如果自增后與大于最大值或相等則直接cas賦值0,使下一次的輪詢從0開始if (next >= numPartitions) {nextPartition.compareAndSet(numPartitions, 0);}//標記已經進入過一次flag.set(true);System.out.println("分區值:" + next);return next;} else {// 如果key不為null,則使用默認的分區策略return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
小結
本文分享kafka實現自定義輪詢策略,在應對需要將大量的消息輪詢發送給分區的場景時,可以采納本文的代碼邏輯,但是并不是適配所有分區輪詢,畢竟業務邏輯不是定死的,各位小伙伴一定要結合實際業務邏輯,針對性的對代碼進行修改擴展。
有哪里不懂得小伙伴可留言或私信,如與本文章有不同觀點歡迎討論留言,大家一起進步。