基于C語言實現內存型數據庫(kv存儲)
文章目錄
- 基于C語言實現內存型數據庫(kv存儲)
- 1. 項目背景
- 1.1 Redis介紹
- 1.2 項目預期及基本架構
- 2. 服務端原理及代碼框架
- 2.1 網絡數據回環的實現
- 2.2 array的實現
- 2.3 rbtree的實現
- 2.4 btree的實現
- 2.5 hash的實現
- 2.6 dhash的實現
- 2.7 skiplist的實現
- 2.8 kv存儲協議的實現
- 3. 性能測試
- 4. 項目總結及改進思路
- 源代碼倉庫見Github:kv-store倉庫
- 參考視頻:“零聲教育”的“linux基礎架構-Kv存儲”。
- 其他源碼:協程。
1. 項目背景
1.1 Redis介紹
??本項目主要想仿照Redis的交互方式,實現一個基本的“內存型數據庫”,所以首先來介紹一下Redis。隨著互聯網的普及,只要是上網的APP基本上都需要和相應的服務器請求數據,通常來說,這些數據被服務器保存在“磁盤”上的文件中,稱之為“磁盤型數據庫”。但是面對海量用戶時(比如秒殺活動),磁盤IO的讀寫速率不夠快從而導致用戶體驗下降,并且服務器數據庫的壓力也非常大。鑒于很多請求只是讀取數據,這就啟發我們將一些熱點數據存放在內存中,以便快速響應請求、并且減輕磁盤的讀寫壓力。
當然,上述只是一個初步的想法,后續如何清理內存數據、分布式存儲等可以參考B站的科普視頻,講的非常簡潔易懂:
- 【趣話Redis第一彈】我是Redis,MySQL大哥被我坑慘了!—“緩存穿透、緩存擊穿、緩存雪崩”、“定時刪除、惰性刪除、內存淘汰”
- 【趣話Redis第二彈】Redis數據持久化AOF和RDB原理一次搞懂!—“RDB+AOF”
- 【趣話Redis第三彈】Redis的高可用是怎么實現的?哨兵是什么原理?—“主觀下線、客觀下線”、“哨兵選舉”、“故障轉移”
- 趣話Redis:Redis集群是如何工作的?—“哈希桶”、“集群工作+主從復制”
下面是一些典型的面試題:
- 為什么要使用Redis?
- 從高并發上來說:直接操作緩存能夠承受的請求是遠遠大于直接訪問數據庫的,所以我們可以考慮把數據庫中的部分數據轉移到緩存中去。這樣用戶的一部分請求會直接到緩存,而不用經過數據庫。
- 從高性能上來說:用戶第一次訪問數據庫中的某些數據,因為是從硬盤上讀取的,所以這個過程會比較慢。將該用戶訪問的數據存在緩存中,下一次再訪問這些數據的時候就可以直接從緩存中獲取了。操作緩存就是直接操作內存,所以速度相當快。
- 為什么要使用Redis而不是其他的,例如Java自帶的map或者guava?
??緩存分為本地緩存和分布式緩存,像map或者guava就是本地緩存。本地緩存最主要的特點是輕量以及快速,生命周期隨著jvm的銷毀而結束。在多實例的情況下,每個實例都需要各自保存一份緩存,緩存不具有一致性。redis或memcached之類的稱為分布式緩存,在多實例的情況下,各實例共用一份緩存數據,緩存具有一致性。
- Redis應用場景有哪些?
- 緩存熱點數據,緩解數據庫的壓力。
- 利用Redis原子性的自增操作,可以實現計數器的功能。比如統計用戶點贊數、用戶訪問數等。
- 分布式鎖。在分布式場景下,無法使用單機環境下的鎖來對多個節點上的進程進行同步。可以使用Redis自帶的SETNX命令實現分布式鎖,除此之外,還可以使用官方提供的RedLock分布式鎖實現。
- 簡單的消息隊列。可以使用Redis自身的發布/訂閱模式或者List來實現簡單的消息隊列,實現異步操作。
- 限速器。可用于限制某個用戶訪問某個接口的頻率,比如秒殺場景用于防止用戶快速點擊帶來不必要的壓力。
- 好友關系。利用集合的一些命令,比如交集、并集、差集等,實現共同好友、共同愛好之類的功能。
- 為什么Redis這么快?
- Redis是基于內存進行數據操作的Redis使用內存存儲,沒有磁盤IO上的開銷,數據存在內存中,讀寫速度快。
- 采用IO多路復用技術。Redis使用單線程來輪詢描述符,將數據庫的操作都轉換成了事件,不在網絡I/O上浪費過多的時間。
- 高效的數據結構。Redis每種數據類型底層都做了優化,目的就是為了追求更快的速度。
- 參考視頻:為什么要使用Redis?、Redis的應用場景有哪些?、Redis,好快!
1.2 項目預期及基本架構
??于是我們現在就來實現這個“內存型數據庫”,本項目使用C語言,默認鍵值對key-value都是char*
類型。如上圖所示,我們希望“客戶端”可以和“服務端”通訊,發送相應的指令并得到相應的信息。比如“客戶端”插入一個新的鍵值對“(name: humu)”,那么就發送“SET name humu”;“服務端”接收到這個數據包后,執行相應的操作,再返回“OK”給“客戶端”。鑒于kv存儲需要強查找的數據結構,我們可以使用rbtree、btree、b+tree、hash、dhash、array(數據量不多,比如http頭)、skiplist、list(性能低不考慮)。最終,下表列出了我們要實現的所有數據結構及其對應的指令:
操作/ 數據結構 | 插入 | 查找 | 刪除 | 計數 | 存在 |
---|---|---|---|---|---|
array | SET key value | GET key | DELETE key | COUNT | EXIST key |
rbtree | RSET key value | RGET key | RDELETE key | RCOUNT | REXIST key |
btree | BSET key value | BGET key | BDELETE key | BCOUNT | BEXIST key |
hash | HSET key value | HGET key | HDELETE key | HCOUNT | HEXIST key |
dhash | DHSET key value | DHGET key | DHDELETE key | DHCOUNT | DHEXIST key |
skiplist | ZSET key value | ZGET key | ZDELETE key | ZCOUNT | ZEXIST key |
備注 | 返回OK/Fail, 表示插入鍵值對是否成功 | 返回對應的value | 返回OK/Fail, 刪除對應的鍵值對 | 返回當前數據結構中 存儲的鍵值對數量 | 返回True/False, 判斷是否存在對應的鍵值對 |
??進一步,由于網絡編程中的“Hello,World!程序”就是實現一個echo,收到什么數據就原封不動的發送回去。所以我們希望在此基礎上,將kv存儲寫成一個獨立的進程,和網絡收發相關代碼隔離開,進而提升代碼的可維護性。另外在網絡協議的選擇中,由于我們的鍵值對設置通常較短只有十幾個字符(比如set key value
),而http協議的協議頭就有幾十個字符,有效數據占比太低;udp協議只能在底層網卡確認對方收到,但沒法在應用層確認,所以不可控;于是我們網絡通信協議選擇tcp。于是對于“服務端”,我們就可以有如下的架構設計:

