Spring boot 使用Redis 消息發布訂閱

Spring boot 使用Redis 消息發布訂閱

文章目錄

  • Spring boot 使用Redis 消息發布訂閱
    • Redis 消息發布訂閱
        • Redis 發布訂閱 命令
    • Spring boot 實現消息發布訂閱
      • 發布消息
      • 消息監聽
      • 主題訂閱
    • Spring boot 監聽 Key 過期事件
      • 消息監聽
      • 主題訂閱

最近在做請求風控的時候,在網上搜集了大量的解決方案,最后使用Redis 消息發布訂閱 比較符合業務。做一下記錄

img

Redis 消息發布訂閱

img

Redis 發布訂閱 命令:redis命令手冊

1、Redis 中"pub/sub"的消息,為"即發即失",server 不會保存消息,如果 publish 的消息沒有任何 client 處于 “subscribe” 狀態,消息將會被丟棄;如果 client 在 subcribe 時,鏈接斷開后重連,那在么此期間的消息也將丟失。

2、Redis server 將會"盡力"將消息發送給處于 subscribe 狀態的 client,但是仍不會保證每條消息都能被正確接收。

**優點:**支持發布訂閱,支持多組生產者、消費者處理消息

缺點:

  1. 消費者下線數據會丟失

  2. 不支持數據持久化,Redis宕機則數據也會丟失

  3. 消息堆積,緩存區溢出,消費者會被強制踢下線,數據也會丟失

Redis 發布訂閱 命令
命令描述
Redis Unsubscribe 命令指退訂給定的頻道。
Redis Subscribe 命令訂閱給定的一個或多個頻道的信息。
Redis Pubsub 命令查看訂閱與發布系統狀態。
Redis Punsubscribe 命令退訂所有給定模式的頻道。
Redis Publish 命令將信息發送到指定的頻道。
Redis Psubscribe 命令訂閱一個或多個符合給定模式的頻道。

Spring boot 實現消息發布訂閱

1、引入 Redis 依賴

    <!--Spring Boot redis 啟動器--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

2、Redis 數據庫配置

spring:data:redis:database: 0host: localhostport: 6379password:

發布消息

	/*** redis 將信息發送到指定的頻道* @param topic   :消息所屬的主題/頻道* @param context :消息內容* @return*/redisTemplate.convertAndSend(topic, context);
