簡識MQ之Kafka、ActiveMQ、RabbitMQ、RocketMQ傳遞機制

四種主流消息隊列(Kafka、ActiveMQ、RabbitMQ、RocketMQ)的生產者與消費者傳遞信息的機制說明,以及實際使用中的注意事項和示例:


1. Apache Kafka

傳遞機制

  • 模型:基于?發布-訂閱模型,生產者向?主題(Topic)?發送消息,消費者訂閱主題并消費消息。
  • 核心流程
    1. 生產者將消息發送到 Kafka 集群的 Broker,根據?分區策略(如輪詢、哈希)將消息寫入對應的分區(Partition)。
    2. 消費者通過消費者組(Consumer Group)訂閱主題,每個分區的數據會被分配給組內的消費者(通過?Rebalance?機制)。
    3. 消費者從分區中拉取消息(poll?方式)并處理。

示例代碼(Kafka 生產者)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "message"));
producer.close();

示例代碼(Kafka 消費者)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
}

注意事項

  1. 分區與順序性

    • Kafka 不保證跨分區的消息順序,但單個分區內的消息按順序存儲。
    • 示例:發送訂單創建事件時,需將同一用戶的消息發送到同一分區(通過?key)。
  2. 消費者組與 Rebalance

    • 消費者組內成員變化時(如新增消費者),會觸發分區重新分配(Rebalance),可能導致短暫消息不可讀。
    • 建議:避免頻繁增減消費者實例。
  3. 消息持久化

    • 生產者可通過?acks=all?確保消息寫入所有副本后返回成功,但會增加延遲。
    • 適用場景:對消息可靠性要求極高的場景(如金融交易)。

2. Apache ActiveMQ

傳遞機制

  • 模型:支持?點對點(Queue)?和?發布-訂閱(Topic)?模型。
  • 核心流程
    1. 生產者發送消息到隊列或主題。
    2. 消息通過?異步/同步?方式傳遞給消費者(默認異步)。
    3. 可啟用?持久化,消息存儲到磁盤以防 Broker宕機。

示例代碼(ActiveMQ 生產者)

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("my-queue");MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Hello ActiveMQ!");
producer.send(message);
connection.close();

示例代碼(ActiveMQ 消費者)

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("my-queue");MessageConsumer consumer = session.createConsumer(queue);
TextMessage message = (TextMessage) consumer.receive();
System.out.println("Received: " + message.getText());
consumer.acknowledge(); // 手動確認消息
connection.close();

注意事項

  1. 消息持久化

    • 需設置?DeliveryMode.PERSISTENT,否則消息可能丟失。
    • 示例:關鍵業務消息(如訂單支付通知)必須持久化。
  2. 事務支持

    • 生產者和消費者可通過事務確保消息的原子性(發送/接收一致性)。
    • 風險:長事務可能導致性能下降。
  3. 死信隊列(DLQ)

    • 配置?deadLetterExchange?和?deadLetterRoutingKey?處理無法消費的消息。
    • 示例:超過重試次數的消息自動進入 DLQ。

3. RabbitMQ

傳遞機制

  • 模型:靈活的消息路由模型,基于?交換器(Exchange)?和?綁定(Binding)
  • 核心流程
    1. 生產者將消息發送到交換器,并附帶路由鍵(Routing Key)。
    2. 交換器根據類型(如 Direct、Topic、Headers)將消息路由到綁定的隊列。
    3. 消費者從隊列中拉取消息。

示例代碼(RabbitMQ 生產者Producers)

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection()) {Channel channel = connection.createChannel();String exchangeName = "direct-exchange";channel.exchangeDeclare(exchangeName, "direct");String routingKey = "user.login";AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2) // 持久化.build();channel.basicPublish(exchangeName, routingKey, props, "Login Event".getBytes());
}

示例代碼(RabbitMQ 消費者Consumers)

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection()) {Channel channel = connection.createChannel();String queueName = "user_queue";channel.queueDeclare(queueName, true, false, false, null);String exchangeName = "direct-exchange";channel.queueBind(queueName, exchangeName, "user.login");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received: " + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
}

