Springboot整合kafka簡單使用

kafka

一,介紹

Kafka 是一個開源的分布式流處理平臺,最初由 LinkedIn 開發并貢獻給 Apache 軟件基金會。它設計用于構建高性能、持久性、可伸縮和容錯的實時數據管道和流處理應用程序。

以下是 Kafka 的一些關鍵特點和概念:

  1. 發布-訂閱模型:Kafka 使用發布-訂閱模型,其中消息生產者將消息發布到主題(topic),而消息消費者從主題訂閱消息。這種模型可以支持多個消費者同時消費消息,并且具有良好的擴展性。
  2. 主題(Topic):主題是消息的邏輯分類,相當于一個消息隊列。消息被發布到主題,并且消費者從主題訂閱消息。
  3. 分區(Partition):每個主題可以劃分為一個或多個分區,每個分區是一個有序的日志。分區可以分布在集群的不同節點上,以提供水平擴展和負載均衡。
  4. 副本(Replication):Kafka 支持將每個分區的數據復制到多個副本,以提供容錯性和數據可靠性。副本位于集群的不同節點上,確保即使某個節點故障,數據仍然可用。
  5. 生產者(Producer):生產者負責將消息發布到主題。它們可以選擇將消息發送到特定的分區,也可以根據負載均衡策略將消息均勻分布到不同的分區中。
  6. 消費者(Consumer):消費者從主題訂閱消息,并按照其偏移量(offset)順序消費消息。消費者組(Consumer Group)是一組共享相同主題的消費者,它們協調以確保每個分區的消息只有一個消費者消費。
  7. ZooKeeper:Kafka 使用 ZooKeeper 來進行分布式協調和管理,如協調生產者和消費者、維護集群元數據等。

Kafka 提供了高性能、可靠、持久的消息傳遞系統,適用于大規模的實時數據處理和流式處理應用程序。它已經成為許多企業構建實時數據管道和流處理應用程序的首選工具之一。

二,運行原理

Kafka 的運行原理涉及多個組件和過程,主要包括生產者發送消息、消息存儲在代理 (Broker) 中的分區中、消費者從分區中讀取消息等。以下是 Kafka 的基本運行原理:

  1. 生產者發送消息:
    • 生產者將消息發送到 Kafka 的特定主題 (Topic) 中。
    • 生產者可以選擇性地將消息發送到特定的分區 (Partition),或者使用 Kafka 的默認分區分配策略,由 Kafka 在發送時決定將消息發送到哪個分區。
  2. 消息存儲在分區中:
    • 主題可以被分成多個分區,每個分區是一個有序的消息序列。
    • 每條消息在分區內有一個唯一的偏移量 (Offset),用于標識消息在分區中的位置。
    • 消息被持久化在 Kafka 的分區中,直到滿足一定的保留策略(如時間或者大小限制)。
  3. 消費者從分區中讀取消息:
    • 消費者從 Kafka 的特定主題中讀取消息。
    • 消費者可以以不同的方式訂閱主題,例如:
      • 指定訂閱的主題和分區。
      • 加入一個消費者組 (Consumer Group),使得消費者可以以并行的方式消費主題中的消息。
    • 每個消費者在消費主題時,會維護自己的消費偏移量 (Offset),用于記錄已經消費的消息位置。
  4. 分區和副本管理:
    • Kafka 使用分區來實現并行處理和水平擴展。
    • 每個分區可以有多個副本,其中一個是領導者副本 (Leader),其余的是追隨者副本 (Follower)。
    • 領導者副本負責處理讀寫請求,追隨者副本用于備份數據。
    • 如果領導者副本失效,Kafka 會從追隨者副本中選舉新的領導者。
  5. ZooKeeper 協調:
    • Kafka 集群依賴 ZooKeeper 來進行集群管理、元數據存儲和領導者選舉等任務。
    • ZooKeeper 存儲了 Kafka 集群的元數據,包括主題、分區、消費者組等信息,同時也用于監控和管理集群的健康狀態。

通過這些組件和過程,Kafka 實現了高吞吐量、持久性、分布式和水平擴展等特性,使得它成為處理大規模實時數據流的理想選擇。

三,Spring Boot 項目中整合 Kafka (簡單使用)

1.添加 Maven 依賴

