字符串(String)
string 是 redis 最基本的類型,你可以理解成與 Memcached 一模一樣的類型,一個 key 對應一個 value。
string 類型是二進制安全的。意思是 redis 的 string 可以包含任何數據,比如jpg圖片或者序列化的對象。
string 類型是 Redis 最基本的數據類型,string 類型的值最大能存儲 512MB。
分布式ID生成器
相信大家經常會遇到需要生成唯一ID的場景,例如標識碼每次請求、生成一個訂單號、創建用戶。
分布式ID生成器需要滿足以下特性:
- 趨勢遞增,mysql是最常用的數據庫,如果ID不是趨勢遞增,那么B+樹為了維護ID有序性,會頻繁在索引的中間位置插入節點,從而影響后面節點的位置,甚至頻繁導致頁分裂,這對于性能的影響是極大的。
- 全局唯一性,ID不唯一就會出現主鍵沖突。
- 高性能,生成ID是高頻操作,如果性能緩慢,系統的整體性能就會受到限制。
- 高可用,也就是在給定的時間間隔內,一個系統總的可用時間占的比例。
- 存儲空間小,例如對于mysql的B+樹,普通索引會存儲主鍵值,主鍵越大,每個Page可以存儲的數據就越少,訪問磁盤I/O的次數就會增加。
Redis集群能保證高可用和高性能,為了節省內存,ID可以使用數字的形式,并且通過遞增的方式來創建新的ID。為了防止重啟數據丟失,還需要開啟redis的aof持久化。雖然,開啟aof持久化,為了性能配置everysec策略還是有可能丟失1s的數據,因此還可以使用一個異步機制(例如將id發送到mq消息隊列中)將生成的最大ID持久化到一個mysql。
設計思路
假設訂單ID生成器的鍵是“counter:order”,當應用服務啟動時先從mysql中查詢出最大值M,然后執行exists命令判斷是否存在鍵。
- 如果redis中不存在鍵,則執行set命令將值M寫入redis。
- 如果redis中存在鍵,值為N,則比較M和N的值,執行SET命令將M與N的最大值寫入redis,相等則不操作。
- 應用服務啟動完成后,在每次需要生成ID時,應用程序就向redis服務器發送incr命令。
- 應用程序將獲取到的ID值發送到mq對象,消費者監聽隊列把值持久化到mysql。
代碼實現
async generateDistributedId(key, value, callback: (id) => void) {try {let flag = await this.redis.exists(key)if (flag == 0) {//不存在該健await this.redis.set(key, value??0)//不存在該鍵,就初始化為0或value} else {let redis_value = await this.redis.get(key)await this.redis.set(key, Math.max(redis_value, value))}// 使用 INCR 命令對指定的鍵進行自增操作 const id = await this.redis.incr(key);callback(id)return id;} catch (error) {this.logger.error(`生成分布式 ID 時出錯: ${error}`);throw error;}}
當然,還需要其他的分布式ID生成發送,例如,雪花算法、UUID等
列表(List)
Redis 列表是簡單的字符串列表,按照插入順序排序。你可以添加一個元素到列表的頭部(左邊)或者尾部(右邊)。列表最多可以存儲 2^32 - 1 個元素。
分布式系統中必備的一個中間件就是消息隊列,它可以進行服務間異步解耦、流量消峰,實現最終一致性。目前市面上已有的消息隊列又kaqfka、pulsar等。
那么,redis適合做消息隊列嗎?想要回答這個問題之前,得先從本質思考
- 消息隊列提供了什么特性?
- Redis如何實現消息隊列?是否滿足存取需求?
什么是消息隊列
消息隊列是一種異步的服務間通信方式,適用于分布式和微服務架構。消息在被處理和刪除之前一直存儲在隊列上。
它基于先進先出的原則,允許生產者向隊列中發送消息,而消費者則可以從隊列中獲取消息并進行處理。
消息隊列通常被用于解耦應用程序的各個組件,實現異步通信、削峰填谷、解耦合、流量控制等功能。
- Producer:消息生產者,負責產生和發送消息到消息處理中心(Broker)。
- Broker:消息處理中心,負責消息存儲、確認、重試等,一般會包含多個queue。
- Consumer:消息消費者,負責從Broker中獲取消息,并進行相應處理。
消息隊列在實際應用中包括如下4個場景。
- 應用耦合:發送方、接收方的系統之間不需要互相了解,只需要認識消息。多應用間通過消息隊列對同一消息進行處理,避免調用接口失敗導致整個過程失敗。
- 異步處理:多應用對消息隊列中的同一消息進行處理,應用間并發處理消息,相比串行處理,減少處理時間。
- 限流削峰:廣泛應用于秒殺或搶購活動中,避免流量過大導致應用系統“掛掉”的情況。
- 消息驅動的系統:系統中有消息隊列、消息生產者和消息消費者,生產者負責產生消息,消費者負責處理消息。
消息隊列滿足哪些特性
- 消息有序性:消息是異步處理的,但是消費者需要按照生產者發送消息的順序來消費,避免出現后發送的消息被先處理的情況。
- 重復消息處理:當因為網絡問題出現消息重傳時,消費者可能收到多條重復消息。同樣的消息重復多次可能造成同一業務邏輯被多次執行,在這種情況下,應用系統需要確保冪等性。
- 可靠性:保證一次性傳遞消息。如果發送消息時消費者不可用,那么消息隊列會保留消息,直到成功傳遞它。當消費者重啟后,可以繼續讀取消息進行處理,防止消息遺漏。
- LPUSH:生產者使用LPUSH命令將消息插入隊列頭部,如果key不存在則會創建一個空的隊列再插入消息,LPUSH命令的返回值表示插入隊列的消息個數。
- 使用BLPOP、BRPOP阻塞讀取的命令,消費者在讀取隊列沒有數據時會自動阻塞,直到有新的消息寫入隊列,才繼續讀取消息執行業務邏輯。
注意:Lists方式實現的消息隊列并沒有提供類似kafka的消費者組的概念,由多個消費者組成一個消費者組來分擔處理隊列消息的任務。如果,生產者發送消息的速度過快,消費者處理不過來,則會導致消息積壓,占用過多的內存。從redis5.0版本開始,可以使用Stream來實現。
重復消費解決方案
- 消息隊列自動為每條消息生成一個全局ID。
- 生產者為每條消息創建一個全局ID,消費者把處理過的消息ID記錄下來判斷是否重復。
其實這就是冪等,對于同一條消息,消費者收到后處理一次的結果和處理多次的結果是一致的。
消息可靠性解決方案
消費者讀取消息,處理過程中宕機了就會導致消息沒有處理完成,可是數據已經不在隊列中了。這種現象的本質就是消費者在處理消息時崩潰了,無法再讀取消息,缺乏一個消息確認的可靠機制。可以使用BLMOVE命令,以阻塞的方式從原隊列中讀取消息,同時把這條消息復制到另一個隊列中(備份),并且是原子操作。
設計思路
- 生產者使用LPUSH命令將消息依次存入隊列隊頭。
- 消費者消費消息時在while循環使用BLMOVE,以阻塞的方式從隊列隊尾彈出消息,同時把該消息復制到備份隊列中,該操作是原子性的。
- 如果消費消息成功,就是用LREM把備份隊列中的對應的消息刪除,從而實現ACK確認機制。
- 如果消費異常,那么應用程序使用BRPOP命令從備份隊列再次讀取消息。
代碼實現
// 定義隊列名稱 private MAIN_QUEUE = 'main_queue';private BACKUP_QUEUE = 'backup_queue';private DEAD_LETTER_QUEUE = 'dead_letter_queue';// 定義最大重試次數 private MAX_RETRIES = 3;// 定義消息過期時間(單位:毫秒) private MESSAGE_EXPIRATION_TIME = 60000;// 生產者:向主隊列中添加消息 async produceMessage(message) {try {const msgId = Date.now() + '_' + Math.random().toString(36).slice(2);const fullMsg = {id: msgId,content: message,retries: 0,createdAt: Date.now()};await this.redis.lpush(this.MAIN_QUEUE, JSON.stringify(fullMsg));this.logger.info(`Produced message: ${JSON.stringify(fullMsg)}`);} catch (error) {this.logger.error(`Error producing message: ${error}`);}}// 消費者:從主隊列消費消息 async consumeMessages(processMessageCallback: (msg: any) => Promise<void>) {while (true) {try {// 原子性地將消息從主隊列移到備份隊列 const msg = await this.redis.blmove(this.MAIN_QUEUE,this.BACKUP_QUEUE,'RIGHT','LEFT',0);if (msg) {const parsedMsg = JSON.parse(msg);// 檢查消息是否過期 if (Date.now() - parsedMsg.createdAt > this.MESSAGE_EXPIRATION_TIME) {// 消息過期,將其移動到死信隊列 await this.redis.lrem(this.BACKUP_QUEUE, 1, msg);await this.redis.lpush(this.DEAD_LETTER_QUEUE, JSON.stringify(parsedMsg));this.logger.warn(`Message ${parsedMsg} has expired and moved to dead letter queue.`);continue;}this.logger.info(`Processing message: ${parsedMsg}`);//消息處理 await processMessageCallback(parsedMsg);// 確認消費成功,從備份隊列中移除消息 await this.redis.lrem(this.BACKUP_QUEUE, 1, msg);}} catch (error) {this.logger.error(`Error consuming message:${error}`);// 從備份隊列取出消息進行重試 const [, retryMsg] = await this.redis.brpop(this.BACKUP_QUEUE, 0);const parsedRetryMsg = JSON.parse(retryMsg);// 增加重試次數 parsedRetryMsg.retries++;if (parsedRetryMsg.retries >= this.MAX_RETRIES) {// 達到最大重試次數,將消息移到死信隊列 await this.redis.lpush(this.DEAD_LETTER_QUEUE, JSON.stringify(parsedRetryMsg));this.logger.warn(`Message ${parsedRetryMsg} moved to dead letter queue.`);} else {// 未達到最大重試次數,重新放回主隊列 await this.redis.lpush(this.MAIN_QUEUE, JSON.stringify(parsedRetryMsg));this.logger.info(`Message ${parsedRetryMsg} will be retried.`);}}}}
集合(Set)
Redis 的 Set 是 string 類型的無序集合。集合是通過哈希表實現的,所以添加,刪除,查找的復雜度都是 O(1)。
使用場景
需要存儲多個元素,并且要求不能出現重復數據,無須考慮元素的有序性時,可以使用Set。Set還支持在集合之間做交集、并集、差集操作。例如統計如下場景中多個集合元素的聚合結果。
- 統計多個元素的共有數據(交集)。
- 對于兩個集合,統計其中的一個獨有元素(差集)。
- 統計多個集合的所有元素(并集)。
常見的使用場景如下:
- 社交軟件中共同關注:通過交集實現。
- 每日新增關注數:對近兩天的總注冊用戶量集合取差集。
- 打標簽:為自己收藏的每一篇文章打標簽,例如微信收藏功能,這樣可以快速找到被添加了某個標簽的所有文章。
代碼實現
async getCommonElements(keys:RedisKey[], storeKey:RedisKey) {if (!keys || keys.length < 2) {throw new Error("至少需要2個集合鍵");}// 執行交集操作 if (storeKey) {// 存儲結果到新鍵并返回數量 const count = await this.redis.sinterstore(storeKey, ...keys);return count;} else {// 直接獲取交集數據 const elements = await this.redis.sinter(...keys);return elements;}}
哈希(Hash)
Redis hash 是一個鍵值(key=>value)對集合,類似于一個小型的 NoSQL 數據庫。
Redis hash 是一個 string 類型的 field 和 value 的映射表,hash 特別適合用于存儲對象。
每個哈希最多可以存儲 2^32 - 1 個鍵值對。
常用命令
- HSET key field value:設置哈希表中字段的值。
- HGET key field:獲取哈希表中字段的值。
- HGETALL key:獲取哈希表中所有字段和值。
- HDEL key field:刪除哈希表中的一個或多個字段。
- DEL key:刪除整個哈希表。
- HLEN key:查詢散列表中有多少個field-value pairs。
- HSETNX key value:當key不存在時配置value,否則什么也不干。
有序集合(Sorted Set)
Sorted Set和Set類似,是一種集合類型,這種集合中不會出現重復的member(數據)。它們之間的區別在于:Sorted Set中的元素由兩部分組成,分別是member和score。
member會關聯一個double類型的score,Sorted Set默認會根據這個score對member從小到大進行排序,如果member關聯的score相同,則按照字符串的字典順序排序。
應用場景
- 排行榜:維護大型在線游戲中根據分數排名的Top10有序列表。
- 速率限流器:根據排序集合構建滑動窗口速率限制器。
- 延遲隊列:使用score存儲過期時間,從小到大排序,最靠前的就是最先到期的數據。
例如,很多地方都會用到排行榜功能,如微博熱搜、游戲戰力排行榜等,我們可以使用sorted sets實現一個實時游戲高分排行榜。
玩家的得分越高,排名越靠前,如果分數相同則先達到該分數的玩家排在前面,游戲排行榜提供的功能如下:
- 按照分數從高到低排名,查詢前N位玩家的信息。
- 新注冊玩家,需要把新玩家信息添加到排行榜中。
- 能查看某個玩家的排名和分數。
sorted sets的每個元素都由member和score兩部分組成,可以利用score進行排序,正好滿足我們的需求。用score保存玩家的游戲得分,member保存玩家ID。那么如何實現最先達到該分數的玩家排在前面這個功能。我們可以指定一個非常大的時間作為基準時間,時間排序值 = (基準時間-玩家達到分數時間)/基準時間。
以上公式得到的結果一定小于1,正好可以作為score的小數部分。越早達到,這個值就越大,滿足排序要求。
Bitmap
在移動應用的業務場景中,我們需要保存這樣的信息:一個key關聯了一個數據集合。
常見的場景如下:
- 用戶在線狀態統計:可以使用Bitmap來記錄用戶的在線狀態,其中每位表示一個用戶的在線狀態(在線為1,離線為0).這樣可以高效地統計在線用戶數量和在線用戶的分布情況。
- 用戶簽到記錄:Bitmap可以用于記錄用戶的簽到情況,其中每位表示一個日期(已簽到為1,未簽到為0).這樣可以輕松統計用戶的連續簽到天數、活躍用戶數等信息。
- 頁面點擊量統計:Bitmap可以用于統計網站的頁面點擊量,其中每位表示一個頁面的點擊情況(點擊為1,未點擊為0).這樣可以快速獲取每個頁面的點擊量以及總點擊量。
面向bit的操作是在字符串類型上定義的,將bitmap存儲在字符串中,每個字符都是由8bit組成的數據,其中的每位只能是1或0.字符串類型的最大容量是512MB,所以一個Bitmap最多可配置2^32個不同位。
bitmap解決的是二值狀態統計場景問題。也就是集合中的元素的值只有0和1兩種,在簽到打卡和用戶是否登錄的場景中,只需記錄簽到或未簽到。
假如我們使用redis的字符串類型判斷用戶是否登錄,如果以字符串的形式存儲100萬個用戶的登錄狀態,那么需要存儲100萬個字符串,內存開銷太大。因為redis是用c語言編寫的redis提供的字符串類型本質上還是c語言中的字節數組,末尾還需要加上"\0"的那種,除此之外,redis還在此基礎上封裝了一層,記錄數組已使用的長度,數組分配的空間總長度等,組成一個結構體。這些都是需要占用額外的存儲空間。
bitmap的底層使用字符串類型的SDS數據結構來保存位數組,redis把每字節數組的8bit利用起來,每位表示一個元素的二值狀態(不是1就是0).可以將bitmap看作一個以bit為單位的數組,數組的每位只能存儲0或者1,數組的每位下標在bitmap中叫做offset偏移量。
使用場景
用戶登錄判斷,bitmap提供了GETBIT、SETBIT操作,通過一個偏移值offset對bit數組的偏移量offset的bit進行讀/寫操作,需要注意的是,offset從0開始。bitmap的大小為byte的整數倍,例如你使用了31個bit,但是實際上為占用32bit,沒用的會自動補0。
SETBIT:
例如:記錄用戶在2025年3月1日和2025年3月31日打卡情況。
SETBIT uid:sign:xxxx:202503 0 1
SETBIT uid:sign:xxx:202503 31 1
執行以上兩個命令后,bitmap的數據就是100000000000000000000000000010(一共32bit)。
相關命令
-
SETBIT:
- 設置或清除位的值。
- 語法:
SETBIT key offset value
- 例子:
SETBIT mykey 7 1
(將mykey
的第 8 位設為1
)
-
GETBIT:
- 獲取指定偏移量的位的值。
- 語法:
GETBIT key offset
- 例子:
GETBIT mykey 7
(獲取mykey
的第 8 位的值)
-
BITCOUNT:
- 計算字符串中被設置為
1
的位的數量。 - 語法:
BITCOUNT key [start end]
- 例子:
BITCOUNT mykey
(計算mykey
中所有位中1
的數量)
- 計算字符串中被設置為
-
BITOP:
- 對一個或多個字符串執行按位運算(AND、OR、XOR、NOT)。
- 語法:
BITOP operation destkey key [key ...]
- 例子:
BITOP AND resultkey key1 key2
(對key1
和key2
執行按位與運算,并將結果存儲在resultkey
中)
-
BITPOS:
- 找到字符串中第一個設置為
1
或0
的位的索引。 - 語法:
BITPOS key bit [start end]
- 例子:
BITPOS mykey 1
(查找mykey
中第一個1
的位置)
- 找到字符串中第一個設置為
使用場景
- 用戶行為記錄: 通過 Bitmap 可以高效地記錄某個用戶是否在某一天執行了某個操作,比如簽到。
- 大規模布爾值存儲: 可以在極小的空間內存儲大量布爾值。
- 快速統計: 使用
BITCOUNT
可以快速統計某個集合或用戶群體的某個行為發生次數。
Bitmap 的優點是空間效率高和操作速度快,適合用于需要處理大量布爾值或類似布爾值的數據場景。通過這些命令,你可以非常靈活地操作位數組,并根據具體需求進行優化和調整。
HyperLogLog
在移動互聯網的業務場景中,數據量很大,系統需要保存這樣的信息:一個key關聯了一個數據集合,同時將這個數據集合以統計報表的形式呈現給運營人員。例如:
- 統計一個App的日活、月活人數。
- 統計一個頁面每天被多少個不同賬戶訪問。
- 統計用戶每天搜索不同詞條的個數。
- 統計注冊IP地址數。
這些是典型的HyperLogLog(基數統計)應用場景。基數統計指統計一個集合中不重復元素的數量,這些不重復的元素被稱為基數。
基數統計
HyperLogLog是一種概率數據結構,用于估計集合的基數。每個HyperLogLog最多消耗12kb內存,在標準誤差0.81%的前提下,可以計算2^64個元素的基數。其主要特點如下。
- 高效存儲:HyperLogLog的內存消耗是固定的,與集合中的元素數量無關。這使得它特別適用于處理大規模數據集,因為它不需要存儲所有不同的元素,只需要存儲估計基數所需的信息。
- 概率估計:HyperLogLog提供的結果是概率性的,不是精確的基數計數。它通過哈希函數將輸入元素映射到Bitmap中的某些位置,并基于Bitmap的統計信息來估計基數的。由于這是一種概率方法,因此可能存在一定的誤差,但在實際應用中,這個誤差通常可以接受。
- 高速計算:HyperLogLog可以在常量時間內計算估計的基數,無論集合的大小如何。
應用場景
HyperLogLog的主要使用場景是基數統計。例如,海量網頁訪問量的統計,統計文章每天被多少用戶訪問過,一個用戶一天訪問多次只能記一次。
對于上面的場景,可以使用Sets、Bitmap和HyperLogLog來解決。
- Sets:統計精度高,對于少量的數據統計建議使用,大量的數據統計會占用很大的內存空間。
- Bitmap:位圖算法,統計精度高,內存占用比Sets少,但是在統計大量數據時還是會占用大量內存。
- HyperLogLog:存在一定的誤差,占用內存少(穩定占用12kb左右)。
Redis 中的 HyperLogLog 是一種概率性數據結構,用于估計集合中唯一元素的基數(即不重復元素的數量)。HyperLogLog 的優點是它占用極小的內存空間(大約 12 KB),即使是非常大的數據集也能保持這種低內存消耗。雖然它是一個估計器,但誤差率非常小(大約 0.81%)。
以下是 Redis 中與 HyperLogLog 相關的命令:
-
PFADD:
- 向 HyperLogLog 添加元素。
- 語法:
PFADD key element [element ...]
- 例子:
PFADD hllkey user1 user2 user3
(將user1
、user2
和user3
添加到hllkey
)
-
PFCOUNT:
- 返回存儲在 HyperLogLog 中的唯一元素的近似數量。
- 語法:
PFCOUNT key [key ...]
- 例子:
PFCOUNT hllkey
(返回hllkey
中估計的唯一元素數量)
-
PFMERGE:
- 合并多個 HyperLogLog 數據結構為一個。
- 語法:
PFMERGE destkey sourcekey [sourcekey ...]
- 例子:
PFMERGE mergedkey hllkey1 hllkey2
(將hllkey1
和hllkey2
合并到mergedkey
)
Geospatial
基于位置服務(LBS)
經緯度是由經度與維度組成的坐標系統,又稱為地理坐標系統。LBS是圍繞用戶當前地理位置的數據而展開的服務,為用戶提供精準的“邂逅”服務。LBS的特點如下:
- 以“我”為中心,搜索附件的Ta。
- 以“我”當前的地理位置為準,計算出別人和“我”之間的距離。
- 按“我”與別人距離的遠近排序,篩選出離“我”最近的用戶。
Redis 提供了 Geospatial 數據類型,用于存儲和查詢地理空間信息(如經緯度坐標)。這些命令非常適用于需要處理地理位置信息的應用,比如基于位置的服務、地圖應用等。
GeoHash編碼
GeoHash編碼是將二維經緯度編碼轉換為一維,為地址位置分區的一種算法。其核心思想是區間二分:將地球編碼看成一個二維平面,然后將這個平面遞歸均分為更小的子塊。這個過程可以分為三步。
- 將經、緯度分別變成一個N位二進制數。
- 將經、緯度的二進制數合并。
- 按照Base32進行編碼。
經緯度編碼
GeoHash編碼會把一個經度編碼成一個N位的二進制數,例如對經度范圍(-180,180)做√次二分區操作,其中N可以自定義。
在進行第一次二分區時,經度范圍[-180,180]會被分成(-180,0)和[0,180]兩個子區間(我稱之為左、右分區)。
此時,我們可以查看一下要編碼的經度值落在了左分區還是右分區。如果落在左分區,就用0表示;如果落在右分區,就用1表示。這樣一來,每做完一次二分區,我們就可以得到1立編碼值(不是0就是1)。
再對經度值所屬的分區做一次二分區,查看經度值落在了二分區后的左分區還是右分區,然后按照剛才的規則再做1位編碼。當做完N次二分區后,經度值就可以用一個N位的數來表示了。
所有的地圖元素坐標都被放置于唯一的方格中,分區次數越多,方格越小,坐標越精確。
然后對這些方格進行整數編碼,距離越近的方格編碼越接近。
編碼之后,每個地圖元素的坐標都將變成一個整數,通過這個整數可以還原出元素的坐標,整數越長,還原出來的坐標值的損失就越小。對于“附近的人”這個功能而言,損失的一點精度可以忽略不計。
例如,對經度值169.99進行4位編碼(N=4,做4次分區),把經度區間(-180,180)分成了左分區(-180,0)和右分區[0,180]。
- 169.99屬于右分區,使用1表示第一次分區編碼。
- 再將169.99經過第一次劃分所屬的[0,180]區間繼續分成(0,90)和[90,180],169.99依然在右區間,編碼為1。
- 將[90,180]分為(90,135)和[135,180],這次落在左分區,編碼為0。
緯度的編碼思路與經度一樣,不再贅述。
Geospatial 命令
-
GEOADD:
- 將地理空間位置添加到指定的鍵中。
- 語法:
GEOADD key longitude latitude member [longitude latitude member ...]
- 例子:
GEOADD locations 13.361389 38.115556 "Palermo" 15.087269 37.502669 "Catania"
-
GEOPOS:
- 返回一個或多個成員的地理空間位置(經緯度)。
- 語法:
GEOPOS key member [member ...]
- 例子:
GEOPOS locations "Palermo" "Catania"
-
GEODIST:
- 返回兩個給定成員之間的距離。
- 語法:
GEODIST key member1 member2 [unit]
- 可選單位:
m
(米),km
(公里),mi
(英里),ft
(英尺) - 例子:
GEODIST locations "Palermo" "Catania" km
-
GEORADIUS (已棄用,推薦使用
GEOSEARCH
):- 在給定的圓形范圍內查找成員,基于給定的經緯度。
- 語法:
GEORADIUS key longitude latitude radius unit [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]
- 例子:
GEORADIUS locations 15 37 200 km WITHDIST
-
GEORADIUSBYMEMBER (已棄用,推薦使用
GEOSEARCH
):- 在給定的圓形范圍內查找成員,基于給定的已有成員的位置。
- 語法:
GEORADIUSBYMEMBER key member radius unit [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]
- 例子:
GEORADIUSBYMEMBER locations "Palermo" 100 km WITHDIST
-
GEOSEARCH:
- 根據指定的半徑或邊界框條件搜索地理位置。
- 語法:
GEOSEARCH key FROMMEMBER member|FROMLOC long lat BYRADIUS radius unit|BYBOX width height unit [ASC|DESC] [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count]
- 例子:
GEOSEARCH locations FROMMEMBER "Palermo" BYRADIUS 100 km ASC WITHDIST
-
GEOSEARCHSTORE:
- 執行
GEOSEARCH
并將結果存儲在一個新的或現有的鍵中。 - 語法:
GEOSEARCHSTORE dest key FROMMEMBER member|FROMLOC long lat BYRADIUS radius unit|BYBOX width height unit [ASC|DESC] [COUNT count] [STOREDIST]
- 例子:
GEOSEARCHSTORE nearby_locations locations FROMMEMBER "Palermo" BYRADIUS 100 km
- 執行
Stream
stream是redis5.0專門為消息隊列設計的數據類型,借鑒kafka的消費組的設計思想,提供消費者組的概念,同時提供消息的持久化和主從復制機制。客戶端可以訪問任何時刻的數據,并且能記住每個客戶端的訪問位置,從而保證消息不丟失。
需要注意的是,stream是一種超輕量級的MQ,并沒有完全實現消息隊列的所有設計要點,所以它的使用場景需要考慮業務的數據量和對性能】可靠性的需求。對于系統消息量不大、可以容忍數據丟失的場景,使用stream作為消息隊列就能享受高性能快速讀/寫消息的優勢。
Redis Streams 是一種強大的數據結構,旨在支持高效的消息隊列和事件流處理。它允許在 Redis 中存儲和處理時間序列數據,提供了強大的功能用于生產者和消費者之間的消息傳遞。以下是 Redis Streams 相關的主要命令及其功能:
Stream 命令
-
XADD:
- 向流中添加新的條目。
- 語法:
XADD key ID field value [field value ...]
- ID 可以為
*
,讓 Redis 自動生成一個唯一 ID。 - 例子:
XADD mystream * sensor-id 1234 temperature 19.8
-
XRANGE:
- 讀取流中某個范圍的條目。
- 語法:
XRANGE key start end [COUNT count]
start
和end
可以為特殊值-
和+
,表示流的開始和結束。- 例子:
XRANGE mystream - + COUNT 10
-
XREVRANGE:
- 以相反的順序讀取流中某個范圍的條目。
- 語法:
XREVRANGE key end start [COUNT count]
- 例子:
XREVRANGE mystream + - COUNT 10
-
XLEN:
- 返回流中的條目數。
- 語法:
XLEN key
- 例子:
XLEN mystream
-
XREAD:
- 從一個或多個流中讀取數據。
- 語法:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
- 支持阻塞操作,等待新的條目。
- 例子:
XREAD STREAMS mystream 0
-
XGROUP CREATE:
- 創建消費者組。
- 語法:
XGROUP CREATE key groupname id|$ [MKSTREAM]
id
可以為$
,表示從最后一個條目開始消費。- 例子:
XGROUP CREATE mystream mygroup $
-
XREADGROUP:
- 讀取消息并標記為已被消費者組消費。
- 語法:
XREADGROUP GROUP groupname consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
- 例子:
XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
-
XACK:
- 確認消息已被處理。
- 語法:
XACK key groupname id [id ...]
- 例子:
XACK mystream mygroup 1526569495631-0
-
XPENDING:
- 查看消費者組的待處理消息。
- 語法:
XPENDING key groupname [start end count] [consumer]
- 例子:
XPENDING mystream mygroup
-
XCLAIM:
- 將消息的所有權轉移到另一個消費者。
- 語法:
XCLAIM key groupname consumer min-idle-time id [id ...] [RETRYCOUNT count] [FORCE] [JUSTID]
- 例子:
XCLAIM mystream mygroup consumer2 0 1526569495631-0
-
XDEL:
- 刪除流中的條目。
- 語法:
XDEL key id [id ...]
- 例子:
XDEL mystream 1526569495631-0
-
XTRIM:
- 修剪流以減少其長度。
- 語法:
XTRIM key MAXLEN [~] count
- 例子:
XTRIM mystream MAXLEN 1000
使用場景
- 消息隊列: 用于實現高效的生產者-消費者模型。
- 事件溯源: 在事件驅動架構中記錄和處理事件。
- 日志處理: 實時收集和分析日志數據。
Redis Streams 提供了一種高效且靈活的方法來處理實時數據流,適合多種應用場景,如實時數據處理、日志分析、消息隊列等。通過消費者組和阻塞讀取等功能,Redis Streams 可以處理復雜的數據流管理需求。
const Redis = require('ioredis');
const redis = new Redis();const streamKey = 'mystream';
const groupName = 'mygroup';
const consumerName = 'consumer1';// 創建消費者組
async function createConsumerGroup() {try {await redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM');console.log(`Consumer group ${groupName} created.`);} catch (err) {if (err.message.includes('BUSYGROUP Consumer Group name already exists')) {console.log(`Consumer group ${groupName} already exists.`);} else {throw err;}}
}// 生產者:向流中添加消息
async function produceMessages() {const messageId = await redis.xadd(streamKey, '*', 'field1', 'value1');console.log(`Produced message with ID: ${messageId}`);
}// 消費者:讀取并確認消息
async function consumeMessages() {while (true) {try {const messages = await redis.xreadgroup('GROUP', groupName, consumerName,'BLOCK', 0,'STREAMS', streamKey, '>');if (messages) {messages.forEach(([_stream, entries]) => {entries.forEach(([id, fields]) => {console.log(`Consumed message ID: ${id}, fields: ${fields}`);// 處理消息后,確認消息redis.xack(streamKey, groupName, id);console.log(`Acknowledged message ID: ${id}`);});});}} catch (err) {console.error('Error reading from stream:', err);break;}}
}(async () => {await createConsumerGroup();produceMessages(); // 生產消息// 啟動消費者,可以啟動多個消費者consumeMessages();})();
Radix Tree
前綴樹被用于高效存儲和查找字符串集合。它將字符串按照前綴拆分成一個個字符,并將每個字符作為一個節點存儲在樹中。
當插入一個field-value pairs時,redis會將field拆分成一個個字符,并根據字符在radix tree中的位置找到合適的節點,如果該節點不存在,則創建新節點并添加到radix tree中。
當所有字符添加完畢后,將值對象指針保存到最后一個節點中。當查詢一個field時,redis按照字符順序遍歷radix tree,如果發現某個字符不存在于樹中,則表示field不存在;如果最后一個節點表示一個完整的field,則返回對應的值對象。利用前綴樹就可以避免相同字符串被重復存儲。
Redis高性能的原因
-
基于內存實現
讀、寫操作都是在內存上完成的。內存直接由cpu控制,也就是由cpu內部集成內存控制器,所以說內存是直接與cpu對接的,享受與cpu通信的“最優帶寬”。redis將數據存儲在內存中,讀/寫操作不會被磁盤的I/O速度限制。 -
I/O多路復用模型
redis采用I/O多路復用技術并發處理連接。采用epoll+自己實現的簡單的事件框架。將epoll中的讀、寫、關閉、連接都轉化為事件,再利用epoll的多路復用特性實現一個ae高性能網絡事件處理框架,絕不在I/O上浪費時間。
在解釋I/O多路復用之前,我們先了解下基本I/O操作會經歷什么。
一個基本的網絡I/O模型處理get請求時,會經歷如下過程。- 服務端bind/listen綁定IP地址并監聽指定端口的請求,與客戶端建立accept。
- 從socket讀取請求recv。
- 解析客戶都安發送的請求parse。
- 指向get命令。
- send執行相應客戶端數據,也就是向socket寫回數據。
其中,bind、accept、recv、parse和send都屬于網絡I/O處理,而get命令屬于鍵-值數據操作。既然redis是單線程的,那么,最基本的實現就是在一個線程中依次執行上述操作。其中的關鍵是accept和recv會出現阻塞,當redis監聽到一個客戶端有連接請求,但一直未能成功建立連接時,會阻塞在accept函數,導致其他客戶端無法和redis建立連接。類似地,當redis通過recv函數從一個客戶端讀取數據時,如果數據一直沒有到達,那么redis就會一直阻塞在recv函數。
“多路”指多個socket連接,“復用”指共同使用一個線程。多路復用主要有select、poll和epoll三種技術。epoll的基本原理是,內核不監視應用程序本身的連接,而是監視應用程序的文件描述符。客戶端在運行時會生成具有不同事件類型的套接字。在服務器端,I/O多路復用程序會將消息放入隊列,然后通過文件事件分派器將其轉發到不同的事件處理器。
簡單來說,在單線程條件下,內核會一直監聽socket上的連接請求或者數據請求,一旦有請求到達就交給redis線程處理,這就實現了一個redis線程處理多個I/O流的效果。
select/epoll提供了基于事件的回調機制,即針對不同事件調用不同的事件處理器。所以,redis一直在處理事件,響應性能得到了提升。redis線程不會阻塞在某一個特定的監聽或套接字上,也就是說,不會阻塞在某一個特定的客戶端請求處理上。因此,redis可以同時和多個客戶端連接并處理請求,以提升并發能力。 -
單線程模型
單線程指redis的網絡I/O以及field-value pairs命令讀/寫是由一個線程來執行的。redis的持久化、集群數據同步,異步刪除等操作都是其他線程執行的。
單線程高性能的原因
redis選擇使用單線程處理命令以及高性能的主要原因如下:- 不會因為創建線程消耗性能。
- 避免上下文切換引起的cpu消耗,沒有多線程切換的開銷。
- 避免了線程之間的競爭問題,例如添加鎖、釋放鎖、死鎖等。
- 代碼更清晰。
-
高效的數據結構
redis通過一個散列表來保存所有的key-value,散列表的本質就是數組+鏈表,數組的槽位被叫做哈希桶。每個桶的entry保存指向具體key和value的指針。key是string類型,value的數據類型可以是5種中的任意一種。
我們可以把redis看作一個全局散列表,而全局散列表的時間復雜度是O(1)。通過計算每個鍵的哈希值,可以知道對應的哈希桶位置,再通過哈希桶的entry找到對應的數據,這也是redis快的原因之一。