kafka的Java客戶端
生產者
1.引入依賴
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.3</version></dependency>
2.生產者發送消息的基本實現
/*** 消息的發送?*/
public class MyProducer {private final static String TOPIC_NAME = "my-replicated-topic";public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094");// 把發送的key從字符串序列化為字節數組props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 把發送消息value從字符串序列化為字節數組props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());RecordMetadata metadata = null;try (Producer<String, String> producer = new KafkaProducer<>(props)) {Order order = new Order(1L, 99.9D);// 未指定發送分區,具體發送的分區計算公式:hash(key)%partitionNumProducerRecord<String, String> producerRecord = newProducerRecord<>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));// 等待消息發送成功的同步阻塞?法metadata = producer.send(producerRecord).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);} finally {if (metadata != null) {// =====阻塞=======System.out.println("同步?式發送消息結果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());}}}
}
3.發送消息到指定分區
4.發送消息未指定分區
發送消息未指定分區,會通過業務key的hash運算,算出消息往哪個分區上發
// 未指定發送分區,具體發送的分區計算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = newProducerRecord<>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));
5.同步發送消息
如果生產者發送消息沒有收到ack,生產者會阻塞,阻塞到3s的時間,如果還沒有收到消息,會進行重試。重試的次數3次。
RecordMetadata metadata = producer.send(producerRecord).get();System.out.println("同步?式發送消息結果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());
6.異步發送消息
異步發送,生產者發送完消息后就可以執行之后的業務,broker在收到消息后異步調用生產者提供的callback回調方法。
// 異步發送消息 Callback回調接口producer.send(producerRecord, new Callback() {// 異步回調方法@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e != null) {System.err.println("發送消息失敗:" +e.getMessage());}if (metadata != null) {System.out.println("異步?式發送消息結果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());}}});System.out.println("處理之后的邏輯~");
輸出結果:
7.生產者中的ack的配置
在同步發消息的場景下:生產者發送消息到broker上后,ack會有3種不同的選擇:
ack = 0
:kafka-cluster不需要任何的broker收到消息,就立即返回ack給生產者就可以繼續發送下一條消息,效率是最高的但最容易丟消息ack=1(默認)
:多副本之間的leader已經收到消息,并把消息寫?到本地的log中,才會返回ack給生產者,性能和安全性是最均衡的(這種情況下,如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失)ack=-1/all
:需要等待 min.insync.replicas(默認為1,推薦配置大于等于2) 這個參數配置的副本個數都成功寫入日志才會返回ack給生產者,這種策略會保證只要有?個備份存活就不會丟失數據。這種方式最安全但性能最差。(?般除非是金融級別,或跟錢打交道的場景才會使用這種配置)
code:
props.put(ProducerConfig.ACKS_CONFIG, "1");
關于ack和重試(如果沒有收到ack,就開啟重試)的配置
- 發送會默認會重試3次,每次間隔100ms
props.put(ProducerConfig.ACKS_CONFIG, "1");/*發送失敗會重試,默認重試間隔100ms,【重試能保證消息發送的可靠性,但是也可能造成消息重復發送】,?如?絡抖動,所以【需要在接收者那邊做好消息接收的冪等性處理】*/props.put(ProducerConfig.RETRIES_CONFIG, 3);// 重試間隔設置props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
8.關于消息發送的緩沖區
發送的消息會先進入到本地緩沖區(32mb),kakfa會跑?個線程,該線程去緩沖區中取16k的數據,發送到kafka,如果到10毫秒數據沒取滿16k,也會發送?次。
- kafka默認會創建一個消息緩沖區,用來存放要發送的消息,緩沖區是32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
- kafka本地線程會去緩沖區中?次拉16k的數據,發送到broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
- 如果線程拉不到16k的數據,間隔10ms也會將已拉到的數據發到broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
消費者
1.消費者消費消息的基本實現
public class MyConsumer {private final static String TOPIC_NAME = "my-replicated-topic";private final static String CONSUMER_GROUP_NAME = "testGroup";public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094");// 消費分組名props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 1.創建?個消費者的客戶端try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {// 2.消費者訂閱主題列表consumer.subscribe(Collections.singletonList(TOPIC_NAME));while (true) {/** 3.poll()API 是拉取消息的?輪詢*/ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {// 4.操作消息System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value());}}} catch (Exception e) {throw new RuntimeException(e);}}
}
2.消費者自動提交和手動提交offset
1)提交的內容
消費者無論是自動提交還是手動提交,都需要把所屬的消費組+消費的某個主題+消費的某個分區及消費的偏移量,這樣的信息提交到集群的_consumer_offsets主題里面。
2)自動提交
消費者poll消息下來以后就會自動提交offset
// 是否自動提交offset,默認就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自動提交offset的間隔時間
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
注意
:自動提交會丟消息。因為消費者在消費前提交offset,有可能提交完后還沒消費時消費者掛了。于是下?個消費者會從已提交的offset的下一個位置開始消費消息。之前未被消費的消息就丟失掉了。
3)手動提交
需要把自動提交的配置改成false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手動提交又分成了兩種
:
- 手動同步提交
在消費完消息后調用同步提交的方法,當集群返回ack前?直阻塞,返回ack后表示提交成功,執行之后的邏輯
while (true) {/** poll()API 是拉取消息的?輪詢*/ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {// 操作消息System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value());}// 所有的消息已消費完if (records.count() > 0) {// 有消息// ?動同步提交offset,當前線程會阻塞直到offset提交成功// 【?般使?同步提交】,因為提交之后?般也沒有什么邏輯代碼了consumer.commitSync();// =======阻塞=== 提交成功}}
- 手動異步提交
在消息消費完后提交,不需要等到集群ack,直接執行之后的邏輯,可以設置?個回調方法,供集群調用
while (true) {/** poll()API 是拉取消息的?輪詢*/ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {// 操作消息System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value());}// 所有的消息已消費完if (records.count() > 0) {// 有消息// ?動異步提交offset,當前線程提交offset不會阻塞,可以繼續處理后?的程序邏輯consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition,OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit failed for " + offsets);System.err.println("Commit failed exception: " + exception.getMessage());}}});}}
3.長輪詢poll消息(消費者拉取消息)
-
消費者建立了與broker之間的長連接,開始poll消息
-
默認情況下,消費者一次會poll500條消息
// ?次poll最?拉取消息的條數,可以根據消費速度的快慢來設置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
- 代碼中設置了長輪詢的時間是1000毫秒
while (true) {/** poll()API 是拉取消息的?輪詢*/ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value());}}
- 意味著:
- 如果?次poll到500條,就直接執行for循環
- 如果這?次沒有poll到500條。且時間在1秒內,那么長輪詢繼續poll,要么到500條,要么到1s,執行后續for循環
- 如果多次poll都沒達到500條,且1秒時間到了,那么直接執行for循環
- 如果兩次poll的間隔超過30s(poll時間短但是消費時間長,消費者消費可能會達到30s左右),集群會認為該消費者的消費能力過 弱,該消費者被踢出消費組,觸發rebalance機制,rebalance機制會造成性能開銷。
可以通過設置參數, 讓?次poll的消息條數少?點,避免觸發rebalance損耗性能
// ?次poll最?拉取消息的條數,可以根據消費速度的快慢來設置props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 如果兩次poll的時間如果超出了30s的時間間隔,kafka會認為其消費能?過弱,將其踢出消費組。將分區分配給其他消費者。-rebalanceprops.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
4.消費者的健康狀態檢查
消費者每隔1s向kafka集群發送心跳,集群發現如果有超過10s沒有續約的消費者,將被踢出消費組,觸發該消費組的rebalance機制,將該分區交給消費組里的其他消費者進行消費。
// consumer給broker發送心跳的間隔時間 1s一次
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// kafka如果超過10秒沒有收到消費者的心跳,則會把消費者踢出消費組,進?rebalance,把分區分配給其他消費者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
5.指定分區和偏移量、時間消費
- 指定分區消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
- 從頭消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
- 指定offset消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
- 指定時間消費
根據時間,去所有的partition中確定該時間對應的offset,然后去所有的partition中找到該offset之后的消息開始消費。
// topic對應所有分區
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
// 從1小時前開始消費
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {TopicPartition key = entry.getKey();OffsetAndTimestamp value = entry.getValue();if (key == null || value == null) continue;long offset = value.offset();System.out.println("partition-" + key.partition() +"|offset-" + offset);System.out.println();//根據消費?的timestamp確定offsetconsumer.assign(Arrays.asList(key));consumer.seek(key, offset);
}
6.新消費組的消費offset規則
新消費組中的消費者在啟動以后,默認會從當前分區的最后?條消息的offset+1開始消費(消費新消息)。可以通過以下的設置,讓新的消費者第?次從頭開始消費。之后開始消費新消息(最后消費的位置的偏移量+1)
-
Latest:默認的,消費新消息
-
earliest:第?次從頭開始消費。之后開始消費新消息(最后消費的位置的偏移量+1),這個需要區別于consumer.seekToBeginning(每次都從頭開始消費)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
SpringBoot集成kafka
1.引入依賴
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
2.配置文件
server:port: 8080
spring:kafka:bootstrap-servers: 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094producer: # 生產者retries: 3 # 設置大于0的值,則客戶端會將發送失敗的記錄重新發送batch-size: 16384 # 每次拉取多少數據發送broker buffer-memory: 33554432 # 本地緩沖區大小acks: 1# 指定消息key和消息體的編解碼?式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500listener:# 當每?條記錄被消費者監聽器(ListenerConsumer)處理之后提交# RECORD# 當每?批poll()的數據被消費者監聽器(ListenerConsumer)處理之后提交# BATCH# 當每?批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,距離上次提交時間大于TIME時提交# TIME# 當每?批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,被處理record數量大于等于COUNT時提交# COUNT# TIME | COUNT 有?個條件滿足時提交# COUNT_TIME# 當每?批poll()的數據被消費者監聽器(ListenerConsumer)處理之后, 手動調用Acknowledgment.acknowledge()后提交# MANUAL# 【手動調用Acknowledgment.acknowledge()后立即提交,?般使用這種】# MANUAL_IMMEDIATEack-mode: MANUAL_IMMEDIATE
3.消息生產者
發送消息到指定topic
4.消息消費者
設置消費組,消費指定topic
@Component
public class MyConsumer {@KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup1")public void listenGroup(ConsumerRecord<String, String> record,Acknowledgment ack) {String value = record.value();System.out.println(record);System.out.println(value);//?動提交offsetack.acknowledge();}
}
5.消費者中配置消費主題、分區和偏移量
設置消費組、多topic、指定分區、指定偏移量消費及設置消費者個數
@KafkaListener(groupId = "testGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"}),@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))}, concurrency = "3")// concurrency:同消費組中消費者個數,就是并發消費數,建議小于等于分區總數public void listenGroupPro(ConsumerRecord<String, String> record,Acknowledgment ack) {String value = record.value();System.out.println(value);System.out.println(record);//?動提交offsetack.acknowledge();}