注意事項

  1. 消息確認機制

    • 消費者需發送?ACK?確認消息處理,避免重復消費。
    • 示例:使用?channel.basicAck()?或?channel.basicNack()
  2. 死信隊列配置

    • 在隊列聲明時配置?x-dead-letter-exchange?和?x-dead-letter-routing-key
    • 示例:處理失敗的消息進入專用隊列。
  3. 內存限制

    • RabbitMQ 默認限制隊列大小為內存中的一定比例,需根據業務調整?vm_memory_high_watermark

4. RocketMQ

傳遞機制

  • 模型:基于?主題(Topic)?和?隊列(Queue)?的分布式模型。
  • 核心流程
    1. 生產者Producers發送消息到主題,主題將消息路由到多個隊列(負載均衡)。
    2. 消費者Consumers通過消費者組(Consumer Group)訂閱主題,從隊列中拉取消息。
    • 順序消息:同一隊列內的消息按順序消費。
    • 廣播消息:消費者組內每個消費者都收到同一條消息(僅限 Topic 模型)。

示例代碼(RocketMQ 生產者Producers)

DefaultMQProducer producer = new DefaultMQProducer("my-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();Message msg = new Message("my-topic", "Order-123".getBytes(), "JSON".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println("Send Result: " + sendResult);producer.shutdown();

示例代碼(RocketMQ 消費者Consumers)

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("my-topic", "*"); // 訂閱所有隊列consumer.registerMessageListener(new MessageListener() {@Overridepublic void consume(Message msg, ConsumeContext context) throws Exception {System.out.println("Received: " + new String(msg.getBody()));context.commitMessage(msg); // 提交消費位移}
});
consumer.start();

注意事項

  1. 事務消息

    • 生產者和消費者可通過事務確保消息的最終一致性。
    • 示例:訂單創建成功后,發送支付通知(若失敗則回滾)。
  2. 消息順序性

    • 嚴格順序場景需指定?MessageQueueSelector,確保同一訂單的所有消息進入同一隊列。
  3. 消息堆積

    • 消費者處理能力不足時,消息會堆積在隊列中,需監控并擴容消費者實例。

總結對比

特性KafkaActiveMQRabbitMQRocketMQ
模型發布-訂閱(僅 Topic)支持點對點和發布-訂閱靈活路由(多種交換器)主題+隊列(順序/廣播)
持久化支持分區副本支持消息持久化和事務支持隊列和消息持久化支持消息持久化和事務
順序性單分區有序不保證(除非事務)可通過隊列保證單隊列嚴格有序
適用場景高吞吐、日志/事件流通用、企業級消息系統復雜路由、多協議支持高可靠、順序消息、分布式事務

通用注意事項

  1. 消息冪等性:防止重復消費(如訂單支付場景)。
  2. 監控與告警:關注隊列長度、消息堆積、消費者延遲。
  3. 序列化與壓縮:選擇高效的序列化方式(如 Protobuf)和壓縮算法(如 GZIP)。
  4. 連接池管理:避免頻繁創建/關閉連接,影響性能。

5、注意MQ的Kafka、ActiveMQ、RabbitMQ、RocketMQ區別;

? ? ? ? URL: 淺識MQ的 Kafka、ActiveMQ、RabbitMQ、RocketMQ區別-CSDN博客

6、注意:持久化策略

? ? ? ? URL:淺聊MQ之Kafka、RabbitMQ、ActiveMQ、RocketMQ持久化策略-CSDN博客

? ?

(望各位潘安、各位子健/各位彥祖、于晏不吝賜教!多多指正!🙏)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/71736.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/71736.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/71736.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Websocket——心跳檢測

1. 前言&#xff1a;為什么需要心跳機制&#xff1f; 在現代的實時網絡應用中&#xff0c;保持客戶端和服務端的連接穩定性是非常重要的。尤其是在長時間的網絡連接中&#xff0c;存在一些異常情況&#xff0c;導致服務端無法及時感知到客戶端的斷開&#xff0c;可能造成不必要…

tailwindcss 前端 css 框架 無需寫css 快速構建頁面

