六:定時發送消息
???????? 哨兵每隔一段時間,會向其所監控的所有實例發送一些命令,用于獲取這些實例的狀態。這些命令包括:”PING”、”INFO”和”PUBLISH”。
???????? “PING”命令,主要用于哨兵探測實例是否活著。如果對方超過一段時間,還沒有回復”PING”命令,則認為其是主觀下線了。
???????? “INFO”命令,主要用于哨兵獲取實例當前的狀態和信息,比如該實例當前是主節點還是從節點;該實例反饋的IP地址和PORT信息,是否與我記錄的一樣;該實例如果是主節點的話,那它都有哪些從節點;該實例如果是從節點的話,它與主節點是否連通,它的優先級是多少,它的復制偏移量是多少等等,這些信息在故障轉移流程中,是判斷實例狀態的重要信息;
???????? “PUBLISH”命令,主要用于哨兵向實例的HELLO頻道發布有關自己以及主節點的信息,也就是所謂的HELLO消息。因為所有哨兵都會訂閱主節點和從節點的HELLO頻道,因此,每個哨兵都會收到其他哨兵發布的信息。
???????? 因此,通過這些命令,盡管在配置文件中只配置了主節點的信息,但是哨兵可以通過主節點的”INFO”回復,得到所有從節點的信息;又可以通過訂閱實例的HELLO頻道,接收其他哨兵通過”PUBLISH”命令發布的信息,從而得到監控同一主節點的所有其他哨兵的信息。
?
???????? 在“主函數”sentinelHandleRedisInstance中,是通過調用sentinelSendPeriodicCommands來發送這些命令的。注意,以上的命令都有自己的發送周期,在sentinelSendPeriodicCommands函數中,并不是一并發送三個命令,而是發送那些,按照發送周期應該發送的命令。該函數的代碼如下:
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {mstime_t now = mstime();mstime_t info_period, ping_period;int retval;/* Return ASAP if we have already a PING or INFO already pending, or* in the case the instance is not properly connected. */if (ri->flags & SRI_DISCONNECTED) return;/* For INFO, PING, PUBLISH that are not critical commands to send we* also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't* want to use a lot of memory just because a link is not working* properly (note that anyway there is a redundant protection about this,* that is, the link will be disconnected and reconnected if a long* timeout condition is detected. */if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;/* If this is a slave of a master in O_DOWN condition we start sending* it INFO every second, instead of the usual SENTINEL_INFO_PERIOD* period. In this state we want to closely monitor slaves in case they* are turned into masters by another Sentinel, or by the sysadmin. */if ((ri->flags & SRI_SLAVE) &&(ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {info_period = 1000;} else {info_period = SENTINEL_INFO_PERIOD;}/* We ping instances every time the last received pong is older than* the configured 'down-after-milliseconds' time, but every second* anyway if 'down-after-milliseconds' is greater than 1 second. */ping_period = ri->down_after_period;if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;if ((ri->flags & SRI_SENTINEL) == 0 &&(ri->info_refresh == 0 ||(now - ri->info_refresh) > info_period)){/* Send INFO to masters and slaves, not sentinels. */retval = redisAsyncCommand(ri->cc,sentinelInfoReplyCallback, NULL, "INFO");if (retval == REDIS_OK) ri->pending_commands++;} else if ((now - ri->last_pong_time) > ping_period) {/* Send PING to all the three kinds of instances. */sentinelSendPing(ri);} else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {/* PUBLISH hello messages to all the three kinds of instances. */sentinelSendHello(ri);}
}
? ? ? ???如果實例標志位中設置了SRI_DISCONNECTED標記,說明當前實例的異步上下文還沒有創建好,因此直接返回;
???????? 實例的pending_commands屬性,表示已經向該實例發送的命令中,尚有pending_commands個命令還沒有收到回復。每次調用redisAsyncCommand函數,向實例異步發送一條命令之后,就會增加該屬性的值,而每當收到命令回復之后,就會減少該屬性的值;
? ? ? ???因此,如果該屬性的值大于SENTINEL_MAX_PENDING_COMMANDS(100),說明該實例尚有超過100條命令的回復信息沒有收到。這種情況下,說明與實例的連接已經不正常了,為了節約內存,因此直接返回;
???????? 接下來計算info_period和ping_period,這倆值表示發送"INFO"和"PING"命令的時間周期。如果當前時間距離上次收到"INFO"或"PING"回復的時間已經超過了info_period或ping_period,則向實例發送"INFO"或"PING"命令;
???????? 如果當前實例為從節點,并且該從節點對應的主節點已經客觀下線了,則置info_period為1000,否則的話置為SENTINEL_INFO_PERIOD(10000)。之所以在主節點客觀下線后更頻繁的向從節點發送"INFO"命令,是因為從節點可能會被置為新的主節點,因此需要更加實時的獲取其狀態;
???????? 將ping_period置為ri->down_after_period的值,該屬性的值是根據配置文件中down-after-milliseconds選項得到的,如果該屬性值大于SENTINEL_PING_PERIOD(1000),則將ping_period置為SENTINEL_PING_PERIOD;
???????? 接下來開始發送命令:如果當前實例不是哨兵實例,并且距離上次收到"INFO"命令回復已經超過了info_period,則向該實例異步發送"INFO"命令。
???????? 否則,如果距離上次收到"PING"命令回復已經超過了ping_period,則調用函數sentinelSendPing向該實例異步發送"PING"命令;
???????? 否則,如果距離上次收到"PUBLISH"命令的回復已經超過了SENTINEL_PUBLISH_PERIOD(2000),則調用函數sentinelSendHello向該實例異步發送"PUBLISH"命令;
???????? 因此,"PING"用于探測實例是否活著,可以發送給所有類型的實例;而"INFO"命令用于獲取實例的信息,只需發送給主節點和從節點實例;而"PUBLISH"用于向HELLO頻道發布哨兵本身和主節點的信息,除了發送給主節點和從節點之外,哨兵本身也實現了"PUBLISH"命令的處理函數,因此該命令也會發送給哨兵實例。
?
1:PING消息
???????? 函數sentinelSendPing用于向實例發送”PING”命令,因為該命令用于探測實例是否主觀下線,因此等到后面講解主觀下線是在分析。
?
2:HELLO消息
???????? 函數sentinelSendHello用于發布HELLO消息,它的代碼如下:
int sentinelSendHello(sentinelRedisInstance *ri) {char ip[REDIS_IP_STR_LEN];char payload[REDIS_IP_STR_LEN+1024];int retval;char *announce_ip;int announce_port;sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);if (ri->flags & SRI_DISCONNECTED) return REDIS_ERR;/* Use the specified announce address if specified, otherwise try to* obtain our own IP address. */if (sentinel.announce_ip) {announce_ip = sentinel.announce_ip;} else {if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1)return REDIS_ERR;announce_ip = ip;}announce_port = sentinel.announce_port ?sentinel.announce_port : server.port;/* Format and send the Hello message. */snprintf(payload,sizeof(payload),"%s,%d,%s,%llu," /* Info about this sentinel. */"%s,%s,%d,%llu", /* Info about current master. */announce_ip, announce_port, server.runid,(unsigned long long) sentinel.current_epoch,/* --- */master->name,master_addr->ip,master_addr->port,(unsigned long long) master->config_epoch);retval = redisAsyncCommand(ri->cc,sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",SENTINEL_HELLO_CHANNEL,payload);if (retval != REDIS_OK) return REDIS_ERR;ri->pending_commands++;return REDIS_OK;
}
? ? ? ???首先得到實例ri所屬的主節點實例master;然后調用sentinelGetCurrentMasterAddress函數得到master的地址信息;
???????? 如果實例ri的標志位中具有SRI_DISCONNECTED標記的話,直接返回;
???????? 如果當前哨兵配置了sentinel.announce_ip的話,則使用該ip信息作為自己的ip地址,否則,調用anetSockName函數,根據socket描述符得到當前哨兵的ip地址;
???????? 如果當前哨兵配置了sentinel.announce_port的話,則使用該port信息作為自己的端口信息,否則,使用server.port作為當前哨兵的端口信息;
???????? 接下來組裝要發布的HELLO信息,HELLO信息的格式是:"sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch"
???????? 接下來,向ri異步發送"PUBLISH__sentinel__:hello <HELLO>"命令,設置命令回調函數為sentinelPublishReplyCallback;
?
???????? 當哨兵收到實例對于該”PUBLISH”命令的回復之后,會調用回調函數sentinelPublishReplyCallback,該函數只用于更新屬性ri->last_pub_time,對回復內容無需關心:
void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {sentinelRedisInstance *ri = c->data;redisReply *r;REDIS_NOTUSED(privdata);if (ri) ri->pending_commands--;if (!reply || !ri) return;r = reply;/* Only update pub_time if we actually published our message. Otherwise* we'll retry again in 100 milliseconds. */if (r->type != REDIS_REPLY_ERROR)ri->last_pub_time = mstime();
}
?
???????? 之前在介紹sentinelReconnectInstance函數時講過,當哨兵向主節點或從節點實例建立訂閱連接時,向實例發送” SUBSCRIBE __sentinel__:hello"命令,訂閱HELLO頻道時,設置該命令的回調函數為sentinelReceiveHelloMessages。因此,當收到該頻道上發布的消息時,就會調用函數sentinelReceiveHelloMessages。
???????? 該頻道上的消息,是監控同一實例的其他哨兵節點發來的HELLO消息,當前哨兵通過HELLO消息,來發現其他哨兵,并且相互之間交互最新的主節點信息。sentinelReceiveHelloMessages函數的代碼如下:
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {sentinelRedisInstance *ri = c->data;redisReply *r;REDIS_NOTUSED(privdata);if (!reply || !ri) return;r = reply;/* Update the last activity in the pubsub channel. Note that since we* receive our messages as well this timestamp can be used to detect* if the link is probably disconnected even if it seems otherwise. */ri->pc_last_activity = mstime();/* Sanity check in the reply we expect, so that the code that follows* can avoid to check for details. */if (r->type != REDIS_REPLY_ARRAY ||r->elements != 3 ||r->element[0]->type != REDIS_REPLY_STRING ||r->element[1]->type != REDIS_REPLY_STRING ||r->element[2]->type != REDIS_REPLY_STRING ||strcmp(r->element[0]->str,"message") != 0) return;/* We are not interested in meeting ourselves */if (strstr(r->element[2]->str,server.runid) != NULL) return;sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
}
???????? 該函數中,首先更新ri->pc_last_activity為當前時間;
???????? 然后判斷是否處理接收到的消息,注意,只處理"message"消息,也就是說不會處理"subscribe"消息;
???????? 注意,如果收到的"message"消息中,包含了自身的runid,說明這是本哨兵自己發送的消息,因此無需處理,直接返回;
???????? 最后,調用sentinelProcessHelloMessage函數處理收到的HELLO消息;
? ? ? ???注意:在測試時發現會收到從節點重復的HELLO消息,也就是同一時間,同一個哨兵發布的兩條一模一樣的消息。這是因為哨兵向主節點發送的”PUBLISH”命令,會因為主從復制的原因,而同步到從節點;而同時該哨兵也向從節點發送”PUBLISH”命令,因此,從節點就會在同一時間,收到兩條一模一樣的HELLO消息,并將它們發布到頻道上。
?
???????? 另外,一旦哨兵發現了其他哨兵之后,可以直接向其發送"PUBLISH __sentinel__:hello <HELLO>"命令。哨兵自己實現了”PUBLISH”的處理函數sentinelPublishCommand,當收到其他哨兵直接發來的HELLO消息時,就會調用該函數處理。該函數的代碼如下:
void sentinelPublishCommand(redisClient *c) {if (strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL)) {addReplyError(c, "Only HELLO messages are accepted by Sentinel instances.");return;}sentinelProcessHelloMessage(c->argv[2]->ptr,sdslen(c->argv[2]->ptr));addReplyLongLong(c,1);
}
???????? 因此,不管是從真正的訂閱頻道中收到HELLO消息,還是直接收到其他哨兵發來的”PUBLISH”命令,最終都是通過sentinelProcessHelloMessage函數對HELLO消息進行處理的。該函數的代碼如下:
void sentinelProcessHelloMessage(char *hello, int hello_len) {/* Format is composed of 8 tokens:* 0=ip,1=port,2=runid,3=current_epoch,4=master_name,* 5=master_ip,6=master_port,7=master_config_epoch. */int numtokens, port, removed, master_port;uint64_t current_epoch, master_config_epoch;char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);sentinelRedisInstance *si, *master;if (numtokens == 8) {/* Obtain a reference to the master this hello message is about */master = sentinelGetMasterByName(token[4]);if (!master) goto cleanup; /* Unknown master, skip the message. *//* First, try to see if we already have this sentinel. */port = atoi(token[1]);master_port = atoi(token[6]);si = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,token[0],port,token[2]);current_epoch = strtoull(token[3],NULL,10);master_config_epoch = strtoull(token[7],NULL,10);if (!si) {/* If not, remove all the sentinels that have the same runid* OR the same ip/port, because it's either a restart or a* network topology change. */removed = removeMatchingSentinelsFromMaster(master,token[0],port,token[2]);if (removed) {sentinelEvent(REDIS_NOTICE,"-dup-sentinel",master,"%@ #duplicate of %s:%d or %s",token[0],port,token[2]);}/* Add the new sentinel. */si = createSentinelRedisInstance(NULL,SRI_SENTINEL,token[0],port,master->quorum,master);if (si) {sentinelEvent(REDIS_NOTICE,"+sentinel",si,"%@");/* The runid is NULL after a new instance creation and* for Sentinels we don't have a later chance to fill it,* so do it now. */si->runid = sdsnew(token[2]);sentinelFlushConfig();}}/* Update local current_epoch if received current_epoch is greater.*/if (current_epoch > sentinel.current_epoch) {sentinel.current_epoch = current_epoch;sentinelFlushConfig();sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu",(unsigned long long) sentinel.current_epoch);}/* Update master info if received configuration is newer. */if (master->config_epoch < master_config_epoch) {master->config_epoch = master_config_epoch;if (master_port != master->addr->port ||strcmp(master->addr->ip, token[5])){sentinelAddr *old_addr;sentinelEvent(REDIS_WARNING,"+config-update-from",si,"%@");sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",master->name,master->addr->ip, master->addr->port,token[5], master_port);old_addr = dupSentinelAddr(master->addr);sentinelResetMasterAndChangeAddress(master, token[5], master_port);sentinelCallClientReconfScript(master,SENTINEL_OBSERVER,"start",old_addr,master->addr);releaseSentinelAddr(old_addr);}}/* Update the state of the Sentinel. */if (si) si->last_hello_time = mstime();}cleanup:sdsfreesplitres(token,numtokens);
}
? ? ? ???首先,根據消息中的master_name,調用函數sentinelGetMasterByName,在字典sentinel.masters中尋找相應的主節點實例master,如果找不到,則直接退出;
???????? 然后,調用getSentinelRedisInstanceByAddrAndRunID函數,根據消息中的sentinel_ip,sentinel_port和sentinel_runid信息,在字典master->sentinels中,找到runid,ip和port都匹配的哨兵實例。
???????? 如果沒有找到匹配的哨兵實例,要么這是一個新發現的哨兵,要么是某個哨兵的信息發生了變化(比如有可能某個哨兵實例重啟了,導致runid發生了變化;或者網絡拓撲發生了變化,導致ip或port發生了變化)。
???????? 這種情況下,首先調用函數removeMatchingSentinelsFromMaster,刪除字典master->sentinels中,具有相同runid,或者具有相同ip和port的哨兵實例;然后根據HELLO消息中的ip和port信息,重新創建一個新的哨兵實例,添加到字典master->sentinels中,這樣下次調用sentinelReconnectInstance時,就會向該哨兵實例進行建鏈了。;
???????? 如果找到了匹配的哨兵實例,并且HELLO消息中的sentinel_current_epoch,大于本實例當前的current_epoch,則更新本實例的current_epoch屬性;
???????? 如果HELLO消息中的master_config_epoch,大于本實例記錄的master的config_epoch,則更新本實例記錄的master的config_epoch。并且如果HELLO消息中的master_ip或master_port,與本實例記錄的主節點的ip或port信息不匹配的話,則說明可能發生了故障轉移,某個從節點升級成為了新的主節點,因此調用sentinelResetMasterAndChangeAddress函數,重置主節點,及其從節點實例的信息;
???????? 最后,更新si->last_hello_time屬性為當前時間;
?
3:”INFO”命令
???????? “INFO”命令,主要用于哨兵獲取主從節點實例當前的狀態和信息,比如該實例當前是主節點還是從節點;該實例反饋的IP地址和PORT信息,是否與本哨兵記錄的一樣;該實例如果是主節點的話,那它都有哪些從節點;該實例如果是從節點的話,它與主節點是否連通,它的優先級是多少,它的復制偏移量是多少等等,這些信息在故障轉移流程中,是判斷實例狀態的重要信息;
???????? 在sentinelSendPeriodicCommands函數中,設置的”INFO”命令的回調函數是sentinelInfoReplyCallback。該函數的代碼很簡單,主要是調用sentinelRefreshInstanceInfo函數對回復進行處理。因此,主要看一下sentinelRefreshInstanceInfo函數的代碼:
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {sds *lines;int numlines, j;int role = 0;/* The following fields must be reset to a given value in the case they* are not found at all in the INFO output. */ri->master_link_down_time = 0;/* Process line by line. */lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);for (j = 0; j < numlines; j++) {sentinelRedisInstance *slave;sds l = lines[j];/* run_id:<40 hex chars>*/if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {if (ri->runid == NULL) {ri->runid = sdsnewlen(l+7,40);} else {if (strncmp(ri->runid,l+7,40) != 0) {sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");sdsfree(ri->runid);ri->runid = sdsnewlen(l+7,40);}}}/* old versions: slave0:<ip>,<port>,<state>* new versions: slave0:ip=127.0.0.1,port=9999,... */if ((ri->flags & SRI_MASTER) &&sdslen(l) >= 7 &&!memcmp(l,"slave",5) && isdigit(l[5])){char *ip, *port, *end;if (strstr(l,"ip=") == NULL) {/* Old format. */ip = strchr(l,':'); if (!ip) continue;ip++; /* Now ip points to start of ip address. */port = strchr(ip,','); if (!port) continue;*port = '\0'; /* nul term for easy access. */port++; /* Now port points to start of port number. */end = strchr(port,','); if (!end) continue;*end = '\0'; /* nul term for easy access. */} else {/* New format. */ip = strstr(l,"ip="); if (!ip) continue;ip += 3; /* Now ip points to start of ip address. */port = strstr(l,"port="); if (!port) continue;port += 5; /* Now port points to start of port number. *//* Nul term both fields for easy access. */end = strchr(ip,','); if (end) *end = '\0';end = strchr(port,','); if (end) *end = '\0';}/* Check if we already have this slave into our table,* otherwise add it. */if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,atoi(port), ri->quorum, ri)) != NULL){sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");sentinelFlushConfig();}}}/* master_link_down_since_seconds:<seconds> */if (sdslen(l) >= 32 &&!memcmp(l,"master_link_down_since_seconds",30)){ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;}/* role:<role> */if (!memcmp(l,"role:master",11)) role = SRI_MASTER;else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;if (role == SRI_SLAVE) {/* master_host:<host> */if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {if (ri->slave_master_host == NULL ||strcasecmp(l+12,ri->slave_master_host)){sdsfree(ri->slave_master_host);ri->slave_master_host = sdsnew(l+12);ri->slave_conf_change_time = mstime();}}/* master_port:<port> */if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) {int slave_master_port = atoi(l+12);if (ri->slave_master_port != slave_master_port) {ri->slave_master_port = slave_master_port;ri->slave_conf_change_time = mstime();}}/* master_link_status:<status> */if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {ri->slave_master_link_status =(strcasecmp(l+19,"up") == 0) ?SENTINEL_MASTER_LINK_STATUS_UP :SENTINEL_MASTER_LINK_STATUS_DOWN;}/* slave_priority:<priority> */if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))ri->slave_priority = atoi(l+15);/* slave_repl_offset:<offset> */if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18))ri->slave_repl_offset = strtoull(l+18,NULL,10);}}ri->info_refresh = mstime();sdsfreesplitres(lines,numlines);/* ---------------------------- Acting half -----------------------------* Some things will not happen if sentinel.tilt is true, but some will* still be processed. *//* Remember when the role changed. */if (role != ri->role_reported) {ri->role_reported_time = mstime();ri->role_reported = role;if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();/* Log the event with +role-change if the new role is coherent or* with -role-change if there is a mismatch with the current config. */sentinelEvent(REDIS_VERBOSE,((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?"+role-change" : "-role-change",ri, "%@ new reported role is %s",role == SRI_MASTER ? "master" : "slave",ri->flags & SRI_MASTER ? "master" : "slave");}/* None of the following conditions are processed when in tilt mode, so* return asap. */if (sentinel.tilt) return;/* Handle master -> slave role switch. */if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {/* Nothing to do, but masters claiming to be slaves are* considered to be unreachable by Sentinel, so eventually* a failover will be triggered. */}...
}
? ? ? ???該函數首先在for循環中解析"INFO"回復信息:
? ? ? ???首先解析出"run_id"之后的信息,保存在ri->runid中。如果該實例的runid發生了變化,還需要記錄日志,向"+reboot"頻道發布消息;
???????? 如果實例為主節點,則解析"slave"后的從節點信息,取出其中的ip和port信息,然后根據ip和port,調用sentinelRedisInstanceLookupSlave函數,在字典ri->slaves中尋找是否已經保存了該從節點的信息。如果沒有,則調用createSentinelRedisInstance創建從節點實例,并插入到ri->slaves中,也就是發現了主節點屬下的從節點,下次調用函數sentinelReconnectInstance時,就會向該從節點建鏈了;
???????? 解析"master_link_down_since_seconds"信息,該信息表示從節點與主節點的斷鏈時間。將其轉換成整數后,記錄到ri->master_link_down_time中;
???????? 解析"role"信息,如果包含"role:master",則置role為SRI_MASTER,說明該實例報告自己為主節點;如果包含"role:slave",則置role為SRI_SLAVE,說明該實例報告自己為從節點;
???????? 如果role為SRI_SLAVE,找到回復信息中的"master_host:"信息,記錄到ri->slave_master_host中;找到回復信息中的"master_port:"信息,記錄到ri->slave_master_port中;找到回復信息中的"master_link_status:"信息,根據其值是否為"up",記錄到ri->slave_master_link_status中;找到回復信息中的"slave_priority:"信息,記錄到ri->slave_priority中;找到回復信息中的"slave_repl_offset:"信息,記錄到ri->slave_repl_offset中;
???????? 解析完所有"INFO"回復信息之后,更新ri->info_refresh為當前時間;
????????
???????? 接下來根據實例的角色信息執行一些動作:
???????? ri->role_reported的初始值是根據ri->flags得到的,如果收到"INFO"回復后,解析得到的role與ri->role_reported不同,說明該實例的角色發生了變化,比如從主節點變成了從節點,或者相反。只要role與ri->role_reported不同,就首先更新ri->role_reported_time為當前時間,并且將ri->role_reported置為role;如果role為SRI_SLAVE,還需要更新ri->slave_conf_change_time的值為當前時間;最后,還根據ri->flags中的角色是否與role,來記錄日志,發布信息;
???????? 如果當前哨兵已經進入了TILT模式,則直接返回;
???????? 如果ri->flags中為主節點,但是role為從節點,這種情況無需采取動作,因為這種情況會被視為主節點不可達,最終會引發故障遷移流程;
???????? 本函數剩下的動作,與故障轉移流程有關,后續在介紹。
?
七:判斷實例是否主觀下線
???????? 首先解釋一下主觀下線和客觀下線的區別。
???????? 所謂主觀下線,就是從“我”(當前實例)的角度來看,某個實例已經下線了。但是單個哨兵的視角可能是盲目的,僅從“我”的角度,就決定一個實例下線是武斷的。因此,“我”還會通過命令詢問其他哨兵節點,看它們是否也認為該實例已經下線了,如果超過quorum個(包括“我”)哨兵反饋認為該實例已經下線了,則“我”就會認為該實例確實已經下線了,也就是所謂的客觀下線了。
?
???????? 判斷某個實例主觀下線,主要是根據其是否能及時回復”PING”命令決定的。因此,首先看一下發送”PING”命令的函數sentinelSendPing的實現:
int sentinelSendPing(sentinelRedisInstance *ri) {int retval = redisAsyncCommand(ri->cc,sentinelPingReplyCallback, NULL, "PING");if (retval == REDIS_OK) {ri->pending_commands++;/* We update the ping time only if we received the pong for* the previous ping, otherwise we are technically waiting* since the first ping that did not received a reply. */if (ri->last_ping_time == 0) ri->last_ping_time = mstime();return 1;} else {return 0;}
}
???????? 在該函數中,設置收到”PING”命令回復后的回調函數為sentinelPingReplyCallback。
需要注意的是,如果ri->last_ping_time值為0,則更新ri->last_ping_time為當前時間。而只有在收到"PING"命令的正常回復之后,ri->last_ping_time的值才會被置為0。
?
???????? 下面是回調函數sentinelPingReplyCallback的代碼:
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {sentinelRedisInstance *ri = c->data;redisReply *r;REDIS_NOTUSED(privdata);if (ri) ri->pending_commands--;if (!reply || !ri) return;r = reply;if (r->type == REDIS_REPLY_STATUS ||r->type == REDIS_REPLY_ERROR) {/* Update the "instance available" field only if this is an* acceptable reply. */if (strncmp(r->str,"PONG",4) == 0 ||strncmp(r->str,"LOADING",7) == 0 ||strncmp(r->str,"MASTERDOWN",10) == 0){ri->last_avail_time = mstime();ri->last_ping_time = 0; /* Flag the pong as received. */} else {/* Send a SCRIPT KILL command if the instance appears to be* down because of a busy script. */if (strncmp(r->str,"BUSY",4) == 0 &&(ri->flags & SRI_S_DOWN) &&!(ri->flags & SRI_SCRIPT_KILL_SENT)){if (redisAsyncCommand(ri->cc,sentinelDiscardReplyCallback, NULL,"SCRIPT KILL") == REDIS_OK)ri->pending_commands++;ri->flags |= SRI_SCRIPT_KILL_SENT;}}}ri->last_pong_time = mstime();
}
???????? 如果回復信息為"PONG","LOADING"或"MASTERDOWN",表示正常回復,因此置該實例的屬性ri->last_avail_time為當前時間,并且置ri->last_ping_time為0,這樣下次發送"PING"命令時就會更新ri->last_ping_time的值了;
???????? 如果回復信息以"BUSY"開頭,并且該實例已經被置為主觀下線,并且還沒有向該實例發送過"SCRIPT KILL"命令,則向該實例發送"SCRIPTKILL"命令;
???????? 最后,不管回復信息是什么,更新ri->last_pong_time為當前時間。
?
???????? 因此,有關”PING”命令的時間屬性總結如下:
? ? ? ???ri->last_ping_time:上一次正常發送”PING”命令的時間。需要注意的是,只有當收到"PING"命令的正常回復后,下次發送"PING"命令時才會更新該屬性為當時時間戳。如果發送”PING”命令后,沒有收到任何回復,或者沒有收到正常回復,則下次發送”PING”命令時,就不會更新該屬性。如果該屬性值為0,說明已經收到了上一個"PING"命令的正常回復,但是還沒有開始發送下一個"PING"命令。檢測實例是否主觀下線,主要就是根據該屬性判斷的。
? ? ? ???ri->last_pong_time:每當收到"PING"命令的回復后,不管是否是正常恢復,都會更新該屬性為當時時間戳;
?
???????? 在哨兵的“主函數”sentinelHandleRedisInstance中,調用sentinelCheckSubjectivelyDown函數檢測實例是否主觀下線,該函數同時還會檢測TCP連接是否正常。該函數的代碼如下:
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {mstime_t elapsed = 0;if (ri->last_ping_time)elapsed = mstime() - ri->last_ping_time;/* Check if we are in need for a reconnection of one of the* links, because we are detecting low activity.** 1) Check if the command link seems connected, was connected not less* than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a* pending ping for more than half the timeout. */if (ri->cc &&(mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&ri->last_ping_time != 0 && /* Ther is a pending ping... *//* The pending ping is delayed, and we did not received* error replies as well. */(mstime() - ri->last_ping_time) > (ri->down_after_period/2) &&(mstime() - ri->last_pong_time) > (ri->down_after_period/2)){sentinelKillLink(ri,ri->cc);}/* 2) Check if the pubsub link seems connected, was connected not less* than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no* activity in the Pub/Sub channel for more than* SENTINEL_PUBLISH_PERIOD * 3.*/if (ri->pc &&(mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&(mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3)){sentinelKillLink(ri,ri->pc);}/* Update the SDOWN flag. We believe the instance is SDOWN if:** 1) It is not replying.* 2) We believe it is a master, it reports to be a slave for enough time* to meet the down_after_period, plus enough time to get two times* INFO report from the instance. */if (elapsed > ri->down_after_period ||(ri->flags & SRI_MASTER &&ri->role_reported == SRI_SLAVE &&mstime() - ri->role_reported_time >(ri->down_after_period+SENTINEL_INFO_PERIOD*2))){/* Is subjectively down */if ((ri->flags & SRI_S_DOWN) == 0) {sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");ri->s_down_since_time = mstime();ri->flags |= SRI_S_DOWN;}} else {/* Is subjectively up */if (ri->flags & SRI_S_DOWN) {sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);}}
}
? ? ? ???ri->cc_conn_time屬性表示上一次向該實例發起命令類型的TCP建鏈的時間;ri->pc_conn_time屬性表示上一次向該實例發起訂閱類型的TCP建鏈的時間;
? ? ? ???首先計算elapsed的值,該值表示是當前時間與ri->last_ping_time之間的時間差;
???????? 然后判斷命令類型的TCP連接是否正常,不正常的條件是:距離上次建鏈時已經超過了SENTINEL_MIN_LINK_RECONNECT_PERIOD,并且上次發送"PING"后還沒有收到正常回復,且當前時間與ri->last_ping_time之間的時間差已經超過了ri->down_after_period/2,并且距離上次收到任何"PING"回復的時間,已經超過了ri->down_after_period/2;
???????? 如果命令類型的連接不正常了,則直接調用sentinelKillLink斷開連接,釋放異步上下文;
?
???????? 然后判斷訂閱類型的TCP連接是否正常,不正常的條件是:距離上次建鏈時已經超過了SENTINEL_MIN_LINK_RECONNECT_PERIOD,并且距離上次收到訂閱頻道發來的任何消息的時間,已經超過了SENTINEL_PUBLISH_PERIOD*3;
???????? 如果訂閱類型的連接不正常了,則直接調用sentinelKillLink斷開連接,釋放異步上下文;
?
???????? 如果elapsed的值大于ri->down_after_period,或者:當前實例我認為它是主節點,但是它的"INFO"回復中卻報告自己是從節點,并且距離上次收到它在"INFO"回復中報告自己是從節點的時間,已經超過了ri->down_after_period+SENTINEL_INFO_PERIOD*2;
???????? 滿足以上任意一個條件,都認為該實例是主觀下線了。因此:只要該實例還沒有標志為主觀下線,則將SRI_S_DOWN標記增加到實例標志位中,表示該實例主觀下線;
???????? 如果不滿足以上條件,但是該實例之前已經被標記為主觀下線了,則認為該實例主觀上線了,去掉其標志位中的SRI_S_DOWN和SRI_SCRIPT_KILL_SENT標記;