Redis的發布訂閱(Pub/Sub)是一種基于消息多播的通信機制,它允許消息的**發布者(Publisher)向特定頻道發送消息,而訂閱者(Subscriber)**通過訂閱頻道或模式來接收消息。
其核心特點如下:
-
輕量級:無需額外組件,直接通過Redis服務實現
-
實時性:消息即時推送,無輪詢延遲
-
廣播模式:一個消息可被多個訂閱者同時接收
-
無狀態性:不存儲歷史消息,訂閱者只能接收訂閱后的消息
發布訂閱命令的使用
有關發布訂閱的命令可以通過help @pubsub
命令來查看。有關命令的使用可以通過help 命令
來查看,例如help publish
。
基礎命令速查表
命令 | 作用 | 示例 |
---|---|---|
SUBSCRIBE | 訂閱一個或多個頻道 | SUBSCRIBE news sports |
PSUBSCRIBE | 使用模式匹配訂閱頻道 | PSUBSCRIBE sensor.* |
PUBLISH | 向指定頻道發送消息 | PUBLISH news "Hello" |
UNSUBSCRIBE | 退訂指定頻道 | UNSUBSCRIBE news |
PUNSUBSCRIBE | 退訂模式訂閱 | PUNSUBSCRIBE sensor.* |
PUBSUB CHANNELS | 查看活躍頻道列表 | PUBSUB CHANNELS "sensor.*" |
操作示例
# 訂閱者A(終端1)
127.0.0.1:6379> subscribe notifications
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "notifications"
3) (integer) 1# 訂閱者B(終端2)
127.0.0.1:6379> psubscribe system.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "system.*"
3) (integer) 1# 發布消息(終端3)
127.0.0.1:6379> publish notifications "Service will be upgraded soon"
(integer) 1127.0.0.1:6379> publish system.alert "CPU usage exceeds 90%"
(integer) 1# 訂閱者A收到:
1) "message"
2) "notifications"
3) "Service will be upgraded soon"# 訂閱者B收到:
1) "pmessage"
2) "system.*"
3) "system.alert"
4) "CPU usage exceeds 90%"
發布訂閱的使用場景與優缺點
適用場景
-
實時通知系統:用戶在線狀態更新,即時聊天消息推送
-
事件驅動架構:緩存失效廣播,分布式配置更新
-
輕量級監控:服務器狀態報警,業務指標異常通知
優點
-
極低延遲(平均<1ms)
-
支持百萬級TPS消息吞吐
-
模式匹配訂閱實現靈活路由
-
零外部依賴(僅需Redis服務)
缺點
消息不可靠性:不保證送達,離線訂閱者會丟失消息
無持久化機制:重啟后所有訂閱關系丟失
客戶端阻塞:訂閱操作會占用連接線程(需異步處理)
替代方案建議:需要可靠消息時,使用Redis Streams(支持消息持久化、消費者組)或RabbitMQ/Kafka
在Java中使用RedisTemplate實現
配置RedisTemplate
package com.morris.redis.demo.pubsub;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** 對redis的鍵值進行序列化*/
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用 String 序列化 keytemplate.setKeySerializer(new StringRedisSerializer());// 使用 JSON 序列化 value(需要額外依賴 jackson)template.setValueSerializer(new GenericJackson2JsonRedisSerializer());// 對于 Hash 結構同理template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());return template;}
}
實現消息發布者
package com.morris.redis.demo.pubsub;import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;/*** 消息發布者*/
@Service
public class MessagePublisher {@Resourceprivate RedisTemplate<String, Object> redisTemplate;public void sendNotification(String channel, String message) {redisTemplate.convertAndSend(channel, message);}
}
實現消息訂閱者
package com.morris.redis.demo.pubsub;import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 消息訂閱者*/
@Component
public class MessageSubscriber implements MessageListener {@Resourceprivate RedisTemplate redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = new String(message.getChannel());String body = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());System.out.printf("收到頻道[%s]的消息: %s\n", channel, body);}
}
配置訂閱監聽
package com.morris.redis.demo.pubsub;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;/*** 配置redis消息訂閱監聽器*/
@Configuration
@Slf4j
public class RedisPubSubConfig {@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory, MessageSubscriber messageSubscriber) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);// 訂閱具體頻道container.addMessageListener(messageSubscriber, new ChannelTopic("notifications"));// 訂閱模式匹配container.addMessageListener(messageSubscriber, new PatternTopic("system.*"));// 異常處理container.setErrorHandler((e) -> {log.error("[listen message] error ", e);});return container;}
}
使用示例
package com.morris.redis.demo.pubsub;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** 使用接口發布消息*/
@RestController
@RequestMapping("/pubsub")
public class PubSubDemoController {@Resourceprivate MessagePublisher publisher;// 發布告警@GetMapping("/alert")public String sendAlert(@RequestParam String message) {publisher.sendNotification("system.alert", message);return "警報已發送";}// 發布通知@GetMapping("/notify")public String sendNotify(@RequestParam String message) {publisher.sendNotification("notifications", message);return "通知已發送";}
}