? ? ???
【redis導讀】redis作為一款高性能的內存數據庫,面試服務端開發,redis是繞不開的話題,如果想提升自己的網絡編程的水平和技巧,redis這款優秀的開源軟件是很值得大家去分析和研究的。
? ??
? ? ? 筆者從大學畢業一直有分析redis源碼的想法,但由于各種原因,一直沒有付諸行動,今天抽空把redis4.0的源碼做了一次深層次的剖析,redis作為一款高效的、支持高并發的內存型數據庫,相信很多同學認為redis采用了非常復雜的網絡通信架構,但實則不然!redis之所以性能高,redis4.0采用了單線程的模式(redis6.0不再是單線程模式),有效地避免了線程切換和同步所帶的性能開銷;redis鍵值對全部存儲在內存中,redis自實現了一套高效的內存管理機制,數據的存取都是直接訪問內存,無需進行磁盤IO訪問。
1、前期準備工作
? ??centos的終端上運行:
wget http://download.redis.io/releases/redis-4.0.11.tar.gz
tar -zxvf redis-4.0.11.tar.gz
cd redis-4.0.11
make -j 5
? ? ?編譯redis源碼:
? ? ??gdb調試redis-server:
gdb redis-server
?r
? ?在redis編譯目錄下,再啟一個終端,運行如下指令,把redis-client運行起來:
gdb redis-cli
r
set hello redis
? ? 這樣就完成了redis的前期準備工作,可以高效地往redis-server中更新鍵值對,好那接下來看看redis-server關于服務端源碼的剖析。
2、調試源碼
? ? ??redis-server也是作為一個獨立的進程,既然是獨立的進程,那么程序肯定有入口點,也即是main函數入口,全局搜索了下redis的源碼,可以看到server.c中有main函數有入口。
/* Entry point of redis-server (excerpt from server.c).
 * Single-threaded flow: initialize the server state, install the
 * before/after-sleep callbacks, then block inside the event loop
 * until shutdown. */
int main(int argc, char **argv) {
    ......
    // Initialize server state: signals, listeners, event loop, timers, ...
    initServer();
    // Install the callbacks run around each event-loop iteration
    aeSetBeforeSleepProc(server.el, beforeSleep);
    aeSetAfterSleepProc(server.el, afterSleep);
    // aeMain() runs the event loop; it returns only when the loop is stopped
    aeMain(server.el);
    ......
    aeDeleteEventLoop(server.el);
    return 0;
}
? ?以上是server.c中main函數的主要執行流,只有一個主線程,初始化服務,設置回調,開始事件循環。那逐步開始拆解,先看看initServer()的執行流。
? ? 備注:initServer()接口中很多細節值得大家去學習,也是編寫服務端程序容易被遺漏的細節。
/* Global vars */
struct redisServer server; /* Server global state */

/* Install the shutdown signal handlers (SIGTERM / SIGINT). */
void setupSignalHandlers(void) {
    struct sigaction act;

    /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
     * Otherwise, sa_handler is used. */
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    act.sa_handler = sigShutdownHandler;
    sigaction(SIGTERM, &act, NULL);
    sigaction(SIGINT, &act, NULL);
    ......
    return;
}

/* One-time server initialization: signal setup, global state, event
 * loop creation, listening sockets, the serverCron timer, and the
 * accept callbacks on the listening fds. (Excerpt; elided with ......) */
