阿里云和騰訊云RocketMQ 發消息和消費消息客戶端JAVA接口

一、RocketMQ 概述

RocketMQ 是阿里巴巴開源的一款分布式消息中間件,后捐贈給 Apache 基金會成為頂級項目。它具有低延遲、高并發、高可用、高可靠等特點,廣泛應用于訂單交易、消息推送、流計算、日志收集等場景。

核心特點

  1. 分布式架構:支持集群部署,可水平擴展

  2. 高吞吐量:單機可支持10萬級TPS

  3. 低延遲:毫秒級消息投遞

  4. 高可用性:支持主從復制,自動故障轉移

  5. 消息可靠性:支持消息持久化,確保不丟失

  6. 豐富的消息模式:支持普通消息、順序消息、事務消息、定時消息等

二、核心概念

1. 基本組件

組件說明
NameServer輕量級注冊中心,負責Broker的注冊與發現
Broker消息存儲與轉發服務器,負責消息存儲、投遞和查詢
Producer消息生產者,負責發送消息
Consumer消息消費者,負責消費消息
Topic消息主題,用于消息分類
Message Queue消息隊列,Topic的分區單位
Tag消息標簽,用于消息二級分類
Group生產者組/消費者組,用于集群管理

一、阿里云rocketMQ

使用阿里云 ONS SDK
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>2.0.5.Final</version> <!-- 推薦最新版本 -->
</dependency>

獲取阿里云 RocketMQ 配置

  • Endpointhttp://{YourInstanceId}.mq-internet.aliyuncs.com:80

  • AccessKey:阿里云賬號的?AccessKey ID?和?AccessKey Secret

  • Topic:消息主題(需在阿里云控制臺創建)

  • Group ID:消費者組(需在控制臺創建)

