文章目錄
- 1. 通知類型
- 2. 實現原理
- 2.1 Pub/Sub
- 2.1.1 基礎知識點
- 2.1.2 頻道和訂閱者的存儲通知原理
- 2.1.3 鍵空間通知
- 2.1.4 客戶端消費
- 2.1.5 缺陷
- 2.2 Redis Stream
- 2.2.1 基礎知識點
- 2.2.2 基礎數據結構
- 2.2.3 消費者組管理
- 2.2.4 消息和消費者持久化
- 2.2.5 消息生產和消費
- 2.2.6 消費者拉取消息
- 2.2.7 消息分配
- 2.2.8 底層結構體
- 3. 使用示例
- 3.1 maven
- 3.2 yml文件
- 3.3 Configuration配置類
- 3.4 消費者
- 3.5 生產者
1. 通知類型
Redis內置了兩種不同的訂閱發布機制:
- 基于Pub/sub發布訂閱的鍵通知機制:將訂閱者以鏈表形式緩存在內存中,當key發生改動后將消息放入內存緩沖區,并由服務端逐個主動通知推送給訂閱者
- 基于Redis Stream生產消費者通知機制:分為stream、消費者組和消費者,往stream中投遞消息,將會分發給消費者組,組中消費者定期拉取組中消息消費
先說總結:相比使用Pub/Sub機制,更推薦使用redis stream代替,但還是分別介紹下Pub/Sub和stream機制。
2. 實現原理
2.1 Pub/Sub
2.1.1 基礎知識點
Redis自帶的發布訂閱模式,分為頻道和訂閱鏈表。
頻道可以精確訂閱,也可以模糊訂閱;訂閱鏈表則記錄了該頻道的所有訂閱客戶端。
2.1.2 頻道和訂閱者的存儲通知原理
頻道和訂閱鏈表是以字典的方式存儲在內存中的,不會持久化。其中頻道分為普通頻道和鍵空間頻道,其中普通頻道通過往頻道發送消息,從而分發給各個訂閱者;而鍵空間頻道是監聽具體的key或事件,比如某個key值發生了修改,redis自動使用Pub/Sub機制發送給該鍵空間頻道的訂閱者。
當Redis往頻道發送了消息,Redis會根據頻道字典找出對應的訂閱者,并主動推送給對應的客戶端。該操作是以各機器節點為單元的,各個機器節點的頻道訂閱關系是互相隔離的,不會互相同步,因此如果頻道A是在主節點上,頻道B是在從節點上,想要監聽到兩個不同頻道,就需要客戶端和兩個節點都建立通信關系。
Redis給不同客戶端訂閱者發送消息時,會先將消息寫入緩沖區,再由Reactor的事件循環機制異步推送給客戶端的socket。
2.1.3 鍵空間通知
鍵空間頻道通知分為兩種:監聽key值的改動,如某個key被重新賦值,則會觸發頻道含有key,value值是set的通知;監聽事件的觸發,如某個key被重新賦值,則會觸發頻道含有set,value值是key的通知。
Redis集群的key會根據slot分布在不同的節點上,而鍵空間通知又是基于Pub/Sub機制的,因此如果想要監聽到所有的鍵改動,就需要監聽所有的節點。
2.1.4 客戶端消費
客戶端接收到服務端的通知后是阻塞處理的,因此最好使用異步線程處理業務邏輯,以提高通信效率。
2.1.5 缺陷
Pub/Sub的頻道訂閱者關系字段都是存儲在內存中的,不會持久化,因此消息和訂閱關系容易丟失。
Pub/Sub核心是廣播模式,發送給訂閱者后,無需訂閱者ack。
如果頻道的數據量過大,訂閱者對應的緩沖區空間不夠時,會導致客戶端被動斷開,且該模式無法重新消費歷史消息,因此會丟失消息。
2.2 Redis Stream
2.2.1 基礎知識點
Redis實現的持久化消息隊列(MQ),分為生產者和消費者角色。
支持單消費者和消費者組模式,單消費者模式一般不推薦使用,不便于擴展,推薦使用消費者組模式。
生產者發消息前需要創建stream key和消費者組,消費者組內一個消息只會分發給一個消費者。
如果stream key對應了多個消費者組,則消息會發送給每個消費者組。
每條消息都有消息ID,消息支持持久化,通過RDB/AOF持久化到磁盤,消息ID為時間戳+遞增值。
2.2.2 基礎數據結構
消息ID使用Radix Tree實現,本質是前綴樹,會進行前綴壓縮以降低空間占用率,支持范圍查詢。
樹節點使用Listpack存儲消息鍵值對,分為總字節數、元素數量、節點內容和結束符,其中總字節數4字節24位,元素數量2字節16位,每個Listpack存儲默認不超過4K,entry數默認不超過100條。
消息ID由時間戳-自增序號
組成,同一時間戳有多條則按接收順序依次自增序號。
處理流程:
- 消息寫入:生成消息ID,消息內容鍵值對存入對應節點的Listpack;
- 消息讀取:根據消息ID前綴定位Listpack,遍歷Listpack獲取具體消息內容
同一時刻生產者發送了多條消息內容的存儲格式:
- tree node:100000001- total- size- entry:100000001-0- entry:json1- entry:100000001-1- entry:value1- ...
- tree node:100000002- total- size- entry:100000002-0- entry:json2- entry:100000002-1- entry:value2- ...
其設計思想核心在于減少空間占用率。
2.2.3 消費者組管理
同一個stream可以關聯多個消費者組,以消費者組名稱為key,消費者組信息體為value,存儲在Radix Tree中。
消費者組信息體中包含了最后一次消費點位,該組中待確認消息字典以及消費者字典信息。
待確認消息里面包含了消息分配時間、消息投遞次數及對應的消費者信息,當消費者ack成功后將會刪除。
待確認消息在消費者組內保存了一份,消費該消息的消費者也保存了一份。消費前消息只存儲在基礎的Radix樹中,消費后將會在消費者組內和消費者中添加一條待確認消息,兩者消息的內容是同步的,消費者的未確認消息是消費者組的子集。
消費者字典的key是消費者名稱,value是消費者的信息體,其包含了最后活躍時間和該消費者私有的待確認消息集合。
消費者在消費者組內第一次拉取消息時就會被記錄保存,消費者創建后即使長時間不再消費也不會被自動刪除,只能通過顯式刪除消費者組對應的消費者或整個消費者組被刪除才會同步刪除。
2.2.4 消息和消費者持久化
和key的持久化一樣,都使用了RDB/AOF機制來持久化消息和消費者。
- RDB周期性的掃描消息樹和消費者組、消費者字典快照并序列化為二進制數據寫入RDB文件中;
- AOF則會根據消息新增、創建消費者組和消費者這些命令來追加到AOF文件中。
一般使用RDB定期備份+AOF每秒保存的組合來確保恢復速度和數據安全。
2.2.5 消息生產和消費
在生產者發送消息時,需要確保redis中包含了該stream,而消費者在訂閱消費者組中消息時,則需要確保stream下已創建了消費者組。
當生產者往stream發送消息時,會生成消息ID,消息會被直接保存在Radix Tree中作為原始數據。當消費者組要消費時再由Redis根據最后消費點位和讀取方式分配消息給消費者。
創建消費者組時可指定該消費者組的起始點:
- 從頭消費(0):創建消費者組時獲取stream中消息ID最小的消息作為最后消費點位,后續消費者將會從該點位依次往后消費;
- 指定時間戳:可指定從哪個時間戳開始消費后續消息,若該時間戳沒有消費,則從最接近的第一條開始消費;
- 僅消費最新的($和>):創建消費者組時獲取stream中消息ID最大的消息作為最后消費點,后續消費者將只消費最新消息。
創建消費者組時$和>指令等價,$代表最后消費點位設置為stream的最大消費ID;而>代表從下一條未消費的消息開始讀取。
消費者組創建后,$和>不等價,消費者組創建后會因為消費者斷連導致很多消息未消費,>指令將會把未消費點位到最新消息點位都消費一遍,而$則會消費最大消息ID之后的消息,消費后更新最后消費點位,僅消費最新的。
組內的最后消費id僅在分配給消費者組消息時才更新。
2.2.6 消費者拉取消息
在Redis stream中,消費者消費消息是通過消費者主動拉取的。
共有兩種拉取方式:阻塞式和非阻塞式拉取
- 阻塞式:指定阻塞參數,當阻塞時間為0,則無限期阻塞,保持連接活躍,直到有新消息或連接中斷;當阻塞時間>0,則阻塞指定時間,阻塞期間有新消息則返回,超時后返回空值,消費者后續重新拉取;
- 非阻塞式:消費者從消費者組拉取時會立即返,如果有新消息則返回批量消息;如果沒有新消息返回空值。
推薦使用的模式:阻塞式拉取,阻塞時間設置為3-5s。
其余模式的缺點:
- 無限期阻塞:會一直保持socket連接,占用Redis和消費者資源,且超過了服務器的連接超時時間也會斷開連接;
- 非阻塞式:若客戶端控制不好,很容易造成CPU空轉消耗資源,也可能導致新消息無法及時處理。
2.2.7 消息分配
消息被保存在stream的Radix樹中后,且消費者組里面有最后消費點位,此時多個消費者來拉取消息,消費者組會根據最后消費點位去Radix樹中批量獲取消息,按輪詢分發的方式分配給消費者。
消費者在拉取消息時可配置count參數來指定該批次的大小,在阻塞式/非阻塞式拉取時,只要有消息就會直接返回,不會強制等到滿足count時才會返回。只有當消息產生速度遠大于消費速度時,消費者才能穩定一次性拉取count數量的消息。
輪詢分發:根據消息ID順序,依次分配給組內的消費者,與消費者的消費批次無關。
消費者指定的批次大小僅影響從服務端拉取的批次,和消息分配時的權重或速度無關。
若某消費者被分配了N條消息,但因不可抗拒因素該消費者一直未消費這些消息,過了idle空閑時間后,這些消息將會被空閑消息,等待轉移,若一直未被轉移,則一直保存在對應的pel集合中。
2.2.8 底層結構體
Stream在Redis中的基礎結構體,rax代表Radix Tree樹。
typedef struct stream {rax *rax; // 指向 Radix Tree,存儲消息uint64_t length; // 當前消息總數streamID last_id; // 最新消息IDstreamID first_id; // 最早消息IDstreamID max_deleted_entry_id; // 已刪除的最大消息IDuint64_t entries_added; // 歷史累計消息數(含已刪除)rax *cgroups; // 消費組列表(Radix Tree)
} stream;
消費者組,對應stream里面的cgroups指針,cgroups的key是消費者組名稱,value是streamCG結構體。其中pel的key是消息ID,value是streamNACK結構體。
typedef struct streamCG {streamID last_id; // 組內最后分發的消息IDlong long entries_read; // 組已讀消息數(含已確認)rax *pel; // 組內所有未確認消息(PEL)rax *consumers; // 組內消費者列表(Radix Tree)
} streamCG;
未確認消息結構體,在streamCG和streamConsumer結構體中使用。
typedef struct streamNACK {mstime_t delivery_time; // 最后一次投遞時間戳uint64_t delivery_count; // 投遞次數(重試計數)streamConsumer *consumer; // 當前持有該消息的消費者
} streamNACK;
消費者實例,分別對應streamCG的consumers樹節點和streamNACK的consumer。pel為消費者私有的待確認消息。
typedef struct streamConsumer {mstime_t seen_time; // 最后活躍時間(判斷消費者存活)sds name; // 消費者名稱(客戶端標識)rax *pel; // 消費者私有未確認消息列表
} streamConsumer;
3. 使用示例
鑒于Pub/Sub的諸多缺陷,生產只會考慮最新的Redis stream,如果Redis的版本低于5.0,無法使用MQ實現。
注:例子使用springboot實現,版本要求大于2.2.x
3.1 maven
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.2.6.RELEASE</version>
</dependency>
3.2 yml文件
spring:redis:host: xxx.xxxport: 6379password: xxxxtimeout: 2000jedis:pool:max-active: 50max-idle: 10min-idle: 3max-wait: 2000
3.3 Configuration配置類
配置RedisTemplate
和StreamListener
的容器StreamMessageListenerContainer
,StreamMessageListenerContainer
主要配置的是阻塞超時時間和count批次大小。
@Configuration
public class RedisConfiguration {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(redisConnectionFactory);template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new StringRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;}@Beanpublic StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(RedisConnectionFactory factory) {StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(3)).batchSize(10).targetType(String.class).build();System.out.println("create StreamMessageListenerContainer");return StreamMessageListenerContainer.create(factory, options);}
}
3.4 消費者
使用springboot-redis
提供的訂閱消費機制實現,缺點在于批量返回后沒辦法直接處理該批量,只能一個個處,若需要在StreamListener攢一批處理,需自己實現,或重寫StreamMessageListenerContainer
后再重寫StreamPollTask
。
@Component
public class StreamConsumer {private final StreamMessageListenerContainer<String, ObjectRecord<String, String>> container;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public static final String STREAM_KEY = "test:dev:stream";public static final String STREAM_GROUP = "test:dev:group";public static final String STREAM_GROUP2 = "test:dev:group2";public StreamConsumer(StreamMessageListenerContainer<String, ObjectRecord<String, String>> container) {this.container = container;}@PostConstructpublic void start() {container.start();System.out.println("start receive message");subscribe(STREAM_GROUP, ReadOffset.lastConsumed());subscribe(STREAM_GROUP2, ReadOffset.lastConsumed());}private void subscribe(String groupName, ReadOffset readOffset) {try {createStream();container.register(StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(STREAM_KEY, readOffset)).consumer(Consumer.from(groupName, "streamConsumer")).autoAcknowledge(false).cancelOnError(t -> false).errorHandler(ex -> {System.out.println(groupName + " poll fail." + ex.getMessage());}).build(),message -> {try {System.out.println(groupName + " get message stream is " + message.getStream() +",value is " + message.getValue());long value = Long.parseLong(message.getValue());if ((value % 100) == 0) {System.out.println(groupName + " batch size is 100,execute business.");}Long result = redisTemplate.opsForStream().acknowledge(STREAM_KEY, STREAM_GROUP,message.getId());System.out.println(groupName + " ack result is " + result);} catch (Exception e) {System.out.println(groupName + " consume message fail");}});} catch (Exception e) {System.out.println(groupName + " subscribe fail");}}public void createStream() {if (Boolean.TRUE.equals(redisTemplate.hasKey(STREAM_KEY))) {return ;}redisTemplate.opsForStream().add(StreamRecords.newRecord().in(STREAM_KEY).ofObject("init"));String result = redisTemplate.opsForStream().createGroup(STREAM_KEY, STREAM_GROUP);System.out.println("create stream result is " + result);result = redisTemplate.opsForStream().createGroup(STREAM_KEY, STREAM_GROUP2);System.out.println("create stream2 result is " + result);}}
3.5 生產者
簡單的一個demo,持續向stream中發送消息給消費者消費。
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK, classes = {Application.class})
public class RedisServiceTest {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Testpublic void sendMessage() {for (int i = 0; i < 300; i++) {if (i % 50 == 0) {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}sendMessage(String.valueOf(i + 1000));}// deleteStream();try {Thread.sleep(40000);} catch (InterruptedException e) {e.printStackTrace();}}public void sendMessage(String message) {redisTemplate.opsForStream().add(StreamRecords.newRecord().in(StreamConsumer.STREAM_KEY).ofObject(message));}public void deleteStream() {Boolean result = redisTemplate.opsForStream().destroyGroup(StreamConsumer.STREAM_KEY, StreamConsumer.STREAM_GROUP);System.out.println("delete stream group result is " + result);result = redisTemplate.opsForStream().destroyGroup(StreamConsumer.STREAM_KEY, StreamConsumer.STREAM_GROUP2);System.out.println("delete stream group2 result is " + result);redisTemplate.delete(StreamConsumer.STREAM_KEY);}
}