Spring boot 使用Redis 消息發布訂閱
文章目錄
- Spring boot 使用Redis 消息發布訂閱
- Redis 消息發布訂閱
- Redis 發布訂閱 命令
- Spring boot 實現消息發布訂閱
- 發布消息
- 消息監聽
- 主題訂閱
- Spring boot 監聽 Key 過期事件
- 消息監聽
- 主題訂閱
最近在做請求風控的時候,在網上搜集了大量的解決方案,最后使用Redis 消息發布訂閱 比較符合業務。做一下記錄
Redis 消息發布訂閱
Redis 發布訂閱 命令:redis命令手冊
1、Redis 中"pub/sub"的消息,為"即發即失",server 不會保存消息,如果 publish 的消息沒有任何 client 處于 “subscribe” 狀態,消息將會被丟棄;如果 client 在 subcribe 時,鏈接斷開后重連,那在么此期間的消息也將丟失。
2、Redis server 將會"盡力"將消息發送給處于 subscribe 狀態的 client,但是仍不會保證每條消息都能被正確接收。
**優點:**支持發布訂閱,支持多組生產者、消費者處理消息
缺點:
-
消費者下線數據會丟失
-
不支持數據持久化,Redis宕機則數據也會丟失
-
消息堆積,緩存區溢出,消費者會被強制踢下線,數據也會丟失
Redis 發布訂閱 命令
命令 | 描述 |
---|---|
Redis Unsubscribe 命令 | 指退訂給定的頻道。 |
Redis Subscribe 命令 | 訂閱給定的一個或多個頻道的信息。 |
Redis Pubsub 命令 | 查看訂閱與發布系統狀態。 |
Redis Punsubscribe 命令 | 退訂所有給定模式的頻道。 |
Redis Publish 命令 | 將信息發送到指定的頻道。 |
Redis Psubscribe 命令 | 訂閱一個或多個符合給定模式的頻道。 |
Spring boot 實現消息發布訂閱
1、引入 Redis 依賴
<!--Spring Boot redis 啟動器--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
2、Redis 數據庫配置
spring:data:redis:database: 0host: localhostport: 6379password:
發布消息
/*** redis 將信息發送到指定的頻道* @param topic :消息所屬的主題/頻道* @param context :消息內容* @return*/redisTemplate.convertAndSend(topic, context);
@RequiredArgsConstructor
@Service
public class RequestRateLimiterService {private final RedisTemplate<String, Object> redisTemplate;// Redis 中的 key 前綴private static final String REDIS_KEY_PREFIX = "select_rate_limit:";// Redis 中的通道名稱private static final String REDIS_CHANNEL = "select_rate_limit_channel";// 根據用戶名 請求風控public boolean allowRequest(String username) {// 每分鐘最大請求次數Long MAX_REQUESTS_PER_MINUTE = 60L;String key = REDIS_KEY_PREFIX + username;Long currentRequests = redisTemplate.opsForValue().increment(key);if (currentRequests != null && currentRequests > MAX_REQUESTS_PER_MINUTE) {redisTemplate.convertAndSend(REDIS_CHANNEL, username);return false; // 超過閾值,拒絕請求}if (currentRequests != null && currentRequests == 1) {redisTemplate.expire(key, 1, TimeUnit.MINUTES); // 設置過期時間為1分鐘}return true; // 允許請求}}
消息監聽
1、 Redis 消息訂閱-消息監聽器,當收到閱訂的消息時,會將消息交給這個類處理。
/*** Redis 消息訂閱-消息監聽器,當收到閱訂的消息時,會將消息交給這個類處理* <p>* 1、可以直接實現 MessageListener 接口,也可以繼承它的實現類 MessageListenerAdapter.* 2、自動多線程處理,打印日志即可看出,即使手動延遲,也不會影響后面消息的接收。**/
@Component
public class RequestRateLimitSubscriber implements MessageListener {// 直接從容器中獲取@Resourceprivate RedisTemplate<String, Object> redisTemplate;/*** 監聽到的消息必須進行與發送時相同的方式進行反序列* 1、訂閱端與發布端 Redis 序列化的方式必須相同,否則會亂碼。** @param message :消息實體* @param pattern :匹配模式*/@Overridepublic void onMessage(Message message, byte[] pattern) {// 消息訂閱的匹配規則,如 new PatternTopic("basic-*") 中的 basic-*String msgPattern = new String(pattern);// 消息所屬的通道,可以根據不同的通道做不同的業務邏輯String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());// 接收的消息內容,可以根據自己需要強轉為自己需要的對象,但最好先使用 instanceof 判斷一下Object body = redisTemplate.getValueSerializer().deserialize(message.getBody());log.info("收到 Redis 訂閱消息: channel={} body={} pattern={} ", channel, body, msgPattern);// 模擬數據處理 ********// 發送警告通知,可以通過郵件、短信等方式進行通知log.info("------------數據處理完成.......");}
}
主題訂閱
1、自定義 RedisTemplate 序列化方式(發布者和訂閱者必須相同)。
2、配置主題訂閱 - Redis 消息監聽器綁定監聽指定通道。
/*** 自定義 RedisTemplate 序列化方式* 配置主題訂閱 - Redis 消息監聽器綁定監聽指定通道*/
@Configuration
public class RedisConfig {// 自定義的消息訂閱監聽器,當收到閱訂的消息時,會將消息交給這個類處理@Resourceprivate RequestRateLimitSubscriber requestRateLimitSubscriber;// 自定義 RedisTemplate 序列化方式 @Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setKeySerializer(RedisSerializer.string());// key 序列化規則redisTemplate.setHashKeySerializer(RedisSerializer.string());// hash key 序列化規則redisTemplate.setValueSerializer(RedisSerializer.java());// value 序列化規則redisTemplate.setHashValueSerializer(RedisSerializer.java()); // hash value 序列化規則redisTemplate.setConnectionFactory(factory); //綁定 RedisConnectionFactoryreturn redisTemplate; //返回設置好的 RedisTemplate}/*** 配置主題訂閱* RedisMessageListenerContainer - Redis 消息監聽器綁定監聽指定通道* 1、可以添加多個監聽器,監聽多個通道,只需要將消息監聽器與訂閱的通道/主題綁定即可。* 2、訂閱的通道可以配置在全局配置文件中,也可以配置在數據庫中,* <p>* addMessageListener(MessageListener listener, Collection<? extends Topic> topics):將消息監聽器與多個訂閱的通道/主題綁定* addMessageListener(MessageListener listener, Topic topic):將消息監聽器與訂閱的通道/主題綁定** @param connectionFactory* @return*/@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 設置連接工廠,RedisConnectionFactory 可以直接從容器中取,也可以從 RedisTemplate 中取container.setConnectionFactory(factory);// 訂閱名稱叫 select_rate_limit_channel 的通道, 類似 Redis 中的 subscribe 命令container.addMessageListener(requestRateLimitSubscriber, new ChannelTopic("*"));// 訂閱名稱以 'basic-' 開頭的全部通道, 類似 Redis 的 pSubscribe 命令container.addMessageListener(requestRateLimitSubscriber, new PatternTopic("*"));return container;}
}
Spring boot 監聽 Key 過期事件
1、Redis 數據庫可以通過命令設置 Key 的有效時間,當一個 Key 過期后會自動從數據庫中刪除,釋放空間。得益于于這個特性,可以很輕松地實現諸多類似于 “Session” 管理、數據緩存等功能。它們都有一個共同點就是,數據不會永久保存!
2、在有些場景中,可能希望在某些 Key 過期的時候獲取到通知,進行一些業務處理。或者是干脆用于 “定時通知/任務” 功能,例如:下單 30 分鐘后未支付,則取消訂單。那么可以在用戶下單的時候使用訂單號作為 key 設置到 Redis 數據庫中,并且設置過期時間為 30 分鐘。當超時后,可以在 “key 過期通知” 中獲取到 key 也就是訂單號,判斷用戶是否已經支付從而是否取消訂單。
3、Redis 的 Key 過期通知功能本質上是通過 發布/訂閱 功能實現的,所以它「不能保證通知消息的交付」,當 Key 過期時如果服務器停機、重啟后則該通知消息會永久丟失。
消息監聽
1、Spring Data Redis 專門提供了一個密鑰過期事件消息偵聽器:KeyExpirationEventMessageListener,自定義監聽器類繼承它,然后覆寫 doHandleMessage(Message message) 方法即可。
2、doHandleMessage 方法用于處理 Redis Key 過期通知事件,其中 Message 參數表示通知消息,只有 2 屬性,分別表示消息正文(在這里就是過期的 Key 名稱)以及來自于哪個 channel。
3、在 Redis Key 過期事件中,「只能獲取到已過期的 Key 的名稱,不能獲取到值。」
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/*** Redis 緩存 Key 過期監聽器* Spring Data Redis 專門提供了一個密鑰過期事件消息偵聽器:KeyExpirationEventMessageListener,* 自定義監聽器類繼承它,然后覆寫 doHandleMessage(Message message) 方法即可。*/
@Component
public class KeyExpireListener extends KeyExpirationEventMessageListener {private static final Logger logger = LoggerFactory.getLogger(KeyExpireListener.class);/*** 通過構造函數注入 RedisMessageListenerContainer 給 KeyExpirationEventMessageListener** @param listenerContainer : Redis消息偵聽器容器*/public KeyExpireListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/*** doHandleMessage 方法用于處理 Redis Key 過期通知事件,* 在 Redis Key 過期事件中,「只能獲取到已過期的 Key 的名稱,不能獲取到值。」** @param message:通知消息,只有 2 屬性,分別表示消息正文(在這里就是過期的 Key 名稱)以及來自于哪個 channel。*/@Overridepublic void doHandleMessage(Message message) {// 過期的 keyString key = new String(message.getBody());// 消息通道String channel = new String(message.getChannel());logger.info("過期key={} 消息通道(channel)={}", key, channel);}
}
主題訂閱
1、與上面稍微有點不同,因為 key 過期事件屬于 Redis 內部消息,內部頻道/通道,所以只需要往容器中注入 RedisMessageListenerContainer 就行,不需要 addMessageListener 手動設置監聽器 監聽指定的通道/頻道(topic 表達式)。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisConfig {@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);// container.setTaskExecutor(null); // 設置用于執行監聽器方法的 Executor// container.setErrorHandler(null); // 設置監聽器方法執行過程中出現異常的處理器// container.addMessageListener(null, null); // 手動設置監聽器 & 監聽的 topic 表達式return container;}
}