我們為了自己實現一個MQ功能,就要深入底層挖掘現有開源產品的實現過程。
Redis 發布訂閱底層結構解析
Redis 不存儲消息,僅作為“實時中轉”;只有訂閱者在線時才能收到消息;消息是廣播給所有訂閱此頻道的客戶端。
1. 核心數據結構:哈希表(dict)+ 鏈表(鏈式客戶端列表)
1.1 pubsub_channels
:頻道訂閱表(dict)
Redis 使用一個全局字典(哈希表)結構來維護頻道與訂閱者之間的映射關系:
dict *pubsub_channels; // key: channel name (sds),value: list of clients
- Key 是頻道名稱(
channel
),類型為 Redis 字符串(SDS) - Value 是一個客戶端鏈表(
list *
),保存所有訂閱該頻道的客戶端連接指針
1.2 客戶端鏈表(client list)
對于每個頻道,對應一個鏈表或列表結構(底層用的是 Redis 自身的 list
類型):
typedef struct client {...int fd; // 客戶端 socket fdlist *subscribed_channels; // 客戶端自身也記錄了訂閱了哪些頻道...
} client;
這個鏈表中每個節點是一個 client *
指針,表示一個訂閱該頻道的活躍客戶端連接。
2. 消息發布流程
當執行 PUBLISH channel message
時,Redis 會:
- 查找
pubsub_channels[channel]
,得到訂閱該頻道的客戶端鏈表 - 遍歷該鏈表,對每個客戶端執行
addReply(client, message)
- 消息會直接寫入客戶端輸出緩沖區,等待發送至客戶端 socket
如果客戶端網絡異常導致寫失敗,可能會斷開連接并清理其訂閱記錄。
3. 訂閱流程
訂閱(SUBSCRIBE):
- Redis 檢查頻道是否存在于
pubsub_channels
:- 如果存在,將當前客戶端加入該頻道對應的客戶端鏈表
- 如果不存在,新建一個鍵值對:
channel -> list
,然后加入客戶端
4、取消訂閱流程(UNSUBSCRIBE):
- Redis 從
pubsub_channels[channel]
的鏈表中移除該客戶端 - 如果鏈表為空,則從字典中移除該頻道
- 同時更新客戶端的
subscribed_channels
屬性
5、 客戶端斷開連接時的清理機制
當客戶端斷開連接,Redis 會調用清理邏輯:
pubsubUnsubscribeAllChannels(client *c)
該函數會遍歷客戶端 subscribed_channels
,從所有頻道對應的客戶端鏈表中移除該客戶端,并刪除空頻道。
SUBSCRIBE
:注冊訂閱關系。PUBLISH
:消息發布,立即廣播給在線訂閱者。- 掉線(連接斷開):Redis 會檢測到客戶端斷連,并自動移除其訂閱關系。
UNSUBSCRIBE
:客戶端主動取消訂閱,Redis 也會移除其訂閱記錄。
實踐過程: