深入Kafka client

分區分配策略

客戶端可以自定義分區分配策略, 當然也需要考慮分區消費之后的offset提交, 是否有沖突。

消費者協調器和組協調器

a. 消費者的不同分區策略, 消費者之間的負載均衡(新消費者加入或者存量消費者退出), 需要broker做必要的協調。
b. Kafka按照消費組管理消費者, 鑒于offset提交最終都是在某個broker節點上完成。該broker扮演GroupCoordinator角色, 具體的選擇則是通過hash快速定位。
c. client端存在一個ClientCoordinator與目標的GroupCoordinator進行通信實現最終協調;
d. 具體過程如下

ClientCoordinator Broker(Min Load) Broker(GroupCoordinator) Broker(To consumer) 1. Find_Coordinator request Find_Coordinator response 2. Join_Group request 3.1 calculate brokerId 3.2 Elect leader consumer 3.3 Elect partition strategy . Join_Group response, isLeader 4. Sync_Group Request Sync_Group Response 5. Poll offset/message, HeartBeat response offset/heartbeat/message ClientCoordinator Broker(Min Load) Broker(GroupCoordinator) Broker(To consumer)

關于__consumer_offset

__consumer_offset是一個特殊的topic, 用于存儲每個topic中partition中client提交的offset。其中的數據保留時間通過offset.retention.minutes配置。如果consumer消費消息的間隔超過了配置時間, 則offset會丟失, consumer再次獲取offset時會因為沒有存量的offset而自動重置(auto.offset.reset)。該topic下的消息清理采用壓縮策略(僅保留最新消息)。Kafka中會有定時清理任務清理過期的消費位移。

消息發送QoS

  1. at-least-once, 至少一次, 消息不會丟失, 但消息會重復;
  2. at-most-once, 至多一次, 消息不會重復, 但可能會丟失;
  3. exact-once, 恰好一次, 消息肯定被傳輸且只傳輸一次;(如果開發即時消息系統, 那么這個語義就是我們的目標)
    默認情況下, Kafka producer在發送時, 如果消息發送失敗會自動進行重試, 重試過程可能會導致消息重復。而一旦發送成功, Kafka通過多副本機制保證消息一定會被保存。因此從consumer角度觀察, producer發送的結果, 其QoS是at-least-once。如果需要exact-once, 則需要啟用Kafka的冪等特性。

冪等

  1. 配置參數
    enable.idompotence=true
    retrics > 0
    max.in.flight.requests.per.connection <=5
    ack=-1

  2. 實現細節
    首先冪等是partition級別, broker端自動為producer分配一個PID, 并維護PID->分區(序列號 lastSeq) 的狀態。當producer發送消息時, 必須攜帶該序列號newSeq。broker端收到消息時做校驗:
    a. newSeq = lastSeq+1, broker接收;
    b. newSeq > lastSeq+1, 中間存在消息丟失, 拋出OutOfOrderException;
    c. newSeq < lastSeq+1, 消息存在重復, 直接丟棄即可.

事務消息

如果要實現跨parition的exact-once語義, 則需要基于事務消息。一般來說事務有ACID的特性, 但這個是數據庫事務的通用場景。Kafka下消息需要考慮生產和消費, 這里的事務消息更多是生產端的事務消息。消費端可能會因為某些原因無法以事務的形式消費。比如:

  1. 對于采用日志壓縮策略的主題而言, 事務中的消息被清理(對相同key的消息后寫入的消息會覆蓋之前寫入的消息);
  2. 事務涉及的分區多個日志段, 如果老的日志分段被刪除, 對應的消息也會消失;
  3. 消費者通過seek消費消息, 造成消息遺漏;
  4. 消費者在消費時沒有消費到事務涉及的所有分區, 因此不能讀取事務中的所有消息;
    總的來說, 事務保證了生產者可以以事務的方式實現消息發送的exact
    -once語義, 但消息清理和消費并未引入事務約束。

實現原理

  1. 開啟冪等;
  2. 設置事務ID, transactional.id;
  3. 生產者通過事務ID得到PID和producer epoch, 進而實現跨生產者會話的消息發送和事務恢復。前者保證相同transactionId的生產者僅有1個可以有效發送消息, 后者保證如果事務消息發送后宕機新恢復出來的生產者可以繼續提交或者終止事務。其中包含2個方面, 生產者的唯一性, 其關聯的在途事務的可見性和可操作性。
  4. broker端為支持事務消息引入了事務協調器, 與組協調器類似, 用于處理事務的提交和終止。
  5. 具體交互流程如下
    發送事務消息交互細節

