原創文章,轉載請標明,謝謝。
上一篇分析過memcached的連接模型,了解memcached是如何高效處理客戶端連接,這一篇分析memcached源碼中的process_update_command函數,探究memcached客戶端的set命令,解讀memcached是如何解析客戶端文本命令,剖析memcached的內存管理,LRU算法是如何工作等等。
客戶端向memcached server發出set操作,memcached server讀取客戶端的命令,客戶端的連接狀態由 conn_read > conn_parse_cmd 轉換,這時候,memcached server開始解析命令。memcached server調用try_read_command函數解析命令,memcached接收兩種格式的命令,一種是二進制格式,另一種是文本格式(本文只講文本格式的命令)。
1 static int try_read_command(conn *c) { 2 3 // .......... 4 5 if (c->protocol == binary_prot) { 6 7 // 二進制格式 8 // .... 9 10 } else { 11 char *el, *cont; 12 13 // 沒有接收到客戶端的命令,返回進入conn_waiting狀態,等待更多的客戶端數據 14 if (c->rbytes == 0) 15 return 0; 16 17 el = memchr(c->rcurr, '\n', c->rbytes); 18 if (!el) { 19 if (c->rbytes > 1024) { 20 /* 21 * We didn't have a '\n' in the first k. This _has_ to be a 22 * large multiget, if not we should just nuke the connection. 23 */ 24 char *ptr = c->rcurr; 25 while (*ptr == ' ') { /* ignore leading whitespaces */ 26 ++ptr; 27 } 28 29 if (ptr - c->rcurr > 100 || 30 (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) { 31 32 conn_set_state(c, conn_closing); 33 return 1; 34 } 35 } 36 37 return 0; 38 } 39 40 // 客戶端報文以'\r\n'結尾 41 cont = el + 1; 42 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') { 43 el--; 44 } 45 *el = '\0'; 46 47 assert(cont <= (c->rcurr + c->rbytes)); 48 49 // 真正解析命令的地方 50 process_command(c, c->rcurr); 51 52 c->rbytes -= (cont - c->rcurr); 53 c->rcurr = cont; 54 55 assert(c->rcurr <= (c->rbuf + c->rsize)); 56 } 57 58 return 1; 59 }
在分析process_command函數前,我們先看看memcached的命令格式:
1 <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n 2 3 cas <key> <flags> <exptime> <bytes> <cas unique> [noreply]\r\n 4 5 // 例如 set 命令 : 6 set key 0 60 2 7 12 8 STORED 9 10 // 空格對應著空格 11 set => <command name> 12 key => <key> 13 0 => <flags> 14 60 => <exptime> 15 2 => <bytes>
memcached在process_command中調用tokenize_command函數根據上面的命令格式處理命令,把相應位置的字段保存在 token_t *tokens 的相應位置。
1 // 參數1:命令的字符串 2 // 參數2:解析命令後,存放命令各個字段的結構體數組 3 // 參數3:命令字段的最大數量 4 /* 5 * tokens[0] => <command name> 的信息 6 * tokens[1] => <key> 的信息 7 * tokens[2] => <flags> 的信息 8 */ 9 static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) { 10 char *s, *e; 11 size_t ntokens = 0; 12 size_t len = strlen(command); 13 unsigned int i = 0; 14 15 assert(command != NULL && tokens != NULL && max_tokens > 1); 16 17 s = e = command; 18 for (i = 0; i < len; i++) { 19 if (*e == ' ') { 20 if (s != e) { 21 tokens[ntokens].value = s; // value存放各個字段的字符串值,例如:'set' 22 tokens[ntokens].length = e - s; // length表示各個字段相應的長度,例如:'set'的長度為3 23 ntokens++; 24 *e = '\0'; 25 if (ntokens == max_tokens - 1) { 26 e++; 27 s = e; /* so we don't add an extra token */ 28 break; 29 } 30 } 31 s = e + 1; 32 } 33 e++; 34 } 35 36 if (s != e) { 37 tokens[ntokens].value = s; 38 tokens[ntokens].length = e - s; 39 ntokens++; 40 } 41 42 /* 43 * If we scanned the whole string, the terminal value pointer is null, 44 * otherwise it is the first unprocessed character. 45 */ 46 tokens[ntokens].value = *e == '\0' ? NULL : e; 47 tokens[ntokens].length = 0; 48 ntokens++; 49 50 return ntokens; 51 }
解析完文本命令後,回到process_command函數中,我們可以看到很熟悉的命令,是的,接下來,在一個if-else的多分支判斷中,memcached根據tokens[COMMAND_TOKEN].value決定調用那一個函數處理相應的命令:
1 static void process_command(conn *c, char *command) { 2 3 // .... 4 5 ntokens = tokenize_command(command, tokens, MAX_TOKENS); 6 if (ntokens >= 3 && 7 ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) || 8 (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) { 9 10 // 這裡就是執行get命令的分支 11 process_get_command(c, tokens, ntokens, false); 12 13 } else if ((ntokens == 6 || ntokens == 7) && 14 ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) || 15 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) || 16 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) || 17 (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) || 18 (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) { 19 20 // 這裡就是執行set、add、replace等命令的分支 21 process_update_command(c, tokens, ntokens, comm, false); 22 23 } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) { 24 25 // 這裡也執行process_update_command函數,也是對相應的key執行寫操作,與上面一個分支不同的是最後一個參數是true,意思是寫的過程使用CAS協議,這裡不側重講, 26 // CAS目的是保證在並發寫的時候保證一致性 27 process_update_command(c, tokens, ntokens, comm, true); 28 29 } else if .............
memcached把內存分割成各種尺寸的塊(chunk),並把尺寸相同的塊分成組(chunk的集合),每個chunk集合被稱為slab。Memcached的內存分配以Page為單位,Page默認值為1M,可以在啟動時通過-I參數來指定。Slab是由多個Page組成的,Page按照指定大小切割成多個chunk。
每一對[key,value]的數據被封裝到item的結構體裡,每種類型的slab用一個item鏈表來維護它的所有item。例如,一個item項的數據大小加上item的頭部信息(為了方便描述,下面把這兩項的和統稱[key,value]大小吧)是90KB,slab[i]的chunk塊大小是136KB,slab[i-1]的chunk塊大小是88KB,那麼item會被分配slab[i]的一個chunk塊(並保存到slab[i]維護的一個item鏈表),這樣做的目的是為了盡量減少內存碎片。更多關於Slab Allocation的原理可以查找其他的資料。這裡不詳解
memcached的存儲命令:add、set、replace、append、prepend等,上面簡單地說了memcached slab機制,知道memcached是根據相應的[key,value]大小找到相應的slab,那麼,我們再次調用set命令某個已存在的key的value的時候,memcached是怎麼工作的呢?
起初,我的直覺思維是,找到key相應的item,修改item的value就好了。那麼,問題來了,假如先前[key,value]大小是90KB,被分配到slab[i]的,現在我們修改了key對應的value,[key,value]大小也改變了,變成了80KB,應該分配到slab[i-1]的,如果只是修改原來item的數據,那麼就不符合Slab Allocation的原理,會造成很大的內存碎片浪費。
memcached對存儲命令:add、set、replace、append、prepend處理方法大體都相似的,從上面的源碼可以看出,都是通過執行process_update_command函數來處理。
memcached的處理存儲命令思路是這樣的:例如,客戶端的一個set命令,memcached都會重新根據[key,value]大小找到合適slab,並把相應的數據封裝到新的item裡面【源碼的注1】(不會直接修改舊的item項),如果對應的slab沒有內存空間不足,就調用LRU算法把該slab的一個最近最少使用項的空間分配給新的item【源碼的注2,出現在do_item_alloc函數】(如果關閉LRU移除項的功能,那麼就會報“SERVER_ERROR out of memory storing object”錯誤,是set命令的話,還會把key對應的舊的item項移除【源碼的注3】,即我們這時候不能通過get key來獲取到舊的數據了),分配空間成功,那就是對add、set、replace、append、prepend這幾個存儲命令做差異化處理。
1 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) { 2 3 // .... 4 5 set_noreply_maybe(c, tokens, ntokens); // 設置命令可選字段的[noreply] 6 7 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) { 8 out_string(c, "CLIENT_ERROR bad command line format"); 9 return; 10 } 11 12 key = tokens[KEY_TOKEN].value; 13 nkey = tokens[KEY_TOKEN].length; 14 15 // 把命令相應字段的字符串安全轉換成整數 16 if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags) 17 && safe_strtol(tokens[3].value, &exptime_int) 18 && safe_strtol(tokens[4].value, (int32_t *)&vlen))) { 19 out_string(c, "CLIENT_ERROR bad command line format"); 20 return; 21 } 22 23 exptime = exptime_int; 24 25 // #define REALTIME_MAXDELTA 60*60*24*30 26 if (exptime < 0) 27 exptime = REALTIME_MAXDELTA + 1; 28 29 // CAS協議,防止並發寫不一致 30 if (handle_cas) { 31 if (!safe_strtoull(tokens[5].value, &req_cas_id)) { 32 out_string(c, "CLIENT_ERROR bad command line format"); 33 return; 34 } 35 } 36 37 // ........ 38 39 // 注1:無論是add、set或者是replace命令,都會從新分配一個新的item 40 it = item_alloc(key, nkey, flags, realtime(exptime), vlen); 41 42 // 如果新的item分配失敗 43 if (it == 0) { 44 if (! item_size_ok(nkey, flags, vlen)) 45 out_string(c, "SERVER_ERROR object too large for cache"); // 一種錯誤情況:數據太大,沒有合適slab,不能緩存數據 46 else 47 out_string(c, "SERVER_ERROR out of memory storing object"); // 另一種是:沒有了內存空間緩存數據,通常這種事在關閉LRU功能的情況下出現 48 /* swallow the data line */ 49 c->write_and_go = conn_swallow; 50 c->sbytes = vlen; 51 52 // 注3:新的item分配失敗,如果是set命令,並且key對應著存在舊的item,那麼就把舊的item刪除 53 if (comm == NREAD_SET) { 54 it = item_get(key, nkey); 55 if (it) { 56 item_unlink(it); 57 item_remove(it); 58 } 59 } 60 61 return; 62 } 63 ITEM_set_cas(it, req_cas_id); 64 65 c->item = it; 66 c->ritem = ITEM_data(it); 67 c->rlbytes = it->nbytes; 68 c->cmd = comm; 69 70 // 會在這一步進行add、set、replace等存儲命令的差異化處理 71 conn_set_state(c, conn_nread); 72 }
do_item_alloc函數:
1 item *do_item_alloc(char *key, const size_t nkey, const int flags, 2 const rel_time_t exptime, const int nbytes, 3 const uint32_t cur_hv) { 4 uint8_t nsuffix; 5 item *it = NULL; 6 char suffix[40]; 7 size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix); //接收到的item數據長度+item頭部長度 8 if (settings.use_cas) { 9 ntotal += sizeof(uint64_t); 10 } 11 12 unsigned int id = slabs_clsid(ntotal); 13 if (id == 0) 14 return 0; 15 16 mutex_lock(&cache_lock); 17 /* do a quick check if we have any expired items in the tail.. */ 18 int tries = 5; 19 int tried_alloc = 0; 20 item *search; 21 void *hold_lock = NULL; 22 rel_time_t oldest_live = settings.oldest_live; 23 24 search = tails[id]; 25 26 // tries = 5 ,循環查找過期的item,最多循環5次 27 for (; tries > 0 && search != NULL; tries--, search=search->prev) { 28 uint32_t hv = hash(ITEM_key(search), search->nkey, 0); 29 30 // 如果當前item被上鎖,那麼就跳過 31 if (hv != cur_hv && (hold_lock = item_trylock(hv)) == NULL) 32 continue; 33 /* Now see if the item is refcount locked */ 34 if (refcount_incr(&search->refcount) != 2) { 35 refcount_decr(&search->refcount); 36 /* Old rare bug could cause a refcount leak. We haven't seen 37 * it in years, but we leave this code in to prevent failures 38 * just in case */ 39 if (search->time + TAIL_REPAIR_TIME < current_time) { 40 itemstats[id].tailrepairs++; 41 search->refcount = 1; 42 do_item_unlink_nolock(search, hv); 43 } 44 if (hold_lock) 45 item_trylock_unlock(hold_lock); 46 continue; 47 } 48 49 // item過期,如果沒有設置過期時間,那麼就使用系統設置的默認過期時間 50 if ((search->exptime != 0 && search->exptime < current_time) 51 || (search->time <= oldest_live && oldest_live <= current_time)) { 52 itemstats[id].reclaimed++; 53 if ((search->it_flags & ITEM_FETCHED) == 0) { 54 itemstats[id].expired_unfetched++; 55 } 56 it = search; 57 slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal); // 當前搜索的item過期,重新計算slab已經分配的字節 58 do_item_unlink_nolock(it, hv); // 把當前搜索的item從鏈表中移除 59 /* Initialize the item block: */ 60 it->slabs_clsid = 0; 61 } else if ((it = slabs_alloc(ntotal, id)) == NULL) { // 沒有找到過期的item,新分配一個item,分配失敗就執行else if裡面的代碼 62 tried_alloc = 1; 63 if (settings.evict_to_free == 0) { // 注2:內存耗盡,如果evict_to_free = 1(默認)LRU算法啟動,移除最近最少使用的item 64 itemstats[id].outofmemory++; 65 } else { 66 itemstats[id].evicted++; 67 itemstats[id].evicted_time = current_time - search->time; 68 if (search->exptime != 0) 69 itemstats[id].evicted_nonzero++; 70 if ((search->it_flags & ITEM_FETCHED) == 0) { 71 itemstats[id].evicted_unfetched++; 72 } 73 it = search; 74 slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal); // 當前搜索的item過期,重新計算slab已經分配的字節 75 do_item_unlink_nolock(it, hv); // 把當前需要移除的item從鏈表中移除 76 /* Initialize the item block: */ 77 it->slabs_clsid = 0; 78 79 if (settings.slab_automove == 2) 80 slabs_reassign(-1, id); 81 } 82 } 83 84 refcount_decr(&search->refcount); 85 /* If hash values were equal, we don't grab a second lock */ 86 if (hold_lock) 87 item_trylock_unlock(hold_lock); 88 break; 89 } 90 91 if (!tried_alloc && (tries == 0 || search == NULL)) 92 it = slabs_alloc(ntotal, id); 93 94 if (it == NULL) { 95 itemstats[id].outofmemory++; 96 mutex_unlock(&cache_lock); 97 return NULL; 98 } 99 100 assert(it->slabs_clsid == 0); 101 assert(it != heads[id]); 102 103 104 it->refcount = 1; 105 mutex_unlock(&cache_lock); 106 it->next = it->prev = it->h_next = 0; 107 it->slabs_clsid = id; 108 109 DEBUG_REFCNT(it, '*'); 110 it->it_flags = settings.use_cas ? ITEM_CAS : 0; 111 it->nkey = nkey; 112 it->nbytes = nbytes; 113 memcpy(ITEM_key(it), key, nkey); 114 it->exptime = exptime; 115 memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix); 116 it->nsuffix = nsuffix; 117 return it; 118 }
以上memcached只是為[key,value]找到了新的slab,分配了新的item,並把命令相關的頭部信息保存到,但是,還有一個重要的步奏沒有說的,那就是[key,value]中的value怎麼和item關聯起來的,add和set的區別又是怎樣區分的,由於還有很長的一段代碼,所以我還是分篇記錄,預告一下,下一篇《memcached學習筆記——存儲命令源碼分析下》會講遺留的這兩個問題。
未完,待續。
更多閱讀查看:JC&hcoding