Redis如何避免數據丟失?——AOF

目錄

AOF日志

1. 持久化——命令寫入到AOF文件

寫到用戶緩沖區

AOF的觸發入口函數——propagate

?具體的實現邏輯——feedAppendOnlyFile

從用戶緩沖區寫入到AOF文件(磁盤)

函數write、fsync、fdatasync

Redis的線程池

AOF文件的同步策略

觸發的入口函數——flushAppendOnlyFile

2. AOF重寫?

AOF 重寫的2個觸發時機

用戶發送?bgrewriteaof 命令

在定時函數serverCron中觸發

父子進程使用pipe進行通信?

兩個緩沖區——重寫緩沖 和 差異緩沖

重寫緩沖

差異緩沖

rewriteAppendOnlyFileBackground的實現

執行重寫過程的函數——rewriteAppendOnlyFile

父進程監聽子進程結束, AOF 重寫收尾

在定時函數serverCron中監聽

主進程對AOF重寫收尾——backgroundRewriteDoneHandler

3. Redis重啟,AOF 文件加載


Redis是把數據儲存在內存的鍵值數據庫,而服務器一旦宕機,那內存中的數據將全部丟失。像MySQL那樣,是有宕機后數據恢復機制的。那Redis也是有的,其有兩種方式:AOF和RDB。該文章講解AOF

AOF日志

MySQL是使用redo log(重做日志)來進行宕機恢復的。其是使用了寫前日志(Write Ahead Log,WAL),即是在實際寫數據前,先把修改的數據寫到日志文件中,方便出故障時候進行恢復。

而AOF正好相反,是寫后日志即是先執行命令把數據寫到內存,之后再把該操作記錄到日志中。這是個文本日志,不是二進制文件。

那該日志主要有3個操作:

  • AOF持久化(同步):客戶端向Redis服務器發送命令,這些命令會被存儲到AOF緩沖區中,并隨后會持久化到磁盤文件中
  • AOF重寫:隨著寫入的內容越來越多,就會占用大量的磁盤空間,并且Redis重啟時候需要按照順序執行AOF中的命令,這樣時間就比較長,所以Redis 會定期重寫 AOF 日志,以達到文件瘦身的效果和縮短重啟恢復所需的時間。
  • 重啟數據恢復:Redis重啟后,通過AOF來進行數據恢復

1. 持久化——命令寫入到AOF文件

寫到用戶緩沖區

首先,寫入到AOF的命令是先存儲在一個AOF緩沖區。

struct redisServer {.........sds aof_buf;      /* AOF buffer, written before entering the event loop */
};

客戶端發送的命令轉為RESP協議格式的字符串,然后追加到已有的字符串后面,這些都是存儲在aof_buf中。

AOF的觸發入口函數——propagate

單線程情況下,其函數被調用的流程:readQueryFromClient——>processInputBuffer——>processCommandAndResetClient——>processCommand——>?call(client *c, int flags) ——>propagate

void call(client *c, int flags) {/* Call the command. */c->cmd->proc(c);........................// 入參的 flags 設置了 CMD_CALL_PROPAGATE 標識, 表示當前的命令需要傳播// 同時對應的客戶端內部的標識不是 CLIENT_PREVENT_PROP (客戶端的命令阻止傳播)if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP){int propagate_flags = PROPAGATE_NONE;if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);// 當前的客戶端設置了需要強制同步傳播,或者設置了 需要強制 AOF 傳播if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;// 與客戶端c的flags對比,若是符合條件,取消 命令傳播標識的repl或者aofif (c->flags & CLIENT_PREVENT_REPL_PROP || !(flags & CMD_CALL_PROPAGATE_REPL))propagate_flags &= ~PROPAGATE_REPL;if (c->flags & CLIENT_PREVENT_AOF_PROP || !(flags & CMD_CALL_PROPAGATE_AOF))propagate_flags &= ~PROPAGATE_AOF;//  命令傳播標識 不為 none, 且當前的命令不是模塊命令if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))// 處理aof 和 復制給副本            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);}..................................
}
// 將命令寫到aof 文件,并將命令發送給副本
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,int flags)
{if (!server.replication_allowed)return;// AOF 開啟了, 同時命令傳播標識為 需要 AOF 傳播if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)feedAppendOnlyFile(cmd,dbid,argv,argc);     // 將當前的命令保存到 AOF 緩沖區..................  
}

?具體的實現邏輯——feedAppendOnlyFile

該函數就是把命令寫入到aof緩沖區。

  1. 創建一個SDS對象buf,用戶把命令寫入到該對象。判斷該命令使用的數據庫號是否是用戶選擇的數據庫號,若不是就需要在aof文件中添加選擇數據庫。
  2. 把命令寫入到buf。
    1. 對于?EXPIREEXPIREAT?和?PEXPIRE?將其轉換為?PEXPIREAT?特殊處理。
    2. 對于帶?EXPX?參數的?SET?命令特殊處理,主要涉及過期時間的處理;
    3. 對于其它命令,調用?catAppendOnlyGenericCommand?按照 RESP 協議組裝命令,并將其暫存至 buf;
  3. 如果啟用 AOF 日志,則將 buf 中暫存的命令追加到 AOF緩沖區server.aof_buf。
  4. 如果存在正在重寫 AOF 的子進程,則將命令追加到 AOF 重寫緩沖區server.aof_rewrite_buf_blocks
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {sds buf = sdsempty();//該命令寫入的數據庫和用戶選擇的數據庫不一致的話,需要在aof文件添加一段選擇數據庫的記錄if (dictid != server.aof_selected_db) {char seldb[64];snprintf(seldb,sizeof(seldb),"%d",dictid);// 拼接出一個 select 數據庫號 的語句, 這個語句是遵守 RESP 協議buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",(unsigned long)strlen(seldb),seldb);server.aof_selected_db = dictid;}//這三個命令, 在 AOF 保存的時候, 都會轉為 expireat key 具體的過期時間 (單位毫秒) 的格式存入到 AOF 文件中if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||cmd->proc == expireatCommand) {// 轉為過期對應的文本, 同時追加到 buf 中buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);} else if (cmd->proc == setCommand && argc > 3) {//帶 EX、PX 參數的 SET 命令,特殊處理, set key value ex seconds, set key value px millisecondsrobj *pxarg = NULL;if (!strcasecmp(argv[3]->ptr, "px")) {    //過期時間是毫秒的pxarg = argv[4];}if (pxarg) {    //毫秒的robj *millisecond = getDecodedObject(pxarg);long long when = strtoll(millisecond->ptr,NULL,10);when += mstime();decrRefCount(millisecond);robj *newargs[5];newargs[0] = argv[0];newargs[1] = argv[1];newargs[2] = argv[2];newargs[3] = shared.pxat;newargs[4] = createStringObjectFromLongLong(when);// 往 buf 中追加 set 命令buf = catAppendOnlyGenericCommand(buf,5,newargs);// 創建的對象手動修改引用計數, 便于內存回收decrRefCount(newargs[4]);} else {    //秒過期的buf = catAppendOnlyGenericCommand(buf,argc,argv);}} else {// 其他的命令直接轉為 RESP 協議的字符串進行追加buf = catAppendOnlyGenericCommand(buf,argc,argv);}//將組裝好的命令追加到 aof_bufif (server.aof_state == AOF_ON)server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));// CHILD_TYPE_AOF表明后臺正在進行重寫,那么將命令再追加一份到重寫緩沖區中,以便我們記錄重寫時 AOF 文件和當前數據庫的差異if (server.child_type == CHILD_TYPE_AOF)aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));sdsfree(buf);
}

