kafka--基礎知識點--9.1--consumer 至多一次、至少一次、精確一次

1 自動提交

1.1 原理:

Kafka 消費者后臺線程每隔 auto.commit.interval.ms 自動提交最近一次 poll() 的 offset
無需開發者干預

1.2 示例:

enable.auto.commit=true
auto.commit.interval.ms=5000 # 每 5 秒自動提交一次

from confluent_kafka import Consumer, KafkaError
import sys# 配置消費者
conf = {'bootstrap.servers': 'localhost:9092','group.id': 'mygroup','auto.offset.reset': 'earliest','enable.auto.commit': True,   # 自動提交'auto.commit.interval.ms': 5000  # 每個5s自動提交一次
}consumer = Consumer(conf)
consumer.subscribe(['my_topic'])while True:msg = consumer.poll(1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:continueprint(f"Error: {msg.error()}", file=sys.stderr)breaktry:# consumer.commit(msg)  不需要這行,這是手動提交時需要用的# 業務處理邏輯print(f"處理消息: {msg.value().decode('utf-8')}")except KeyboardInterrupt:print("中斷信號已接收")finally:consumer.close()

2 手動提交

2.1 至多一次

2.1.1 原理

消息處理后立即提交偏移量;
即使處理失敗也不會重試;
適合對消息丟失容忍度高的場景(如日志采集)。

2.1.2 示例:

enable.auto.commit=False:禁用自動提交偏移量
手動調用consumer.commit(msg)提交偏移量
auto.offset.reset=‘earliest’:從最早消息開始消費

from confluent_kafka import Consumer, KafkaError
import sys# 配置消費者
conf = {'bootstrap.servers': 'localhost:9092','group.id': 'mygroup','auto.offset.reset': 'earliest','enable.auto.commit': False
}consumer = Consumer(conf)
consumer.subscribe(['my_topic'])while True:msg = consumer.poll(1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:continueprint(f"Error: {msg.error()}", file=sys.stderr)breaktry:# 手動提交偏移量(最多一次核心)consumer.commit(msg)print(f"已提交偏移量: {msg.offset()}")# 業務處理邏輯print(f"處理消息: {msg.value().decode('utf-8')}")except KeyboardInterrupt:print("中斷信號已接收")finally:consumer.close()

2.2 最少一次

2.2.1 原理

通過重試機制+手動提交偏移量實現:

  • 消息處理失敗時自動重試(最多3次)
  • 成功處理后批量提交偏移量
  • 延長輪詢間隔避免重平衡

2.2.2 示例

該示例是批量處理消息,批量提交;當然也可以一次處理一條消息,并一次提交一條消息偏移量

from confluent_kafka import Consumer, KafkaError, TopicPartition
import time
import sys# 配置消費者
conf = {'bootstrap.servers': 'localhost:9092','group.id': 'my_group','auto.offset.reset': 'earliest','enable.auto.commit': False,  # 手動提交控制'max.poll.interval.ms': 300000,  # 延長輪詢間隔'session.timeout.ms': 10000,'heartbeat.interval.ms': 3000
}consumer = Consumer(conf)
consumer.subscribe(['my_topic'])def process_with_retry(msg):"""帶重試的消息處理"""for attempt in range(3):try:# 替換為實際業務邏輯print(f"處理消息: {msg.value().decode('utf-8')}")return Trueexcept Exception:time.sleep(1)  # 指數退避可在此實現return Falsetry:while True:# 批量拉取10條消息msgs = consumer.consume(num_messages=10, timeout=1.0)processed = []for msg in msgs:if msg.error():continue# 處理消息(帶重試)if process_with_retry(msg):processed.append(TopicPartition(msg.topic(), msg.partition(), msg.offset()))# 批量提交偏移量if processed:consumer.commit(offsets=processed)print(f"已提交偏移量: {[p.offset for p in processed]}")except KeyboardInterrupt:pass
finally:consumer.close()

補充:延長輪詢間隔避免重平衡

核心概念解析:

  • 輪詢間隔:指Kafka消費者兩次調用poll()方法拉取消息的時間間隔,由max.poll.interval.ms參數控制(默認5分鐘)。
  • 重平衡(Rebalance):當消費者組成員變動、主題/分區變化或心跳超時時,Kafka會觸發分區重新分配,導致消費者暫停消費。

為什么延長輪詢間隔能避免重平衡?

  • 防止誤判消費者宕機
    • Kafka通過session.timeout.ms(默認10秒)和heartbeat.interval.ms(默認3秒)檢測消費者存活。
    • 若消息處理耗時超過max.poll.interval.ms(默認5分鐘),Kafka會認為消費者已宕機,觸發重平衡。
    • 延長輪詢間隔(如設為10分鐘)可允許更長的消息處理時間,避免因業務邏輯耗時過長導致的誤判重平衡。
  • 避免頻繁重平衡的連鎖反應
    • 重平衡期間消費者暫停消費,導致消息堆積和延遲。
    • 頻繁重平衡(如每5分鐘觸發一次)會顯著降低吞吐量,延長端到端延遲。

關鍵參數配置建議:

參數默認值推薦值作用
max.poll.interval.ms5分鐘根據業務處理時間調整(如10-30分鐘)控制兩次poll的最大間隔,防止處理超時觸發重平衡
session.timeout.ms10秒30秒-1分鐘心跳超時時間,需大于heartbeat.interval.ms
heartbeat.interval.ms3秒2-5秒心跳發送頻率,建議設為session.timeout.ms的1/3

2.3 精確一次

2.3.1 冪等性消費 (Idempotent Consumption) - 推薦且最常用

思路:承認消息可能會被重復傳遞,但從業務邏輯上保證重復執行不會產生負面效果。

做法:在消費者的處理邏輯中,設計冪等性。例如:

為每條消息生成一個唯一 ID(可以是消息key,或自定義UUID)。

在處理前,先檢查這個 ID 是否已經被處理過(比如在數據庫里查一下)。

如果已處理,就直接跳過并提交位移(視為成功);如果未處理,則執行業務邏輯。

這是最有效、最通用的方法,因為它不依賴于任何特定技術,而是從業務設計上根本性地解決問題。

例如:
a) 對于流程中的消息,每條消息中包含唯一id,比如業務id,在數據庫中將業務id作為Unique key,插入重復時會報duplicate key異常,不會導致數據庫中出現臟數據
b) Redis中使用set存儲業務id,天然冪等性
c) 如果不是上面兩個場景,需要讓生產者發送每條數據的時候,里面加一個全局唯一的 id,然后你這里消費到了之后,先根據這個 id 去比如 Redis 里查一下消費過嗎?如果沒有消費過,就執行相應業務進行處理,然后這個 id 寫 Redis,最后提交偏移。如果消費過了,那如果消費過了,那就別處理了,保證不重復處理相同的消息即可

