https://build-your-own.org/redis/
事件循環怎么實現的
首先我將連接包裝為一個 Connect 類,它包含了 socket fd,讀寫緩沖區,連接狀態(這個連接是發送數據還是接收數據)等成員屬性
我會在全局維護一個從 socket fd 到它對應的 Connect 對象指針的 map,叫做 fd2conn
在 main 函數中首先 socket、bind、listen 創建一個 listen fd,然后創建一個 vector<struct pollfd> poll_args
,接著進入事件循環(一個死循環)
在事件循環中首先將 poll_args 清空,然后壓入 listen fd 對應的 pollfd;然后遍歷 fd2conn,根據每個 Connect 對象的狀態(發送狀態還是接收狀態)決定 pollfd 中要監聽的事件(POLLIN | POLLOUT),把 fd2conn 中的每一項都壓入 poll_args 數組,然后調用 poll,傳入 poll_args 監聽這些文件描述符
等到 poll 返回后,再遍歷 poll_args,根據其 fd 在 fd2conn 中找到對應的 Connect 對象,執行 IO 操作。如果 IO 操作結束后 Connect 的狀態變為 END,則關閉這個連接并從 map 中刪除
最后再判斷一下 0 號 poll_args 是否有就緒事件(listen fd),如果有,則說明有新連接到來,再去調用 accept 函數創建新的連接。accept 的結果會在 fd2conn 這個 map 中放入對應的 socket fd 和 Conn 對象
連接定時器是如何實現的
連接定時器是通過雙向循環鏈表串起來的,我在 Connect 結構中還會維護一個 us 級的時間戳 idle_start
,它的意義是“上一次操作這個連接的時刻”,也就是開始空閑的時刻,在 Connect 的構造函數里初始化為當前時刻,構造完成后把它掛到鏈表末尾
idle_start
在每一次處理 IO 時都會被刷新為當前時刻(說明它當前活躍),然后從鏈表中取下來,再掛到末尾,這樣就保證了雙向鏈表自動是遞增有序的,這個序就是每個連接的空閑開始時刻,越位于前面的,空閑開始的越早,越可能超時
在每一次 poll 之前,我都去從鏈表首部獲取最小的那個時間戳,這意味著這個連接空閑開始的最早,擁有最小的超時時刻,把這個時間戳加上一個 timeout 值,例如 5ms,然后求這個值減去當前時刻,就是需要最小阻塞等待的時長,然后 poll 的第三個超時參數傳入這個值即可
在每一次事件循環的末尾,我都會去從前往后處理這個雙向鏈表管理的定時器:如果空閑開始時間戳 + timeout < 當前時刻,說明超時了,就把這個定時器取下,然后關閉這個連接。直到遇到一個不超時的連接,此時退出循環即可,因為鏈表是自增有序的
支持哪些命令
set k1 v1
get k1
del k1
keys# zadd zset_name score name
zadd zs1 1.0 abc
zadd zs1 1.5 def
zrem zs1 def
zscore zs1 def
zquery zs1 1.5 def $offset $limitpexpire k1 5000
pttl k1
解釋客戶端命令發送到服務端的過程
客戶端從命令行參數獲取到 “set k1 v1” 這個命令,然后調用 socket、connect 建立連接;服務端在本輪事件循環末尾 accept 這個連接,將它放入全局的 fd2conn 的 map 中,設置 idle_start 為當前時刻,把它插入 idle_list 鏈表的末尾
客戶端建立連接后,緊接著調用 write 發送數據,也就是 “set k1 v1” 這個字符串;服務端這時已經進入到下一輪事件循環,在這輪循環中,poll 監聽到 fd 可讀,返回,然后輪詢 poll_args,輪詢到該客戶端 fd 的可讀,進而處理該客戶端發來的數據
解釋 set 命令的執行過程
客戶端發來 “set k1 v1”
服務端首先從 fd 讀取數據到 read buffer 中,然后將這3個單詞解析到一個 vector<string>
,然后根據第一個單詞是 set,進入到 do_set 函數
在 do_set 中首先去全局的 hashmap 中查找對應的 key 是否存在,查找的過程是在2個 hashtable 都查詢(因為漸進式哈希用到 2 個哈希表),在 hashtable 中查詢的過程是先根據哈希值 % 哈希桶長度定位到哈希桶(實際上哈希桶長度永遠是 2 ^ n,故只需要 hashcode & (n - 1) 即可),然后遍歷哈希桶后面的鏈表,如果存在 hashcode 一樣且判等函數返回 true 的節點,則返回該節點
如果在全局 hashmap 中查到了 k1 已經存在,則將它對應的 value 改為 v1;否則新建一個 Entry,把它的掛到全局 hashmap
HashMap 怎么實現的
// hashtable node, should be embedded into the payload
struct HNode {HNode *next = NULL;uint64_t hcode = 0;
};// a simple fixed-sized hashtable
struct HTab {HNode **tab = NULL;size_t mask = 0;size_t size = 0;
};// the real hashtable interface.
// it uses 2 hashtables for progressive resizing.
struct HMap {HTab ht1; // newerHTab ht2; // oldersize_t resizing_pos = 0;
};
HashMap 也采用侵入式容器,我采用拉鏈法解決哈希沖突,每個鏈表節點(HNode)只存儲一個 hashcode 和指向下一個節點的指針,并不存儲真正的 key、value 數據
然后定義一個 HTable 結構,它有3個成員:HNode* 的數組 tab;數組長度 - 1 的 mask,因為限制了數組長度只能為 2 ^ n,所以定位哈希桶時就避免了 mod 運算;還有標志著當前哈希表總節點數量的 size
最后定義真正的 HMap 結構,它采用漸進式哈希的策略,擁有 2 個 HTable,還有一個標志著 rehash 進度位置的 pos 域
解釋 HashMap 的插入過程
insert() 的參數是 HNode*,一個 HashMap 會管理2個哈希桶,ht1 和 ht2,插入操作都是在 ht1 中進行
插入 ht1 的過程是先用 hnode 的 hashcode & mask 求得哈希桶的位置,然后執行單鏈表頭插,最后 size++
在 ht1 中插入之后判斷 ht2 是否為 null,若為 null,再判斷 ht1 的負載因子是否過高,如果過高,則將 ht1 轉移給 ht2,ht1.resize,然后設置 HashMap 的 resizing_pos = 0,這個過程可以看作是開啟 rehash 過程
解釋 rehash 的過程
漸進式 rehash 就是不一次性 rehash 全部節點,而是只在每一步操作 HashTable 的過程中 rehash 固定數目的節點
rehash 開始的標志是在向 ht1 中插入一個節點后,判斷 ht2 為空且負載因子 > 8,此時將 ht1 轉移給 ht2,ht1.resize,然后設置 HashMap 的 resizing_pos = 0,resize_pos 表示在 ht2 中 rehash 的進度
然后在每一次操作哈希表的函數中,包括查詢,新增,刪除,都會執行一次 rehash 函數,它會循環執行 128 次:從 ht2 的哈希桶的 resizing_pos 位置摘下頭節點,將其 detach 下來,再插入到 ht1 的 HashTable。直到循環次數滿或 ht2 被摘空
一旦 ht2 被摘空,就意味著 rehash 終止(因為他是作為 rehash 的循環條件之一)
rehash 的過程中,對哈希表的插入照常在 ht1 中執行,且只有在 ht2 被摘空后,才會再次計算負載因子,開始下一次 rehash
解釋 HashMap 的刪除過程
pop 傳入的是一個 HNode* key 和一個判等函數 eq。首先按需執行 rehash,結束后先在 ht1 中 lookup:先根據 key 中的 hcode & mask 定位到哈希桶,然后在它后面的單鏈表中遍歷查找,遇到 hcode 一致且 eq 函數返回 true 的節點,將其返回
如果在 ht1 中查找結果不為空,則將其 detach 返回(從鏈表中摘下);否則在 ht2 中執行同樣的步驟
如果 ht1 和 ht2 查找都為空,說明待刪除的節點不存在,返回 null
哈希沖突你怎么解決的
我會在每一個涉及到在 HashTab 中查找操作的函數都傳入一個判等函數 eq,它的參數是2個 HNode*,用于真正判斷這兩個節點是否相等,例如當使用 HashMap 存儲 KV 時,它會根據 HNode* 調用 container_of,找到對應的 Entry 結構(真正存儲 string KV)的,然后判斷 string key 是否相等
HashMap 如何是如何管理 KV 的
struct Entry {HNode node;std::string key;std::string val;uint32_t type = 0;ZSet* zset = NULL;size_t heap_idx = -1;Entry(const std::string& k) {key = k;node.hcode = str_hash(k);}
};
我定義了一個 Entry 結構,它包含 string 類型的 KV,同時包含了一個 HNode 類型(就是哈希表中單鏈表的節點類型)
HNode 中只有一個 hachode 和指向下一個 HNode 的指針,hashcode 在 Entry 構造時被初始化
這樣,查找某個 string key 時,只需要將 key 構造為 Entry(棧內存),然后把這個 Entry 的 hnode 交給 HMap 的 lookup 方法查找即可。lookup 返回的是一個 HNode*,就是真正存儲了這個 key 的 hashcode 的 HNode,然后使用 container_of 即可獲得包裹了這個 HNode 的 Entry 指針
set k v 時,也是先將 key 構造為 Entry,然后把這個 Entry 的 hnode 交給 HMap 的 lookup 方法查找。如果返回 NULL,就說明這對 KV 不存在,需要 new Entry,然后將它的 hnode 成員插入全局的 HashMap 中;否則就使用 container_of 定位到包含這個 hnode 的 Entry,然后修改它的 value
換句話說,KV 數據只是被簡單地存儲在堆內存中,將它的 HNode 域掛到 HashMap 上進行管理和查找
哈希函數如何實現的
哈希函數的作用是將 string 轉化為 int64,采用的策略是先初始化一個較大的素數h,然后不斷地將字符串中的字節數據累加到 h 上,并乘以一個魔法值,最終得到一個 int64 的魔法值
解釋 get 命令的執行過程
客戶端發來“get k”命令,服務器將其解析為 vector<string>
,判斷其第一個單詞是 get,調用 do_get 函數進行處理
在 do_get 中,首先用發來的 k 構造一個 Entry(初始化 Entry 中 HNode 的 hcode),然后將這個 Entry 的 HNode 丟給 HashMap 的 lookup 方法進行查找(和判等函數),如果未找到,序列化一個 nil 給客戶端
如果找到節點,調用 container_of,定位到包裹這個 HNode 的 Entry 對象指針,獲取其 string val 字段,序列化給客戶端
解釋 del 命令的執行過程(異步)
客戶端發來“del k”命令,服務器將其解析為 vector<string>
,判斷其第一個單詞是 del,調用 do_del 函數進行處理
在 do_del 中,首先用發來的 k 構造一個 Entry(初始化 Entry 中 HNode 的 hcode),然后將這個 Entry 的 HNode 丟給 HashMap 的 pop 方法,在 HashMap 中刪除這個 HNode
如果這個 HNode 在哈希表中存在,則使用 container_of 找到包裹該 HNode 的對象指針,執行刪除
在刪除函數中,判斷該對象是否為 ZSet,如果是,再判斷它是否 too_big,如果是,將其壓入線程池的任務隊列執行異步刪除;否則執行同步刪除即可(因為線程調度是有開銷的,所以只有對大塊數據才執行異步刪除)
解釋 keys 命令的執行過程
使用 h_scan 函數掃描哈希桶,然后在掃描每個哈希桶后面的單鏈表,在每個節點上調用 container_of 獲取包裹這個 HNode 的節點,然后獲取它的 key 值,序列化給客戶端
解釋一下侵入式數據結構(以雙向鏈表為例)
侵入式雙向鏈表節點只包含 prev 和 next 兩個指針,沒有數據域。然后基于這個節點定義實現鏈表的操作,比如 insert 和 detach
對比傳統容器,將被容器管理的對象“塞入”容器(也就是 STL 的風格);侵入式容器需要在被管理的對象內增加一個數據域,也就是鏈表節點,相當于被管理的對象包裹住了鏈表節點
我在項目中是用雙向鏈表管理連接對象 Connect 的,所以我的 Connect 類里面有一個 DList 類型的數據成員 idle_list(因為鏈表管理的是 Connect 的超時時長),向容器中插入對象就是 new Connect,然后將這個對象的鏈表節點數據域掛到鏈表上;從容器中刪除對象就是將這個對象的鏈表節點從鏈表上摘除,然后 delete 這個對象
侵入式容器的工作過程需要一個 container_of 指針計算函數,一般用宏函數實現(因為涉及到傳入類型、字段名作為參數,不符合 C++ 的函數語法)
它的參數有3個:鏈表節點指針,包裹鏈表節點的數據類型(也就是 Connect 類型),鏈表節點在被管理對象中的字段名字(也就是 idle_list)
它的功能是根據這三者定位到包裹鏈表節點的數據對象的指針
它的原理是用鏈表節點指針減去鏈表節點數據域在整個 Connect 對象中的 offset,就得到了 Connect 對象的地址
這個 container_of 函數在所有的侵入式容器中都是通用的
怎么用 HashMap + AVLTree 實現的 ZSet
struct AVLNode {uint32_t depth;uint32_t cnt;AVLNode *left;AVLNode *right;AVLNode *parent;AVLNode(): depth(1), cnt(1), left(nullptr), right(nullptr), parent(nullptr) {}void init() {depth = cnt = 1;left = right = parent = nullptr;}void update();AVLNode *rot_left();AVLNode *rot_right();AVLNode *fix_left();AVLNode *fix_right();
};AVLNode *avl_fix(AVLNode *node);
AVLNode *avl_del(AVLNode *node);
AVLNode *avl_offset(AVLNode *node, int64_t offset);struct ZNode {AVLNode tree;HNode hmap;double score = 0;std::string name;ZNode(std::string name, double score) {this->name = name;hmap.hcode = str_hash(name);this->score = score;}ZNode* offset(int64_t offset) {auto tnode = avl_offset(&tree, offset);return tnode ? container_of(tnode, ZNode, tree) : nullptr;}
};struct ZSet {AVLNode* tree = NULL;HMap hmap;public:bool add(const std::string& name, double score);ZNode* look_up(const std::string& name);ZNode* pop(const std::string& name);ZNode* query(double score, const std::string& name);~ZSet();private:void tree_add(ZNode* node);void update(ZNode* node, double score);
};
解釋 zadd 命令的執行過程
// the structure for the key
struct Entry {HNode node;std::string key;std::string val;uint32_t type = 0;ZSet* zset = NULL;size_t heap_idx = -1;Entry(const std::string& k) {key = k;node.hcode = str_hash(k);}
};
在 Entry 中存儲了一個 type 用于區分 key 代表的是 KV 鍵值對還是 ZSet
在 do_zadd 函數中,首先使用 zset 的名字構造一個 Entry(初始化它的 HNode 中的 hcode),然后在全局的 HashMap 中查找這個 Entry 中的 HNode。如果未找到,則 new 一個 Entry,設置它的 type 為 ZSET,并將它的 HNode 插入到 HashMap;否則如果能找到,則使用 container_of 找到包裹這個 HNode 的 Entry*,判定其為 ZSET
ZSet 有一個 AVL 樹結構和一個 HMap,前者用于存儲 score,后者用于存儲 name;還有增加、查詢、刪除、范圍查詢方法
然后執行 ZSet 的 add 方法,它的參數是 (name, score)
在 add 方法中,首先在 ZSet 的 HMap 中查詢 name 是否存在,若存在,則執行更新操作:將 ZNode 中的 AVLNode 節點從 AVL 樹上摘除,將 score 改為新值,再插入到 AVL 樹;否則說明 name 不存在,則 new ZNode,將它的 name 插入到當前 ZSet 持有的 HMap,再把當前節點的 AVLNode 成員掛到 AVL 樹上
解釋 zrem 的執行過程
zrem zs1 kk1
首先在全局的 HashMap 中根據 zset_name 查找對應的 Entry 是否存在,若存在,再判定其 type == ZSET
然后執行 ZSet 的 pop 方法:首先根據 name 構造一個 HKey(類似于 Entry),在當前 ZSet 管理的 HMap 中 pop
若 pop 返回非 null,說明該節點存在,則使用 container_of 找到包裹它的 ZNode,再將它的 AVLNode tree
從樹上刪除
解釋 zscore 的執行過程
zscore zs1 kk1
首先在全局的 HashMap 中根據 zset_name 查找對應的 Entry 是否存在,若存在,再判定其 type == ZSET
然后執行 ZSet 的 lookup 方法:根據 name 構造一個 HKey(類似于 Entry),在當前 ZSet 管理的 HMap 中 lookup,最后使用 container_of 返回包裹它的 ZNode
如果返回非 null,則解引用 ZNode 的 score 字段將其作為 double 序列化給客戶端;否則序列化給客戶端 nil
解釋 zquery 的執行過程(范圍查詢)
小頂堆是怎么實現的
// heap.h
struct HeapItem {uint64_t val = 0;size_t *ref = NULL;
};
// 節點上濾或下濾
void heap_update(HeapItem *a, size_t pos, size_t len);// 在 server.cpp 中定義一個全局的 heap
std::vector<HeapItem> heap;
由于堆在邏輯上是一顆完全二叉樹,所以可以借助一個數組來實現
解釋 pexpire 的執行過程(如何使用小頂堆實現 TTL)
pexpire kk1 5000
pexpire 命令的第二個參數為一個 int 類型的毫秒值,代表過期時長
在 Entry 中,存儲了一個 heap_idx,作為堆的數組下標
在 do_expire 中,首先根據傳入的 kk1 構造一個 Entry,將其 HNode 丟給全局的 HMap 的 lookup 查找,如果存在,再調用 container_of 找到包裹這個 HNode 的 Entry 對象指針,再將這個指針和 ttl_ms 傳入 entry_set_ttl 設置 TTL
在 entry_set_ttl 中,首先根據 ent->heap_idx 定位到堆中的 HeapItem,如果 pos = heap_idx == -1,則需要新建一個 HeapItem,讓其 ref 指向 ent->heap_idx,然后將其 pushback 進堆末尾,設置 pos = heap.size() - 1
然后 heap[pos].val = get_monotonic_usec() + (uint64_t)ttl_ms * 1000;
,再調整堆
解釋 pttl 的執行過程
pttl kk1
在 Entry 中,存儲了一個 heap_idx,作為堆的數組下標
在 do_ttl 中,首先根據傳入的 kk1 構造一個 Entry,將其 HNode 丟給全局的 HMap 的 lookup 查找,如果存在,再調用 container_of 找到包裹這個 HNode 的 Entry 對象指針 ent
然后根據 ent->heap_idx 在堆中定位到 HeapItem,讀出它的 val 字段,也就是過期時刻 expire_at
然后用這個時刻減去當前時刻,就是 ttl,將它序列化給客戶端即可
過期 key 是怎么實現自動刪除的
我會在每個事件循環的末尾處理定時器,包括連接的定時器還有堆定時器
在處理堆定時器時,因為它是小頂堆,所以堆頂的元素是過期時刻最小的,是最可能超時的
while 循環判斷堆頂定時器超時時刻是否小于當前時刻,如果小于,則在全局的 HMap 中將這個 key pop 掉,然后在 entry_set_ttl 中將其 ttl 設置為 -1,一旦設置為 -1,就會被從堆中刪除
數據序列化和反序列化如何實現的
tiny-redis 支持多種數據類型的序列化,比如 nil、str、int64、double、err 等
先定義一系列表示數據類型的枚舉
enum {SER_NIL = 0,SER_ERR = 1,SER_STR = 2,SER_INT = 3,SER_DBL = 4,SER_ARR = 5,
};
序列化時首先 push_back 數據類型枚舉值,然后 append 進真正需要序列化的值,對于字符串需要先 append 進字符串長度
static void out_nil(std::string &out) {out.push_back(SER_NIL);
}static void out_str(std::string &out, const std::string &val) {out.push_back(SER_STR);uint32_t len = (uint32_t)val.size();out.append((char *)&len, 4);out.append(val);
}static void out_int(std::string &out, int64_t val) {out.push_back(SER_INT);out.append((char *)&val, 8);
}static void out_dbl(std::string &out, double val) {out.push_back(SER_DBL);out.append((char *)&val, 8);
}static void out_err(std::string &out, int32_t code, const std::string &msg) {out.push_back(SER_ERR);out.append((char *)&code, 4);uint32_t len = (uint32_t)msg.size();out.append((char *)&len, 4);out.append(msg);
}
客戶端遵循同樣的協議進行反序列化:先讀取數據類型,然后讀取數據內容
線程池是怎么實現的
struct Work {void (*f)(void *) = NULL;void *arg = NULL;
};struct ThreadPool {std::vector<pthread_t> threads;std::deque<Work> queue;pthread_mutex_t mu;pthread_cond_t not_empty;ThreadPool(size num_threads);void enqueue(void (*f)(void *), void *arg);
}
線程池中包含如下的成員:
- 包含所有線程 id 的數組
- 任務隊列,其中每一項任務都是一個函數指針 + 泛型參數指針
- 互斥鎖
- 標志任務隊列不為空的條件變量
創建線程池時 pthread_create() 創建指定數目的線程,并初始化鎖和條件變量
向任務隊列添加任務時,先獲取鎖,然后 push_back,然后喚醒條件變量,最后解鎖
每個任務是一個死循環:先解鎖,判斷任務隊列是否為空,若為空則循環等待條件變量;否則從任務隊列中取出任務,然后解鎖,最后執行任務