Kafka角色介紹
Topic
? ? ? ? Topic主題的意思,消費者必須指定主題用于的消息發送,生產者也必須指定主題用于消息的接收。topic只是邏輯上的劃分。
partition
? ? ? ? partition是分區的意思,他的主要作用是將發送到一個topic的數據做一個劃分。如果有4個partition那么消費者就可以去這四個分區中獲取消息,理想情況下提高了4倍效率。(降低Topic處理消息的壓力)其中的offset是用來記錄消息在分區當中的物理位置,可以用來保證在同一分區下消息的順序性。
? ? ? ? partition是將消息以物理的形式進行隔離,就是在一個目錄下由不同文件存儲。
broker
? ? ? ? broker即kafka服務器中的一個節點。用于接收生產者發來的消息、將消息寫入磁盤(分區對應的日志文件)、向消費者提供消息、參與分區副本的同步與 Leader 選舉
consumer-group
? ? ? ? 消費者組是讓一組消費者消費一個或者多個主題的分區,一個消費者組中一個分區只會被其中一個消費者消費。
? ? ? ? 分組的好處:組相當于調度中心,如果組內有人丟失消息了,組內維護有offset可以幫忙你送。誰沒活都去配置中心領。
? ? ? ? 為什么不用一個分組消費一個分區。每個組offset不共享,當組內無法處理時外部就會從最開始的消息開始消費出現重復消費。擴展也麻煩想要擴展只能加消費者組。
熟悉Kafka配置
kafka的配置可以通過配置類的形式進行設置,但是我們使用SpringBoot可以通過properties/yaml文件的形式加載配置(值得注意的是properties文件都是以扁平鍵值對,用 . 分割;yaml文件是通過樹形結構)然后就可以通過注解的形式使用Kafka
#表示配置Kafaka服務地址,通過 ,可以配置多臺服務
spring.kafka.bootstrap-servers = 127.0.01
#表示Kafka消息失敗重試次數
spring.kafka.producer.retries=3
#設置批量發送消息大小的閥值,達到16kb就會批量發送。
#批次發送的意義是為了減少網絡開支成本,多條消息建立一次網絡通道
#但是這里沒有設置消息等待發送時間,也就是每一條消息都會立即發送,這條消息更像是一個保險策略
spring.kafka.producer.batch-size=16384
#設置緩沖區大小,消息都會放到緩沖區里面等待
spring.kafka.producer.buffer-memory=33554432
#消息確認機制 0表示無需ack機制 1表示需要leader節點確認(ACK機制) -1 表示需要所有節點都確認
spring.kafka.producer.acks=1
#設置鍵值對的序列化方式。kafka對于生產者和消費者都必須設置序列化類型,
#因為Kafka將生產者消息將對象轉為字符數組,消費者需要將字符數組轉為需要的類型
#所以為了讓Kafka能夠接收消費消息都需要設置序列化類型
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
----------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------
#消費者組的意義是為了記錄一個組中消息消費到的位置也就是offset
#這樣新加入的消費者就知道從哪里開始執行任務
spring.kafka.consumer.group-id=default-group-td-geek
#兜底策略,避免因消息正在消費時,偏移量提交時宕機導致該條消息不消費。
#手動提交,只有在消息被消費完畢之后才會去提交偏移量
spring.kafka.consumer.enable-auto-commit=false
#兜底策略,當消費者啟動時,判斷偏移量是否可靠,如果不可靠 配置lateset讓消費者從最新消息開始消費 配置earliest讓消費者從最早消息開始消費
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
消息生產者
只需要注入Kafka客戶端,調用客服端對象的send方法就可以發送消息,send方法需要指定消息發送到的topic,還有具體的數據。同時我們可以設置key值用于,分區運算,保證消息順序(在同一個分區下消息可以保證順序性)
消息消費者
通過注解的方式綁定監聽器,監聽器可以接收指定的topic用來消費消息。
@KafkaListener(topics = {"alphaess_"})
//ConsumerRecord<String, String> 是Kafka中消息記錄對象,第一個String指的是Key 第二個String指的是Value
public void onMessage1(ConsumerRecord<String, String> record){// 消費的哪個topic、partition的消息,打印出消息內容System.out.println("簡單消費:"+record.topic()+"-"+record.partition()+"-"+record.value());
}