上次我只分析了Redis網絡部分的代碼一部分,今天我把networking的代碼實現部分也學習了一遍,netWorking的代碼更多偏重的是Client客戶端的操作。裡面addReply()系列的方法操作是主要的部分。光光這個系列的方法,應該占據了一半的API的數量。我把API分成了3個部分:
/* ------------ API ---------------------- */ void *dupClientReplyValue(void *o) /* 復制value一份 */ int listMatchObjects(void *a, void *b) /* 比價2個obj是否相等 */ robj *dupLastObjectIfNeeded(list *reply) /* 返回回復列表中最後一個元素對象 */ void copyClientOutputBuffer(redisClient *dst, redisClient *src) /* 將源Client的輸出buffer復制給目標Client */ static void acceptCommonHandler(int fd, int flags) /* 網絡連接後的調用方法 */ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) void disconnectSlaves(void) /* 使server的slave失去連接 */ void replicationHandleMasterDisconnection(void) void flushSlavesOutputBuffers(void) /* 從方法將會在freeMemoryIfNeeded(),釋放內存空間函數,將存在內存中數據操作結果刷新到磁盤中 */ int processEventsWhileBlocked(void) /* ------------- addReply API ----------------- */ int _addReplyToBuffer(redisClient *c, char *s, size_t len) /* 往客戶端緩沖區中添加內容 */ void _addReplyObjectToList(redisClient *c, robj *o) /* robj添加到reply的列表中 */ void _addReplySdsToList(redisClient *c, sds s) /* 在回復列表中添加Sds字符串對象 */ void _addReplyStringToList(redisClient *c, char *s, size_t len) /* 在回復列表中添加字符串對象,參數中已經給定字符的長度 */ void addReply(redisClient *c, robj *obj) /* 在redisClient的buffer中寫入數據,數據存在obj->ptr的指針中 */ void addReplySds(redisClient *c, sds s) /* 在回復中添加Sds字符串,下面的額addReply()系列方法原理基本類似 */ void addReplyString(redisClient *c, char *s, size_t len) void addReplyErrorLength(redisClient *c, char *s, size_t len) void addReplyError(redisClient *c, char *err) /* 往Reply中添加error類的信息 */ void addReplyErrorFormat(redisClient *c, const char *fmt, ...) void addReplyStatusLength(redisClient *c, char *s, size_t len) void addReplyStatus(redisClient *c, char *status) void addReplyStatusFormat(redisClient *c, const char *fmt, ...) void *addDeferredMultiBulkLength(redisClient *c) /* 在reply list 中添加一個空的obj對象 */ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) void addReplyDouble(redisClient *c, double d) /* 在bulk reply中添加一個double類型值,bulk的意思為大塊的,bulk reply的意思為大數據量的回復 */ void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix) void addReplyLongLong(redisClient *c, long long ll) void addReplyMultiBulkLen(redisClient *c, long length) void addReplyBulkLen(redisClient *c, robj *obj) /* 添加bulk 大塊的數據的長度 */ void addReplyBulk(redisClient *c, robj *obj) /* 將一個obj的數據,拆分成大塊數據的添加 */ void addReplyBulkCBuffer(redisClient *c, void *p, size_t len) void addReplyBulkCString(redisClient *c, char *s) void addReplyBulkLongLong(redisClient *c, long long ll) /* ------------- Client API ----------------- */ redisClient *createClient(int fd) /* 創建redisClient客戶端,1.建立連接,2.設置數據庫,3.屬性設置 */ int prepareClientToWrite(redisClient *c) /* 此方法將會被調用於Client准備接受新數據之前調用,在fileEvent為客戶端設定writer的handler處理事件 */ static void freeClientArgv(redisClient *c) void freeClient(redisClient *c) /* 釋放freeClient,要分為Master和Slave2種情況作不同的處理 */ void freeClientAsync(redisClient *c) void freeClientsInAsyncFreeQueue(void) /* 異步的free客戶端 */ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) /* 將Client中的reply數據存入文件中 */ void resetClient(redisClient *c) int processInlineBuffer(redisClient *c) /* 處理redis Client的內鏈的buffer,就是c->querybuf */ static void setProtocolError(redisClient *c, int pos) int processMultibulkBuffer(redisClient *c) /* 處理大塊的buffer */ void processInputBuffer(redisClient *c) /* 處理redisClient的查詢buffer */ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) /* 從Client獲取查詢query語句 */ void getClientsMaxBuffers(unsigned long *longest_output_list, unsigned long *biggest_input_buffer) /* 獲取Client中輸入buffer和輸出buffer的最大長度值 */ void formatPeerId(char *peerid, size_t peerid_len, char *ip, int port) /* 格式化ip,port端口號的輸出,ip:port */ int genClientPeerId(redisClient *client, char *peerid, size_t peerid_len) /* 獲取Client客戶端的ip,port地址信息 */ char *getClientPeerId(redisClient *c) /* 獲取c->peerid客戶端的地址信息 */ sds catClientInfoString(sds s, redisClient *client) /* 格式化的輸出客戶端的屬性信息,直接返回一個拼接好的字符串 */ sds getAllClientsInfoString(void) /* 獲取所有Client客戶端的屬性信息,並連接成一個總的字符串並輸出 */ void clientCommand(redisClient *c) /* 執行客戶端的命令的作法 */ void rewriteClientCommandVector(redisClient *c, int argc, ...) /* 重寫客戶端的命令集合,舊的命令集合的應用計數減1,新的Command Vector的命令集合增1 */ void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) /* 重寫Client中的第i個參數 */ unsigned long getClientOutputBufferMemoryUsage(redisClient *c) /* 獲取Client中已經用去的輸出buffer的大小 */ int getClientType(redisClient *c) int getClientTypeByName(char *name) /* Client中的名字的3種類型,normal,slave,pubsub */ char *getClientTypeName(int class) int checkClientOutputBufferLimits(redisClient *c) /* 判斷Clint的輸出緩沖區的已經占用大小是否超過軟限制或是硬限制 */ void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) /* 異步的關閉Client,如果緩沖區中的軟限制或是硬限制已經到達的時候,緩沖區超出限制的結果會導致釋放不安全, */我們從最簡單的_addReplyToBuffer在緩沖區中添加回復數據開始說起,因為後面的各種addReply的方法都或多或少的調用了和這個歌方法。
/* ----------------------------------------------------------------------------- * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ /* 往客戶端緩沖區中添加內容 */ int _addReplyToBuffer(redisClient *c, char *s, size_t len) { size_t available = sizeof(c->buf)-c->bufpos; if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK; /* If there already are entries in the reply list, we cannot * add anything more to the static buffer. */ //如果當前的reply已經存在內容,則操作出錯 if (listLength(c->reply) > 0) return REDIS_ERR; /* Check that the buffer has enough space available for this string. */ if (len > available) return REDIS_ERR; memcpy(c->buf+c->bufpos,s,len); c->bufpos+=len; return REDIS_OK; }最直接影響的一句話,就是memcpy(c->buf+c->bufpos,s,len);所以內容是加到c->buf中的,這也就是客戶端的輸出buffer,添加操作還有另外一種形式是添加對象類型:
/* robj添加到reply的列表中 */ void _addReplyObjectToList(redisClient *c, robj *o) { robj *tail; if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; if (listLength(c->reply) == 0) { incrRefCount(o); //在回復列表匯總添加robj內容 listAddNodeTail(c->reply,o); c->reply_bytes += zmalloc_size_sds(o->ptr); } else { tail = listNodeValue(listLast(c->reply)); /* Append to this object when possible. */ if (tail->ptr != NULL && sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES) { c->reply_bytes -= zmalloc_size_sds(tail->ptr); tail = dupLastObjectIfNeeded(c->reply); tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr)); c->reply_bytes += zmalloc_size_sds(tail->ptr); } else { incrRefCount(o); listAddNodeTail(c->reply,o); c->reply_bytes += zmalloc_size_sds(o->ptr); } } asyncCloseClientOnOutputBufferLimitReached(c); }把robj對象加載reply列表中,並且改變reply的byte大小,最後還調用了一個asyncCloseClientOnOutputBufferLimitReached(c);方法,這個方法我是在這個文件的最底部找到的,一開始還真不知道什麼意思,作用就是當添加完數據後,當客戶端的輸出緩沖的大小超出限制時,會被異步關閉:
/* Asynchronously close a client if soft or hard limit is reached on the * output buffer size. The caller can check if the client will be closed * checking if the client REDIS_CLOSE_ASAP flag is set. * * Note: we need to close the client asynchronously because this function is * called from contexts where the client can't be freed safely, i.e. from the * lower level functions pushing data inside the client output buffers. */ /* 異步的關閉Client,如果緩沖區中的軟限制或是硬限制已經到達的時候,緩沖區超出限制的結果會導致釋放不安全, */ void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) { redisAssert(c->reply_bytes < ULONG_MAX-(1024*64)); if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return; if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(),c); freeClientAsync(c); redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client); sdsfree(client); } }在addReply方法調用的時候,有時是需要一個前提的,我說的是在寫數據事件發生的時候,你得先對寫的文件創建一個監聽事件:
/* 在回復中添加Sds字符串 */ void addReplySds(redisClient *c, sds s) { //在調用添加操作之前,都要先執行prepareClientToWrite(c),設置文件事件的寫事件 if (prepareClientToWrite(c) != REDIS_OK) { /* The caller expects the sds to be free'd. */ sdsfree(s); return; } if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) { sdsfree(s); } else { /* This method free's the sds when it is no longer needed. */ _addReplySdsToList(c,s); } }在這個prepareClientToWrite()裡面是干嘛的呢?
/* This function is called every time we are going to transmit new data * to the client. The behavior is the following: * * If the client should receive new data (normal clients will) the function * returns REDIS_OK, and make sure to install the write handler in our event * loop so that when the socket is writable new data gets written. * * If the client should not receive new data, because it is a fake client, * a master, a slave not yet online, or because the setup of the write handler * failed, the function returns REDIS_ERR. * * Typically gets called every time a reply is built, before adding more * data to the clients output buffers. If the function returns REDIS_ERR no * data should be appended to the output buffers. */ /* 此方法將會被調用於Client准備接受新數據之前調用,在fileEvent為客戶端設定writer的handler處理事件 */ int prepareClientToWrite(redisClient *c) { if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK; if ((c->flags & REDIS_MASTER) && !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR; if (c->fd <= 0) return REDIS_ERR; /* Fake client */ if (c->bufpos == 0 && listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && //在這裡創建寫的文件事件 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR) return REDIS_ERR; return REDIS_OK; }在addReply的方法裡提到了一個addReplyBulk類型方法,Bulk的中文意思為大塊的,說明addReplyBulk添加的都是一些比較大塊的數據,找一個方法看看:
/* Add a Redis Object as a bulk reply */ /* 將一個obj的數據,拆分成大塊數據的添加 */ void addReplyBulk(redisClient *c, robj *obj) { //reply添加長度 addReplyBulkLen(c,obj); //reply添加對象 addReply(c,obj); addReply(c,shared.crlf); }將原本一個robj的數據拆分成可3個普通的addReply的方法調用。就變成了數據量變大了的數據。大數據的回復一個比較不好的地方是到時解析的時候或者是Data的復制的時候會比較耗時。在networking的方法裡還提供了freeClient()的操作:
/* 釋放freeClient,要分為Master和Slave2種情況作不同的處理 */ void freeClient(redisClient *c) { listNode *ln; /* If this is marked as current client unset it */ if (server.current_client == c) server.current_client = NULL; /* If it is our master that's beging disconnected we should make sure * to cache the state to try a partial resynchronization later. * * Note that before doing this we make sure that the client is not in * some unexpected state, by checking its flags. */ if (server.master && c->flags & REDIS_MASTER) { redisLog(REDIS_WARNING,"Connection with master lost."); if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY| REDIS_CLOSE_ASAP| REDIS_BLOCKED| REDIS_UNBLOCKED))) { //如果是Master客戶端,需要做緩存Client的處理,可以迅速重新啟用 replicationCacheMaster(c); return; } }...後面代碼略去了
當Client中的輸出buffer數據漸漸變多了的時候就要准備持久化到磁盤文件了,要調用下面這個方法了,
/* Helper function used by freeMemoryIfNeeded() in order to flush slave * output buffers without returning control to the event loop. */ /* 從方法將會在freeMemoryIfNeeded(),釋放內存空間函數,將存在內存中數據操作結果刷新到磁盤中 */ void flushSlavesOutputBuffers(void) { listIter li; listNode *ln; listRewind(server.slaves,&li); while((ln = listNext(&li))) { redisClient *slave = listNodeValue(ln); int events; events = aeGetFileEvents(server.el,slave->fd); if (events & AE_WRITABLE && slave->replstate == REDIS_REPL_ONLINE && listLength(slave->reply)) { //在這裡調用了write的方法 sendReplyToClient(server.el,slave->fd,slave,0); } } }這個方法的核心調用又在sendReplyToClient()方法,就是把Client的reply內容和buf內容存入文件。以上就是我的理解了,代碼量有點大,的確看的我頭有點大。