2.3.2 事務性輸出 (Transactional Output) / 兩階段提交 (2PC) - 復雜且受限

思路:將消費者的“業務處理”和“位移提交”綁定為一個分布式事務。

做法:例如,使用 Kafka 的事務性生產者,將處理結果和位移提交到外部系統(如另一個Kafka主題)的操作放在一個事務里。但這通常需要外部系統(如數據庫)也支持參與 Kafka 事務(通過 Kafka Connect),實現復雜度非常高,性能和可用性也會受影響。不推薦普通應用使用。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/98244.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/98244.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/98244.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Python中的類:從入門到實戰,掌握面向對象編程的核心

目錄 一、類的概念:從“模板”到“個體” 1.1 什么是類? 1.2 類與對象的關系:模板與實例 1.3 類的核心價值:封裝與抽象 二、類的形式:Python中的類定義語法 2.1 類的基本定義 2.2 關鍵組成解析 (1&a…

用戶爭奪與智能管理:定制開發開源AI智能名片S2B2C商城小程序的戰略價值與實踐路徑

摘要 在零售行業數字化轉型的浪潮中,用戶爭奪已從傳統流量競爭轉向對用戶24小時時間分配權的深度滲透。本文以定制開發開源AI智能名片S2B2C商城小程序為核心研究對象,系統探討其通過技術賦能重構用戶接觸場景、提升轉化效率、增強會員黏性的作用機制。結…

數學_向量投影相關

Part 1 你的問題是:設相機光心的朝向 w (0, 0, 1)(即朝向正前方,Z 軸正方向), 在 相機坐標系下有一個平面,其法向量為 n_cam, 問:w 在該平面上的投影的單位向量 w_p,是不…

從RTSP到HLS:構建一個簡單的流媒體轉換服務(java spring)

從RTSP到HLS:構建一個簡單的流媒體轉換服務(java spring) 在當今的網絡環境中,實時視頻流媒體應用越來越廣泛,從在線直播到安防監控,都離不開流媒體技術的支持。然而,不同的流媒體協議有著各自的特點和適用場景。本文…

【代碼隨想錄算法訓練營——Day15】二叉樹——110.平衡二叉樹、257.二叉樹的所有路徑、404.左葉子之和、222.完全二叉樹的節點個數

LeetCode題目鏈接 https://leetcode.cn/problems/balanced-binary-tree/ https://leetcode.cn/problems/binary-tree-paths/ https://leetcode.cn/problems/sum-of-left-leaves/ https://leetcode.cn/problems/count-complete-tree-nodes/ 題解 110.平衡二叉樹想到用左子樹的高…

JVM新生代/老年代垃圾回收器、內存分配與回收策略

新生代垃圾收集器 1. Serial收集器 serial收集器即串行收集器,是一個單線程收集器。 串行收集器在進行垃圾回收時只使用一個CPU或一條收集線程去完成垃圾回收工作,并且會暫停其他的工作線程(stop the world),直至回收完…

Unity Mirror 多人同步 基礎教程

Unity Mirror 多人同步 基礎教程MirrorNetworkManager(網絡管理器)Configuration:配置Auto-Start Options:自動啟動Scene Management:場景管理Network Info:網絡信息Authentication:身份驗證Pla…

基于紅尾鷹優化的LSTM深度學習網絡模型(RTH-LSTM)的一維時間序列預測算法matlab仿真

目錄 1.程序功能描述 2.測試軟件版本以及運行結果展示 3.部分程序 4.算法理論概述 5.完整程序 1.程序功能描述 紅尾鷹優化的LSTM(RTH-LSTM)算法,是將紅尾鷹優化算法(Red-Tailed Hawk Optimization, RTHO)與長短期…

深度學習“調參”黑話手冊:學習率、Batch Size、Epoch都是啥?

點擊 “AladdinEdu,同學們用得起的【H卡】算力平臺”,注冊即送-H卡級別算力,80G大顯存,按量計費,靈活彈性,頂級配置,學生更享專屬優惠。 引言:從"煉丹"到科學,…

【網絡實驗】-MUX-VLAN

實驗拓撲實驗要求: 在企業網絡中,企業員工和企業客戶可以訪問企業的服務器,對于企業來說,希望員工之間可以互相交流,但是企業用戶之間相互隔離,不能夠訪問。為了實現所有用戶都可以訪問企業服務器&#xff…

Java泛型:類型安全的藝術與實踐指南

Java泛型&#xff1a;類型安全的藝術與實踐指南 前言&#xff1a;一個常見的編譯錯誤 最近在開發中遇到了這樣一個編譯錯誤&#xff1a; Required type: Callable<Object> Provided: SalesPitchTask這個看似簡單的錯誤背后&#xff0c;隱藏著Java泛型設計的深層哲學。今天…

UMI企業智腦 2.1.0:智能營銷新引擎,圖文矩陣引領內容創作新潮流

在數字營銷日益激烈的今天&#xff0c;企業如何在信息洪流中脫穎而出&#xff1f;UMI企業智腦 2.1.0 的發布為企業提供了全新的解決方案。這款智能營銷工具結合了先進的AI技術與數據驅動策略&#xff0c;幫助企業優化營銷流程、提升效率&#xff0c;并通過圖文矩陣實現內容創作…

Lustre Ceph GlusterFS NAS 需要掛載在k8s容器上,數據量少,選擇哪一個存儲較好

在 K8s 容器環境中&#xff0c;數據量 不大的 規模下&#xff0c;Lustre、Ceph、GlusterFS 和 NAS 的選擇需結合性能需求、運維成本、擴展性和K8s 適配性綜合判斷。以下是針對性分析及推薦&#xff1a;一、核心對比與適用場景二、關鍵決策因素1. 性能需求高并發 / 高吞吐&#…

深入解析 Apache Doris 寫入原理:一條數據的“落地之旅”

在日常的數據分析場景中&#xff0c;我們經常會向 Apache Doris 寫入大量數據&#xff0c;無論是實時導入、批量導入&#xff0c;還是通過流式寫入。但你是否想過&#xff1a;一條數據從客戶端發出&#xff0c;到最終穩定落盤&#xff0c;中間到底經歷了哪些步驟&#xff1f; …

基于MATLAB的視頻動態目標跟蹤檢測實現方案

一、系統架構設計 視頻動態目標跟蹤系統包含以下核心模塊&#xff1a; 視頻輸入模塊&#xff1a;支持攝像頭實時采集或視頻文件讀取預處理模塊&#xff1a;灰度轉換、降噪、光照補償目標檢測模塊&#xff1a;背景建模、運動區域提取跟蹤算法模塊&#xff1a;卡爾曼濾波、粒子濾…

【Python】Python文件操作

Python文件操作 文章目錄Python文件操作[toc]1.文件的編碼2.文件打開、讀取&#xff08;r模式&#xff09;、關閉3.文件的寫入&#xff08;w模式&#xff09;4.文件的追加寫入&#xff08;a模式&#xff09;5.綜合案例1.文件的編碼 意義&#xff1a;計算機只能識別0和1&#x…

CES Asia的“五年計劃”:打造與北美展比肩的科技影響力

在全球科技產業版圖中&#xff0c;展會一直是前沿技術展示、行業趨勢探討以及商業合作達成的關鍵平臺。CES Asia&#xff08;亞洲消費電子技術展&#xff09;作為亞洲科技領域的重要展會&#xff0c;近日明確提出其“五年計劃”&#xff0c;目標是打造與北美展會比肩的科技影響…

【計算機網絡 | 第16篇】DNS域名工作原理

文章目錄3.5 域名系統工作原理主機的標識方式&#xff1a;域名 vs IP 地址標識轉換機制&#xff1a;DNS系統因特網的域名系統&#xff1a;層次域名空間&#x1f426;?&#x1f525;頂級域名分類低級域名與管理域名與IP的區別因特網的域名系統&#xff1a;域名服務器&#x1f9…

YASKAWA安川機器人鋁材焊接節氣之道

在鋁材焊接領域&#xff0c;保護氣體的合理使用對焊接質量與成本控制至關重要。安川焊接機器人憑借高精度與穩定性成為行業常用設備&#xff0c;而WGFACS節氣裝置的應用&#xff0c;則為其在鋁材焊接過程中實現高效節氣提供了創新路徑。掌握二者結合的節氣之道&#xff0c;對提…

GooseDB,一款實現服務器客戶端模式的DuckDB

在網上看到韓國公司開發的一款GooseDB&#xff0c; 官方網站對它的介紹是DuckDB? 的功能擴展分支&#xff0c;具有服務器/客戶端、多會話和并發寫入支持&#xff0c;使用 PostgreSQL 有線協議&#xff08;DuckDB?是 DuckDB 基金會的商標&#xff09; 使用也很簡單&#xff…