從用戶緩沖區寫入到AOF文件(磁盤)

函數write、fsync、fdatasync

  • write只是將數據保存到系統緩沖區或者用戶緩沖區,還沒有真正落入到磁盤中的
  • fsync是真正地把數據寫入到磁盤,即是把緩沖區中的數據落入磁盤。
    • POSIX 標準定義的?fsync?函數在文件元數據(metadata,例如?st_sizest_atimest_mtime?等)變臟時,會將所有元數據同步到磁盤。由于每次同步都必定導致時間戳的改變,而且文件內容和文件元數據通常存儲在磁盤上的不同位置,因此每次調用?fsync?至少需要兩次隨機磁盤 I/O
  • 為此,Linux 平臺提供了一個?fdatasync?函數。該函數僅在必要時才將元數據同步到磁盤(文件讀寫時間戳等信息的改變不會實時落盤),大大降低了元數據同步的頻率。
    • 舉例來說,文件的尺寸(st_size)如果變化,是需要立即同步的,否則OS一旦崩潰,即使文件的數據部分已同步,由于metadata沒有同步,依然讀不到修改的內容。而最后訪問時間(atime)/修改時間(mtime)是不需要每次都同步的,只要應用程序對這兩個時間戳沒有苛刻的要求,基本無傷大雅

?Redis 通過條件編譯,將 Linux 平臺的?redis_fsync?定義成了?fdatasync,而在其它類 Unix 平臺上依舊是?fsync

#ifdef __linux__
#define redis_fsync fdatasync
#else
#define redis_fsync fsync
#endif

Redis的線程池

真正寫入到磁盤的是使用fsync函數,那說明該函數是相對比較耗時的。Redis維護了一個線程池,就是用來處理一些比較耗時的操作。

那么,AOF緩沖區寫入到AOF文件(存入到磁盤)過程中,會先通過write將數據寫入到系統緩存,然后根據當前的AOF保存策略,決定是否需要把fsync函數的執行交給線程池。

AOF文件的同步策略

  • no:不進行同步,每個寫命令執行完后,只是先把記錄寫到AOF文件中的內存緩沖區中,由操作系統決定合適將緩沖區內存寫回磁盤。
  • always:每次write后,都會立即執行fsync,這種就是在主線程中執行fsync。
  • everysec:每次write后,不會立即執行fsync,理論是每秒執行一次fsync,同時內部將fysnc的執行交給線程池去處理

觸發的入口函數——flushAppendOnlyFile

將緩沖區中的數據寫入到aof文件的函數是flushAppendOnlyFile。

在Redis中有5處會調用該函數

  1. 通過命令動態關閉AOF功能時,會進行一次保存,即是發送命令將appendonly yes設置為appendonly no。
  2. 在Redis正常關閉之前,會執行該函數。
  3. 在事件循環中的beforesleep函數中會調用一次,這個是AOF功能的主要的保存入口
  4. Redis的定時函數serverCron(默認100毫秒執行一次)
  5. 定時函數serverCron,判斷上次AOF寫入狀態,失敗就執行一次該函數。

?定時函數serverCron關于這部分的代碼。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {..................//上次的 AOF 寫文件時, 沒有執行, 將 aof_flush_postponed_start 設置為true, 表示需要延遲處理//存在延遲的AOF落盤操作,在這里完成if (server.aof_state == AOF_ON && server.aof_flush_postponed_start)flushAppendOnlyFile(0);run_with_period(1000) {//上次的寫文件失敗,即是fync失敗if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR)flushAppendOnlyFile(0);}...................
}

存在延遲的AOF落盤操作

比如:主線程在執行flushAppendOnlyFile中調用write后,提交一個任務給后臺線程,假設此時數據量很大,fsync需要執行較長時間。而主線程又執行到了flushAppendOnlyFile,而上一次的fsync函數還沒有執行完,Redis會選擇延遲執行,將Server成員變量aof_flush_postponed_start設置為當前時間,就結束該函數。

所以在執行定時任務時候,會判斷該變量是否>0,若是,會再執行flushAppendOnlyFile,這個就是AOF同步延遲到定時函數處執行。

但是,延遲到定時任務處觸發, 還是無法保證后臺線程一定執行完上次的?fsync。所以該函數會根據當前的時間和變量儲存的時間進行判斷,若是在2s內,就不做任何處理,退出該函數;而大于2s,立即執行AOF緩沖區寫入文件的邏輯。

flushAppendOnlyFile的實現

  1. 如果AOF緩沖區為空,?并且AOF策略是everysec,同步到磁盤的內容大小不等于當前AOF文件的內容大小,當前時間 >?上次AOF fsync的時間,同時當前沒有正在運行的bio后臺任務,則嘗試執行fsync。
  2. 如果策略是everysec,且后臺存在正在同步的bio線程,則判斷aof_flush_postponed_start是否為0:
    1. 若是0,表示之前沒有延遲落盤任務,所以就只記錄當前的時間戳給aof_flush_postponed_start并退出。
    2. 若是不為0,但判斷距離aof_flush_postponed_start是否已經過去2s,若是就增加server.aof_delayed_fsync?計數,強制后續的磁盤同步流程
  3. 調用aofWrite將AOF緩沖區中的數據寫入到系統內核緩沖區(這時是還沒有使用fsync),若是寫入到系統的數據長度不等于當前 AOF 緩沖區的長度,?需要進行異常處理
  4. 如果aof_buf的總空間小于4kb,則清空buffer內容并重新使用該緩沖區,否則創建一個新的。
