基於 Redis 2.8.7 版本
接Redis Sentinel源碼分析(一)
sentinelTimer函數周期性運行:第一次在服務啟動後1ms執行,之後每隔 1000/server.hz 毫秒執行一次(sentinelTimer函數每次運行結束時都會修改server.hz的值,因此執行周期並不固定)
sentinelTimer內部包含sentinel模式需要定期執行的操作,包括check master、slave、sentinel的狀態,並根據配置的條件判斷是否需要fail over。
void sentinelTimer(void) { //check是否需要進入TITL模式 sentinelCheckTiltCondition(); //執行定期操作(檢查redis-server狀態,和其他sentinel節點交互等) sentinelHandleDictOfRedisInstances(sentinel.masters); //運行等待執行的腳本 sentinelRunPendingScripts(); //清理已執行完畢腳本 sentinelCollectTerminatedScripts(); //殺死超時運行的腳本 sentinelKillTimedoutScripts(); //修改hz值(影響sentinel相關操作執行頻率),引入隨機值,盡量避免所有sentinel節點持續性的同一時間發起投票請求 server.hz = REDIS_DEFAULT_HZ + rand() % REDIS_DEFAULT_HZ; }
void sentinelCheckTiltCondition(void) { mstime_t now = mstime(); mstime_t delta = now - sentinel.previous_time; //兩次執行時間<0或者大於2s,則進入TITL模式 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) { sentinel.tilt = 1; sentinel.tilt_start_time = mstime(); sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered"); } sentinel.previous_time = mstime(); }
/* Runs the periodic per-instance work on every entry of `instances`.
 *
 * For entries flagged SRI_MASTER the function recurses into the master's
 * slaves and sentinels dictionaries. A master whose failover_state is
 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG is remembered during the walk and
 * handed to sentinelFailoverSwitchToPromotedSlave() once the loop is over
 * (only the last such master seen is kept). */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    sentinelRedisInstance *promoted_target = NULL;
    dictIterator *iter = dictGetIterator(instances);
    dictEntry *entry;

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *ri = dictGetVal(entry);

        /* Periodic work for this very instance. */
        sentinelHandleRedisInstance(ri);

        /* Everything below applies to masters only. */
        if (!(ri->flags & SRI_MASTER)) continue;

        /* Visit the slaves and sentinels attached to this master. */
        sentinelHandleDictOfRedisInstances(ri->slaves);
        sentinelHandleDictOfRedisInstances(ri->sentinels);

        /* Defer the switch until iteration over the dictionary is done. */
        if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG)
            promoted_target = ri;
    }

    if (promoted_target != NULL)
        sentinelFailoverSwitchToPromotedSlave(promoted_target);

    dictReleaseIterator(iter);
}
/* Periodic work for a single monitored instance (excerpt: the `......`
 * marker stands for code omitted by the article).
 *
 * Connection handling, PING/INFO/PUBLISH traffic and the subjective-down
 * check run for every instance; the objective-down check and the failover
 * logic run only when the instance is flagged SRI_MASTER. While TILT mode is
 * active, nothing after the PING step is executed. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ---- Performed for every kind of instance ---- */

    /* Connection and subscription management. */
    sentinelReconnectInstance(ri);

    /* Talk to the instance (PING / INFO / PUBLISH). */
    sentinelPingInstance(ri);

    /* Still in TILT mode: do nothing else. TILT is left only once
     * SENTINEL_TILT_PERIOD has elapsed since tilt_start_time. */
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    /* Decide whether the instance is subjectively down (sdown). */
    sentinelCheckSubjectivelyDown(ri);

    ......

    /* ---- Performed for master instances only ---- */
    if (ri->flags & SRI_MASTER) {
        /* Check whether the master is objectively down (odown), i.e. whether
         * the user-configured quorum of sentinels considers it sdown. */
        sentinelCheckObjectivelyDown(ri);

        /* Check whether a failover is needed; if it is,
         * sentinelStartFailover updates this sentinel's own state. */
        if (sentinelStartFailoverIfNeeded(ri))
            /* Send SENTINEL is-master-down-by-addr to the other sentinels
             * and register the reply callback. */
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);

        /* Advance the failover. */
        sentinelFailoverStateMachine(ri);
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}
/* Re-establishes the links to an instance flagged SRI_DISCONNECTED (excerpt:
 * each `......` marker stands for code omitted by the article).
 *
 * ri->cc is (re)created when it is NULL. For masters and slaves, ri->pc is
 * (re)created when it is NULL and used to SUBSCRIBE to the
 * SENTINEL_HELLO_CHANNEL channel. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    /* Nothing to do unless the instance is marked as disconnected. */
    if (!(ri->flags & SRI_DISCONNECTED)) return;

    /* Establish the connection to the master/slave/sentinel instance. */
    if (ri->cc == NULL) {
        ......
    }

    /* Masters and slaves only: subscribe to their "__sentinel__:hello"
     * channel; messages are delivered to sentinelReceiveHelloMessages. */
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) {
        ......
        retval = redisAsyncCommand(ri->pc, sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s", SENTINEL_HELLO_CHANNEL);
        ......
    }
    ......
}
/* Sends at most one periodic command (INFO, PING, or a hello publication) to
 * the given instance, depending on which timer has expired.
 *
 * NOTE(review): this is an abridged excerpt -- the locals `now`,
 * `info_period` and `retval` are used below but their declarations are not
 * shown here. */
