redis實現消息隊列redis發布訂閱redis監聽key

文章目錄

  • Redis消息隊列實現異步秒殺
    • 1. jvm阻塞隊列問題
    • 2. 什么是消息隊列
    • 3. Redis實現消息隊列
      • 1. 基于List結構模擬消息隊列
        • 操作
        • 優缺點
      • 2. 基于PubSub發布訂閱的消息隊列
        • 操作
        • 優缺點
        • spring 結合redis的pubsub使用示例
          • 1. 引入依賴
          • 2. 配置文件
          • 3. RedisConfig
          • 4. CustomizeMessageListener
          • 5. RedisMessageReceiver
          • 6. 監聽原理簡析
          • 7. 監聽redis的key
            • 修改redis.conf
            • KeyspaceEventMessageListener
            • KeyExpirationEventMessageListener
            • 修改RedisConfig
      • 3. 基于Stream的消息隊列
        • 1. 單消費者
          • xadd
          • xread
          • 操作示例
          • XREAD命令特點
        • 2. 消費者組
          • 特點
          • 要點
          • 創建消費者組
          • 從消費者組讀取消息
          • ==圖示操作過程==
          • 消費者監聽消息的基本思路
          • XREADGROUP命令特點

Redis消息隊列實現異步秒殺

1. jvm阻塞隊列問題

java使用阻塞隊列實現異步秒殺存在問題:

  1. jvm內存限制問題:jvm內存不是無限的,在高并發的情況下,當有大量的訂單需要創建時,就有可能超出jvm阻塞隊列的上限。
  2. 數據安全問題:jvm的內存沒有持久化機制,當服務重啟或宕機時,阻塞隊列中的訂單都會丟失。或者,當我們從阻塞隊列中拿到訂單任務,但是尚未處理時,如果此時發生了異常,這個訂單任務就沒有機會處理了,也就丟失了。

2. 什么是消息隊列

消息隊列Message Queue),字面意思就是存放消息的隊列。最簡單的消息隊列模型包括3個角色:

  • 消息隊列:存儲和管理消息,也被稱為消息代理(Message Broker)
  • 生產者:發送消息到消息隊列
  • 消費者:從消息隊列獲取消息并處理消息

在這里插入圖片描述

(正常下單,我們需要將訂單消息寫入數據庫。但由于秒殺并發訪問量大,數據庫本身并發處理能力不強,因此,在處理秒殺業務時,可以將部分業務在生產者這邊做校驗,然后將消息寫入消息隊列,而消費者處理該消息隊列中的消息,從而實現雙方解耦,更快的處理秒殺業務)

3. Redis實現消息隊列

我們可以使用一些現成的mq,比如kafka,rabbitmq等等,但是呢,如果沒有安裝mq,我們也可以直接使用redis提供的mq方案,降低我們的部署和學習成本。Redis提供了三種不同的方式來實現消息隊列:

  • list結構:基于List結構模擬消息隊列
  • PubSub:基本的點對點消息模型
  • Stream:比較完善的消息隊列模型

1. 基于List結構模擬消息隊列

消息隊列(Message Queue),字面意思就是存放消息的隊列。而Redis的list數據結構是一個雙向鏈表,很容易模擬出隊列效果。

隊列是入口和出口不在一邊,因此我們可以利用:LPUSH 結合 RPOP、或者 RPUSH 結合 LPOP來實現

不過要注意的是,當隊列中沒有消息時RPOP或LPOP操作會返回null,并不像JVM的阻塞隊列那樣會阻塞并等待消息。因此這里應該使用BRPOP或者BLPOP來實現阻塞效果

在這里插入圖片描述

操作

命令介紹如下

在這里插入圖片描述

在這里插入圖片描述

優缺點

優點:

  • 利用Redis存儲,不受限于JVM內存上限
  • 基于Redis的持久化機制,數據安全性有保證
  • 可以滿足消息有序性

缺點:

  • 無法避免消息丟失(如果消費者獲取消息后,然后立馬就宕機了,這個消息就得不到處理,等同于丟失了)
  • 只支持單消費者(1個消息只能被1個消費者取走,其它消費者會收不到此消息)

2. 基于PubSub發布訂閱的消息隊列

