Introduction
Kafka's partitioning strategy determines which partition each message sent by a producer is assigned to. A well-chosen strategy helps balance load across partitions, improves message-processing efficiency, and satisfies specific business requirements.
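Three inputs ultimately decide the target partition: an explicitly specified partition on the ProducerRecord, the record key, or the configured partitioner when neither is given. A minimal sketch of the three constructor forms (the topic name testTopic is just a placeholder):

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionInputs {
    public static void main(String[] args) {
        // 1. Explicit partition: always goes to partition 0, the partitioner is bypassed
        ProducerRecord<String, String> explicit =
                new ProducerRecord<>("testTopic", 0, "user-1", "payload");

        // 2. Keyed record: the partitioner (by default) hashes the key to pick a partition
        ProducerRecord<String, String> keyed =
                new ProducerRecord<>("testTopic", "user-1", "payload");

        // 3. Keyless record: distribution is left entirely to the configured partitioner
        ProducerRecord<String, String> keyless =
                new ProducerRecord<>("testTopic", "payload");

        System.out.println(explicit.partition()); // 0
        System.out.println(keyed.partition());    // null - decided later by the partitioner
        System.out.println(keyless.partition());  // null
    }
}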
Round-Robin Strategy (Default)
- The round-robin strategy is Kafka's classic default behavior for messages without a key: the producer sends messages to the partitions in turn, so every partition receives a roughly even share of traffic and load is balanced. It is simple and efficient, keeps the message volume of each partition roughly even, and makes full use of every partition's storage and processing capacity. (Note that since Kafka 2.4 keyless messages are handled by the sticky partitioner by default, which fills a batch on one partition before moving to the next; see the sketch after the example below for how to force strict round-robin.)
Example:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class RoundRobinProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            // No key is given, so the configured partitioner alone decides the target partition
            ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
            producer.send(record);
        }
        producer.close();
    }
}
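Because of the sticky partitioner introduced in Kafka 2.4, the code above will not necessarily alternate partition by partition for keyless records. If strict per-record round-robin is required, the client ships an explicit RoundRobinPartitioner that can be configured; a minimal sketch:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class ExplicitRoundRobinProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Force strict round-robin for keyless records (available since Kafka 2.4)
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                "org.apache.kafka.clients.producer.RoundRobinPartitioner");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("testTopic", "message-" + i));
            }
        }
    }
}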
Random Strategy
- The random strategy assigns each message to a randomly chosen partition. It achieves a degree of load balancing, but because the assignment is random, the distribution across partitions may be less even than round-robin. It can be implemented with a custom partitioner, as shown below.
Example:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

public class RandomPartitioner implements Partitioner {
    private final Random random = new Random();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Pick any of the topic's partitions with equal probability
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        return random.nextInt(partitions.size());
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

// Producer example using the random partitioner (kept in a separate source file in practice,
// since Java allows only one public class per file)
public class RandomProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use the fully qualified class name if RandomPartitioner is declared in a package
        props.put("partitioner.class", "RandomPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
            producer.send(record);
        }
        producer.close();
    }
}
Key-Hash Strategy
- When a message has a key, Kafka hashes the key and uses the result to choose the partition, so messages with the same key always land in the same partition. This preserves ordering for messages that share the same business identity, i.e. local (per-key) order: for example, sending all of one user's messages to the same partition makes downstream processing and analysis easier. The hash computation itself is sketched after the example below.
Example:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KeyBasedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            // Keys alternate between "user-0" and "user-1"; all records with the same key
            // are hashed to the same partition
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("testTopic", "user-" + (i % 2), "message-" + i);
            producer.send(record);
        }
        producer.close();
    }
}
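For keyed records, the built-in partitioner applies murmur2 to the serialized key bytes and takes the result modulo the partition count, so identical keys map to the same partition as long as the partition count does not change. A small sketch of that computation, assuming testTopic has 3 partitions:

import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

public class KeyHashDemo {
    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count of testTopic

        for (String key : new String[]{"user-0", "user-1", "user-0"}) {
            // StringSerializer produces UTF-8 bytes; the default partitioner then applies
            // murmur2 and maps the positive hash onto the available partitions
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println(key + " -> partition " + partition);
        }
    }
}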
Custom Partition Strategy (Implementing the Interface)
- When the strategies above cannot meet business requirements, you can define your own partitioning strategy by implementing the org.apache.kafka.clients.producer.Partitioner interface and overriding its partition method. For example, partition by specific fields of the message (such as time or geographic location) to satisfy a particular business need.

Example:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        // Custom partitioning logic; as a simple example, partition by the length of the message value
        String message = (String) value;
        return message.length() % partitions.size();
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

// Producer example using the custom partitioner (kept in a separate source file in practice)
public class CustomProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use the fully qualified class name if CustomPartitioner is declared in a package
        props.put("partitioner.class", "CustomPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
            producer.send(record);
        }
        producer.close();
    }
}
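If the custom logic needs tuning at deployment time, extra properties set on the producer should also be visible in the map passed to the partitioner's configure() method, which is a convenient way to parameterize it. A sketch under that assumption, where the property name custom.partition.offset is purely illustrative:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class ConfigurablePartitioner implements Partitioner {
    // Offset added to the computed partition index; "custom.partition.offset" is a made-up property name
    private int offset = 0;

    @Override
    public void configure(Map<String, ?> configs) {
        Object configured = configs.get("custom.partition.offset");
        if (configured != null) {
            offset = Integer.parseInt(configured.toString());
        }
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Same length-based rule as above, shifted by the configured offset
        return (((String) value).length() + offset) % numPartitions;
    }

    @Override
    public void close() {}
}

On the producer side this would be enabled with props.put("partitioner.class", ConfigurablePartitioner.class.getName()) together with props.put("custom.partition.offset", "1").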