- 網絡層:負責收發數據。本項目中都是“字符串”。
- 協議層:將“網絡層”傳輸過來的字符串進行拆解,若為無效指令直接返回相應的提示信息;若為有效指令則傳遞給“引擎層”進行進一步的處理,根據“引擎層”的處理結果給出相應的返回信息。
- 引擎層:分為6種存儲引擎,每種存儲引擎都可以進行具體的增、刪、查等操作,也就是實現上表給出的5種命令。
- 存儲層:注意“內存型數據庫”的數據在內存中,但若后續需要“持久化”也會將數據備份到磁盤中。
2. 服務端原理及代碼框架
2.1 網絡數據回環的實現
??在使用原生的socket庫函數進行網絡通信時,會一直阻塞等待客戶端的連接/通信請求,這個線程就做不了其他的事情,非常浪費資源。于是“reactor模式”應運而生,也被稱為“基于事件驅動”,核心點在于:注冊事件、監聽事件、處理事件。也就是說,線程找了一個“秘書”專門負責去監聽網絡端口是否有“網絡通信”的發生,線程就可以去做其他的事情;等到線程想處理“網絡通信”的時候一起全部通知給該線程,然后這個“秘書”繼續監聽。顯然,有這樣一個“秘書”存在,可以將“網絡通信”、“業務處理”分隔開,一個線程可以同時處理多個客戶端的請求/通信,也就實現了“IO多路復用一個線程”。下面是常見的三種reactor模式:
- reactor單線程模型:只分配一個線程。顯然若線程的“業務處理”時間過長,會導致“秘書”積壓的事件過多,甚至可能會丟棄一些事件。本模型不適合計算密集型場景,只適合業務處理非常快的場景(本項目就是業務處理非常快)。
- reactor多線程模型:分配一個主線程和若干子線程。主線程只負責處理“網絡通信”,“業務處理”則交給子線程處理。本模式的好處是可以充分利用多核CPU性能,但是帶來了線程安全的問題。并且只有一個線程響應“網絡通信”,在瞬時高并發的場景下容易成為性能瓶頸。
- 主從reactor多線程模型:在上述多線程模型的基礎上,再額外開辟出新的子線程專門負責“與客戶端通信”,主線程則只負責“連接請求”。
參考B站視頻:【Java面試】介紹一下Reactor模式
下面使用epoll作為“秘書”,采用“reactor單線程模型”,完成網路數據回環(echo),也就是“服務端”程序框架的“網絡層”:
main.c–共356行
/** zv開頭的變量是zvnet異步網絡庫(epoll)。* kv開頭的變量是kv存儲協議解析。
*/
#include<stdio.h>
#include<stdlib.h>
#include<string.h>#include<errno.h>
#include<unistd.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<fcntl.h>
#include<sys/epoll.h>#include"kvstore.h"/*-------------------------------------------------------*/
/*-----------------------異步網路庫-----------------------*/
/*-------------------------------------------------------*/
/*-----------------------函數聲明-------------------------*/
#define max_buffer_len 1024 // 讀寫buffer長度
#define epoll_events_size 1024 // epoll就緒集合大小
#define connblock_size 1024 // 單個連接塊存儲的連接數量
#define listen_port_count 1 // 監聽端口總數// 有如下參數列表和返回之類型的函數都稱為 CALLBACK
// 回調函數,方便在特定epoll事件發生時執行相應的操作
typedef int (*ZV_CALLBACK)(int fd, int events, void *arg);
// 回調函數:建立連接
int accept_cb(int fd, int event, void* arg);
// 回調函數:接收數據
int recv_cb(int clientfd, int event, void* arg);
// 回調函數:發送數據
int send_cb(int clientfd, int event, void* arg);// 單個連接
typedef struct zv_connect_s{// 本連接的客戶端fdint fd;// 本連接的讀寫bufferchar rbuffer[max_buffer_len];size_t rcount; // 讀buffer的實際大小char wbuffer[max_buffer_len];size_t wcount; // 寫buffer的實際大小size_t next_len; // 下一次讀數據長度(讀取多個包會用到)// 本連接的回調函數--accept()/recv()/send()ZV_CALLBACK cb;
}zv_connect;// 連接塊的頭
typedef struct zv_connblock_s{struct zv_connect_s *block; // 指向的當前塊,注意大小為 connblock_sizestruct zv_connblock_s *next; // 指向的下一個連接塊的頭
}zv_connblock;// 反應堆結構體
typedef struct zv_reactor_s{int epfd; // epoll文件描述符// struct epoll_event events[epoll_events_size]; // 就緒事件集合struct zv_connblock_s *blockheader; // 連接塊的第一個頭int blkcnt; // 現有的連接塊的總數
}zv_reactor;// reactor初始化
int init_reactor(zv_reactor *reactor);
// reator銷毀
void destory_reactor(zv_reactor* reactor);
// 服務端初始化:將端口設置為listen狀態
int init_sever(int port);
// 將本地的listenfd添加進epoll
int set_listener(zv_reactor *reactor, int listenfd, ZV_CALLBACK cb);
// 創建一個新的連接塊(尾插法)
int zv_create_connblock(zv_reactor* reactor);
// 根據fd從連接塊中找到連接所在的位置
// 邏輯:整除找到所在的連接塊、取余找到在連接塊的位置
zv_connect* zv_connect_idx(zv_reactor* reactor, int fd);
// 運行kv存儲協議
int kv_run_while(int argc, char *argv[]);
/*-------------------------------------------------------*//*-----------------------函數定義-------------------------*/
// reactor初始化
int init_reactor(zv_reactor *reactor){if(reactor == NULL) return -1;// 初始化參數memset(reactor, 0, sizeof(zv_reactor));reactor->epfd = epoll_create(1);if(reactor->epfd <= 0){printf("init reactor->epfd error: %s\n", strerror(errno));return -1;}// 為鏈表集合分配內存reactor->blockheader = (zv_connblock*)calloc(1, sizeof(zv_connblock));if(reactor->blockheader == NULL) return -1;reactor->blockheader->next = NULL;// 為鏈表集合中的第一個塊分配內存reactor->blockheader->block = (zv_connect*)calloc(connblock_size, sizeof(zv_connect));if(reactor->blockheader->block == NULL) return -1;reactor->blkcnt = 1;return 0;
}// reator銷毀
void destory_reactor(zv_reactor* reactor){if(reactor){close(reactor->epfd); // 關閉epollzv_connblock* curblk = reactor->blockheader;zv_connblock* nextblk = reactor->blockheader;do{curblk = nextblk;nextblk = curblk->next;if(curblk->block) free(curblk->block);if(curblk) free(curblk);}while(nextblk != NULL);}
}// 服務端初始化:將端口設置為listen狀態
int init_sever(int port){// 創建服務端int sockfd = socket(AF_INET, SOCK_STREAM, 0); // io// fcntl(sockfd, F_SETFL, O_NONBLOCK); // 非阻塞// 設置網絡地址和端口struct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(struct sockaddr_in));servaddr.sin_family = AF_INET; // IPv4servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0,任何地址都可以連接本服務器servaddr.sin_port = htons(port); // 端口// 將套接字綁定到一個具體的本地地址和端口if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))){printf("bind failed: %s", strerror(errno));return -1;}// 將端口設置為listen(并不會阻塞程序執行)listen(sockfd, 10); // 等待連接隊列的最大長度為10printf("listen port: %d, sockfd: %d\n", port, sockfd);return sockfd;
}// 將本地的listenfd添加進epoll
int set_listener(zv_reactor *reactor, int listenfd, ZV_CALLBACK cb){if(!reactor || !reactor->blockheader) return -1;// 將服務端放進連接塊reactor->blockheader->block[listenfd].fd = listenfd;reactor->blockheader->block[listenfd].cb = cb; // listenfd的回調函數應該是accept()// 將服務端添加進epoll事件struct epoll_event ev;ev.data.fd = listenfd;ev.events = EPOLLIN;epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, listenfd, &ev);return 0;
}// 創建一個新的連接塊(尾插法)
int zv_create_connblock(zv_reactor* reactor){if(!reactor) return -1;// 初始化新的連接塊zv_connblock* newblk = (zv_connblock*)calloc(1, sizeof(zv_connblock));if(newblk == NULL) return -1;newblk->block = (zv_connect*)calloc(connblock_size, sizeof(zv_connect));if(newblk->block == NULL) return -1;newblk->next = NULL;// 找到最后一個連接塊zv_connblock* endblk = reactor->blockheader;while(endblk->next != NULL){endblk = endblk->next;}// 添加上新的連接塊endblk->next = newblk;reactor->blkcnt++;return 0;
}// 根據fd從連接塊中找到連接所在的位置
// 邏輯:整除找到所在的連接塊、取余找到在連接塊的位置
zv_connect* zv_connect_idx(zv_reactor* reactor, int fd){if(!reactor) return NULL;// 計算fd應該在的連接塊int blkidx = fd / connblock_size;while(blkidx >= reactor->blkcnt){zv_create_connblock(reactor);// printf("create a new connblk!\n");}// 找到這個連接塊zv_connblock* blk = reactor->blockheader;int i = 0;while(++i < blkidx){blk = blk->next;}return &blk->block[fd % connblock_size];
}// 回調函數:建立連接
// fd:服務端監聽端口listenfd
// event:沒用到,但是回調函數的常用格式
// arg:應該是reactor*
int accept_cb(int fd, int event, void* arg){// 與客戶端建立連接struct sockaddr_in clientaddr; // 連接到本服務器的客戶端信息socklen_t len_sockaddr = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len_sockaddr);if(clientfd < 0){printf("accept() error: %s\n", strerror(errno));return -1;}// 將連接添加進連接塊zv_reactor* reactor = (zv_reactor*)arg;zv_connect* conn = zv_connect_idx(reactor, clientfd);conn->fd = clientfd;conn->cb = recv_cb;conn->next_len = max_buffer_len;conn->rcount = 0;conn->wcount = 0;// 將客戶端添加進epoll事件struct epoll_event ev;ev.data.fd = clientfd;ev.events = EPOLLIN; // 默認水平觸發(有數據就觸發)epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, clientfd, &ev);printf("connect success! sockfd:%d, clientfd:%d\n", fd, clientfd);
}// 回調函數:接收數據
int recv_cb(int clientfd, int event, void* arg){zv_reactor* reactor = (zv_reactor*)arg;zv_connect* conn = zv_connect_idx(reactor, clientfd);int recv_len = recv(clientfd, conn->rbuffer+conn->rcount, conn->next_len, 0); // 由于當前fd可讀所以沒有阻塞if(recv_len < 0){printf("recv() error: %s\n", strerror(errno));close(clientfd);// return -1;exit(0);}else if(recv_len == 0){// 重置對應的連接塊conn->fd = -1;conn->rcount = 0;conn->wcount = 0;// 從epoll監聽事件中移除epoll_ctl(reactor->epfd, EPOLL_CTL_DEL, clientfd, NULL);// 關閉連接close(clientfd);printf("close clientfd:%d\n", clientfd);}else if(recv_len > 0){conn->rcount += recv_len;// conn->next_len = *(short*)conn->rbuffer; // 從tcp協議頭中獲取數據長度,假設前兩位是長度// 處理接收到的字符串,并將需要發回的信息存儲在緩沖區中printf("recv clientfd:%d, len:%d, mess: %s\n", clientfd, recv_len, conn->rbuffer);// conn->rcount = kv_protocal(conn->rbuffer, max_buffer_len);// 將kv存儲的回復消息(rbuffer)拷貝給wbuffer// printf("msg:%s len:%ld\n", msg, strlen(msg));memset(conn->wbuffer, '\0', max_buffer_len);memcpy(conn->wbuffer, conn->rbuffer, conn->rcount);conn->wcount = conn->rcount;memset(conn->rbuffer, 0, max_buffer_len);conn->rcount = 0;// 將本連接更改為epoll寫事件conn->cb = send_cb;struct epoll_event ev;ev.data.fd = clientfd;ev.events = EPOLLOUT;epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, clientfd, &ev);}return 0;
}// 回調函數:發送數據
int send_cb(int clientfd, int event, void* arg){zv_reactor* reactor = (zv_reactor*)arg;zv_connect* conn = zv_connect_idx(reactor, clientfd);int send_len = send(clientfd, conn->wbuffer, conn->wcount, 0);if(send_len < 0){printf("send() error: %s\n", strerror(errno));close(clientfd);return -1;}memset(conn->wbuffer, 0, conn->next_len);conn->wcount -= send_len;// 發送完成后將本連接再次更改為讀事件conn->cb = recv_cb;struct epoll_event ev;ev.data.fd = clientfd;ev.events = EPOLLIN;epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, clientfd, &ev);return 0;
}// 運行kv存儲協議
int kv_run_while(int argc, char *argv[]){// 創建管理連接的反應堆// zv_reactor reactor;zv_reactor *reactor = (zv_reactor*)malloc(sizeof(zv_reactor));init_reactor(reactor);// 服務端初始化int start_port = atoi(argv[1]);for(int i=0; i<listen_port_count; i++){int sockfd = init_sever(start_port+i);set_listener(reactor, sockfd, accept_cb); // 將sockfd添加進epoll}printf("init finish, listening connet...\n");// 開始監聽事件struct epoll_event events[epoll_events_size] = {0}; // 就緒事件集合while(1){// 等待事件發生int nready = epoll_wait(reactor->epfd, events, epoll_events_size, -1); // -1等待/0不等待/正整數則為等待時長if(nready == -1){printf("epoll_wait error: %s\n", strerror(errno));break;}else if(nready == 0){continue;}else if(nready > 0){// printf("process %d epoll events...\n", nready);// 處理所有的就緒事件int i = 0;for(i=0; i<nready; i++){int connfd = events[i].data.fd;zv_connect* conn = zv_connect_idx(reactor, connfd);// 回調函數和下面的的邏輯實現了數據回環if(EPOLLIN & events[i].events){conn->cb(connfd, events[i].events, reactor);}if(EPOLLOUT & events[i].events){conn->cb(connfd, events[i].events, reactor);} }}}destory_reactor(reactor);return 0;
}int main(int argc, char *argv[]){if(argc != 2){printf("please enter port! e.x. 9999.\n");return -1;}// 初始化存儲引擎// kv_engine_init();// 運行kv存儲kv_run_while(argc, argv);// 銷毀存儲引擎// kv_engine_desy();return 0;
}
/*-------------------------------------------------------*/
注:只有251行、346行、352行是后續和kv存儲有關的接口函數。可以發現“網絡層”和“協議層”被很好的隔離開來。

可以看到上述實現了網絡數據回環的功能。并且進一步來說,假如客戶端使用瀏覽器(http協議)對該端口進行訪問,那么對接收到的數據包進行http協議拆包,根據其請求的內容返回相應的信息(如html文件),那么就是我們所熟知的“web服務器”了。為什么是“爛大街”啊,一代比一代卷是吧😭。
2.2 array的實現
??現在使用“數組”來存儲“鍵值對節點”。首先我們可以想到的是,直接定義一個長度足夠大的數組(如1024),然后每次“插入”就直接找第一個空節點,“查找”和“刪除”都是遍歷所有節點。但是數組長度固定是一件很危險的事情,所以我們可以借鑒“內存池”的思想,來自動進行擴容和縮容。

如上圖所示,首先申請一個固定大小的“數組”存儲元素,當在某次“插入”元素發現沒有空節點時,可以直接再申請一塊“數組”,并將當前數組指向這個新數組;同理,當我們“刪除”一個元素時,若發現刪除后當前數組塊為空,可以直接free
掉這塊內存,然后將其前后的內存塊連起來。注意到,為了能幫助我們快速判斷某個數組塊是否為空,還需要在其結構體中定義當前數組塊的有效元素數量count
。
array結構的增/刪/查操作還是比較簡單的,可以直接參考項目源碼中的“array.h”、“array.c”。
2.3 rbtree的實現

??紅黑樹是一種“自平衡的二叉搜索樹”,可謂是耳熟能詳,其增刪查改的操作都已經非常成熟。通俗來說,紅黑樹本質上是一個二叉樹,而一個滿二叉樹的查找性能近似于 O ( log ? N ) O(\log N) O(logN),于是我們便希望每次插入/刪除元素時,紅黑樹都能把自己調整成近乎于滿二叉樹的狀態。具體來說,就是在普通二叉樹的基礎上,只需要為每個節點額外添加一個“顏色信息”,再額外規定一些必須滿足的性質,就可以保證紅黑樹始終平衡。下面是紅黑樹的性質–來源中文維基百科“紅黑樹”:
- 每個節點是紅的或者黑的。
- 根結點是黑的。
- 所有葉子節點都是黑的(葉子節點是nil節點)。
- 紅色節點的子節點必須是黑色。(或者說紅色節點不相鄰)
- 從任一節點到其每個葉子節點的所有簡單路徑,都包含相同數目的黑色節點。
“nil節點”:是一個黑色的空節點,作為紅黑樹的葉子節點。
性質口訣:左根右,根葉黑,不紅紅,黑路同
注:零聲教育提供了能顯示紅黑樹生成步驟的本地HTML文件–“紅黑樹生成步驟.html(39KB)”。
??紅黑樹的查找操作只需要從根節點不斷比較即可,而紅黑樹的插入/刪除操作則有點說法。下面是我按照編程邏輯進行簡化的插入/刪除原理。具體來說,就是每次插入/刪除后都需要進行調整,調整的邏輯如下:
紅黑樹插入:剛插入的位置一定是葉子節點,顏色為紅,然后按照如下方式調整。
// 父節點是黑色:無需操作。 while(父節點是紅色){if(叔節點為紅色){變色:祖父變紅/父變黑/叔變黑、祖父節點成新的當前節點。進入下一次循環。}else(叔節點是黑色,注意葉子節點也是黑色){判斷“祖父節點->父節點->當前節點”的左右走向:LL:祖父變紅/父變黑、祖父右旋。最后的當前節點應該是原來的當前節點。LR:祖父變紅/父變紅/當前變黑、父左旋/祖父右旋。最后的當前節點應該是原來的祖父節點/父節點。RL:祖父變紅/父變紅/當前變黑、父右旋/祖父左旋。最后的當前節點應該是原來的祖父節點/父節點。RR:祖父變紅/父變黑、祖父左旋。最后的當前節點應該是原來的當前節點。然后進入下一次循環。} }
紅黑樹刪除:要刪除的位置一定是 沒有/只有一個 孩子。也就是說,如果要刪除的元素有兩個孩子,那就和其后繼節點交換鍵值對,然后實際刪除這個后繼節點。實際刪除的節點
del_r
為黑色,則將“孩子節點”(沒有孩子就是左側的葉子節點)作為“當前節點”按照如下方式調整。while(當前節點是黑色 && 不是根節點){if(兄弟節點為紅色){if(當前節點是左孩子)父變紅/兄弟變黑、父左旋,當前節點下一循環。else(當前節點是右孩子)父變紅/兄弟變黑、父右旋,當前節點下一循環。}else(兄弟節點為黑色){if(兄弟節點沒有紅色子節點){if(父為黑)父變黑/兄弟變紅,父節點成新的當前節點進入下一循環。else(父為紅)父變黑/兄弟變紅,結束循環(當前節點指向根節點)。}else(兄弟節點有紅色子節點){判斷“父節點->兄弟節點->兄弟的紅色子節點(兩個子節點都是紅色則默認是左)”的走向:LL:紅子變黑/兄弟變父色、父右旋、父變黑,結束循環。LR:紅子變父色、兄弟左旋/父右旋、父變黑,結束循環。RR:紅子變黑/兄弟變父色、父左旋、父變黑,結束循環。RL:紅子變父色、兄弟右旋/父左旋、父變黑,結束循環。“結束循環”等價于當前節點指向根節點。}} } 注意出了循環別忘了將根節點調整成黑色。
根據上述原理,我使用C語言實現了紅黑樹完整的增刪查操作,并增加了檢驗有效性、打印紅黑樹圖的代碼,以及測試代碼。值得注意的是,為了加快開發速度,下面的代碼預設“鍵值對”的類型為int key
、void* value
,也就是只關心int
型的key
、不關心value
,后續將紅黑樹添加進“kv存儲協議”中時會進一步修改:
rbtree_int.c-共905行
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<stdbool.h>// 編譯指令:gcc -o main rbtree_int.c
// 本代碼實現紅黑樹,存儲int型key,未指定value。#define RBTREE_DEBUG 1 // 是否運行測試代碼typedef int KEY_TYPE; // 節點的key類型
#define RED 1
#define BLACK 0// 定義紅黑樹單獨節點
typedef struct _rbtree_node {KEY_TYPE key; // 鍵void *value; // 值,可以指向任何類型struct _rbtree_node *left;struct _rbtree_node *right;struct _rbtree_node *parent;unsigned char color; // 不同編譯器的無符號性質符號不同,這里加上unsigned減少意外。/* 對于32位系統,上述只有color是1個字節,其余都是4個字節,所以color放在最后可以節省內存。 */
} rbtree_node;// 定義整個紅黑樹
typedef struct _rbtree{struct _rbtree_node *root_node; // 根節點struct _rbtree_node *nil_node; // 空節點,也就是葉子節點、根節點的父節點
} rbtree;// 存儲打印紅黑樹所需的參數
typedef struct _disp_parameters{// 打印緩沖區char **disp_buffer;// 打印緩沖區的深度,寬度,當前打印的列數int disp_depth;int disp_width;int disp_column;// 樹的深度int max_depth;// 最大的數字位寬int max_num_width;// 單個節點的顯示寬度int node_width;
}disp_parameters;/*----初始化及釋放內存----*/
// 紅黑樹初始化,注意調用完后釋放內存rbtree_free
rbtree *rbtree_malloc(void);
// 紅黑樹釋放內存
void rbtree_free(rbtree *T);/*----插入操作----*/
// 紅黑樹插入
void rbtree_insert(rbtree *T, KEY_TYPE key, void *value);
// 調整插入新節點后的紅黑樹,使得紅色節點不相鄰(平衡性)
void rbtree_insert_fixup(rbtree *T, rbtree_node *cur);/*----刪除操作----*/
// 紅黑樹刪除
void rbtree_delete(rbtree *T, rbtree_node *del);
// 調整刪除某節點后的紅黑樹,使得紅色節點不相鄰(平衡性)
void rbtree_delete_fixup(rbtree *T, rbtree_node *cur);/*----查找操作----*/
// 紅黑樹查找
rbtree_node* rbtree_search(rbtree *T, KEY_TYPE key);/*----打印信息----*/
// 中序遍歷整個紅黑樹,依次打印節點信息
void rbtree_traversal(rbtree *T);
// 以圖的形式展示紅黑樹
void rbtree_display(rbtree *T);
// 先序遍歷,打印紅黑樹信息到字符數組指針
void set_display_buffer(rbtree *T, rbtree_node *cur, disp_parameters *p);/*----檢查有效性----*/
// 檢查當前紅黑樹的有效性:根節點黑色、紅色不相鄰、所有路徑黑高相同
bool rbtree_check_effective(rbtree *T);/*----其他函數----*/
// 在給定節點作為根節點的子樹中,找出key最小的節點
rbtree_node* rbtree_min(rbtree *T, rbtree_node *cur);
// 在給定節點作為根節點的子樹中,找出key最大的節點
rbtree_node* rbtree_max(rbtree *T, rbtree_node *cur);
// 找出當前節點的前驅節點
rbtree_node* rbtree_precursor_node(rbtree *T, rbtree_node *cur);
// 找出當前節點的后繼節點
rbtree_node* rbtree_successor_node(rbtree *T, rbtree_node *cur);
// 紅黑樹節點左旋,無需修改顏色
void rbtree_left_rotate(rbtree *T, rbtree_node *x);
// 紅黑樹節點右旋,無需修改顏色
void rbtree_right_rotate(rbtree *T, rbtree_node *y);
// 計算紅黑樹的深度
int rbtree_depth(rbtree *T);
// 遞歸計算紅黑樹的深度(不包括葉子節點)
int rbtree_depth_recursion(rbtree *T, rbtree_node *cur);/*-----------------------------下面為函數定義-------------------------------*/
// 紅黑樹初始化,注意調用完后釋放內存rbtree_free()
rbtree *rbtree_malloc(void){rbtree *T = (rbtree*)malloc(sizeof(rbtree));if(T == NULL){printf("rbtree malloc failed!");}else{T->nil_node = (rbtree_node*)malloc(sizeof(rbtree_node));T->nil_node->color = BLACK;T->nil_node->left = T->nil_node;T->nil_node->right = T->nil_node;T->nil_node->parent = T->nil_node;T->root_node = T->nil_node;}return T;
}// 紅黑樹釋放內存
void rbtree_free(rbtree *T){free(T->nil_node);free(T);
}// 在給定節點作為根節點的子樹中,找出key最小的節點
rbtree_node* rbtree_min(rbtree *T, rbtree_node *cur){ while(cur->left != T->nil_node){cur = cur->left;}return cur;
}// 在給定節點作為根節點的子樹中,找出key最大的節點
rbtree_node* rbtree_max(rbtree *T, rbtree_node *cur){ while(cur->right != T->nil_node){cur = cur->right;}return cur;
}// 找出當前節點的前驅節點
rbtree_node* rbtree_precursor_node(rbtree *T, rbtree_node *cur){// 若當前節點有左孩子,那就直接向下找if(cur->left != T->nil_node){return rbtree_max(T, cur->left);}// 若當前節點沒有左孩子,那就向上找rbtree_node *parent = cur->parent;while((parent != T->nil_node) && (cur == parent->left)){cur = parent;parent = cur->parent;}return parent;// 若返回值為空節點,則說明當前節點就是第一個節點
}// 找出當前節點的后繼節點
rbtree_node* rbtree_successor_node(rbtree *T, rbtree_node *cur){// 若當前節點有右孩子,那就直接向下找if(cur->right != T->nil_node){return rbtree_min(T, cur->right);}// 若當前節點沒有右孩子,那就向上找rbtree_node *parent = cur->parent;while((parent != T->nil_node) && (cur == parent->right)){cur = parent;parent = cur->parent;}return parent;// 若返回值為空節點,則說明當前節點就是最后一個節點
}// 紅黑樹節點左旋,無需修改顏色
void rbtree_left_rotate(rbtree *T, rbtree_node *x){// 傳入rbtree*是為了判斷節點node的左右子樹是否為葉子節點、父節點是否為根節點。rbtree_node *y = x->right;// 注意紅黑樹中所有路徑都是雙向的,兩邊的指針都要改!// 另外,按照如下的修改順序,無需存儲額外的節點。x->right = y->left;if(y->left != T->nil_node){y->left->parent = x;}y->parent = x->parent;if(x->parent == T->nil_node){ // x為根節點T->root_node = y;}else if(x->parent->left == x){x->parent->left = y;}else{x->parent->right = y;}y->left = x;x->parent = y;
}// 紅黑樹節點右旋,無需修改顏色
void rbtree_right_rotate(rbtree *T, rbtree_node *y){rbtree_node *x = y->left;y->left = x->right;if(x->right != T->nil_node){x->right->parent = y;}x->parent = y->parent;if(y->parent == T->nil_node){T->root_node = x;}else if(y->parent->left == y){y->parent->left = x;}else{y->parent->right = x;}x->right = y;y->parent = x;
}// 調整插入新節點后的紅黑樹,使得紅色節點不相鄰(平衡性)
void rbtree_insert_fixup(rbtree *T, rbtree_node *cur){// 父節點是黑色,無需調整。// 父節點是紅色,則有如下八種情況。while(cur->parent->color == RED){// 獲取叔節點rbtree_node *uncle;if(cur->parent->parent->left == cur->parent){uncle = cur->parent->parent->right;}else{uncle = cur->parent->parent->left;}// 若叔節點為紅,只需更新顏色(隱含了四種情況)// 循環主要在這里起作用if(uncle->color == RED){// 叔節點為紅色:祖父變紅/父變黑/叔變黑、祖父節點成新的當前節點。if(uncle->color == RED){cur->parent->parent->color = RED;cur->parent->color = BLACK;uncle->color = BLACK;cur = cur->parent->parent;}}// 若叔節點為黑,需要變色+旋轉(當前節點相當于祖父節點位置包括四種情況:LL/RR/LR/RL)// 下面對四種情況進行判斷:都是只執行一次else{if(cur->parent->parent->left == cur->parent){// LL:祖父變紅/父變黑、祖父右旋。最后的當前節點應該是原來的當前節點。if(cur->parent->left == cur){cur->parent->parent->color = RED;cur->parent->color = BLACK;rbtree_right_rotate(T, cur->parent->parent);}// LR:祖父變紅/父變紅/當前變黑、父左旋、祖父右旋。最后的當前節點應該是原來的祖父節點/父節點。else{cur->parent->parent->color = RED;cur->parent->color = RED;cur->color = BLACK;cur = cur->parent;rbtree_left_rotate(T, cur);rbtree_right_rotate(T, cur->parent->parent);}}else{// RL:祖父變紅/父變紅/當前變黑、父右旋、祖父左旋。最后的當前節點應該是原來的祖父節點/父節點。if(cur->parent->left == cur){cur->parent->parent->color = RED;cur->parent->color = RED;cur->color = BLACK;cur = cur->parent;rbtree_right_rotate(T, cur);rbtree_left_rotate(T, cur->parent->parent);}// RR:祖父變紅/父變黑、祖父左旋。最后的當前節點應該是原來的當前節點。else{cur->parent->parent->color = RED;cur->parent->color = BLACK;rbtree_left_rotate(T, cur->parent->parent);}}}}// 將根節點變為黑色T->root_node->color = BLACK;
}// 插入
// void rbtree_insert(rbtree *T, rbtree_node *new){
void rbtree_insert(rbtree *T, KEY_TYPE key, void *value){// 創建新節點rbtree_node *new = (rbtree_node*)malloc(sizeof(rbtree_node));new->key = key;new->value = value;// 尋找插入位置(紅黑樹中序遍歷升序)rbtree_node *cur = T->root_node;rbtree_node *next = T->root_node;// 剛插入的位置一定是葉子節點while(next != T->nil_node){cur = next;if(new->key > cur->key){next = cur->right;}else if(new->key < cur->key){next = cur->left;}else if(new->key == cur->key){// 紅黑樹本身沒有明確如何處理key相同節點,所以取決于業務。// 場景1:統計不同課程的人數,相同就+1。// 場景2:時間戳,若相同則稍微加一點// 其他場景:覆蓋、丟棄...printf("Already have the same key=%d!\n", new->key);free(new);return;}}if(cur == T->nil_node){// 若紅黑樹本身沒有節點T->root_node = new;}else if(new->key > cur->key){cur->right = new;}else{cur->left = new;}new->parent = cur;new->left = T->nil_node;new->right = T->nil_node;new->color = RED;// 調整紅黑樹,使得紅色節點不相鄰rbtree_insert_fixup(T, new);
}// 調整刪除某節點后的紅黑樹,使得紅色節點不相鄰(平衡性)
void rbtree_delete_fixup(rbtree *T, rbtree_node *cur){// child是黑色、child不是根節點才會進入循環while((cur->color == BLACK) && (cur != T->root_node)){// 獲取兄弟節點rbtree_node *brother = T->nil_node;if(cur->parent->left == cur){brother = cur->parent->right;}else{brother = cur->parent->left;}// 兄弟節點為紅色:父變紅/兄弟變黑、父單旋、當前節點下一循環if(brother->color == RED){cur->parent->color = RED;brother->color = BLACK;if(cur->parent->left == cur){rbtree_left_rotate(T, cur->parent);}else{rbtree_right_rotate(T, cur->parent);}}// 兄弟節點為黑色else{ // 兄弟節點沒有紅色子節點:父變黑/兄弟變紅、看情況是否結束循環if((brother->left->color == BLACK) && (brother->right->color == BLACK)){// 若父原先為黑,父節點成新的當前節點進入下一循環;否則結束循環。if(brother->parent->color == BLACK){cur = cur->parent;}else{cur = T->root_node;}brother->parent->color = BLACK;brother->color = RED;}// 兄弟節點有紅色子節點:LL/LR/RR/RLelse if(brother->parent->left == brother){// LL:紅子變黑/兄弟變父色/父變黑、父右旋,結束循環if(brother->left->color == RED){brother->left->color = BLACK;brother->color = brother->parent->color;brother->parent->color = BLACK;rbtree_right_rotate(T, brother->parent);cur = T->root_node;}// LR:紅子變父色/父變黑、兄弟左旋/父右旋,結束循環else{brother->right->color = brother->parent->color;cur->parent->color = BLACK;rbtree_left_rotate(T, brother);rbtree_right_rotate(T, cur->parent);cur = T->root_node;}}else{// RR:紅子變黑/兄弟變父色/父變黑、父左旋,結束循環if(brother->right->color == RED){brother->right->color = BLACK;brother->color = brother->parent->color;brother->parent->color = BLACK;rbtree_left_rotate(T, brother->parent);cur = T->root_node;}// RL:紅子變父色/父變黑、兄弟右旋/父左旋,結束循環else{brother->left->color = brother->parent->color;brother->parent->color = BLACK;rbtree_right_rotate(T, brother);rbtree_left_rotate(T, cur->parent);cur = T->root_node;}}}}// 下面這行處理情況2/3cur->color = BLACK;
}// 紅黑樹刪除
void rbtree_delete(rbtree *T, rbtree_node *del){if(del != T->nil_node){/* 紅黑樹刪除邏輯:1. 標準的BST刪除操作(本函數):最紅都會轉換成刪除只有一個子節點或者沒有子節點的節點。2. 若刪除節點為黑色,則進行調整(rebtre_delete_fixup)。*/rbtree_node *del_r = T->nil_node; // 實際刪除的節點rbtree_node *del_r_child = T->nil_node; // 實際刪除節點的子節點// 找出實際刪除的節點// 注:實際刪除的節點最多只有一個子節點,或者沒有子節點(必然在最后兩層中,不包括葉子節點那一層)if((del->left == T->nil_node) || (del->right == T->nil_node)){// 如果要刪除的節點本身就只有一個孩子或者沒有孩子,那實際刪除的節點就是該節點del_r = del;}else{// 如果要刪除的節點有兩個孩子,那就使用其后繼節點(必然最多只有一個孩子)del_r = rbtree_successor_node(T, del);}// 看看刪除節點的孩子是誰,沒有孩子就是空節點if(del_r->left != T->nil_node){del_r_child = del_r->left;}else{del_r_child = del_r->right;}// 將實際要刪除的節點刪除del_r_child->parent = del_r->parent; // 若child為空節點,最后再把父節點指向空節點if(del_r->parent == T->nil_node){T->root_node = del_r_child;}else if(del_r->parent->left == del_r){del_r->parent->left = del_r_child;}else{del_r->parent->right = del_r_child;}// 替換替換鍵值對if(del != del_r){del->key = del_r->key;del->value = del_r->value;}// 最后看是否需要調整if(del_r->color == BLACK){rbtree_delete_fixup(T, del_r_child);}// 調整空節點的父節點if(del_r_child == T->nil_node){del_r_child->parent = T->nil_node;}free(del_r);}
}// 查找
rbtree_node* rbtree_search(rbtree *T, KEY_TYPE key){rbtree_node *cur = T->root_node;while(cur != T->nil_node){if(cur->key > key){cur = cur->left;}else if(cur->key < key){cur = cur->right;}else{return cur;}}printf("There is NO key=%d in rbtree!\n", key);return T->nil_node;
}// 中序遍歷給定結點為根節點的子樹(遞歸)
void rbtree_traversal_node(rbtree *T, rbtree_node *cur){if(cur != T->nil_node){rbtree_traversal_node(T, cur->left);if(cur->color == RED){printf("Key:%d\tColor:Red\n", cur->key);}else{printf("Key:%d\tColor:Black\n", cur->key);}rbtree_traversal_node(T, cur->right);}
}// 中序遍歷整個紅黑樹
void rbtree_traversal(rbtree *T){rbtree_traversal_node(T, T->root_node);
}// 遞歸計算紅黑樹的深度(不包括葉子節點)
int rbtree_depth_recursion(rbtree *T, rbtree_node *cur){if(cur == T->nil_node){return 0;}else{int left = rbtree_depth_recursion(T, cur->left);int right = rbtree_depth_recursion(T, cur->right);return ((left > right) ? left : right) + 1;}
}// 計算紅黑樹的深度
int rbtree_depth(rbtree *T){return rbtree_depth_recursion(T, T->root_node);
}// 獲取輸入數字的十進制顯示寬度
int decimal_width(int num_in){int width = 0;while (num_in != 0){num_in = num_in / 10;width++;}return width;
}// 先序遍歷,打印紅黑樹信息到字符數組指針
void set_display_buffer(rbtree *T, rbtree_node *cur, disp_parameters *p){if(cur != T->nil_node){// 輸出當前節點p->disp_depth++;// 輸出數字到緩沖區char num_char[20];char formatString[20];int cur_num_width = decimal_width(cur->key);int num_space = (p->node_width - 2 - cur_num_width) >> 1; // 數字后面需要補充的空格數量strncpy(formatString, "|%*d", sizeof(formatString));int i = 0;for(i=0; i<num_space; i++){strncat(formatString, " ", 2);}strncat(formatString, "|", 2);snprintf(num_char, sizeof(num_char), formatString, (p->node_width-2-num_space), cur->key);i = 0;while(num_char[i] != '\0'){p->disp_buffer[(p->disp_depth-1)*3][p->disp_column+i] = num_char[i];i++;}// 輸出顏色到緩沖區char color_char[20];if(cur->color == RED){num_space = (p->node_width-2-3)>>1;strncpy(color_char, "|", 2);for(i=0; i<(p->node_width-2-3-num_space); i++){strncat(color_char, " ", 2);}strncat(color_char, "RED", 4);for(i=0; i<num_space; i++){strncat(color_char, " ", 2);}strncat(color_char, "|", 2);}else{num_space = (p->node_width-2-5)>>1;strncpy(color_char, "|", 2);for(i=0; i<(p->node_width-2-5-num_space); i++){strncat(color_char, " ", 2);}strncat(color_char, "BLACK", 6);for(i=0; i<num_space; i++){strncat(color_char, " ", 2);}strncat(color_char, "|", 2);}// strcpy(color_char, (cur->color == RED) ? "| RED |" : "|BLACK|");i = 0;while(color_char[i] != '\0'){p->disp_buffer[(p->disp_depth-1)*3+1][p->disp_column+i] = color_char[i];i++;}// 輸出連接符到緩沖區if(p->disp_depth>1){char connector_char[10];strcpy(connector_char, (cur->parent->left == cur) ? "/" : "\\");p->disp_buffer[(p->disp_depth-1)*3-1][p->disp_column+(p->node_width>>1)] = connector_char[0];}// 下一層需要前進/后退的字符數int steps = 0;if(p->disp_depth+1 == p->max_depth){steps = (p->node_width>>1)+1;}else{steps = (1<<(p->max_depth - p->disp_depth - 2)) * p->node_width;}// 輸出左側節點p->disp_column -= steps;set_display_buffer(T, cur->left, p);p->disp_column += steps;// 輸出右側節點if(p->disp_depth+1 == p->max_depth){steps = p->node_width-steps;}p->disp_column += steps;set_display_buffer(T, cur->right, p);p->disp_column -= steps;p->disp_depth--;}
}// 以圖的形式展示紅黑樹
void rbtree_display(rbtree *T){// 紅黑樹為空不畫圖if(T->root_node == T->nil_node){printf("rbtree DO NOT have any key!\n");return;}// 初始化參數結構體disp_parameters *para = (disp_parameters*)malloc(sizeof(disp_parameters));if(para == NULL){printf("disp_parameters struct malloc failed!");return;}rbtree_node *max_node = rbtree_max(T, T->root_node);para->max_num_width = decimal_width(max_node->key); para->max_depth = rbtree_depth(T);para->node_width = (para->max_num_width<=5) ? 7 : (para->max_num_width+2); // 邊框“||”寬度2 + 數字寬度para->disp_depth = 0;para->disp_width = para->node_width * (1 << (para->max_depth-1)) + 1;para->disp_column = ((para->disp_width-para->node_width)>>1);int height = (para->max_depth-1)*3 + 2;// 根據樹的大小申請內存para->disp_buffer = (char**)malloc(sizeof(char*)*height);int i = 0;for(i=0; i<height; i++){para->disp_buffer[i] = (char*)malloc(sizeof(char)*para->disp_width);memset(para->disp_buffer[i], ' ', para->disp_width);para->disp_buffer[i][para->disp_width-1] = '\0';}// 打印內容set_display_buffer(T, T->root_node, para);for(i=0; i<height; i++){printf("%s\n", para->disp_buffer[i]);}// 釋放內存for(i=0; i<height; i++){free(para->disp_buffer[i]);}free(para->disp_buffer);free(para);
}// 檢查當前紅黑樹的有效性:根節點黑色、紅色不相鄰、所有路徑黑高相同
bool rbtree_check_effective(rbtree *T){bool rc_flag = true; // 根節點黑色bool rn_flag = true; // 紅色不相鄰bool bh_flag = true; // 所有路徑黑高相同if(T->root_node->color == RED){printf("ERROR! root-node's color is RED!\n");rc_flag = false;}else{int depth = rbtree_depth(T);int max_index_path = 1<<(depth-1); // 從根節點出發的路徑總數// 獲取最左側路徑的黑高int black_height = 0;rbtree_node *cur = T->root_node;while(cur != T->nil_node){if(cur->color == BLACK) black_height++;cur = cur->left;// printf("bh = %d\n", black_height);}// 遍歷每一條路徑int i_path = 0;for(i_path=1; i_path<max_index_path; i_path++){int dir = i_path;int bh = 0; // 當前路徑的黑高cur = T->root_node;while(cur != T->nil_node){// 更新黑高if(cur->color == BLACK){bh++;}// 判斷紅色節點不相鄰else{if((cur->left->color == RED) || (cur->right->color == RED)){printf("ERROR! red node %d has red child!\n", cur->key);rn_flag = false;}}// 更新下一節點// 0:left, 1:rightif(dir%2) cur = cur->right;else cur = cur->left;dir = dir>>1;}if(bh != black_height){printf("ERROR! black height is not same! path 0 is %d, path %d is %d.\n", black_height, i_path, bh);bh_flag = false;}}}return (rc_flag && rn_flag && bh_flag);
}
/*------------------------------------------------------------------------*//*-----------------------------下面為測試代碼-------------------------------*/
#if RBTREE_DEBUG
#include<time.h> // 使用隨機數
#include<sys/time.h> // 計算qps中獲取時間
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define ENABLE_QPS 1 // 是否開啟qps性能測試
#define continue_test_len 1000000 // 連續測試的長度
// 冒泡排序
void bubble_sort(int arr[], int len) {int i, j, temp;for (i = 0; i < len - 1; i++)for (j = 0; j < len - 1 - i; j++)if (arr[j] > arr[j + 1]) {temp = arr[j];arr[j] = arr[j + 1];arr[j + 1] = temp;}
}
int main(){/* --------------------定義數組-------------------- */// 預定義的數組// int KeyArray[20] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20}; // 正著插// int KeyArray[20] = {20,19,18,17,16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1}; // 倒著插// int KeyArray[20] = {1,2,3,4,5,10,9,8,7,6,11,12,13,14,15,20,19,18,17,16}; // 亂序插// int KeyArray[31] = {11,12,13,14,15,16,17,18,19,20,1,2,3,4,5,6,7,8,9,10,21,22,23,24,25,26,27,28,29,30,31}; // 亂序插// 順序增長的數組int len_array = 18;int KeyArray[len_array];int i_array = 0;for(i_array=0; i_array<len_array; i_array++){KeyArray[i_array] = i_array + 1;}// // 隨機生成固定大小的隨機數組// int len_array = 18;// int KeyArray[len_array];// srand(time(NULL));// int i_array = 0;// for(i_array=0; i_array<len_array; i_array++){// KeyArray[i_array] = rand() % 9999999999;// }/* ------------------以下測試代碼------------------ */printf("-------------------紅黑樹插入測試------------------\n");// 先給輸入的數組排個序int len_max = sizeof(KeyArray)/sizeof(int);printf("測試數組長度: %d\n", len_max);// 將原先的數組深拷貝并升序排序int *KeyArray_sort = (int*)malloc(sizeof(KeyArray));printf("RAND_MAX: %d\n", RAND_MAX);int i = 0;for(i = 0; i < len_max; i++){KeyArray_sort[i] = KeyArray[i];}bubble_sort(KeyArray_sort, len_max);// 申請紅黑樹內存rbtree *T = rbtree_malloc();// 依次插入數據for(i = 0; i < len_max; i++){rbtree_insert(T, KeyArray[i], NULL);}// 遍歷數據,查看是否符合紅黑樹性質// rbtree_display(T);if(rbtree_check_effective(T)){printf("PASS---->插入測試\n");}else{printf("FAIL---->插入測試\n");}// rbtree_display(T);printf("-------------------紅黑樹前驅節點測試------------------\n");int pass_flag = 1;if(rbtree_precursor_node(T, rbtree_search(T, KeyArray_sort[0])) != T->nil_node){printf("search first key %d's precursor error! get %d, expected nil_node\n", len_max, rbtree_precursor_node(T, rbtree_search(T, KeyArray_sort[0]))->key);pass_flag = 0;}for(i = 1; i<len_max; i++){rbtree_node *precursor = rbtree_precursor_node(T, rbtree_search(T, KeyArray_sort[i]));if(precursor->key != KeyArray_sort[i-1]){printf("search key %d error! get %d, expected %d\n", KeyArray_sort[i], precursor->key, KeyArray_sort[i-1]);pass_flag = 0;}}if(pass_flag){printf("PASS---->前驅節點測試\n");}else{printf("FAIL---->前驅節點測試\n");}printf("-------------------紅黑樹后繼節點測試------------------\n");pass_flag = 1;if(rbtree_successor_node(T, rbtree_search(T, KeyArray_sort[len_max-1])) != T->nil_node){printf("search last key %d's successor error! get %d, expected nil_node\n",\KeyArray_sort[len_max-1],\rbtree_successor_node(T, rbtree_search(T, KeyArray_sort[len_max-1]))->key);pass_flag = 0;}for(i = 0; i<len_max-1; i++){rbtree_node *successor = rbtree_successor_node(T, rbtree_search(T, KeyArray_sort[i]));if(successor->key != KeyArray_sort[i+1]){printf("search key %d error! get %d, expected %d\n", KeyArray_sort[i], successor->key, KeyArray_sort[i+1]);pass_flag = 0;}}if(pass_flag){printf("PASS---->后繼節點測試\n");}else{printf("FAIL---->后繼節點測試\n");}printf("-------------------紅黑樹刪除測試------------------\n");// 依次刪除所有元素for(i=0; i<len_max; i++){rbtree_delete(T, rbtree_search(T, KeyArray_sort[i]));if(!rbtree_check_effective(T)){rbtree_display(T);printf("FAIL---->刪除測試%d\n", i+1);break;}else{printf("PASS---->刪除測試%d\n", i+1);}}printf("-------------------紅黑樹打印測試------------------\n");// 先插入數據1~18,再刪除16/17/18,即可得到4層的滿二叉樹for(i = 0; i < len_max; i++){rbtree_insert(T, KeyArray[i], NULL);}for(i=0; i<3; i++){rbtree_delete(T, rbtree_search(T, KeyArray_sort[len_max-i-1]));if(!rbtree_check_effective(T)){printf("FAIL---->刪除測試%d\n", KeyArray_sort[len_max-i-1]);break;}else{printf("PASS---->刪除測試%d\n", KeyArray_sort[len_max-i-1]);}}// 打印看看結果rbtree_display(T);// 清空紅黑樹for(i=0; i<len_max; i++){rbtree_delete(T, rbtree_search(T, KeyArray_sort[i]));}#if ENABLE_QPSprintf("---------------紅黑樹連續插入性能測試---------------\n");// 定義時間結構體struct timeval tv_begin;struct timeval tv_end;gettimeofday(&tv_begin, NULL);for(i = 0; i < continue_test_len; i++){rbtree_insert(T, i+1, NULL);}gettimeofday(&tv_end, NULL);int time_ms = TIME_SUB_MS(tv_end, tv_begin);float qps = (float)continue_test_len / (float)time_ms * 1000;printf("total INSERTs:%d time_used:%d(ms) qps:%.2f(INSERTs/sec)\n", continue_test_len, time_ms, qps);printf("---------------紅黑樹連續查找性能測試---------------\n");gettimeofday(&tv_begin, NULL);for(i = 0; i < continue_test_len; i++){// rbtree_search(T, i+1);if(rbtree_search(T, i+1)->key != i+1){printf("continue_search error!\n");return 0;}}gettimeofday(&tv_end, NULL);time_ms = TIME_SUB_MS(tv_end, tv_begin);qps = (float)continue_test_len / (float)time_ms * 1000;printf("total SEARCHs:%d time_used:%d(ms) qps:%.2f(SEARCHs/sec)\n", continue_test_len, time_ms, qps);printf("---------------紅黑樹連續刪除性能測試---------------\n");gettimeofday(&tv_begin, NULL);for(i = 0; i < continue_test_len; i++){rbtree_delete(T, rbtree_search(T, i+1));}gettimeofday(&tv_end, NULL);time_ms = TIME_SUB_MS(tv_end, tv_begin);qps = (float)continue_test_len / (float)time_ms * 1000;printf("total DELETEs:%d time_used:%d(ms) qps:%.2f(DELETEs/sec)\n", continue_test_len, time_ms, qps);
#endifprintf("--------------------------------------------------\n");rbtree_free(T); // 別忘了釋放內存free(KeyArray_sort);return 0;
}
#endif
測試代碼輸出結果
lyl@ubuntu:~/Desktop/kv-store/code_init$ gcc -o main rbtree_int.c
lyl@ubuntu:~/Desktop/kv-store/code_init$ ./main
-------------------紅黑樹插入測試------------------
測試數組長度: 18
RAND_MAX: 2147483647
PASS---->插入測試
-------------------紅黑樹前驅節點測試------------------
PASS---->前驅節點測試
-------------------紅黑樹后繼節點測試------------------
PASS---->后繼節點測試
-------------------紅黑樹刪除測試------------------
PASS---->刪除測試1
PASS---->刪除測試2
PASS---->刪除測試3
PASS---->刪除測試4
PASS---->刪除測試5
PASS---->刪除測試6
PASS---->刪除測試7
PASS---->刪除測試8
PASS---->刪除測試9
PASS---->刪除測試10
PASS---->刪除測試11
PASS---->刪除測試12
PASS---->刪除測試13
PASS---->刪除測試14
PASS---->刪除測試15
PASS---->刪除測試16
PASS---->刪除測試17
PASS---->刪除測試18
-------------------紅黑樹打印測試------------------
PASS---->刪除測試18
PASS---->刪除測試17
PASS---->刪除測試16| 8 | |BLACK| / \ | 4 | | 12 | | RED | | RED | / \ / \ | 2 | | 6 | | 10 | | 14 | |BLACK| |BLACK| |BLACK| |BLACK| / \ / \ / \ / \
| 1 || 3 || 5 || 7 || 9 || 11 || 13 || 15 |
|BLACK||BLACK||BLACK||BLACK||BLACK||BLACK||BLACK||BLACK|
There is NO key=16 in rbtree!
There is NO key=17 in rbtree!
There is NO key=18 in rbtree!
---------------紅黑樹連續插入性能測試---------------
total INSERTs:1000000 time_used:295(ms) qps:3389830.50(INSERTs/sec)
---------------紅黑樹連續查找性能測試---------------
total SEARCHs:1000000 time_used:118(ms) qps:8474576.00(SEARCHs/sec)
---------------紅黑樹連續刪除性能測試---------------
total DELETEs:1000000 time_used:95(ms) qps:10526315.00(DELETEs/sec)
--------------------------------------------------
編程感想:
- 每一次旋轉都是一次謀權篡位。
- 雙旋的時候,最后的“當前節點”應該是原來的“祖父節點/父節點”,若還保留當前身份,那么會造成錯誤。
紅黑樹插入:
- 參考B站:【neko算法課】紅黑樹 插入【11期】
紅黑樹刪除:
- 參考B站:【neko算法課】紅黑樹 刪除【12期】
- 參考微信圖文:圖解:什么是二叉排序樹?–介紹了標準BST刪除操作
- 參考知乎:圖解:紅黑樹刪除篇(一文讀懂)–里面的v對應del_r、u對應del_r_child
紅黑樹打印:
- 參考CSDN:二叉樹生成與打印顯示 c語言實現
2.4 btree的實現

??一般來說,B樹也是一個自平衡的二叉搜索樹。但與紅黑樹不同的是,B樹的節點可以存儲多個元素, m m m階B樹的單個節點,最多有 m ? 1 m-1 m?1 個元素、 m m m 個子節點。并且B樹只有孩子節點、沒有父節點(沒有向上的指針)。也就是說,對于插入/刪除操作,紅黑樹可以先從上往下尋找插入位置,再從下往上進行調整;而B樹要先從上往下調整完(“分裂、合并/借位”),最后在葉子節點進行插入/刪除,而沒有從下往上的過程。即進行插入/刪除時,B樹從上往下只走一次。下面給出一個 m m m階B樹應該滿足的條件(判斷一棵B樹是否有效的依據):
- 每個結點至多擁有 m m m顆子樹。
- 根結點至少擁有兩顆子樹。
- 除了根結點以外,其余每個分支結點至少擁有 m / 2 m/2 m/2棵子樹。
- 所有的葉結點都在同一層上。
- 有 k k k 棵子樹的分支結點則存在 k ? 1 k-1 k?1 個元素,元素按照遞增順序進行排序。
- 單個節點的元素數量 n n n 滿足 ceil ( m / 2 ) ? 1 ≤ n ≤ m ? 1 \text{ceil}(m/2)-1 \le n \le m-1 ceil(m/2)?1≤n≤m?1。
B樹可視化網站:https://www.cs.usfca.edu/~galles/visualization/BTree.html
同樣的,B樹的查找操作只需要從根節點不斷比較即可,而B樹的插入/刪除邏輯如下:
B樹插入:從上往下尋找要插入的葉子節點,過程中要下去的孩子若是滿節點,則進行“分裂”。
- 分裂:取孩子的中間節點(第 ceil m 2 \text{ceil}\frac{m}{2} ceil2m? 個)放上來,剩下的元素列變成兩個子節點。
- 新元素必然添加到葉子節點上。
- 注意:只有根節點分裂會增加層高,其余的不會。
B樹刪除:從上往下尋找要刪除元素的所在節點,過程中看情況進行“合并/借位”。若所在節點不是葉子節點,就將其換到葉子節點中。最后在葉子節點刪除元素。
- 合并:從當前節點下放一個元素,然后該元素對應的兩個子節點合并成一個子節點。團結就是力量。
- 借位:從當前節點下方一個元素到元素較少的孩子,然后從當前元素的另一個孩子節點拉上來一個元素取代位置,注意大小順序。損有余而補不足。
// 遍歷到葉子節點 while(不是葉子節點){// 1. 確定下一節點和其兄弟節點if(當前節點有要刪除的元素) 哪邊少哪邊就是下一節點,當前元素對應的另一邊就是兄弟節點。else(當前節點沒有要刪除的元素) 確定好要去的下一節點后,左右兩邊誰多誰是兄弟節點。// 2. 看是否需要調整if(下一節點元素少){if(孩子的兄弟節點元素多) 借位,進入下一節點。else(孩子的兄弟節點元素少) 合并,進入合并后的節點。}else if(下一節點元素多 && 當前節點有要刪除元素){if(下一節點是刪除元素的左節點) 刪除元素和其前驅元素換位,進入下一節點。else(下一節點是刪除元素的右節點) 刪除元素和其后繼元素換位,進入下一節點。}else{直接進入下一節點。} } // 然后在葉子節點刪除元素
注:判斷孩子節點的元素少的條件是 元素數量 ≤ ceil m 2 ? 1 \le \text{ceil}\frac{m}{2}-1 ≤ceil2m??1,判斷元素多的條件是 元素數量 ≥ ceil m 2 \ge\text{ceil}\frac{m}{2} ≥ceil2m?。
根據上述原理,我使用C語言實現了B樹完整的增刪查操作,并增加了檢驗有效性、打印B樹的代碼,以及測試代碼(終端顯示進度條)。同樣為了加快開發速度,預設“鍵值對”的類型為int key
、void* value
,后續將B樹添加進“kv存儲協議”中時會進一步修改:
btree_int.c-共989行
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<stdbool.h>// 編譯指令:gcc -o main 1-1btree.c
// 本代碼實現M階B樹,存儲int型key,未定義value。#define BTREE_DEBUG 1 // 是否運行測試代碼typedef struct _btree_node{int *keys;void *values;struct _btree_node **children;int num; // 當前節點的實際元素數量int leaf; // 當前節點是否為葉子節點
}btree_node;typedef struct _btree{int m; // m階B樹struct _btree_node *root_node;
}btree;/*
下面是所有的函數聲明,排列順序與源代碼調用相同,最外層的函數放在最下面。
*/
/*----初始化分配內存----*/
// 創建單個節點,leaf表示是否為葉子節點
btree_node *btree_node_create(btree *T, int leaf);
// 初始化m階B樹:分配內存,最后記得銷毀B樹btree_destroy()
btree *btree_init(int m);/*----釋放內存----*/
// 刪除單個節點
void btree_node_destroy(btree_node *cur);
// 遞歸刪除給定節點作為根節點的子樹
void btree_node_destroy_recurse(btree_node *cur);
// 刪除所有節點,釋放btree內存
btree *btree_destroy(btree *T);/*----插入操作----*/
// 根節點分裂
btree_node* btree_root_split(btree *T);
// 索引為idx的孩子節點分裂
btree_node* btree_child_split(btree *T, btree_node* cur, int idx);
// btree插入元素:先分裂,再插入,必然在葉子節點插入
void btree_insert_key(btree *T, int key);/*----刪除操作----*/
// 借位:將cur節點的idx_key元素下放到idx_dest孩子
btree_node *btree_borrow(btree_node *cur, int idx_key, int idx_dest);
// 合并:將cur節點的idx元素向下合并
btree_node *btree_merge(btree *T, btree_node *cur, int idx);
// 找出當前節點索引為idx_key的元素的前驅節點
btree_node* btree_precursor_node(btree *T, btree_node *cur, int idx_key);
// 找出當前節點索引為idx_key的元素的后繼節點
btree_node* btree_successor_node(btree *T, btree_node *cur, int idx_key);
// btree刪除元素:先合并/借位,再刪除,必然在葉子節點刪除
void btree_delete_key(btree *T, int key);/*----查找操作----*/
// 查找key
btree_node* btree_search_key(btree *T, int key);/*----打印信息----*/
// 打印當前節點信息
void btree_node_print(btree_node *cur);
// 先序遍歷給定節點為根節點的子樹(遞歸)
void btree_traversal_node(btree *T, btree_node *cur);
// btree遍歷
void btree_traversal(btree *T);/*----檢查有效性----*/
// 獲取B樹的高度
int btree_depth(btree *T);
// 檢查給定節點的有效性
// 鍵值:根節點至少有一個key,其余節點至少有ceil(m/2)-1個key
// 分支:所有節點數目子樹為當前節點元素數量+1
bool btree_node_check_effective(btree *T, btree_node *cur);
// 遍歷所有路徑檢查m階B樹的有效性
// 平衡性:所有葉節點都在同一層(所有路徑高度相等)
// 有序性:所有元素升序排序
// 鍵值:根節點至少有一個key,其余節點至少有ceil(m/2)-1個key
// 分支:所有節點數目子樹為當前節點元素數量+1
bool btree_check_effective(btree *T);/*-----------------------------下面為函數定義-------------------------------*/
// 創建單個節點,leaf表示是否為葉子節點
btree_node *btree_node_create(btree *T, int leaf){btree_node *new = (btree_node*)malloc(sizeof(btree_node));if(new == NULL){printf("btree node malloc failed!\n");return NULL;}new->keys = (int*)calloc(T->m-1, sizeof(int));new->values = (void*)calloc(T->m-1, sizeof(void));new->children = (btree_node **)calloc(T->m, sizeof(btree_node*));new->num = 0;new->leaf = leaf;return new;
}// 刪除單個節點
void btree_node_destroy(btree_node *cur){free(cur->keys);free(cur->values);free(cur->children);free(cur);
}// 初始化m階B樹:分配內存,最后記得銷毀B樹btree_destroy()
btree *btree_init(int m){btree *T = (btree*)malloc(sizeof(btree));if(T == NULL){// 只有內存不夠時才會分配失敗printf("rbtree malloc failed!\n");return NULL;}T->m = m;T->root_node = NULL;
}// 遞歸刪除給定節點作為根節點的子樹
void btree_node_destroy_recurse(btree_node *cur){int i = 0;if(cur->leaf == 1){btree_node_destroy(cur);}else{for(i=0; i<cur->num+1; i++){btree_node_destroy_recurse(cur->children[i]);}}
}// 釋放btree內存
btree *btree_destroy(btree *T){// 刪除所有節點if(T->root_node != NULL){btree_node_destroy_recurse(T->root_node);}// 刪除btreefree(T);
}// 根節點分裂
btree_node* btree_root_split(btree *T){// 創建兄弟節點btree_node *brother = btree_node_create(T, T->root_node->leaf);int i = 0;for(i=0; i<((T->m-1)>>1); i++){brother->keys[i] = T->root_node->keys[i+(T->m>>1)];T->root_node->keys[i+(T->m>>1)] = 0;brother->children[i] = T->root_node->children[i+(T->m>>1)];T->root_node->children[i+(T->m>>1)] = NULL;brother->num++;T->root_node->num--;}// 還需要復制最后一個指針brother->children[brother->num] = T->root_node->children[T->m-1];T->root_node->children[T->m-1] = NULL;// 創建新的根節點btree_node *new_root = btree_node_create(T, 0);new_root->keys[0] = T->root_node->keys[T->root_node->num-1];T->root_node->keys[T->root_node->num-1] = 0;T->root_node->num--;new_root->num = 1;new_root->children[0] = T->root_node;new_root->children[1] = brother;T->root_node = new_root;return T->root_node;
}// 索引為idx的孩子節點分裂
btree_node* btree_child_split(btree *T, btree_node* cur, int idx){// 創建孩子的兄弟節點btree_node *full_child = cur->children[idx];btree_node *new_child = btree_node_create(T, cur->children[idx]->leaf);int i = 0;for(i=0; i<((T->m-1)>>1); i++){new_child->keys[i] = full_child->keys[i+(T->m>>1)];full_child->keys[i+(T->m>>1)] = 0;new_child->children[i] = full_child->children[i+(T->m>>1)];full_child->children[i+(T->m>>1)] = NULL;new_child->num++;full_child->num--;}new_child->children[new_child->num] = full_child->children[T->m-1];full_child->children[T->m-1] = NULL;// 把孩子的元素拿上來// 調整自己的key和childrenfor(i=cur->num; i>idx; i--){cur->keys[i] = cur->keys[i-1];cur->children[i+1] = cur->children[i];}cur->children[idx+1] = new_child;cur->keys[idx] = full_child->keys[full_child->num-1];full_child->keys[full_child->num-1] = 0;cur->num++;full_child->num--;return cur;
}// btree插入元素:先分裂,再插入,必然在葉子節點插入
void btree_insert_key(btree *T, int key){btree_node *cur = T->root_node;if(key <= 0){// printf("illegal insert: key=%d!\n", key);}else if(cur == NULL){btree_node *new = btree_node_create(T, 1);new->keys[0] = key;new->num = 1;T->root_node = new;}else{// 函數整體邏輯:從根節點逐步找到元素要插入的葉子節點,先分裂、再添加// 先查看根節點是否需要分裂if(cur->num == T->m-1){cur = btree_root_split(T);}// 從根節點開始尋找要插入的葉子節點while(cur->leaf == 0){// 找到下一個要比較的孩子節點int next_idx = 0; // 要進入的孩子節點的索引int i = 0;for(i=0; i<cur->num; i++){if(key == cur->keys[i]){// printf("insert failed! already has key=%d!\n", key);return;}else if(key < cur->keys[i]){next_idx = i;break;}else if(i == cur->num-1){next_idx = cur->num;}}// 查看孩子是否需要分裂,不需要就進入if(cur->children[next_idx]->num == T->m-1){cur = btree_child_split(T, cur, next_idx);}else{cur = cur->children[next_idx];}}// 將新元素插入到葉子節點中int i = 0;int pos = 0; // 要插入的位置for(i=0; i<cur->num; i++){if(key == cur->keys[i]){// printf("insert failed! already has key=%d!\n", key);return;}else if(key < cur->keys[i]){pos = i;break;}else if(i == cur->num-1){pos = cur->num;}}// 插入元素if(pos == cur->num){cur->keys[cur->num] = key;}else{for(i=cur->num; i>pos; i--){cur->keys[i] = cur->keys[i-1];}cur->keys[pos] = key;}cur->num++;}
}// 借位:將cur節點的idx_key元素下放到idx_dest孩子
btree_node *btree_borrow(btree_node *cur, int idx_key, int idx_dest){int idx_sour = (idx_key == idx_dest) ? idx_dest+1 : idx_key;btree_node *node_dest = cur->children[idx_dest]; // 目的節點btree_node *node_sour = cur->children[idx_sour]; // 源節點if(idx_key == idx_dest){// 自己下去作為目的節點的最后一個元素node_dest->keys[node_dest->num] = cur->keys[idx_key];node_dest->children[node_dest->num+1] = node_sour->children[0];node_dest->num++;// 把源節點的第一個元素請上來cur->keys[idx_key] = node_sour->keys[0];for(int i=0; i<node_sour->num-1; i++){node_sour->keys[i] = node_sour->keys[i+1];node_sour->children[i] = node_sour->children[i+1];}node_sour->children[node_sour->num-1] = node_sour->children[node_sour->num];node_sour->children[node_sour->num] = NULL;node_sour->keys[node_sour->num-1] = 0;node_sour->num--;}else{// 自己下去作為目的節點的第一個元素node_dest->children[node_dest->num+1] = node_dest->children[node_dest->num];for(int i=node_dest->num; i>0; i--){node_dest->keys[i] = node_dest->keys[i-1];node_dest->children[i] = node_dest->children[i-1];}node_dest->keys[0] = cur->keys[idx_key];node_dest->children[0] = node_sour->children[node_sour->num];node_dest->num++;// 把源節點的最后一個元素請上來cur->keys[idx_key] = node_sour->keys[node_sour->num-1];node_sour->keys[node_sour->num-1] = 0;node_sour->children[node_sour->num] = NULL;node_sour->num--;}return node_dest;
}// 合并:將cur節點的idx元素向下合并
btree_node *btree_merge(btree *T, btree_node *cur, int idx){btree_node *left = cur->children[idx];btree_node *right = cur->children[idx+1];// 自己下去左孩子,調整當前節點left->keys[left->num] = cur->keys[idx];left->num++;for(int i=idx; i<cur->num-1; i++){cur->keys[i] = cur->keys[i+1];cur->children[i+1] = cur->children[i+2];}cur->keys[cur->num-1] = 0;cur->children[cur->num] = NULL;cur->num--;// 右孩子復制到左孩子for(int i=0; i<right->num; i++){left->keys[left->num] = right->keys[i];left->children[left->num] = right->children[i];left->num++;}left->children[left->num] = right->children[right->num];// 刪除右孩子btree_node_destroy(right);// 更新根節點if(T->root_node == cur){btree_node_destroy(cur);T->root_node = left;}return left;
}// 找出當前節點索引為idx_key的元素的前驅節點
btree_node* btree_precursor_node(btree *T, btree_node *cur, int idx_key){if(cur->leaf == 0){cur = cur->children[idx_key];while(cur->leaf == 0){cur = cur->children[cur->num];}}return cur;
}// 找出當前節點索引為idx_key的元素的后繼節點
btree_node* btree_successor_node(btree *T, btree_node *cur, int idx_key){if(cur->leaf == 0){cur = cur->children[idx_key+1];while(cur->leaf == 0){cur = cur->children[0];}}return cur;
}// btree刪除元素:先合并/借位,再刪除,必然在葉子節點刪除
void btree_delete_key(btree *T, int key){if(T->root_node!=NULL && key>0){btree_node *cur = T->root_node;// 在去往葉子節點的過程中不斷調整(合并/借位)while(cur->leaf == 0){// 看看要去哪個孩子int idx_next = 0; //下一個要去的孩子節點索引int idx_bro = 0;int idx_key = 0;if(key < cur->keys[0]){idx_next = 0;idx_bro = 1;}else if(key > cur->keys[cur->num-1]){idx_next = cur->num;idx_bro = cur->num-1;}else{for(int i=0; i<cur->num; i++){if(key == cur->keys[i]){// 哪邊少去哪邊if(cur->children[i]->num <= cur->children[i+1]->num){idx_next = i;idx_bro = i+1;}else{idx_next = i+1;idx_bro = i;}break;}else if((i<cur->num-1) && (key > cur->keys[i]) && (key < cur->keys[i+1])){idx_next = i + 1;// 誰多誰是兄弟if(cur->children[i]->num > cur->children[i+2]->num){idx_bro = i;}else{idx_bro = i+2;}break;}}}idx_key = (idx_next < idx_bro) ? idx_next : idx_bro;// 依據孩子節點的元素數量進行調整if(cur->children[idx_next]->num <= ((T->m>>1)-1)){// 借位:下一孩子的元素少,下一孩子的兄弟節點的元素多if(cur->children[idx_bro]->num >= (T->m>>1)){cur = btree_borrow(cur, idx_key, idx_next);}// 合并:兩個孩子都不多else{cur = btree_merge(T, cur, idx_key);}}else if(cur->keys[idx_key] == key){// 若當前元素就是要刪除的節點,那一定要送下去// 但是不能借位,而是將前驅元素搬上來btree_node* pre;int tmp;if(idx_key == idx_next){// 找到前驅節點pre = btree_precursor_node(T, cur, idx_key);// 交換 當前元素 和 前驅節點的最后一個元素tmp = pre->keys[pre->num-1];pre->keys[pre->num-1] = cur->keys[idx_key];cur->keys[idx_key] = tmp;}else{// 找到后繼節點pre = btree_successor_node(T, cur, idx_key);// 交換 當前元素 和 后繼節點的第一個元素tmp = pre->keys[0];pre->keys[0] = cur->keys[idx_key];cur->keys[idx_key] = tmp;}cur = cur->children[idx_next];// cur = btree_borrow(cur, idx_key, idx_next);}else{cur = cur->children[idx_next];}}// 葉子節點刪除元素for(int i=0; i<cur->num; i++){if(cur->keys[i] == key){if(cur->num == 1){// 若B樹只剩最后一個元素btree_node_destroy(cur);T->root_node = NULL;}else{if(i != cur->num-1){for(int j=i; j<(cur->num-1); j++){cur->keys[j] = cur->keys[j+1];}}cur->keys[cur->num-1] = 0;cur->num--;}}// else if(i == cur->num-1){// printf("there is no key=%d\n", key);// }}}
}// 打印當前節點信息
void btree_node_print(btree_node *cur){if(cur == NULL){printf("NULL\n");}else{printf("leaf:%d, num:%d, key:|", cur->leaf, cur->num);for(int i=0; i<cur->num; i++){printf("%d|", cur->keys[i]);}printf("\n");}
}// 先序遍歷給定節點為根節點的子樹(遞歸)
void btree_traversal_node(btree *T, btree_node *cur){// 打印當前節點信息btree_node_print(cur);// 依次打印所有子節點信息if(cur->leaf == 0){int i = 0;for(i=0; i<cur->num+1; i++){btree_traversal_node(T, cur->children[i]);}}
}// btree遍歷
void btree_traversal(btree *T){if(T->root_node != NULL){btree_traversal_node(T, T->root_node);}else{// printf("btree_traversal(): There is no key in B-tree!\n");}
}// 查找key
btree_node* btree_search_key(btree *T, int key){if(key > 0){btree_node *cur = T->root_node;// 先尋找是否為非葉子節點while(cur->leaf == 0){if(key < cur->keys[0]){cur = cur->children[0];}else if(key > cur->keys[cur->num-1]){cur = cur->children[cur->num];}else{for(int i=0; i<cur->num; i++){if(cur->keys[i] == key){return cur;}else if((i<cur->num-1) && (key > cur->keys[i]) && (key < cur->keys[i+1])){cur = cur->children[i+1];}}}}// 在尋找是否為葉子節點if(cur->leaf == 1){for(int i=0; i<cur->num; i++){if(cur->keys[i] == key){return cur;}}}}// 都沒找到返回NULLreturn NULL;
}// 獲取B樹的高度
int btree_depth(btree *T){int depth = 0;btree_node *cur = T->root_node;while(cur != NULL){depth++;cur = cur->children[0];}return depth;
}// 檢查給定節點的有效性
// 鍵值:根節點至少有一個key,其余節點至少有ceil(m/2)-1個key
// 分支:所有節點數目子樹為當前節點元素數量+1
bool btree_node_check_effective(btree *T, btree_node *cur){bool eff_flag = true;// 統計鍵值和子節點數量int num_kvs = 0, num_child = 0;int i = 0;while(cur->keys[i] != 0){// 判斷元素是否遞增if(i>=1 && (cur->keys[i] <= cur->keys[i-1])){printf("ERROR! the following node DOT sorted!\n");btree_node_print(cur);eff_flag = false;break;}// 統計數量num_kvs++;i++;}i = 0;while(cur->children[i] != NULL){// 子節點和當前節點的有序性if(i<num_kvs){if(cur->keys[i] <= cur->children[i]->keys[cur->children[i]->num]){printf("ERROR! the follwing node's child[%d] has bigger key=%d than %d\n", i, cur->children[i]->keys[cur->children[i]->num], cur->keys[i]);printf("follwing node--");btree_node_print(cur);printf(" error child--");btree_node_print(cur->children[i]);eff_flag = false;}else if(cur->keys[i] >= cur->children[i+1]->keys[0]){printf("ERROR! the follwing node's child[%d] has smaller key=%d than %d\n", i+1, cur->children[i+1]->keys[0], cur->keys[i]);printf("follwing node--");btree_node_print(cur);printf(" error child--");btree_node_print(cur->children[i+1]);eff_flag = false;}}// 統計數量num_child++;i++;}// 判斷元素數量是否合理if(cur->num >= T->m){printf("ERROR! the follwing node has too much keys:%d(at most %d)\n", cur->num, T->m-1);btree_node_print(cur);eff_flag = false;}if((cur != T->root_node) && (num_kvs<((T->m>>1)-1))){printf("ERROR! the follwing node has too few keys:%d(at least %d)\n", num_kvs, (T->m>>1)-1);btree_node_print(cur);eff_flag = false;}if(num_kvs != cur->num){printf("ERROR! the follwing node has %d keys but num=%d\n", num_kvs, cur->num);btree_node_print(cur);eff_flag = false;}if((cur->leaf == 0) && (num_child != cur->num+1)){printf("ERROR! the follwing node has %d keys but %d child(except keys+1=child)\n", num_kvs, num_child);btree_node_print(cur);eff_flag = false;}return eff_flag;
}// 遍歷所有路徑檢查m階B樹的有效性
// 平衡性:所有葉節點都在同一層(所有路徑高度相等)
// 有序性:所有元素升序排序
// 鍵值:根節點至少有一個key,其余節點至少有ceil(m/2)-1個key
// 分支:所有節點數目子樹為當前節點元素數量+1
bool btree_check_effective(btree *T){bool effe_flag = true;int depth = btree_depth(T);if(depth == 0){// printf("btree_check_effective(): There is no key in B-tree!\n");}else if(depth == 1){// 只有一個根節點effe_flag = btree_node_check_effective(T, T->root_node);}else{// 最大的可能路徑數量int max_path = 1;int depth_ = depth-1;while(depth_ != 0){max_path *= T->m;depth_--;}// 遍歷所有路徑(每個路徑對應一個葉子節點)btree_node *cur = T->root_node;int i_path = 0;for(i_path=0; i_path<max_path; i_path++){int dir = i_path; // 本次路徑的方向控制int i_height = 0; // 本次路徑的高度int i_effe = 1; // 指示是否存在本路徑cur = T->root_node;while(cur != NULL){// 當前節點的有效性effe_flag = btree_node_check_effective(T, cur);if(!effe_flag) break;// 更新高度i_height++;// 更新下一節點if(cur->children[dir%T->m]==NULL && !cur->leaf){i_effe = 0;break;}cur = cur->children[dir%T->m];dir /= T->m;}// if(btree_node_check_effective(T, cur))// 判斷本路徑節點數(高度)if(i_height != depth && i_effe){printf("ERROR! not all leaves in the same layer! the leftest path's height=%d, while the %dst path's height=%d.\n",depth, i_path, i_height);effe_flag = false;}if(!effe_flag) break;}}return effe_flag;
}/*-----------------------------下面為測試代碼-------------------------------*/
#if BTREE_DEBUG
#include<time.h> // 使用隨機數
#include<sys/time.h> // 計算qps中獲取時間
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define ENABLE_QPS 1 // 是否開啟qps性能測試
#define continue_test_len 10000000 // 連續測試的長度
// 冒泡排序
void bubble_sort(int arr[], int len) {int i, j, temp;for (i = 0; i < len - 1; i++)for (j = 0; j < len - 1 - i; j++)if (arr[j] > arr[j + 1]) {temp = arr[j];arr[j] = arr[j + 1];arr[j + 1] = temp;}
}
// 打印當前數組
void print_int_array(int* KeyArray, int len_array){printf("測試數組為KeyArray[%d] = {", len_array);for(int i=0; i<len_array; i++){if(i == len_array-1){printf("%d", KeyArray[i]);}else{printf("%d, ", KeyArray[i]);}}printf("}\n");
}
int main(){// 定義/* --------------------定義數組-------------------- */// 預定義的數組// int KeyArray[20] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20}; // 正著插// int KeyArray[20] = {20,19,18,17,16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1}; // 倒著插// int KeyArray[20] = {1,2,3,4,5,10,9,8,7,6,11,12,13,14,15,20,19,18,17,16}; // 亂序插// int KeyArray[31] = {11,12,13,14,15,16,17,18,19,20,1,2,3,4,5,6,7,8,9,10,21,22,23,24,25,26,27,28,29,30,31}; // 亂序插// int KeyArray[18] = {18,8,13,9,13,0,7,13,14,7,1,7,19,7,9,18,17,18}; // 亂序插// // 順序增長的數組// int len_array = 26;// int KeyArray[len_array];// int i_array = 0;// for(i_array=0; i_array<len_array; i_array++){// KeyArray[i_array] = i_array + 1;// }// // 隨機生成固定大小的隨機數組// int len_array = 18;// int KeyArray[len_array];// srand(time(NULL));// int i_array = 0;// for(i_array=0; i_array<len_array; i_array++){// // KeyArray[i_array] = rand() % 9999999999;// KeyArray[i_array] = rand() % 20;// }// printf("RAND_MAX: %d\n", RAND_MAX);/* ------------------以下測試代碼------------------ */// printf("-------------------B樹插入測試------------------\n");// // 先給輸入的數組排個序// int len_max = sizeof(KeyArray)/sizeof(int);// printf("測試數組長度: %d\n", len_max);// // 將原先的數組深拷貝并升序排序// int *KeyArray_sort = (int*)malloc(sizeof(KeyArray));// int i = 0;// for(i = 0; i < len_max; i++){// KeyArray_sort[i] = KeyArray[i];// }// bubble_sort(KeyArray_sort, len_max);int max_test = 100; // 測試的總次數int len_array = 1000; // 單次測試的數組長度bool detail_flag = false; // 是否打印詳細信息bool pass_flag = true;printf("---------------------常規測試---------------------\n");for(int i_test=0; i_test<max_test; i_test++){// 隨機生成固定大小的隨機數組int KeyArray[len_array];srand(time(NULL));for(int i_array=0; i_array<len_array; i_array++){// KeyArray[i_array] = rand() % 9999999999;KeyArray[i_array] = rand() % len_array;}// int KeyArray[10000] = {};// 申請紅黑樹內存btree *T = btree_init(6);btree *T_old = btree_init(6);if(detail_flag){printf("--------------------開始測試--------------------\n");print_int_array(KeyArray, len_array);}if(detail_flag){printf("-------------------B樹插入測試------------------\n");}/*-------------------B樹插入測試------------------*/// 依次插入數據,并檢查有效性for(int i=0; i<len_array; i++){if(i>0){btree_insert_key(T_old, KeyArray[i-1]);}btree_insert_key(T, KeyArray[i]);// 方便打印調試// if(i==65){// printf("Before insert the %2d's key=%d:\n", i+1, KeyArray[i]);// btree_traversal(T_old);// printf("After insert the %2d's key=%d:\n", i+1, KeyArray[i]);// btree_traversal(T);// }if(btree_check_effective(T)==false){printf("after insert KeyArray[%d]=%d error!\n", i, KeyArray[i]);pass_flag = false;break;}}if(pass_flag){if(detail_flag) printf("PASS---->插入測試%d/%d\n", i_test+1, max_test);btree_insert_key(T_old, KeyArray[len_array]);}else{if(detail_flag) printf("FAIL---->插入測試%d/%d\n", i_test+1, max_test);// printf("Before insert:\n");// btree_traversal(T_old);// printf("After insert:\n");// btree_traversal(T);btree_destroy(T_old);btree_destroy(T);break;}if(detail_flag){btree_traversal(T);printf("\n");}if(detail_flag){printf("-------------------B樹查找測試------------------\n");}/*-------------------B樹查找測試------------------*/btree_node* sear = NULL;for(int i=0; i<len_array; i++){if(KeyArray[i] > 0){sear = btree_search_key(T, KeyArray[i]);pass_flag = false;if(sear != NULL){for(int j=0; j<sear->num; j++){if(sear->keys[j] == KeyArray[i]){pass_flag = true;break;}}}if(detail_flag){printf("search KeyArray[%d]=%d ----> ", i, KeyArray[i]);btree_node_print(sear);}}if(pass_flag == false){print_int_array(KeyArray, len_array); // 打印當前數組printf("following node DOT has KeyArray[%d]=%d!\n", i, KeyArray[i]);btree_node_print(sear);pass_flag = false;break;}}if(pass_flag){if(detail_flag) printf("PASS---->查找測試%d/%d\n", i_test+1, max_test);}else{printf("FAIL---->查找測試%d/%d\n", i_test+1, max_test);break;}if(detail_flag){printf("-------------------B樹刪除測試------------------\n");}/*-------------------B樹刪除測試------------------*/for(int i=0; i<len_array; i++){// if(i==496){// // 加一句打印方便調試暫停// printf("Now delete KeyArray[%d]=%d:\n", i, KeyArray[i]);// }if(i>0){btree_delete_key(T_old, KeyArray[i-1]);}btree_delete_key(T, KeyArray[i]);if(detail_flag){printf("delete KeyArray[%d]=%d:\n", i, KeyArray[i]);btree_traversal(T);} if(btree_check_effective(T) == false){print_int_array(KeyArray, len_array); // 打印當前數組printf("after delete KeyArray[%d]=%d error!\n", i, KeyArray[i]);pass_flag = false;break;}}if(pass_flag){if(detail_flag) printf("PASS---->刪除測試%d/%d\n", i_test+1, max_test);}else{printf("FAIL---->刪除測試%d/%d\n", i_test+1, max_test);// printf("Before delete:\n");// btree_traversal(T_old);// printf("After delete:\n");// btree_traversal(T);btree_destroy(T_old);btree_destroy(T);break;}if(detail_flag){printf("--------------------------------------------------\n");}btree_destroy(T_old);btree_destroy(T);// 整點進度條看看if(pass_flag){// printf("PASS----> WHOLE TEST %d/%d!\r", i_test+1, max_test);int bar_process; // 編譯器初始化為0bool already_print_txt; // 編譯器初始化為falsebool already_print_bar; // 編譯器初始化為falseconst int len_bar = 20; // 完整進度條的長度// 打印進度條前面的說明if(!already_print_txt){printf("PASS TEST PROCESS: ");fflush(stdout);}already_print_txt = true;// 打印進度條if(len_bar*(i_test+1)/max_test > bar_process){// 光標往前回退if(already_print_bar){printf("\033[4D"); // ANSI轉義序列}// 畫出進度條for(int i=0; i<(len_bar*(i_test+1)/max_test - bar_process); i++){printf("█");fflush(stdout);}// 顯示進度范圍printf(" %d%%", 100*(i_test+1)/max_test);fflush(stdout);already_print_bar = true;bar_process = len_bar*(i_test+1)/max_test;if(i_test+1 == max_test) printf("\n");}}}// 只是為了最后一行判斷用if(pass_flag){// printf("\r\033[K"); // 清除本行printf("PASS----> ALL %d TEST!\n", max_test);}printf("--------------------------------------------------\n");printf("---------------------性能測試---------------------\n");btree* bT = btree_init(16); // 初始化16階B樹// 定義時間結構體struct timeval tv_begin;struct timeval tv_end;// 插入性能測試gettimeofday(&tv_begin, NULL);for(int i=0; i<continue_test_len; i++){btree_insert_key(bT, i+1);}gettimeofday(&tv_end, NULL);int time_ms = TIME_SUB_MS(tv_end, tv_begin);float qps = (float)continue_test_len / (float)time_ms * 1000;printf("total INSERTs:%d time_used:%d(ms) qps:%.2f(INSERTs/sec)\n", continue_test_len, time_ms, qps);// 查找性能測試gettimeofday(&tv_begin, NULL);for(int i=0; i<continue_test_len; i++){btree_node* node = btree_search_key(bT, i+1);int idx = 0;for(idx=0; idx<node->num; idx++){if(node->keys[idx] == i+1){break;}}if(idx == node->num){printf("continue_search error!\n");return 0;}}gettimeofday(&tv_end, NULL);time_ms = TIME_SUB_MS(tv_end, tv_begin);qps = (float)continue_test_len / (float)time_ms * 1000;printf("total SEARCHs:%d time_used:%d(ms) qps:%.2f(SEARCHs/sec)\n", continue_test_len, time_ms, qps);// // 刪除性能測試// gettimeofday(&tv_begin, NULL);// for(int i=0; i<continue_test_len; i++){// btree_delete_key(bT, i+1);// }// gettimeofday(&tv_end, NULL);// time_ms = TIME_SUB_MS(tv_end, tv_begin);// qps = (float)continue_test_len / (float)time_ms * 1000;// printf("total DELETEs:%d time_used:%d(ms) qps:%.2f(DELETEs/sec)\n", continue_test_len, time_ms, qps);// 銷毀B樹btree_destroy(bT);printf("--------------------------------------------------\n");return 0;
}#endif
測試代碼輸出結果----記得測一下連續讀、查、刪的速度
lyl@ubuntu:~/Desktop/kv-store/code_init$ gcc -o main btree_int.c
lyl@ubuntu:~/Desktop/kv-store/code_init$ ./main
---------------------常規測試---------------------
PASS TEST PROCESS: ███████████████████ 100%
PASS----> ALL 100 TEST!
--------------------------------------------------
---------------------性能測試---------------------
total INSERTs:10000000 time_used:2379(ms) qps:4203447.00(INSERTs/sec)
total SEARCHs:10000000 time_used:1007(ms) qps:9930486.00(SEARCHs/sec)
total DELETEs:10000000 time_used:18(ms) qps:555555584.00(DELETEs/sec)
--------------------------------------------------
未解決bug:測試代碼的最后如果多加上一個“刪除性能測試”,就不顯示進度條了?很奇怪
參考內容:
- B站:【CS61B漢化】七海講數據結構-平衡樹&B樹–B樹的插入–超可愛的視頻
- 知乎:從B樹中刪除一個關鍵字–主要參考了孩子元素多的時候怎么刪除
- B站:可視化數據結構-B+樹–插入數據的動畫很清晰
2.5 hash的實現

??終于度過了本項目所有最難的部分,下面的內容都比較簡單。鏈式哈希的增刪查操作簡潔明了。鏈式哈希首先會聲明一個固定長度的哈希表(如1024),若需要插入新元素時,首先計算哈希值作為索引,若有沖突則直接在當前位置使用“頭插法”即可。注意以下幾點:
- 哈希值計算:對應
int
型,key % table_size
就可以直接當作哈希值。對于char*
,則可以sum(key) % table_size
當作哈希值。當前也有專門的哈希函數,但是由于需要頻繁計算哈希值,在簡單情況下,就采用上述處理即可。- 單個索引上的鏈表沒有大小關系。所以查找/刪除時需要遍歷這個索引對應的整條鏈表。
就不單獨寫int
型的代碼并測試了,可以直接參考項目源碼中的“hash.h”、“hash.c”。
2.6 dhash的實現

??顯然上述hash有個很大問題,就是“哈希表的大小”是固定的。如果聲明哈希表大小為1024,卻要插入10w個元素,那每個所有都會對應一個很長的鏈表,最壞的情況下和直接遍歷一遍沒什么區別!這顯然失去了哈希的意義,于是在上面的基礎上,我們使用“空間換時間”,自動增加/縮減哈希表的大小,也就是“動態哈希表”dhash:
- 插入元素時,首先判斷是否需要擴容。若當前元素超過哈希表的1/2(自定義),則將哈希表翻倍(自定義),并將原來的元素重新映射到新的哈希表。若遇到沖突,則將新元素順延插入到下一個空節點。
- 刪除元素時,首先判斷是否需要縮容。若當前元素小于哈希表的1/4(自定義),則將哈希表縮小一半(自定義),并將原來的元素重新映射到新的哈希表。若當前節點不是待刪除元素,則需要從當前索引開始遍歷完所有節點,才能說哈希表不存在此元素。
同樣,不單獨寫int
型的代碼測試了,可以直接參考項目源碼中的“dhash.h”、“dhash.c”。
2.7 skiplist的實現

??跳表本質上是一個有序鏈表。紅黑樹每次比較都能排除一半的節點,這啟發我們,要是每次都能找到鏈表最中間的節點,不就可以實現 O ( log ? N ) O(\log N) O(logN)的查找時間復雜度了嘛。于是如上圖所示,我們不妨規定跳表的每個節點都有一組指針,跳表還有一個額外的空節點作為“跳表頭”,那么每次都從頂層依次底層進行“跳”,就可以實現“每次比較都能排除剩下一半的節點”。但是還有個大問題,那就是上述理想跳表需要插入/刪除一個元素時,元素的調整會非常麻煩,甚至還需要遍歷整個鏈表來調整所有節點的指向!
所以在實際應用中,不會直接使用上述理想跳表的結構。而是在每次插入一個新元素時,按照一定概率計算其高度。統計學證明,當存放元素足夠多的時候,該實際跳表性能無限趨近于理想跳表。

同樣,代碼直接見項目源碼中的“skiplist.h”、“skiplist.c”。
2.8 kv存儲協議的實現
??如“1.2節-項目預期及基本架構”給出的“服務端程序架構”。現在我們實現了網絡收發功能(網絡層)、所有存儲引擎的增刪查改操作(引擎層),還差最后一個“kv存儲協議”(協議層)就可以實現完整的服務端程序。“kv存儲協議”的主要功能有:
- 初始化/銷毀所有的存儲引擎。這個直接調用各引擎的初始化/銷毀函數即可。
- 拆解網絡層接收的數據,若為有效指令則傳遞給相應的存儲引擎函數處理,并根據存儲引擎的處理結果返回相應的信息給網絡層。
值得注意的是,“引擎層”的接口函數應該統一封裝命名,并在各存儲引擎中實現,“引擎層”的頭文件中也只有這些接口函數暴露給“協議層”。這樣保證了“協議層”和“引擎層”的隔離性,即使后續“引擎層”代碼需要進行修改,也不會干擾到接口函數的調用、無需修改協議層。整個“服務端”的“網絡層”、“協議層”、“引擎層”的函數調用關系如下:

同樣,代碼直接見項目源碼中的“kvstore.h”、“kvstore.c”。
3. 性能測試

??上述我們將“服務端”的代碼實現完畢,并且可以使用“網絡連接助手”進行正常的收發數據。如上圖所示,依次發送5條指令后都得到預期的回復。但是我們要想測試客戶端的極限性能,顯然需要寫一個“客戶端”測試程序。該測試程序目標如下:
- 測試單個鍵值對(name:humu)能否正常實現所有功能。
- 測試多個鍵值對能否正常實現所有功能。這一步主要是為了驗證各引擎能正常擴容/縮容。
- 測試連續進行10w次插入、查找、刪除的總耗時,計算出服務器的相應速度qps。
注:測試代碼直接見項目源碼中的“tb_kvstore.c”。
如下圖所示,開啟兩個Ubuntu虛擬機,分別運行“服務端”、“客戶端”程序,得到如下的測試數據:

次數 | 操作 | qps(trans/s) | |||||
---|---|---|---|---|---|---|---|
array | rbtree | btree | hash | dhash | skiplist | ||
1 | 插入 | 1461.33 | 1918.24 | 1981.65 | 1727.24 | 1924.93 | 1778.44 |
查找 | 1604.18 | 1906.07 | 1895.09 | 1741.52 | 1887.68 | 1755.71 | |
刪除 | 1722.71 | 1959.13 | 1909.60 | 1713.88 | 1964.98 | 1887.65 | |
2 | 插入 | 1570.13 | 1972.93 | 1949.62 | 1710.54 | 1926.63 | 1780.18 |
查找 | 1707.30 | 1970.40 | 1883.49 | 1709.46 | 2021.55 | 1747.89 | |
刪除 | 1739.92 | 1918.80 | 1922.82 | 1730.28 | 1917.32 | 1912.74 | |
3 | 插入 | 1103.35 | 1898.25 | 1923.71 | 1737.59 | 1948.86 | 1794.24 |
查找 | 1316.59 | 1867.80 | 1933.38 | 1733.31 | 1953.74 | 1768.44 | |
刪除 | 1774.78 | 1873.89 | 1978.98 | 1771.73 | 1966.76 | 1933.19 | |
4 | 插入 | 1134.22 | 1887.79 | 1888.72 | 1734.21 | 1909.53 | 1802.00 |
查找 | 1394.54 | 1908.80 | 1910.40 | 1726.43 | 1932.37 | 1764.76 | |
刪除 | 1778.54 | 1897.97 | 1937.98 | 1719.25 | 1935.40 | 1928.57 | |
5 | 插入 | 1029.79 | 1981.02 | 1912.16 | 1718.33 | 1916.52 | 1777.62 |
查找 | 1236.52 | 1917.58 | 1947.00 | 1732.17 | 1934.39 | 1754.66 | |
刪除 | 1703.87 | 1955.19 | 1899.59 | 1724.97 | 1915.42 | 1905.71 |
結果分析:
- array引擎的指標都是最差的,這是因為每次插入/刪除都需要遍歷所有元素。
- hash引擎(鏈式哈希)的性能是倒數第二差的,這是因為雖然哈希函數可以快速定位到索引,但當鍵值對遠大于哈希表大小時,沖突元素會形成一個鏈表,增/刪/查操作也需要遍歷整條鏈表。
- btree的插入性能好于rbtree,只是因為btree的單個節點可以容納多個元素,節省了很多增加新結點、調整結構的時間;但是btree的查找/刪除性能不如rbtree,也同樣是因為單個節點還需要進行遍歷。
- 綜合來看,最優秀的存儲引擎是rbtree。
注:上述測試結果可以通過“相對值”比較各數據結構間的差異;“絕對值”則受限于虛擬機內存、運行頻率等物理特性,并且軟件上使用“協程”等也可以大幅提升“絕對值”。
4. 項目總結及改進思路
??C/C++適合做服務器,但不適合做業務。因為可能會因為一行代碼有問題,導致整個進程退出。雖然也能做,但維護成本高,并且對工程師要求高。比如“騰訊課堂”中課程、圖片、價格等參數很適合用C語言做“kv存儲”,但是顯示網頁等業務功能使用Jc語言更加合適。所以VUE框架(Java)等適合做前端業務;C/C++適合做基礎架構、高性能組件、中間件。比如在量化交易中,底層的高頻組件、低延遲組件適合用C/C++,上層的交易業務、交易策略沒必要C/C++。
下面是對本項目的一些 改進思路:
- 持久化和日志:將數據都存儲到(磁盤)文件中,并將接收到的指令記錄下來成為日志。持久化的思路有很多種,比如直接分配一個大的“內存池”(見下一條)存放所有引擎的存儲數據,然后直接對這個大的內存池進行持久化。
- 內存池:現在存儲的數據都是零散的,比如rbtree每次插入新元素都會申請一個節點內存,時間長了就會出現很多內存碎片,不利于內存管理。理想狀態是和數組結構體一樣,直接預先聲明一個內存池,需要新節點的時候就取出一個節點內存。這樣,每次申請和釋放的內存都是一整塊,不會出現大量的內存碎片。
- 分布式鎖:kv存儲內部是單線程的,對于變量的改變無需額外加鎖。但是外部的網絡收發指令,需要引入分布式鎖,防止多臺客戶端同時對同一個鍵值對進行修改。
- 主從模式:所有數據都存儲在一臺服務器上,會有丟失數據的風險,所以需要使用“主從模式”來對數據進行備份。關鍵點在于合適的備份策略。
- 分布式存儲:所有數據都存儲在一臺服務器上,會導致內存壓力過大,多臺服務器結合起來做成“分布式存儲”,所有服務器分攤數據。
編程感想:
- 字符串拷貝:C語言中,使用
strncpy
、snprintf
拷貝字符串時,注意目的字符串不能只是聲明為char*
,而是需要malloc
/calloc
分配內存才可以。另外也不要忘了釋放內存free
。- 天坑:解析指令時層層傳遞的是epoll的讀緩沖
rbuffer
,然后使用strtok
/strtok_r
進行分割指令并存儲在char* tokens[]
中,注意這個char* tokens[]
的元素指向的就是讀緩沖本身!!!而snprintf
是逐字符進行拷貝的,也就是說,此時使用snprintf
將tokens
的內容再寫回讀緩沖就會導致讀緩沖錯亂。如果不額外分配內存很難解決該問題。所以建議不要使用snprintf
拷貝自己的格式化字符串。- 良好的內存管理習慣:
free()
之前先判斷是否為NULL
,free()
之后一定要指向NULL
。- 層層傳遞初始化:
【可行方法1】如果最頂層需要創建實例對象(不如全局變量),那就需要傳地址給最底層,且最底層無需再重新為這個對象
malloc
空間(因為最頂層已經創建對象了),只需要malloc
好這個實例對象的所有指針即可,或者先定義成NULL
后期插入時再分配。
【可行方法2】若最頂層無需創建全局的實例對象,那么也可以不傳參數給最底層,最底層直接創建一個對象指針,并malloc
/NULL
這個對象指針的所有參數,最后直接返回這個對象指針就行。
【不可行方法】頂層創建了全局的實例對象,然后傳地址給最底層,最底層重新malloc
一個新的對象指針,初始化這個對象指針的所有參數,最后讓傳遞下來的地址指向這個指針。最后在頂層就會發現所有參數都沒初始化,都是空!
【關鍵點】:對誰進行malloc
非常重要,一定要對頂層傳下來的結構體指針的元素直接malloc,而不是malloc一個新的結構體,在賦值給這個結構體指針。
- 關于
strcmp()
:在使用strcmp()
時一定要先判斷不為空,才能使用。這是因為strcmp()
的底層使用while(*des++==*src**)
,所以若比較的雙方有一方為空,就會直接報錯。- 注意項目的include關系,是在編譯指令中指定的。當然也可以將“kv_store.c”中的case語句封裝到各自的數據結構中,然后以動態庫的方式進行編譯。