SpringBoot整合rabbitmq-重復消費問題

說明:重復消費的原因大致是生產者將信息A發送到隊列中,消費者監聽到消息A后開始處理業務,業務處理完成后,監聽在告知rabbitmq消息A已經被消費完成途中中斷,也就時說我已經處理完業務,而隊列中還存在當前消息A,導致消費者服務恢復后又消費到消息A,出現重復操作的業務。

解決思路:我只要有一個地方記錄了消息A已經被消費過了【這個消息必須得設置一個唯一標記】,即使消息A再次被消費時,比對一下,如果有記錄則說明消息A已經被消費,如果沒有說明沒有被消費。

我使用redis及設置redis過期時間來解決重復消費問題。

工程圖:

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>spring-boot-starter-parent</artifactId>  <!-- 被繼承的父項目的構件標識符 --><groupId>org.springframework.boot</groupId>  <!-- 被繼承的父項目的全球唯一標識符 --><version>2.2.2.RELEASE</version>  <!-- 被繼承的父項目的版本 --></parent><groupId>RabbitmqDemoOne</groupId><artifactId>RabbitmqDemoOne</artifactId><version>1.0-SNAPSHOT</version><packaging>war</packaging><name>RabbitmqDemoOne Maven Webapp</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><!--spring boot核心--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring boot 測試--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springmvc web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--開發環境調試--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency><!--amqp 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency><!-- commons-lang --><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency></dependencies><build><finalName>RabbitmqDemoOne</finalName><pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging --><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-war-plugin</artifactId><version>3.2.2</version></plugin><plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin><plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin></plugins></pluginManagement></build>
</project>

2.application.yml

server:port: 8080
spring:redis:host: 127.0.0.1port: 6379rabbitmq:port: 5672host: 192.168.199.139username: adminpassword: adminvirtual-host: /

3.RabbitMqConfig

package com.dev.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 李慶偉* @title: RabbitMqConfig* @date 2024/3/3 14:12*/
@Configuration
public class RabbitMqConfig {/*** 隊列* @return repeatQueue隊列名稱 true 持久化*/@Beanpublic Queue makeQueue(){return new Queue("repeatQueue",true);}}

4.RedisTemplateConfig

package com.dev.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @author 李慶偉* @title: RedisTemplateConfig* @date 2024/3/3 14:24*/
@Configuration
public class RedisTemplateConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);// 設置鍵(key)的序列化采用StringRedisSerializer。redisTemplate.setKeySerializer(new StringRedisSerializer());//redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());//設置值(value)的序列化采用jdk// 設置值(value)的序列化采用FastJsonRedisSerializer。redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.afterPropertiesSet();return redisTemplate;}}

5.RabbitRepeatController