@RequiredArgsConstructor
@Service
public class RequestRateLimiterService {private final RedisTemplate<String, Object> redisTemplate;// Redis 中的 key 前綴private static final String REDIS_KEY_PREFIX = "select_rate_limit:";// Redis 中的通道名稱private static final String REDIS_CHANNEL = "select_rate_limit_channel";// 根據用戶名 請求風控public boolean allowRequest(String username) {// 每分鐘最大請求次數Long MAX_REQUESTS_PER_MINUTE = 60L;String key = REDIS_KEY_PREFIX + username;Long currentRequests = redisTemplate.opsForValue().increment(key);if (currentRequests != null && currentRequests > MAX_REQUESTS_PER_MINUTE) {redisTemplate.convertAndSend(REDIS_CHANNEL, username);return false; // 超過閾值,拒絕請求}if (currentRequests != null && currentRequests == 1) {redisTemplate.expire(key, 1, TimeUnit.MINUTES); // 設置過期時間為1分鐘}return true; // 允許請求}}

消息監聽

1、 Redis 消息訂閱-消息監聽器,當收到閱訂的消息時,會將消息交給這個類處理。

/*** Redis 消息訂閱-消息監聽器,當收到閱訂的消息時,會將消息交給這個類處理* <p>* 1、可以直接實現 MessageListener 接口,也可以繼承它的實現類 MessageListenerAdapter.* 2、自動多線程處理,打印日志即可看出,即使手動延遲,也不會影響后面消息的接收。**/
@Component
public class RequestRateLimitSubscriber implements MessageListener {// 直接從容器中獲取@Resourceprivate RedisTemplate<String, Object> redisTemplate;/*** 監聽到的消息必須進行與發送時相同的方式進行反序列* 1、訂閱端與發布端 Redis 序列化的方式必須相同,否則會亂碼。** @param message :消息實體* @param pattern :匹配模式*/@Overridepublic void onMessage(Message message, byte[] pattern) {// 消息訂閱的匹配規則,如 new PatternTopic("basic-*") 中的 basic-*String msgPattern = new String(pattern);// 消息所屬的通道,可以根據不同的通道做不同的業務邏輯String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());// 接收的消息內容,可以根據自己需要強轉為自己需要的對象,但最好先使用 instanceof 判斷一下Object body = redisTemplate.getValueSerializer().deserialize(message.getBody());log.info("收到 Redis 訂閱消息: channel={} body={} pattern={} ", channel, body, msgPattern);// 模擬數據處理 ********// 發送警告通知,可以通過郵件、短信等方式進行通知log.info("------------數據處理完成.......");}
}

主題訂閱

1、自定義 RedisTemplate 序列化方式(發布者和訂閱者必須相同)。

2、配置主題訂閱 - Redis 消息監聽器綁定監聽指定通道。

/*** 自定義 RedisTemplate 序列化方式* 配置主題訂閱 - Redis 消息監聽器綁定監聽指定通道*/
@Configuration
public class RedisConfig {// 自定義的消息訂閱監聽器,當收到閱訂的消息時,會將消息交給這個類處理@Resourceprivate RequestRateLimitSubscriber requestRateLimitSubscriber;//  自定義 RedisTemplate 序列化方式   @Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setKeySerializer(RedisSerializer.string());// key 序列化規則redisTemplate.setHashKeySerializer(RedisSerializer.string());// hash key 序列化規則redisTemplate.setValueSerializer(RedisSerializer.java());// value 序列化規則redisTemplate.setHashValueSerializer(RedisSerializer.java()); // hash value 序列化規則redisTemplate.setConnectionFactory(factory); //綁定 RedisConnectionFactoryreturn redisTemplate; //返回設置好的 RedisTemplate}/*** 配置主題訂閱* RedisMessageListenerContainer - Redis 消息監聽器綁定監聽指定通道* 1、可以添加多個監聽器,監聽多個通道,只需要將消息監聽器與訂閱的通道/主題綁定即可。* 2、訂閱的通道可以配置在全局配置文件中,也可以配置在數據庫中,* <p>* addMessageListener(MessageListener listener, Collection<? extends Topic> topics):將消息監聽器與多個訂閱的通道/主題綁定* addMessageListener(MessageListener listener, Topic topic):將消息監聽器與訂閱的通道/主題綁定** @param connectionFactory* @return*/@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 設置連接工廠,RedisConnectionFactory 可以直接從容器中取,也可以從 RedisTemplate 中取container.setConnectionFactory(factory);// 訂閱名稱叫 select_rate_limit_channel 的通道, 類似 Redis 中的 subscribe 命令container.addMessageListener(requestRateLimitSubscriber, new ChannelTopic("*"));// 訂閱名稱以 'basic-' 開頭的全部通道, 類似 Redis 的 pSubscribe 命令container.addMessageListener(requestRateLimitSubscriber, new PatternTopic("*"));return container;}
}

Spring boot 監聽 Key 過期事件

1、Redis 數據庫可以通過命令設置 Key 的有效時間,當一個 Key 過期后會自動從數據庫中刪除,釋放空間。得益于于這個特性,可以很輕松地實現諸多類似于 “Session” 管理、數據緩存等功能。它們都有一個共同點就是,數據不會永久保存!

2、在有些場景中,可能希望在某些 Key 過期的時候獲取到通知,進行一些業務處理。或者是干脆用于 “定時通知/任務” 功能,例如:下單 30 分鐘后未支付,則取消訂單。那么可以在用戶下單的時候使用訂單號作為 key 設置到 Redis 數據庫中,并且設置過期時間為 30 分鐘。當超時后,可以在 “key 過期通知” 中獲取到 key 也就是訂單號,判斷用戶是否已經支付從而是否取消訂單。

3、Redis 的 Key 過期通知功能本質上是通過 發布/訂閱 功能實現的,所以它「不能保證通知消息的交付」,當 Key 過期時如果服務器停機、重啟后則該通知消息會永久丟失。

消息監聽

1、Spring Data Redis 專門提供了一個密鑰過期事件消息偵聽器:KeyExpirationEventMessageListener,自定義監聽器類繼承它,然后覆寫 doHandleMessage(Message message) 方法即可。

2、doHandleMessage 方法用于處理 Redis Key 過期通知事件,其中 Message 參數表示通知消息,只有 2 屬性,分別表示消息正文(在這里就是過期的 Key 名稱)以及來自于哪個 channel。

3、在 Redis Key 過期事件中,「只能獲取到已過期的 Key 的名稱,不能獲取到值。」

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/*** Redis 緩存 Key 過期監聽器* Spring Data Redis 專門提供了一個密鑰過期事件消息偵聽器:KeyExpirationEventMessageListener,* 自定義監聽器類繼承它,然后覆寫 doHandleMessage(Message message) 方法即可。*/
@Component
public class KeyExpireListener extends KeyExpirationEventMessageListener {private static final Logger logger = LoggerFactory.getLogger(KeyExpireListener.class);/*** 通過構造函數注入 RedisMessageListenerContainer 給 KeyExpirationEventMessageListener** @param listenerContainer : Redis消息偵聽器容器*/public KeyExpireListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/*** doHandleMessage 方法用于處理 Redis Key 過期通知事件,* 在 Redis Key 過期事件中,「只能獲取到已過期的 Key 的名稱,不能獲取到值。」** @param message:通知消息,只有 2 屬性,分別表示消息正文(在這里就是過期的 Key 名稱)以及來自于哪個 channel。*/@Overridepublic void doHandleMessage(Message message) {// 過期的 keyString key = new String(message.getBody());// 消息通道String channel = new String(message.getChannel());logger.info("過期key={} 消息通道(channel)={}", key, channel);}
}

主題訂閱

1、與上面稍微有點不同,因為 key 過期事件屬于 Redis 內部消息,內部頻道/通道,所以只需要往容器中注入 RedisMessageListenerContainer 就行,不需要 addMessageListener 手動設置監聽器 監聽指定的通道/頻道(topic 表達式)。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisConfig {@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);//  container.setTaskExecutor(null);            // 設置用于執行監聽器方法的 Executor//  container.setErrorHandler(null);            // 設置監聽器方法執行過程中出現異常的處理器//  container.addMessageListener(null, null);   // 手動設置監聽器 & 監聽的 topic 表達式return container;}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/213661.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/213661.shtml
英文地址,請注明出處:http://en.pswp.cn/news/213661.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

postgreSql邏輯復制常用語句匯總和說明

簡單說明 postgreSql邏輯復制的原理這里不再贅述&#xff0c;度娘一下即可。這里只是對常用的語句做一些匯總和說明&#xff0c;以便日后查找時方便。 邏輯復制的概念 邏輯復制整體上采用的是一個發布訂閱的模型&#xff0c;訂閱者可以訂閱一個或者多個發布者&#xff0c; 發…

全套的外貿出口業務流程,趕緊收藏起來吧

很多做外貿的小伙伴入行遇到的第一個問題就是對外貿業務流程的不熟悉&#xff0c;今天小易給大家整理了一份外貿業務全流程&#xff0c;從開發客戶到售后服務一整套流程&#xff0c;一起來看看吧&#xff01; 目前做外貿開發客戶的渠道一般有以下幾種&#xff1a; 1、自建站、外…

如何在 Windows 中恢復已刪除的 Excel 文件?– 8 個有效方法!

如何恢復已刪除的Excel文件&#xff1f;如果您不小心刪除了 Excel 文件或該文件已損壞&#xff0c;您無需擔心會丟失寶貴的數據。MiniTool 分區向導的這篇文章提供了 8 種有效的方法來幫助您恢復它們。 Microsoft Excel 是 Microsoft 為 Windows、macOS、Android、iOS 和 iPad…

【lesson4】數據類型之數值類型

文章目錄 數據分類數值類型tinyint類型有符號類型測試無符號類型測試 bit類型測試 float類型有符號測試無符號測試 decimal類型測試 數據分類 數值類型 tinyint類型 說明&#xff1a;tinyint 有符號能存儲的范圍是-128-127&#xff0c;無符號能存儲的范圍是0~255 有符號類型…

藍橋杯-動態規劃專題-子數組系列,雙指針

目錄 一、單詞拆分 二、環繞字符串中唯一的子字符串 雙指針-三數之和 ArrayList(Arrays.asList(array)) 四、四數之和&#xff08;思路和三數之和一樣&#xff0c;只是多了一層循環&#xff09; 一、單詞拆分 1.狀態表示 dp[i]:到達i位置結尾&#xff0c;能否被dict拆分 …

Terraform實戰(二)-terraform創建阿里云資源

1 初始化環境 1.1 創建初始文件夾 $ cd /data $ mkdir terraform $ mkdir aliyun terraform作為terraform的配置文件夾&#xff0c;內部的每一個.tf&#xff0c;.tfvars文件都會被加載。 1.2 配置provider 創建providers.tf文件&#xff0c;配置provider依賴。 provider…

想學編程,但不知道從哪里學起,應該怎么辦?

怎樣學習任何一種編程語言 我將教你怎樣學習任何一種你將來可能要學習的編程語言。本書的章節是基于我和很多程序員學習編程的經歷組織的&#xff0c;下面是我通常遵循的流程。 1&#xff0e;找到關于這種編程語言的書或介紹性讀物。 2&#xff0e;通讀這本書&#xff0c;把…

MYSQL數據類型詳解

MySQL支持多種數據類型&#xff0c;這些數據類型可以分為三大類&#xff1a;數值、日期和時間以及字符串&#xff08;字符&#xff09;類型。這些數據類型可以幫助我們根據需要選擇合適的類型來存儲數據。選擇合適的數據類型對于確保數據的完整性和性能至關重要。 以下…

RHEL8_Linux用rpm管理軟件

本章主要介紹使用rpm對軟件包進行管理 使用rpm查詢軟件的信息使用rpm安裝及卸載軟件使用rpm對軟件進行更新使用rpm對軟件進行驗證 rpm 全稱是redhat package manager&#xff0c;后來改成rpm package manager&#xff0c;這是根據源碼包編譯出來的包。先從光盤中拷貝一個包&am…

基于Java Swing泡泡龍游戲(Java畢業設計)

大家好&#xff0c;我是DeBug&#xff0c;很高興你能來閱讀&#xff01;作為一名熱愛編程的程序員&#xff0c;我希望通過這些教學筆記與大家分享我的編程經驗和知識。在這里&#xff0c;我將會結合實際項目經驗&#xff0c;分享編程技巧、最佳實踐以及解決問題的方法。無論你是…

AP9111手電筒專用集成電路芯片 單節干電池 LED手電筒IC

概述 AP9111 是 LED 手電筒專用集成電路芯片 &#xff0c;是一款采用大規模集成電路技術&#xff0c;專門針對單節干電池的 LED 手電筒設計的一款專用集成電路。外加 1 個電感元件&#xff0c;即可構成 LED 手電筒驅動電路板。AP 9111 性能優越、可靠性高、使用簡單、生產一致…

六級高頻詞匯3

目錄 單詞 參考鏈接 單詞 400. nonsense n. 胡說&#xff0c;冒失的行動 401. nuclear a. 核子的&#xff0c;核能的 402. nucleus n. 核 403. retail n. /v. /ad. 零售 404. retain vt. 保留&#xff0c;保持 405. restrict vt. 限制&#xff0c;約束 406. sponsor n. …

聊個開心的敏捷話題——40小時工作制

近年來&#xff0c;加班現象在很多行業已經普遍制度化&#xff0c;甚至“996”已成為一些行業標簽。企業高強度的壓榨讓員工不堪重負&#xff0c;且時常由此引發的各種悲劇也并不鮮見。 所以&#xff0c;今天我們一起來聊一個開心輕松的話題——極限編程的40h工作制原則。 40…

Leetcode(一)兩數之和

兩數之和 暴力 雙層循環 兩兩相加 等于目標值 返回 即可 class Solution {public int[] twoSum(int[] nums, int target) {for(int i0;i<nums.length;i){for(int j0;j<nums.length;j){if(nums[i]nums[j]target && i!j){int[] a{i,j};return a;}}}return null;…

kafka主題分區副本集群的概念

Kafka是一個高性能、分布式的消息系統&#xff0c;用于處理大規模的實時數據流。為了更好地理解Kafka的原理和使用&#xff0c;以下是Kafka中幾個重要概念的解釋&#xff1a; 主題&#xff08;Topic&#xff09;: Kafka中的最基本概念&#xff0c;相當于一個數據流或者消息流的…

【環境搭建】ubuntu22安裝ros2

基于某種特殊需求&#xff0c;從Ubuntu16到22目前都嘗試過安裝ros、ros2 參考1&#xff1a;http://t.csdnimg.cn/DzvSe 參考2&#xff1a;http://t.csdnimg.cn/sOzr1 1.設置locale sudo apt update && sudo apt install locales sudo locale-gen en_US en_US.UTF-8 s…

SQL注入漏洞檢測

預計更新SQL注入概述 1.1 SQL注入攻擊概述 1.2 SQL注入漏洞分類 1.3 SQL注入攻擊的危害 SQLMap介紹 2.1 SQLMap簡介 2.2 SQLMap安裝與配置 2.3 SQLMap基本用法 SQLMap進階使用 3.1 SQLMap高級用法 3.2 SQLMap配置文件詳解 3.3 SQLMap插件的使用 SQL注入漏洞檢測 4.1 SQL注入…

Spring的IOC容器初始化流程

Spring的IOC容器初始化流程 IOC容器初始化在SpringApplication對象創建完畢執行run方法時執行refreshContext()時開始。 準備BeanFactory&#xff0c;設置其類加載器和environment等 執行BeanFactory后置處理器&#xff0c;掃描要放入容器的Bean信息&#xff0c;得到對應的Bea…

計算機網絡常見的縮寫

計算機網絡常見縮寫 通訊控制處理機&#xff08;Communication Control Processor&#xff09;CCP 前端處理機&#xff08;Front End Processor&#xff09;FEP 開放系統互連參考模型 OSI/RM 開放數據庫連接&#xff08;Open Database Connectivity&#xff09;ODBC 網絡操作系…

阿里云服務器租用價格分享,阿里云服務器熱門配置最新活動價格匯總

在我們購買阿里云服務器的時候&#xff0c;1核2G、2核2G、2核4G、2核8G、4核8G、8核16G、8核32G等配置屬于用戶購買最多的熱門配置&#xff0c;1核2G、2核2G、2核4G這些配置低一點的云服務器基本上能夠滿足絕大部分個人建站和普通企業用戶建站需求&#xff0c;而4核8G、8核16G、…