void sentinelPingInstance(sentinelRedisInstance *ri) {
    /* Return immediately if the instance is not connected, or if too many
     * commands (100 per the original annotation) are still pending. */
    if (ri->flags & SRI_DISCONNECTED) return;
    if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;

    /* For a slave whose master is in an abnormal state (SRI_O_DOWN or
     * SRI_FAILOVER_IN_PROGRESS), INFO is sent every 1 s instead of the
     * regular period (10 s per the original annotation). */
    if ((ri->flags & SRI_SLAVE) && (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    /* Masters and slaves: every info_period, send INFO with
     * sentinelInfoReplyCallback registered as the reply callback. Per the
     * original annotation, that callback parses the reply and updates the
     * current state of the sentinelRedisInstance. */
    if ((ri->flags & SRI_SENTINEL) == 0 && (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) {
        /* Send INFO to masters and slaves, not sentinels. */
        retval = redisAsyncCommand(ri->cc, sentinelInfoReplyCallback, NULL, "INFO");
        if (retval != REDIS_OK) return;
        ri->pending_commands++;
    }
    /* Every kind of instance: send PING periodically (1 s per the original
     * annotation) with sentinelPingReplyCallback as the reply callback; that
     * callback judges the instance's state from the PING reply. */
    else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
        retval = redisAsyncCommand(ri->cc, sentinelPingReplyCallback, NULL, "PING");
        if (retval != REDIS_OK) return;
        ri->pending_commands++;
    /* Masters and slaves: publish to their "__sentinel__:hello" channel
     * (every 2 s per the original annotation). Message content:
     * ip,port,runid,current_epoch,master->name,master->ip,master->port */
    } else if ((ri->flags & SRI_SENTINEL) == 0 && (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        sentinelSendHello(ri);
    }
}
/* Decides whether a master is objectively down (odown) by counting how many
 * sentinels report it as down (excerpt: each `......` marker stands for code
 * omitted by the article, including the declarations of `quorum`, `odown`,
 * `di` and `de`). */
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    ......
    /* Only when this sentinel itself sees the master as sdown do we go on to
     * evaluate odown. */
    if (master->flags & SRI_S_DOWN) {
        /* Starts at 1 -- presumably counting this sentinel's own verdict. */
        quorum = 1;
        di = dictGetIterator(master->sentinels);
        /* Walk the sentinels dictionary and count those that have flagged
         * the master as down. */
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        /* Once the count reaches the user-configured quorum, the master is
         * considered odown. */
        if (quorum >= master->quorum) odown = 1;
    }
    ......
}
/* Starts a failover for `master` when all preconditions hold.
 *
 * Returns 1 after calling sentinelStartFailover(), or 0 when the master is
 * not flagged SRI_O_DOWN, a failover is already in progress, or the previous
 * failover attempt started less than 2 * failover_timeout ago. */
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    int objectively_down = (master->flags & SRI_O_DOWN) != 0;
    int already_running = (master->flags & SRI_FAILOVER_IN_PROGRESS) != 0;

    /* Needs an odown master with no failover currently running. */
    if (!objectively_down || already_running) {
        return 0;
    }

    /* Too soon after the last failover attempt. */
    if (mstime() - master->failover_start_time < master->failover_timeout * 2) {
        return 0;
    }

    sentinelStartFailover(master);
    return 1;
}
/* Puts `master` into the first failover state.
 *
 * Sets failover_state to SENTINEL_FAILOVER_STATE_WAIT_START, raises the
 * SRI_FAILOVER_IN_PROGRESS flag, bumps sentinel.current_epoch and stores the
 * new value as the failover epoch, and records the start and state-change
 * timestamps. The instance must be flagged SRI_MASTER. */
void sentinelStartFailover(sentinelRedisInstance *master) {
    redisAssert(master->flags & SRI_MASTER);

    /* Failover state. */
    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    /* Current master status flag. */
    master->flags |= SRI_FAILOVER_IN_PROGRESS;
    /* Failover epoch: the global epoch is incremented first. */
    master->failover_epoch = ++sentinel.current_epoch;
    /* Failover start time, offset by a random amount below
     * SENTINEL_MAX_DESYNC. (The excerpt read `rand()%s`, where `s` is not a
     * defined name.) */
    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}

/* sentinelAskMasterStateToOtherSentinels: once the master has been detected
 * as sdown, this sentinel sends the "SENTINEL is-master-down-by-addr" message
 * to the other sentinel nodes.
 *
 * NOTE(review): abridged excerpt -- the declarations and initialisation of
 * `di`, `de`, `ri`, `port` and `retval` are not shown, and `flags` is not
 * used in the part that is shown. */
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    /* Iterate over the sentinels that monitor this master. */
    while((de = dictNext(di)) != NULL) {
        /* Send to the other sentinel:
         *   SENTINEL is-master-down-by-addr master_ip master_port current_epoch runid-or-"*"
         * If this sentinel has already started a failover it sends its own
         * runid, otherwise it sends "*".
         * sentinelReceiveIsMasterDownReply is registered to receive the
         * reply. */
        ll2string(port,sizeof(port),master->addr->port);
        retval = redisAsyncCommand(ri->cc,
                    sentinelReceiveIsMasterDownReply, NULL,
                    "SENTINEL is-master-down-by-addr %s %s %llu %s",
                    master->addr->ip, port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    server.runid : "*");
        if (retval == REDIS_OK) ri->pending_commands++;
    }
    dictReleaseIterator(di);
}
/* SENTINEL command handler (excerpt: only the is-master-down-by-addr branch
 * is shown; each `......` marker stands for code omitted by the article, and
 * the end of the function is not shown either). */
