基于C語言實現內存型數據庫(kv存儲)

基于C語言實現內存型數據庫(kv存儲)

文章目錄

  • 基于C語言實現內存型數據庫(kv存儲)
  • 1. 項目背景
    • 1.1 Redis介紹
    • 1.2 項目預期及基本架構
  • 2. 服務端原理及代碼框架
    • 2.1 網絡數據回環的實現
    • 2.2 array的實現
    • 2.3 rbtree的實現
    • 2.4 btree的實現
    • 2.5 hash的實現
    • 2.6 dhash的實現
    • 2.7 skiplist的實現
    • 2.8 kv存儲協議的實現
  • 3. 性能測試
  • 4. 項目總結及改進思路

  • 源代碼倉庫見Github:kv-store倉庫
  • 參考視頻:“零聲教育”的“linux基礎架構-Kv存儲”。
  • 其他源碼:協程。

1. 項目背景

1.1 Redis介紹

??本項目主要想仿照Redis的交互方式,實現一個基本的“內存型數據庫”,所以首先來介紹一下Redis。隨著互聯網的普及,只要是上網的APP基本上都需要和相應的服務器請求數據,通常來說,這些數據被服務器保存在“磁盤”上的文件中,稱之為“磁盤型數據庫”。但是面對海量用戶時(比如秒殺活動),磁盤IO的讀寫速率不夠快從而導致用戶體驗下降,并且服務器數據庫的壓力也非常大。鑒于很多請求只是讀取數據,這就啟發我們將一些熱點數據存放在內存中,以便快速響應請求、并且減輕磁盤的讀寫壓力。

當然,上述只是一個初步的想法,后續如何清理內存數據、分布式存儲等可以參考B站的科普視頻,講的非常簡潔易懂:

  • 【趣話Redis第一彈】我是Redis,MySQL大哥被我坑慘了!—“緩存穿透、緩存擊穿、緩存雪崩”、“定時刪除、惰性刪除、內存淘汰”
  • 【趣話Redis第二彈】Redis數據持久化AOF和RDB原理一次搞懂!—“RDB+AOF”
  • 【趣話Redis第三彈】Redis的高可用是怎么實現的?哨兵是什么原理?—“主觀下線、客觀下線”、“哨兵選舉”、“故障轉移”
  • 趣話Redis:Redis集群是如何工作的?—“哈希桶”、“集群工作+主從復制”

下面是一些典型的面試題:

  • 為什么要使用Redis?
  1. 從高并發上來說:直接操作緩存能夠承受的請求是遠遠大于直接訪問數據庫的,所以我們可以考慮把數據庫中的部分數據轉移到緩存中去。這樣用戶的一部分請求會直接到緩存,而不用經過數據庫。
  2. 從高性能上來說:用戶第一次訪問數據庫中的某些數據,因為是從硬盤上讀取的,所以這個過程會比較慢。將該用戶訪問的數據存在緩存中,下一次再訪問這些數據的時候就可以直接從緩存中獲取了。操作緩存就是直接操作內存,所以速度相當快。
  • 為什么要使用Redis而不是其他的,例如Java自帶的map或者guava?

??緩存分為本地緩存和分布式緩存,像map或者guava就是本地緩存。本地緩存最主要的特點是輕量以及快速,生命周期隨著jvm的銷毀而結束。在多實例的情況下,每個實例都需要各自保存一份緩存,緩存不具有一致性。redis或memcached之類的稱為分布式緩存,在多實例的情況下,各實例共用一份緩存數據,緩存具有一致性。

  • Redis應用場景有哪些?
  1. 緩存熱點數據,緩解數據庫的壓力。
  2. 利用Redis原子性的自增操作,可以實現計數器的功能。比如統計用戶點贊數、用戶訪問數等。
  3. 分布式鎖。在分布式場景下,無法使用單機環境下的鎖來對多個節點上的進程進行同步。可以使用Redis自帶的SETNX命令實現分布式鎖,除此之外,還可以使用官方提供的RedLock分布式鎖實現。
  4. 簡單的消息隊列。可以使用Redis自身的發布/訂閱模式或者List來實現簡單的消息隊列,實現異步操作。
  5. 限速器。可用于限制某個用戶訪問某個接口的頻率,比如秒殺場景用于防止用戶快速點擊帶來不必要的壓力。
  6. 好友關系。利用集合的一些命令,比如交集、并集、差集等,實現共同好友、共同愛好之類的功能。
  • 為什么Redis這么快?
  1. Redis是基于內存進行數據操作的Redis使用內存存儲,沒有磁盤IO上的開銷,數據存在內存中,讀寫速度快。
  2. 采用IO多路復用技術。Redis使用單線程來輪詢描述符,將數據庫的操作都轉換成了事件,不在網絡I/O上浪費過多的時間。
  3. 高效的數據結構。Redis每種數據類型底層都做了優化,目的就是為了追求更快的速度。
  • 參考視頻:為什么要使用Redis?、Redis的應用場景有哪些?、Redis,好快!

1.2 項目預期及基本架構

2:OK
1:SET name humu
服務端
tcp server
kv_protocol
store-engine
array/rbtree/btree/hash/dhash/skiplist
客戶端
圖1 項目框架

??于是我們現在就來實現這個“內存型數據庫”,本項目使用C語言,默認鍵值對key-value都是char*類型。如上圖所示,我們希望“客戶端”可以和“服務端”通訊,發送相應的指令并得到相應的信息。比如“客戶端”插入一個新的鍵值對“(name: humu)”,那么就發送“SET name humu”;“服務端”接收到這個數據包后,執行相應的操作,再返回“OK”給“客戶端”。鑒于kv存儲需要強查找的數據結構,我們可以使用rbtree、btree、b+tree、hash、dhash、array(數據量不多,比如http頭)、skiplist、list(性能低不考慮)。最終,下表列出了我們要實現的所有數據結構及其對應的指令:

表1 kv存儲協議對應的數據結構及命令
操作/
數據結構
插入查找刪除計數存在
arraySET key valueGET keyDELETE keyCOUNTEXIST key
rbtreeRSET key valueRGET keyRDELETE keyRCOUNTREXIST key
btreeBSET key valueBGET keyBDELETE keyBCOUNTBEXIST key
hashHSET key valueHGET keyHDELETE keyHCOUNTHEXIST key
dhashDHSET key valueDHGET keyDHDELETE keyDHCOUNTDHEXIST key
skiplistZSET key valueZGET keyZDELETE keyZCOUNTZEXIST key
備注返回OK/Fail,
表示插入鍵值對是否成功
返回對應的value返回OK/Fail,
刪除對應的鍵值對
返回當前數據結構中
存儲的鍵值對數量
返回True/False,
判斷是否存在對應的鍵值對

??進一步,由于網絡編程中的“Hello,World!程序”就是實現一個echo,收到什么數據就原封不動的發送回去。所以我們希望在此基礎上,將kv存儲寫成一個獨立的進程,和網絡收發相關代碼隔離開,進而提升代碼的可維護性。另外在網絡協議的選擇中,由于我們的鍵值對設置通常較短只有十幾個字符(比如set key value),而http協議的協議頭就有幾十個字符,有效數據占比太低;udp協議只能在底層網卡確認對方收到,但沒法在應用層確認,所以不可控;于是我們網絡通信協議選擇tcp。于是對于“服務端”,我們就可以有如下的架構設計:

圖2 “服務端”程序架構
  1. 網絡層:負責收發數據。本項目中都是“字符串”。
  2. 協議層:將“網絡層”傳輸過來的字符串進行拆解,若為無效指令直接返回相應的提示信息;若為有效指令則傳遞給“引擎層”進行進一步的處理,根據“引擎層”的處理結果給出相應的返回信息。
  3. 引擎層:分為6種存儲引擎,每種存儲引擎都可以進行具體的增、刪、查等操作,也就是實現上表給出的5種命令。
  4. 存儲層:注意“內存型數據庫”的數據在內存中,但若后續需要“持久化”也會將數據備份到磁盤中。

2. 服務端原理及代碼框架

2.1 網絡數據回環的實現

??在使用原生的socket庫函數進行網絡通信時,會一直阻塞等待客戶端的連接/通信請求,這個線程就做不了其他的事情,非常浪費資源。于是“reactor模式”應運而生,也被稱為“基于事件驅動”,核心點在于:注冊事件、監聽事件、處理事件。也就是說,線程找了一個“秘書”專門負責去監聽網絡端口是否有“網絡通信”的發生,線程就可以去做其他的事情;等到線程想處理“網絡通信”的時候一起全部通知給該線程,然后這個“秘書”繼續監聽。顯然,有這樣一個“秘書”存在,可以將“網絡通信”、“業務處理”分隔開,一個線程可以同時處理多個客戶端的請求/通信,也就實現了“IO多路復用一個線程”。下面是常見的三種reactor模式:

  1. reactor單線程模型:只分配一個線程。顯然若線程的“業務處理”時間過長,會導致“秘書”積壓的事件過多,甚至可能會丟棄一些事件。本模型不適合計算密集型場景,只適合業務處理非常快的場景(本項目就是業務處理非常快)。
  2. reactor多線程模型:分配一個主線程和若干子線程。主線程只負責處理“網絡通信”,“業務處理”則交給子線程處理。本模式的好處是可以充分利用多核CPU性能,但是帶來了線程安全的問題。并且只有一個線程響應“網絡通信”,在瞬時高并發的場景下容易成為性能瓶頸。
  3. 主從reactor多線程模型:在上述多線程模型的基礎上,再額外開辟出新的子線程專門負責“與客戶端通信”,主線程則只負責“連接請求”。

參考B站視頻:【Java面試】介紹一下Reactor模式

下面使用epoll作為“秘書”,采用“reactor單線程模型”,完成網路數據回環(echo),也就是“服務端”程序框架的“網絡層”:

main.c–共356行