PubSub(發布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或多個channel,生產者向對應channel發送消息后,所有訂閱者都能收到相關消息。

  • SUBSCRIBE channel [channel] :訂閱一個或多個頻道
  • PUBLISH channel msg :向一個頻道發送消息
  • PSUBSCRIBE pattern [pattern] :訂閱與pattern格式匹配的所有頻道
    • ?匹配1個字符:h?llo subscribes to hello, hallo and hxllo
    • *匹配0個或多個字符:h*llo subscribes to hllo and heeeello
    • []指定字符:h[ae]llo subscribes to hello and hallo, but not hillo

在這里插入圖片描述

操作

在這里插入圖片描述

優缺點

優點:

  • 采用發布訂閱模型,支持多生產、多消費

缺點:

  • 不支持數據持久化(如果發送消息時,這個消息的頻道沒有被任何人訂閱,那這個消息就丟失了,也消息就是不會被保存)
  • 無法避免消息丟失(發完了,沒人收,直接就丟了)
  • 消息堆積有上限,超出時數據丟失(當我們發送消息時,如果有消費者在監聽,消費者會有1個緩存區去緩存這個消息數據,如果消費者處理的慢,那么客戶端的緩存區中的消息會不斷堆積,而這個緩存區是有大小限制的,如果超出了就會丟失)
spring 結合redis的pubsub使用示例
1. 引入依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.8.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.zzhua</groupId><artifactId>demo-redis-pubsub</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 如果使用lettuce-core作為連接redis的實現, 不引入此依賴會報錯: Caused by: java.lang.ClassNotFoundException:org.apache.commons.pool2.impl.GenericObjectPoolConfig --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
2. 配置文件
spring:redis:host: 127.0.0.1port: 6379database: 0password:lettuce:pool:min-idle: 2max-active: 8max-idle: 8
3. RedisConfig

spring-data-redis提供了2種處理redis消息的方法:

  • 自己實現MessageListener接口

    public interface MessageListener {// 處理消息的方法// 第1個參數封裝了: 消息發布到哪1個具體頻道 和 消息的內容// 第2個參數封裝了: //     1. 如果當前是通過普通模式去訂閱的頻道, 那么收到消息時該pattern就是消息發送的具體頻道//     2. 如果當前是通過pattern通配符匹配去訂閱的頻道, 那么收到消息時, 該pattern就是訂閱的頻道void onMessage(Message message, @Nullable byte[] pattern);
    }
    
  • 指定MessageListenerAdapter適配器,該適配器指定特定對象的特定方法來處理消息(對特定的方法有參數方面的要求)

@Slf4j
@Configuration
public class RedisConfig {@Autowiredprivate RedisMessageReceiver redisMessageReceiver;@Autowiredprivate CustomizeMessageListener customizeMessageListener;@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 監聽order.q通道(不帶通配符匹配channel)container.addMessageListener(customizeMessageListener, new ChannelTopic("order.q"));// 監聽order.*通道(帶通配符匹配channel)container.addMessageListener(listenerAdapter(), new PatternTopic("order.*"));return container;}@Beanpublic MessageListenerAdapter listenerAdapter() {// 交給receiver的receiveMessage方法, 對于這個方法的參數有如下要求:// (2個參數: 第一個參數是Object-即消息內容(默認由RedisSerializer#deserialize處理,見MessageListenerAdapter#onMessage), //           第二個參數是String-即訂閱的通道, 詳細看上面MessageListener接口中第二個參數的解釋)// (1個參數: 參數是Object-即消息內容)return new MessageListenerAdapter(redisMessageReceiver, "receiveMessage");}}
4. CustomizeMessageListener
@Slf4j
@Component
public class CustomizeMessageListener implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();log.info("order.q - 消息訂閱頻道: {}", new String(channelBytes));log.info("order.q - 消息內容: {}", new String(bodyBytes));log.info("order.q - 監聽頻道: {}", new String(channelBytes));}
}
5. RedisMessageReceiver
@Slf4j
@Component
public class RedisMessageReceiver {public void receiveMessage(String msg, String topic) {log.info("order.* - 消息的訂閱頻道: {}", topic);log.info("order.* - 消息的內容: {}", msg);}}
6. 監聽原理簡析

spring-data-redis的lettuce-core是基于netty的,消息監聽處理過程如下:
PubSubCommandHandler(netty中的ChannelHandler處理器)->PubSubEndpoint(根據消息類型調用LettuceMessageListener 的不同方法)->LettuceMessageListener -> RedisMessageListenerContainer$DispatchMessageListener(如果是pattern,則從patternMapping中獲取所有的listener;如果不是pattern,則從channelMapping中獲取所有的listener。至于怎么判斷是不是pattern?)->使用異步線程池對上一步獲取的所有listener執行onMessage方法