// AOF 緩沖區數據寫入文件
// 當持久策略被設置為 everysec, 實際上會由后臺線程進行處理, 那么當前這次刷新寫入時, 后臺可能有線程還在寫入, 所以這時的操作會延遲寫入 
//參數force 1:表示無視后臺的 fsync, 直接寫入, 0: 表示可以延遲, 一般 AOF 過程都是 0
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {ssize_t nwritten;int sync_in_progress = 0;mstime_t latency;//表示aof緩沖區中沒有數據, 就可以結束了,但是Redis中有一些極端情況,不會結束,當前學習可以不用了解,后序熟悉該代碼了再回頭看if (sdslen(server.aof_buf) == 0) {if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&server.aof_fsync_offset != server.aof_current_size &&server.unixtime > server.aof_last_fsync &&!(sync_in_progress = aofFsyncInProgress())) {goto try_fsync;} else {return;}}// 持久策略為每秒 fsync 一次, 判斷后臺的線程池是否有線程在執行 fsync if (server.aof_fsync == AOF_FSYNC_EVERYSEC)sync_in_progress = aofFsyncInProgress();//該返回值為true,表示當前有BIO線程在執行 fsync // 持久策略為每秒 fsync 一次, 同時不需要強制寫入文件if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {// 當前有 BIO 線程在執行 fsyncif (sync_in_progress) {if (server.aof_flush_postponed_start == 0) { // 0 表示當前沒有延遲執行//當前有后臺線程在執行 fsync, 那么先延長一下, 設置aof_flush_postponed_start 為當前時間server.aof_flush_postponed_start = server.unixtime;return;//若之前是偶延遲執行,然后又進入了該函數(一般是執行定時函數觸發),那這時后臺還在執行fsync,但是當前時間和上一次設置的延遲時間小于2s,可以接受,就暫時不做處理} else if (server.unixtime - server.aof_flush_postponed_start < 2) {/* We were already waiting for fsync to finish, but for less* than two seconds this is still ok. Postpone again. */return;}//到了這一步表示線程池中有請求 fsync 的任務, 同時上次延遲距離當前時間超過 2 秒了server.aof_delayed_fsync++;    // 延遲 fsync 的次數 + 1serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");}}if (server.aof_flush_sleep && sdslen(server.aof_buf)) {usleep(server.aof_flush_sleep);}latencyStartMonitor(latency);//調用 write 函數將緩沖區中的數據寫入到文件 (此時還在系統緩存, 還沒寫入到磁盤nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));latencyEndMonitor(latency);if (sync_in_progress) {latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);} else if (hasActiveChildProcess()) {latencyAddSampleIfNeeded("aof-write-active-child",latency);} else {latencyAddSampleIfNeeded("aof-write-alone",latency);}latencyAddSampleIfNeeded("aof-write",latency);//將緩沖區中的數據 write 到系統后, 可以把延遲執行設置為 0   server.aof_flush_postponed_start = 0;                 // 寫入到系統的數據長度不等于當前 AOF 緩沖區的長度, 進入異常處理if (nwritten != (ssize_t)sdslen(server.aof_buf)) {static time_t last_write_error_log = 0;if (nwritten == -1) {    // -1, 沒有寫入任何數據, 就直接失敗了server.aof_last_write_errno = errno;}} else {// 大于 -1 但是不等于緩沖區的大小, 寫入成功了一部分, if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {//寫錯誤日志..............}} else {/* If the ftruncate() succeeded we can set nwritten to* -1 since there is no longer partial data into the AOF. */nwritten = -1;}server.aof_last_write_errno = ENOSPC;}// 同步策略為 alwaysif (server.aof_fsync == AOF_FSYNC_ALWAYS) {// 這種情況無法處理了, 已經告知客戶端寫入成功了, 但是當前寫入失敗了, 直接退出程序。serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");exit(1);} else {// 設置上一次寫入狀態為異常, 在定時器中會判斷這個狀態, 再次觸發 flushAppendOnlyFile server.aof_last_write_status = C_ERR;if (nwritten > 0) {// 更新當前 aof 文件的大小, 同時將緩沖區中這部分大小的數據移除// 表示這部分寫入成功了, 剩余部分下次調用繼續server.aof_current_size += nwritten;sdsrange(server.aof_buf,nwritten,-1);}return; /* We'll try again on the next call... */}} else {    //寫入成功if (server.aof_last_write_status == C_ERR) {server.aof_last_write_status = C_OK;}}server.aof_current_size += nwritten;    // 更新當前 AOF 文件的大小//如果當前 AOF 緩沖區足夠小,小于4K,那么重用這個緩存,否則釋放 AOF 緩沖區, 然后重新分配一個    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {sdsclear(server.aof_buf);} else {sdsfree(server.aof_buf);server.aof_buf = sdsempty();}try_fsync://aof緩沖區中沒有數據,但是有一些特例情況的需要處理的..........
}

try_fsync部分的代碼:

void flushAppendOnlyFile(int force) {if (sdslen(server.aof_buf) == 0) {if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.aof_fsync_offset != server.aof_current_size &&server.unixtime > server.aof_last_fsync && !(sync_in_progress = aofFsyncInProgress())) {goto try_fsync;} .......................}..........................
try_fsync:// no-appendfsync-on-rewrite (正在重寫, 不執行 fsync) 被設置為 yes//判斷是否有運行中的 bio 線程if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())return;if (server.aof_fsync == AOF_FSYNC_ALWAYS) {latencyStartMonitor(latency);//如果 AOF 落盤策略為 always,直接同步if (redis_fsync(server.aof_fd) == -1) {serverLog(LL_WARNING,"Can't persist AOF for fsync error when the ""AOF fsync policy is 'always': %s. Exiting...", strerror(errno));exit(1);}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-fsync-always",latency);server.aof_fsync_offset = server.aof_current_size;//更新 aof_fsync_offset 為當前的 AOF文件大小server.aof_last_fsync = server.unixtime;     // 上次 fsync 為當前的時間} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&server.unixtime > server.aof_last_fsync)) {// 當前沒有請求 fsync 的任務在線程池中if (!sync_in_progress) {//提交一個任務,就是向線程池的任務鏈表中添加任務節點, 最終就是一個后臺線程執行一次 redis_fsync 函數aof_background_fsync(server.aof_fd);server.aof_fsync_offset = server.aof_current_size;}server.aof_last_fsync = server.unixtime;}
}

2. AOF重寫?

很容易想到的一個情況,文件越寫越大。AOF文件是以追加形式逐一記錄接受到的寫命令的。當一個鍵值對被多條命令反復修改時,AOF文件會記錄相應的多條命令。要是宕機后重啟,對同一個key,就需要依次執行AOF文件中對該key的操作命令。但是我們只需要最新的對該key的操作。

所以就有了重寫。重寫的時候,是根據這個鍵值對的最新狀態,為它生成對應的寫入命令。這樣一來,一個鍵值對在重寫日志中只使用一條命令即可。在日志恢復時,只執行一條命令,就可以直接完成這個鍵值對的寫入,也方便省時。

AOF 重寫的2個觸發時機

  • bgrewriteaof 命令被執行。
  • 定時器函數, 定時檢查 AOF 文件, 如果滿足配置文件里面設置的條件, 就觸發AOF重寫

用戶發送?bgrewriteaof 命令

bgrewriteaof 命令方式對應的邏輯函數為?bgrewriteaofCommand。主要邏輯是:

  1. 如果正在執行重寫中了,返回錯誤提示
  2. 如果正在執行RDB保存,就將?aof_rewrite_scheduled 屬性設置為 true, 返回提示后, 結束。之后是通過定時器函數serverCron判斷這個狀態確定是否需要觸發
  3. 否則,調用?rewriteAppendOnlyFileBackground 執行重寫
struct redisCommand redisCommandTable[] = {   {"bgrewriteaof",bgrewriteaofCommand,1,"admin no-script",0,NULL,0,0,0,0,0,0},.................
};void bgrewriteaofCommand(client *c) {if (server.child_type == CHILD_TYPE_AOF) {addReplyError(c,"Background append only file rewriting already in progress");} else if (hasActiveChildProcess()) {server.aof_rewrite_scheduled = 1;addReplyStatus(c,"Background append only file rewriting scheduled");} else if (rewriteAppendOnlyFileBackground() == C_OK) {addReplyStatus(c,"Background append only file rewriting started");} else {addReplyError(c,"Can't execute an AOF background rewriting. ""Please check the server logs for more information.");}
}

