文章目錄
- Redis消息隊列實現異步秒殺
- 1. jvm阻塞隊列問題
- 2. 什么是消息隊列
- 3. Redis實現消息隊列
- 1. 基于List結構模擬消息隊列
- 操作
- 優缺點
- 2. 基于PubSub發布訂閱的消息隊列
- 操作
- 優缺點
- spring 結合redis的pubsub使用示例
- 1. 引入依賴
- 2. 配置文件
- 3. RedisConfig
- 4. CustomizeMessageListener
- 5. RedisMessageReceiver
- 6. 監聽原理簡析
- 7. 監聽redis的key
- 修改redis.conf
- KeyspaceEventMessageListener
- KeyExpirationEventMessageListener
- 修改RedisConfig
- 3. 基于Stream的消息隊列
- 1. 單消費者
- xadd
- xread
- 操作示例
- XREAD命令特點
- 2. 消費者組
- 特點
- 要點
- 創建消費者組
- 從消費者組讀取消息
- ==圖示操作過程==
- 消費者監聽消息的基本思路
- XREADGROUP命令特點
Redis消息隊列實現異步秒殺
1. jvm阻塞隊列問題
java使用阻塞隊列實現異步秒殺存在問題:
- jvm內存限制問題:jvm內存不是無限的,在高并發的情況下,當有大量的訂單需要創建時,就有可能超出jvm阻塞隊列的上限。
- 數據安全問題:jvm的內存沒有持久化機制,當服務重啟或宕機時,阻塞隊列中的訂單都會丟失。或者,當我們從阻塞隊列中拿到訂單任務,但是尚未處理時,如果此時發生了異常,這個訂單任務就沒有機會處理了,也就丟失了。
2. 什么是消息隊列
消息隊列(Message Queue),字面意思就是存放消息的隊列。最簡單的消息隊列模型包括3個角色:
- 消息隊列:存儲和管理消息,也被稱為消息代理(Message Broker)
- 生產者:發送消息到消息隊列
- 消費者:從消息隊列獲取消息并處理消息
(正常下單,我們需要將訂單消息寫入數據庫。但由于秒殺并發訪問量大,數據庫本身并發處理能力不強,因此,在處理秒殺業務時,可以將部分業務在生產者這邊做校驗,然后將消息寫入消息隊列,而消費者處理該消息隊列中的消息,從而實現雙方解耦,更快的處理秒殺業務)
3. Redis實現消息隊列
我們可以使用一些現成的mq,比如kafka,rabbitmq等等,但是呢,如果沒有安裝mq,我們也可以直接使用redis提供的mq方案,降低我們的部署和學習成本。Redis提供了三種不同的方式來實現消息隊列:
- list結構:基于List結構模擬消息隊列
- PubSub:基本的點對點消息模型
- Stream:比較完善的消息隊列模型
1. 基于List結構模擬消息隊列
消息隊列(Message Queue),字面意思就是存放消息的隊列。而Redis的list數據結構是一個雙向鏈表,很容易模擬出隊列效果。
隊列是入口和出口不在一邊,因此我們可以利用:LPUSH 結合 RPOP、或者 RPUSH 結合 LPOP來實現。
不過要注意的是,當隊列中沒有消息時RPOP或LPOP操作會返回null,并不像JVM的阻塞隊列那樣會阻塞并等待消息。因此這里應該使用BRPOP或者BLPOP來實現阻塞效果。
操作
命令介紹如下
優缺點
優點:
- 利用Redis存儲,不受限于JVM內存上限
- 基于Redis的持久化機制,數據安全性有保證
- 可以滿足消息有序性
缺點:
- 無法避免消息丟失(如果消費者獲取消息后,然后立馬就宕機了,這個消息就得不到處理,等同于丟失了)
- 只支持單消費者(1個消息只能被1個消費者取走,其它消費者會收不到此消息)
2. 基于PubSub發布訂閱的消息隊列
PubSub(發布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或多個channel,生產者向對應channel發送消息后,所有訂閱者都能收到相關消息。
- SUBSCRIBE channel [channel] :訂閱一個或多個頻道
- PUBLISH channel msg :向一個頻道發送消息
- PSUBSCRIBE pattern [pattern] :訂閱與pattern格式匹配的所有頻道
- ?匹配1個字符:
h?llo
subscribes tohello
,hallo
andhxllo
- *匹配0個或多個字符:
h*llo
subscribes tohllo
andheeeello
- []指定字符:
h[ae]llo
subscribes tohello
andhallo,
but nothillo
- ?匹配1個字符:
操作
優缺點
優點:
- 采用發布訂閱模型,支持多生產、多消費
缺點:
- 不支持數據持久化(如果發送消息時,這個消息的頻道沒有被任何人訂閱,那這個消息就丟失了,也消息就是不會被保存)
- 無法避免消息丟失(發完了,沒人收,直接就丟了)
- 消息堆積有上限,超出時數據丟失(當我們發送消息時,如果有消費者在監聽,消費者會有1個緩存區去緩存這個消息數據,如果消費者處理的慢,那么客戶端的緩存區中的消息會不斷堆積,而這個緩存區是有大小限制的,如果超出了就會丟失)
spring 結合redis的pubsub使用示例
1. 引入依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.8.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.zzhua</groupId><artifactId>demo-redis-pubsub</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 如果使用lettuce-core作為連接redis的實現, 不引入此依賴會報錯: Caused by: java.lang.ClassNotFoundException:org.apache.commons.pool2.impl.GenericObjectPoolConfig --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
2. 配置文件
spring:redis:host: 127.0.0.1port: 6379database: 0password:lettuce:pool:min-idle: 2max-active: 8max-idle: 8
3. RedisConfig
spring-data-redis提供了2種處理redis消息的方法:
-
自己實現MessageListener接口
public interface MessageListener {// 處理消息的方法// 第1個參數封裝了: 消息發布到哪1個具體頻道 和 消息的內容// 第2個參數封裝了: // 1. 如果當前是通過普通模式去訂閱的頻道, 那么收到消息時該pattern就是消息發送的具體頻道// 2. 如果當前是通過pattern通配符匹配去訂閱的頻道, 那么收到消息時, 該pattern就是訂閱的頻道void onMessage(Message message, @Nullable byte[] pattern); }
-
指定MessageListenerAdapter適配器,該適配器指定特定對象的特定方法來處理消息(對特定的方法有參數方面的要求)
@Slf4j
@Configuration
public class RedisConfig {@Autowiredprivate RedisMessageReceiver redisMessageReceiver;@Autowiredprivate CustomizeMessageListener customizeMessageListener;@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 監聽order.q通道(不帶通配符匹配channel)container.addMessageListener(customizeMessageListener, new ChannelTopic("order.q"));// 監聽order.*通道(帶通配符匹配channel)container.addMessageListener(listenerAdapter(), new PatternTopic("order.*"));return container;}@Beanpublic MessageListenerAdapter listenerAdapter() {// 交給receiver的receiveMessage方法, 對于這個方法的參數有如下要求:// (2個參數: 第一個參數是Object-即消息內容(默認由RedisSerializer#deserialize處理,見MessageListenerAdapter#onMessage), // 第二個參數是String-即訂閱的通道, 詳細看上面MessageListener接口中第二個參數的解釋)// (1個參數: 參數是Object-即消息內容)return new MessageListenerAdapter(redisMessageReceiver, "receiveMessage");}}
4. CustomizeMessageListener
@Slf4j
@Component
public class CustomizeMessageListener implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();log.info("order.q - 消息訂閱頻道: {}", new String(channelBytes));log.info("order.q - 消息內容: {}", new String(bodyBytes));log.info("order.q - 監聽頻道: {}", new String(channelBytes));}
}
5. RedisMessageReceiver
@Slf4j
@Component
public class RedisMessageReceiver {public void receiveMessage(String msg, String topic) {log.info("order.* - 消息的訂閱頻道: {}", topic);log.info("order.* - 消息的內容: {}", msg);}}
6. 監聽原理簡析
spring-data-redis的lettuce-core是基于netty的,消息監聽處理過程如下:
PubSubCommandHandler(netty中的ChannelHandler處理器)->PubSubEndpoint(根據消息類型調用LettuceMessageListener 的不同方法)->LettuceMessageListener -> RedisMessageListenerContainer$DispatchMessageListener(如果是pattern,則從patternMapping中獲取所有的listener;如果不是pattern,則從channelMapping中獲取所有的listener。至于怎么判斷是不是pattern?)->使用異步線程池對上一步獲取的所有listener執行onMessage方法
至于怎么判斷是不是pattern?這個是根據訂閱關系來的,如果訂閱的是pattern,那么如果這個向這個pattern中發送了消息,那么就會收到1次消息,并且是pattern。如果訂閱的是普通channel,那么如果向這個普通channel發送了消息,那么又會收到1次消息不是pattern。如果向1個channel中發送消息,這個channel既符合訂閱的pattern,也符合訂閱的普通channel,那么會收到2次消息,并且這2次消息1次是pattern,1次不是pattern的
7. 監聽redis的key
既然已經說到了監聽redis發布消息了,那么也補充一下監聽redis的key過期。因為監聽redis的key過期也是通過redis的發布訂閱實現的。
修改redis.conf
############################# EVENT NOTIFICATION ############################### Redis能夠將在keyspace中發生的事件通知給 發布/訂閱 客戶端# Redis can notify Pub/Sub clients about events happening in the key space.
# This feature is documented at http://redis.io/topics/notifications# 例如:如果開啟了keyspace事件通知(注意了,必須是開啟了keyspace事件通知才可以,開啟的方式就是添加參數K),
# 一個客戶端在數據庫0對一個叫'foo'的key執行了刪除操作,
# 那么redis將會通過 發布訂閱 機制發布2條消息
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo# 也可以指定一組 類名 來選擇 Redis 會通知的一類事件。
# 每類事件 都通過一個字符定義# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:# keySpace事件 以 __keyspace@<數據庫序號>__ 為前綴 發布事件
# K Keyspace events, published with __keyspace@<db>__ prefix. # Keyevent事件 以 __keyevent@<數據庫序號>__ 為前綴 發布事件
# E Keyevent events, published with __keyevent@<db>__ prefix. # 執行常規命令,比如del、expire、rename
# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ... # 執行 String 命令
# $ String commands # 執行 List 命令
# l List commands # 執行 Set 命令
# s Set commands # 執行 Hash 命令
# h Hash commands 執行 Hash 命令# 執行 ZSet 命令
# z Sorted set commands # key過期事件(每個key失效都會觸發這類事件)
# x Expired events (events generated every time a key expires) # key驅逐事件(當key在內存滿了被清除時生成)
# e Evicted events (events generated when a key is evicted for maxmemory) # A是g$lshzxe的別名,因此AKE就意味著所有的事件
# A Alias for g$lshzxe, so that the "AKE" string means all the events.
## 配置中的notify-keyspace-events這個參數由0個或多個字符組成,
# 如果配置為空字符串表示禁用通知
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
## 比如,要開啟list命令和generic常規命令的事件通知,
# 應該配置成 notify-keyspace-events Elg
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# 比如,訂閱了__keyevent@0__:expired頻道的客戶端要收到key失效的時間,
# 應該配置成 notify-keyspace-events Ex
# Example 2: to get the stream of the expired keys subscribing to channel name __keyevent@0__:expired use:
#
# notify-keyspace-events Ex
## 默認情況下,所有的通知都被禁用了,并且這個特性有性能上的開銷。
# 注意,K和E必須至少指定其中一個,否則,將收不到任何事件。
# By default all notifications are disabled because most users don't need
# this feature and the feature has some overhead. Note that if you don't
# specify at least one of K or E, no events will be delivered.
notify-keyspace-events "Ex"############################### ADVANCED CONFIG ###############################
KeyspaceEventMessageListener
- 通過實現InitializingBean接口,在afterPropertiesSet方法中,調用初始化init方法,從redis中獲取
notify-keyspace-events
配置項對應的值,如果未設置任何值,則改為EA
,結合上面的redis.conf節選可知,表示的是開啟所有的事件通知 - 使用redisMessageListenerContainer,通過pattern通配符匹配的方式訂閱
__keyevent@*
頻道 - 它是個抽象類,實現了MessageListener接口,處理消息的方法是個抽象方法
- 它有1個子類KeyExpirationEventMessageListener,訂閱的pattern的頻道是:
__keyevent@*__:expired
,通過重寫doRegister修改了訂閱的頻道。并且重寫了處理消息的方法,通過將消息內容包裝成RedisKeyExpiredEvent事件對象,然后通過事件發布器將事件發布出去。
public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");private final RedisMessageListenerContainer listenerContainer;private String keyspaceNotificationsConfigParameter = "EA";/*** Creates new {@link KeyspaceEventMessageListener}.** @param listenerContainer must not be {@literal null}.*/public KeyspaceEventMessageListener(RedisMessageListenerContainer listenerContainer) {Assert.notNull(listenerContainer, "RedisMessageListenerContainer to run in must not be null!");this.listenerContainer = listenerContainer;}/** (non-Javadoc)* @see org.springframework.data.redis.connection.MessageListener#onMessage(org.springframework.data.redis.connection.Message, byte[])*/@Overridepublic void onMessage(Message message, @Nullable byte[] pattern) {if (message == null || ObjectUtils.isEmpty(message.getChannel()) || ObjectUtils.isEmpty(message.getBody())) {return;}doHandleMessage(message);}/*** Handle the actual message** @param message never {@literal null}.*/protected abstract void doHandleMessage(Message message);/*** Initialize the message listener by writing requried redis config for {@literal notify-keyspace-events} and* registering the listener within the container.*/public void init() {if (StringUtils.hasText(keyspaceNotificationsConfigParameter)) {RedisConnection connection = listenerContainer.getConnectionFactory().getConnection();try {Properties config = connection.getConfig("notify-keyspace-events");if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);}} finally {connection.close();}}doRegister(listenerContainer);}/*** Register instance within the container.** @param container never {@literal null}.*/protected void doRegister(RedisMessageListenerContainer container) {listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);}/** (non-Javadoc)* @see org.springframework.beans.factory.DisposableBean#destroy()*/@Overridepublic void destroy() throws Exception {listenerContainer.removeMessageListener(this);}/*** Set the configuration string to use for {@literal notify-keyspace-events}.** @param keyspaceNotificationsConfigParameter can be {@literal null}.* @since 1.8*/public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;}/** (non-Javadoc)* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()*/@Overridepublic void afterPropertiesSet() throws Exception {init();}
}
KeyExpirationEventMessageListener
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implementsApplicationEventPublisherAware {private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");private @Nullable ApplicationEventPublisher publisher;/*** Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.** @param listenerContainer must not be {@literal null}.*/public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/** (non-Javadoc)* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doRegister(org.springframework.data.redis.listener.RedisMessageListenerContainer)*/@Overrideprotected void doRegister(RedisMessageListenerContainer listenerContainer) {listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);}/** (non-Javadoc)* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doHandleMessage(org.springframework.data.redis.connection.Message)*/@Overrideprotected void doHandleMessage(Message message) {publishEvent(new RedisKeyExpiredEvent(message.getBody()));}/*** Publish the event in case an {@link ApplicationEventPublisher} is set.** @param event can be {@literal null}.*/protected void publishEvent(RedisKeyExpiredEvent event) {if (publisher != null) {this.publisher.publishEvent(event);}}/** (non-Javadoc)* @see org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)*/@Overridepublic void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {this.publisher = applicationEventPublisher;}
}
修改RedisConfig
@Slf4j
@Configuration
public class RedisConfig {@Autowiredprivate RedisMessageReceiver redisMessageReceiver;@Autowiredprivate CustomizeMessageListener customizeMessageListener;@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 注意以下測試在redis.confi配置文件中設置了: notify-keyspace-events 為 AKE, // 也可以參照KeyspaceEventMessageListener在代碼中設置這個配置項/*redis提供的事件通知發布消息示例如下:K => PUBLISH __keyspace@0__:foo delE => PUBLISH __keyevent@0__:del foo參照上述示例去寫這個topic即可*/// 監聽key刪除事件container.addMessageListener(new MessageListener() {/*執行命令: del order:1234輸出如下:監聽key刪除事件 - 消息的發布頻道: __keyevent@0__:del監聽key刪除事件 - 消息內容: order:1234監聽key刪除事件 - 消息的訂閱頻道: __keyevent@*__:del*/@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();log.info("監聽key刪除事件 - 消息的發布頻道: {}", new String(channelBytes));log.info("監聽key刪除事件 - 消息內容: {}", new String(bodyBytes));log.info("監聽key刪除事件 - 消息的訂閱頻道: {}", new String(pattern));}}, new PatternTopic("__keyevent@*__:del"));// 監聽指定前綴的keycontainer.addMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();/*執行命令: set order:1234 a輸出如下:監聽指定前綴的key - 消息的發布頻道: __keyspace@0__:order:1234監聽指定前綴的key - 消息內容: set監聽指定前綴的key - 消息的訂閱頻道: __keyspace@0__:order:**/log.info("監聽指定前綴的key - 消息的發布頻道: {}", new String(channelBytes));log.info("監聽指定前綴的key - 消息內容: {}", new String(bodyBytes));log.info("監聽指定前綴的key - 消息的訂閱頻道: {}", new String(pattern));}}, new PatternTopic("__keyspace@0__:order:*"));return container;}/* 借助了1. 這個KeyspaceEventMessageListener的bean中的對redis的配置修改2. 監聽patter的topic*/@Beanpublic KeyspaceEventMessageListener keyspaceEventMessageListener(RedisMessageListenerContainer container) {return new KeyspaceEventMessageListener(container){/* __keyevent@* */@Overrideprotected void doHandleMessage(Message message) {log.info("監聽所有key命令事件, 消息內容:{}, {}",// set name zzhua; expire name 5;// 消息內容就是key的名稱, 比如: namenew String(message.getBody()),// 消息所發布的頻道, 比如: __keyevent@0__:set, __keyevent@0__:expire等new String(message.getChannel()));}};}@Beanpublic KeyExpirationEventMessageListener keyExpirationEventMessageListener(RedisMessageListenerContainer container) {return new KeyExpirationEventMessageListener(container){/* __keyevent@*__:expired */@Overrideprotected void doHandleMessage(Message message) {log.info("監聽所有key失效, 消息內容:{}, {}",// 消息內容就是key的名稱, 比如: namenew String(message.getBody()),// 消息所發布的頻道, 比如: __keyevent@0__:expirednew String(message.getChannel()));}};}}
3. 基于Stream的消息隊列
Stream 是 Redis 5.0 引入的一種新的數據類型(因此支持持久化),可以實現一個功能非常完善的消息隊列(專門為消息隊列設計的),Redis streams官網介紹
1. 單消費者
xadd
發送消息的命令:
-
不指定消息隊列的的最大消息數量就是不限制消息數量
-
消息唯一id建議使用
*
,讓redis自動生成消息唯一id -
(上面命令介紹中的:大寫表示照著抄就行;小寫的是需要我們自己提供的參數;中括號表示可選參數)
示例
## 創建名為 users 的隊列,并向其中發送一個消息,內容是:{name=jack,age=21},并且使用Redis自動生成ID
127.0.0.1:6379> XADD users * name jack age 21
"1644805700523-0"
xread
讀取消息的方式之一:
- 不指定阻塞時間,就是直接返回(不阻塞);設置為0表示阻塞到有值為止;
- stream中消息讀取之后,不會被刪除;
- $ 表示讀取最新的消息,但是如果之前消息都已經被讀過了,那么當前繼續去讀的話,是讀不到的(盡管當前stream中仍然有消息)
示例
## 從users的隊列中讀取1條消息, 從第1條開始讀
127.0.0.1:6379> XREAD COUNT 1 STREAMS users 0
1) 1) "users"2) 1) 1) "1708522812423-0"2) 1) "name"2) "jack"3) "age"4) "21"
操作示例
查看當前redis版本是否支持stream數據結構
xadd與xread使用示例
在上面,還有1點沒有體現出來:在stream中的每1個消息,被當前客戶端讀了1遍,還可以被當前客戶端讀1遍,然后,這個消息還可以被其它客戶端讀1遍。
xread讀取最新數據要使用阻塞的方法才可以
我們發現,只有在阻塞期間,使用$才能讀取到最新消息;如果不使用阻塞,想要讀取最新數據是不可能的。
在業務開發中,我們可以循環的調用XREAD阻塞方式來查詢最新消息,從而實現持續監聽隊列的效果,偽代碼如下:
但是這會存在消息漏讀的問題,由于:只有在阻塞期間,使用$才能讀取到最新消息,假設在處理消息的時候,此時消息隊列中發來了消息,那么這些消息就會被錯過,只有當執行XREAD COUNT 1 BLOCK 2000 STREAMS users $開始時收到的第1個消息,才會被處理。
XREAD命令特點
STREAM類型消息隊列的XREAD命令特點
- 消息可回溯(消息讀取完之后,不會消失,永久的保留在我們的隊列當中,隨時想看都可以回去讀)
- 一個消息可以被多個消費者讀取(因為消息讀取之后,不會消失)
- 可以阻塞讀取
- 有消息漏讀的風險(在處理消息的過程中,如果來了多條消息,則只能看到最后一條消息,即最新的那1條)
2. 消費者組
上面,我們知道通過xread命令你阻塞讀取最新消息,有消息漏讀的風險,下面,我們看看消費者組是如何解決這個問題的。
特點
消費者組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個隊列。具備下列特點:
消息分流
隊列中的消息會分流給組內的不同消費者,而不是重復消費,從而加快消息處理的速度
消息標示
消費者組會維護一個標示,記錄最后一個被處理的消息,哪怕消費者宕機重啟,還會從標示之后讀取消息。確保每一個消息都會被消費
消息確認
消費者獲取消息后,消息處于pending狀態,并存入一個pending-list。當處理完成后需要通過XACK來確認消息,標記消息為已處理,才會從pending-list移除。
要點
redis服務器維護了多個消費者組
可以給1個stream指定多個消費者組
- 把這里的消費者組當成上節中的消費者即可
- 1個stream綁定的的多個消費者組都會收到消息
消息發給消費者組
- 即多個消費者共同加入到消費者組中,形成1個消費者,而消息就分給消費者中中的消息來消費
消費者加入消費者組
消費者從消費者組中拉取消息,拉取到的消息進入消費者組中的pending-list
消費者消費完消息后,向消費者組確認消息已處理,已確認處理的消息會從pending-list中刪除
消費者組總會用1個標識,來記錄最后1個被處理的消息
創建消費者組
XGROUP CREATE key groupName ID [MKSTREAM]
- key:隊列名稱
- groupName:消費者組名稱
- ID:起始ID標示,$代表隊列中最后一個消息,0則代表隊列中第一個消息
- 建議:如果不想處理隊列中已存在的消息,就可以使用$;如果要處理已存在的消息,就是用0)
- MKSTREAM:隊列不存在時自動創建隊列;不指定的話,當不存在時,不會創建
其它常見命令:
# 刪除指定的消費者組
XGROUP DESTORY key groupName# 給指定的消費者組添加消費者
#(一般情況下,我們并不需要自己添加消費者,因為當我們從這個消費者組當中指定1個消費者,
# 并且監聽消息的時候,如果這個消費者不存在,則會自動創建消費者)
XGROUP CREATECONSUMER key groupname consumername# 刪除消費者組中的指定消費者
XGROUP DELCONSUMER key groupname consumername
從消費者組讀取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group:消費組名稱
- consumer:消費者名稱,如果消費者不存在,會自動創建一個消費者
- count:本次查詢的最大數量
- BLOCK milliseconds:當沒有消息時最長等待時間(若未指定,則不阻塞)
- NOACK:無需手動ACK,即獲取到消息后自動確認(一般不建議使用)
- STREAMS key:指定隊列名稱
- ID:獲取消息的起始ID:
- “>”:從消費者組的標記找到最后1個處理的消息(注意:不是已處理的消息,是處理的消息,也就是說它有可能被消費者獲取了,但還沒被消費者確認掉),的下一個未處理的消息開始
- 其它(除了">"以外的所有):根據指定id從pending-list中獲取已消費但未確認的消息。例如0,是從pending-list中的第一個消息開始(一直拿0,就是一直從pending-list中拿第1個消息)
圖示操作過程
消費者監聽消息的基本思路
XREADGROUP命令特點
- 消息可回溯
- 可以多消費者爭搶消息,加快消費速度
- 可以阻塞讀取
- 沒有消息漏讀的風險
- 有消息確認機制,保證消息至少被消費一次
- (內存不受jvm限制,消息可做持久化,消息確認機制)