package com.dev.controller;import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** @author 李慶偉* @title: RabbitRepeatContoller* @date 2024/3/3 14:05*/
@RestController
@RequestMapping("repeatQueue")
public class RabbitRepeatContoller {@AutowiredRabbitTemplate rabbitTemplate;  //使用RabbitTemplate,這提供了接收/發送等等方法/*** 測試* @return*/@GetMapping("/sendMessage")public String sendMessage() {for (int i = 0; i < 1000; i++) {String id = UUID.randomUUID().toString().replace("-","");Map<String,Object> map = new HashMap<>();map.put("id",id);map.put("name","張龍");map.put("phone","123..11");map.put("num",i);String str = JSONObject.toJSONString(map);Message msg = MessageBuilder.withBody(str.getBytes()).setMessageId(id).build();rabbitTemplate.convertAndSend("", "repeatQueue", msg);}return "ok";}}

6.RabbitMqListener

package com.dev.listener;import com.alibaba.fastjson.JSON;
import com.dev.utils.RedisUtil;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;
import java.util.Map;/*** @author 李慶偉* @title: RabbitMqListener* @date 2024/3/3 14:13*/
@Component
public class RabbitMqListener {@Autowiredprivate RedisUtil redisUtil;@RabbitListener(queues = "repeatQueue")@RabbitHandlerpublic void process(Message msg) throws UnsupportedEncodingException {//獲取在發送消息時設置的唯一idString id = msg.getMessageProperties().getMessageId();//去redis中查看是否有記錄,如果有證明已經消費過了String val = redisUtil.get(id);if(StringUtils.isNotEmpty(val)){return;}String str = new String(msg.getBody(),"utf-8");if(StringUtils.isNotEmpty(str)){Map<String,Object> map = JSON.parseObject(str,Map.class);System.out.println(map.get("num")+"----"+map.get("id")+"----"+map.get("name")+"----"+map.get("phone"));//將消費過的消息記錄到redis中,失效時間為1個小時redisUtil.set(id,id,3600L);System.out.println("----------");}}}

7.RedisUtil

package com.dev.utils;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** @author 李慶偉* @title: RedisUtil* @date 2024/3/3 14:27*/@Component
public class RedisUtil {@Autowiredprivate RedisTemplate redisTemplate;/*** 批量刪除對應的value** @param keys*/public void remove(final String... keys) {for (String key : keys) {remove(key);}}/*** 批量刪除key** @param pattern*/public void removePattern(final String pattern) {Set<Serializable> keys = redisTemplate.keys(pattern);if (keys.size() > 0)redisTemplate.delete(keys);}/*** 刪除對應的value** @param key*/public void remove(final String key) {if (exists(key)) {redisTemplate.delete(key);}}/*** 判斷緩存中是否有對應的value** @param key* @return*/public boolean exists(final String key) {return redisTemplate.hasKey(key);}/*** 讀取緩存** @param key* @return*/public String get(final String key) {Object result = null;redisTemplate.setValueSerializer(new StringRedisSerializer());ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();result = operations.get(key);if(result==null){return null;}return result.toString();}/*** 寫入緩存** @param key* @param value* @return*/public boolean set(final String key, Object value) {boolean result = false;try {ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();operations.set(key, value);result = true;} catch (Exception e) {e.printStackTrace();}return result;}/*** 寫入緩存** @param key* @param value* @return*/public boolean set(final String key, Object value, Long expireTime) {boolean result = false;try {ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();operations.set(key, value);redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);result = true;} catch (Exception e) {e.printStackTrace();}return result;}public  boolean hmset(String key, Map<String, String> value) {boolean result = false;try {redisTemplate.opsForHash().putAll(key, value);result = true;} catch (Exception e) {e.printStackTrace();}return result;}public  Map<String,String> hmget(String key) {Map<String,String> result =null;try {result=  redisTemplate.opsForHash().entries(key);} catch (Exception e) {e.printStackTrace();}return result;}/*** 遞增** @param key 鍵* @paramby 要增加幾(大于0)* @return*/public long incr(String key, long delta) {if (delta < 0) {throw new RuntimeException("遞增因子必須大于0");}return redisTemplate.opsForValue().increment(key, delta);}/*** 遞減** @param key 鍵* @paramby 要減少幾(小于0)* @return*/public long decr(String key, long delta) {if (delta < 0) {throw new RuntimeException("遞減因子必須大于0");}return redisTemplate.opsForValue().increment(key, -delta);}/*** redis zset可已設置排序(案例,熱搜)** @param key 鍵* @paramby* @return*/public void zadd(String key ,String name) {BoundZSetOperations<Object, Object> boundZSetOperations = redisTemplate.boundZSetOps(key);//自增長后的數據boundZSetOperations.incrementScore(name,1);}}

8.App

package com.dev;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author 李慶偉* @title: App* @date 2024/3/3 14:01*/
@SpringBootApplication
public class App {public static void main(String[] args) {SpringApplication.run(App.class);}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/716730.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/716730.shtml
英文地址,請注明出處:http://en.pswp.cn/news/716730.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Qt|QTreewidget類下函數qt助手詳解說明示例(上)

該系列持續更新&#xff0c;喜歡請一鍵三連&#xff0c;感謝各位大佬。 QT5.14.2 參考官方QT助手 文章目錄 QTreeWidget ClasspropertiesPublic Functions默認構造函數默認析構函數添加根節點void addTopLevelItem(QTreeWidgetItem *item)添加多個根節點void addTopLevelItems…

LeetCode---【和的操作】

目錄 兩數之和我的答案在b站up那里學到的【然后自己復寫】 和為 K 的子數組在b站up那里學到的【然后自己復寫】 三數之和在b站up那里學到的【然后自己復寫】 兩數相加【鏈表】我的半路答案&#xff1a;沒有看到是鏈表在b站up那里學到的【復寫失敗后整理】 兩數之和 我的答案 …

Linux下的權限

1. 操作系統的外殼 在理解Linux權限之前&#xff0c;我們先來吃點小菜。 1.大部分指令都是文件&#xff0c;如果把指令對應的文件刪除了&#xff0c;那么這條指令就使用不了了。 2.用戶執行某種功能的時候&#xff0c;不是直接讓操作系統執行對應的指令的&#xff0c;而是先交…

IIC協議總結

1.基本理解 iic通信協議:雙線制串行通信協議,由時鐘線SCL和數據線SDA構成. 通信方式:主從模式,主設備發起通信,從設備響應通信 2.通信的基本步驟 a.主設備發送一個開始信號&#xff0c;表示開始通信&#xff0c;即啟動I2C 條件&#xff1a;SCL1&#xff0c;SDA出現下降沿 …

Python開源項目月排行 2024年2月

Python 趨勢月報&#xff0c;按月瀏覽往期 GitHub,Gitee 等最熱門的Python開源項目&#xff0c;入選的項目主要參考GitHub Trending,部分參考了Gitee和其他。排名不分先后&#xff0c;都是當前月份內相對熱門的項目。 入選公式&#xff1d;70%GitHub Trending20%Gitee10%其他 …

jvm面試題-背誦版

按照思維導圖抽查和記憶&#xff0c;答案見&#xff1a;四、面試-多線程/并發_scheduledfuture釋放-CSDN博客

Jmeter系列(4) 線程屬性詳解

線程屬性 線程組是配置壓測策略的一個重要環節線程組決定了測試執行的請求數量 線程數 在這里線程數相當于一個虛擬用戶每個線程數大約占內存1M特別注意?? 單臺機器最大線程數不要超過1000&#xff0c;不然可能會造成內存溢出 Ramp-Up時間 所有線程在多長時間內全部啟動…

【網絡工程設計】用GNS3和VMware搭建網絡環境

&#x1f4dd;本文介紹 本文主要是使用GNS3和VMware來搭建網絡環境 &#x1f44b;作者簡介&#xff1a;一個正在積極探索的本科生 &#x1f4f1;聯系方式&#xff1a;943641266(QQ) &#x1f6aa;Github地址&#xff1a;https://github.com/sankexilianhua &#x1f511;Gitee地…

計算機網絡-第2章 物理層

本章內容&#xff1a;物理層和數據通信的概念、傳輸媒體特點&#xff08;不屬于物理層&#xff09;、信道復用、數字傳輸系統、寬帶接入 2.1-2.2 物理層和數據通信的概念 物理層解決的問題&#xff1a;如何在傳輸媒體上傳輸數據比特流&#xff0c;屏蔽掉傳輸媒體和通信手段的差…

文獻閱讀筆記《Spatial-temporal Forecasting for Regions without Observations》13頁

目錄 目錄 目錄 發行刊物 ABSTRACT 1 INTRODUCTION 2 RELATED WORK&#xff08;相關工作 2.1 Spatial-temporal Forecasting&#xff08;時空預測 2.2 Spatial-temporal Forecasting withIncomplete Data&#xff08;不完全數據的時空預測 2.3 Graph Contrastive Lear…

藍橋杯集訓·每日一題2024 (前綴和)

筆記&#xff1a; 例題&#xff1a; #include<bits/stdc.h> using namespace std; const int N 5000010; char str[N]; int s[N]; int main(){int t;cin>>t;for(int a1;a<t;a){int n;cin>>n;scanf("%s",str1);for(int i1;i<n;i){s[i]s[i-1]…

【MySQL】:約束全解析

&#x1f3a5; 嶼小夏 &#xff1a; 個人主頁 &#x1f525;個人專欄 &#xff1a; MySQL從入門到進階 &#x1f304; 莫道桑榆晚&#xff0c;為霞尚滿天&#xff01; 文章目錄 &#x1f4d1;前言一. 約束概述二. 約束演示三. 外鍵約束3.1 介紹3.2 語法3.3 刪除/更新行為 &…

Mybatis - generator(自動生成)

1、生成數據庫數據 2、配置pom文件 這個plugin文件里有配置項和依賴以及版本號 修改configurationFile路徑為項目里存在的generatorConfig.xml文件&#xff0c;因為后續的配置都在這個文件中進行。 <plugin><groupId>org.mybatis.generator</groupId><…

Netty的InboundHandler 和OutboundHandler

一、InboundHandler 和OutboundHandler的區別 在Netty中&#xff0c;"inbound"表示來自外部來源&#xff08;如網絡連接&#xff09;的數據&#xff0c;而"outbound"則表示從應用程序發送到外部目標&#xff08;如網絡連接或其他服務&#xff09;的數據。…

Git——Upload your open store

0.default config ssh-keygen -t rsa #之后一路回車,當前目錄.ssh/下產生公私鑰 cat ~/.ssh/id_rsa.pub #復制公鑰到賬號 git config --global user.email account_email git config --global user.name account_name1. 上傳一個公開倉庫 查看當前分支&#xff1a; git branc…

MATLAB基于隱馬爾可夫模型-高斯混合模型-期望最大化的MR圖像分割

隱馬爾可夫模型是一種統計模型&#xff0c;它描述了馬爾可夫過程&#xff0c;隱馬爾可夫過程中包含隱變量&#xff0c;語音識別和詞性自動標注等一些領域常常使用隱馬爾可夫模型方法來處理。馬爾可夫過程是一類隨機過程&#xff0c;馬爾可夫鏈是它的原始模型&#xff0c;馬爾可…

GPT對話知識庫——FreeRTOS中寄存器BASEPRI的作用

提問模型&#xff1a;GPT-4-TURBO-PREVIEW 提問時間&#xff1a;2024.03.02 1&#xff0c;問&#xff1a; 舉例詳細說明寄存器BASEPRI在freertos中作用 1&#xff0c;答&#xff1a; 在使用FreeRTOS這樣的實時操作系統時&#xff0c;確保系統的實時性和響應能力至關重要。其中…

【C++那些事兒】深入理解C++類與對象:從概念到實踐(中)| 默認構造函數 | 拷貝構造函數 | 析構函數 | 運算符重載 | const成員函數

&#x1f4f7; 江池俊&#xff1a; 個人主頁 &#x1f525;個人專欄&#xff1a; ?數據結構冒險記 ?C那些事兒 &#x1f305; 有航道的人&#xff0c;再渺小也不會迷途。 文章目錄 1. 類的6個默認成員函數2. 構造函數2.1 概念2.2 特性 3. 析構函數3.1 概念3.2 特性 4. 拷貝…

國際視頻編解碼標準提案下載地址

H.266 相關提案下載地址&#xff1a;http://phenix.it-sudparis.eu/jvet/ 更新的地址&#xff1a;https://jvet-experts.org/ H.265 提案下載地址&#xff1a;http://phenix.int-evry.fr/jct/ 標準文檔下載地址&#xff1a;http://www.itu.int/rec/T-REC-H.265 H.264 提案下載…

QT多語言切換功能

一.目的 在做項目時&#xff0c;有時希望我們的程序可以在不同的國家使用&#xff0c;這樣最好的方式是一套程序能適應于多國語言。 Qt提供了這樣的功能&#xff0c;使得一套程序可以呈現出不同的語言界面。本文將介紹QT如何實現多語言&#xff0c;以中文和英文為例。 QT開發…