在定時函數serverCron中觸發

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {  .......................// 后臺沒有進程在 RDB 和 AOF, 同時通過 bgrewriteaof 命令設置了定時刷新重寫 AOF  if (!hasActiveChildProcess() && server.aof_rewrite_scheduled) {rewriteAppendOnlyFileBackground();}if (hasActiveChildProcess() || ldbPendingChildren()){    // 后臺有進程在 RDB 或者 AOF.......................} else {    // 當前后臺 沒有進程在 RDB 或者 AOF//省略了一些檢查.............//達到了AOF重寫的條件:開啟了AOF && 后臺沒有RDB和AOF重寫進行 &&// 配置了目前 AOF 文件大小超過上次重寫的 AOF 文件的百分比 &&//當前的 AOF 文件大小超過了配置的需要觸發重寫的最小大小if (server.aof_state == AOF_ON && !hasActiveChildProcess() &&server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size){// 計算當前的文件增長的比例long long base = server.aof_rewrite_base_size ?server.aof_rewrite_base_size : 1;long long growth = (server.aof_current_size*100/base) - 100;// 超過了就調用 rewriteAppendOnlyFileBackground 進行重寫if (growth >= server.aof_rewrite_perc) {rewriteAppendOnlyFileBackground();}}}...........................................
}

所以,都是集中到函數rewriteAppendOnlyFileBackground中處理的

在某時刻,需要AOF文件重寫:

  • 那為了不阻塞主線程,那可以fork一個子進程來重寫。fork出來的子進程,擁有了和父進程一樣的內存數據,子進程就先把這些內存數據寫入到一個AOF臨時文件。
  • 但是在這個過程中,父進程還是能接受客戶端的命令的,所以父子進程需要通訊,而Redis中父子進程是使用管道pipe進行通訊的。

父子進程使用pipe進行通信?

Redis是使用了3個管道。每個管道有2端,所以有6個fd。

