消息隊列如何保證消息可靠性(kafka以及RabbitMQ)

目錄

RabbitMQ保證消息可靠性

生產者丟失消息

MQ丟失消息

消費端丟失了數據

Kakfa的消息可靠性

生產者的消息可靠性

Kakfa的消息可靠性

消費者的消息可靠性


RabbitMQ保證消息可靠性

生產者丟失消息

1.事務消息保證

生產者在發送消息之前,開啟事務消息隨后生產者發送消息,消息發送之后,如果消息沒有被MQ接收到的話,生產者會收到異常報錯,生產者回滾事務,然后重試消息,如果收到了消息,就能提交事務了

@Autowired
private RabbitTemplate rabbitTemplate;public void sendTransactionalMessage() {ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();Channel channel = connectionFactory.createConnection().createChannel(false);try {channel.txSelect(); // 開啟事務channel.basicPublish("exchange", "routing.key", null, "message".getBytes());channel.txCommit(); // 提交事務} catch (Exception e) {channel.txRollback(); // 出錯回滾}
}

2.使用confirm機制

  • 普通confirm機制,就是發送消息之后,等待服務器confirm之后再發送下一個消息
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息成功發送到Broker");} else {System.out.println("消息發送失敗,原因:" + cause);}
});rabbitTemplate.convertAndSend("exchange", "routing.key", "message");
  • 批量confirm機制,每發送一批消息之后,等待服務器confirm
Channel channel = connection.createChannel(false);
channel.confirmSelect();for (int i = 0; i < 100; i++) {channel.basicPublish("exchange", "routing.key", null, ("msg" + i).getBytes());
}
channel.waitForConfirms(); // 等待所有消息確認
  • 異步confirm機制,服務器confirm一個或者多個消息之后,客戶端(生產者)能夠通過回調函數來確定消息是否被confirm(推薦)
SortedSet<Long> pendingSet = Collections.synchronizedSortedSet(new TreeSet<>());
channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {public void handleAck(long tag, boolean multiple) {if (multiple) pendingSet.headSet(tag + 1).clear();else pendingSet.remove(tag);}public void handleNack(long tag, boolean multiple) {System.err.println("未確認消息:" + tag);if (multiple) pendingSet.headSet(tag + 1).clear();else pendingSet.remove(tag);}
});while (true) {long seq = channel.getNextPublishSeqNo();channel.basicPublish("demo.exchange", "demo.key",MessageProperties.PERSISTENT_TEXT_PLAIN, "hello".getBytes());pendingSet.add(seq);
}

MQ丟失消息

防止MQ的丟失數據的話,方法就是開啟RabbitMQ的持久化,消息寫入之后(也就是到了MQ之后)就直接持久化到磁盤中,即使Rabbimq自己掛了之后,會恢復數據。

設置持久化步驟

  • 創建queue的時候直接設置持久化,此時就能持久化queue的元數據(不是消息)
@Bean
public Queue durableQueue() {return new Queue("myQueue", true); // true 表示持久化
}
  • 發送消息的時候指定消息為deliveryMode設置為2,也就是設置消息為持久化,此時消息可以持久化磁盤上
MessageProperties props = new MessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("content".getBytes(), props);
rabbitTemplate.send("exchange", "routing.key", message);

極端情況:

消息寫到RabbitMQ之后,但是還沒有持久化到磁盤之后直接掛了,導致內存中消息丟失。

解決方法:持久化與生產者的confirm機制配合,當且僅當持久化了消息之后,再confirm,避免數據與消息丟失,此時生產者收不到ack,也是可以自己重發

消費端丟失了數據

意思就是消息已經拉取到了信息,還沒有處理(注意這是已經告訴MQ我拉取到數據了),結果進程掛了,重啟之后繼續消費下一條消息,導致中間的這一條沒有消費到,此時數據丟失了。

利用ack機制處理

取消RabbiMQ的自動ack,也就是一個api,可以在消費端消費完了消息之后再調用api告訴MQ我們收到并且處理了該消息。如果沒有返回ack,RabbitMQ會把該消息分配給其他的consumer處理,消息不會丟失。通過配置處理

spring:rabbitmq:listener:simple:acknowledge-mode: manual

Kakfa的消息可靠性

生產者的消息可靠性

在kafka中,可以在producer(生產段)設置一個參數,也就是ack=all,要求每個數據,必須寫入所有的replica(也就是所有該分區的副本),才認為是接收成功。該參數設置的是你的leader接收到消息后,所有的follower都同步到消息后才認為寫成功

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // 等待所有副本確認
props.put("retries", Integer.MAX_VALUE); // 無限重試
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("發送成功:" + metadata.offset());} else {exception.printStackTrace();}
});
producer.close();

Kakfa的消息可靠性

kafka默認是會將消息持久化到磁盤上的,但是還是有情況會導致丟失數據

kafka某個broker宕機,隨后重新選舉partition的leader。倘若在該broker中的partition中的leader副本中的消息,還沒有被其他broker中的follower同步,此時同步缺失的數據就丟失了,也就是少了一些數據