至于怎么判斷是不是pattern?這個是根據訂閱關系來的,如果訂閱的是pattern,那么如果這個向這個pattern中發送了消息,那么就會收到1次消息,并且是pattern。如果訂閱的是普通channel,那么如果向這個普通channel發送了消息,那么又會收到1次消息不是pattern。如果向1個channel中發送消息,這個channel既符合訂閱的pattern,也符合訂閱的普通channel,那么會收到2次消息,并且這2次消息1次是pattern,1次不是pattern的

7. 監聽redis的key

既然已經說到了監聽redis發布消息了,那么也補充一下監聽redis的key過期。因為監聽redis的key過期也是通過redis的發布訂閱實現的。

修改redis.conf
############################# EVENT NOTIFICATION ############################### Redis能夠將在keyspace中發生的事件通知給 發布/訂閱 客戶端# Redis can notify Pub/Sub clients about events happening in the key space. 
# This feature is documented at http://redis.io/topics/notifications# 例如:如果開啟了keyspace事件通知(注意了,必須是開啟了keyspace事件通知才可以,開啟的方式就是添加參數K),
#      一個客戶端在數據庫0對一個叫'foo'的key執行了刪除操作,
#      那么redis將會通過 發布訂閱 機制發布2條消息 
#        PUBLISH __keyspace@0__:foo del 
#        PUBLISH __keyevent@0__:del foo# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo#  也可以指定一組 類名 來選擇 Redis 會通知的一類事件。
#  每類事件 都通過一個字符定義# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:#        keySpace事件   以 __keyspace@<數據庫序號>__ 為前綴 發布事件
#  K     Keyspace events, published with __keyspace@<db>__ prefix.  #        Keyevent事件   以 __keyevent@<數據庫序號>__ 為前綴 發布事件
#  E     Keyevent events, published with __keyevent@<db>__ prefix.        #        執行常規命令,比如del、expire、rename           
#  g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...      #        執行 String 命令
#  $     String commands  #        執行 List   命令                                                          
#  l     List commands  #        執行 Set    命令                                                         
#  s     Set commands  #        執行 Hash   命令                                                         
#  h     Hash commands                                                          執行 Hash   命令#        執行 ZSet   命令
#  z     Sorted set commands #        key過期事件(每個key失效都會觸發這類事件)                                                   
#  x     Expired events (events generated every time a key expires) #        key驅逐事件(當key在內存滿了被清除時生成)
#  e     Evicted events (events generated when a key is evicted for maxmemory)  #        A是g$lshzxe的別名,因此AKE就意味著所有的事件
#  A     Alias for g$lshzxe, so that the "AKE" string means all the events.     
##  配置中的notify-keyspace-events這個參數由0個或多個字符組成,
#  如果配置為空字符串表示禁用通知
#  The "notify-keyspace-events" takes as argument a string that is composed     
#  of zero or multiple characters. The empty string means that notifications
#  are disabled.
##  比如,要開啟list命令和generic常規命令的事件通知,
#  應該配置成 notify-keyspace-events Elg
#  Example: to enable list and generic events, from the point of view of the    
#           event name, use:
#
#  notify-keyspace-events Elg
#
#  比如,訂閱了__keyevent@0__:expired頻道的客戶端要收到key失效的時間,
#  應該配置成 notify-keyspace-events Ex
#  Example 2: to get the stream of the expired keys subscribing to channel   name __keyevent@0__:expired use:
#
#  notify-keyspace-events Ex
##  默認情況下,所有的通知都被禁用了,并且這個特性有性能上的開銷。
#  注意,K和E必須至少指定其中一個,否則,將收不到任何事件。
#  By default all notifications are disabled because most users don't need      
#  this feature and the feature has some overhead. Note that if you don't       
#  specify at least one of K or E, no events will be delivered.
notify-keyspace-events "Ex"############################### ADVANCED CONFIG ###############################
KeyspaceEventMessageListener
  1. 通過實現InitializingBean接口,在afterPropertiesSet方法中,調用初始化init方法,從redis中獲取notify-keyspace-events配置項對應的值,如果未設置任何值,則改為EA,結合上面的redis.conf節選可知,表示的是開啟所有的事件通知
  2. 使用redisMessageListenerContainer,通過pattern通配符匹配的方式訂閱__keyevent@*頻道
  3. 它是個抽象類,實現了MessageListener接口,處理消息的方法是個抽象方法
  4. 它有1個子類KeyExpirationEventMessageListener,訂閱的pattern的頻道是:__keyevent@*__:expired,通過重寫doRegister修改了訂閱的頻道。并且重寫了處理消息的方法,通過將消息內容包裝成RedisKeyExpiredEvent事件對象,然后通過事件發布器將事件發布出去。