void initServer(void)
{
    int j;

    /* Ignore SIGHUP and SIGPIPE; otherwise these signals can easily
     * kill the redis process (e.g. SIGPIPE on a write to a closed
     * connection). */
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    // Install the handlers for the shutdown signals.
    setupSignalHandlers();
    ......
    /* The global redisServer object lives as long as the process.
     * It holds the event loop, the client lists, and related state. */
    server.pid = getpid();
    server.current_client = NULL;
    server.clients = listCreate();
    server.clients_to_close = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    // clients_pending_write: connected clients with output queued but
    // no write event installed yet
    server.clients_pending_write = listCreate();
    server.slaveseldb = -1;
    server.unblocked_clients = listCreate();
    server.ready_keys = listCreate();
    // Clients we still owe a reply (ack) to
    server.clients_waiting_acks = listCreate();
    server.get_ack_from_slaves = 0;
    server.clients_paused = 0;
    server.system_memory_size = zmalloc_get_memory_size();
    createSharedObjects();
    adjustOpenFilesLimit();
    /* Size the event loop's internal per-fd tables from the configured
     * limits, e.g. the aeFileEvent array holding the read/write callbacks:
     *
     *   typedef struct aeFileEvent {
     *       int mask;              // readable / writable / exception
     *       aeFileProc *rfileProc;
     *       aeFileProc *wfileProc;
     *       void *clientData;
     *   } aeFileEvent;
     */
    // There is a single global redisServer and one event loop per redisServer
    server.el = aeCreateEventLoop(server.maxclients + CONFIG_FDSET_INCR);
    if (server.el == NULL) {
        serverLog(LL_WARNING,
            "Failed creating the event loop. Error message: '%s'",
            strerror(errno));
        exit(1);
    }
    server.db = zmalloc(sizeof(redisDb) * server.dbnum);
    // Open the TCP listening socket(s)
    if (server.port != 0 &&
        listenToPort(server.port, server.ipfd, &server.ipfd_count) == C_ERR)
        exit(1);
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr, server.unixsocket,
            server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {
            serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
            exit(1);
        }
        // Make the Unix-domain socket non-blocking
        anetNonBlock(NULL, server.sofd);
    }
    ......
    // Create the Redis timer used to run periodic tasks (serverCron)
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    /* 1. Install acceptTcpHandler as the connection-established callback
     *    on each listening socket; only readable events are watched — a
     *    readable listening socket means a connection is ready to accept.
     * 2. This also registers the listening sockets with the I/O
     *    multiplexing backend. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler, NULL) == AE_ERR)
        {
            serverPanic("Unrecoverable error creating server.ipfd file event.");
        }
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el,
            server.sofd, AE_READABLE, acceptUnixHandler, NULL) == AE_ERR)
        serverPanic("Unrecoverable error creating server.sofd file event.");
    /* Create a pipe used to actively wake up an event loop that is
     * blocked inside epoll_wait (for the modules blocked-clients
     * subsystem). */
    if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
        moduleBlockedClientPipeReadable, NULL) == AE_ERR) {
        serverPanic("Error registering the readable event for the module "
            "blocked clients subsystem.");
    }
    ......
}
??? ?基于上述的主流程,我們進一步剖析,如何將監聽socket綁定到IO多路復用模型上?進一步剖析aeCreateFileEvent接口。
/* Register interest in the events in `mask` for `fd` and record the
 * callback `proc` (as read and/or write handler) plus `clientData`.
 * Returns AE_ERR (with errno = ERANGE) when fd exceeds the loop's
 * configured capacity, AE_OK on success. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
                      aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    // Push the fd/mask down to the multiplexing backend first
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

/* epoll backend: add or modify the epoll registration of `fd` so it
 * covers the old mask merged with the new `mask`. */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    // If the fd was not watched before, ADD it; otherwise MOD the entry
    int op = eventLoop->events[fd].mask == AE_NONE ?
        EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    // This shows Redis uses epoll: bind fd onto the epoll instance epfd
    if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
    return 0;
}
? ? ? 假設epoll模型檢測到監聽套接字有可讀事件產生,那主Loop的勢必從epoll_wait接口返回,再根據事件類型,轉調我們提前設置的回調函數acceptTcpHandler中來。
/* Read-event callback installed on the TCP listening socket.
 * A readable listening socket means connections are pending, so accept
 * up to MAX_ACCEPTS_PER_CALL of them per invocation and hand each new
 * fd to acceptCommonHandler(), which creates the client object. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while (max--) {
        // accept() the pending connection, producing a client socket
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            // EWOULDBLOCK just means the accept queue is drained
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE, "Accepted %s:%d", cip, cport);
        /* Fix: this call was commented out in the excerpt; without it the
         * accepted fd is leaked and no client object is ever created,
         * contradicting the flow described below (createClient etc.). */
        acceptCommonHandler(cfd, 0, cip);
    }
}