解決方法:

  • 給 topic 設置 replication.factor 參數:這個值必須大于 1,要求每個 partition 必須有至少 2 個副本。
  • 在 Kafka 服務端設置 min.insync.replicas 參數:這個值必須大于 1,這個是要求一個 leader 至少感知到有至少一個 follower 還跟自己保持聯系。
  • 在 producer 端設置 acks=all :這個是要求每條數據,必須是寫入所有 replica 之后,才能認為是寫成功了。
  • 在 producer 端設置 retries=MAX (很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了。

按照上面的配置之后,leader的切換就不會導致數據缺失了。

消費者的消息可靠性

唯一可能也是類似于RabbitMQ中的,也就是說你消費到該消息的時候,消費者自動提交offset,讓kafka以為你消費好了該消息,但是自己還沒處理就宕機后,會導致重啟后沒有消費該消息。

解決方法:

關閉kafka默認的自動提交offset,通過消費端業務邏輯處理完消息后,再手動提交offset,當然這里就是會導致重復消費了,這里就是冪等性的問題了。比如你剛處理完,還沒提交 offset,結果自己掛了,此時肯定會重復消費一次

手動提交api:consumer.commitSync();

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // 關閉自動提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 處理消息System.out.println("處理消息:" + record.value());}// 手動提交 offsetconsumer.commitSync();
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/905235.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/905235.shtml
英文地址,請注明出處:http://en.pswp.cn/news/905235.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

如何查看項目是否支持最新 Android 16K Page Size 一文匯總

前幾天剛聊過 《Google 開始正式強制 Android 適配 16 K Page Size》 之后&#xff0c;被問到最多的問題是「怎么查看項目是否支持 16K Page Size」 &#xff1f;其實有很多直接的方式&#xff0c;但是最難的是當你的項目有很多依賴時&#xff0c;怎么知道這個「不支持的動態庫…

HttpServletResponse的理解

HttpServletResponse 是 Java Servlet API 提供的一個接口 常用方法 方法用途setContentType(String type)設置響應內容類型&#xff08;如 "application/json"、"text/html"&#xff09;setStatus(int sc)設置響應狀態碼&#xff08;如 200、404&#x…

可靈 AI:開啟 AI 視頻創作新時代

在當今數字化浪潮中&#xff0c;人工智能&#xff08;AI&#xff09;技術正以前所未有的速度滲透到各個領域&#xff0c;尤其是在內容創作領域&#xff0c;AI 的應用正引發一場革命性的變革。可靈 AI 作為快手團隊精心打造的一款前沿 AI 視頻生成工具&#xff0c;宛如一顆璀璨的…

用 AltSnap 解鎖 Windows 窗口管理的“魔法”

你有沒有遇到過這樣的場景&#xff1a;電腦屏幕上堆滿了窗口&#xff0c;想快速調整它們的大小和位置&#xff0c;卻只能拖來拖去&#xff0c;費時又費力&#xff1f;或者你是個多任務狂魔&#xff0c;喜歡一邊寫代碼、一邊看文檔、一邊刷視頻&#xff0c;卻發現 Windows 自帶的…

深度策略梯度算法PPO

一、策略梯度核心思想和原理 從時序差分算法Q學習到深度Q網絡&#xff0c;這些算法都側重于學習和優化價值函數&#xff0c;屬于基于價值的強化學習算法&#xff08;Value-based&#xff09;。 1. 基于策略方法的主要思想&#xff08;Policy-based&#xff09; 基于價值類方…

【LaTeX】Word插入LaTeX行間公式如何編號和對齊

在 Word 文檔中插入公式&#xff0c;需要用到 LaTeX \LaTeX LATE?X 。但遺憾的是&#xff0c;Word 只支持部分 LaTeX \LaTeX LATE?X 語法&#xff0c;這就導致很多在 Markdown 能正常渲染的公式在 Word 中無法正常顯示。 “內嵌”和“顯示” 首先介紹一下 Word 的“內嵌”…

互聯網大廠Java面試實戰:Spring Boot到微服務的技術問答解析

&#x1f4aa;&#x1f3fb; 1. Python基礎專欄&#xff0c;基礎知識一網打盡&#xff0c;9.9元買不了吃虧&#xff0c;買不了上當。 Python從入門到精通 &#x1f601; 2. 畢業設計專欄&#xff0c;畢業季咱們不慌忙&#xff0c;幾百款畢業設計等你選。 ?? 3. Python爬蟲專欄…

spring boot3.0自定義校驗注解:文章狀態校驗示例

文章目錄 Spring Boot 自定義校驗注解&#xff1a;狀態校驗示例一、創建 State 注解步驟&#xff1a;1. 創建自定義注解&#xff1a;2. 實現校驗邏輯&#xff1a; 二、 實現自定義校驗步驟:1. 在實體類中使用自定義校驗注解 State&#xff1a;2. 添加 State 注解&#xff1a; 總…

無侵入式彈窗體驗_探索 Chrome 的 Close Watcher API

1. 引言 在網頁開發中,彈窗(Popup)是一種常見的交互方式,用于提示用戶進行操作、確認信息或展示關鍵內容。然而,傳統的 JavaScript 彈窗方法如 alert()、confirm() 和 prompt() 存在諸多問題,包括阻塞主線程、樣式不可定制等。 為了解決這些問題,Chrome 瀏覽器引入了 …

調出事件查看器界面的4種方法

方法1. 方法2. 方法3. 方法4.

Ubuntu 安裝遠程桌面連接RDP方式

1. 安裝 XFCE4 桌面環境 如果你的 Ubuntu 系統默認使用 GNOME 或其它桌面環境&#xff0c;可以安裝輕量級的 XFCE4&#xff1a; sudo apt update sudo apt install xfce4 xfce4-goodies 說明&#xff1a;xfce4-goodies 包含額外的插件和工具&#xff08;如面板插件、終端等&a…

LWIP傳輸層協議筆記

傳輸協議簡介 文件/圖片/視頻 都是一堆二進制數據 經過傳輸層來傳輸 這兩種協議有什么區別呢&#xff1f; 傳輸層的TCP/UDP三個步驟 TCP使用傳輸流程 1、三次握手 作用&#xff1a;三次握手就是建立連接的過程 2、傳輸數據 作用&#xff1a;建立連接完成之后&#xff…

數據分析與邏輯思維:六步解決業務難題;參考書籍《數據分析原理:6步解決業務分析難題 (周文全, 黃怡媛, 馬炯雄)》

文章目錄 一、懂業務&#xff1a;業務背景與邏輯前提1.1 明確業務目標與問題定義1.2 培養批判性思維與高于業務視角 二、定指標&#xff1a;構建科學的指標體系2.1 指標拆解與維度分析2.2 典型指標體系案例&#xff1a;用戶與業務視角 三、選方法&#xff1a;匹配業務需求的分析…

開啟WSL的鏡像網絡模式

開啟WSL的鏡像網絡模式 前提 Windows主機系統版本高于Windows 11 22H2。WLS版本>2.0。 可輸入wsl --version查看當前系統wsl版本。 修改設置 圖形界面修改 在開始菜單中搜索&#xff1a;wsl settings&#xff0c;結果如下圖所示&#xff1a; 點擊“打開”&#xff0…

Python爬蟲第20節-使用 Selenium 爬取小米商城空調商品

目錄 前言 一、 本文目標 二、環境準備 2.1 安裝依賴 2.2 配置 ChromeDriver 三、小米商城頁面結構分析 3.1 商品列表結構 3.2 分頁結構 四、Selenium 自動化爬蟲實現 4.1 腳本整體結構 4.2 代碼實現 五、關鍵技術詳解 5.1 Selenium 啟動與配置 5.2 頁面等待與異…

聚類分析的原理、常用算法及其應用

聚類分析的原理、常用算法及其應用 一、聚類分析的基本原理 &#xff08;一&#xff09;什么是聚類分析 聚類分析是一種無監督學習方法&#xff0c;其目標是將數據集中的樣本劃分為若干個簇&#xff0c;每個簇包含相似的樣本。聚類分析的核心思想是通過某種相似性度量&#…

Aware和InitializingBean接口以及@Autowired注解失效分析

Aware 接口用于注入一些與容器相關信息&#xff0c;例如&#xff1a; ? a. BeanNameAware 注入 Bean 的名字 ? b. BeanFactoryAware 注入 BeanFactory 容器 ? c. ApplicationContextAware 注入 ApplicationContext 容器 ? d. EmbeddedValueResolverAware 注入 解析器&a…

JDK 安裝與配置

JDK 全稱是 Java SE Development Kit&#xff0c;翻譯成中文就是&#xff1a;Java 標準版開發包&#xff0c;是 Sun 公司&#xff08;后被 Oracle 公司收購&#xff09;專門外 Java 開發人員提供的一套用于開發 Java 應用程序的工具包。 JDK 提供了用于編譯和運行 Java 應用程序…

防火墻來回路徑不一致導致的業務異常

案例拓撲&#xff1a; 拓撲描述&#xff1a; 服務器有2塊網卡&#xff0c;內網網卡2.2.2.1/24 網關2.2.254 提供內網用戶訪問&#xff1b; 外網網卡1.1.1.1/24&#xff0c;外網網關1.1.1.254 80端口映射到公網 這個時候服務器有2條默認路由&#xff0c;分布是0.0.0.0 0.0.0.0 1…

Java面試高頻問題(36-37)

三十六、服務網格核心能力與設計模式 服務網格架構分層模型 mermaid graph TB subgraph 數據平面 ASidecar代理 -->攔截流量 BEnvoy B -->協議轉換 CHTTP/gRPC B -->策略執行 D熔斷/限流 end subgraph 控制平面 E配置中心 -->下發策略 Fistiod F -->證書管理 …