參考
Redis隊列詳解(springboot實戰)_redis 隊列-CSDN博客
前言
MQ消息隊列有很多種,比如RabbitMQ,RocketMQ,Kafka等,但是也可以基于redis來實現,可以降低系統的維護成本和實現復雜度,本篇介紹redis中實現消息隊列的幾種方案,并通過springboot實戰使其更易懂。
1. 基于List的 LPUSH+BRPOP 的實現
2. PUB/SUB,訂閱/發布模式
3. 基于Stream類型的實現
1、基于List的的實現
原理
使用rpush和lpush操作入隊列,lpop和rpop操作出隊列。
List支持多個生產者和消費者并發進出消息,每個消費者拿到都是不同的列表元素。
優點
一旦數據到來則立刻醒過來,消息延遲幾乎為零。
缺點
-
不能重復消費,一旦消費就會被刪除
-
不能做廣播模式 , 不支持分組消費
-
lpop和rpop會一直空輪訓,消耗資源 ,但可以 引入阻塞讀blpop和brpop 同時也有新的問題 如果線程一直阻塞在那里,Redis客戶端的連接就成了閑置連接,閑置過久,服務器一般會主動斷開連接,減少閑置資源占用,這個時候blpop和brpop或拋出異常
代碼
引入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId></dependency>
配置文件
server:port: ${SERVER_PORT:9210}# Spring
spring:application:# 應用名稱name: ruoyi-redis-messageredis:host: localhostport: 6379password: 123456
啟動類
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
public class RuoYiRedisMessageApplication
{public static void main(String[] args){SpringApplication.run(RuoYiRedisMessageApplication.class, args);System.out.println("(????)ノ゙ ruoyi-redis-message啟動成功");}
}
添加redis配置類
/*** redis配置*/
@Configuration
public class RedisConfig {private static final RedisSerializer<Object> SERIALIZER = createSerializer();@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {// 創建 RedisTemplate 對象RedisTemplate<String, Object> template = new RedisTemplate<>();// 設置 RedisConnection 工廠。😈 它就是實現多種 Java Redis 客戶端接入的秘密工廠。感興趣的胖友,可以自己去擼下。template.setConnectionFactory(factory);// 使用 String 序列化方式,序列化 KEY 。template.setKeySerializer(RedisSerializer.string());template.setHashKeySerializer(RedisSerializer.string());// 使用 JSON 序列化方式(庫是 Jackson ),序列化 VALUE 。template.setValueSerializer(SERIALIZER);template.setHashValueSerializer(SERIALIZER);return template;}private static RedisSerializer<Object> createSerializer() {ObjectMapper mapper = new ObjectMapper();mapper.registerModules(new JavaTimeModule());// 此項必須配置,否則會報java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXXmapper.activateDefaultTyping(mapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);return new GenericJackson2JsonRedisSerializer(mapper);}}
隊列方法
@Slf4j
@Service
public class ListRedisQueue {//隊列名public static final String KEY = "listQueue";@Resourceprivate RedisTemplate redisTemplate;public void produce(String message) {redisTemplate.opsForList().rightPush(KEY, message);}public void consume() {while (true) {String msg = (String) redisTemplate.opsForList().leftPop(KEY);log.info("瘋狂獲取消息:" + msg);}}public void blockingConsume() {while (true) {List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() {@Overridepublic Object doInRedis(RedisConnection connection) throws DataAccessException {//隊列沒有元素會阻塞操作,直到隊列獲取新的元素或超時,5表示如果沒元素就每五秒去拿一次消息return connection.bRPop(5, KEY.getBytes());}}, new StringRedisSerializer());for (Object str : obj) {log.info("blockingConsume獲取消息 : {}", str);}}}}
測試
lPop/rPop消費數據
@Slf4j
@SpringBootTest
public class ListRedisTest {@Autowiredprivate ListRedisQueue listRedisQueue;@Testpublic void produce() {for (int i = 0; i < 5; i++) {listRedisQueue.produce("第"+i + "個數據");}}@Testpublic void consume() {produce();log.info("生產消息完畢");listRedisQueue.consume();}}
blpop / brpop 消費數據
@Testpublic void blockingConsume() {produce();log.info("生產消息完畢");listRedisQueue.blockingConsume();}
2、PUB/SUB,訂閱/發布模式
原理
SUBSCRIBE,用于訂閱信道
PUBLISH,向信道發送消息
UNSUBSCRIBE,取消訂閱
此模式允許生產者只生產一次消息,由中間件負責將消息復制到多個消息隊列,每個消息隊列由對應的消費組消費。
優點
-
一個消息可以發布到多個消費者
-
消費者可以同時訂閱多個信道,因此可以接收多種消息(處理時先根據信道判斷)
-
消息即時發送,消費者會自動接收到信道發布的消息
缺點
-
消息發布時,如果客戶端不在線,則消息丟失
-
消費者處理消息時出現了大量消息積壓,則可能會斷開通道,導致消息丟失
-
消費者接收消息的時間不一定是一致的,可能會有差異(業務處理需要判重)
代碼
配置消息監聽器
@Slf4j
@Component
public class RedisMessageListener implements MessageListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 消息處理** @param message* @param pattern*/@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = new String(pattern);log.info("onMessage --> 消息通道是:{}", channel);RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();Object deserialize = valueSerializer.deserialize(message.getBody());log.info("反序列化的結果:{}", deserialize);if (deserialize == null) return;String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));log.info("計算得到的key: {}", md5DigestAsHex);Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, "1", 20, TimeUnit.SECONDS);if (Boolean.TRUE.equals(result)) {// redis消息進行處理log.info("接收的結果:{}", deserialize);} else {log.info("其他服務處理中");}}
}
實現MessageListener 接口,就可以通過onMessage()方法接收到消息了,該方法有兩個參數:
-
參數 message 的 getBody() 方法以二進制形式獲取消息體, getChannel() 以二進制形式獲取消息通道
-
參數 pattern 二進制形式的消息通道(實際和 message.getChannel() 返回值相同)
綁定監聽器
@Configuration
public class RedisMessageListenerConfig {@Beanpublic RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,RedisMessageListener redisMessageListener) {RedisMessageListenerContainer messageListenerContainer = new RedisMessageListenerContainer();messageListenerContainer.setConnectionFactory(redisConnectionFactory);messageListenerContainer.addMessageListener(redisMessageListener, new ChannelTopic(PubSubRedisQueue.KEY));messageListenerContainer.addMessageListener(redisMessageListener, new ChannelTopic(PubSubRedisQueue.KEY2));return messageListenerContainer;}
}
RedisMessageListenerContainer 是為Redis消息偵聽器 MessageListener 提供異步行為的容器。處理偵聽、轉換和消息分派的低級別詳細信息。
本文使用的是主題訂閱:ChannelTopic,你也可以使用模式匹配:PatternTopic,從而匹配多個信道。
這里我們同一個監聽器訂閱了兩個信道
生產者
@Service
public class PubSubRedisQueue {//隊列名public static final String KEY = "pub_sub_queue";public static final String KEY2 = "pub_sub_queue2";@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public void produce(String message) {redisTemplate.convertAndSend(KEY, message);}public void produce2(String message) {redisTemplate.convertAndSend(KEY2, message);}
}
測試
@Slf4j
@RestController
@RequestMapping(value = "/pubSubRedis")
@Api(tags = "pubSubRedis測試")
public class PubSubRedisController {@Autowiredprivate PubSubRedisQueue pubSubRedisQueue;@GetMapping(value = "/pubsub/produce")@ApiOperation(value = "測試")public void produce(@RequestParam(name = "msg") String msg) {pubSubRedisQueue.produce(msg);}@GetMapping(value = "/pubsub/produce2")@ApiOperation(value = "測試2")public void produce2(@RequestParam(name = "msg") String msg) {pubSubRedisQueue.produce2(msg);}
}
可以看到監聽器成功監聽了兩個信道的信息
3、基于Stream類型的實現(Redis Version5.0)
原理
Stream為redis 5.0后新增的數據結構。支持多播的可持久化消息隊列,實現借鑒了Kafka設計。
Redis Stream的結構如上圖所示,它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的ID和對應的內容。消息是持久化的,Redis重啟后,內容還在。
每個Stream都有唯一的名稱,它就是Redis的key,在我們首次使用xadd指令追加消息時自動創建。
每個Stream都可以掛多個消費組,每個消費組會有個游標last_delivered_id在Stream數組之上往前移動,表示當前消費組已經消費到哪條消息了。每個消費組都有一個Stream內唯一的名稱,消費組不會自動創建,它需要單獨的指令xgroup create進行創建,需要指定從Stream的某個消息ID開始消費,這個ID用來初始化last_delivered_id變量。
每個消費組(Consumer Group)的狀態都是獨立的,相互不受影響。也就是說同一份Stream內部的消息會被每個消費組都消費到。
同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關系,任意一個消費者讀取了消息都會使游標last_delivered_id往前移動。每個消費者者有一個組內唯一名稱。
消費者(Consumer)內部會有個狀態變量pending_ids,它記錄了當前已經被客戶端讀取的消息,但是還沒有ack。如果客戶端沒有ack,這個變量里面的消息ID會越來越多,一旦某個消息被ack,它就開始減少。這個pending_ids變量在Redis官方被稱之為PEL,也就是Pending Entries List,這是一個很核心的數據結構,它用來確保客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失了沒處理。
優點
- 高性能:可以在非常短的時間內處理大量的消息。
- 持久化:支持數據持久化,即使Redis服務器宕機,也可以恢復之前的消息。
- 順序性:保證消息的順序性,即使是并發的消息也會按照發送順序排列。
- 靈活性:可以方便地擴展和分布式部署,可以滿足不同場景下的需求。
缺點
- 功能相對簡單:Redis Stream相對于其他的消息隊列,功能相對簡單,無法滿足一些復雜的需求。
- 不支持消息回溯:即消費者無法獲取之前已經消費過的消息。
- 不支持多消費者分組:無法實現多個消費者并發消費消息的功能。
代碼
自動ack消費者
@Slf4j
@Component
public class AutoAckStreamConsumeListener implements StreamListener<String, MapRecord<String, String, String>> {//分組名public static final String GROUP = "auto_ack_stream";@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(MapRecord<String, String, String> message) {String stream = message.getStream();RecordId id = message.getId();Map<String, String> map = message.getValue();log.info("[自動ACK]接收到一個消息 stream:[{}],id:[{}],value:[{}]", stream, id, map);redisTemplate.opsForStream().delete(GROUP, id.getValue());}
}
手動ack消費者
@Slf4j
@Component
public class BasicAckStreamConsumeListener implements StreamListener<String, MapRecord<String, String, String>> {//分組名public static final String GROUP = "basic_ack_stream";@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(MapRecord<String, String, String> message) {String stream = message.getStream();RecordId id = message.getId();Map<String, String> map = message.getValue();log.info("[手動ACK]接收到一個消息 stream:[{}],id:[{}],value:[{}]", stream, id, map);redisTemplate.opsForStream().acknowledge(stream, GROUP, id.getValue());//消費完畢刪除該條消息redisTemplate.opsForStream().delete(GROUP, id.getValue());}
}
配置綁定關系
@Slf4j
@Configuration
public class RedisStreamConfiguration {@Autowiredprivate RedisConnectionFactory redisConnectionFactory;@Autowiredprivate AutoAckStreamConsumeListener autoAckStreamConsumeListener;@Autowiredprivate BasicAckStreamConsumeListener basicAckStreamConsumeListener;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Bean(initMethod = "start", destroyMethod = "stop")public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer() {AtomicInteger index = new AtomicInteger(1);int processors = Runtime.getRuntime().availableProcessors();ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,new LinkedBlockingDeque<>(), r -> {Thread thread = new Thread(r);thread.setName("async-stream-consumer-" + index.getAndIncrement());thread.setDaemon(true);return thread;});StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 一次最多獲取多少條消息.batchSize(3)// 運行 Stream 的 poll task.executor(executor)// Stream 中沒有消息時,阻塞多長時間,需要比 `spring.redis.timeout` 的時間小.pollTimeout(Duration.ofSeconds(3))// 獲取消息的過程或獲取到消息給具體的消息者處理的過程中,發生了異常的處理.errorHandler(throwable -> log.info("出現異常就來這里了" + throwable)).build();StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =StreamMessageListenerContainer.create(redisConnectionFactory, options);// 獨立消費// 消費組A,自動ack// 從消費組中沒有分配給消費者的消息開始消費if (!isStreamGroupExists(StreamRedisQueue.KEY,AutoAckStreamConsumeListener.GROUP)){redisTemplate.opsForStream().createGroup(StreamRedisQueue.KEY,AutoAckStreamConsumeListener.GROUP);}streamMessageListenerContainer.receiveAutoAck(Consumer.from(AutoAckStreamConsumeListener.GROUP, "AutoAckConsumer"),StreamOffset.create(StreamRedisQueue.KEY, ReadOffset.lastConsumed()), autoAckStreamConsumeListener);// 消費組B,不自動ackif (!isStreamGroupExists(StreamRedisQueue.KEY,BasicAckStreamConsumeListener.GROUP)){redisTemplate.opsForStream().createGroup(StreamRedisQueue.KEY,BasicAckStreamConsumeListener.GROUP);}streamMessageListenerContainer.receive(Consumer.from(BasicAckStreamConsumeListener.GROUP, "BasicAckConsumer"),StreamOffset.create(StreamRedisQueue.KEY, ReadOffset.lastConsumed()), basicAckStreamConsumeListener);return streamMessageListenerContainer;}/*** 判斷該消費組是否存在* @param streamKey* @param groupName* @return*/public boolean isStreamGroupExists(String streamKey, String groupName) {RedisStreamCommands commands = redisConnectionFactory.getConnection().streamCommands();//首先檢查Stream Key是否存在,否則下面代碼可能會因為嘗試檢查不存在的Stream Key而導致異常if (Boolean.FALSE.equals(redisTemplate.hasKey(streamKey))){return false;}//獲取streamKey下的所有groupsStreamInfo.XInfoGroups xInfoGroups = commands.xInfoGroups(streamKey.getBytes());AtomicBoolean exists= new AtomicBoolean(false);assert xInfoGroups != null;xInfoGroups.forEach(xInfoGroup -> {if (xInfoGroup.groupName().equals(groupName)){exists.set(true);}});return exists.get();}
}
生產者
@Slf4j
@Service
public class StreamRedisQueue {//隊列名public static final String KEY = "stream_queue";@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public String produce(Map<String, String> value) {return Objects.requireNonNull(redisTemplate.opsForStream().add(KEY, value)).getValue();}public void createGroup(String key, String group){redisTemplate.opsForStream().createGroup(key, group);}}
測試
生產消息
@Slf4j
@RestController
@RequestMapping(value = "/streamRedis")
@Api(tags = "streamRedis測試")
public class StreamRedisController {@Autowiredprivate StreamRedisQueue streamRedisQueue;@GetMapping(value = "/stream/produce")@ApiOperation(value = "測試")public void streamProduce() {Map<String, String> map = new HashMap<>();map.put("劉德華", "大家好我是劉德華");map.put("周杰倫", "周杰倫");map.put("time", DateUtil.now());String result = streamRedisQueue.produce(map);log.info("返回結果:{}", result);}
}