/** zv開頭的變量是zvnet異步網絡庫(epoll)。* kv開頭的變量是kv存儲協議解析。
*/
#include<stdio.h>
#include<stdlib.h>
#include<string.h>#include<errno.h>
#include<unistd.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<fcntl.h>
#include<sys/epoll.h>#include"kvstore.h"/*-------------------------------------------------------*/
/*-----------------------異步網路庫-----------------------*/
/*-------------------------------------------------------*/
/*-----------------------函數聲明-------------------------*/
#define max_buffer_len      1024    // 讀寫buffer長度
#define epoll_events_size   1024    // epoll就緒集合大小
#define connblock_size      1024    // 單個連接塊存儲的連接數量
#define listen_port_count   1       // 監聽端口總數// 有如下參數列表和返回之類型的函數都稱為 CALLBACK
// 回調函數,方便在特定epoll事件發生時執行相應的操作
typedef int (*ZV_CALLBACK)(int fd, int events, void *arg);
// 回調函數:建立連接
int accept_cb(int fd, int event, void* arg);
// 回調函數:接收數據
int recv_cb(int clientfd, int event, void* arg);
// 回調函數:發送數據
int send_cb(int clientfd, int event, void* arg);// 單個連接
typedef struct zv_connect_s{// 本連接的客戶端fdint fd;// 本連接的讀寫bufferchar rbuffer[max_buffer_len];size_t rcount;  // 讀buffer的實際大小char wbuffer[max_buffer_len];size_t wcount;  // 寫buffer的實際大小size_t next_len;  // 下一次讀數據長度(讀取多個包會用到)// 本連接的回調函數--accept()/recv()/send()ZV_CALLBACK cb;
}zv_connect;// 連接塊的頭
typedef struct zv_connblock_s{struct zv_connect_s *block;  // 指向的當前塊,注意大小為 connblock_sizestruct zv_connblock_s *next;  // 指向的下一個連接塊的頭
}zv_connblock;// 反應堆結構體
typedef struct zv_reactor_s{int epfd;   // epoll文件描述符// struct epoll_event events[epoll_events_size];  // 就緒事件集合struct zv_connblock_s *blockheader;  // 連接塊的第一個頭int blkcnt;  // 現有的連接塊的總數
}zv_reactor;// reactor初始化
int init_reactor(zv_reactor *reactor);
// reator銷毀
void destory_reactor(zv_reactor* reactor);
// 服務端初始化:將端口設置為listen狀態
int init_sever(int port);
// 將本地的listenfd添加進epoll
int set_listener(zv_reactor *reactor, int listenfd, ZV_CALLBACK cb);
// 創建一個新的連接塊(尾插法)
int zv_create_connblock(zv_reactor* reactor);
// 根據fd從連接塊中找到連接所在的位置
// 邏輯:整除找到所在的連接塊、取余找到在連接塊的位置
zv_connect* zv_connect_idx(zv_reactor* reactor, int fd);
// 運行kv存儲協議
int kv_run_while(int argc, char *argv[]);
/*-------------------------------------------------------*//*-----------------------函數定義-------------------------*/
// reactor初始化
int init_reactor(zv_reactor *reactor){if(reactor == NULL) return -1;// 初始化參數memset(reactor, 0, sizeof(zv_reactor));reactor->epfd = epoll_create(1);if(reactor->epfd <= 0){printf("init reactor->epfd error: %s\n", strerror(errno));return -1;}// 為鏈表集合分配內存reactor->blockheader = (zv_connblock*)calloc(1, sizeof(zv_connblock));if(reactor->blockheader == NULL) return -1;reactor->blockheader->next = NULL;// 為鏈表集合中的第一個塊分配內存reactor->blockheader->block = (zv_connect*)calloc(connblock_size, sizeof(zv_connect));if(reactor->blockheader->block == NULL) return -1;reactor->blkcnt = 1;return 0;
}// reator銷毀
void destory_reactor(zv_reactor* reactor){if(reactor){close(reactor->epfd);  // 關閉epollzv_connblock* curblk = reactor->blockheader;zv_connblock* nextblk = reactor->blockheader;do{curblk = nextblk;nextblk = curblk->next;if(curblk->block) free(curblk->block);if(curblk) free(curblk);}while(nextblk != NULL);}
}// 服務端初始化:將端口設置為listen狀態
int init_sever(int port){// 創建服務端int sockfd = socket(AF_INET, SOCK_STREAM, 0);  // io// fcntl(sockfd, F_SETFL, O_NONBLOCK);  // 非阻塞// 設置網絡地址和端口struct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(struct sockaddr_in));servaddr.sin_family = AF_INET;  // IPv4servaddr.sin_addr.s_addr = htonl(INADDR_ANY);  // 0.0.0.0,任何地址都可以連接本服務器servaddr.sin_port = htons(port);  // 端口// 將套接字綁定到一個具體的本地地址和端口if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))){printf("bind failed: %s", strerror(errno));return -1;}// 將端口設置為listen(并不會阻塞程序執行)listen(sockfd, 10);  // 等待連接隊列的最大長度為10printf("listen port: %d, sockfd: %d\n", port, sockfd);return sockfd;
}// 將本地的listenfd添加進epoll
int set_listener(zv_reactor *reactor, int listenfd, ZV_CALLBACK cb){if(!reactor || !reactor->blockheader) return -1;// 將服務端放進連接塊reactor->blockheader->block[listenfd].fd = listenfd;reactor->blockheader->block[listenfd].cb = cb;  // listenfd的回調函數應該是accept()// 將服務端添加進epoll事件struct epoll_event ev;ev.data.fd = listenfd;ev.events = EPOLLIN;epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, listenfd, &ev);return 0;
}// 創建一個新的連接塊(尾插法)
int zv_create_connblock(zv_reactor* reactor){if(!reactor) return -1;// 初始化新的連接塊zv_connblock* newblk = (zv_connblock*)calloc(1, sizeof(zv_connblock));if(newblk == NULL) return -1;newblk->block = (zv_connect*)calloc(connblock_size, sizeof(zv_connect));if(newblk->block == NULL) return -1;newblk->next = NULL;// 找到最后一個連接塊zv_connblock* endblk = reactor->blockheader;while(endblk->next != NULL){endblk = endblk->next;}// 添加上新的連接塊endblk->next = newblk;reactor->blkcnt++;return 0;
}// 根據fd從連接塊中找到連接所在的位置
// 邏輯:整除找到所在的連接塊、取余找到在連接塊的位置
zv_connect* zv_connect_idx(zv_reactor* reactor, int fd){if(!reactor) return NULL;// 計算fd應該在的連接塊int blkidx = fd / connblock_size;while(blkidx >= reactor->blkcnt){zv_create_connblock(reactor);// printf("create a new connblk!\n");}// 找到這個連接塊zv_connblock* blk = reactor->blockheader;int i = 0;while(++i < blkidx){blk = blk->next;}return &blk->block[fd % connblock_size];
}// 回調函數:建立連接
// fd:服務端監聽端口listenfd
// event:沒用到,但是回調函數的常用格式
// arg:應該是reactor*
int accept_cb(int fd, int event, void* arg){// 與客戶端建立連接struct sockaddr_in clientaddr;  // 連接到本服務器的客戶端信息socklen_t len_sockaddr = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len_sockaddr);if(clientfd < 0){printf("accept() error: %s\n", strerror(errno));return -1;}// 將連接添加進連接塊zv_reactor* reactor = (zv_reactor*)arg;zv_connect* conn = zv_connect_idx(reactor, clientfd);conn->fd = clientfd;conn->cb = recv_cb;conn->next_len = max_buffer_len;conn->rcount = 0;conn->wcount = 0;// 將客戶端添加進epoll事件struct epoll_event ev;ev.data.fd = clientfd;ev.events = EPOLLIN;  // 默認水平觸發(有數據就觸發)epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, clientfd, &ev);printf("connect success! sockfd:%d, clientfd:%d\n", fd, clientfd);
}// 回調函數:接收數據
int recv_cb(int clientfd, int event, void* arg){zv_reactor* reactor = (zv_reactor*)arg;zv_connect* conn = zv_connect_idx(reactor, clientfd);int recv_len = recv(clientfd, conn->rbuffer+conn->rcount, conn->next_len, 0);  // 由于當前fd可讀所以沒有阻塞if(recv_len < 0){printf("recv() error: %s\n", strerror(errno));close(clientfd);// return -1;exit(0);}else if(recv_len == 0){// 重置對應的連接塊conn->fd = -1;conn->rcount = 0;conn->wcount = 0;// 從epoll監聽事件中移除epoll_ctl(reactor->epfd, EPOLL_CTL_DEL, clientfd, NULL);// 關閉連接close(clientfd);printf("close clientfd:%d\n", clientfd);}else if(recv_len > 0){conn->rcount += recv_len;// conn->next_len = *(short*)conn->rbuffer;  // 從tcp協議頭中獲取數據長度,假設前兩位是長度// 處理接收到的字符串,并將需要發回的信息存儲在緩沖區中printf("recv clientfd:%d, len:%d, mess: %s\n", clientfd, recv_len, conn->rbuffer);// conn->rcount = kv_protocal(conn->rbuffer, max_buffer_len);// 將kv存儲的回復消息(rbuffer)拷貝給wbuffer// printf("msg:%s len:%ld\n", msg, strlen(msg));memset(conn->wbuffer, '\0', max_buffer_len);memcpy(conn->wbuffer, conn->rbuffer, conn->rcount);conn->wcount = conn->rcount;memset(conn->rbuffer, 0, max_buffer_len);conn->rcount = 0;// 將本連接更改為epoll寫事件conn->cb = send_cb;struct epoll_event ev;ev.data.fd = clientfd;ev.events = EPOLLOUT;epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, clientfd, &ev);}return 0;
}// 回調函數:發送數據
int send_cb(int clientfd, int event, void* arg){zv_reactor* reactor = (zv_reactor*)arg;zv_connect* conn = zv_connect_idx(reactor, clientfd);int send_len = send(clientfd, conn->wbuffer, conn->wcount, 0);if(send_len < 0){printf("send() error: %s\n", strerror(errno));close(clientfd);return -1;}memset(conn->wbuffer, 0, conn->next_len);conn->wcount -= send_len;// 發送完成后將本連接再次更改為讀事件conn->cb = recv_cb;struct epoll_event ev;ev.data.fd = clientfd;ev.events = EPOLLIN;epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, clientfd, &ev);return 0;
}// 運行kv存儲協議
int kv_run_while(int argc, char *argv[]){// 創建管理連接的反應堆// zv_reactor reactor;zv_reactor *reactor = (zv_reactor*)malloc(sizeof(zv_reactor));init_reactor(reactor);// 服務端初始化int start_port = atoi(argv[1]);for(int i=0; i<listen_port_count; i++){int sockfd = init_sever(start_port+i);set_listener(reactor, sockfd, accept_cb);  // 將sockfd添加進epoll}printf("init finish, listening connet...\n");// 開始監聽事件struct epoll_event events[epoll_events_size] = {0};  // 就緒事件集合while(1){// 等待事件發生int nready = epoll_wait(reactor->epfd, events, epoll_events_size, -1);  // -1等待/0不等待/正整數則為等待時長if(nready == -1){printf("epoll_wait error: %s\n", strerror(errno));break;}else if(nready == 0){continue;}else if(nready > 0){// printf("process %d epoll events...\n", nready);// 處理所有的就緒事件int i = 0;for(i=0; i<nready; i++){int connfd = events[i].data.fd;zv_connect* conn = zv_connect_idx(reactor, connfd);// 回調函數和下面的的邏輯實現了數據回環if(EPOLLIN & events[i].events){conn->cb(connfd, events[i].events, reactor);}if(EPOLLOUT & events[i].events){conn->cb(connfd, events[i].events, reactor);} }}}destory_reactor(reactor);return 0;
}int main(int argc, char *argv[]){if(argc != 2){printf("please enter port! e.x. 9999.\n");return -1;}// 初始化存儲引擎// kv_engine_init();// 運行kv存儲kv_run_while(argc, argv);// 銷毀存儲引擎// kv_engine_desy();return 0;
}
/*-------------------------------------------------------*/

注:只有251行、346行、352行是后續和kv存儲有關的接口函數。可以發現“網絡層”和“協議層”被很好的隔離開來。

圖3 驗證網絡數據回環

可以看到上述實現了網絡數據回環的功能。并且進一步來說,假如客戶端使用瀏覽器(http協議)對該端口進行訪問,那么對接收到的數據包進行http協議拆包,根據其請求的內容返回相應的信息(如html文件),那么就是我們所熟知的“web服務器”了。為什么是“爛大街”啊,一代比一代卷是吧😭。

2.2 array的實現

??現在使用“數組”來存儲“鍵值對節點”。首先我們可以想到的是,直接定義一個長度足夠大的數組(如1024),然后每次“插入”就直接找第一個空節點,“查找”和“刪除”都是遍歷所有節點。但是數組長度固定是一件很危險的事情,所以我們可以借鑒“內存池”的思想,來自動進行擴容和縮容。

圖4 array結構自動擴容/縮容的示意圖

如上圖所示,首先申請一個固定大小的“數組”存儲元素,當在某次“插入”元素發現沒有空節點時,可以直接再申請一塊“數組”,并將當前數組指向這個新數組;同理,當我們“刪除”一個元素時,若發現刪除后當前數組塊為空,可以直接free掉這塊內存,然后將其前后的內存塊連起來。注意到,為了能幫助我們快速判斷某個數組塊是否為空,還需要在其結構體中定義當前數組塊的有效元素數量count

array結構的增/刪/查操作還是比較簡單的,可以直接參考項目源碼中的“array.h”、“array.c”。

2.3 rbtree的實現

圖5 紅黑樹示意圖

??紅黑樹是一種“自平衡的二叉搜索樹”,可謂是耳熟能詳,其增刪查改的操作都已經非常成熟。通俗來說,紅黑樹本質上是一個二叉樹,而一個滿二叉樹的查找性能近似于 O ( log ? N ) O(\log N) O(logN),于是我們便希望每次插入/刪除元素時,紅黑樹都能把自己調整成近乎于滿二叉樹的狀態。具體來說,就是在普通二叉樹的基礎上,只需要為每個節點額外添加一個“顏色信息”,再額外規定一些必須滿足的性質,就可以保證紅黑樹始終平衡。下面是紅黑樹的性質–來源中文維基百科“紅黑樹”:

  1. 每個節點是紅的或者黑的。
  2. 根結點是黑的。
  3. 所有葉子節點都是黑的(葉子節點是nil節點)。
  4. 紅色節點的子節點必須是黑色。(或者說紅色節點不相鄰)
  5. 從任一節點到其每個葉子節點的所有簡單路徑,都包含相同數目的黑色節點。

“nil節點”:是一個黑色的空節點,作為紅黑樹的葉子節點。
性質口訣:左根右,根葉黑,不紅紅,黑路同
注:零聲教育提供了能顯示紅黑樹生成步驟的本地HTML文件–“紅黑樹生成步驟.html(39KB)”。

??紅黑樹的查找操作只需要從根節點不斷比較即可,而紅黑樹的插入/刪除操作則有點說法。下面是我按照編程邏輯進行簡化的插入/刪除原理。具體來說,就是每次插入/刪除后都需要進行調整,調整的邏輯如下:

紅黑樹插入:剛插入的位置一定是葉子節點,顏色為紅,然后按照如下方式調整。

// 父節點是黑色:無需操作。
while(父節點是紅色){if(叔節點為紅色){變色:祖父變紅/父變黑/叔變黑、祖父節點成新的當前節點。進入下一次循環。}else(叔節點是黑色,注意葉子節點也是黑色){判斷“祖父節點->父節點->當前節點”的左右走向:LL:祖父變紅/父變黑、祖父右旋。最后的當前節點應該是原來的當前節點。LR:祖父變紅/父變紅/當前變黑、父左旋/祖父右旋。最后的當前節點應該是原來的祖父節點/父節點。RL:祖父變紅/父變紅/當前變黑、父右旋/祖父左旋。最后的當前節點應該是原來的祖父節點/父節點。RR:祖父變紅/父變黑、祖父左旋。最后的當前節點應該是原來的當前節點。然后進入下一次循環。}
}

紅黑樹刪除:要刪除的位置一定是 沒有/只有一個 孩子。也就是說,如果要刪除的元素有兩個孩子,那就和其后繼節點交換鍵值對,然后實際刪除這個后繼節點。實際刪除的節點del_r為黑色,則將“孩子節點”(沒有孩子就是左側的葉子節點)作為“當前節點”按照如下方式調整。

while(當前節點是黑色 && 不是根節點){if(兄弟節點為紅色){if(當前節點是左孩子)父變紅/兄弟變黑、父左旋,當前節點下一循環。else(當前節點是右孩子)父變紅/兄弟變黑、父右旋,當前節點下一循環。}else(兄弟節點為黑色){if(兄弟節點沒有紅色子節點){if(父為黑)父變黑/兄弟變紅,父節點成新的當前節點進入下一循環。else(父為紅)父變黑/兄弟變紅,結束循環(當前節點指向根節點)。}else(兄弟節點有紅色子節點){判斷“父節點->兄弟節點->兄弟的紅色子節點(兩個子節點都是紅色則默認是左)”的走向:LL:紅子變黑/兄弟變父色、父右旋、父變黑,結束循環。LR:紅子變父色、兄弟左旋/父右旋、父變黑,結束循環。RR:紅子變黑/兄弟變父色、父左旋、父變黑,結束循環。RL:紅子變父色、兄弟右旋/父左旋、父變黑,結束循環。“結束循環”等價于當前節點指向根節點。}}
}
注意出了循環別忘了將根節點調整成黑色。

根據上述原理,我使用C語言實現了紅黑樹完整的增刪查操作,并增加了檢驗有效性打印紅黑樹圖的代碼,以及測試代碼。值得注意的是,為了加快開發速度,下面的代碼預設“鍵值對”的類型為int keyvoid* value,也就是只關心int型的key、不關心value,后續將紅黑樹添加進“kv存儲協議”中時會進一步修改:

rbtree_int.c-共905行

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<stdbool.h>// 編譯指令:gcc -o main rbtree_int.c
// 本代碼實現紅黑樹,存儲int型key,未指定value。#define RBTREE_DEBUG 1  // 是否運行測試代碼typedef int KEY_TYPE;  // 節點的key類型
#define RED   1
#define BLACK 0// 定義紅黑樹單獨節點
typedef struct _rbtree_node {KEY_TYPE key;      // 鍵void *value;  // 值,可以指向任何類型struct _rbtree_node *left;struct _rbtree_node *right;struct _rbtree_node *parent;unsigned char color;  // 不同編譯器的無符號性質符號不同,這里加上unsigned減少意外。/* 對于32位系統,上述只有color是1個字節,其余都是4個字節,所以color放在最后可以節省內存。 */
} rbtree_node;// 定義整個紅黑樹
typedef struct _rbtree{struct _rbtree_node *root_node; // 根節點struct _rbtree_node *nil_node; // 空節點,也就是葉子節點、根節點的父節點
} rbtree;// 存儲打印紅黑樹所需的參數
typedef struct _disp_parameters{// 打印緩沖區char **disp_buffer;// 打印緩沖區的深度,寬度,當前打印的列數int disp_depth;int disp_width;int disp_column;// 樹的深度int max_depth;// 最大的數字位寬int max_num_width;// 單個節點的顯示寬度int node_width;
}disp_parameters;/*----初始化及釋放內存----*/
// 紅黑樹初始化,注意調用完后釋放內存rbtree_free
rbtree *rbtree_malloc(void);
// 紅黑樹釋放內存
void rbtree_free(rbtree *T);/*----插入操作----*/
// 紅黑樹插入
void rbtree_insert(rbtree *T, KEY_TYPE key, void *value);
// 調整插入新節點后的紅黑樹,使得紅色節點不相鄰(平衡性)
void rbtree_insert_fixup(rbtree *T, rbtree_node *cur);/*----刪除操作----*/
// 紅黑樹刪除
void rbtree_delete(rbtree *T, rbtree_node *del);
// 調整刪除某節點后的紅黑樹,使得紅色節點不相鄰(平衡性)
void rbtree_delete_fixup(rbtree *T, rbtree_node *cur);/*----查找操作----*/
// 紅黑樹查找
rbtree_node* rbtree_search(rbtree *T, KEY_TYPE key);/*----打印信息----*/
// 中序遍歷整個紅黑樹,依次打印節點信息
void rbtree_traversal(rbtree *T);
// 以圖的形式展示紅黑樹
void rbtree_display(rbtree *T);
// 先序遍歷,打印紅黑樹信息到字符數組指針
void set_display_buffer(rbtree *T, rbtree_node *cur, disp_parameters *p);/*----檢查有效性----*/
// 檢查當前紅黑樹的有效性:根節點黑色、紅色不相鄰、所有路徑黑高相同
bool rbtree_check_effective(rbtree *T);/*----其他函數----*/
// 在給定節點作為根節點的子樹中,找出key最小的節點
rbtree_node* rbtree_min(rbtree *T, rbtree_node *cur);
// 在給定節點作為根節點的子樹中,找出key最大的節點
rbtree_node* rbtree_max(rbtree *T, rbtree_node *cur);
// 找出當前節點的前驅節點
rbtree_node* rbtree_precursor_node(rbtree *T, rbtree_node *cur);
// 找出當前節點的后繼節點
rbtree_node* rbtree_successor_node(rbtree *T, rbtree_node *cur);
// 紅黑樹節點左旋,無需修改顏色
void rbtree_left_rotate(rbtree *T, rbtree_node *x);
// 紅黑樹節點右旋,無需修改顏色
void rbtree_right_rotate(rbtree *T, rbtree_node *y);
// 計算紅黑樹的深度
int rbtree_depth(rbtree *T);
// 遞歸計算紅黑樹的深度(不包括葉子節點)
int rbtree_depth_recursion(rbtree *T, rbtree_node *cur);/*-----------------------------下面為函數定義-------------------------------*/
// 紅黑樹初始化,注意調用完后釋放內存rbtree_free()
rbtree *rbtree_malloc(void){rbtree *T = (rbtree*)malloc(sizeof(rbtree));if(T == NULL){printf("rbtree malloc failed!");}else{T->nil_node = (rbtree_node*)malloc(sizeof(rbtree_node));T->nil_node->color = BLACK;T->nil_node->left = T->nil_node;T->nil_node->right = T->nil_node;T->nil_node->parent = T->nil_node;T->root_node = T->nil_node;}return T;
}// 紅黑樹釋放內存
void rbtree_free(rbtree *T){free(T->nil_node);free(T);
}// 在給定節點作為根節點的子樹中,找出key最小的節點
rbtree_node* rbtree_min(rbtree *T, rbtree_node *cur){  while(cur->left != T->nil_node){cur = cur->left;}return cur;
}// 在給定節點作為根節點的子樹中,找出key最大的節點
rbtree_node* rbtree_max(rbtree *T, rbtree_node *cur){  while(cur->right != T->nil_node){cur = cur->right;}return cur;
}// 找出當前節點的前驅節點
rbtree_node* rbtree_precursor_node(rbtree *T, rbtree_node *cur){// 若當前節點有左孩子,那就直接向下找if(cur->left != T->nil_node){return rbtree_max(T, cur->left);}// 若當前節點沒有左孩子,那就向上找rbtree_node *parent = cur->parent;while((parent != T->nil_node) && (cur == parent->left)){cur = parent;parent = cur->parent;}return parent;// 若返回值為空節點,則說明當前節點就是第一個節點
}// 找出當前節點的后繼節點
rbtree_node* rbtree_successor_node(rbtree *T, rbtree_node *cur){// 若當前節點有右孩子,那就直接向下找if(cur->right != T->nil_node){return rbtree_min(T, cur->right);}// 若當前節點沒有右孩子,那就向上找rbtree_node *parent = cur->parent;while((parent != T->nil_node) && (cur == parent->right)){cur = parent;parent = cur->parent;}return parent;// 若返回值為空節點,則說明當前節點就是最后一個節點
}// 紅黑樹節點左旋,無需修改顏色
void rbtree_left_rotate(rbtree *T, rbtree_node *x){// 傳入rbtree*是為了判斷節點node的左右子樹是否為葉子節點、父節點是否為根節點。rbtree_node *y = x->right;// 注意紅黑樹中所有路徑都是雙向的,兩邊的指針都要改!// 另外,按照如下的修改順序,無需存儲額外的節點。x->right = y->left;if(y->left != T->nil_node){y->left->parent = x;}y->parent = x->parent;if(x->parent == T->nil_node){  // x為根節點T->root_node = y;}else if(x->parent->left == x){x->parent->left = y;}else{x->parent->right = y;}y->left = x;x->parent = y;
}// 紅黑樹節點右旋,無需修改顏色
void rbtree_right_rotate(rbtree *T, rbtree_node *y){rbtree_node *x = y->left;y->left = x->right;if(x->right != T->nil_node){x->right->parent = y;}x->parent = y->parent;if(y->parent == T->nil_node){T->root_node = x;}else if(y->parent->left == y){y->parent->left = x;}else{y->parent->right = x;}x->right = y;y->parent = x;
}// 調整插入新節點后的紅黑樹,使得紅色節點不相鄰(平衡性)
void rbtree_insert_fixup(rbtree *T, rbtree_node *cur){// 父節點是黑色,無需調整。// 父節點是紅色,則有如下八種情況。while(cur->parent->color == RED){// 獲取叔節點rbtree_node *uncle;if(cur->parent->parent->left == cur->parent){uncle = cur->parent->parent->right;}else{uncle = cur->parent->parent->left;}// 若叔節點為紅,只需更新顏色(隱含了四種情況)// 循環主要在這里起作用if(uncle->color == RED){// 叔節點為紅色:祖父變紅/父變黑/叔變黑、祖父節點成新的當前節點。if(uncle->color == RED){cur->parent->parent->color = RED;cur->parent->color = BLACK;uncle->color = BLACK;cur = cur->parent->parent;}}// 若叔節點為黑,需要變色+旋轉(當前節點相當于祖父節點位置包括四種情況:LL/RR/LR/RL)// 下面對四種情況進行判斷:都是只執行一次else{if(cur->parent->parent->left == cur->parent){// LL:祖父變紅/父變黑、祖父右旋。最后的當前節點應該是原來的當前節點。if(cur->parent->left == cur){cur->parent->parent->color = RED;cur->parent->color = BLACK;rbtree_right_rotate(T, cur->parent->parent);}// LR:祖父變紅/父變紅/當前變黑、父左旋、祖父右旋。最后的當前節點應該是原來的祖父節點/父節點。else{cur->parent->parent->color = RED;cur->parent->color = RED;cur->color = BLACK;cur = cur->parent;rbtree_left_rotate(T, cur);rbtree_right_rotate(T, cur->parent->parent);}}else{// RL:祖父變紅/父變紅/當前變黑、父右旋、祖父左旋。最后的當前節點應該是原來的祖父節點/父節點。if(cur->parent->left == cur){cur->parent->parent->color = RED;cur->parent->color = RED;cur->color = BLACK;cur = cur->parent;rbtree_right_rotate(T, cur);rbtree_left_rotate(T, cur->parent->parent);}// RR:祖父變紅/父變黑、祖父左旋。最后的當前節點應該是原來的當前節點。else{cur->parent->parent->color = RED;cur->parent->color = BLACK;rbtree_left_rotate(T, cur->parent->parent);}}}}// 將根節點變為黑色T->root_node->color = BLACK;
}// 插入
// void rbtree_insert(rbtree *T, rbtree_node *new){
void rbtree_insert(rbtree *T, KEY_TYPE key, void *value){// 創建新節點rbtree_node *new = (rbtree_node*)malloc(sizeof(rbtree_node));new->key = key;new->value = value;// 尋找插入位置(紅黑樹中序遍歷升序)rbtree_node *cur = T->root_node;rbtree_node *next = T->root_node;// 剛插入的位置一定是葉子節點while(next != T->nil_node){cur = next;if(new->key > cur->key){next = cur->right;}else if(new->key < cur->key){next = cur->left;}else if(new->key == cur->key){// 紅黑樹本身沒有明確如何處理key相同節點,所以取決于業務。// 場景1:統計不同課程的人數,相同就+1。// 場景2:時間戳,若相同則稍微加一點// 其他場景:覆蓋、丟棄...printf("Already have the same key=%d!\n", new->key);free(new);return;}}if(cur == T->nil_node){// 若紅黑樹本身沒有節點T->root_node = new;}else if(new->key > cur->key){cur->right = new;}else{cur->left = new;}new->parent = cur;new->left = T->nil_node;new->right = T->nil_node;new->color = RED;// 調整紅黑樹,使得紅色節點不相鄰rbtree_insert_fixup(T, new);
}// 調整刪除某節點后的紅黑樹,使得紅色節點不相鄰(平衡性)
void rbtree_delete_fixup(rbtree *T, rbtree_node *cur){// child是黑色、child不是根節點才會進入循環while((cur->color == BLACK) && (cur != T->root_node)){// 獲取兄弟節點rbtree_node *brother = T->nil_node;if(cur->parent->left == cur){brother = cur->parent->right;}else{brother = cur->parent->left;}// 兄弟節點為紅色:父變紅/兄弟變黑、父單旋、當前節點下一循環if(brother->color == RED){cur->parent->color = RED;brother->color = BLACK;if(cur->parent->left == cur){rbtree_left_rotate(T, cur->parent);}else{rbtree_right_rotate(T, cur->parent);}}// 兄弟節點為黑色else{ // 兄弟節點沒有紅色子節點:父變黑/兄弟變紅、看情況是否結束循環if((brother->left->color == BLACK) && (brother->right->color == BLACK)){// 若父原先為黑,父節點成新的當前節點進入下一循環;否則結束循環。if(brother->parent->color == BLACK){cur = cur->parent;}else{cur = T->root_node;}brother->parent->color = BLACK;brother->color = RED;}// 兄弟節點有紅色子節點:LL/LR/RR/RLelse if(brother->parent->left == brother){// LL:紅子變黑/兄弟變父色/父變黑、父右旋,結束循環if(brother->left->color == RED){brother->left->color = BLACK;brother->color = brother->parent->color;brother->parent->color = BLACK;rbtree_right_rotate(T, brother->parent);cur = T->root_node;}// LR:紅子變父色/父變黑、兄弟左旋/父右旋,結束循環else{brother->right->color = brother->parent->color;cur->parent->color = BLACK;rbtree_left_rotate(T, brother);rbtree_right_rotate(T, cur->parent);cur = T->root_node;}}else{// RR:紅子變黑/兄弟變父色/父變黑、父左旋,結束循環if(brother->right->color == RED){brother->right->color = BLACK;brother->color = brother->parent->color;brother->parent->color = BLACK;rbtree_left_rotate(T, brother->parent);cur = T->root_node;}// RL:紅子變父色/父變黑、兄弟右旋/父左旋,結束循環else{brother->left->color = brother->parent->color;brother->parent->color = BLACK;rbtree_right_rotate(T, brother);rbtree_left_rotate(T, cur->parent);cur = T->root_node;}}}}// 下面這行處理情況2/3cur->color = BLACK;
}// 紅黑樹刪除
void rbtree_delete(rbtree *T, rbtree_node *del){if(del != T->nil_node){/* 紅黑樹刪除邏輯:1. 標準的BST刪除操作(本函數):最紅都會轉換成刪除只有一個子節點或者沒有子節點的節點。2. 若刪除節點為黑色,則進行調整(rebtre_delete_fixup)。*/rbtree_node *del_r = T->nil_node;        // 實際刪除的節點rbtree_node *del_r_child = T->nil_node;  // 實際刪除節點的子節點// 找出實際刪除的節點// 注:實際刪除的節點最多只有一個子節點,或者沒有子節點(必然在最后兩層中,不包括葉子節點那一層)if((del->left == T->nil_node) || (del->right == T->nil_node)){// 如果要刪除的節點本身就只有一個孩子或者沒有孩子,那實際刪除的節點就是該節點del_r = del;}else{// 如果要刪除的節點有兩個孩子,那就使用其后繼節點(必然最多只有一個孩子)del_r = rbtree_successor_node(T, del);}// 看看刪除節點的孩子是誰,沒有孩子就是空節點if(del_r->left != T->nil_node){del_r_child = del_r->left;}else{del_r_child = del_r->right;}// 將實際要刪除的節點刪除del_r_child->parent = del_r->parent;  // 若child為空節點,最后再把父節點指向空節點if(del_r->parent == T->nil_node){T->root_node = del_r_child;}else if(del_r->parent->left == del_r){del_r->parent->left = del_r_child;}else{del_r->parent->right = del_r_child;}// 替換替換鍵值對if(del != del_r){del->key = del_r->key;del->value = del_r->value;}// 最后看是否需要調整if(del_r->color == BLACK){rbtree_delete_fixup(T, del_r_child);}// 調整空節點的父節點if(del_r_child == T->nil_node){del_r_child->parent = T->nil_node;}free(del_r);}
}// 查找
rbtree_node* rbtree_search(rbtree *T, KEY_TYPE key){rbtree_node *cur = T->root_node;while(cur != T->nil_node){if(cur->key > key){cur = cur->left;}else if(cur->key < key){cur = cur->right;}else{return cur;}}printf("There is NO key=%d in rbtree!\n", key);return T->nil_node;
}// 中序遍歷給定結點為根節點的子樹(遞歸)
void rbtree_traversal_node(rbtree *T, rbtree_node *cur){if(cur != T->nil_node){rbtree_traversal_node(T, cur->left);if(cur->color == RED){printf("Key:%d\tColor:Red\n", cur->key);}else{printf("Key:%d\tColor:Black\n", cur->key);}rbtree_traversal_node(T, cur->right);}
}// 中序遍歷整個紅黑樹
void rbtree_traversal(rbtree *T){rbtree_traversal_node(T, T->root_node);
}// 遞歸計算紅黑樹的深度(不包括葉子節點)
int rbtree_depth_recursion(rbtree *T, rbtree_node *cur){if(cur == T->nil_node){return 0;}else{int left = rbtree_depth_recursion(T, cur->left);int right = rbtree_depth_recursion(T, cur->right);return ((left > right) ? left : right) + 1;}
}// 計算紅黑樹的深度
int rbtree_depth(rbtree *T){return rbtree_depth_recursion(T, T->root_node);
}// 獲取輸入數字的十進制顯示寬度
int decimal_width(int num_in){int width = 0;while (num_in != 0){num_in = num_in / 10;width++;}return width;
}// 先序遍歷,打印紅黑樹信息到字符數組指針
void set_display_buffer(rbtree *T, rbtree_node *cur, disp_parameters *p){if(cur != T->nil_node){// 輸出當前節點p->disp_depth++;// 輸出數字到緩沖區char num_char[20];char formatString[20];int cur_num_width = decimal_width(cur->key);int num_space = (p->node_width - 2 - cur_num_width) >> 1;  // 數字后面需要補充的空格數量strncpy(formatString, "|%*d", sizeof(formatString));int i = 0;for(i=0; i<num_space; i++){strncat(formatString, " ", 2);}strncat(formatString, "|", 2);snprintf(num_char, sizeof(num_char), formatString, (p->node_width-2-num_space), cur->key);i = 0;while(num_char[i] != '\0'){p->disp_buffer[(p->disp_depth-1)*3][p->disp_column+i] = num_char[i];i++;}// 輸出顏色到緩沖區char color_char[20];if(cur->color == RED){num_space = (p->node_width-2-3)>>1;strncpy(color_char, "|", 2);for(i=0; i<(p->node_width-2-3-num_space); i++){strncat(color_char, " ", 2);}strncat(color_char, "RED", 4);for(i=0; i<num_space; i++){strncat(color_char, " ", 2);}strncat(color_char, "|", 2);}else{num_space = (p->node_width-2-5)>>1;strncpy(color_char, "|", 2);for(i=0; i<(p->node_width-2-5-num_space); i++){strncat(color_char, " ", 2);}strncat(color_char, "BLACK", 6);for(i=0; i<num_space; i++){strncat(color_char, " ", 2);}strncat(color_char, "|", 2);}// strcpy(color_char, (cur->color == RED) ? "| RED |" : "|BLACK|");i = 0;while(color_char[i] != '\0'){p->disp_buffer[(p->disp_depth-1)*3+1][p->disp_column+i] = color_char[i];i++;}// 輸出連接符到緩沖區if(p->disp_depth>1){char connector_char[10];strcpy(connector_char, (cur->parent->left == cur) ? "/" : "\\");p->disp_buffer[(p->disp_depth-1)*3-1][p->disp_column+(p->node_width>>1)] = connector_char[0];}// 下一層需要前進/后退的字符數int steps = 0;if(p->disp_depth+1 == p->max_depth){steps = (p->node_width>>1)+1;}else{steps = (1<<(p->max_depth - p->disp_depth - 2)) * p->node_width;}// 輸出左側節點p->disp_column -= steps;set_display_buffer(T, cur->left, p);p->disp_column += steps;// 輸出右側節點if(p->disp_depth+1 == p->max_depth){steps = p->node_width-steps;}p->disp_column += steps;set_display_buffer(T, cur->right, p);p->disp_column -= steps;p->disp_depth--;}
}// 以圖的形式展示紅黑樹
void rbtree_display(rbtree *T){// 紅黑樹為空不畫圖if(T->root_node == T->nil_node){printf("rbtree DO NOT have any key!\n");return;}// 初始化參數結構體disp_parameters *para = (disp_parameters*)malloc(sizeof(disp_parameters));if(para == NULL){printf("disp_parameters struct malloc failed!");return;}rbtree_node *max_node = rbtree_max(T, T->root_node);para->max_num_width = decimal_width(max_node->key);    para->max_depth = rbtree_depth(T);para->node_width = (para->max_num_width<=5) ? 7 : (para->max_num_width+2);  // 邊框“||”寬度2 + 數字寬度para->disp_depth = 0;para->disp_width = para->node_width * (1 << (para->max_depth-1)) + 1;para->disp_column = ((para->disp_width-para->node_width)>>1);int height = (para->max_depth-1)*3 + 2;// 根據樹的大小申請內存para->disp_buffer = (char**)malloc(sizeof(char*)*height);int i = 0;for(i=0; i<height; i++){para->disp_buffer[i] = (char*)malloc(sizeof(char)*para->disp_width);memset(para->disp_buffer[i], ' ', para->disp_width);para->disp_buffer[i][para->disp_width-1] = '\0';}// 打印內容set_display_buffer(T, T->root_node, para);for(i=0; i<height; i++){printf("%s\n", para->disp_buffer[i]);}// 釋放內存for(i=0; i<height; i++){free(para->disp_buffer[i]);}free(para->disp_buffer);free(para);
}// 檢查當前紅黑樹的有效性:根節點黑色、紅色不相鄰、所有路徑黑高相同
bool rbtree_check_effective(rbtree *T){bool rc_flag = true;  // 根節點黑色bool rn_flag = true;  // 紅色不相鄰bool bh_flag = true;  // 所有路徑黑高相同if(T->root_node->color == RED){printf("ERROR! root-node's color is RED!\n");rc_flag = false;}else{int depth = rbtree_depth(T);int max_index_path = 1<<(depth-1);  // 從根節點出發的路徑總數// 獲取最左側路徑的黑高int black_height = 0;rbtree_node *cur = T->root_node;while(cur != T->nil_node){if(cur->color == BLACK) black_height++;cur = cur->left;// printf("bh = %d\n", black_height);}// 遍歷每一條路徑int i_path = 0;for(i_path=1; i_path<max_index_path; i_path++){int dir = i_path;int bh = 0;  // 當前路徑的黑高cur = T->root_node;while(cur != T->nil_node){// 更新黑高if(cur->color == BLACK){bh++;}// 判斷紅色節點不相鄰else{if((cur->left->color == RED) || (cur->right->color == RED)){printf("ERROR! red node %d has red child!\n", cur->key);rn_flag = false;}}// 更新下一節點// 0:left, 1:rightif(dir%2) cur = cur->right;else      cur = cur->left;dir = dir>>1;}if(bh != black_height){printf("ERROR! black height is not same! path 0 is %d, path %d is %d.\n", black_height, i_path, bh);bh_flag = false;}}}return (rc_flag && rn_flag && bh_flag);
}
/*------------------------------------------------------------------------*//*-----------------------------下面為測試代碼-------------------------------*/
#if RBTREE_DEBUG
#include<time.h>  // 使用隨機數
#include<sys/time.h>  // 計算qps中獲取時間
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define ENABLE_QPS  1  // 是否開啟qps性能測試
#define continue_test_len  1000000  // 連續測試的長度
// 冒泡排序
void bubble_sort(int arr[], int len) {int i, j, temp;for (i = 0; i < len - 1; i++)for (j = 0; j < len - 1 - i; j++)if (arr[j] > arr[j + 1]) {temp = arr[j];arr[j] = arr[j + 1];arr[j + 1] = temp;}
}
int main(){/* --------------------定義數組-------------------- */// 預定義的數組// int KeyArray[20] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20};  // 正著插// int KeyArray[20] = {20,19,18,17,16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1};  // 倒著插// int KeyArray[20] = {1,2,3,4,5,10,9,8,7,6,11,12,13,14,15,20,19,18,17,16};  // 亂序插// int KeyArray[31] = {11,12,13,14,15,16,17,18,19,20,1,2,3,4,5,6,7,8,9,10,21,22,23,24,25,26,27,28,29,30,31};  // 亂序插// 順序增長的數組int len_array = 18;int KeyArray[len_array];int i_array = 0;for(i_array=0; i_array<len_array; i_array++){KeyArray[i_array] = i_array + 1;}// // 隨機生成固定大小的隨機數組// int len_array = 18;// int KeyArray[len_array];// srand(time(NULL));// int i_array = 0;// for(i_array=0; i_array<len_array; i_array++){//     KeyArray[i_array] = rand() % 9999999999;// }/* ------------------以下測試代碼------------------ */printf("-------------------紅黑樹插入測試------------------\n");// 先給輸入的數組排個序int len_max = sizeof(KeyArray)/sizeof(int);printf("測試數組長度: %d\n", len_max);// 將原先的數組深拷貝并升序排序int *KeyArray_sort = (int*)malloc(sizeof(KeyArray));printf("RAND_MAX: %d\n", RAND_MAX);int i = 0;for(i = 0; i < len_max; i++){KeyArray_sort[i] = KeyArray[i];}bubble_sort(KeyArray_sort, len_max);// 申請紅黑樹內存rbtree *T = rbtree_malloc();// 依次插入數據for(i = 0; i < len_max; i++){rbtree_insert(T, KeyArray[i], NULL);}// 遍歷數據,查看是否符合紅黑樹性質// rbtree_display(T);if(rbtree_check_effective(T)){printf("PASS---->插入測試\n");}else{printf("FAIL---->插入測試\n");}// rbtree_display(T);printf("-------------------紅黑樹前驅節點測試------------------\n");int pass_flag = 1;if(rbtree_precursor_node(T, rbtree_search(T, KeyArray_sort[0])) != T->nil_node){printf("search first key %d's precursor error! get %d, expected nil_node\n", len_max, rbtree_precursor_node(T, rbtree_search(T, KeyArray_sort[0]))->key);pass_flag = 0;}for(i = 1; i<len_max; i++){rbtree_node *precursor = rbtree_precursor_node(T, rbtree_search(T, KeyArray_sort[i]));if(precursor->key != KeyArray_sort[i-1]){printf("search key %d error! get %d, expected %d\n", KeyArray_sort[i], precursor->key, KeyArray_sort[i-1]);pass_flag = 0;}}if(pass_flag){printf("PASS---->前驅節點測試\n");}else{printf("FAIL---->前驅節點測試\n");}printf("-------------------紅黑樹后繼節點測試------------------\n");pass_flag = 1;if(rbtree_successor_node(T, rbtree_search(T, KeyArray_sort[len_max-1])) != T->nil_node){printf("search last key %d's successor error! get %d, expected nil_node\n",\KeyArray_sort[len_max-1],\rbtree_successor_node(T, rbtree_search(T, KeyArray_sort[len_max-1]))->key);pass_flag = 0;}for(i = 0; i<len_max-1; i++){rbtree_node *successor = rbtree_successor_node(T, rbtree_search(T, KeyArray_sort[i]));if(successor->key != KeyArray_sort[i+1]){printf("search key %d error! get %d, expected %d\n", KeyArray_sort[i], successor->key, KeyArray_sort[i+1]);pass_flag = 0;}}if(pass_flag){printf("PASS---->后繼節點測試\n");}else{printf("FAIL---->后繼節點測試\n");}printf("-------------------紅黑樹刪除測試------------------\n");// 依次刪除所有元素for(i=0; i<len_max; i++){rbtree_delete(T, rbtree_search(T, KeyArray_sort[i]));if(!rbtree_check_effective(T)){rbtree_display(T);printf("FAIL---->刪除測試%d\n", i+1);break;}else{printf("PASS---->刪除測試%d\n", i+1);}}printf("-------------------紅黑樹打印測試------------------\n");// 先插入數據1~18,再刪除16/17/18,即可得到4層的滿二叉樹for(i = 0; i < len_max; i++){rbtree_insert(T, KeyArray[i], NULL);}for(i=0; i<3; i++){rbtree_delete(T, rbtree_search(T, KeyArray_sort[len_max-i-1]));if(!rbtree_check_effective(T)){printf("FAIL---->刪除測試%d\n", KeyArray_sort[len_max-i-1]);break;}else{printf("PASS---->刪除測試%d\n", KeyArray_sort[len_max-i-1]);}}// 打印看看結果rbtree_display(T);// 清空紅黑樹for(i=0; i<len_max; i++){rbtree_delete(T, rbtree_search(T, KeyArray_sort[i]));}#if ENABLE_QPSprintf("---------------紅黑樹連續插入性能測試---------------\n");// 定義時間結構體struct timeval tv_begin;struct timeval tv_end;gettimeofday(&tv_begin, NULL);for(i = 0; i < continue_test_len; i++){rbtree_insert(T, i+1, NULL);}gettimeofday(&tv_end, NULL);int time_ms = TIME_SUB_MS(tv_end, tv_begin);float qps = (float)continue_test_len / (float)time_ms * 1000;printf("total INSERTs:%d  time_used:%d(ms)  qps:%.2f(INSERTs/sec)\n", continue_test_len, time_ms, qps);printf("---------------紅黑樹連續查找性能測試---------------\n");gettimeofday(&tv_begin, NULL);for(i = 0; i < continue_test_len; i++){// rbtree_search(T, i+1);if(rbtree_search(T, i+1)->key != i+1){printf("continue_search error!\n");return 0;}}gettimeofday(&tv_end, NULL);time_ms = TIME_SUB_MS(tv_end, tv_begin);qps = (float)continue_test_len / (float)time_ms * 1000;printf("total SEARCHs:%d  time_used:%d(ms)  qps:%.2f(SEARCHs/sec)\n", continue_test_len, time_ms, qps);printf("---------------紅黑樹連續刪除性能測試---------------\n");gettimeofday(&tv_begin, NULL);for(i = 0; i < continue_test_len; i++){rbtree_delete(T, rbtree_search(T, i+1));}gettimeofday(&tv_end, NULL);time_ms = TIME_SUB_MS(tv_end, tv_begin);qps = (float)continue_test_len / (float)time_ms * 1000;printf("total DELETEs:%d  time_used:%d(ms)  qps:%.2f(DELETEs/sec)\n", continue_test_len, time_ms, qps);
#endifprintf("--------------------------------------------------\n");rbtree_free(T); // 別忘了釋放內存free(KeyArray_sort);return 0;
}
#endif

測試代碼輸出結果

lyl@ubuntu:~/Desktop/kv-store/code_init$ gcc -o main rbtree_int.c 
lyl@ubuntu:~/Desktop/kv-store/code_init$ ./main
-------------------紅黑樹插入測試------------------
測試數組長度: 18
RAND_MAX: 2147483647
PASS---->插入測試
-------------------紅黑樹前驅節點測試------------------
PASS---->前驅節點測試
-------------------紅黑樹后繼節點測試------------------
PASS---->后繼節點測試
-------------------紅黑樹刪除測試------------------
PASS---->刪除測試1
PASS---->刪除測試2
PASS---->刪除測試3
PASS---->刪除測試4
PASS---->刪除測試5
PASS---->刪除測試6
PASS---->刪除測試7
PASS---->刪除測試8
PASS---->刪除測試9
PASS---->刪除測試10
PASS---->刪除測試11
PASS---->刪除測試12
PASS---->刪除測試13
PASS---->刪除測試14
PASS---->刪除測試15
PASS---->刪除測試16
PASS---->刪除測試17
PASS---->刪除測試18
-------------------紅黑樹打印測試------------------
PASS---->刪除測試18
PASS---->刪除測試17
PASS---->刪除測試16|  8  |                        |BLACK|                        /                           \             |  4  |                     |  12 |          | RED |                     | RED |          /             \             /             \      |  2  |       |  6  |       |  10 |       |  14 |   |BLACK|       |BLACK|       |BLACK|       |BLACK|   /      \      /      \      /      \      /      \   
|  1  ||  3  ||  5  ||  7  ||  9  ||  11 ||  13 ||  15 |
|BLACK||BLACK||BLACK||BLACK||BLACK||BLACK||BLACK||BLACK|
There is NO key=16 in rbtree!
There is NO key=17 in rbtree!
There is NO key=18 in rbtree!
---------------紅黑樹連續插入性能測試---------------
total INSERTs:1000000  time_used:295(ms)  qps:3389830.50(INSERTs/sec)
---------------紅黑樹連續查找性能測試---------------
total SEARCHs:1000000  time_used:118(ms)  qps:8474576.00(SEARCHs/sec)
---------------紅黑樹連續刪除性能測試---------------
total DELETEs:1000000  time_used:95(ms)  qps:10526315.00(DELETEs/sec)
--------------------------------------------------

編程感想:

  1. 每一次旋轉都是一次謀權篡位。
  2. 雙旋的時候,最后的“當前節點”應該是原來的“祖父節點/父節點”,若還保留當前身份,那么會造成錯誤。

紅黑樹插入:

  • 參考B站:【neko算法課】紅黑樹 插入【11期】

紅黑樹刪除:

  • 參考B站:【neko算法課】紅黑樹 刪除【12期】
  • 參考微信圖文:圖解:什么是二叉排序樹?–介紹了標準BST刪除操作
  • 參考知乎:圖解:紅黑樹刪除篇(一文讀懂)–里面的v對應del_r、u對應del_r_child

紅黑樹打印:

  • 參考CSDN:二叉樹生成與打印顯示 c語言實現

2.4 btree的實現

圖6 6階B樹示意圖——依次插入26個英文字母

??一般來說,B樹也是一個自平衡的二叉搜索樹。但與紅黑樹不同的是,B樹的節點可以存儲多個元素, m m m階B樹的單個節點,最多有 m ? 1 m-1 m?1 個元素、 m m m 個子節點。并且B樹只有孩子節點、沒有父節點(沒有向上的指針)。也就是說,對于插入/刪除操作,紅黑樹可以先從上往下尋找插入位置,再從下往上進行調整;而B樹要先從上往下調整完(“分裂、合并/借位”),最后在葉子節點進行插入/刪除,而沒有從下往上的過程。即進行插入/刪除時,B樹從上往下只走一次。下面給出一個 m m m階B樹應該滿足的條件(判斷一棵B樹是否有效的依據):

  1. 每個結點至多擁有 m m m顆子樹。
  2. 根結點至少擁有兩顆子樹。
  3. 除了根結點以外,其余每個分支結點至少擁有 m / 2 m/2 m/2棵子樹。
  4. 所有的葉結點都在同一層上。
  5. k k k 棵子樹的分支結點則存在 k ? 1 k-1 k?1 個元素,元素按照遞增順序進行排序。
  6. 單個節點的元素數量 n n n 滿足 ceil ( m / 2 ) ? 1 ≤ n ≤ m ? 1 \text{ceil}(m/2)-1 \le n \le m-1 ceil(m/2)?1nm?1

B樹可視化網站:https://www.cs.usfca.edu/~galles/visualization/BTree.html

同樣的,B樹的查找操作只需要從根節點不斷比較即可,而B樹的插入/刪除邏輯如下:

B樹插入:從上往下尋找要插入的葉子節點,過程中要下去的孩子若是滿節點,則進行“分裂”。

  • 分裂:取孩子的中間節點(第 ceil m 2 \text{ceil}\frac{m}{2} ceil2m? 個)放上來,剩下的元素列變成兩個子節點。
  • 新元素必然添加到葉子節點上。
  • 注意:只有根節點分裂會增加層高,其余的不會。

B樹刪除:從上往下尋找要刪除元素的所在節點,過程中看情況進行“合并/借位”。若所在節點不是葉子節點,就將其換到葉子節點中。最后在葉子節點刪除元素。

  • 合并:從當前節點下放一個元素,然后該元素對應的兩個子節點合并成一個子節點。團結就是力量。
  • 借位:從當前節點下方一個元素到元素較少的孩子,然后從當前元素的另一個孩子節點拉上來一個元素取代位置,注意大小順序。損有余而補不足。
// 遍歷到葉子節點
while(不是葉子節點){// 1. 確定下一節點和其兄弟節點if(當前節點有要刪除的元素) 哪邊少哪邊就是下一節點,當前元素對應的另一邊就是兄弟節點。else(當前節點沒有要刪除的元素) 確定好要去的下一節點后,左右兩邊誰多誰是兄弟節點。// 2. 看是否需要調整if(下一節點元素少){if(孩子的兄弟節點元素多) 借位,進入下一節點。else(孩子的兄弟節點元素少) 合并,進入合并后的節點。}else if(下一節點元素多 && 當前節點有要刪除元素){if(下一節點是刪除元素的左節點) 刪除元素和其前驅元素換位,進入下一節點。else(下一節點是刪除元素的右節點) 刪除元素和其后繼元素換位,進入下一節點。}else{直接進入下一節點。}
}
// 然后在葉子節點刪除元素

注:判斷孩子節點的元素少的條件是 元素數量 ≤ ceil m 2 ? 1 \le \text{ceil}\frac{m}{2}-1 ceil2m??1,判斷元素多的條件是 元素數量 ≥ ceil m 2 \ge\text{ceil}\frac{m}{2} ceil2m?

根據上述原理,我使用C語言實現了B樹完整的增刪查操作,并增加了檢驗有效性打印B樹的代碼,以及測試代碼(終端顯示進度條)。同樣為了加快開發速度,預設“鍵值對”的類型為int keyvoid* value,后續將B樹添加進“kv存儲協議”中時會進一步修改:

btree_int.c-共989行

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<stdbool.h>// 編譯指令:gcc -o main 1-1btree.c
// 本代碼實現M階B樹,存儲int型key,未定義value。#define BTREE_DEBUG 1  // 是否運行測試代碼typedef struct _btree_node{int *keys;void *values;struct _btree_node **children;int num;  // 當前節點的實際元素數量int leaf; // 當前節點是否為葉子節點
}btree_node;typedef struct _btree{int m;  // m階B樹struct _btree_node *root_node;
}btree;/*
下面是所有的函數聲明,排列順序與源代碼調用相同,最外層的函數放在最下面。
*/
/*----初始化分配內存----*/
// 創建單個節點,leaf表示是否為葉子節點
btree_node *btree_node_create(btree *T, int leaf);
// 初始化m階B樹:分配內存,最后記得銷毀B樹btree_destroy()
btree *btree_init(int m);/*----釋放內存----*/
// 刪除單個節點
void btree_node_destroy(btree_node *cur);
// 遞歸刪除給定節點作為根節點的子樹
void btree_node_destroy_recurse(btree_node *cur);
// 刪除所有節點,釋放btree內存
btree *btree_destroy(btree *T);/*----插入操作----*/
// 根節點分裂
btree_node* btree_root_split(btree *T);
// 索引為idx的孩子節點分裂
btree_node* btree_child_split(btree *T, btree_node* cur, int idx);
// btree插入元素:先分裂,再插入,必然在葉子節點插入
void btree_insert_key(btree *T, int key);/*----刪除操作----*/
// 借位:將cur節點的idx_key元素下放到idx_dest孩子
btree_node *btree_borrow(btree_node *cur, int idx_key, int idx_dest);
// 合并:將cur節點的idx元素向下合并
btree_node *btree_merge(btree *T, btree_node *cur, int idx);
// 找出當前節點索引為idx_key的元素的前驅節點
btree_node* btree_precursor_node(btree *T, btree_node *cur, int idx_key);
// 找出當前節點索引為idx_key的元素的后繼節點
btree_node* btree_successor_node(btree *T, btree_node *cur, int idx_key);
// btree刪除元素:先合并/借位,再刪除,必然在葉子節點刪除
void btree_delete_key(btree *T, int key);/*----查找操作----*/
// 查找key
btree_node* btree_search_key(btree *T, int key);/*----打印信息----*/
// 打印當前節點信息
void btree_node_print(btree_node *cur);
// 先序遍歷給定節點為根節點的子樹(遞歸)
void btree_traversal_node(btree *T, btree_node *cur);
// btree遍歷
void btree_traversal(btree *T);/*----檢查有效性----*/
// 獲取B樹的高度
int btree_depth(btree *T);
// 檢查給定節點的有效性
// 鍵值:根節點至少有一個key,其余節點至少有ceil(m/2)-1個key
// 分支:所有節點數目子樹為當前節點元素數量+1
bool btree_node_check_effective(btree *T, btree_node *cur);
// 遍歷所有路徑檢查m階B樹的有效性
// 平衡性:所有葉節點都在同一層(所有路徑高度相等)
// 有序性:所有元素升序排序
// 鍵值:根節點至少有一個key,其余節點至少有ceil(m/2)-1個key
// 分支:所有節點數目子樹為當前節點元素數量+1
bool btree_check_effective(btree *T);/*-----------------------------下面為函數定義-------------------------------*/
// 創建單個節點,leaf表示是否為葉子節點
btree_node *btree_node_create(btree *T, int leaf){btree_node *new = (btree_node*)malloc(sizeof(btree_node));if(new == NULL){printf("btree node malloc failed!\n");return NULL;}new->keys = (int*)calloc(T->m-1, sizeof(int));new->values = (void*)calloc(T->m-1, sizeof(void));new->children = (btree_node **)calloc(T->m, sizeof(btree_node*));new->num = 0;new->leaf = leaf;return new;
}// 刪除單個節點
void btree_node_destroy(btree_node *cur){free(cur->keys);free(cur->values);free(cur->children);free(cur);
}// 初始化m階B樹:分配內存,最后記得銷毀B樹btree_destroy()
btree *btree_init(int m){btree *T = (btree*)malloc(sizeof(btree));if(T == NULL){// 只有內存不夠時才會分配失敗printf("rbtree malloc failed!\n");return NULL;}T->m = m;T->root_node = NULL;
}// 遞歸刪除給定節點作為根節點的子樹
void btree_node_destroy_recurse(btree_node *cur){int i = 0;if(cur->leaf == 1){btree_node_destroy(cur);}else{for(i=0; i<cur->num+1; i++){btree_node_destroy_recurse(cur->children[i]);}}
}// 釋放btree內存
btree *btree_destroy(btree *T){// 刪除所有節點if(T->root_node != NULL){btree_node_destroy_recurse(T->root_node);}// 刪除btreefree(T);
}// 根節點分裂
btree_node* btree_root_split(btree *T){// 創建兄弟節點btree_node *brother = btree_node_create(T, T->root_node->leaf);int i = 0;for(i=0; i<((T->m-1)>>1); i++){brother->keys[i] = T->root_node->keys[i+(T->m>>1)];T->root_node->keys[i+(T->m>>1)] = 0;brother->children[i] = T->root_node->children[i+(T->m>>1)];T->root_node->children[i+(T->m>>1)] = NULL;brother->num++;T->root_node->num--;}// 還需要復制最后一個指針brother->children[brother->num] = T->root_node->children[T->m-1];T->root_node->children[T->m-1] = NULL;// 創建新的根節點btree_node *new_root = btree_node_create(T, 0);new_root->keys[0] = T->root_node->keys[T->root_node->num-1];T->root_node->keys[T->root_node->num-1] = 0;T->root_node->num--;new_root->num = 1;new_root->children[0] = T->root_node;new_root->children[1] = brother;T->root_node = new_root;return T->root_node;
}// 索引為idx的孩子節點分裂
btree_node* btree_child_split(btree *T, btree_node* cur, int idx){// 創建孩子的兄弟節點btree_node *full_child = cur->children[idx];btree_node *new_child = btree_node_create(T, cur->children[idx]->leaf);int i = 0;for(i=0; i<((T->m-1)>>1); i++){new_child->keys[i] = full_child->keys[i+(T->m>>1)];full_child->keys[i+(T->m>>1)] = 0;new_child->children[i] = full_child->children[i+(T->m>>1)];full_child->children[i+(T->m>>1)] = NULL;new_child->num++;full_child->num--;}new_child->children[new_child->num] = full_child->children[T->m-1];full_child->children[T->m-1] = NULL;// 把孩子的元素拿上來// 調整自己的key和childrenfor(i=cur->num; i>idx; i--){cur->keys[i] = cur->keys[i-1];cur->children[i+1] = cur->children[i];}cur->children[idx+1] = new_child;cur->keys[idx] = full_child->keys[full_child->num-1];full_child->keys[full_child->num-1] = 0;cur->num++;full_child->num--;return cur;
}// btree插入元素:先分裂,再插入,必然在葉子節點插入
void btree_insert_key(btree *T, int key){btree_node *cur = T->root_node;if(key <= 0){// printf("illegal insert: key=%d!\n", key);}else if(cur == NULL){btree_node *new = btree_node_create(T, 1);new->keys[0] = key;new->num = 1;T->root_node = new;}else{// 函數整體邏輯:從根節點逐步找到元素要插入的葉子節點,先分裂、再添加// 先查看根節點是否需要分裂if(cur->num == T->m-1){cur = btree_root_split(T);}// 從根節點開始尋找要插入的葉子節點while(cur->leaf == 0){// 找到下一個要比較的孩子節點int next_idx = 0;  // 要進入的孩子節點的索引int i = 0;for(i=0; i<cur->num; i++){if(key == cur->keys[i]){// printf("insert failed! already has key=%d!\n", key);return;}else if(key < cur->keys[i]){next_idx = i;break;}else if(i == cur->num-1){next_idx = cur->num;}}// 查看孩子是否需要分裂,不需要就進入if(cur->children[next_idx]->num == T->m-1){cur = btree_child_split(T, cur, next_idx);}else{cur = cur->children[next_idx];}}// 將新元素插入到葉子節點中int i = 0;int pos = 0;  // 要插入的位置for(i=0; i<cur->num; i++){if(key == cur->keys[i]){// printf("insert failed! already has key=%d!\n", key);return;}else if(key < cur->keys[i]){pos = i;break;}else if(i == cur->num-1){pos = cur->num;}}// 插入元素if(pos == cur->num){cur->keys[cur->num] = key;}else{for(i=cur->num; i>pos; i--){cur->keys[i] = cur->keys[i-1];}cur->keys[pos] = key;}cur->num++;}
}// 借位:將cur節點的idx_key元素下放到idx_dest孩子
btree_node *btree_borrow(btree_node *cur, int idx_key, int idx_dest){int idx_sour = (idx_key == idx_dest) ? idx_dest+1 : idx_key;btree_node *node_dest = cur->children[idx_dest];  // 目的節點btree_node *node_sour = cur->children[idx_sour];  // 源節點if(idx_key == idx_dest){// 自己下去作為目的節點的最后一個元素node_dest->keys[node_dest->num] = cur->keys[idx_key];node_dest->children[node_dest->num+1] = node_sour->children[0];node_dest->num++;// 把源節點的第一個元素請上來cur->keys[idx_key] = node_sour->keys[0];for(int i=0; i<node_sour->num-1; i++){node_sour->keys[i] = node_sour->keys[i+1];node_sour->children[i] = node_sour->children[i+1];}node_sour->children[node_sour->num-1] = node_sour->children[node_sour->num];node_sour->children[node_sour->num] = NULL;node_sour->keys[node_sour->num-1] = 0;node_sour->num--;}else{// 自己下去作為目的節點的第一個元素node_dest->children[node_dest->num+1] = node_dest->children[node_dest->num];for(int i=node_dest->num; i>0; i--){node_dest->keys[i] = node_dest->keys[i-1];node_dest->children[i] = node_dest->children[i-1];}node_dest->keys[0] = cur->keys[idx_key];node_dest->children[0] = node_sour->children[node_sour->num];node_dest->num++;// 把源節點的最后一個元素請上來cur->keys[idx_key] = node_sour->keys[node_sour->num-1];node_sour->keys[node_sour->num-1] = 0;node_sour->children[node_sour->num] = NULL;node_sour->num--;}return node_dest;
}// 合并:將cur節點的idx元素向下合并
btree_node *btree_merge(btree *T, btree_node *cur, int idx){btree_node *left = cur->children[idx];btree_node *right = cur->children[idx+1];// 自己下去左孩子,調整當前節點left->keys[left->num] = cur->keys[idx];left->num++;for(int i=idx; i<cur->num-1; i++){cur->keys[i] = cur->keys[i+1];cur->children[i+1] = cur->children[i+2];}cur->keys[cur->num-1] = 0;cur->children[cur->num] = NULL;cur->num--;// 右孩子復制到左孩子for(int i=0; i<right->num; i++){left->keys[left->num] = right->keys[i];left->children[left->num] = right->children[i];left->num++;}left->children[left->num] = right->children[right->num];// 刪除右孩子btree_node_destroy(right);// 更新根節點if(T->root_node == cur){btree_node_destroy(cur);T->root_node = left;}return left;
}// 找出當前節點索引為idx_key的元素的前驅節點
btree_node* btree_precursor_node(btree *T, btree_node *cur, int idx_key){if(cur->leaf == 0){cur = cur->children[idx_key];while(cur->leaf == 0){cur = cur->children[cur->num];}}return cur;
}// 找出當前節點索引為idx_key的元素的后繼節點
btree_node* btree_successor_node(btree *T, btree_node *cur, int idx_key){if(cur->leaf == 0){cur = cur->children[idx_key+1];while(cur->leaf == 0){cur = cur->children[0];}}return cur;
}// btree刪除元素:先合并/借位,再刪除,必然在葉子節點刪除
void btree_delete_key(btree *T, int key){if(T->root_node!=NULL && key>0){btree_node *cur = T->root_node;// 在去往葉子節點的過程中不斷調整(合并/借位)while(cur->leaf == 0){// 看看要去哪個孩子int idx_next = 0; //下一個要去的孩子節點索引int idx_bro = 0;int idx_key = 0;if(key < cur->keys[0]){idx_next = 0;idx_bro = 1;}else if(key > cur->keys[cur->num-1]){idx_next = cur->num;idx_bro = cur->num-1;}else{for(int i=0; i<cur->num; i++){if(key == cur->keys[i]){// 哪邊少去哪邊if(cur->children[i]->num <= cur->children[i+1]->num){idx_next = i;idx_bro = i+1;}else{idx_next = i+1;idx_bro = i;}break;}else if((i<cur->num-1) && (key > cur->keys[i]) && (key < cur->keys[i+1])){idx_next = i + 1;// 誰多誰是兄弟if(cur->children[i]->num > cur->children[i+2]->num){idx_bro = i;}else{idx_bro = i+2;}break;}}}idx_key = (idx_next < idx_bro) ? idx_next : idx_bro;// 依據孩子節點的元素數量進行調整if(cur->children[idx_next]->num <= ((T->m>>1)-1)){// 借位:下一孩子的元素少,下一孩子的兄弟節點的元素多if(cur->children[idx_bro]->num >= (T->m>>1)){cur = btree_borrow(cur, idx_key, idx_next);}// 合并:兩個孩子都不多else{cur = btree_merge(T, cur, idx_key);}}else if(cur->keys[idx_key] == key){// 若當前元素就是要刪除的節點,那一定要送下去// 但是不能借位,而是將前驅元素搬上來btree_node* pre;int tmp;if(idx_key == idx_next){// 找到前驅節點pre = btree_precursor_node(T, cur, idx_key);// 交換 當前元素 和 前驅節點的最后一個元素tmp = pre->keys[pre->num-1];pre->keys[pre->num-1] = cur->keys[idx_key];cur->keys[idx_key] = tmp;}else{// 找到后繼節點pre = btree_successor_node(T, cur, idx_key);// 交換 當前元素 和 后繼節點的第一個元素tmp = pre->keys[0];pre->keys[0] = cur->keys[idx_key];cur->keys[idx_key] = tmp;}cur = cur->children[idx_next];// cur = btree_borrow(cur, idx_key, idx_next);}else{cur = cur->children[idx_next];}}// 葉子節點刪除元素for(int i=0; i<cur->num; i++){if(cur->keys[i] == key){if(cur->num == 1){// 若B樹只剩最后一個元素btree_node_destroy(cur);T->root_node = NULL;}else{if(i != cur->num-1){for(int j=i; j<(cur->num-1); j++){cur->keys[j] = cur->keys[j+1];}}cur->keys[cur->num-1] = 0;cur->num--;}}// else if(i == cur->num-1){//     printf("there is no key=%d\n", key);// }}}
}// 打印當前節點信息
void btree_node_print(btree_node *cur){if(cur == NULL){printf("NULL\n");}else{printf("leaf:%d, num:%d, key:|", cur->leaf, cur->num);for(int i=0; i<cur->num; i++){printf("%d|", cur->keys[i]);}printf("\n");}
}// 先序遍歷給定節點為根節點的子樹(遞歸)
void btree_traversal_node(btree *T, btree_node *cur){// 打印當前節點信息btree_node_print(cur);// 依次打印所有子節點信息if(cur->leaf == 0){int i = 0;for(i=0; i<cur->num+1; i++){btree_traversal_node(T, cur->children[i]);}}
}// btree遍歷
void btree_traversal(btree *T){if(T->root_node != NULL){btree_traversal_node(T, T->root_node);}else{// printf("btree_traversal(): There is no key in B-tree!\n");}
}// 查找key
btree_node* btree_search_key(btree *T, int key){if(key > 0){btree_node *cur = T->root_node;// 先尋找是否為非葉子節點while(cur->leaf == 0){if(key < cur->keys[0]){cur = cur->children[0];}else if(key > cur->keys[cur->num-1]){cur = cur->children[cur->num];}else{for(int i=0; i<cur->num; i++){if(cur->keys[i] == key){return cur;}else if((i<cur->num-1) && (key > cur->keys[i]) && (key < cur->keys[i+1])){cur = cur->children[i+1];}}}}// 在尋找是否為葉子節點if(cur->leaf == 1){for(int i=0; i<cur->num; i++){if(cur->keys[i] == key){return cur;}}}}// 都沒找到返回NULLreturn NULL;
}// 獲取B樹的高度
int btree_depth(btree *T){int depth = 0;btree_node *cur = T->root_node;while(cur != NULL){depth++;cur = cur->children[0];}return depth;
}// 檢查給定節點的有效性
// 鍵值:根節點至少有一個key,其余節點至少有ceil(m/2)-1個key
// 分支:所有節點數目子樹為當前節點元素數量+1
bool btree_node_check_effective(btree *T, btree_node *cur){bool eff_flag = true;// 統計鍵值和子節點數量int num_kvs = 0, num_child = 0;int i = 0;while(cur->keys[i] != 0){// 判斷元素是否遞增if(i>=1 && (cur->keys[i] <= cur->keys[i-1])){printf("ERROR! the following node DOT sorted!\n");btree_node_print(cur);eff_flag = false;break;}// 統計數量num_kvs++;i++;}i = 0;while(cur->children[i] != NULL){// 子節點和當前節點的有序性if(i<num_kvs){if(cur->keys[i] <= cur->children[i]->keys[cur->children[i]->num]){printf("ERROR! the follwing node's child[%d] has bigger key=%d than %d\n", i, cur->children[i]->keys[cur->children[i]->num], cur->keys[i]);printf("follwing node--");btree_node_print(cur);printf("  error child--");btree_node_print(cur->children[i]);eff_flag = false;}else if(cur->keys[i] >= cur->children[i+1]->keys[0]){printf("ERROR! the follwing node's child[%d] has smaller key=%d than %d\n", i+1, cur->children[i+1]->keys[0], cur->keys[i]);printf("follwing node--");btree_node_print(cur);printf("  error child--");btree_node_print(cur->children[i+1]);eff_flag = false;}}// 統計數量num_child++;i++;}// 判斷元素數量是否合理if(cur->num >= T->m){printf("ERROR! the follwing node has too much keys:%d(at most %d)\n", cur->num, T->m-1);btree_node_print(cur);eff_flag = false;}if((cur != T->root_node) && (num_kvs<((T->m>>1)-1))){printf("ERROR! the follwing node has too few keys:%d(at least %d)\n", num_kvs, (T->m>>1)-1);btree_node_print(cur);eff_flag = false;}if(num_kvs != cur->num){printf("ERROR! the follwing node has %d keys but num=%d\n", num_kvs, cur->num);btree_node_print(cur);eff_flag = false;}if((cur->leaf == 0) && (num_child != cur->num+1)){printf("ERROR! the follwing node has %d keys but %d child(except keys+1=child)\n", num_kvs, num_child);btree_node_print(cur);eff_flag = false;}return eff_flag;
}// 遍歷所有路徑檢查m階B樹的有效性
// 平衡性:所有葉節點都在同一層(所有路徑高度相等)
// 有序性:所有元素升序排序
// 鍵值:根節點至少有一個key,其余節點至少有ceil(m/2)-1個key
// 分支:所有節點數目子樹為當前節點元素數量+1
bool btree_check_effective(btree *T){bool effe_flag = true;int depth = btree_depth(T);if(depth == 0){// printf("btree_check_effective(): There is no key in B-tree!\n");}else if(depth == 1){// 只有一個根節點effe_flag = btree_node_check_effective(T, T->root_node);}else{// 最大的可能路徑數量int max_path = 1;int depth_ = depth-1;while(depth_ != 0){max_path *= T->m;depth_--;}// 遍歷所有路徑(每個路徑對應一個葉子節點)btree_node *cur = T->root_node;int i_path = 0;for(i_path=0; i_path<max_path; i_path++){int dir = i_path;  // 本次路徑的方向控制int i_height = 0;  // 本次路徑的高度int i_effe = 1; // 指示是否存在本路徑cur = T->root_node;while(cur != NULL){// 當前節點的有效性effe_flag = btree_node_check_effective(T, cur);if(!effe_flag) break;// 更新高度i_height++;// 更新下一節點if(cur->children[dir%T->m]==NULL && !cur->leaf){i_effe = 0;break;}cur = cur->children[dir%T->m];dir /= T->m;}// if(btree_node_check_effective(T, cur))// 判斷本路徑節點數(高度)if(i_height != depth && i_effe){printf("ERROR! not all leaves in the same layer! the leftest path's height=%d, while the %dst path's height=%d.\n",depth, i_path, i_height);effe_flag = false;}if(!effe_flag) break;}}return effe_flag;
}/*-----------------------------下面為測試代碼-------------------------------*/
#if BTREE_DEBUG
#include<time.h>  // 使用隨機數
#include<sys/time.h>  // 計算qps中獲取時間
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define ENABLE_QPS  1  // 是否開啟qps性能測試
#define continue_test_len  10000000  // 連續測試的長度
// 冒泡排序
void bubble_sort(int arr[], int len) {int i, j, temp;for (i = 0; i < len - 1; i++)for (j = 0; j < len - 1 - i; j++)if (arr[j] > arr[j + 1]) {temp = arr[j];arr[j] = arr[j + 1];arr[j + 1] = temp;}
}
// 打印當前數組
void print_int_array(int* KeyArray, int len_array){printf("測試數組為KeyArray[%d] = {", len_array);for(int i=0; i<len_array; i++){if(i == len_array-1){printf("%d", KeyArray[i]);}else{printf("%d, ", KeyArray[i]);}}printf("}\n");
}
int main(){// 定義/* --------------------定義數組-------------------- */// 預定義的數組// int KeyArray[20] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20};  // 正著插// int KeyArray[20] = {20,19,18,17,16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1};  // 倒著插// int KeyArray[20] = {1,2,3,4,5,10,9,8,7,6,11,12,13,14,15,20,19,18,17,16};  // 亂序插// int KeyArray[31] = {11,12,13,14,15,16,17,18,19,20,1,2,3,4,5,6,7,8,9,10,21,22,23,24,25,26,27,28,29,30,31};  // 亂序插// int KeyArray[18] = {18,8,13,9,13,0,7,13,14,7,1,7,19,7,9,18,17,18};  // 亂序插// // 順序增長的數組// int len_array = 26;// int KeyArray[len_array];// int i_array = 0;// for(i_array=0; i_array<len_array; i_array++){//     KeyArray[i_array] = i_array + 1;// }// // 隨機生成固定大小的隨機數組// int len_array = 18;// int KeyArray[len_array];// srand(time(NULL));// int i_array = 0;// for(i_array=0; i_array<len_array; i_array++){//     // KeyArray[i_array] = rand() % 9999999999;//     KeyArray[i_array] = rand() % 20;// }// printf("RAND_MAX: %d\n", RAND_MAX);/* ------------------以下測試代碼------------------ */// printf("-------------------B樹插入測試------------------\n");// // 先給輸入的數組排個序// int len_max = sizeof(KeyArray)/sizeof(int);// printf("測試數組長度: %d\n", len_max);// // 將原先的數組深拷貝并升序排序// int *KeyArray_sort = (int*)malloc(sizeof(KeyArray));// int i = 0;// for(i = 0; i < len_max; i++){//     KeyArray_sort[i] = KeyArray[i];// }// bubble_sort(KeyArray_sort, len_max);int max_test = 100;         // 測試的總次數int len_array = 1000;      // 單次測試的數組長度bool detail_flag = false;  // 是否打印詳細信息bool pass_flag = true;printf("---------------------常規測試---------------------\n");for(int i_test=0; i_test<max_test; i_test++){// 隨機生成固定大小的隨機數組int KeyArray[len_array];srand(time(NULL));for(int i_array=0; i_array<len_array; i_array++){// KeyArray[i_array] = rand() % 9999999999;KeyArray[i_array] = rand() % len_array;}// int KeyArray[10000] = {};// 申請紅黑樹內存btree *T = btree_init(6);btree *T_old = btree_init(6);if(detail_flag){printf("--------------------開始測試--------------------\n");print_int_array(KeyArray, len_array);}if(detail_flag){printf("-------------------B樹插入測試------------------\n");}/*-------------------B樹插入測試------------------*/// 依次插入數據,并檢查有效性for(int i=0; i<len_array; i++){if(i>0){btree_insert_key(T_old, KeyArray[i-1]);}btree_insert_key(T, KeyArray[i]);// 方便打印調試// if(i==65){//     printf("Before insert the %2d's key=%d:\n", i+1, KeyArray[i]);//     btree_traversal(T_old);//     printf("After insert the %2d's key=%d:\n", i+1, KeyArray[i]);//     btree_traversal(T);// }if(btree_check_effective(T)==false){printf("after insert KeyArray[%d]=%d error!\n", i, KeyArray[i]);pass_flag = false;break;}}if(pass_flag){if(detail_flag) printf("PASS---->插入測試%d/%d\n", i_test+1, max_test);btree_insert_key(T_old, KeyArray[len_array]);}else{if(detail_flag) printf("FAIL---->插入測試%d/%d\n", i_test+1, max_test);// printf("Before insert:\n");// btree_traversal(T_old);// printf("After insert:\n");// btree_traversal(T);btree_destroy(T_old);btree_destroy(T);break;}if(detail_flag){btree_traversal(T);printf("\n");}if(detail_flag){printf("-------------------B樹查找測試------------------\n");}/*-------------------B樹查找測試------------------*/btree_node* sear = NULL;for(int i=0; i<len_array; i++){if(KeyArray[i] > 0){sear = btree_search_key(T, KeyArray[i]);pass_flag = false;if(sear != NULL){for(int j=0; j<sear->num; j++){if(sear->keys[j] == KeyArray[i]){pass_flag = true;break;}}}if(detail_flag){printf("search KeyArray[%d]=%d  ---->  ", i, KeyArray[i]);btree_node_print(sear);}}if(pass_flag == false){print_int_array(KeyArray, len_array);  // 打印當前數組printf("following node DOT has KeyArray[%d]=%d!\n", i, KeyArray[i]);btree_node_print(sear);pass_flag = false;break;}}if(pass_flag){if(detail_flag) printf("PASS---->查找測試%d/%d\n", i_test+1, max_test);}else{printf("FAIL---->查找測試%d/%d\n", i_test+1, max_test);break;}if(detail_flag){printf("-------------------B樹刪除測試------------------\n");}/*-------------------B樹刪除測試------------------*/for(int i=0; i<len_array; i++){// if(i==496){//     // 加一句打印方便調試暫停//     printf("Now delete KeyArray[%d]=%d:\n", i, KeyArray[i]);// }if(i>0){btree_delete_key(T_old, KeyArray[i-1]);}btree_delete_key(T, KeyArray[i]);if(detail_flag){printf("delete KeyArray[%d]=%d:\n", i, KeyArray[i]);btree_traversal(T);} if(btree_check_effective(T) == false){print_int_array(KeyArray, len_array);  // 打印當前數組printf("after delete KeyArray[%d]=%d error!\n", i, KeyArray[i]);pass_flag = false;break;}}if(pass_flag){if(detail_flag) printf("PASS---->刪除測試%d/%d\n", i_test+1, max_test);}else{printf("FAIL---->刪除測試%d/%d\n", i_test+1, max_test);// printf("Before delete:\n");// btree_traversal(T_old);// printf("After delete:\n");// btree_traversal(T);btree_destroy(T_old);btree_destroy(T);break;}if(detail_flag){printf("--------------------------------------------------\n");}btree_destroy(T_old);btree_destroy(T);// 整點進度條看看if(pass_flag){// printf("PASS----> WHOLE TEST %d/%d!\r", i_test+1, max_test);int bar_process;         // 編譯器初始化為0bool already_print_txt;  // 編譯器初始化為falsebool already_print_bar;  // 編譯器初始化為falseconst int len_bar = 20;  // 完整進度條的長度// 打印進度條前面的說明if(!already_print_txt){printf("PASS TEST PROCESS: ");fflush(stdout);}already_print_txt = true;// 打印進度條if(len_bar*(i_test+1)/max_test > bar_process){// 光標往前回退if(already_print_bar){printf("\033[4D");  // ANSI轉義序列}// 畫出進度條for(int i=0; i<(len_bar*(i_test+1)/max_test - bar_process); i++){printf("█");fflush(stdout);}// 顯示進度范圍printf(" %d%%", 100*(i_test+1)/max_test);fflush(stdout);already_print_bar = true;bar_process = len_bar*(i_test+1)/max_test;if(i_test+1 == max_test) printf("\n");}}}// 只是為了最后一行判斷用if(pass_flag){// printf("\r\033[K");  // 清除本行printf("PASS----> ALL %d TEST!\n", max_test);}printf("--------------------------------------------------\n");printf("---------------------性能測試---------------------\n");btree* bT = btree_init(16);  // 初始化16階B樹// 定義時間結構體struct timeval tv_begin;struct timeval tv_end;// 插入性能測試gettimeofday(&tv_begin, NULL);for(int i=0; i<continue_test_len; i++){btree_insert_key(bT, i+1);}gettimeofday(&tv_end, NULL);int time_ms = TIME_SUB_MS(tv_end, tv_begin);float qps = (float)continue_test_len / (float)time_ms * 1000;printf("total INSERTs:%d  time_used:%d(ms)  qps:%.2f(INSERTs/sec)\n", continue_test_len, time_ms, qps);// 查找性能測試gettimeofday(&tv_begin, NULL);for(int i=0; i<continue_test_len; i++){btree_node* node = btree_search_key(bT, i+1);int idx = 0;for(idx=0; idx<node->num; idx++){if(node->keys[idx] == i+1){break;}}if(idx == node->num){printf("continue_search error!\n");return 0;}}gettimeofday(&tv_end, NULL);time_ms = TIME_SUB_MS(tv_end, tv_begin);qps = (float)continue_test_len / (float)time_ms * 1000;printf("total SEARCHs:%d  time_used:%d(ms)  qps:%.2f(SEARCHs/sec)\n", continue_test_len, time_ms, qps);// // 刪除性能測試// gettimeofday(&tv_begin, NULL);// for(int i=0; i<continue_test_len; i++){//     btree_delete_key(bT, i+1);// }// gettimeofday(&tv_end, NULL);// time_ms = TIME_SUB_MS(tv_end, tv_begin);// qps = (float)continue_test_len / (float)time_ms * 1000;// printf("total DELETEs:%d  time_used:%d(ms)  qps:%.2f(DELETEs/sec)\n", continue_test_len, time_ms, qps);// 銷毀B樹btree_destroy(bT);printf("--------------------------------------------------\n");return 0;
}#endif

測試代碼輸出結果----記得測一下連續讀、查、刪的速度

lyl@ubuntu:~/Desktop/kv-store/code_init$ gcc -o main btree_int.c 
lyl@ubuntu:~/Desktop/kv-store/code_init$ ./main
---------------------常規測試---------------------
PASS TEST PROCESS: ███████████████████ 100%
PASS----> ALL 100 TEST!
--------------------------------------------------
---------------------性能測試---------------------
total INSERTs:10000000  time_used:2379(ms)  qps:4203447.00(INSERTs/sec)
total SEARCHs:10000000  time_used:1007(ms)  qps:9930486.00(SEARCHs/sec)
total DELETEs:10000000  time_used:18(ms)  qps:555555584.00(DELETEs/sec)
--------------------------------------------------

未解決bug:測試代碼的最后如果多加上一個“刪除性能測試”,就不顯示進度條了?很奇怪

參考內容:

  • B站:【CS61B漢化】七海講數據結構-平衡樹&B樹–B樹的插入–超可愛的視頻
  • 知乎:從B樹中刪除一個關鍵字–主要參考了孩子元素多的時候怎么刪除
  • B站:可視化數據結構-B+樹–插入數據的動畫很清晰

2.5 hash的實現

圖7 鏈式哈希結構示意圖——固定長度為6

??終于度過了本項目所有最難的部分,下面的內容都比較簡單。鏈式哈希的增刪查操作簡潔明了。鏈式哈希首先會聲明一個固定長度的哈希表(如1024),若需要插入新元素時,首先計算哈希值作為索引,若有沖突則直接在當前位置使用“頭插法”即可。注意以下幾點:

  1. 哈希值計算:對應int型,key % table_size就可以直接當作哈希值。對于char*,則可以sum(key) % table_size當作哈希值。當前也有專門的哈希函數,但是由于需要頻繁計算哈希值,在簡單情況下,就采用上述處理即可。
  2. 單個索引上的鏈表沒有大小關系。所以查找/刪除時需要遍歷這個索引對應的整條鏈表

就不單獨寫int型的代碼并測試了,可以直接參考項目源碼中的“hash.h”、“hash.c”。

2.6 dhash的實現

圖8 動態哈希的插入和刪除過程

??顯然上述hash有個很大問題,就是“哈希表的大小”是固定的。如果聲明哈希表大小為1024,卻要插入10w個元素,那每個所有都會對應一個很長的鏈表,最壞的情況下和直接遍歷一遍沒什么區別!這顯然失去了哈希的意義,于是在上面的基礎上,我們使用“空間換時間”,自動增加/縮減哈希表的大小,也就是“動態哈希表”dhash:

  1. 插入元素時,首先判斷是否需要擴容。若當前元素超過哈希表的1/2(自定義),則將哈希表翻倍(自定義),并將原來的元素重新映射到新的哈希表。若遇到沖突,則將新元素順延插入到下一個空節點
  2. 刪除元素時,首先判斷是否需要縮容。若當前元素小于哈希表的1/4(自定義),則將哈希表縮小一半(自定義),并將原來的元素重新映射到新的哈希表。若當前節點不是待刪除元素,則需要從當前索引開始遍歷完所有節點,才能說哈希表不存在此元素

同樣,不單獨寫int型的代碼測試了,可以直接參考項目源碼中的“dhash.h”、“dhash.c”。

2.7 skiplist的實現

圖9 理想跳表的結構示意圖

??跳表本質上是一個有序鏈表。紅黑樹每次比較都能排除一半的節點,這啟發我們,要是每次都能找到鏈表最中間的節點,不就可以實現 O ( log ? N ) O(\log N) O(logN)的查找時間復雜度了嘛。于是如上圖所示,我們不妨規定跳表的每個節點都有一組指針,跳表還有一個額外的空節點作為“跳表頭”,那么每次都從頂層依次底層進行“跳”,就可以實現“每次比較都能排除剩下一半的節點”。但是還有個大問題,那就是上述理想跳表需要插入/刪除一個元素時,元素的調整會非常麻煩,甚至還需要遍歷整個鏈表來調整所有節點的指向!

所以在實際應用中,不會直接使用上述理想跳表的結構。而是在每次插入一個新元素時,按照一定概率計算其高度。統計學證明,當存放元素足夠多的時候,該實際跳表性能無限趨近于理想跳表。

圖10 實際跳表的結構示意圖

同樣,代碼直接見項目源碼中的“skiplist.h”、“skiplist.c”。

2.8 kv存儲協議的實現

??如“1.2節-項目預期及基本架構”給出的“服務端程序架構”。現在我們實現了網絡收發功能(網絡層)、所有存儲引擎的增刪查改操作(引擎層),還差最后一個“kv存儲協議”(協議層)就可以實現完整的服務端程序。“kv存儲協議”的主要功能有:

  1. 初始化/銷毀所有的存儲引擎。這個直接調用各引擎的初始化/銷毀函數即可。
  2. 拆解網絡層接收的數據,若為有效指令則傳遞給相應的存儲引擎函數處理,并根據存儲引擎的處理結果返回相應的信息給網絡層。

值得注意的是,“引擎層”的接口函數應該統一封裝命名,并在各存儲引擎中實現,“引擎層”的頭文件中也只有這些接口函數暴露給“協議層”。這樣保證了“協議層”和“引擎層”的隔離性,即使后續“引擎層”代碼需要進行修改,也不會干擾到接口函數的調用、無需修改協議層。整個“服務端”的“網絡層”、“協議層”、“引擎層”的函數調用關系如下:

圖11 “服務端”函數調用關系

同樣,代碼直接見項目源碼中的“kvstore.h”、“kvstore.c”。

3. 性能測試

圖12 使用網絡連接助手進行簡單的功能驗證

??上述我們將“服務端”的代碼實現完畢,并且可以使用“網絡連接助手”進行正常的收發數據。如上圖所示,依次發送5條指令后都得到預期的回復。但是我們要想測試客戶端的極限性能,顯然需要寫一個“客戶端”測試程序。該測試程序目標如下:

  1. 測試單個鍵值對(name:humu)能否正常實現所有功能。
  2. 測試多個鍵值對能否正常實現所有功能。這一步主要是為了驗證各引擎能正常擴容/縮容。
  3. 測試連續進行10w次插入、查找、刪除的總耗時,計算出服務器的相應速度qps。

注:測試代碼直接見項目源碼中的“tb_kvstore.c”。

如下圖所示,開啟兩個Ubuntu虛擬機,分別運行“服務端”、“客戶端”程序,得到如下的測試數據:

圖13 性能測試結果
表2 連續5次的性能測試結果
次數操作qps(trans/s)
arrayrbtreebtreehashdhashskiplist
1插入1461.33 1918.24 1981.65 1727.24 1924.93 1778.44
查找1604.18 1906.07 1895.09 1741.52 1887.68 1755.71
刪除1722.71 1959.13 1909.60 1713.88 1964.98 1887.65
2插入1570.13 1972.93 1949.62 1710.54 1926.63 1780.18
查找1707.30 1970.40 1883.49 1709.46 2021.55 1747.89
刪除1739.92 1918.80 1922.82 1730.28 1917.32 1912.74
3插入1103.35 1898.25 1923.71 1737.59 1948.86 1794.24
查找1316.59 1867.80 1933.38 1733.31 1953.74 1768.44
刪除1774.78 1873.89 1978.98 1771.73 1966.76 1933.19
4插入1134.22 1887.79 1888.72 1734.21 1909.53 1802.00
查找1394.54 1908.80 1910.40 1726.43 1932.37 1764.76
刪除1778.54 1897.97 1937.98 1719.25 1935.40 1928.57
5插入1029.79 1981.02 1912.16 1718.33 1916.52 1777.62
查找1236.52 1917.58 1947.00 1732.17 1934.39 1754.66
刪除1703.87 1955.19 1899.59 1724.97 1915.42 1905.71

結果分析

  1. array引擎的指標都是最差的,這是因為每次插入/刪除都需要遍歷所有元素。
  2. hash引擎(鏈式哈希)的性能是倒數第二差的,這是因為雖然哈希函數可以快速定位到索引,但當鍵值對遠大于哈希表大小時,沖突元素會形成一個鏈表,增/刪/查操作也需要遍歷整條鏈表。
  3. btree的插入性能好于rbtree,只是因為btree的單個節點可以容納多個元素,節省了很多增加新結點、調整結構的時間;但是btree的查找/刪除性能不如rbtree,也同樣是因為單個節點還需要進行遍歷。
  4. 綜合來看,最優秀的存儲引擎是rbtree。

注:上述測試結果可以通過“相對值”比較各數據結構間的差異;“絕對值”則受限于虛擬機內存、運行頻率等物理特性,并且軟件上使用“協程”等也可以大幅提升“絕對值”。

4. 項目總結及改進思路

??C/C++適合做服務器,但不適合做業務。因為可能會因為一行代碼有問題,導致整個進程退出。雖然也能做,但維護成本高,并且對工程師要求高。比如“騰訊課堂”中課程、圖片、價格等參數很適合用C語言做“kv存儲”,但是顯示網頁等業務功能使用Jc語言更加合適。所以VUE框架(Java)等適合做前端業務;C/C++適合做基礎架構、高性能組件、中間件。比如在量化交易中,底層的高頻組件、低延遲組件適合用C/C++,上層的交易業務、交易策略沒必要C/C++。

下面是對本項目的一些 改進思路:

  • 持久化和日志:將數據都存儲到(磁盤)文件中,并將接收到的指令記錄下來成為日志。持久化的思路有很多種,比如直接分配一個大的“內存池”(見下一條)存放所有引擎的存儲數據,然后直接對這個大的內存池進行持久化。
  • 內存池:現在存儲的數據都是零散的,比如rbtree每次插入新元素都會申請一個節點內存,時間長了就會出現很多內存碎片,不利于內存管理。理想狀態是和數組結構體一樣,直接預先聲明一個內存池,需要新節點的時候就取出一個節點內存。這樣,每次申請和釋放的內存都是一整塊,不會出現大量的內存碎片。
  • 分布式鎖:kv存儲內部是單線程的,對于變量的改變無需額外加鎖。但是外部的網絡收發指令,需要引入分布式鎖,防止多臺客戶端同時對同一個鍵值對進行修改。
  • 主從模式:所有數據都存儲在一臺服務器上,會有丟失數據的風險,所以需要使用“主從模式”來對數據進行備份。關鍵點在于合適的備份策略。
  • 分布式存儲:所有數據都存儲在一臺服務器上,會導致內存壓力過大,多臺服務器結合起來做成“分布式存儲”,所有服務器分攤數據。

編程感想:

  1. 字符串拷貝:C語言中,使用strncpysnprintf拷貝字符串時,注意目的字符串不能只是聲明為char*,而是需要malloc/calloc分配內存才可以。另外也不要忘了釋放內存free
  2. 天坑:解析指令時層層傳遞的是epoll的讀緩沖rbuffer,然后使用strtok/strtok_r進行分割指令并存儲在char* tokens[]中,注意這個char* tokens[]的元素指向的就是讀緩沖本身!!!而snprintf是逐字符進行拷貝的,也就是說,此時使用snprintftokens的內容再寫回讀緩沖就會導致讀緩沖錯亂。如果不額外分配內存很難解決該問題。所以建議不要使用snprintf拷貝自己的格式化字符串。
  3. 良好的內存管理習慣:free()之前先判斷是否為NULLfree()之后一定要指向NULL
  4. 層層傳遞初始化:

【可行方法1】如果最頂層需要創建實例對象(不如全局變量),那就需要傳地址給最底層,且最底層無需再重新為這個對象malloc空間(因為最頂層已經創建對象了),只需要malloc好這個實例對象的所有指針即可,或者先定義成NULL后期插入時再分配。
【可行方法2】若最頂層無需創建全局的實例對象,那么也可以不傳參數給最底層,最底層直接創建一個對象指針,并malloc/NULL這個對象指針的所有參數,最后直接返回這個對象指針就行。
【不可行方法】頂層創建了全局的實例對象,然后傳地址給最底層,最底層重新malloc一個新的對象指針,初始化這個對象指針的所有參數,最后讓傳遞下來的地址指向這個指針。最后在頂層就會發現所有參數都沒初始化,都是空!
【關鍵點】:對誰進行malloc非常重要,一定要對頂層傳下來的結構體指針的元素直接malloc,而不是malloc一個新的結構體,在賦值給這個結構體指針。

  1. 關于strcmp():在使用strcmp()時一定要先判斷不為空,才能使用。這是因為strcmp()的底層使用while(*des++==*src**),所以若比較的雙方有一方為空,就會直接報錯。
  2. 注意項目的include關系,是在編譯指令中指定的。當然也可以將“kv_store.c”中的case語句封裝到各自的數據結構中,然后以動態庫的方式進行編譯。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/719143.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/719143.shtml
英文地址,請注明出處:http://en.pswp.cn/news/719143.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

XV4001KC數字輸出 車載用(piezoman)

EPSON的XV4001KC角速度傳感器是為滿足汽車行業對高精度和高可靠性需求而設計的。它不僅提供了高級的運動監測特性&#xff0c;高精度的角速度測量和溫度監測功能&#xff0c;而且其緊湊的設計6.04.83.3mm尺寸對于空間受限的車載環境來說&#xff0c;是一大優勢&#xff0c;使得…

二十篇esp345

from machine import I2C,Pin from ssd1306 import SSD1306_I2C i2c I2C(sdaPin(“Y8”), sclPin(“Y6”)) oled SSD1306_I2C(128, 64, i2c, addr0x3c) oled.text(“Hello World!”, 0, 0) oled.text(“MicroPython”, 0, 20) oled.text(“By 01Studio”, 0, 50) oled.show()…

vue 中在子頁面中使用watch監聽父頁面數據而導致接口多次調用

vue 中在子頁面中使用watch監聽父頁面數據而導致接口多次調用 解決方式 debounce function debounce(func, delay) {let timerId;return function(...args) {clearTimeout(timerId);timerId setTimeout(() > {func.apply(this, args);}, delay);}; }watch中 watch:{監聽值…

AIGC 知識:機器學習中的“微調“和“遷移學習“有什么區別?

以下是關于**微調 (fine-tuning)和遷移學習 (Transfer learning)**的區別&#xff0c;涉及到機器學習和深度學習的上下文&#xff1a; 遷移學習&#xff1a; 概述&#xff1a;遷移學習涉及使用預訓練模型作為新任務或領域的起點。目標&#xff1a;利用預訓練模型在大型數據集上…

政務瀏覽器——打通信創閉環最后一公里

當前&#xff0c;信創建設工作主要集中在芯片、操作系統、數據庫以及pc整機&#xff0c;這些領域基本可用&#xff0c;或者達到了市場主流水平。但是&#xff0c;政務辦事場景下的信創落地仍然困難重重&#xff0c;很多地方不得不裝雙系統或買兩臺設備來來平衡日常業務和信創考…

Qt:基于QQuickFramebufferObject顯示QImage到QML中

GItHub地址 簡介 本倉庫實現了一個在QML框架中&#xff0c;顯示QImage數據的QML控件&#xff0c;取名為JQImageItem 本控件針對的場合是需要顯示并且頻繁修改QImage的場景&#xff0c;例如視頻顯示。 提供了2個實現版本&#xff0c;一個是基于QQuickFramebufferObject&…

STM32CubeIDE基礎學習-軟件安裝,環境搭建

STM32CubeIDE基礎學習-軟件介紹及環境搭建步驟 文章目錄 STM32CubeIDE基礎學習-軟件介紹及環境搭建步驟前言第1章 STM32CubeIDE 介紹1.1 軟件描述1.2 軟件支持的功能及特點 第2章 STM32CubeIDE 軟件安裝2.1 STM32CubeIDE 軟件獲取方法2.2 STM32CubeIDE 軟件安裝步驟2.2.1 錯誤安…

C++模板完整版

顧得泉&#xff1a;個人主頁 個人專欄&#xff1a;《Linux操作系統》 《C從入門到精通》 《LeedCode刷題》 鍵盤敲爛&#xff0c;年薪百萬&#xff01; 一、泛型編程 如何實現一個通用的交換函數呢&#xff1f; void Swap(int& left, int& right) {int temp left…

抖店入駐費用是多少?新手入駐都有哪些要求?2024費用明細!

我是電商珠珠 我做電商做了將近五年&#xff0c;做抖店做了三年多&#xff0c;期間還帶著學員一起做店。 今天&#xff0c;就來給大家詳細的講一下在抖音開店&#xff0c;需要多少費用&#xff0c;最低需要投入多少。 1、營業執照200元左右 就拿個體店舉例&#xff0c;在入…

hook函數——useReducer

目錄 1.useReducer定義2.useReducer用法3.useState和useReducer區別 1.useReducer定義 const [state, dispatch] useReducer(reducer, initialArg, init?) reducer&#xff1a;用于更新 state 的純函數。參數為 state 和 action&#xff0c;返回值是更新后的 state。state …

這波操作看麻了!十億行數據,從71s到1.7s的優化之路。

節期間關注到了一個關于 Java 方面的比賽&#xff0c;很有意思。由于是開源的&#xff0c;我把項目拉下來試圖學&#xff08;白&#xff09;習&#xff08;嫖&#xff09;別人的做題思路&#xff0c;在這期間一度讓我產生了一個自我懷疑&#xff1a; 他們寫的 Java 和我會的 Ja…

解鎖軟件管理新篇章,Allegro許可證使用規定全解

在數字化經濟的時代&#xff0c;軟件已經成為企業運營的關鍵要素。然而&#xff0c;軟件的使用往往伴隨著一系列的合規性問題&#xff0c;導致企業面臨潛在的風險和成本。Allegro許可證作為業界領先的軟件解決方案提供商&#xff0c;為企業提供全面的許可證使用規定&#xff0c…

每日一題——LeetCode1576.替換所有的問號

方法一 3個字母原則 把&#xff1f;替換為和他左右都不相等的字符&#xff0c;那么找3個字符abc&#xff0c;&#xff1f;總能替換為abc中的一個字符&#xff0c;遍歷字符串找到所有&#xff1f;&#xff0c;再遍歷abc把&#xff1f;替換為abc中的一個字符 var modifyString …

解析 openGauss 的 AutoVacuum 機制及優化策略

前言 在 openGauss 數據庫中&#xff0c;AutoVacuum 機制是一個關鍵的自動化功能&#xff0c;用于管理表的空間和性能。AutoVacuum 通過定期清理過時數據和更新統計信息&#xff0c;幫助數據庫管理員維護數據庫的性能和穩定性。 為什么需要 AutoVacuum&#xff1f; 了解AutoV…

JAVA內存模型與JVM內存結構

注意區分Java內存模型&#xff08;Java Memory Model&#xff0c;簡稱JMM&#xff09;與Jvm內存結構&#xff0c;前者與多線程相關&#xff0c;后者與JVM內部存儲相關。本文會對兩者進行簡單介紹。 一、JAVA內存模型(JMM) 1. 概念 說來話長&#xff0c;由于在不同硬件廠商和…

No matching version found for @babel/traverse@^7.24.0.

問題&#xff1a; npm安裝 依賴失敗&#xff0c;找不到所需依賴。 原因&#xff1a; npm鏡像源中沒有該依賴。&#xff08;大概率是因為依賴最近剛更新&#xff0c;當前鏡像源沒有同步&#xff09; 解決&#xff1a; 查看自己的npm鏡像&#xff1a;npm config get registry…

機器學習-面經(part2)

3. 驗證方式 3.1什么是過擬合?產生過擬合原因? 定義:指模型在訓練集上的效果很好,在測試集上的預測效果很差 數據有噪聲 訓練數據不足,有限的訓練數據 訓練模型過度導致模型非常復雜3.2 如何避免過擬合問題? 3.3 什么是機器學習的欠擬合?產生原…

D4890可應用在對講機上,采用 SOP8/MSOP8兩種封裝形式

D4890 目前客戶主要使用在對講機上&#xff0c;電壓范圍2.2V &#xff5e; 5.5V之間&#xff0c;輸出功率&#xff08;THDN1%&#xff09;1.0W/8Ω 5.0V。采用 SOP8/MSOP8兩種封裝形式。 2、推薦的應用線路圖如下&#xff1a; 3、實際測試輸出波形如下&#xff08;VCC4.5V&…

Web Component 轉圖片

一、HTML 轉圖片 目前&#xff0c;常見的開源的能夠將 HTML 轉換為圖片有html2canvas、dom-to-image&#xff0c;大部分場景下&#xff0c;這些開源庫都能很友好的處理。 HTML 轉圖片的實現原理&#xff0c;通常分為兩種&#xff1a;svg 與 canvas。今天主要討論下 svg 的場景…

Flutter中使用Dio庫封裝網絡請求服務工具類

在Flutter應用程序中&#xff0c;進行網絡請求是非常常見的任務。Dio是一個強大的、易于使用的Dart包&#xff0c;用于處理HTTP請求。本篇博客將介紹如何封裝Dio庫&#xff0c;以及如何在Flutter應用中進行網絡請求并取消請求。 什么是Dio&#xff1f; Dio是一個基于Dart語言…