生產者
生產者發送流程:
延遲時間為0ms時,也就意味著每當有數據就會直接發送
異步發送API
異步發送和同步發送的不同在于:異步發送不需要等待結果,同步發送必須等待結果才能進行下一步發送。
普通異步發送
首先導入所需的kafka依賴
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version>
</dependency>
public class CustomProducer {public static void main(String[] args) {//配置Properties properties = new Properties();//連接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.27.101:9092,192.168.27.102:9092");//指定對應的key和value的序列化類型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//創建Kafka生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//異步發送數據kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));for (int i = 0; i < 8; i++) {kafkaProducer.send(new ProducerRecord<>("first","learning Kafka-"+i));}//關閉資源kafkaProducer.close();}
}
帶回調函數的異步發送
回調函數會在producer收到ack時調用,為異步調用,該方法有兩個參數:元數據信息和異常信息,如果異常信息為null,說明消息發送成功,如果異常現象不為null,說明消息發送失敗。
修改發送方法,采用回調
//異步發送數據,并有回調函數
kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));
for (int i = 0; i < 8; i++) {kafkaProducer.send(new ProducerRecord<>("first", "learning Kafka-" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){System.out.println("主題: "+ recordMetadata.topic() +" 分區:"+recordMetadata.partition());}}});}
運行方法就能看到返回的主題、分區
主題: first 分區:2
同步發送
同步發送只需更改發送方式
//同步發送數據kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));for (int i = 0; i < 8; i++) {kafkaProducer.send(new ProducerRecord<>("first","learning Kafka-"+i)).get();}
為什么要分區
- 便于合理使用存儲資源,每個Partition在一個Broker上存儲,可以把海量的數據按照分區切割成一塊一塊數據存儲在多臺Broker上,合理控制分區的任務,可以實現負載均衡的效果。
- 提高并行度,生產者可以以分區為單位發送數據,而消費者則可以以分區為單位進行消費
分區策略:
-
默認分區策略:
-
如果在記錄中指定了分區,那么直接使用指定的分區
例如在send方法指定分區2,key為""
kafkaProducer.send(new ProducerRecord<>("first",2,"", "learning Kafka-" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){System.out.println("主題: "+ recordMetadata.topic() +" 分區:"+recordMetadata.partition());}}});
-
如果未指定分區但存在鍵key,則根據key的哈希值與topic的partition數目進行取余選擇分區
例如在send方法中不指定分區,設置key
kafkaProducer.send(new ProducerRecord<>("first","haha", "learning Kafka-" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){System.out.println("主題: "+ recordMetadata.topic() +" 分區:"+recordMetadata.partition());}}});
-
如果不存在分區也沒有鍵key,那么使用黏性分區,會隨機選擇一個分區并且盡可能一直使用該分區,如果該分區batch已滿或者已完成,kafka會再隨機一個分區進行使用(和上一個分區不同)。
-
自定義分區器
首先自定義一個分區器
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//獲取數據String msgValues = value.toString();int partition;//如果發送的數據包含aha字段則發送到0號分區,不包含則發往1號分區if(msgValues.contains("aha")){partition = 0;}else {partition = 1;}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
在創建Kafka對象之前設置配置,選擇自定義的分區器
//關聯自定義分區
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.hzp.kafka.producer.MyPartitioner");
注意:如果使用自定義分區的同時,還在send方法內指定分區,那么以指定分區為準。
生產者提高吞吐量
生產者發消息就相當于用貨車從本地倉庫(緩沖區)送貨到kafka,相關的參數有兩個,一個是batch size批次大小,一個是linger.ms等待時間。batch size默認為16k,相當于貨車的容量大小,如果貨車裝滿了就發往kafka。但是通常情況下等待時間為0ms,也就是每當倉庫來了一箱貨就直接送到kafka,不管貨車是否裝滿。
因此提高吞吐量主要有以下方法:
- 修改linger.ms,增長等待時間或者增加批次大小,讓貨車盡量裝多一點貨甚至裝滿再發送。(等待時間會造成一定的延遲,通常控制在5-100ms)
- 發送數據時,采取壓縮的方式
- 增大緩沖區大小,緩沖區大小通常為32m。相當于增加倉庫大小,讓倉庫能夠存儲更多的貨物。
//緩沖區大小(單位為kb,默認32M)1024*1024*32properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);//批次大小(單位為kb,默認16kb)1024*16properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);//linger.ms (單位為ms)properties.put(ProducerConfig.LINGER_MS_CONFIG,10);//壓縮 設置壓縮類型為snappy,可配置的值有gzip、snappy、lz4、zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
數據可靠性
數據可靠性與ACK應答級別有關
acks:
-
0:生產者發送過來的數據,不需要等數據落盤就應答。
如果不等數據落盤就應答,容易造成數據丟失,生產者發送數據就不管了,可靠性差,效率高。
-
1:生產者發送過來的數據,Leader收到數據后應答
如果Leader接收到數據,并且應答之后,突然掛掉了,但是此時Leader還沒有同步數據給其他節點,此時就造成數據丟失。生產者發送數據Leader應答,可靠性中等,效率中等。
-
-1:生產者發送的數據,Leader和ISR隊列中的所有節點收齊數據后應答
生產者發送數據需要Leader和ISR隊列里面所有的Follower應答,可靠性高,效率低。
如果Leader收到數據并且和Follower同步數據時,有一個Follower因為故障,長時間不能與Leader同步,這應該如何解決?
解決方案:Leader維護了一個動態的in-sync replica set(ISR)也就是與Leader保持同步的Follower+Leader的集合(Leader:0,ISR:0,1,2)。如果Follower長時間未向Leader發送通信請求或者同步數據,則該Follower將被踢出ISR。該時間閾值由replica.lag.time.max.ms參數設定,默認30s。這樣就不用長時間等待以故障的節點。
如果分區副本為1,那么ACK應答-1和1沒有區別,掛了數據就直接丟失,如果ISR里面也只有一個(Leader:0,ISR:0),那么說明沒有Follower跟Leader同步,那么仍然會數據丟失。因此可以得到:數據完全可靠的條件:ACK級別設置為-1、分區副本大于等于2、ISR應答里的最小副本數量大于等于2。
通常情況下,acks=0很少使用,acks=1主要用于傳輸普通日志(大量但并不重要的數據),允許個別數據丟失,acks=-1一般用于傳輸重要的數據比如金錢這類對可靠性要求比較高的場景。
acks=-1仍然存在問題,比如現在Leader:0,ISR:0,1,2。生產者發送數據data,Leader:0接收到data后與1、2同步數據。同步數據完成之后,即將應答之前,Leader突然掛掉了,那么此時就會從1,2中選擇一個成為新的Leader。假設1成為新的Leader,此時生產者沒有收到應答,再次發送數據data,那么此時Leader:1就接收到了兩份data數據,造成數據重復。
java設置acks,以及重試次數
//acks 設置為1
properties.put(ProducerConfig.ACKS_CONFIG,"1");
//重試次數 默認為int的最大值
properties.put(ProducerConfig.RETRIES_CONFIG,3);
數據去重
在剛剛的數據可靠性中,我們知道怎么讓數據能夠完全可靠,就是讓ACK級別設置為-1、分區副本大于等于2、ISR應答里的最小副本數量大于等于2。從數據傳遞來看,這種設置就是數據傳遞至少一次(At Least One);而當ACK級別設置為0,那么數據傳遞最多一次(At Most One)。
At Least One可以保證數據不丟失,但是不能保證數據不重復,At Most One可以保證數據不重復,但是不能保證數據不丟失。那么如果既想數據不丟失,又想數據不重復,此時就要依靠冪等性和事務。
冪等性
冪等性就是指Producer不論向Broker發送多少次重復數據,Broker端都只會持久化一條數據,保證了不重復。
重復數據的判斷標準就是<PID、Partition、SqlNumber>相同的消息,Broker只會持久化一條數據。Pid標識指的是ProducerId,生產者編號,Kafka每重啟一次就分配一個新的;Partiton標識分區號;SqlNumber是單調自增的,因此冪等性能夠保證在單分區、單會話內不重復。
冪等性的使用只需設置enable.idempotence即可,默認為true,關閉只需設置為false。
事務
事務開啟之前,必須先開啟冪等性。事務底層依賴冪等性。
數據有序
Kafka單分區內有序,但是多分區時,分區與分區之間無序。
數據亂序
kafka保證數據單分區有序的條件是:
- 如果沒有開啟冪等性,那么需要設置max.in.flight.request.per.connection的值為1
- 如果開啟冪等性,那么需要設置max.in.flight.request.per.connection的值小于等于5.
在kafka1.x版本之后當kafka啟用冪等,那么kafka服務端會緩存producer發來的最近5個request的元數據,而冪等性的實現依賴單調遞增的序號SqlNumber。如果發送時出現亂序,那么會根據單調遞增的序號進行重排序。也就是說當開啟了冪等性并且緩存的請求個數小于5,那么會在服務端進行一次重新排序,讓數據有序。