void sentinelCommand(redisClient *c) {
    ......
    /* Handle the "SENTINEL is-master-down-by-addr" message. */
    } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
        /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid> */
        ......
        /* Look up, among sentinel.masters, the instance the other sentinel
         * is asking about (by the address in argv[2] and `port`). */
        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters, c->argv[2]->ptr,port,NULL);

        /* It exists? Is actually a master? Is subjectively down? It's down.
         * Note: if we are in tilt mode we always reply with "0". */
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) && (ri->flags & SRI_MASTER))
            isdown = 1;

        /* If the request carries the requesting sentinel's runid (anything
         * other than "*"), take part in the leader vote. */
        if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
            leader = sentinelVoteLeader(ri,(uint64_t)req_epoch, c->argv[5]->ptr, &leader_epoch);
        }

        /* Reply with three elements: isdown, leader ("*" when there is
         * none) and leader_epoch. */
        addReplyMultiBulkLen(c,3);
        addReply(c, isdown ? shared.cone : shared.czero);
        addReplyBulkCString(c, leader ? leader : "*");
        addReplyLongLong(c, (long long)leader_epoch);
        if (leader) sdsfree(leader);
    }
/* Reply callback for "SENTINEL is-master-down-by-addr" (excerpt: the
 * `......` marker stands for code omitted by the article, including the
 * setup of `ri` and `r`).
 *
 * Updates the SRI_MASTER_DOWN flag of the replying sentinel from element 0
 * of the reply and, when element 1 is not "*", stores the leader it voted
 * for together with the epoch from element 2. */
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    ......
    /* From the returned value, set or clear SRI_MASTER_DOWN on the sentinel
     * that answered. */
    if (r->element[0]->integer == 1) {
        ri->flags |= SRI_MASTER_DOWN;
    } else {
        ri->flags &= ~SRI_MASTER_DOWN;
    }

    /* If the sentinel returned the leader it elected, update the recorded
     * leader and leader_epoch. The vote is logged only when the epoch
     * differs from the one previously stored. */
    if (strcmp(r->element[1]->str,"*")) {
        sdsfree(ri->leader);
        if (ri->leader_epoch != r->element[2]->integer)
            redisLog(REDIS_WARNING, "%s voted for %s %llu", ri->name, r->element[1]->str, (unsigned long long) r->element[2]->integer);
        ri->leader = sdsnew(r->element[1]->str);
        ri->leader_epoch = r->element[2]->integer;
    }
}
/* Dispatches one step of the failover procedure for a master.
 *
 * Asserts that `ri` is flagged SRI_MASTER, returns at once when no failover
 * is in progress, and otherwise calls the handler that matches
 * ri->failover_state. States not listed below are left untouched. */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    redisAssert(ri->flags & SRI_MASTER);

    if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) == 0) return;

    if (ri->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
        /* Waiting for the failover to start: if this sentinel is the leader
         * the state moves to SENTINEL_FAILOVER_STATE_SELECT_SLAVE and the
         * next step begins; otherwise the state is left unchanged until the
         * failover completes or times out. */
        sentinelFailoverWaitStart(ri);
    } else if (ri->failover_state == SENTINEL_FAILOVER_STATE_SELECT_SLAVE) {
        /* Choose the slave to promote; the state moves to
         * SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE. */
        sentinelFailoverSelectSlave(ri);
    } else if (ri->failover_state == SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE) {
        /* Promote the selected slave to be the new master; the state moves
         * to SENTINEL_FAILOVER_STATE_WAIT_PROMOTION. */
        sentinelFailoverSendSlaveOfNoOne(ri);
    } else if (ri->failover_state == SENTINEL_FAILOVER_STATE_WAIT_PROMOTION) {
        /* Wait for the promotion to take effect. Per the original
         * annotation, sentinelRefreshInstanceInfo (reached from the INFO
         * reply callback) moves the state to
         * SENTINEL_FAILOVER_STATE_RECONF_SLAVES. */
        sentinelFailoverWaitPromotion(ri);
    } else if (ri->failover_state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES) {
        /* Make the remaining slaves replicate from the new master. */
        sentinelFailoverReconfNextSlave(ri);
    }
}