生產者學習
1.1 生產者消息發送流程
在消息發送的過程中,涉及到了兩個線程——main線程和Sender線程。在main線程中創建了一個雙端隊列RecordAccumulator。main線程將消息發送給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發送到Kafka Broker。
生產者如何發送的?
現在Main線程中將數據進行處理,處理成IO型數據,然后調用sender進行發送
Main:
1.讀取生產者配置
2.產生數據
3.過濾數據(校驗什么的)
4.序列化
5.放入緩沖區 RecordAccumulator
6.發送Sender
細節: 考慮的問題 1.生產者配置的讀取和修改 2.數據的過濾與分區, 3.緩沖區是如何設置的,大小
4.發送(發送失敗怎么樣,請求區的大小)
這里注意一下,可以在緩沖區對數據進行壓縮,這樣就提高緩沖區的容量和發送的數據量,提高吞吐量
1.2 同步發送與異步發送
1.什么是同步和異步
同步就是,串行,一條龍 異步 一起運行
舉例: 餐館點餐
同步: 需要等服務員過來,讓服務員記錄,
異步: 點餐APP直接點餐,交給隊列,讓他自己運行
2.發送的同步異步
同步:需要得到返回值
異步:發送過去不管了
3. 分區好處
啥是分區?
將一個數據塊分成多個數據塊
將數據分布式處理了
存儲: 可以分在多個機器上, 也可以整多個副本。便于存儲,同時提高健壯性
IO:多個數據塊可以同時進行發送接收消費。生產者可以以分區為單位發送數據,消費者可以以分區為單位進行消費
4. 默認分區器
前提條件: 1.分區 2.key值
規則:
- 1存在,按1分區
- 1不存在,按2.key值對分區數取余得到的值分區
- 1.2都不存在 隨機選個分區,等這個批次發送完了,再換
3 就是粘性分區
那么粘性分區的缺點是什么?
因為緩沖區溢出的條件是,大小和時間雙重判斷,如果大小不夠,但是時間夠了,還是會發走,這樣,最后導致,分區上產生數據傾斜
如何解決的?
3.3.1 Kafka去掉粘性分區的時間控制,批次只由大小判斷
1.3.自定義分區器
1.思路
- 1.實現接口Parititoner,重寫相關方法
- 2.修改配置 將partitioner設置為默認配置
2.1 自定義分區器代碼
public class MyPartitioner implements Partitioner {// 自定義分區器 實現partitioner接口// 1.分區方法@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 獲取消息String data = value.toString();// 創建partition 作為最后的分區標識int partitions;// 分區邏輯// 根據含有的字符串進行判斷 判斷進入哪個分區if (data.contains("atguigu")){partitions = 0;} else if (data.contains("shangguigu")){partitions = 1;} else {partitions = 2;}return partitions;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
2.2 主類
package com.atguigu.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class ProducerClientAsync {public static void main(String[] args) {// 0 配置對象Properties properties = new Properties();// --指定kafka的Broker地址properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");// -- 1.指定序列化器 序列化器的全限定類名properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//.setProperty(ProducerConfig.LINGER_MS_CONFIG,"0");// -- 2.設置分區器properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());// -- 3.獲取客戶端連接對象KafkaProducer<String,String> kafkaProducer= new KafkaProducer<String,String>(properties);// key是主題 v是發送內容 這里注意一下// -- 4.發送數據String[] str= {"atguigu","111","atguigu","shangguigu","222"};for (int i =0; i < str.length; i++) {System.out.println(str[i]);try {kafkaProducer.send(new ProducerRecord<>("first", str[i]), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主題:" + metadata.topic() + "->" + "分區:" + metadata.partition());}else {// 出現異常打印exception.printStackTrace();}}}).get();} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}}kafkaProducer.close();}
}
3.面試細節
1.如何提高生產者的吞吐量
- 批次大小調到16
- 將等待時間改成50-100ms 默認是0
- 壓縮數據量,這樣每次發送的數據就多了
- 加大緩沖區大小,進來的數據變多,發送也能提上去
2.生產者如何保證數據可靠性的
主要通過ack機制
1.什么是ACK機制?
根據ack值來決定Kafka集群服務端的存儲應答
- ack=0 最低 生產者只管發送,不用接收
- ack=1 中等 生產者發送完需要等待Leader保存后回應,
- ack=-1 最高 生產者發送完需要等待所有副本保存后回應
2.分析ACK機制
性能與安全是成反比的
所以,-1雖然最安全,但是效率最低
3.如果將ACK調到-1會出現什么問題?
有可能出現數據重復發送與接收
比如,在同步的瞬間,Leader死掉,但是其他副本已經落盤,這時候,就是問題了。
因為Leader死掉了,所以會直接更換Leader,選出一個副本作為Leader,注意,這時顯示沒有收到內容,所以,send重新發送,這時候,每個副本上,收到的就是2份該數據了。
4.應用場景
acks=0 幾乎不用
acks=1 傳輸普通日志,允許丟失
acks=-1 傳輸高可靠性數據,一般與錢有關
5.ACK=-1一定可靠么?
不一定
如果分區副本數設置為1 ,或者ISR里應答的最小副本數設置為1(默認也是1),這時候,ack=1效果相同了。
也就是說,應答一個,就能走,就沒意義了
所以需要完全可靠就需要配置一下
ACK=-1 & 分區副本大于等于2 & ISR應答最小副本數量大于等于2
3. 數據去重
1.概念
至少一次:一次或者多次 完全可靠
最多一次:直接不管回復只管發送 ack=0
至少:保證數據不丟失,但是無法保證數據不重復
最多: 無法保證數據不丟失
1.如何解決數據的重復發送與接收的問題,同時保證數據的不丟失
注意,這里解決的是sender和服務端的重復發送與接收,而不是生產者本身發送多個重復消息的問題,這個要搞清楚。
一般重復問題,都是通過標識來判別,從而去重的
Kafka 0.11 引入 冪等性和事務
精確一次: 冪等性 +至少一次(ack=-1 & 分區副本>=2 & ISR最小副本>=2)
4.冪等性
1.概念
啥是冪等性,標識一個消息的唯一標識
<pid,partition,Seqnumber>
Pid 是會話ID,每次重新生成會話,就會重新生成PID
partition是分區 標識 消息是哪個分區的
Seqnumber是單調遞增的標識,注意,這是每個分區獨享的
這三個在一起,才是唯一標識。
2.如何使用冪等性
開啟參數enable.idempotence 默認為true,false關閉。
開啟開關就行