版本&#xff1a;VUE3 TS 框架 vite 文章中使用tailwindcss 版本&#xff1a; ^3.4.17 簡介&#xff1a; Tailwind CSS 一個CSS 框架&#xff0c;提供組件化的樣式&#xff0c;直接在HTML 中編寫樣式&#xff0c;無需額外自定義CSS &#xff0c;快速&#xff01; 簡潔&#…

MFC開發:如何創建第一個MFC應用程序

文章目錄 一、概述二、MFC 的主要組件三、創建一個MFC窗口四、控件綁定消息函數 一、概述 MFC 是微軟提供的一個 C 類庫&#xff0c;用于簡化 Windows 應用程序的開發。它封裝了 Windows API&#xff0c;提供面向對象的接口&#xff0c;幫助開發者更高效地創建圖形用戶界面&am…

【Git版本控制器】第四彈——分支管理,合并沖突,--no-ff,git stash

&#x1f381;個人主頁&#xff1a;我們的五年 &#x1f50d;系列專欄&#xff1a;Linux網絡編程 &#x1f337;追光的人&#xff0c;終會萬丈光芒 &#x1f389;歡迎大家點贊&#x1f44d;評論&#x1f4dd;收藏?文章 ? 相關筆記&#xff1a; https://blog.csdn.net/djd…

AI助力小微企業技術開發規范化管理 | 雜談

AI助力小微企業技術開發規范化管理 在小型技術研發企業中&#xff0c;人員配置緊張&#xff0c;往往一名員工需要承擔多項職務和任務。例如&#xff0c;后端程序開發人員可能同時要負責需求調研、數據庫設計、后端設計及開發&#xff0c;甚至在某些情況下還需兼任架構師的角色。…

SpringBoot+Vue+微信小程序的貓咖小程序平臺(程序+論文+講解+安裝+調試+售后)

感興趣的可以先收藏起來&#xff0c;還有大家在畢設選題&#xff0c;項目以及論文編寫等相關問題都可以給我留言咨詢&#xff0c;我會一一回復&#xff0c;希望幫助更多的人。 系統介紹 在當下這個高速發展的時代&#xff0c;網絡科技正以令人驚嘆的速度不斷迭代更新。從 5G …

DeepSeek提效實操革命,全場景應用指南 AI提示詞萬能公式四步法以及對話技巧

歡迎來到濤濤聊AI DeepSeek系列文章 三塊顯示器如何擺放效率最高&#xff0c;讓deepseek給深度思考下 阿里云免費試用 DeepSeek大模型。 限時送 100 萬 tokens&#xff0c;快來搶先免費體驗&#xff01;AI 助手不再出現系統繁忙阿里云免費試用 DeepSeek大模型。 限時送 100 萬 …

智慧教室與無紙化同屏技術方案探討與實現探究

引言 隨著教育信息化的不斷發展&#xff0c;智慧教室和無紙化同屏技術逐漸成為提升教學效率和質量的重要手段。大牛直播SDK憑借其強大的音視頻處理能力和豐富的功能特性&#xff0c;在智慧教室和無紙化同屏領域積累了眾多成功案例。本文將深入探討基于大牛直播SDK的智慧教室、…

Linux MySQL 8.0.29 忽略表名大小寫配置

Linux MySQL 8.0.29 忽略表名大小寫配置 問題背景解決方案遇到的問題&#xff1a; 問題背景 突然發現有個大寫的表報不存在。 在Windows上&#xff0c;MySQL是默認支持忽略大小寫的。 這個時候你要查詢一下是不是沒有配置&#xff1a; SHOW VARIABLES LIKE lower_case_table…

【藍橋杯單片機】第十三屆省賽第二場

一、真題 二、模塊構建 1.編寫初始化函數(init.c) void Cls_Peripheral(void); 關閉led led對應的鎖存器由Y4C控制關閉蜂鳴器和繼電器 2.編寫LED函數&#xff08;led.c&#xff09; void Led_Disp(unsigned char ucLed); 將ucLed取反的值賦給P0 開啟鎖存器 關閉鎖存…

【CMake 教程】常用函數與構建案例解析(三)