struct redisServer {/* AOF pipes used to communicate between parent and child during rewrite. */int aof_pipe_write_data_to_child;int aof_pipe_read_data_from_parent;int aof_pipe_write_ack_to_parent;int aof_pipe_read_ack_from_child;int aof_pipe_write_ack_to_child;int aof_pipe_read_ack_from_parent;...................
}; int aofCreatePipes(void) {int fds[6] = {-1, -1, -1, -1, -1, -1};int j;//int pipe(int pipefd[2]); 成功:0;失敗:-1,設置errno,函數調用成功返回r/w兩個文件描述符if (pipe(fds) == -1) goto error; /* parent -> children data. */if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */if (pipe(fds+4) == -1) goto error; /* parent -> children ack. *//* Parent -> children data is non blocking. */if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;server.aof_pipe_write_data_to_child = fds[1];server.aof_pipe_read_data_from_parent = fds[0];server.aof_pipe_write_ack_to_parent = fds[3];server.aof_pipe_read_ack_from_child = fds[2];server.aof_pipe_write_ack_to_child = fds[5];server.aof_pipe_read_ack_from_parent = fds[4];server.aof_stop_sending_diff = 0;return C_OK;................
}
  1. aof_pipe_write_data_to_child 和 aof_pipe_read_data_from_parent, 主要是父進程將子進程重寫過程中產生的命令同步給子進程(即是同步數據
  2. aof_pipe_write_ack_to_parent 和 aof_pipe_read_ack_from_child, 主要是用于子進程通知父進程停止同步變更命令
  3. aof_pipe_write_ack_to_child 和 aof_pipe_read_ack_from_parent, 主要用于父進程響應子進程的停止同步變更命令的請求

?我們要重點關注aof_pipe_write_data_to_child(寫端)?aof_pipe_read_data_from_parent(讀端)。這兩個是傳輸Redis內存數據的管道fd。

兩個緩沖區——重寫緩沖差異緩沖

重寫緩沖

struct redisServer {list *aof_rewrite_buf_blocks; //重寫緩沖區,注意是個鏈表  /* Hold changes during an AOF rewrite. */...................
};#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block *///AOF 重寫緩存列表的節點定義
typedef struct aofrwblock {unsigned long used, free;    //used:已使用的空間,free:剩余的空阿金char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;void aofRewriteBufferReset(void) {if (server.aof_rewrite_buf_blocks)listRelease(server.aof_rewrite_buf_blocks);server.aof_rewrite_buf_blocks = listCreate();listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
}

什么時候使用到重寫緩沖區?那就是需要進行AOF重寫的時候。

將緩沖區中的數據寫入到aof的函數是flushAppendOnlyFile。那也是在該函數中,會使用到重寫緩沖區。

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {.............................if (server.child_type == CHILD_TYPE_AOF)    /// 如果后臺正在進行重寫aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));//將命令寫入到 AOF 重寫緩沖區
}/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {//獲取緩沖區列表,是添加在尾部,所以獲取尾部listNode *ln = listLast(server.aof_rewrite_buf_blocks);aofrwblock *block = ln ? ln->value : NULL;while(len) {/* If we already got at least an allocated block, try appending* at least some piece into it. */if (block) {    //表明重寫緩沖列表已有數據//計算當前節點的剩余空間是否夠len長度的數據寫入unsigned long thislen = (block->free < len) ? block->free : len;if (thislen) {  /* The current block is not already full. */memcpy(block->buf+block->used, s, thislen);block->used += thislen;block->free -= thislen;s += thislen;len -= thislen;}}// len > 0, 說明還需要空間, 但是當前的節點沒有空間了, 需要新建一個節點if (len) { /* First block to allocate, or need another block. */int numblocks;// 分配新的緩存節點, 同時放到列表的尾部block = zmalloc(sizeof(*block));block->free = AOF_RW_BUF_BLOCK_SIZE;block->used = 0;listAddNodeTail(server.aof_rewrite_buf_blocks,block);numblocks = listLength(server.aof_rewrite_buf_blocks);if (((numblocks+1) % 10) == 0) {int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :LL_NOTICE;serverLog(level,"Background AOF buffer size: %lu MB",aofRewriteBufferSize()/(1024*1024));}}}// 注冊一個文件事件, 用來將緩沖區的數據寫入到 aof_pipe_write_data_to_child 中, //然后在 Pipe 的作用下, 可以同步到 aof_pipe_read_data_from_parentif (!server.aof_stop_sending_diff &&aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0){//這里注意:注冊的是 寫事件, 寫事件就緒的條件是內核空間的緩沖有空,就可以寫aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,AE_WRITABLE, aofChildWriteDiffData, NULL);}
}

接著來看看管道fd的寫事件回調函數aofChildWriteDiffData

那么當內核緩沖區空間有空閑,就會觸發該管道fd的寫事件,就會執行aofChildWriteDiffData通過該函數就把重寫緩存中的數據寫到了管道中,供子進程讀取到子進程的差異緩沖中。

//事件回調函數, 把當前的 AOF 緩沖區同步到 aof_pipe_write_data_to_child, 在 Pipe 的作用下間接同步到 aof_pipe_read_data_from_parent
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {listNode *ln;aofrwblock *block;ssize_t nwritten;while(1) {ln = listFirst(server.aof_rewrite_buf_blocks);block = ln ? ln->value : NULL;// 停止同步 或者 重寫緩沖區為空,  就需要刪除這個 寫事件if (server.aof_stop_sending_diff || !block) {aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,AE_WRITABLE);return;}if (block->used > 0) {// 把 block 的數據寫入到 aof_pipe_write_data_to_childnwritten = write(server.aof_pipe_write_data_to_child,block->buf,block->used);if (nwritten <= 0) return;memmove(block->buf,block->buf+nwritten,block->used-nwritten);block->used -= nwritten;block->free += nwritten;}if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);}
}

差異緩沖

在子進程重寫AOF過程中,子進程等待主進程把重寫緩沖中的數據通過pipe發送到差異緩沖區。

struct redisServer {sds aof_child_diff;  //子進程的差異緩沖區  /* AOF diff accumulator child side. */te. */...................
};

子進程通過pipe將重寫緩沖區中的數據同步到差異緩沖區的函數是aofReadDiffFromParent

ssize_t aofReadDiffFromParent(void) {char buf[65536]; /* Default pipe buffer size on most Linux systems. */ssize_t nread, total = 0;// 將 aof_pipe_read_data_from_parent 中的數據讀取到 buf 中while ((nread =read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {// 把buf的數據拼接到aof_child_diff 中        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);  total += nread;}return total;
}

rewriteAppendOnlyFileBackground的實現

了解了Redis中關于AOF重寫的兩個緩沖區和父子進程通過pipe通訊,那對AOF重寫的過程就好理解了。

其具體的細節步驟:

  1. 主進程fork出一個子進程,讓子進程來進行AOF重寫。fork出來的子進程,擁有了和父進程一樣的內存數據?
  2. 子進程將內存中的數據寫入到一個AOF臨時文件中
  3. 在子進程重寫期間,主進程還是會繼續將新到達的命令追加寫到原AOF,并將這些命令拷貝到重寫緩沖,然后通過pipe管道發送給子進程的差異緩沖中。
  4. 子進程處理完內存數據后,就把差異緩沖中的數據追加到臨時AOF文件中,之后就禁止主進程發新數據。
  5. 這時,若主進程中的重寫緩存中還剩余數據,就把該數據追加到臨時AOF文件中,再用臨時AOF文件替換舊的AOF,結束。?
int rewriteAppendOnlyFileBackground(void) {pid_t childpid;if (hasActiveChildProcess()) return C_ERR;  //判斷當前沒有RDB和aof重寫  if (aofCreatePipes() != C_OK) return C_ERR;    //創建 Pipe 通道, 用于父子進程之間通信//創建 AOF 子進程if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {char tmpfile[256];/* Child */redisSetProcTitle("redis-aof-rewrite");//將自己綁定給某個cpuredisSetCpuAffinity(server.aof_rewrite_cpulist);snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());//這個是重點,  重寫AOFif (rewriteAppendOnlyFile(tmpfile) == C_OK) {//子進程重寫完成的一些收尾工作, 基本不涉及主流程, 通知父進程過程中子進程修改了多少數據sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");exitFromChild(0);} else {exitFromChild(1);}} else {/* Parent */..............server.aof_rewrite_scheduled = 0;server.aof_rewrite_time_start = time(NULL);server.aof_selected_db = -1;// 清空 redisServer 的 repl_scriptcache_dict 字典和 repl_scriptcache_fifo 這個列表// 和主從復制相關replicationScriptCacheFlush();return C_OK;}return C_OK; /* unreached */
}

執行重寫過程的函數——rewriteAppendOnlyFile

子進程執行的rewriteAppendOnlyFile就是真正的AOF重寫過程。

這個流程步驟有點多:

  1. 打開aof臨時文件,并命名;初始化差異緩沖server.aof_child_diff
  2. 若是啟用了混合持久化,則調用rdbSaveRio將 RDB 數據寫入 aof 臨時文件;否則,調用?rewriteAppendOnlyFileRio()?進行普通的 aof 重寫。其內部會遍歷字典快照,刪除無效數據后,將其封裝為 RESP 數據寫入臨時文件。在遍歷的過程中,還會周期性地從管道中拉取增量數據到?aof_child_diff
  3. 將I/O緩沖和內核緩沖中的剩余數據同步到磁盤
  4. 從管道中讀取剩余的增量數據,持續一段時間
  5. 停止讀取后,發送指令給管道讓主進程停止向管道寫入。然后等待主進程地 ACK;
  6. 此時父進程不會在同步差異命令過來了, 再做最后一次同步, 將 Pipe 通道中殘留的數據同步過來,再次從管道中讀取數據。
  7. 將差異緩沖中的數據追加到AOF臨時文件中,并再次將AOF臨時文件緩沖中的數據同步到磁盤中。
  8. 修改臨時文件名,并確認寫入成功
int rewriteAppendOnlyFile(char *filename) {rio aof;char tmpfile[256];char byte;// 1snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());FILE *fp = fopen(tmpfile,"w");//..................// 清空 aof_child_diff 的數據, 這個就是 AOF 子進程差異緩沖區server.aof_child_diff = sdsempty();rioInitWithFile(&aof,fp);      // 初始 rio 流, 也就是 IO 流, 用于寫入數據到文件// 設定 fsync 觸發條件if (server.aof_rewrite_incremental_fsync)rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);startSaving(RDBFLAGS_AOF_PREAMBLE);// 2if (server.aof_use_rdb_preamble) {int error;//混合持久化if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {errno = error;goto werr;}} else {//普通持久化if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;}// 3//fflush:是把C庫中的緩沖調用write函數寫到磁盤[其實是寫到內核的緩沖區]。//fsync:是把內核緩沖刷到磁盤上。if (fflush(fp) == EOF) goto werr;if (fsync(fileno(fp)) == -1) goto werr;int nodata = 0;mstime_t start = mstime();// 4 .從管道中拉取剩余的增量數據,持續一段時間while(mstime()-start < 1000 && nodata < 20) {if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0){nodata++;continue;}nodata = 0; /* Start counting from zero, we stop on N *contiguous*timeouts. */aofReadDiffFromParent();    //從管道讀數據到 差異緩沖aof_child_diff}// 5 通知主進程 停止發送增量數據if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)goto werr;// 等待主進程的 ACK,最多等 5sif (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||byte != '!') goto werr;// 此時父進程不會在同步差異命令過來了, 再做最后一次同步, 將 Pipe 通道中殘留的數據同步過來// 再次從管道中讀取差異數據aofReadDiffFromParent();//獲取差異緩沖數據的內容大小size_t bytes_to_write = sdslen(server.aof_child_diff);const char *buf = server.aof_child_diff;long long cow_updated_time = mstime();long long key_count = dbTotalServerKeyCount();// 6 . 將差異緩沖數據寫入 aof 文件while (bytes_to_write) {size_t chunk_size = bytes_to_write < (8<<20) ? bytes_to_write : (8<<20);// 將 aof_child_diff 中的數據寫入到 aof 文件中if (rioWrite(&aof,buf,chunk_size) == 0)goto werr;bytes_to_write -= chunk_size;buf += chunk_size;/* Update COW info */long long now = mstime();if (now - cow_updated_time >= 1000) {sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");cow_updated_time = now;}}// 7 .將 aof 文件緩沖中的數據,同步到磁盤if (fflush(fp)) goto werr;if (fsync(fileno(fp))) goto werr;if (fclose(fp)) { fp = NULL; goto werr; }fp = NULL;//8 .重命名文件if (rename(tmpfile,filename) == -1) {unlink(tmpfile);stopSaving(0);return C_ERR;}stopSaving(1);return C_OK;werr:if (fp) fclose(fp);unlink(tmpfile);stopSaving(0);return C_ERR;
}

父進程監聽子進程結束, AOF 重寫收尾

在定時函數serverCron中監聽

那是在哪進行監聽呢?還是在定時函數serverCron中。定時地檢查子進程的狀態是否為結束了, 是的話, 執行結束邏輯。在下一次運行?serverCron定時函數時,調用?checkChildrenDone()完成 AOF 收尾工作。checkChildrenDone的核心內容backgroundRewriteDoneHandler函數

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {.................// 檢查是否有 RDB 子進程或者 AOF 重寫子進程結束了if (hasActiveChildProcess() || ldbPendingChildren()){run_with_period(1000) receiveChildInfo();checkChildrenDone();} else {...........}............................
}/* Receive info data from child. */
void receiveChildInfo(void) {if (server.child_info_pipe[0] == -1) return;size_t cow;monotime cow_updated;size_t keys;double progress;childInfoType information_type;/* Drain the pipe and update child info so that we get the final message. */while (readChildInfo(&information_type, &cow, &cow_updated, &keys, &progress)) {updateChildInfo(information_type, cow, cow_updated, keys, progress);}
}void checkChildrenDone(void) {int statloc = 0;pid_t pid;// wait3可以獲取所有的進程是否有一個進程退出狀態的, 有的話, 進行徹底的銷毀,并返回其進程idif ((pid = waitpid(-1, &statloc, WNOHANG)) != 0) {int exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : -1;int bysignal = 0;if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {bysignal = SIGUSR1;exitcode = 1;}if (pid == -1) {//打印日志} else if (pid == server.child_pid) {if (server.child_type == CHILD_TYPE_RDB) {backgroundSaveDoneHandler(exitcode, bysignal);} else if (server.child_type == CHILD_TYPE_AOF) {backgroundRewriteDoneHandler(exitcode, bysignal); //自己想哦買噶最終的清理邏輯} if (!bysignal && exitcode == 0) receiveChildInfo();    //獲取子進程發送給父進程的信息resetChildState();} else {if (!ldbRemoveChild(pid)) {//打印日志}}/* start any pending forks immediately. */replicationStartPendingFork();}
}

主進程對AOF重寫收尾——backgroundRewriteDoneHandler

主進程的backgroundRewriteDoneHandler中主要是4步驟:

  1. 打開子進程剛剛處理完的 aof 臨時文件
  2. 將停止發送增量數據期間積累的數據追加到 臨時AOF文件
  3. ?重命名,替換舊的aof文件
  4. 最后,進行清除工作
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {if (!bysignal && exitcode == 0) {int newfd, oldfd;char tmpfile[256];long long now = ustime();mstime_t latency;latencyStartMonitor(latency);snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int)server.child_pid);// 1 打開子進程剛剛處理完的 aof 臨時文件newfd = open(tmpfile,O_WRONLY|O_APPEND);if (newfd == -1) { goto cleanup; }// 2 將停止發送增量數據期間積累的數據追加到 臨時AOF文件if (aofRewriteBufferWrite(newfd) == -1) {close(newfd); goto cleanup;}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);if (server.aof_fsync == AOF_FSYNC_EVERYSEC) {aof_background_fsync(newfd);} else if (server.aof_fsync == AOF_FSYNC_ALWAYS) {latencyStartMonitor(latency);if (redis_fsync(newfd) == -1) {close(newfd);goto cleanup;}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-rewrite-done-fsync",latency);}// aof_fd 為當前的 AOF 文件的文件描述符, 等于 -1, 應該是 AOF 功能停用了// 這時為了下面的流程能走下去, 從配置文件中獲取到配置的文件名, 嘗試打開禁用前的文件if (server.aof_fd == -1) {/* AOF disabled */oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);} else {/* AOF enabled */oldfd = -1; /* We'll set this to the current AOF filedes later. */}latencyStartMonitor(latency);// 3  重命名,替換舊的aof文件if (rename(tmpfile,server.aof_filename) == -1) {close(newfd);if (oldfd != -1) close(oldfd);goto cleanup;}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-rename",latency);if (server.aof_fd == -1) {/* AOF disabled, we don't need to set the AOF file descriptor* to this new file, so we can close it. */close(newfd);} else {/* AOF enabled, replace the old fd with the new one. */oldfd = server.aof_fd;server.aof_fd = newfd;server.aof_selected_db = -1; /* Make sure SELECT is re-issued */aofUpdateCurrentSize();server.aof_rewrite_base_size = server.aof_current_size;server.aof_fsync_offset = server.aof_current_size;server.aof_last_fsync = server.unixtime;/* Clear regular AOF buffer since its contents was just written to* the new AOF from the background rewrite buffer. */sdsfree(server.aof_buf);server.aof_buf = sdsempty();}server.aof_lastbgrewrite_status = C_OK;/* Change state from WAIT_REWRITE to ON if needed */if (server.aof_state == AOF_WAIT_REWRITE)server.aof_state = AOF_ON;/* Asynchronously close the overwritten AOF. */if (oldfd != -1) bioCreateCloseJob(oldfd);} else if (!bysignal && exitcode != 0) {server.aof_lastbgrewrite_status = C_ERR;} else {if (bysignal != SIGUSR1)server.aof_lastbgrewrite_status = C_ERR;}cleanup://清除工作aofClosePipes();aofRewriteBufferReset();aofRemoveTempFile(server.child_pid);server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;server.aof_rewrite_time_start = -1;/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */if (server.aof_state == AOF_WAIT_REWRITE)server.aof_rewrite_scheduled = 1;
}

重寫失敗的話,原來的AOF文件依然是可以使用的。在AOF重寫過程中,新來的命令會被寫入磁盤兩次(主進程寫入到舊AOF,子進程是追加到臨時AOF),這就會浪費一定的磁盤空間(磁盤便宜大碗,沒問題的)。只是在重寫過程中,新的命令會被全部儲存到子進程的差異緩沖區中,這可能會導致較高的內存占用。

3. Redis重啟,AOF 文件加載

從main函數開始,其調用流程?main——>loadDataFromDisk——>loadAppendOnlyFile

其主要流程:

  1. 打開AOF文件
  2. 創建一個虛擬客戶端,用于執行AOF中的命令
  3. 根據aof文件中的前導碼,判斷若是REDIS開頭,就調用rdbLoadRio加載RDB的數據;否則將文件指針歸零;
  4. 開始循環處理RESP格式的字符串
    1. 按照RESP協議讀取命令的參數的個數
    2. 讀取命令的每個參數
    3. 根據第一個參數,查詢命令表,得到命令
    4. 執行命令
int main(int argc, char **argv) {.........if (!server.sentinel_mode) {    //非哨兵模式loadDataFromDisk();............}
}void loadDataFromDisk(void) {if (server.aof_state == AOF_ON) {loadAppendOnlyFile(server.aof_filename)   }...................
}int loadAppendOnlyFile(char *filename) {struct client *fakeClient;// 1  打開aof文件FILE *fp = fopen(filename,"r");struct redis_stat sb;int old_aof_state = server.aof_state;long loops = 0;off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {server.aof_current_size = 0;server.aof_fsync_offset = server.aof_current_size;fclose(fp);return C_ERR;}/* Temporarily disable AOF, to prevent EXEC from feeding a MULTI* to the same file we're about to read. */server.aof_state = AOF_OFF;// 2  創建虛擬客戶端fakeClient = createAOFClient();startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);// 3  根據是否有RDB前導碼,再確定處理方式char sig[5]; /* "REDIS" */if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {/* No RDB preamble, seek back at 0 offset. */if (fseek(fp,0,SEEK_SET) == -1) goto readerr;} else {/* RDB preamble. Pass loading the RDB functions. */rio rdb;if (fseek(fp,0,SEEK_SET) == -1) goto readerr;rioInitWithFile(&rdb,fp);//加載rdb內容if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {goto readerr;}}// 4  循環處理Aof文件中剩下的所有命令while(1) {int argc, j;unsigned long len;robj **argv;char buf[128];sds argsds;struct redisCommand *cmd;/* Serve the clients from time to time */if (!(loops++ % 1000)) {loadingProgress(ftello(fp));processEventsWhileBlocked();processModuleLoadingProgressEvent(1);}if (fgets(buf,sizeof(buf),fp) == NULL) {if (feof(fp))break;elsegoto readerr;}if (buf[0] != '*') goto fmterr;if (buf[1] == '\0') goto readerr;// 4.1  按照resp協議讀取命令的參數數量argc = atoi(buf+1);if (argc < 1) goto fmterr;argv = zmalloc(sizeof(robj*)*argc);fakeClient->argc = argc;fakeClient->argv = argv;// 4.2  循環讀取命令的每個參數for (j = 0; j < argc; j++) {/* Parse the argument len. */char *readres = fgets(buf,sizeof(buf),fp);if (readres == NULL || buf[0] != '$') {fakeClient->argc = j; /* Free up to j-1. */freeFakeClientArgv(fakeClient);if (readres == NULL)goto readerr;elsegoto fmterr;}len = strtol(buf+1,NULL,10);/* Read it into a string object. */argsds = sdsnewlen(SDS_NOINIT,len);if (len && fread(argsds,len,1,fp) == 0) {sdsfree(argsds);fakeClient->argc = j; /* Free up to j-1. */freeFakeClientArgv(fakeClient);goto readerr;}argv[j] = createObject(OBJ_STRING,argsds);/* Discard CRLF. */if (fread(buf,2,1,fp) == 0) {fakeClient->argc = j+1; /* Free up to j. */freeFakeClientArgv(fakeClient);goto readerr;}}// 4.3  根據第一個參數,查詢命令表,獲取命令cmd = lookupCommand(argv[0]->ptr);if (cmd == server.multiCommand) valid_before_multi = valid_up_to;// 4.4 執行命令fakeClient->cmd = fakeClient->lastcmd = cmd;if (fakeClient->flags & CLIENT_MULTI &&fakeClient->cmd->proc != execCommand){queueMultiCommand(fakeClient);} else {cmd->proc(fakeClient);}/* Clean up. Command code may have changed argv/argc so we use the* argv/argc of the client instead of the local variables. */freeFakeClientArgv(fakeClient);fakeClient->cmd = NULL;if (server.aof_load_truncated) valid_up_to = ftello(fp);if (server.key_load_delay)debugDelay(server.key_load_delay);}if (fakeClient->flags & CLIENT_MULTI) {valid_up_to = valid_before_multi;goto uxeof;}..........................
}

goto部分的代碼:

int loadAppendOnlyFile(char *filename) {...................................
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */fclose(fp);freeFakeClient(fakeClient);server.aof_state = old_aof_state;stopLoading(1);aofUpdateCurrentSize();server.aof_rewrite_base_size = server.aof_current_size;server.aof_fsync_offset = server.aof_current_size;return C_OK;readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */if (!feof(fp)) {if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */fclose(fp);serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));exit(1);}uxeof: /* Unexpected AOF end of file. */if (server.aof_load_truncated) {serverLog(LL_WARNING,"!!! Warning: short read while loading the AOF file !!!");serverLog(LL_WARNING,"!!! Truncating the AOF at offset %llu !!!",(unsigned long long) valid_up_to);if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {if (valid_up_to == -1) {serverLog(LL_WARNING,"Last valid command offset is invalid");} else {serverLog(LL_WARNING,"Error truncating the AOF file: %s",strerror(errno));}} else {/* Make sure the AOF file descriptor points to the end of the* file after the truncate call. */if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {serverLog(LL_WARNING,"Can't seek the end of the AOF file: %s",strerror(errno));} else {serverLog(LL_WARNING,"AOF loaded anyway because aof-load-truncated is enabled");goto loaded_ok;}}}if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */fclose(fp);exit(1);fmterr: /* Format error. */if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */fclose(fp);exit(1);
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/13200.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/13200.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/13200.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

24.HashMap的擴容機制

ps&#xff1a;沒看太懂源碼&#xff0c;不確定是否正確... 一、擴容條件 當HashMap中元素的總個數超過&#xff08;threshold&#xff09;閾值&#xff08;數組容量乘以負載因子&#xff09;時&#xff0c;會觸發擴容。默認情況下&#xff0c;&#xff08;capacity&#xff0…

JavaScript函數聲明

JS函數聲明 JS中的方法,多稱為函數,函數的聲明語法和JAVA中有較大區別 語法1&#xff1a;function 函數名 (參數列表){函數體} 語法2&#xff1a;var 函數名 function (參數列表){函數體} 函數說明 函數沒有權限控制符不用聲明函數的返回值類型,需要返回在函數體中直接return即…

UBUNTU下指定執行文件運行時查找庫的路徑

在Ubuntu下&#xff0c;當指定執行文件時&#xff0c;程序運行時會查找庫文件。通常情況下&#xff0c;程序會在系統默認的庫文件路徑中查找&#xff0c;例如/lib和/usr/lib。 如果需要程序在執行時查找特定路徑下的庫文件&#xff0c;可以通過以下方法實現&#xff1a; 設置環…

Gone框架介紹18 - redis 分布式緩存 和 分布式鎖

gone是可以高效開發Web服務的Golang依賴注入框架 github地址&#xff1a;https://github.com/gone-io/gone 文檔地址&#xff1a;https://goner.fun/zh/ 請幫忙在github上點個 ??吧&#xff0c;這對我很重要 &#xff1b;萬分感謝&#xff01;&#xff01; 文章目錄 利用redi…

Python | Leetcode Python題解之第92題反轉鏈表II

題目&#xff1a; 題解&#xff1a; class Solution:def reverseBetween(self, head: ListNode, left: int, right: int) -> ListNode:# 設置 dummyNode 是這一類問題的一般做法dummy_node ListNode(-1)dummy_node.next headpre dummy_nodefor _ in range(left - 1):pre…

云計算第十八課

目錄操作 移動 改名 批量改名&#xff0c;寫腳本 mv [選項] … 源文件或目錄… 目標文件或目錄 單個文件 移動 或者改名 -f&#xff1a;強制覆蓋&#xff0c;如果目標文件已經存在&#xff0c;則不詢問&#xff0c;直接強制覆蓋&#xff1b; -i&#xff1a;交互移動&#x…

零基礎學Java第十四天之抽象類

抽象類和抽象類的深入 抽象類 1、理解 抽象類&#xff08;Abstract Class&#xff09;是面向對象編程中的一個重要概念&#xff0c;尤其在像Java、C#和C等編程語言中。抽象類是一種特殊的類&#xff0c;它不能被實例化&#xff08;即不能創建抽象類的對象&#xff09;&#x…

鼠標懸浮(hover)時顯示提示框的效果

在Vue中&#xff0c;你可以使用多種方法來實現鼠標懸浮&#xff08;hover&#xff09;時顯示提示框的效果。以下是一個簡單的示例&#xff0c;它使用了Vue的指令&#xff08;directive&#xff09;和條件渲染&#xff08;conditional rendering&#xff09;來實現這個功能。 首…

關于FIFO Generator IP和XPM_FIFO在涉及位寬轉換上的區別

在Xilinx FPGA中&#xff0c;要實現FIFO的功能時&#xff0c;大部分時候會使用兩種方法&#xff1a; FIFO Generator IP核XPM_FIFO原語 FIFO Generator IP核的優點是有圖形化界面&#xff0c;配置參數非常直觀&#xff1b;缺點是參數一旦固定&#xff0c;想要更改的化就只能重…

一次tomcat閃退處理

雙擊tomcat目錄下bin目錄中startup.bat 在我的電腦上是一閃而過&#xff0c;不能正常地啟動tomcat軟件 以記事本打開startup.bat文件&#xff0c;在文件的結尾處加上pause 然后再雙擊該bat執行&#xff0c;此時窗口就不會關閉&#xff0c;并會將錯誤信息打印在提示框中 可能是…

英偉達發布 VILA 視覺語言模型,實現多圖像推理、增強型上下文學習,性能超越 LLaVA-1.5

前言 近年來&#xff0c;大型語言模型 (LLM) 的發展取得了顯著的成果&#xff0c;并逐漸應用于多模態領域&#xff0c;例如視覺語言模型 (VLM)。VLM 旨在將 LLM 的強大能力擴展到視覺領域&#xff0c;使其能夠理解和處理圖像和文本信息&#xff0c;并完成諸如視覺問答、圖像描…

一看就會的AOP事務

文章目錄 AOPAOP簡介AOP簡介和作用AOP的應用場景為什么要學習AOP AOP入門案例思路分析代碼實現AOP中的核心概念 AOP工作流程AOP工作流程AOP核心概念在測試類中驗證代理對象 AOP切入點表達式語法格式通配符書寫技巧 AOP通知類型AOP通知分類AOP通知詳解 AOP案例案例-測量業務層接…

Linux bc命令(bc指令)(基本計算器)(任意精度計算語言:支持浮點數運算、變量賦值和自定義函數等)

文章目錄 bc命令文檔英文中文 Linux bc 命令詳解bc 命令的基本用法啟動 bc 環境進行基本計算退出 bc bc 中的數學功能執行高級數學計算平方根和指數函數對數函數 處理精度問題 變量和數組變量賦值和使用數組的使用 創建和使用自定義函數 bc 命令的高級用法在腳本中使用 bc基本腳…

Google I/O 大會 | 精彩看點一覽

作者 / 開發者關系和開源總監 Timothy Jordan 2024 年 Google I/O 大會于北京時間 5 月 15 日 1:00am 在加利福尼亞的山景城以 Google 主題演講直播拉開序幕。隨后&#xff0c;在北京時間 4:30am 舉行開發者主題演講。大家可前往回看 "Google 主題演講" 以及 "開…

AIGC時代已至,你準備好抓住機遇了嗎?

一、行業前景 AIGC&#xff0c;即人工智能生成內容&#xff0c;是近年來人工智能領域中發展迅猛的一個分支。隨著大數據、云計算、機器學習等技術的不斷進步&#xff0c;AIGC已經取得了顯著的成果&#xff0c;并且在廣告、游戲、自媒體、教育、電商等多個領域實現了廣泛應用。…

AI寫算法:支持向量機(SVM)

在Python中&#xff0c;我們可以使用scikit-learn庫來實現支持向量機&#xff08;SVM&#xff09;。以下是一個簡單的示例&#xff0c;演示如何使用scikit-learn的SVC類來訓練一個SVM分類器&#xff0c;并使用它對一些數據進行預測。 python復制代碼 # 導入必要的庫 from skle…

圖像中的attention及QKV機制解釋

簡單記錄/推薦兩篇博客&#xff0c;后續細化寫一下&#xff1a; 圖像中的各類 attention https://blog.csdn.net/weixin_44505185/article/details/127013204 Cross-attention的直觀理解 首先理解&#xff0c;cross-attention 是兩個不同向量間的相關計算&#xff0c;一般Q…

DolphinScheduler(海豚調度)- docker部署實戰

1.官方文檔 https://dolphinscheduler.apache.org/zh-cn/docs/3.2.1/guide/start/docker 2.docker環境安裝 版本情況&#xff08;這個地方踩了不少坑&#xff09;&#xff1a;docker-26.1.2&#xff0c;docker-compose-v2.11.0。 具體可使用我上傳的安裝包&#xff0c;一鍵安…

leetcode題目55

跳躍游戲 中等 給你一個非負整數數組 nums &#xff0c;你最初位于數組的 第一個下標 。數組中的每個元素代表你在該位置可以跳躍的最大長度。 判斷你是否能夠到達最后一個下標&#xff0c;如果可以&#xff0c;返回 true &#xff1b;否則&#xff0c;返回 false 。 示例 1…

MT3037 新月軒就餐

思路&#xff1a; 此題每道菜的價錢相同&#xff0c;想最小化付的錢即求最小區間長度可以滿足“品嘗到所有名廚手藝”。 使用雙端隊列存儲元素&#xff0c;隊尾不斷向后遍歷&#xff1a;頭->尾 如果隊頭隊尾&#xff0c;則隊頭往右移一格&#xff0c;直到區間不同元素數m…