1、發消息

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;import java.util.Properties;public class AliyunMQProducer {public static void main(String[] args) {// 1. 配置 ProducerProperties properties = new Properties();properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://YourInstanceId.mq-internet.aliyuncs.com:80");properties.put(PropertyKeyConst.AccessKey, "YourAccessKey");properties.put(PropertyKeyConst.SecretKey, "YourSecretKey");properties.put(PropertyKeyConst.GROUP_ID, "YourGroupId"); // Producer Group ID// 2. 創建 ProducerProducer producer = ONSFactory.createProducer(properties);producer.start();// 3. 創建消息Message msg = new Message("YourTopic",  // Topic"YourTag",    // Tag"Hello Aliyun RocketMQ!".getBytes()  // Body);// 4. 發送消息producer.send(msg);System.out.println("消息發送成功!");// 5. 關閉 Producerproducer.shutdown();}
}

2、消費MQ

import com.aliyun.openservices.ons.api.*;
import java.util.Properties;public class AliyunMQConsumer {public static void main(String[] args) {// 1. 配置 ConsumerProperties properties = new Properties();properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://YourInstanceId.mq-internet.aliyuncs.com:80");properties.put(PropertyKeyConst.AccessKey, "YourAccessKey");properties.put(PropertyKeyConst.SecretKey, "YourSecretKey");properties.put(PropertyKeyConst.GROUP_ID, "YourGroupId"); // Consumer Group ID// 2. 創建 ConsumerConsumer consumer = ONSFactory.createConsumer(properties);// 3. 訂閱 Topic 和 Tag(* 表示所有 Tag)consumer.subscribe("YourTopic", "*", new MessageListener() {@Overridepublic Action consume(Message message, ConsumeContext context) {System.out.println("收到消息: " + new String(message.getBody()));return Action.CommitMessage; // 消費成功}});// 4. 啟動 Consumerconsumer.start();System.out.println("消費者已啟動,等待消息...");}
}

?

  1. 阿里云 ONS SDK 更穩定,推薦使用(比 Apache RocketMQ 客戶端更適配阿里云環境)。

  2. Topic 和 Group ID 需先在阿里云控制臺創建,否則會報錯。

  3. 生產環境建議配置重試機制和日志監控,避免消息丟失。

  4. 消費模式

    • 集群消費(CLUSTERING):同 Group ID 的多個 Consumer 分攤消息(默認)。

    • 廣播消費(BROADCASTING):同 Group ID 的每個 Consumer 都收到所有消息。

二、騰訊云RocketMQ

import java.io.UnsupportedEncodingException;
import java.util.List;import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import lombok.extern.slf4j.Slf4j;/*** 騰訊云rocketMQ服務類*/
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class RocketTXMqService {@Value("${rocketmq.namespace:-1}")private String namespace;@Value("${rocketmq.producer.group:-1}")private String groupName;@Value("${rocketmq.producer.access-key:-1}")private String accessKey;@Value("${rocketmq.producer.secret-key:-1}")private String secretKey;@Value("${rocketmq.name-server:-1}")private String nameserver;// MQ生產者private DefaultMQProducer producer;// MQ實例化消費者pushprivate DefaultMQPushConsumer pushConsumer;// MQ實例化消費者pullprivate DefaultLitePullConsumer pullConsumer;/*** 創建生產者* * @return*/public DefaultMQProducer getProducer() {if (null == producer) {// 實例化消息生產者Producerproducer = new DefaultMQProducer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL權限);// 設置NameServer的地址producer.setNamesrvAddr(nameserver);try {// 啟動Producer實例producer.start();} catch (MQClientException e) {e.printStackTrace();}}return producer;}/*** 同步發送 發送消息*/public void syncSend(String topic, String tag, String data) {producer = getProducer();// 發送消息SendResult sendResult = null;try {// 創建消息實例,設置topic和消息內容Message msg = new Message(topic, tag, data.getBytes(RemotingHelper.DEFAULT_CHARSET));sendResult = producer.send(msg);log.info("埋點信息發送騰訊云MQ:" + data);log.info("發送騰訊云MQ接口返回狀態sendResult:" + sendResult);} catch (UnsupportedEncodingException e) {log.error("UnsupportedEncodingException:" + e.getMessage());} catch (MQClientException e) {log.error("MQClientException:" + e.getMessage());} catch (RemotingException e) {log.error("RemotingException:" + e.getMessage());} catch (MQBrokerException e) {log.error("MQBrokerException:" + e.getMessage());} catch (InterruptedException e) {log.error("InterruptedException:" + e.getMessage());}}/*** 創建push消費者* * @return*/public DefaultMQPushConsumer getPushConsumer() {if (null == pushConsumer) {// 實例化消費者pushConsumer = new DefaultMQPushConsumer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); // ACL權限// 設置NameServer的地址pushConsumer.setNamesrvAddr(nameserver);}return pushConsumer;}/*** 創建pull 消費者* * @return*/public DefaultLitePullConsumer getPullConsumer() {if (null == pullConsumer) {// 實例化消費者// 實例化消費者pullConsumer = new DefaultLitePullConsumer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));// 設置NameServer的地址pullConsumer.setNamesrvAddr(nameserver);// 設置從第一個偏移量開始消費pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);}return pullConsumer;}/*** push方式訂閱消費* * @param topicName*/public void pushConsumer(String topicName) {pushConsumer = this.getPushConsumer();if (null != pushConsumer) {try {pushConsumer.subscribe(topicName, "*");// 注冊回調實現類來處理從broker拉取回來的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息處理邏輯log.info("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 標記該消息已經被成功消費, 根據消費情況,返回處理狀態return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 啟動消費者實例pushConsumer.start();} catch (MQClientException e) {log.error("push MQClientException:" + e.getMessage());}}}/*** pull方式訂閱消費* * @param topicName*/public void pullConsumer(String topicName) {pullConsumer = this.getPullConsumer();if (null != pullConsumer) {try {// 訂閱topicpullConsumer.subscribe(topicName, "*");// 啟動消費者實例pullConsumer.start();} catch (MQClientException e) {log.error(" pull MQClientException:" + e.getMessage());}try {log.info("Consumer Started.%n");while (true) {// 拉取消息List<MessageExt> messageExts = pullConsumer.poll();log.info("%s%n", messageExts);}} finally {pullConsumer.shutdown();}}}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/88357.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/88357.shtml
英文地址,請注明出處:http://en.pswp.cn/web/88357.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Vue響應式原理六:Vue3響應式原理

1. 多個對象響應式當前存在的問題&#xff1a;當前實現僅針對某個固定對象&#xff08;obj&#xff09;進行依賴收集&#xff0c;實際開發中需要處理多個不同對象將對象響應式處理邏輯抽取為通用函數&#xff0c;支持任意對象代碼如下&#xff1a; // 方案一&#xff1a;Obje…

【算法筆記 day three】滑動窗口(其他類型)

hello大家好&#xff01;這份筆記包含的題目類型主要包括求子數組已經一些比較‘小眾’的題目。和之前一樣&#xff0c;筆記中的代碼和思路要么是我手搓要么是我借鑒一些大佬的想法轉化成自己的話復現。所以方法不一定是最好的&#xff0c;但一定是經過我理解的產物&#xff0c…

docker-鏡像管理指南

在本節中&#xff0c;我們將詳細介紹 Docker 鏡像的常用命令&#xff0c;幫助您更好地管理和操作鏡像。以下是核心命令及其功能說明&#xff1a;1.使用"ls"查看鏡像列表#查看現有的鏡像列表[rootdocker01 ~]# docker images [rootdocker01 ~]# docker image ls#僅查看…

Mac 電腦無法讀取硬盤的解決方案

引言近年來&#xff0c;選擇使用 Mac 電腦的用戶越來越多&#xff0c;尤其是在設計、開發、剪輯、文檔處理等領域&#xff0c;macOS 憑借其優秀的系統生態與硬件體驗吸引了大量擁躉。與此同時&#xff0c;對于攝影師、剪輯師、程序員、學生等用戶來說&#xff0c;一塊移動硬盤往…

25春期末考

web 瘋狂星期四 先來看一下源碼 分析代碼的黑名單后得知 我們可以用的字符就只剩下 字母a-z(大小寫均可) 數字2 空格 這里的限制太多了 這里比較常用的getallheaders被ban掉了 這里就是用session來做 session_start()開啟session session_id()獲取session 這里我們要構造一…

時間顯示 藍橋云課Java

目錄 題目鏈接 題目 解題思路 代碼 題目鏈接 競賽中心 - 藍橋云課 題目 解題思路 通過%天數,得到一天內的時間,然后/小時單位(換算成毫秒的)得到小時,然后總數減去該小時,得到分鐘數,秒數同理 代碼 import java.util.Scanner; // 1:無需package // 2: 類名必須Main, 不…

STM32F1控制步進電機

一、基礎知識1. 步進電機控制方式脈沖方向控制&#xff08;最常見&#xff09;控制信號&#xff1a;DIR方向&#xff1a;高低電平決定正轉或反轉&#xff1b;STEP脈沖&#xff1a;每個脈沖電機前進一步&#xff08;可通過端口拉高拉低來模擬脈沖&#xff0c;或使用pwm來生成脈沖…

Docker 容器部署腳本

#!/bin/bash# # Author: ldj # Date: 2025-07-08 15:37:11 # Description: 首先刪除舊的容器和鏡像&#xff0c;然后登錄到 Harbor 并拉取最新的鏡像進行部署 # # 顯示每條命令執行情況&#xff0c;便于調試 set -x harbor_addr$1 harbor_repo$2 project_name$3 version$4 po…

OpenCV 4.10.0 移植 - Android

前文: Ubuntu 編譯 OpenCV SDK for Android Linux OpenCV 4.10.0 移植 概述 在移動應用開發領域&#xff0c;Android平臺與OpenCV庫的結合為開發者提供了強大的圖像處理和計算機視覺能力。OpenCV(Open Source Computer Vision Library)是一個開源的計算機視覺和機器學習軟件…

go go go 出發咯 - go web開發入門系列(二) Gin 框架實戰指南

go go go 出發咯 - go web開發入門系列&#xff08;二&#xff09; Gin 框架實戰指南 往期回顧 go go go 出發咯 - go web開發入門系列&#xff08;一&#xff09; helloworld 前言 前一節我們使用了go語言簡單的通過net/http搭建了go web服務&#xff0c;但是僅使用 Go 的標…

編譯OpenHarmony-4.0-Release RK3566 報錯

編譯OpenHarmony-4.0-Release RK3566 報錯1. 報錯問題2.問題解決3.解決方案4.?調試技巧?subsystem name config incorrect in ‘/home/openharmony/OpenHarmony/vendor/kaihong/khdvk_356b/bundle.json’, build file subsystem name is kaihong_products,configured subsy1.…

【PTA數據結構 | C語言版】線性表循環右移

本專欄持續輸出數據結構題目集&#xff0c;歡迎訂閱。 文章目錄題目代碼題目 給定順序表 A(a1?,a2?,?,an?)&#xff0c;請設計一個時間和空間上盡可能高效的算法將該線性表循環右移指定的 m 位。例如&#xff0c;(1,2,5,7,3,4,6,8) 循環右移 3 位&#xff08;m3) 后的結果…

c++-內部類

概念如果一個類定義在另一個類的內部&#xff0c;這個內部類就叫做內部類。內部類是一個獨立的類&#xff0c; 它不屬于外部類。特性1.不能通過外部類的對象去訪問內部類的成員。外部類對內部類沒有任何優越的訪問權限。 2.內部類就是外部類的友元類&#xff0c;參見友元類的定…

.golangci.yml文件配置

version: “2” run: timeout: 5m concurrency: 10 modules-download-mode: readonly linters: default: standard enable: - revive - cyclop settings: staticcheck: initialisms: [ “ACL”, “API”, “ASCII”, “CPU”, “CSS”, “DNS”, “EOF”, “GUID”, “HTML”, …

YOLO模型魔改指南:從原理到實戰,替換Backbone、Neck和Head(戰損版)

前言 Hello&#xff0c;大家好&#xff0c;我是GISer Liu&#x1f601;&#xff0c;一名熱愛AI技術的GIS開發者。本系列是作者參加DataWhale 2025年6月份Yolo原理組隊學習的技術筆記文檔&#xff0c;這里整理為博客&#xff0c;希望能幫助Yolo的開發者少走彎路&#xff01; &am…

Swift 圖論實戰:DFS 算法解鎖 LeetCode 323 連通分量個數

文章目錄摘要描述示例題解答案DFS 遍歷每個連通區域Union-Find&#xff08;并查集&#xff09;題解代碼分析&#xff08;Swift 實現&#xff1a;DFS&#xff09;題解代碼詳解構建鄰接表DFS 深度優先搜索遍歷所有節點示例測試及結果示例 1示例 2示例 3時間復雜度分析空間復雜度分…

【劍指offer】棧 隊列

&#x1f4c1; JZ9 用兩個棧實現隊列一個棧in用作進元素&#xff0c;一個棧out用于出元素。當棧out沒有元素時&#xff0c;從in棧獲取數據&#xff0c;根據棧的特性&#xff0c;棧out的top元素一定是先進入的元素&#xff0c;因此當棧out使用pop操作時&#xff0c;一定時滿足隊…

GoView 低代碼數據可視化

純前端 分支&#xff1a; master &#x1f47b; 攜帶 后端 請求分支: master-fetch &#x1f4da; GoView 文檔 地址&#xff1a;https://www.mtruning.club/ 項目純前端-Demo 地址&#xff1a;https://vue.mtruning.club/ 項目帶后端-Demo 地址&#xff1a;https://demo.mtrun…

Spring Boot返回前端Long型丟失精度 后兩位 變成00

文章目錄一、前言二、問題描述2.1、問題背景2.2、問題示例三、解決方法3.1、將ID轉換為字符串3.2、使用JsonSerialize注解3.3、使用JsonFormat注解一、前言 在后端開發中&#xff0c;我們經常會遇到需要將ID作為標識符傳遞給前端的情況。當ID為long類型時&#xff0c;如果該ID…

計算機網絡實驗——無線局域網安全實驗

實驗1. WEP和WPA2-PSK實驗一、實驗目的驗證AP和終端與實現WEP安全機制相關的參數的配置過程。驗證AP和終端與實現WPA2-PSK安全機制相關的參數的配置過程。驗證終端與AP之間建立關聯的過程。驗證關閉端口的重新開啟過程。驗證屬于不同BSS的終端之間的數據傳輸過程。二、實驗任務…