kafka學習筆記(三、消費者Consumer使用教程——消費性能多線程提升思考)

在這里插入圖片描述


1.簡介

KafkaConsumer是非線程安全的,它定義了一個acquire()方法來檢測當前是否只有一個線程在操作,如不是則會拋出ConcurrentModifcationException異常。

acquire()可以看做是一個輕量級鎖,它僅通過線程操作計數標記的方式來檢測線程是否發生了并發操作,以此保證只有一個線程在操作。acquire()方法和release()方法成對出現,表示相應的加鎖操作和解鎖操作。

KafkaConsumer雖然是單線程的執行方式,但是在某些情況下如:生產者發送消息的速度遠大于消費者消費的速度,這樣長時間可能會造成消息的丟失,此時我們就需要消費者采用多線程消費的方式增加消費速度。

2.多線程實現的方式

2.1.線程封閉多線程

即為每個線程實例化一個KafkaConsumer,如圖所示,一個線程對應一個KafkaConsumer實例,所有的消費線程都屬于同一個消費者組。

這種方式的并發度受限分區的實際個數

在這里插入圖片描述
實現代碼示例:

public class kafkaConsumer1 {public void ConsuermMultithread1() {Properties props = initConsifg(); // 此處初始化消費者配置參數省略int consumerThreadNum = 5;for (int i = 0; i < consumerThreadNum; i++) {new KafkaConsumerThread(props, topic).start();}}// 消費線程public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;public KafkaConsumerThread(Properties prop, String topic) {this.kafkaConsumer = new KafkaConsumer<>(prop);this.kafkaConsumer.subscribe(Arrays.asList(topic));}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record: records) {// 處理消息}}} catch (Exception e) {e.printStackTrace();} finally {kafkaConsumer.close();}}}
}

2.1.消息處理模塊多線程

此方法是對上面方法的進一步優化,在消息處理模塊增加多線程來處理消息,進一步提升消息消費的速度。
在這里插入圖片描述

public class kafkaConsumer1 {public void ConsuermMultithread1() {Properties props = initConsifg(); // 此處初始化消費者配置參數省略int consumerThreadNum = 5;for (int i = 0; i < consumerThreadNum; i++) {new KafkaConsumerThread(props, topic).start();}}public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties prop, String topic, int threadNumber) {this.kafkaConsumer = new KafkaConsumer<>(prop);this.kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {e.printStackTrace();} finally {kafkaConsumer.close();}}}public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {/// 處理records  }}
}

此方法需要引入一個共享的offsets來參與提交。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/83429.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/83429.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/83429.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

百度蜘蛛池的作用是什么?技術@baidutopseo

百度蜘蛛池的作用 百度蜘蛛池是一種用于集中管理和調度百度搜索引擎蜘蛛&#xff08;爬蟲&#xff09;的技術工具。其主要目的是優化網站內容的收錄效率&#xff0c;幫助搜索引擎更高效地發現和抓取網頁內容。技術baidutopseo 提升收錄效率 通過集中大量百度蜘蛛的訪問請求&…

黑馬Java面試筆記之 消息中間件篇(Kafka)

一. Kafka保證消息不丟失 Kafka如何保證消息不丟失 使用Kafka在消息的收發過程中都會出現消息丟失&#xff0c;Kafka分別給出了解決方案 生產者發送消息到Brocker丟失消息在Brocker中存儲丟失消費者從Brocker接收消息丟失 1.1 生產者發送消息到Brocker丟失 設置異步發送 消息…

dis css port brief 命令詳細解釋

華為交換機命令 display css port brief 詳細解釋 display css port brief 是華為交換機中用于 快速查看堆疊&#xff08;CSS&#xff0c;Cluster Switch System&#xff09;端口狀態及關鍵參數 的命令&#xff0c;適用于日常運維、堆疊鏈路健康檢查及故障定位。以下是該命令的…

Redis 緩存問題及其解決方案

1. 緩存雪崩 概念&#xff1a;緩存雪崩是指在緩存層出現大范圍緩存失效或緩存服務器宕機的情況下&#xff0c;大量請求直接打到數據庫&#xff0c;導致數據庫壓力驟增&#xff0c;甚至可能引發數據庫宕機。 影響&#xff1a;緩存雪崩會導致系統性能急劇下降&#xff0c;甚至導…

使用Python進行函數作畫

前言 因為之前通過deepseek繪制一下卡通的人物根本就不像&#xff0c;又想起來之前又大佬通過函數繪制了一些圖像&#xff0c;想著能不能用Python來實現&#xff0c;結果發現可以&#xff0c;不過一些細節還是需要自己調整&#xff0c;deepseek整體的框架是沒有問題&#xff0…

關于list集合排序的常見方法

目錄 1、list.sort() 2、Collections.sort() 3、Stream.sorted() 4、進階排序技巧 4.1 空值安全處理 4.2 多字段組合排序 4.3. 逆序 5、性能優化建議 5.1 并行流加速 5.2 原地排序 6、最佳實踐 7、注意事項 前言 Java中對于集合的排序操作&#xff0c;分別為list.s…

Java高級 | (二十二)Java常用類庫

參考&#xff1a;Java 常用類庫 | 菜鳥教程 一、核心Java類庫 二、常用第三方庫 以下是 Java 生態系統中廣泛使用的第三方庫&#xff1a; 類別庫名稱主要功能官方網站JSON 處理JacksonJSON 序列化/反序列化https://github.com/FasterXML/jacksonGsonGoogle 的 JSON 庫https:…

幾種常用的Agent的Prompt格式

一、基礎框架范式&#xff08;Google推薦標準&#xff09; 1. 角色與職能定義 <Role_Definition> 你是“項目專家”&#xff08;Project Pro&#xff09;&#xff0c;作為家居園藝零售商的首席AI助手&#xff0c;專注于家裝改造領域。你的核心使命&#xff1a; 1. 協助…

蛋白質結構預測軟件openfold介紹

openfold 是一個用 Python 和 PyTorch 實現的 AlphaFold2 的開源復現版&#xff0c;旨在提升蛋白質結構預測的可復現性、可擴展性以及研究友好性。它允許研究者在不開源 DeepMind 原始代碼的情況下&#xff0c;自由地進行蛋白結構預測的訓練和推理&#xff0c;并支持自定義模型…

AD轉嘉立創EDA

可以通過嘉立創文件遷移助手進行格式的轉換 按照它的提示我們整理好文件 導出后是這樣的&#xff0c;第一個文件夾中有原理圖和PCB&#xff0c;可以把它們壓縮成一個壓縮包 這個時候我們打開立創EDA&#xff0c;選擇導入AD 這樣就完成了

MySQL(50)如何使用UNSIGNED屬性?

在 MySQL 中&#xff0c;UNSIGNED 屬性用于數值數據類型&#xff08;如 TINYINT、SMALLINT、MEDIUMINT、INT 和 BIGINT&#xff09;&#xff0c;表示該列只能存儲非負整數。使用 UNSIGNED 屬性可以有效地擴展列的正整數范圍&#xff0c;因為它不需要為負數保留空間。 1. 定義與…

什么是鏈游,鏈游系統開發價格以及方案

2025 Web3錢包開發指南&#xff1a;從多版本源碼到安全架構實戰 在數字資產爆發式增長的今天&#xff0c;Web3錢包已成為用戶進入鏈上世界的核心入口。作為開發者&#xff0c;如何高效構建安全、跨鏈、可擴展的錢包系統&#xff1f;本文結合前沿技術方案與開源實踐&#xff0c…

文件IO流

IO使用函數 標準IO文件IO(低級IO)打開fopen, freopen, fdopenopen關閉fcloseclose讀getc, fgetc, getchar, fgets, gets, fread printf fprintfread寫putc, fputc, putchar, fputs, puts, fwrite scanf fscanfwrite操作文件指針fseeklseek其它fflush rewind ftell 文件描述符 …

云原生DMZ架構實戰:基于AWS CloudFormation的安全隔離區設計

在云時代,傳統的DMZ(隔離區)概念已經演變為更加靈活和動態的架構。本文通過解析一個實際的AWS CloudFormation模板,展示如何在云原生環境中構建現代化的DMZ安全架構。 1. 云原生DMZ的核心理念 傳統DMZ是網絡中的"緩沖區",位于企業內網和外部網絡之間。而在云環境…

一、虛擬貨幣概述

1. 定義 - 虛擬貨幣是一種基于網絡技術、加密技術和共識機制的數字貨幣&#xff0c;它不依賴傳統金融機構發行&#xff0c;而是通過計算機算法生成&#xff0c;例如比特幣、以太坊等。 2. 特點 - 去中心化&#xff1a;沒有一個單一的機構或個人控制整個虛擬貨幣系統&#xff0c…

Make All Equal

給定一個循環數組 a1,a2,…,ana1?,a2?,…,an?。 你可以對 aa 至多執行 n?1n?1 次以下操作&#xff1a; 設 mm 為 aa 的當前大小&#xff0c;你可以選擇任何兩個相鄰的元素&#xff0c;其中前一個不大于后一個&#xff08;特別地&#xff0c;amam? 和 a1a1? 是相鄰的&a…

任務中心示例及瀏覽器強制高效下載實踐

1. 效果展示 這里的進度展示&#xff0c;可以通過我們之前講到的Vue3實現類ChatGPT聊天式流式輸出(vue-sse實現) SSE技術實現&#xff0c;比如用戶點擊全量下載時&#xff0c;后臺需要將PDF文件打包為ZIP文件&#xff0c;由于量較大&#xff0c;需要展示進度&#xff0c;用戶點…

SpringBoot整合Flowable【08】- 前后端如何交互

引子 在第02篇中&#xff0c;我通過 Flowable-UI 繪制了一個簡單的績效流程&#xff0c;并在后續章節中基于這個流程演示了 Flowable 的各種API調用。然而&#xff0c;在實際業務場景中&#xff0c;如果要求前端將用戶繪制的流程文件發送給后端再進行解析處理&#xff0c;這種…

2025 Java面試大全技術文章大綱

2025 Java面試大全技術文章大綱 基礎篇 Java核心語法 數據類型與包裝類自動裝箱與拆箱原理String、StringBuffer、StringBuilder區別final關鍵字作用場景 面向對象特性 多態的實現機制抽象類與接口的異同設計模式&#xff1a;單例的七種寫法泛型擦除與橋接方法 進階篇 J…

Python aiohttp 全面指南:異步HTTP客戶端/服務器框架

邊寫代碼零食不停口 盼盼麥香雞味塊 、卡樂比&#xff08;Calbee&#xff09;薯條三兄弟 獨立小包、好時kisses多口味巧克力糖、老金磨方【黑金系列】黑芝麻丸 邊寫代碼邊貼面膜 事業美麗兩不誤 DR. YS 野森博士【AOUFSE/澳芙雪特證】377專研美白淡斑面膜組合 優惠劵 別光顧寫…