#define MAX_ACCEPTS_PER_CALL 1000
/* Common post-accept path for TCP and Unix sockets: wrap the accepted
 * fd in a client object; on failure log and close the fd. */
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;

    // Create the client object (this also registers the fd's read event)
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno), fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    ......
}
// Create a client object around the accepted client socket
/* Allocate and initialize a client object for socket `fd`.
 * fd == -1 is allowed (fake client, e.g. for Lua); for a real socket the
 * fd is tuned and its read event is registered. Returns NULL when the
 * event registration fails (the fd is closed and memory freed). */
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    if (fd != -1) {
        // Make the client socket non-blocking
        anetNonBlock(NULL, fd);
        // Disable Nagle's algorithm for low-latency replies
        anetEnableTcpNoDelay(NULL, fd);
        // Enable TCP keepalive if configured
        if (server.tcpkeepalive)
            anetKeepAlive(NULL, fd, server.tcpkeepalive);
        /* Bind the client socket onto the epoll fd and install
         * readQueryFromClient as its readable-event callback. */
        if (aeCreateFileEvent(server.el, fd, AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    ......
}
? ??那接著看客戶端套接字產生了可讀事件,進而主Loop循環會執行到和當前客戶端套接字相關的回調函數中來,一起看下readQueryFromClient的源碼。
/* Readable-event callback for client sockets: read as much of the
 * query as possible into c->querybuf, then feed it to the protocol
 * parser (processInputBuffer). `privdata` is the client object set in
 * createClient(). */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
{
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* For a big multibulk argument, cap the read at exactly the bytes
     * still missing from that argument (+2 for the trailing CRLF). */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2) - sdslen(c->querybuf);

        if (remaining < readlen) readlen = remaining;
    }
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf + qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            // No data available on the non-blocking socket right now;
            // the event loop will call us again when more arrives.
            return;
        } else {
            // A real read error: drop the client
            serverLog(LL_VERBOSE, "Reading from client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        // Peer performed an orderly shutdown
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        // Data from our master is also accumulated for re-feeding slaves
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
            c->querybuf + qblen, nread);
    }
    ......
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    } else {
        /* For the master link, track how many bytes were applied so the
         * same stream can be propagated to our slaves. */
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf, applied, -1);
        }
    }
}
? ??processInputBuffer 判斷接收到的字符串是不是以星號( * )開頭,如果以*開頭,設置 client 對象的 reqtype 字段值為 PROTO_REQ_MULTIBULK ,接著調用 processMultibulkBuffer 函數繼續處理剩余的字符串。處理后的字符串被解析成 redis 命令,如果是具體的命令,那么redis會按照指定的規則去執行。
? ? 既然提到指令command,那么processInputBuffer 接口中肯定有和指令command處理相關的接口。
/* Execute the command currently parsed into c->argv/c->argc.
 * Returns C_OK when the caller may keep processing the client,
 * C_ERR when the client was scheduled to be closed (e.g. QUIT). */
int processCommand(client *c) {
    // QUIT: reply OK and mark the client to be closed after the reply
    if (!strcasecmp(c->argv[0]->ptr, "quit")) {
        addReply(c, shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }
    // Look the command up in the command table; reply with an error
    // when it is unknown or its arity does not match
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagTransaction(c);
        sds args = sdsempty();
        int i;
        // Echo up to ~128 bytes of the arguments back in the error text
        for (i = 1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ",
                128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        addReplyErrorFormat(c,
            "unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        // Positive arity = exact argument count; negative = minimum count
        flagTransaction(c);
        addReplyErrorFormat(c,
            "wrong number of arguments for '%s' command", c->cmd->name);
        return C_OK;
    }
    ......
}
? 那繼續看看addReply接口:
?
/* Queue the object `obj` as a reply for client `c`.
 * First tries the client's fixed-size output buffer; if that fails
 * (buffer full or reply list already in use) the object is appended to
 * the reply list instead. */
void addReply(client *c, robj *obj) {
    // May also schedule the client for writing (clients_pending_write)
    if (prepareClientToWrite(c) != C_OK) return;
    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c, obj);
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        ......
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}
? ? 繼續看prepareClientToWrite接口:
/* Decide whether a reply may be queued for client `c`, and if the
 * client has no output scheduled yet, put it on the
 * server.clients_pending_write list so beforeSleep() will flush it.
 * Returns C_OK when the caller should proceed to buffer the reply,
 * C_ERR when the reply must be suppressed (replies off/skipped, a
 * non-forced master link, or a fake client without a socket). */
int prepareClientToWrite(client *c) {
    // Lua / module clients always accept replies (consumed internally)
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
    // Client asked for replies to be suppressed
    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
    // Never send unsolicited data to our master unless forced
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
    // Fake client (no real socket)
    if (c->fd <= 0) return C_ERR;
    if (!clientHasPendingReplies(c) &&
        !(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        /* The client has no CLIENT_PENDING_WRITE flag and no buffered
         * data yet: set the flag and add it to the redisServer
         * clients_pending_write list so the next beforeSleep() pass
         * writes it out. */
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write, c);
    }
    return C_OK;
}
? ? ? 還有接口_addReplyToBuffer:
/* The key step: append the result of executing the client's command
 * to the client's fixed-size output buffer c->buf. */
int _addReplyToBuffer(client *c, const char *s, size_t len)
{
    size_t available = sizeof(c->buf) - c->bufpos;

    // Client is going away after the current reply: silently accept
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
    /* If the client's reply list is already non-empty, new data must
     * also go through the reply list to preserve ordering — report
     * C_ERR so the caller appends to the list instead. */
    if (listLength(c->reply) > 0) return C_ERR;
    // Not enough room left in the static buffer
    if (len > available) return C_ERR;
    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    return C_OK;
}
// When _addReplyToBuffer returns C_ERR, the reply object is appended
// to the reply list instead (call site in addReply):
if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK)
    _addReplyObjectToList(c, obj);