public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");private final RedisMessageListenerContainer listenerContainer;private String keyspaceNotificationsConfigParameter = "EA";/*** Creates new {@link KeyspaceEventMessageListener}.** @param listenerContainer must not be {@literal null}.*/public KeyspaceEventMessageListener(RedisMessageListenerContainer listenerContainer) {Assert.notNull(listenerContainer, "RedisMessageListenerContainer to run in must not be null!");this.listenerContainer = listenerContainer;}/** (non-Javadoc)* @see org.springframework.data.redis.connection.MessageListener#onMessage(org.springframework.data.redis.connection.Message, byte[])*/@Overridepublic void onMessage(Message message, @Nullable byte[] pattern) {if (message == null || ObjectUtils.isEmpty(message.getChannel()) || ObjectUtils.isEmpty(message.getBody())) {return;}doHandleMessage(message);}/*** Handle the actual message** @param message never {@literal null}.*/protected abstract void doHandleMessage(Message message);/*** Initialize the message listener by writing requried redis config for {@literal notify-keyspace-events} and* registering the listener within the container.*/public void init() {if (StringUtils.hasText(keyspaceNotificationsConfigParameter)) {RedisConnection connection = listenerContainer.getConnectionFactory().getConnection();try {Properties config = connection.getConfig("notify-keyspace-events");if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);}} finally {connection.close();}}doRegister(listenerContainer);}/*** Register instance within the container.** @param container never {@literal null}.*/protected void doRegister(RedisMessageListenerContainer container) {listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);}/** (non-Javadoc)* @see org.springframework.beans.factory.DisposableBean#destroy()*/@Overridepublic void destroy() throws Exception {listenerContainer.removeMessageListener(this);}/*** Set the configuration string to use for {@literal notify-keyspace-events}.** @param keyspaceNotificationsConfigParameter can be {@literal null}.* @since 1.8*/public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;}/** (non-Javadoc)* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()*/@Overridepublic void afterPropertiesSet() throws Exception {init();}
}
KeyExpirationEventMessageListener
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implementsApplicationEventPublisherAware {private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");private @Nullable ApplicationEventPublisher publisher;/*** Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.** @param listenerContainer must not be {@literal null}.*/public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/** (non-Javadoc)* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doRegister(org.springframework.data.redis.listener.RedisMessageListenerContainer)*/@Overrideprotected void doRegister(RedisMessageListenerContainer listenerContainer) {listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);}/** (non-Javadoc)* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doHandleMessage(org.springframework.data.redis.connection.Message)*/@Overrideprotected void doHandleMessage(Message message) {publishEvent(new RedisKeyExpiredEvent(message.getBody()));}/*** Publish the event in case an {@link ApplicationEventPublisher} is set.** @param event can be {@literal null}.*/protected void publishEvent(RedisKeyExpiredEvent event) {if (publisher != null) {this.publisher.publishEvent(event);}}/** (non-Javadoc)* @see org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)*/@Overridepublic void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {this.publisher = applicationEventPublisher;}
}
修改RedisConfig
@Slf4j
@Configuration
public class RedisConfig {@Autowiredprivate RedisMessageReceiver redisMessageReceiver;@Autowiredprivate CustomizeMessageListener customizeMessageListener;@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 注意以下測試在redis.confi配置文件中設置了: notify-keyspace-events 為 AKE, // 也可以參照KeyspaceEventMessageListener在代碼中設置這個配置項/*redis提供的事件通知發布消息示例如下:K =>  PUBLISH __keyspace@0__:foo delE =>  PUBLISH __keyevent@0__:del foo參照上述示例去寫這個topic即可*/// 監聽key刪除事件container.addMessageListener(new MessageListener() {/*執行命令: del order:1234輸出如下:監聽key刪除事件 - 消息的發布頻道: __keyevent@0__:del監聽key刪除事件 - 消息內容: order:1234監聽key刪除事件 - 消息的訂閱頻道: __keyevent@*__:del*/@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();log.info("監聽key刪除事件 - 消息的發布頻道: {}", new String(channelBytes));log.info("監聽key刪除事件 - 消息內容: {}", new String(bodyBytes));log.info("監聽key刪除事件 - 消息的訂閱頻道: {}", new String(pattern));}}, new PatternTopic("__keyevent@*__:del"));// 監聽指定前綴的keycontainer.addMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes = message.getBody();byte[] channelBytes = message.getChannel();/*執行命令: set order:1234 a輸出如下:監聽指定前綴的key - 消息的發布頻道: __keyspace@0__:order:1234監聽指定前綴的key - 消息內容: set監聽指定前綴的key - 消息的訂閱頻道: __keyspace@0__:order:**/log.info("監聽指定前綴的key - 消息的發布頻道: {}", new String(channelBytes));log.info("監聽指定前綴的key - 消息內容: {}", new String(bodyBytes));log.info("監聽指定前綴的key - 消息的訂閱頻道: {}", new String(pattern));}}, new PatternTopic("__keyspace@0__:order:*"));return container;}/* 借助了1. 這個KeyspaceEventMessageListener的bean中的對redis的配置修改2. 監聽patter的topic*/@Beanpublic KeyspaceEventMessageListener keyspaceEventMessageListener(RedisMessageListenerContainer container) {return new KeyspaceEventMessageListener(container){/* __keyevent@* */@Overrideprotected void doHandleMessage(Message message) {log.info("監聽所有key命令事件, 消息內容:{}, {}",// set name zzhua; expire name 5;// 消息內容就是key的名稱, 比如: namenew String(message.getBody()),// 消息所發布的頻道, 比如: __keyevent@0__:set, __keyevent@0__:expire等new String(message.getChannel()));}};}@Beanpublic KeyExpirationEventMessageListener keyExpirationEventMessageListener(RedisMessageListenerContainer container) {return new KeyExpirationEventMessageListener(container){/* __keyevent@*__:expired */@Overrideprotected void doHandleMessage(Message message) {log.info("監聽所有key失效, 消息內容:{}, {}",// 消息內容就是key的名稱, 比如: namenew String(message.getBody()),// 消息所發布的頻道, 比如: __keyevent@0__:expirednew String(message.getChannel()));}};}}

3. 基于Stream的消息隊列

Stream 是 Redis 5.0 引入的一種新的數據類型(因此支持持久化),可以實現一個功能非常完善的消息隊列(專門為消息隊列設計的),Redis streams官網介紹

1. 單消費者
xadd

發送消息的命令:

在這里插入圖片描述