一、CMake 常用函數簡析 1. 條件判斷 if() / elseif() / else() 在 CMake 腳本中&#xff0c;條件判斷是控制邏輯的重要工具。if() 支持多種比較語句&#xff0c;包括數值、字符串、布爾值和變量存在性等。在條件滿足時執行特定邏輯代碼&#xff0c;下面是典型語法&#xff1…

ASP.NET Core 8.0學習筆記(二十七)——數據遷移:Migrations深入與其他遷移命令

一、數據庫架構的管理 1.EF Core提供兩種方式來保持EF Core的模型與數據庫保持同步。 (1)以數據庫為準&#xff1a;反向工程&#xff08;Db First&#xff09;&#xff0c;適用于中大型工程 (2)以代碼為準&#xff1a;數據遷移&#xff08;Code First&#xff09;&#xff0c;…

Python 基本語法的詳細解釋

目錄 &#xff08;1&#xff09;注釋 &#xff08;2&#xff09;縮進 &#xff08;3&#xff09;變量和數據類型 變量定義 數據類型 &#xff08;4&#xff09;輸入和輸出 輸出&#xff1a;print() 函數 輸入&#xff1a;input() 函數 &#xff08;1&#xff09;注釋 注…

20-R 繪圖 - 餅圖

R 繪圖 - 餅圖 R 語言提供來大量的庫來實現繪圖功能。 餅圖&#xff0c;或稱餅狀圖&#xff0c;是一個劃分為幾個扇形的圓形統計圖表&#xff0c;用于描述量、頻率或百分比之間的相對關系。 R 語言使用 pie() 函數來實現餅圖&#xff0c;語法格式如下&#xff1a; pie(x, l…

Ubuntu 22.04 一鍵部署MinerU1.1.0

MinerU MinerU是一款將PDF轉化為機器可讀格式的工具&#xff08;如markdown、json&#xff09;&#xff0c;可以很方便地抽取為任意格式。 MinerU誕生于書生-浦語的預訓練過程中&#xff0c;我們將會集中精力解決科技文獻中的符號轉化問題&#xff0c;希望在大模型時代為科技發…

紫光同創開發板使用教程(二):sbit文件下載

sbit文件相當于zynq里面的bit文件&#xff0c;紫光的fpga工程編譯完成后會自動生成sbit文件&#xff0c;因工程編譯比較簡單&#xff0c;這里不在講解工程編譯&#xff0c;所以我這里直接下載sbit文件。 1.工程編譯完成后&#xff0c;可以看到Flow列表里面沒有報錯&#xff0c…

DeepSeek 部署全指南:常見問題解析與最新技術實踐

引言 隨著開源大模型DeepSeek的爆火&#xff0c;其部署需求激增&#xff0c;但用戶在實際操作中常面臨服務器壓力、本地部署性能瓶頸、API配置復雜等問題。本文結合2025年最新技術動態&#xff0c;系統梳理DeepSeek部署的核心問題與解決方案&#xff0c;并分享行業實踐案例&am…

Vue02

Vue02 綁定class樣式 字符串寫法&#xff0c;適用于&#xff1a;樣式的類名不確定&#xff0c;需要動態指定 數組寫法&#xff0c;適用于&#xff1a;要綁定的樣式個數不確定&#xff0c;名字也不確定 對象寫法&#xff0c;適用于&#xff1a;要綁定的樣式個數缺點&#xff…

超導量子計算機的最新進展:走向實用化的量子革命

超導量子計算機的最新進展:走向實用化的量子革命 大家好,我是 Echo_Wish,今天我們來聊聊科技圈最炙手可熱的話題之一——超導量子計算機。近年來,量子計算領域可謂是風起云涌,而超導量子計算機作為主流路線之一,已經在學術界和工業界取得了不少突破性進展。 那么,超導…

LangChain構建行業知識庫實踐:從架構設計到生產部署全指南

文章目錄 引言:行業知識庫的進化挑戰一、系統架構設計1.1 核心組件拓撲1.2 模塊化設計原則二、關鍵技術實現2.1 文檔預處理流水線2.2 混合檢索增強三、領域適配優化3.1 醫學知識圖譜融合3.2 檢索結果重排序算法四、生產環境部署4.1 性能優化方案4.2 安全防護體系五、評估與調優…