事務存儲

  1. 日志存儲按Topic, Partition和LogSegment層級存儲, 事務消息也不例外;
  2. 與普通消息的區別是, 事務消息更多適用于發送一組消息的場景, 具體到LogSegment就是有一組連續的消息, 因此Kafka引入了ControlBatch消息來標志消息結束。
  3. 事務消息的開始在哪里呢? 嚴格來說, producer跨分區發送成功后, consumer是無法恢復出原有的順序, 在分區級別僅可以做到與某個事務關聯的一組消息(通過消息的屬性標志是否為事務消息), 結束通過ControlBatch標志一組消息結束。

小結

本文討論了Kafka發送消息的三種語義at-least-once, at-most-once, exact-once,并針對exact-once的單分區實現(冪等控制)和跨分區實現(事務消息)做簡要介紹, 希望能幫助你梳理出Kafka broker端對消息發送QoS實現的基本脈絡, 為進一步學習打基礎。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/716963.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/716963.shtml
英文地址,請注明出處:http://en.pswp.cn/news/716963.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

VUE3:省市區聯級選擇器

一、實現效果 二、代碼展示 <template><div class"page"><select v-model"property.province"><option v-for"item in provinces" :key"item">{{ item }}</option></select><select v-model&…

今日學習總結2024.3.2

最近的學習狀態比較好&#xff0c;感覺非常享受知識進入腦子的過程&#xff0c;有點上頭。 實驗室一個星期唯一一天的假期周六&#xff0c;也就是今天&#xff0c;也完全不想放假出去玩啊&#xff0c;在實驗室泡了一天。 很后悔之前膽小&#xff0c;沒有提前投簡歷找實習&…

YOLOv9有效提點|加入MobileViT 、SK 、Double Attention Networks、CoTAttention等幾十種注意力機制(五)

專欄介紹&#xff1a;YOLOv9改進系列 | 包含深度學習最新創新&#xff0c;主力高效漲點&#xff01;&#xff01;&#xff01; 一、本文介紹 本文只有代碼及注意力模塊簡介&#xff0c;YOLOv9中的添加教程&#xff1a;可以看這篇文章。 YOLOv9有效提點|加入SE、CBAM、ECA、SimA…

ETH網絡中的區塊鏈

回顧BTC網絡的區塊鏈系統 什么是區塊鏈&#xff1f;BTC網絡是如何運行的&#xff1f;BTC交易模式 - UXTO ETH網絡中的區塊鏈 ETH網絡的基石依舊是 區塊鏈。上面 什么是區塊鏈&#xff1f; 的文章依舊適用。 相比BTC網絡&#xff0c;ETH網絡的賬戶系統就相對復雜&#xff0c;所…

ZJGSU 1199 表達式計算

題目描述 在數據結構課上&#xff0c;老師給大家布置了一個表達式計算的問題 3*21*5. Its so easy!!! csw同學做了很不過癮&#xff0c;他想求解更復雜的表達式: 比如(123456)/789. 但一時之間他想不出好的辦法&#xff0c;諸位就幫幫他吧. 輸入 輸入包括多組數據, 每組測試…

實用工具:實時監控服務器CPU負載狀態并郵件通知并啟用開機自啟

作用&#xff1a;在服務器CPU高負載時發送郵件通知 目錄 一、功能代碼 二、配置開機自啟動該監控腳本 1&#xff0c;配置自啟腳本 2&#xff0c;啟動 三、功能測試 一、功能代碼 功能&#xff1a;在CPU負載超過預設置的90%閾值時就發送郵件通知&#xff01;郵件內容顯示…

【Spring連載】使用Spring Data訪問 MongoDB----對象映射之屬性轉換器

【Spring連載】使用Spring Data訪問 MongoDB----對象映射之屬性轉換器 一、聲明式值轉換器二、編程式值轉換器注冊三、MongoCustomConversions配置 雖然基于類型的轉換已經提供了影響目標存儲中某些類型的轉換和表示的方法&#xff0c;但當僅考慮特定類型的某些值或屬性進行轉換…

js中Generator函數詳解

定義&#xff1a; promise是為了解決回調地獄的難題出現的&#xff0c;那么 Generator 就是為了解決異步問題而出現的。 普通函數&#xff0c;如果調用它會立即執行完畢&#xff1b;Generator 函數&#xff0c;它可以暫停&#xff0c;不一定馬上把函數體中的所有代碼執行完畢…

Linux基本指令(下)

目錄 1. less指令 2. head與tail指令 3. find指令 示例 4. grep指令 示例 ?編輯 5. zip/unzip 打包與壓縮 示例 ?編輯 6. tar指令 7. find指令&#xff1a; -name 8. echo指令 9. 時間相關的指令 1.在顯示方面&#xff0c;使用者可以設定欲顯示的格式&#xff…

分布式ID(6):Redis實現分布式ID生成