首先,在你的 Spring Boot 項目的 pom.xml 文件中添加 Spring Kafka 的依賴:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
2.配置 Kafka 連接信息

application.propertiesapplication.yml 中添加 Kafka 服務器的連接信息:

  kafka:bootstrap-servers: 192.168.193.131:9092producer:  #生產者序列化器retries: 10 #如果發生故障,生產者將嘗試重新發送消息的次數。key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化生產者消息鍵的類。value-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化生產者消息值的類。ack-mode: manualconsumer: #消費者序列化器group-id: ${spring.application.name}-test # 消費者組的唯一標識符。在消費者組中的所有消費者將共享消費者組的工作負載。key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #用于反序列化消費者消息鍵的類。value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #用于反序列化消費者消息值的類。listener: #配置了監聽器相關的設置。ack-mode: manual #開啟手動確認 設置為手動,表示消費者將等待手動確認來確定是否已成功處理消息。
3.創建kafka配置類:

配置 Kafka 主題(topic)的創建

@Configuration
public class KafkaConfig {/*** @return org.apache.kafka.clients.admin.NewTopic* @date 2024/5/31 14:42* @Description: TODO @Bean 注解* 作用:將方法返回的對象注冊為 Spring 容器中的一個 Bean。* 返回值類型:NewTopic,表示 Kafka 主題的配置信息。*/@Beanpublic NewTopic viewUserTopic(){/*第一個參數:主題名稱,這里是 "viewUserTopic"。第二個參數:分區數量,這里設置為 1。第三個參數:副本數量,這里設置為 1。*/return new NewTopic("viewUserTopic",1,(short) 1);}}
4.注入kafka模板類
@Autowired
private KafkaTemplate kafkaTemplate;
5.發送消息
//定義消息的唯一ID 防止消息重復消費
String msgId = "msg-" + UUID.randomUUID().toString();
//定義消息內容
String msgBody = JSON.toJSONString(tbUser);//將消息唯一表示存入redis緩存  防止消息重復消費
stringRedisTemplate.opsForValue().set(msgId, msgBody);/*組裝消息體 發送消息隊列*/
MessageVO messageVO = new MessageVO();
messageVO.setMsgID(msgId);
messageVO.setMsgBody(msgBody);//向名為 "viewUserTopic" 的 Kafka 主題發送消息。//參數一: 表示目標 Kafka 主題的名稱。   參數二:消息內容kafkaTemplate.send("viewUserTopic", JSON.toJSONString(messageVo))//通過該方法設置回調函數,用于處理消息發送的成功和失敗情況。.addCallback(//成功回調函數,處理消息發送成功的情況。new SuccessCallback() {@Overridepublic void onSuccess(Object o) {// 消息發送成功System.out.println("kafka 消息發送成功了~~~~~~~~~~~~");}},//失敗回調函數,處理消息發送失敗的情況。new FailureCallback() {@Overridepublic void onFailure(Throwable throwable) {// 消息發送失敗了,再次發送System.out.println("kafka 消息發送失敗了,再次發送");kafkaTemplate.send("viewUserTopic", JSON.toJSONString(messageVo));}});
6.消息的接收(監聽)
/*** @param message 表示接收到的消息內容,這里是 JSON 格式的字符串。* @param acknowledgment 用于手動提交消費者偏移量的對象。* @date 2024/5/31 15:13* @Description: TODO* @KafkaListener 通過該注解指定了監聽的 Kafka 主題為 "viewUserTopic"。*/@KafkaListener(topics = "viewUserTopic")public void recvViewUserMessage(String message, Acknowledgment acknowledgment) {//--1 接收消息MessageVO messageVo = JSON.parseObject(message, MessageVO.class);//--2 根據消息的唯一ID,判斷消息是否重復String msgId = messageVo.getMsgID();if (!stringRedisTemplate.hasKey(msgId)) {// 消息重復了System.out.println("kafka 消息重復了");// 使用 acknowledgment.acknowledge() 方法手動確認消費完成,通知 Kafka 服務器該消息已經被處理。acknowledgment.acknowledge();return;}//--3 消費消息(處理消息)String msgBody = messageVo.getMsgBody();TbLog tbLog = JSON.parseObject(msgBody, TbLog.class);tbLog.setCreateTime(new Date());tbLogMapper.insert(tbLog);//--4 手動確認消息//手動確認消費完成,通知 Kafka 服務器該消息已經被處理。acknowledgment.acknowledge();//--5 刪除消息的唯一ID,防止消息重復消費stringRedisTemplate.delete(msgId);}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/21157.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/21157.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/21157.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

SPWM載波調制方式-三電平雜記1

方法一&#xff1a; P2 O1 N0 方法二&#xff1a;雙載波直接發波 方法三&#xff1a;負軸載波和調制波往上抬升1&#xff0c;得到使用同一個載波 在正半周在P和O切換&#xff0c;在下半軸式O和N切換

自動評論自動私信引流系統,自動化時代的挑戰與機遇

隨著科技的飛速發展&#xff0c;自動化技術已經滲透到我們生活的方方面面。從工業生產線上的機械臂到家庭中的智能助手&#xff0c;自動化不僅改變了我們的工作方式&#xff0c;也在重塑著社會的面貌。然而&#xff0c;在享受自動化帶來的便利和效率的同時&#xff0c;我們也必…

961題庫 北航計算機 MIPS基礎選擇題 附答案 選擇題形式

有題目和答案&#xff0c;沒有解析&#xff0c;不懂的題問大模型即可&#xff0c;無償分享。 第1組 習題 MIPS處理器五級流水線中&#xff0c;涉及DRAM的是 A. 取指階段 B. 譯碼階段 C. 執行階段 D. 訪存階段 MIPS處理器五級流水線中&#xff0c;R型指令保存結果的階段是 A.…

關于高版本 Plant Simulation 每次保存是 提示提交comm對話框的處理方法

關于高版本 Plant Simulation 每次保存是 提示提交comm對話框的處理方法 如下圖 將model saving history 修改為None即可 關于AutoCAD 2022 丟失模板庫的問題 從新從以下地址打開即可&#xff1a; D:\Program Files\Autodesk\AutoCAD 2022\UserDataCache\zh-cn\Template

Visual Studio Installer 點擊閃退

Visual Studio Installer 點擊閃退問題 1. 問題描述2. 錯誤類型3. 解決方法4. 結果5. 說明6. 參考 1. 問題描述 重裝了系統后&#xff08;系統版本&#xff1a;如下圖所示&#xff09;&#xff0c;我從官方網站&#xff08;https://visualstudio.microsoft.com/ ) 下載了安裝程…

Leetcode:正則表達式匹配

目錄 普通版本&#xff08;動態規劃&#xff09; 狀態表示 狀態轉移方程 優化③①情況 數學化簡分析 結合實際情況畫圖化簡分析 總結 最終代碼 題目鏈接&#xff1a;10. 正則表達式匹配 - 力扣&#xff08;LeetCode&#xff09; 好像是leetcode前100道里面最難的一道&a…

方法引用與構造方法引用

目錄 方法引用 什么是方法引用 構造方法引用 構造方法引用&#xff08;也可以稱作構造器引用&#xff09; 數組構造方法引用 方法引用 什么是方法引用 當要傳遞給 Lambda 體的操作&#xff0c;已經有實現的方法了&#xff0c;可以使用方法引用。 方法引用可以看做是 La…

PHAR反序列化

PHAR PHAR&#xff08;PHP Archive&#xff09;文件是一種歸檔文件格式&#xff0c;phar文件本質上是一種壓縮文件&#xff0c;會以序列化的形式存儲用戶自定義的meta-data。當受影響的文件操作函數調用phar文件時&#xff0c;會自動反序列化meta-data內的內容,這里就是我們反序…

頭歌頁面置換算法第3關:計算LRU算法缺頁率

2 任務:LRU算法 2.1 任務描述 設計LRU頁面置換算法模擬程序:從鍵盤輸入訪問串。計算LRU算法在不同內存頁框數時的缺頁數和缺頁率。要求程序模擬駐留集變化過程,即能模擬頁框裝入與釋放過程。 2.2任務要求 輸入串長度作為總頁框數目,補充程序完成LRU算法。 2.3算法思路 LRU算…

jmeter常用的斷言

包括&#xff08;Contains&#xff09;&#xff1a;響應內容包括需要匹配的內容即代表響應成功&#xff0c;支持正則表達式 匹配&#xff08;Matches&#xff09;&#xff1a;響應內容要完全匹配需要匹配的內容即代表響應成功&#xff0c;大小寫不敏感&#xff0c;支持正則表達…

vue html2canvas生成base64圖片和圖片高度

vue html2canvas生成圖片 exportAsBase64() {const ele document.getElementById(content);html2canvas(ele, {dpi: 96, // 分辨率 scale: 2, // 設置縮放 useCORS: true, // 允許canvas畫布內跨域請求外部鏈接圖片 bgcolor: #ffffff, // 背景顏色 logging: false, // 不…

rust之cargo install cargo-binstall 是什么

cargo-binstall 是什么 官方&#xff1a;https://lib.rs/crates/cargo-binstall Binstall 提供了一種低復雜性的機制來安裝 Rust 二進制文件&#xff0c;作為從源代碼&#xff08;通過 cargo install &#xff09;構建或手動下載軟件包的替代方案。這旨在與現有的 CI 工件和基…

Windows安裝ElasticSearch版本7.17.0

在Windows系統上本地安裝Elasticsearch的詳細步驟如下&#xff1a; 1. 下載Elasticsearch 訪問 Elasticsearch下載頁面。選擇適用于Windows的版本7.17.0&#xff0c;并下載ZIP文件。 2. 解壓文件 下載完成后&#xff0c;找到ZIP文件&#xff08;例如 elasticsearch-7.17.0.…

【算法篇】冒泡排序算法JavaScript版

冒泡排序算法&#xff1a;原理與實現 冒泡排序&#xff08;Bubble Sort&#xff09;是一種簡單的排序算法&#xff0c;它重復地遍歷要排序的數列&#xff0c;一次比較兩個元素&#xff0c;如果它們的順序錯誤就把它們交換過來。遍歷數列的工作是重復地進行直到沒有再需要交換&…

spoon基礎使用-第一個轉換文件

新建一個轉換&#xff0c;文件->新建->轉換&#xff0c;也可以直接ctralN新建。 從右邊主對象樹拖拽一個輸入->表輸入&#xff1b;輸出->文本文檔輸出&#xff1b;也可以直接在搜索框搜素表輸入、文本文檔輸出。 雙擊表輸入新建一個數據庫連接 確定后就可以在S…

【人工智能】第二部分:ChatGPT的架構設計和訓練過程

人不走空 &#x1f308;個人主頁&#xff1a;人不走空 &#x1f496;系列專欄&#xff1a;算法專題 ?詩詞歌賦&#xff1a;斯是陋室&#xff0c;惟吾德馨 目錄 &#x1f308;個人主頁&#xff1a;人不走空 &#x1f496;系列專欄&#xff1a;算法專題 ?詩詞歌…

Java | Leetcode Java題解之第126題單詞接龍II

題目&#xff1a; 題解&#xff1a; class Solution {public List<List<String>> findLadders(String beginWord, String endWord, List<String> wordList) {List<List<String>> res new ArrayList<>();// 因為需要快速判斷擴展出的單詞…

傳輸中的串擾(八)

串擾指的是有害信號從一個線網傳遞到相鄰線網上。通常把噪聲源所在的線網稱為動態線或攻擊線網&#xff0c;而把有噪聲形成的線網稱為靜態線或受害線網。 靜態線上的噪聲電壓的表現與信號電壓完全一樣。一旦在靜態線上產生噪聲電壓&#xff0c;它們就會傳播并在阻抗突變處出現反…

html解決瀏覽器記住密碼輸入框的問題

當瀏覽器記住密碼并自動填充到表單的密碼輸入框時&#xff0c;這通常是瀏覽器為了提供便利而采取的功能。然而&#xff0c;有時這可能不是用戶所期望的&#xff0c;或者你可能希望在某些情況下禁用此功能。 雖然HTML本身并沒有直接提供禁用瀏覽器自動填充密碼輸入框的標準方法…

常見算法(基本查找、二分查找、分塊查找冒泡、選擇、插入、快速排序和遞歸算法)

一、常見算法-01-基本、二分、插值和斐波那契查找 1、基本查找/順序查找 需求1&#xff1a;定義一個方法利用基本查找&#xff0c;查詢某個元素是否存在 數據如下&#xff1a;{131&#xff0c;127&#xff0c;147&#xff0c;81&#xff0c;103&#xff0c;23&#xff0c;7&am…