目錄
- 1. 消息重復消費怎么解決
- 1.1. 確保相同的消息不會被重復發送(消費冪等性)
- 1.2. 消息去重
- 1.3. 消息重試機制
- 1.4. kafka怎么保證消息的順序性
- 1.4.1. 利用分區的特征:
- 1.4.2. 解決辦法:
- 1.4.3. 分區分配策略
- 1.4.3.1. RangeAssignor (每組(Topic)里的分區(partition)都依次消費,可能導致第一個消費者負重,最后的消費者無消息可消費)
- 1.4.3.2. RoundRobinAssignor (所有分區(partition)依次消費,不在區分有沒有組(Topic)。所有資源都盡量就盡量均衡。)
- 1.4.3.3. StickyAssignor (不改變原來的分配上,把關聯消費者宕機的分區,重新分配給其他消費者,盡量達到均衡)
- 1.5. kafka 怎么知道消費成功
- 1.6. 死信(消息處理失敗后怎么處理)
- 1.6.1. 死信配置(這里存放到特定的主題)
- 1.7. 事務
- 1.7.1. 配置參數開啟事務:
- 1.7.2. Producer 事務
- 1.7.2.1. javaSDK例子
- 1.7.2.2. springboot例子
- 1.7.3. Consumer 事務
- 1.7.3.1. 隔離級別:
- 1.7.3.2. 提交事務和回滾事務
- 1.7.3.3. 總結:
- 2. kafka文檔
- 2.1. Topic、Group、Partition、消費者的關系
- 2.2. 管理topic
- 2.3. 發送producer消息
- 2.4. consumer - 消費producer消息
- 2.5. 部署kafka
- 2.6. 部署zookeeper
- 2.7. 學習用的demo
1. 消息重復消費怎么解決
消息重復消費的問題可以通過多種方法解決,主要包括消費冪等性、消息去重、消息確認機制、消息重試機制、保證消息的順序性以及將消息進行持久化存儲。
-
消費冪等性
:確保在同一條消息被重復消費時,系統不會產生副作用或影響系統的正確性。這可以通過在消費端使用唯一標識來判斷消息是否已經被消費過,例如使用數據庫的唯一索引、使用分布式鎖等方式來保證冪等性。 -
消息去重
:針對同一條消息的多次消費,只保留其中一次消費結果。可以在消費者端進行去重,使用緩存或數據庫來記錄已經處理過的消息,避免重復消費。 -
消息確認機制
:MQ一般提供消息確認機制,如ACK機制。消費者在成功處理一條消息后,發送ACK給MQ,表示該消息已經被成功消費。如果消費者在處理消息時發生異常或失敗,可以不發送ACK,MQ會將該消息重新發送給其他消費者進行處理。 -
消息重試機制
:當消息處理失敗時,可以將消息重新發送給MQ,由MQ重新投遞給消費者進行處理。可以在消息的header中添加重試次數的標記,當達到最大重試次數后,可以將消息發送到死信隊列進行處理,以避免消息的無限重試。 -
保證消息的順序性
:如果消息的順序性很重要,可以將相關消息發送到同一個分區或同一個隊列中,以保證消息的順序性。(kafka分區) -
持久化機制
:為了避免消息丟失,可以將消息進行持久化存儲,例如將消息存儲到數據庫或文件系統中。即使MQ發生故障或重啟,也可以通過持久化的消息進行恢復。
這些方法可以單獨或結合使用,以有效解決消息重復消費的問題,確保系統的穩定性和數據的準確性。
1.1. 確保相同的消息不會被重復發送(消費冪等性)
Kafka事務是怎么實現的?Kafka事務消息原理詳解
Kafka生產者可以配置為冪等,確保相同的消息不會被重復發送。
保證在消息重發的時候,消費者不會重復處理。即使在消費者收到重復消息的時候,重復處理,也要保證最終結果的一致性。(可以理解為在應用端做了冪等處理。即使重復消息發送過來了,也會判斷是否已在處理,從而達到一條消息只會被一個處理)
消息在 MQ 中的傳遞,大致可以歸類為下面三種:
-
At most once: 至多一次。消息在傳遞時,最多會被送達一次。是不安全的,可能會丟數據。
-
At least once: 至少一次。消息在傳遞時,至少會被送達一次。也就是說,不允許丟消息,但是允許有少量重復消息出現。
-
Exactly once:恰好一次。消息在傳遞時,只會被送達一次,不允許丟失也不允許重復,這個是最高的等級。
大部分消息隊列滿足的都是At least once,也就是可以允許重復的消息出現。
1.2. 消息去重
針對同一條消息的多次消費,只保留其中一次消費結果。可以在消費者端進行去重,使用緩存或數據庫來記錄已經處理過的消息,避免重復消費。
例如,使用redis緩存去重:
// 去重public void removeRepeatProcessMessage(FsConsumer fsConsumer){RSet<String> set = redisson.getSet("local:fs:downloadFileMsg");if (set.add(fsConsumer.getMsgId())) {// 成功則處理,失敗則拋出異常,記錄到死信隊列里// 只有當消息ID被成功添加到集合時才處理消息processMessage(fsConsumer);} else {// 去重處理// 如果消息ID已存在于集合中,則表示該消息已處理,跳過System.out.println("Message with ID " + fsConsumer.getMsgId() + " has already been processed.");}}
1.3. 消息重試機制
當消息處理失敗時,可以將消息重新發送給MQ,由MQ重新投遞給消費者進行處理。可以在消息的header中添加重試次數的標記,當達到最大重試次數后,可以將消息發送到死信隊列進行處理,以避免消息的無限重試。
@Configuration
public class KafkaConfig {@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory,KafkaTemplate<Object, Object> template) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);//最大重試三次ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// 設置重試間隔 10秒, 重試次數為 1 次BackOff backOff = new FixedBackOff(10 * 1000L, 3L);// 失敗進入死信,topic和group,都變成${topic}.DLT和${group}.DLT ,如下是進入死信的例子:/*# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup.DLT && bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroupGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDmyGroup.DLT myTopic.DLT 0 4 4 4 test-0-faf8afcc-e4b1-4aca-b064-356163564495 /192.168.3.3 test-0GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDmyGroup myTopic 0 800020 800020 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0myGroup myTopic 1 400012 400012 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0myGroup myTopic 2 300000 300000 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0*/factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer,backOff));return factory;}
}
1.4. kafka怎么保證消息的順序性
1.4.1. 利用分區的特征:
- 分區是最小的并列單位;
- 一個消費者可以消費多個分區;
- 一個分區可以被多個消費者組里的消費者消費;
- 但是,一個分區不能同時被同一個消費者組里的多個消費者消費;(意思是:在組里,一個分區同時消費的只能有一個消費者)
1.4.2. 解決辦法:
- 把全部有序消息放到同一個分區里
- 把有需要有序的消息,放到同一個分區里。
1.4.3. 分區分配策略
Kafka的消費者分區策略
一個consumer group中有多個consumer,一個topic有多個partition,所以必然會涉及到partition的分配問題,即確定哪個partition由哪個consumer來消費。Kafka提供了3種消費者分區分配策略:RangeAssigor
、RoundRobinAssignor
、StickyAssignor
。
術語:
partition - 分區
1.4.3.1. RangeAssignor (每組(Topic)里的分區(partition)都依次消費,可能導致第一個消費者負重,最后的消費者無消息可消費)
RangeAssignor對每個Topic進行獨立的分區分配。對于每一個Topic,首先對分區按照分區ID進行排序,然后訂閱這個Topic的消費組的消費者再進行排序,之后盡量均衡的將分區分配給消費者。這里只能是盡量均衡,因為分區數可能無法被消費者數量整除,那么有一些消費者就會多分配到一些分區。分配示意圖如下:
T0 是Topic-0
T1 是Topic-1
1.4.3.2. RoundRobinAssignor (所有分區(partition)依次消費,不在區分有沒有組(Topic)。所有資源都盡量就盡量均衡。)
RoundRobinAssignor的分配策略是將消費組內訂閱的所有Topic的分區及所有消費者進行排序后盡量均衡的分配(RangeAssignor是針對單個Topic的分區進行排序分配的)。如果消費組內,消費者訂閱的Topic列表是相同的(每個消費者都訂閱了相同的Topic),那么分配結果是盡量均衡的(消費者之間分配到的分區數的差值不會超過1)。如果訂閱的Topic列表是不同的,那么分配結果是不保證“盡量均衡”的,因為某些消費者不參與一些Topic的分配。
1.4.3.3. StickyAssignor (不改變原來的分配上,把關聯消費者宕機的分區,重新分配給其他消費者,盡量達到均衡)
StickyAssignor分區分配算法,目的是在執行一次新的分配時,能在上一次分配的結果的基礎上,盡量少的調整分區分配的變動,節省因分區分配變化帶來的開銷。Sticky是“粘性的”,可以理解為分配結果是帶“粘性的”——每一次分配變更相對上一次分配做最少的變動。其目標有兩點:
分區的分配盡量的均衡。
每一次重分配的結果盡量與上一次分配結果保持一致。
當這兩個目標發生沖突時,優先保證第一個目標。第一個目標是每個分配算法都盡量嘗試去完成的,而第二個目標才真正體現出StickyAssignor特性的。
StickyAssignor算法比較復雜,下面舉例來說明分配的效果(對比RoundRobinAssignor),前提條件:
有4個Topic:T0、T1、T2、T3,每個Topic有2個分區。
有3個Consumer:C0、C1、C2,所有Consumer都訂閱了這4個分區。
1.5. kafka 怎么知道消費成功
Kafka 不直接提供一種機制來確認消費者是否成功消費了消息。但是,它提供了幾種策略來保證消息的成功處理:
-
使用 Kafka 的自動提交功能:可以配置消費者自動定期提交消息的偏移量。
-
手動提交偏移量:消費者可以在處理完消息后手動提交偏移量,表明該消息已被成功處理。
-
使用 Kafka 事務:在支持事務的 Kafka 集群上,可以開啟事務來保證消費者處理消息的全部成功或失敗。
以下是一個簡單的示例,展示如何在消費者中手動提交偏移量:
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Arrays;
import java.util.Properties;public class ManualOffsetCommit {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 關閉自動提交props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// 處理消息System.out.println(record.value());// 處理完后提交當前偏移量consumer.commitSync();}}}
}
1.6. 死信(消息處理失敗后怎么處理)
死信,即Dead Letter,是指在Kafka中無法消費的消息。當消息因為以下原因而無法被消費時,可能會變成死信:
- 消費者故障,無法處理消息。
- 消息處理時發生異常。
- 消息達到設定的消息Level再次嘗試消費的次數限制。
為了處理死信,Kafka提供了幾種策略:
- 將死信發送到一個特定的主題(Topic)。
- 將死信保存到一個文件中。
以下是一個示例,演示如何設置Kafka消費者,以便將死信消息發送到一個特定的主題:
1.6.1. 死信配置(這里存放到特定的主題)
@BeanDefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {// 失敗進入死信,topic和group,都變成${topic}.DLT和${group}.DLT ,如下是進入死信的例子:/*# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup.DLT && bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroupGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDmyGroup.DLT myTopic.DLT 0 4 4 0 test-0-faf8afcc-e4b1-4aca-b064-356163564495 /192.168.3.3 test-0GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDmyGroup myTopic 0 800020 800020 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0myGroup myTopic 1 400012 400012 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0myGroup myTopic 2 300000 300000 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0*/
// DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
// (r, e) -> {
// //死信的結果發送到特定的主題
// return new TopicPartition(r.topic()+".DLT", r.partition());
//
// });DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// ErrorHandler errorHandler = new FallbackBatchErrorHandler(recoverer, new FixedBackOff(0L, 2L));return new DefaultErrorHandler(recoverer,new FixedBackOff(10*1000L, 2L));}@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>kafkaListenerContainerFactory(DefaultErrorHandler defaultErrorHandler) {ConcurrentKafkaListenerContainerFactory<String, String>factory = new ConcurrentKafkaListenerContainerFactory<>();// 設置消費者工廠factory.setConsumerFactory(consumerFactory());// 消費者組中線程數量factory.setConcurrency(3);// 拉取超時時間factory.getContainerProperties().setPollTimeout(3000);// 當使用批量監聽器時需要設置為truefactory.setBatchListener(true);factory.setCommonErrorHandler(defaultErrorHandler);return factory;}
觸發死信的業務代碼
@Service
public class UserConsumerService {@KafkaListener(topics = {"myUser"},groupId = "myUserGroup", containerFactory="kafkaListenerContainerFactory")
// public void kafkaListener(String message){public void kafkaListener(List<ConsumerRecord<String, String>> recordList){System.out.println("消費列表:"+recordList.size());for (ConsumerRecord<String, String> consumerRecord : recordList) {kafkaListener(consumerRecord);}}public void kafkaListener(ConsumerRecord<String, String> record){String key = record.key().toString();String value = record.value().toString();System.out.println(record.offset()+" \t "+key+" \t "+ value);if(value.contains("abc")){System.out.println("存在消費abc");throw new RuntimeException("存在消費abc");}}}
1.7. 事務
1.7.1. 配置參數開啟事務:
// transactional.id = transactionId
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionId");
1.7.2. Producer 事務
保證事務原子性操作
事務期間,會發送produce消息到kafka服務器上的Topic,這是未提交狀態的消息,你可以看到Topic上多了幾條消息。
事務回滾后,消息還是會存在的。所以comsumer需要使用讀已提交的方式獲取。
1.7.2.1. javaSDK例子
以下是 Producer 事務使用示例:
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");KafkaProducer producer = new KafkaProducer(props);producer.initTransactions();try {String msg = "matt test";producer.beginTransaction();producer.send(new ProducerRecord(topic, "0", msg.toString()));producer.send(new ProducerRecord(topic, "1", msg.toString()));producer.send(new ProducerRecord(topic, "2", msg.toString()));producer.commitTransaction();} catch (ProducerFencedException e1) {e1.printStackTrace();producer.close();
} catch (KafkaException e2) {e2.printStackTrace();producer.abortTransaction();
}
producer.close();
1.7.2.2. springboot例子
@Transactional
public String sendForTransaction(String userNo, String jsonString) {System.out.println("sendForTransaction方法中,是否開啟事務中:"+kafkaTemplate.inTransaction());kafkaTemplate.send("myUser","key",jsonString);return "success";
}
1.7.3. Consumer 事務
通過使用隔離級別能查看到未提交的消息。
1.7.3.1. 隔離級別:
隔離級別:
- isolation.level=read_uncommitted 讀未提交
- isolation.level=read_committed 讀取已提交
配置方式:
propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));//# 讀取已提交的消息
propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT));//# 讀取已提交的消息
Springboot中kafka默認隔離級別是 讀未提交
public class ConsumerConfig{....public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);....
}
1.7.3.2. 提交事務和回滾事務
使用 Kafka 事務:在支持事務的 Kafka 集群上,可以開啟事務來保證消費者處理消息的全部成功或失敗。
// 自動提交事務@Transactionalpublic String sendForTransaction1(String userNo, String jsonString) {System.out.println("sendForTransaction方法中,是否開啟事務中:"+kafkaTemplate.inTransaction());kafkaTemplate.send("myUser","key",jsonString);return "success";}
// 回滾事務@Transactionalpublic String sendForTransaction(String userNo, String jsonString) {System.out.println("sendForTransaction方法中,是否開啟事務中:"+kafkaTemplate.inTransaction());kafkaTemplate.send("myUser","key",jsonString);throw new RuntimeException();}
1.7.3.3. 總結:
初步:因為分區只會被一個消費者消費,沒消費完就不會發送下個消息,所以發送到消費者端,實際上就同時進行事務的就只有一個,相當于同步消費。
進階:但這樣性能會很差,所以后面可以把業務操作區間的,進行隔離歸類,就像id余數為1的放1分區,id余數為2的放2分區,這樣大大提高了消費速度,可自定義程度高,受制于本人設計,它的瓶頸就是設計人員,如果歸類好,可以不存在資源搶占問題。
瓶頸:但消息有個問題,就是一個分區每次只能消費一條記錄,所以它并行處理嚴重依賴于分區數量,但分區數量也不是亂加的,設計不合理,會存在資源操作沖突的問題。
對比其他事務的瓶頸:
- 消息事務:瓶頸在于設計人員設計,合理分配分區數量,資源搶占問題在設計層面就已經解決了。
- seata:瓶頸在于是否存在資源搶占。
- seata的XA、AT、TCC等 的瓶頸在于
是否存在資源搶占
,如果在修改倉庫那功能的話,因為存在資源搶占的話,相當于同步處理,其他線程被鎖,只有一個線程在處理,這么多服務都只會有一個業務在處理,直接導致seta性能拖慢。
- seata的XA、AT、TCC等 的瓶頸在于
個人建議:
- 如果業務存在資源搶占的話,使用消息事務會更優。
- 如果業務不存在資源搶占的話,使用seta會更優。
消息事務就像是基金一樣,穩定處理,
seata就像是股票一樣,上下波動,一時快,一時慢。(若使用在合適場景會很快。)
應用場景
:如果不停的區分seata和消息事務會很累的,而且因為費用問題,還有業務一定會有資源搶占問題,所以一般都使用消息事務的方式處理。這是我個人分析。
2. kafka文檔
https://kafka.apache.org/documentation/#producerconfigs
2.1. Topic、Group、Partition、消費者的關系
大體上分為Topic和Group,代表的是發布者與訂閱者。
Topic 就像是一個數據庫表,Partition就是數據庫表中的分表存儲的記錄。
Group指的是消費者組,消費者組里有很多個消費者,消費Topic的Partition表里的消息
例如:
TopicsmyOrdermyOrder.DLTmyTopicmyTopic.DLTmyUsermyUser.DLT
Consumer GroupsmyGroupmyGroup.DLTmyUserGroupmyUserGroup.DLT
Topic的消息只記錄數據,不記錄消費記錄
Group只記錄消息消費情況。
Topic 與 消費組 的關系,就像是聊天室,topic是發布消息A,其他消費組都能收到消息A,消息是一對多的關系,每次增加一組消費者,都會從0offset開始讀取Topic
2.2. 管理topic
https://blog.51cto.com/u_16213637/9850264
# 查看所有topicbin/kafka-topics.sh --bootstrap-server localhost:9092 --list
--------------------
__consumer_offsets
myTopic
test-javasdk
--------------------
# 查看topic詳情
> bin/kafka-topics.sh --describe --topic myTopic --bootstrap-server localhost:9092
--------------------
Topic: myTopic TopicId: Y_t8v2snRgmirrGykcLYmg PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824Topic: myTopic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001Topic: myTopic Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001Topic: myTopic Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001--------------------
# 創建topic
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_topic_name \--partitions 20 --replication-factor 1 --config x=y
簡寫:> bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 # 修改topic
> bin/kafka-topics.sh --bootstrap-server localhost:9092 -alter --partitions 3 --topic myTopic
//> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name myTopic --partitions 20 --alter # 修改topic 的分區
bin/kafka-topics.sh --bootstrap-server localhost:9092 -alter --partitions 3 --topic myTopic# 刪除topic> bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name# 添加topic配置,例如修改 partitions 40
> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \--partitions 40# 刪除topic配置
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y# 消費者列表
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --from-beginning
--------------------
test-99988
test-99989
test-99990
test-99991
test-99992
test-99993
test-99994
test-99995
--------------------
# 查看消費組里的成員
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup --members
--------------------
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS
myGroup ConsumerTest-1a87e781-b3ce-43c8-ac4a-76c8ccdec5aa /192.168.3.3 ConsumerTest 2
myGroup ConsumerTest2-ed1ddf24-7a68-4136-ad39-d774f531d765 /192.168.3.3 ConsumerTest2 1
--------------------# 查看消息堆積情況(還可以看出是哪臺機器有)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup
--------------------GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myGroup myTopic 0 664075 700001 35926 ConsumerTest-3f121c59-ca11-4fa8-8fa0-538e0f0d5bdb /192.168.3.3 ConsumerTest
myGroup myTopic 1 284334 300000 15666 ConsumerTest-3f121c59-ca11-4fa8-8fa0-538e0f0d5bdb /192.168.3.3 ConsumerTest
myGroup myTopic 2 200000 200000 0 ConsumerTest2-4ae32ff0-e67f-4d09-8185-005bf61f3b1f /192.168.3.3 ConsumerTest2其中:LAG 是待消費記錄
--------------------
- replication-factor
復制因素控制有多少服務器將復制寫入的每條消息。如果您的復制因子為3,那么在您失去對數據的訪問權限之前,最多可以有2臺服務器出現故障。我們建議您使用2或3的復制因子,這樣您就可以在不中斷數據消耗的情況下透明地跳轉機器。
- partitions
分區計數控制主題將被切分成多少個日志。分區計數有幾個影響。首先,每個分區必須完全適合單個服務器。因此,如果您有20個分區,那么整個數據集(以及讀寫負載)將由不超過20臺服務器處理(不包括副本)。最后,分區計數會影響消費者的最大并行性。這將在概念部分進行更詳細的討論。
每個分片的分區日志都放在Kafka日志目錄下自己的文件夾中。這類文件夾的名稱由主題名稱、加上破折號(-)和分區id組成。由于典型的文件夾名稱長度不能超過255個字符,因此主題名稱的長度將受到限制。我們假設分區的數量永遠不會超過100,000。因此,主題名稱不能超過249個字符。這在文件夾名稱中留下了足夠的空間來放置破折號和可能的5位數長的分區id。
- config
與主題相關的配置既有服務器默認值,也有可選的每個主題覆蓋。如果沒有給出每個主題的配置,則使用服務器默認值。可以在創建主題時通過提供一個或多個——config選項來設置覆蓋。下面的例子創建了一個名為my-topic的主題,并自定義了最大消息大小和刷新速率:
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
以后還可以使用alter configs命令更改或設置覆蓋。下面的例子更新了my-topic的最大消息大小:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic--alter --add-config max.message.bytes=128000
要檢查主題上設置的覆蓋,您可以執行以下操作
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe
要移除覆蓋,您可以這樣做
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic--alter --delete-config max.message.bytes
以下是主題級配置。服務器對此屬性的默認配置在服務器默認屬性標題下給出。給定的服務器默認配置值僅適用于沒有顯式主題配置覆蓋的主題。
2.3. 發送producer消息
# 發送消息
> bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second eventCtrl-C 終止
2.4. consumer - 消費producer消息
# 消費消息
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
數量統計:普通消息和死信消息
死信是應用端生成的,非kafka本身自帶。
myGroup.DLT 是死信
myGroup 是普通消息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup.DLT && bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup
---------------------------------
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myGroup.DLT myTopic.DLT 0 4 4 0 test-0-faf8afcc-e4b1-4aca-b064-356163564495 /192.168.3.3 test-0GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myGroup myTopic 0 800020 800020 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0
myGroup myTopic 1 400012 400012 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0
myGroup myTopic 2 300000 300000 0 test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3 test-0---------------------------------
2.5. 部署kafka
創建docker-compose.yml
version: '2'
services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkaports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: localhostKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:- /var/run/docker.sock:/var/run/docker.sock
# 證實可以
docker-compose up -d
2.6. 部署zookeeper
后面在找了一個
創建docker-compose.yml
version: '2'
services:zookeeper: # 注意,在我的archlinux系統的機子會有內存泄露問題image: wurstmeister/zookeeperports:- "2181:2181"2.6. kafka:image: wurstmeister/kafkaports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: localhostKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:- /var/run/docker.sock:/var/run/docker.sock
# 啟動
docker-compose up -d
2.7. 學習用的demo
https://gitee.com/alvis128/springboot-kafka-demo
官網:https://docs.spring.io/spring-kafka/docs/2.7.8/reference/html/#using-kafkatransactionmanager