  • 不指定消息隊列的的最大消息數量就是不限制消息數量

  • 消息唯一id建議使用*,讓redis自動生成消息唯一id

  • (上面命令介紹中的:大寫表示照著抄就行;小寫的是需要我們自己提供的參數;中括號表示可選參數)

示例

## 創建名為 users 的隊列,并向其中發送一個消息,內容是:{name=jack,age=21},并且使用Redis自動生成ID
127.0.0.1:6379> XADD users * name jack age 21
"1644805700523-0"
xread

讀取消息的方式之一:

在這里插入圖片描述

  • 不指定阻塞時間,就是直接返回(不阻塞);設置為0表示阻塞到有值為止;
  • stream中消息讀取之后,不會被刪除;
  • $ 表示讀取最新的消息,但是如果之前消息都已經被讀過了,那么當前繼續去讀的話,是讀不到的(盡管當前stream中仍然有消息)

示例

## 從users的隊列中讀取1條消息, 從第1條開始讀
127.0.0.1:6379> XREAD COUNT 1 STREAMS users 0
1) 1) "users"2) 1) 1) "1708522812423-0"2) 1) "name"2) "jack"3) "age"4) "21"
操作示例

查看當前redis版本是否支持stream數據結構

在這里插入圖片描述

xadd與xread使用示例

在這里插入圖片描述

在上面,還有1點沒有體現出來:在stream中的每1個消息,被當前客戶端讀了1遍,還可以被當前客戶端讀1遍,然后,這個消息還可以被其它客戶端讀1遍。

xread讀取最新數據要使用阻塞的方法才可以

在這里插入圖片描述

我們發現,只有在阻塞期間,使用$才能讀取到最新消息;如果不使用阻塞,想要讀取最新數據是不可能的。

在業務開發中,我們可以循環的調用XREAD阻塞方式來查詢最新消息,從而實現持續監聽隊列的效果,偽代碼如下:

在這里插入圖片描述