? ?redis4.0最核心的部分就是這個主Loop循環:
/* Main event loop driver: repeat until eventLoop->stop is set —
 * run the beforesleep callback (when installed), then process one
 * batch of file and time events. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    for (;;) {
        if (eventLoop->stop)
            break;
        // Give the application a hook before (possibly) sleeping in epoll_wait
        if (eventLoop->beforesleep)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS | AE_CALL_AFTER_SLEEP);
    }
}
?? ? 每次循環都會執行下beforesleep接口,beforesleep接口主要做了啥呢,可以看看beforesleep接口的實現:
/* Hook run by aeMain() before each (potential) sleep in epoll_wait:
 * flush the output buffers of all clients queued for writing. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
    ......
}

/* Drain server.clients_pending_write: try a direct socket write for
 * each queued client, and only fall back to registering a writable
 * event when data remains unsent. Returns the number of clients
 * that were queued. */
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    // Walk the list of clients that have output data queued
    listRewind(server.clients_pending_write, &li);
    while ((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        // Clear the CLIENT_PENDING_WRITE mark and dequeue the client
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write, ln);
        // Write directly to the socket (no event registration needed
        // in the common case)
        if (writeToClient(c->fd, c, 0) == C_ERR) continue;
        // If this client still has replies pending after the write...
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_flags |= AE_BARRIER;
            }
            /* The TCP send window may be too small for all the data to
             * go out at once: register a writable event for the
             * client's fd on the epoll instance with sendReplyToClient
             * as the write callback. */
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                freeClientAsync(c);
            }
        }
    }
    return processed;
}
// sendReplyToClient also just delegates to writeToClient
/* Writable-event callback installed on a client's fd when a direct
 * write could not flush everything: delegate to writeToClient() in its
 * "called from handler" mode (final argument 1). */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(mask);
    UNUSED(el);
    writeToClient(fd, privdata, 1);
}
? ? ?所以分析了這么多,感覺redis的通信模型就是單線程,外加一個主Loop循環,定義一個全局的redisServer對象,定義多個數據成員鏈表用于管理已連接的client對象集合,需要回復的client對象、有數據需要待發送的client對象集合,epoll模型監聽listenSocket、AcceptSocket可讀事件,客戶端有請求指令發送過來,redisServer解析指令,執行指令,并給client回復執行結果,如果tcp窗口太小,給當前client的fd注冊可寫事件和可寫回調函數sendReplyToClient,待TCP窗口滿足發送數據要求時,sendReplyToClient再執行數據的發送。另外主Loop每次循環時都會主動檢測待回復鏈表replay、待發送鏈表clients_pending_write,如果有數據需要發送給客戶端,逐個遍歷發送。
3、實測驗證
? ? ?在centos7做下實測,我們同時開啟兩個redis-cli,先后給redis-server發送兩個指令
set hello world
???此時看下redis-server的堆棧以及主線程:
?? ?redis處理客戶端請求,并不是多線程并發處理,而是循環遍歷去給pending client回復報文,逐一回應。
? ? ?