Redis是一個高性能的鍵值數據庫,它可以用于生成分布式唯一標識符。需要注意的是Redis實現ID可以用,這也是很多公司的選擇。但是在redis服務器宕機的情況下,他也可能會出現重復生成ID的情況。 1 實現原理 利用Redis的原子操作:Redis提供了原子性的INCR和INCRBY命令,可用于…

使用python或AI自動分析數據關聯(簡介)

有一些Python庫可以幫助用戶自動發現數據集中的關聯關系。通常這類方法被稱為關聯分析或關聯規則挖掘&#xff0c;其中最著名的算法是Apriori和FP-Growth。 兩個算法 Apriori算法&#xff1a; 這是一個用于頻繁項集挖掘和關聯規則學習的經典算法。Python中的mlxtend庫提供了一…

【機器學習】有監督學習算法之:K最近鄰

K最近鄰 1、引言2、決策樹2.1 定義2.2 原理2.3 實現方式2.3.1 距離度量2.3.2 K值的選擇 2.4 算法公式2.5 代碼示例 3、總結 1、引言 小屌絲&#xff1a;魚哥&#xff0c; 這么長時間沒更新了&#xff0c;是不是得抓緊時間了。 小魚&#xff1a;最近可都是在忙的呢&#xff0c;…

已解決ResponseEntityException的Spring MVC異常響應實體異常的正確解決方法,親測有效!!!

由于ResponseEntityException并非Spring框架中明確定義的異常類&#xff0c;我推斷這里可能指的是在使用ResponseEntity時遇到的常見異常或錯誤。因此&#xff0c;我將根據這個假設&#xff0c;提供一個解決Spring MVC中與ResponseEntity相關異常的通用方法指南。 目錄 問題分…

線上歷史館藏系統 Java+SpringBoot+Vue+MySQL

??計算機編程指導師 ??個人介紹&#xff1a;自己非常喜歡研究技術問題&#xff01;專業做Java、Python、微信小程序、安卓、大數據、爬蟲、Golang、大屏等實戰項目。 ??實戰項目&#xff1a;有源碼或者技術上的問題歡迎在評論區一起討論交流&#xff01; ?? Java實戰 |…

day09_商品管理訂單管理SpringTaskEcharts

文章目錄 1 商品管理1.1 添加功能1.1.1 需求說明1.1.2 核心概念SPUSKU 1.1.3 加載品牌數據CategoryBrandControllerCategoryBrandServiceCategoryBrandMapperCategoryBrandMapper.xml 1.1.4 加載商品單元數據ProductUnitProductUnitControllerProductUnitServiceProductUnitMap…

詳解java中的Lambda表達式

Lambda表達式的前世今生&#xff08;來歷與概述&#xff09; Lambda表達式的前世------匿名類 以往&#xff0c;使用單一抽象方法的接口被用作函數類型。 它們的實例表示函數&#xff08;functions&#xff09;或行動&#xff08;actions&#xff09;。 自從 JDK 1.1 于 1997…

【MySQL】超詳細-基礎操作

數據庫定義 數據庫是一類軟件&#xff0c;用來管理數據&#xff0c;組織數據&#xff1b; 關系型數據庫MySQL&#xff08;Oracle,SQL Server,SQLite&#xff09;以表格形式組織數據&#xff0c;數據格式要求嚴格&#xff1b;非關系型數據庫Redis&#xff08;MongoDB,HBase&…

數據結構與算法-冒泡排序

引言 在數據結構與算法的世界里&#xff0c;冒泡排序作為基礎排序算法之一&#xff0c;以其直觀易懂的原理和實現方式&#xff0c;為理解更復雜的數據處理邏輯提供了堅實的入門階梯。盡管在實際應用中由于其效率問題不常被用于大規模數據的排序任務&#xff0c;但它對于每一位初…

【C++】set、multiset與map、multimap的使用

目錄 一、關聯式容器二、鍵值對三、樹形結構的關聯式容器3.1 set3.1.1 模板參數列表3.1.2 構造3.1.3 迭代器3.1.4 容量3.1.5 修改操作 3.2 multiset3.3 map3.3.1 模板參數列表3.3.2 構造3.3.3 迭代器3.3.4 容量3.3.5 修改操作3.3.6 operator[] 3.4 multimap 一、關聯式容器 談…

Hololens 2應用開發系列(1)——使用MRTK在Unity中設置混合現實場景并進行程序模擬

Hololens 2應用開發系列&#xff08;1&#xff09;——使用MRTK在Unity中進行程序模擬 一、前言二、創建和設置MR場景三、MRTK輸入模擬的開啟 一、前言 在前面的文章中&#xff0c;我介紹了Hololens 2開發環境搭建和項目生成部署等相關內容&#xff0c;使我們能生成一個簡單Ho…