但是這會存在消息漏讀的問題,由于:只有在阻塞期間,使用$才能讀取到最新消息,假設在處理消息的時候,此時消息隊列中發來了消息,那么這些消息就會被錯過,只有當執行XREAD COUNT 1 BLOCK 2000 STREAMS users $開始時收到的第1個消息,才會被處理。

XREAD命令特點

STREAM類型消息隊列的XREAD命令特點

  • 消息可回溯(消息讀取完之后,不會消失,永久的保留在我們的隊列當中,隨時想看都可以回去讀)
  • 一個消息可以被多個消費者讀取(因為消息讀取之后,不會消失)
  • 可以阻塞讀取
  • 有消息漏讀的風險(在處理消息的過程中,如果來了多條消息,則只能看到最后一條消息,即最新的那1條)
2. 消費者組

上面,我們知道通過xread命令你阻塞讀取最新消息,有消息漏讀的風險,下面,我們看看消費者組是如何解決這個問題的。

特點

消費者組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個隊列。具備下列特點:

在這里插入圖片描述

消息分流

隊列中的消息會分流給組內不同消費者,而重復消費,從而加快消息處理的速度

消息標示

消費者組會維護一個標示記錄最后一個被處理的消息,哪怕消費者宕機重啟,還會從標示之后讀取消息。確保每一個消息都會被消費

消息確認

消費者獲取消息后,消息處于pending狀態,并存入一個pending-list。當處理完成后需要通過XACK來確認消息,標記消息為已處理,才會從pending-list移除。

要點

redis服務器維護了多個消費者組

可以給1個stream指定多個消費者組

  • 把這里的消費者組當成上節中的消費者即可
  • 1個stream綁定的的多個消費者組都會收到消息

消息發給消費者組

  • 即多個消費者共同加入到消費者組中,形成1個消費者,而消息就分給消費者中中的消息來消費

消費者加入消費者組

消費者從消費者組中拉取消息,拉取到的消息進入消費者組中的pending-list

消費者消費完消息后,向消費者組確認消息已處理,已確認處理的消息會從pending-list中刪除

消費者組總會用1個標識,來記錄最后1個被處理的消息

創建消費者組
XGROUP CREATE  key groupName ID [MKSTREAM]
  • key:隊列名稱
  • groupName:消費者組名稱
  • ID:起始ID標示,$代表隊列中最后一個消息,0則代表隊列中第一個消息
    • 建議:如果不想處理隊列中已存在的消息,就可以使用$;如果要處理已存在的消息,就是用0)
  • MKSTREAM:隊列不存在時自動創建隊列;不指定的話,當不存在時,不會創建

其它常見命令:

# 刪除指定的消費者組
XGROUP DESTORY key groupName# 給指定的消費者組添加消費者
#(一般情況下,我們并不需要自己添加消費者,因為當我們從這個消費者組當中指定1個消費者,
#                                   并且監聽消息的時候,如果這個消費者不存在,則會自動創建消費者)
XGROUP CREATECONSUMER key groupname consumername# 刪除消費者組中的指定消費者
XGROUP DELCONSUMER key groupname consumername
從消費者組讀取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消費組名稱
  • consumer:消費者名稱,如果消費者不存在,會自動創建一個消費者
  • count:本次查詢的最大數量
  • BLOCK milliseconds:當沒有消息時最長等待時間(若未指定,則不阻塞)
  • NOACK:無需手動ACK,即獲取到消息后自動確認(一般不建議使用)
  • STREAMS key:指定隊列名稱
  • ID:獲取消息的起始ID:
    • “>”:從消費者組的標記找到最后1個處理的消息(注意:不是已處理的消息,是處理的消息,也就是說它有可能被消費者獲取了,但還沒被消費者確認掉),的下一個未處理的消息開始
    • 其它(除了">"以外的所有):根據指定id從pending-list中獲取已消費但未確認的消息。例如0,是從pending-list中的第一個消息開始(一直拿0,就是一直從pending-list中拿第1個消息)
圖示操作過程

在這里插入圖片描述

在這里插入圖片描述

消費者監聽消息的基本思路

在這里插入圖片描述

XREADGROUP命令特點
  • 消息可回溯
  • 可以多消費者爭搶消息,加快消費速度
  • 可以阻塞讀取
  • 沒有消息漏讀的風險
  • 有消息確認機制,保證消息至少被消費一次
  • (內存不受jvm限制,消息可做持久化,消息確認機制)

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/697091.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/697091.shtml
英文地址,請注明出處:http://en.pswp.cn/news/697091.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

