基于C語言實現的KV存儲引擎
- 項目簡介
- 整體架構
- 網絡模塊的實現
- recator
- proactor
- Ntyco
項目簡介
本文主要是基于 C 語言來實現一個簡單的 KV 存儲架構,目的就是將網絡模塊跟實際開發結合起來。
首先我們知道對于數據的存儲可以分為兩種方式,一種是在內存中進行存儲,像 Redis 這種,是一種內存型數據庫,主要也是以 KV 的形式進行存儲,提升了對應的數據的訪問效率;另一種就是在磁盤中進行存儲,像 MySQL 這種,是一種關系型數據庫,更多的是以一種表的形式去組織的數據庫,也是一種主流的數據庫,但是訪問的速度就稍微慢一點兒。
那么當前已經有眾多的數據庫的存在了,我們又為什么需要去實現一個自己的 KV 存儲引擎呢?
對于 Redis,MySQL 這些數據庫來說,他們可以適用于多種數據類型,我們可以理解為是一個非常大類別的實現,如果我們當前只是需要存儲某些類別的數據,比如我們就單純的進行一些短鏈接映射存儲,用戶的一些信息存儲,在某些時候需要快速進行讀取,我們就沒必要使用 Redis 和 MySQL 這種的數據庫存儲的方式,也就避免了使用它們需要去考慮的一些問題,同樣,我們只是單純的去進行某些數據的存儲,我們就可以自己進行優化,將對應的性能提升到最優。
我們自己實現的 KV 存儲其實跟 Redis 中的 KV 存儲的思想很相像,都是以鍵值對的方式去進行存儲,比如說我們需要訪問某一張圖片,知道鏈接,輸入對應的鏈接,就可以訪問這張圖片,其實這就是一種 KV 存儲的方式。
整體架構
基于上面的了解,我們就可以來梳理一下我們的 KV 存儲的一個整體架構,其實整個流程也可以去理解為一個請求響應的流程,我么需要進行存儲,首先就要發送需要存儲的數據,然后服務端進行存儲,返回給我們對應的信息(成功或者失敗),我們后續訪問就可以直接進行看到對應的信息,我們就可以簡單的理解為下面這種架構:
網絡模塊的實現
在前面的文章當中我們介紹了幾種網絡高并發的實現方式,reactor
、io_uring
和 Ntyco(協程)
,在我們的實現當中這幾種實現方式都會被使用到。
首先我們需要明白的一點就是,對于網絡模塊與 KV 引擎模塊我們是要進行分開的,網絡模塊只進行網絡模塊的工作就可以了,而 KV 引擎模塊在進行協議的解析工作。
recator
在前面的章節當中,我們已經實現過 reactor 了,reacor 本質上就是將對應的 IO 管理轉化為對事件的管理,其實它的思想就是使用了 IO 多路復用模型,對讀寫事件進行監聽,通過回調函數的調用,異步的去處理讀寫事件,不去過分的占用核心線程的工作。
首先就是對應的封裝工作,對于一個事件來說,肯定有對應的 fd,回調函數,對應的接收和發送緩沖區,我們將其封裝在一個 struct 中,方便后續進行調用。
#ifndef __SERVER_H__
#define __SERVER_H__#define BUFFER_LENGTH 1024#define ENABLE_KVS 1typedef int (*RCALLBACK)(int fd);struct conn {int fd;char rbuffer[BUFFER_LENGTH];int rlength;char wbuffer[BUFFER_LENGTH];int wlength;RCALLBACK send_callback;union {RCALLBACK recv_callback;RCALLBACK accept_callback;} r_action;int status;
};#if ENABLE_KVS
int kvs_request(struct conn *c);
int kvs_response(struct conn *c);
#endif#endif
我們在這一塊兒也提供了兩個接口kvs_request
和kvs_response
,從命名就可以可以看出,他其實就是接受服務端發送過來的請以及響應,那么我們接收到對應的請求以后,就需要進行協議的制定,如何對數據進行解析,很明顯kvs_request
接口中就需要與我們的 KV 引擎模塊相連接,接收到請求以后就需要調用到 KV 引擎模塊進行協議的解析工作。
我們來看具體的實現代碼:
reactor.c
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>
#include "server.h"#define CONNECTION_SIZE 1024#define MAX_PORTS 20#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)typedef int (*msg_handler)(char* msg, int length, char* response);
static msg_handler kvs_handler;// 請求處理
int kvs_request(struct conn *c) {// printf("recv: %d, %s\n", c->rlength, c->rbuffer);c->wlength = kvs_handler(c->rbuffer, c->rlength, c->wbuffer);return c->wlength;
}// 響應處理
int kvs_response(struct conn *c) {}int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);int epfd = 0;
struct timeval begin;struct conn conn_list[CONNECTION_SIZE] = {0};
// fdint set_event(int fd, int event, int flag) {if (flag) { // non-zero addstruct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);} else { // zero modstruct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}
}int event_register(int fd, int event) {if (fd < 0) return -1;conn_list[fd].fd = fd;conn_list[fd].r_action.recv_callback = recv_cb;conn_list[fd].send_callback = send_cb;memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);conn_list[fd].rlength = 0;memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);conn_list[fd].wlength = 0;set_event(fd, event, 1);
}// listenfd(sockfd) --> EPOLLIN --> accept_cb
int accept_cb(int fd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);//printf("accept finshed: %d\n", clientfd);if (clientfd < 0) {printf("accept errno: %d --> %s\n", errno, strerror(errno));return -1;}event_register(clientfd, EPOLLIN); // | EPOLLETif ((clientfd % 1000) == 0) {struct timeval current;gettimeofday(¤t, NULL);int time_used = TIME_SUB_MS(current, begin);memcpy(&begin, ¤t, sizeof(struct timeval));printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);}return 0;
}int recv_cb(int fd) {memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH );int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);if (count == 0) { // disconnectprintf("client disconnect: %d\n", fd);close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // unfinishedreturn 0;} else if (count < 0) { // printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);return 0;}conn_list[fd].rlength = count;// printf("RECV: %s\n", conn_list[fd].rbuffer);#if ENABLE_KVSkvs_request(&conn_list[fd]);#endif set_event(fd, EPOLLOUT, 0);return count;
}int send_cb(int fd) {
#if ENABLE_KVSkvs_response(&conn_list[fd]);#endifint count = 0;if (conn_list[fd].wlength != 0) {count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);}set_event(fd, EPOLLIN, 0);//set_event(fd, EPOLLOUT, 0);return count;
}int init_reactor_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0servaddr.sin_port = htons(port); // 0-1023, if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {printf("bind failed: %s\n", strerror(errno));}listen(sockfd, 10);//printf("listen finshed: %d\n", sockfd); // 3 return sockfd;
}int recator_entry(unsigned short port, msg_handler handler) {kvs_handler = handler;epfd = epoll_create(1);int i = 0;for (i = 0;i < MAX_PORTS; i++) {int sockfd = init_reactor_server(port + i);conn_list[sockfd].fd = sockfd;conn_list[sockfd].r_action.recv_callback = accept_cb;set_event(sockfd, EPOLLIN, 1);}gettimeofday(&begin, NULL);while (1) { // mainloopstruct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;if (events[i].events & EPOLLIN) {conn_list[connfd].r_action.recv_callback(connfd);} if (events[i].events & EPOLLOUT) {conn_list[connfd].send_callback(connfd);}}}
}
kvstore.h
#ifndef __KV_STORE__
#define __KV_STORE__#define NETWORK_RECATOR 0
#define NETWORK_PROACTOR 1
#define NETWORK_NTYCO 2#define NETWORK_TYPE NETWORK_RECATORtypedef int (*msg_handler)(char* msg, int length, char* response);const char* command[] = {"SET", "GET", "DEL", "MOD", "EXIST"
};const char* response[] = {};#endif
kvstore.c
#include <stdio.h>
#include <stdlib.h>
#include "kvstore.h"extern int recator_entry(unsigned short port, msg_handler handler);
extern int ntyco_start(unsigned short port, msg_handler handler);
extern int proactor_entry(unsigned short port, msg_handler handler);
/*
* @brief 協議解析
* @param msg 消息體
* @param length 消息體長度
* @param response 響應體
* @return 0 成功 -1 失敗
*/
// 協議解析
int kvs_protocal(char* msg, int length, char* response)
{printf("recv: %d, %s\n", length, msg);}int main(int argc, char* argv[])
{if (argc != 2) {printf("Usage: %s <port>\n", argv[0]);return -1;}if (NETWORK_TYPE == NETWORK_RECATOR) {recator_entry(atoi(argv[1]), kvs_protocal);} else if (NETWORK_TYPE == NETWORK_PROACTOR) {proactor_entry(atoi(argv[1]), kvs_protocal);} else if (NETWORK_TYPE == NETWORK_NTYCO) {ntyco_start(atoi(argv[1]), kvs_protocal);}return 0;
}
kvstore.c
文件當中主要就是對于 KV 引擎的一個實現,當然 main 函數也是用在這當中的,kvs_protocal
是我們的具體協議解析的函數,我們要實現其與網絡模塊的互聯,就可以采用函數指針的方式,無論是reactor
、proactor
或者是協程
,其實都采用這種方式,C 語言并不想 C++ 那樣有包裝器,所以在這兒我們就使用函數指針的方法來操作。kvstore.h
其實就是定義與實現的分離,我們當前是仿照 Redis 協議進行制定的;recator.c
主要就是kvs_request
模塊,實現了與 KV 引擎的互聯,但是網絡模塊又是與 KV 引擎模塊是解耦的。
proactor
procator.c
#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>#define EVENT_ACCEPT 0
#define EVENT_READ 1
#define EVENT_WRITE 2#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024typedef int (*msg_handler)(char* msg, int length, char* response);
static msg_handler kvs_handler;struct conn_info {int fd;int event;
};int init_proactor_server(unsigned short port) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(port); if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) { perror("bind"); return -1; } listen(sockfd, 10);return sockfd;
}int set_event_recv(struct io_uring *ring, int sockfd,void *buf, size_t len, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = EVENT_READ,};io_uring_prep_recv(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));}int set_event_send(struct io_uring *ring, int sockfd,void *buf, size_t len, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = EVENT_WRITE,};io_uring_prep_send(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,socklen_t *addrlen, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = EVENT_ACCEPT,};io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));}int proactor_entry(unsigned short port, msg_handler handler) {kvs_handler = handler;int sockfd = init_proactor_server(port);struct io_uring_params params;memset(¶ms, 0, sizeof(params));struct io_uring ring;io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr);set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);char buffer[BUFFER_LENGTH] = {0};while (1) {io_uring_submit(&ring);struct io_uring_cqe *cqe;io_uring_wait_cqe(&ring, &cqe);struct io_uring_cqe *cqes[128];int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // epoll_waitint i = 0;for (i = 0;i < nready;i ++) {struct io_uring_cqe *entries = cqes[i];struct conn_info result;memcpy(&result, &entries->user_data, sizeof(struct conn_info));if (result.event == EVENT_ACCEPT) {set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);//printf("set_event_accept\n"); //int connfd = entries->res;set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);} else if (result.event == EVENT_READ) { //int ret = entries->res;if (ret == 0) {close(result.fd);} else if (ret > 0) {// printf("set_event_recv ret: %d, %s\n", ret, buffer); // 協議解析char response[BUFFER_LENGTH] = {0};ret = kvs_handler(buffer, ret, response);set_event_send(&ring, result.fd, response, ret, 0);}} else if (result.event == EVENT_WRITE) {int ret = entries->res;//printf("set_event_send ret: %d, %s\n", ret, buffer);set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);}}io_uring_cq_advance(&ring, nready);}
}
有了前面recator
模塊的理解,我們現在就只需要在已經實現過的io_uring
代碼中進行一些修改即可,將協議解析的內容添加進去。
Ntyco
當前協程實現是采用的一個 github 上開源的網絡協程庫組件來進行實現的,也是將對應的協議解析的內容添加進去即可:https://github.com/wangbojing/NtyCo,感興趣的可以將對應的代碼下載下來進行使用。
ntyco.c
#include "nty_coroutine.h"
#include <arpa/inet.h>typedef int (*msg_handler)(char* msg, int length, char* response);
static msg_handler kvs_handler;void server_reader(void *arg) {int fd = *(int *)arg;int ret = 0;while (1) {char buf[1024] = {0};ret = recv(fd, buf, 1024, 0);if (ret > 0) {// printf("read from server: %.*s\n", ret, buf);// 協議解析位置char response[1024] = {0};int slength = kvs_handler(buf, ret, response);ret = send(fd, response, slength, 0);if (ret == -1) {close(fd);break;}} else if (ret == 0) { close(fd);break;}}
}void server(void *arg) {unsigned short port = *(unsigned short *)arg;int fd = socket(AF_INET, SOCK_STREAM, 0);if (fd < 0) return ;struct sockaddr_in local, remote;local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;bind(fd, (struct sockaddr*)&local, sizeof(struct sockaddr_in));listen(fd, 20);printf("listen port : %d\n", port);while (1) {socklen_t len = sizeof(struct sockaddr_in);int cli_fd = accept(fd, (struct sockaddr*)&remote, &len);nty_coroutine *read_co;nty_coroutine_create(&read_co, server_reader, &cli_fd);}
}int ntyco_start(unsigned short port, msg_handler handler) {kvs_handler = handler;nty_coroutine *co = NULL;nty_coroutine_create(&co, server, &port);nty_schedule_run();
}
接下來可以看一下對應的實踐成果:
當前與網絡模塊的關聯已經實現完畢,后續更新請看下一篇文章。