大語言模型的開山之作—探秘GPT系列:GPT-1-GPT2-GPT-3的進化之路

模型模型參數創新點評價GPT1預訓練微調&#xff0c; 創新點在于Task-specific input transformations。GPT215億參數預訓練PromptPredict&#xff0c; 創新點在于Zero-shotZero-shot新穎度拉滿&#xff0c;但模型性能拉胯GPT31750億參數預訓練PromptPredict&#xff0c; 創新點…

pclpy 可視化點云(多窗口可視化、單窗口多點云可視化)

pclpy 可視化點云&#xff08;多窗口可視化、單窗口多點云可視化&#xff09; 一、算法原理二、代碼三、結果1.多窗口可視化結果2.單窗口多點云可視化 四、相關數據五、問題與解決方案1.問題2.解決 一、算法原理 原理看一下代碼寫的很仔細的。。目前在同一個窗口最多建立2個窗…

ESP8266智能家居(3)——單片機數據發送到mqtt服務器

1.主要思想 前期已學習如何用ESP8266連接WIFI&#xff0c;并發送數據到服務器。現在只需要在單片機與nodeMCU之間建立起串口通信&#xff0c;這樣單片機就可以將傳感器測到的數據&#xff1a;光照&#xff0c;溫度&#xff0c;濕度等等傳遞給8266了&#xff0c;然后8266再對數據…

Java Web3J :使用web3j調用自己的智能合約,返回一個內部有數組的對象結構時出現NPE問題

之前有寫過一篇文章Java Web3J :使用web3j調用自己的智能合約的方法(教程),當時只是簡單的方法調用,也不涉及到什么復雜的數據類型,入參是long類型,出參是String類型。 目錄 問題描述報錯信息嘗試解決控制變量法查看源碼網上查閱解決最后問題描述 遇到這個問題是因為有…

【AI應用】SoraWebui——在線文生視頻工具

SoraWebui 是一個開源項目&#xff0c;允許用戶使用 OpenAI 的 Sora 模型使用文本在線生成視頻&#xff0c;從而簡化視頻創建&#xff0c;并具有輕松的一鍵網站部署功能 在 Vercel 上部署 1. 克隆項目 git clone gitgithub.com:SoraWebui/SoraWebui.git 2. 安裝依賴 cd Sor…

本科畢業設計(論文)開題報告:基于人工智能的短視頻獲客平臺的設計與實現

目錄 1.選題概述1.題目背景2.目的及意義3.技術現狀 2.題目內容1.任務概述2.系統設計1.數據采集模塊&#xff1a;2.數據處理與分析模塊&#xff1a;3.客戶識別模塊&#xff1a;4.推廣策略模塊&#xff1a; 3.功能模塊1.數據采集模塊&#xff1a;2.數據處理與分析模塊&#xff1a…

【Java EE初階二十】http的簡單理解(一)

1. 初識http HTTP 最新的版本應該是 HTTP/3.0&#xff0c;目前大規模使用的版本 HTTP/1.1&#xff1b; 下面來簡單說明一下使用 HTTP 協議的場景: 1、瀏覽器打開網站 (基本上) 2、手機 APP 訪問對應的服務器 (大概率) 前面的 TCP與UDP 和http不同&#xff0c;HTTP 的報文格式&a…

React基礎-webpack+creact-react-app創建項目

學習視頻&#xff1a;學習視頻 2節&#xff1a;webpack工程化創建項目 2.1.webpack工程化工具&#xff1a;vite/rollup/turbopak; 實現組件的合并、壓縮、打包等&#xff1b; 代碼編譯、兼容、校驗等&#xff1b; 2.2.React工程化/組件開發 我們可以基于webpack自己去搭建…

sql-labs25-28a

一、環境 網上都有不過多闡述 二、sql-labs第25關 它說你的OR和and屬于它,那就是過濾了OR和and 注入嘗試 不用or和and進行爆破注入,很明顯是有注入點的 ?id-1 union select 1,2,3-- 查看數據庫 ok&#xff0c;此道題算是解了但是如果我們用了and了呢 ?id-1 and updatex…

淺談集群的分類

本文主要介紹集群部署相關的知識&#xff0c;介紹集群部署的基礎&#xff0c;集群的分類、集群的負載均衡技術&#xff0c;集群的可用性以及集群的容錯機制。隨后介紹Redis-Cluster以及Mysql的架構以及主從復制原理。 集群介紹 單臺服務器本身會受到帶寬、內存、處理器等多方面…

STM32-串口通信(串口的接收和發送)

文章目錄 STM32的串口通信一、STM32里的串口通信二、串口的發送和接收串口發送串口接收 三、串口在STM32中的配置四、串口接收的兩種實現方式1. 需要更改的地方2. 查詢RXNE標志位3. 使用中斷 總結 STM32的串口通信 本文在于記錄自己的學習過程中遇到的問題和總結&#xff0c;各…

golang,gin腳手架,完美集成與結構化,gin-restful-api模板gin-layout,開箱即用

關于gtools golang非常奈斯&#xff0c;gin作為web框架也非常奈斯&#xff0c;但我們在開發過程中&#xff0c;前期搭建會花費大量的時間&#xff0c;且還不盡人意。 為此我集成了gin-restful-api的模板gin-layout&#xff0c;還有腳手架一鍵生成項目。 集成相關 ginviperz…

大型語言模型的語義搜索(一):關鍵詞搜索

關鍵詞搜索(Keyword Search)是文本搜索種一種常用的技術&#xff0c;很多知名的應用app比如Spotify、YouTube 或 Google map等都會使用關鍵詞搜索的算法來實現用戶的搜索任務&#xff0c;關鍵詞搜索是構建搜索系統最常用的方法&#xff0c;最常用的搜索算法是Okapi BM25&#x…

Liunx使用nginx和http搭建yum-server倉庫

文章目錄 1. yum-server的搭建方式2. nginx搭建yum-server倉庫2.1. 安裝配置nginx2.2 配置yum-server的rpm2.3. 同步yum源相關包2.3.1 rsync同步源3.3.1 reposync同步源 2.4. 配置客戶端訪問yum配置2.5. 驗證測試 3. http服務搭建yum-server倉庫3.1. 安裝配置http3.2 配置yum-s…

基于微信小程序校園洗衣系統設計與實現(PHP后臺)可行性分析

博主介紹&#xff1a;黃菊華老師《Vue.js入門與商城開發實戰》《微信小程序商城開發》圖書作者&#xff0c;CSDN博客專家&#xff0c;在線教育專家&#xff0c;CSDN鉆石講師&#xff1b;專注大學生畢業設計教育和輔導。 所有項目都配有從入門到精通的基礎知識視頻課程&#xff…

Firewalld防火墻

Firewalld概述 Firewalld firewalld防火墻是centos7系統默認防火墻的防火墻管理工具&#xff0c;取代了之前的iptables防火墻&#xff0c;也是工作在網絡層&#xff0c;屬于包過濾防火墻。 支持網絡區域所定義的網絡鏈接以及接口安全等級的動態防火墻管理工具至此IPv4、IPv6…

ECMAScript modules規范示例詳解

ECMAScript modules&#xff08;簡稱 ES modules&#xff09;是JavaScript的標準模塊系統。每個模塊都是一個獨立的JavaScript文件&#xff0c;可以在其中定義導出的變量、函數或類&#xff0c;并從其他模塊中導入這些變量、函數或類。以下是ES modules規范的一些示例和詳解&am…

Go 線程池實現案例

Go 語言并不像其他一些語言&#xff08;例如 Java 或 C#&#xff09;那樣直接提供一個線程池的概念。相反&#xff0c;Go 使用 goroutines 來實現并發&#xff0c;它是一種比線程更輕量級的并發執行單元。不過&#xff0c;仍然可以實現一個類似線程池的結構&#xff0c;來管理和…

studio one 6正版多少錢?怎么購買studio one 更便宜,有優惠券哦

Presonus Studio One Studio One是由美國PreSonus公司開發的數字音頻工作站&#xff0c;作為DAW屆的新人&#xff0c;功能強大且全面&#xff0c;雖然它不像其他DAW那樣擁有歷史和聲譽&#xff0c;但它是一個可愛的軟件&#xff0c;包含許多其它DAW所不具備的實用功能&#xff…

web基礎及http協議 (二)----------Apache相關配置與優化

一、httpd 安裝組成 http 服務基于 C/S 結構 1 .常見http 服務器程序 httpd apache&#xff0c;存在C10K&#xff08;10K connections&#xff09;問題 nginx 解決C10K問題lighttpd IIS .asp 應用程序服務器 tomcat .jsp 應用程序服務器 jetty 開源的servlet容器&#xf…