今天學習了Redis中比較高大上的名詞,“發布訂閱模式”,發布訂閱模式這個詞在我最開始接觸聽說的時候是在JMS(Java Message Service)java消息服務中聽說的。這個名次用通俗的一點話說,就是我訂閱了這類消息,當只有這類的消息進行廣播發送的時候,我才會,其他的消息直接過濾,保證了一個高效的傳輸效率。下面切入正題,學習一下Redis是如何實現這個發布訂閱模式的。先看看裡面的簡單的API構造;
/*----------------------------------------------------------------------------- * Pubsub low level API *----------------------------------------------------------------------------*/ void freePubsubPattern(void *p) /* 釋放發布訂閱的模式 */ int listMatchPubsubPattern(void *a, void *b) /* 發布訂閱模式是否匹配 */ int clientSubscriptionsCount(redisClient *c) /* 返回客戶端的所訂閱的數量,包括channels + patterns管道和模式 */ int pubsubSubscribeChannel(redisClient *c, robj *channel) /* Client訂閱一個Channel管道 */ int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) /* 取消訂閱Client中的Channel */ int pubsubSubscribePattern(redisClient *c, robj *pattern) /* Client客戶端訂閱一種模式 */ int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) /* Client客戶端取消訂閱pattern模式 */ int pubsubUnsubscribeAllChannels(redisClient *c, int notify) /* 客戶端取消自身訂閱的所有Channel */ int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) /* 客戶端取消訂閱所有的pattern模式 */ int pubsubPublishMessage(robj *channel, robj *message) /* 為所有訂閱了Channel的Client發送消息message */ /* ------------PUB/SUB API ---------------- */ void subscribeCommand(redisClient *c) /* 訂閱Channel的命令 */ void unsubscribeCommand(redisClient *c) /* 取消訂閱Channel的命令 */ void psubscribeCommand(redisClient *c) /* 訂閱模式命令 */ void punsubscribeCommand(redisClient *c) /* 取消訂閱模式命令 */ void publishCommand(redisClient *c) /* 發布消息命令 */ void pubsubCommand(redisClient *c) /* 發布訂閱命令 */在這裡面出現了高頻的詞Pattern(模式)和Channel(頻道,叫管道比較別扭),也就是說,後續所有的關於發布訂閱的東東都是基於這2者展開進行的。現在大致講解一下在Redis中是如何實現此中模式的:
1.在RedisClient 內部維護了一個pubsub_channels的Channel列表,記錄了此客戶端所訂閱的頻道
2.在Server服務端,同樣維護著一個類似的變量叫做,pubsub_channels,這是一個dict字典變量,每一個Channel對應著一批訂閱了此頻道的Client,也就是Channel-->list of Clients
3.當一個Client publish一個message的時候,會先去服務端的pubsub_channels找相應的Channel,遍歷裡面的Client,然後發送通知,即完成了整個發布訂閱模式。
我們可以簡單的看一下Redis訂閱一個Channel的方法實現;
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ /* Client訂閱一個Channel管道 */ int pubsubSubscribeChannel(redisClient *c, robj *channel) { struct dictEntry *de; list *clients = NULL; int retval = 0; /* Add the channel to the client -> channels hash table */ //在Client的字典pubsub_channels中添加Channel if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { retval = 1; incrRefCount(channel); /* Add the client to the channel -> list of clients hash table */ //添加Clietn到server中的pubsub_channels,對應的列表中 de = dictFind(server.pubsub_channels,channel); if (de == NULL) { //如果此頻道的Client列表為空,則創建新列表並添加 clients = listCreate(); dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else { //否則,獲取這個頻道的客戶端列表,在尾部添加新的客戶端 clients = dictGetVal(de); } listAddNodeTail(clients,c); } /* Notify the client */ //添加給回復客戶端 addReply(c,shared.mbulkhdr[3]); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,clientSubscriptionsCount(c)); return retval; }添加操作主要分2部,Client自身的內部維護的pubsub_channels的添加,是一個dict字典對象,然後,是server端維護的pubsub_channels中的client列表的添加。在進行Channel頻道的刪除的時候,也是執行的這2步驟操作:
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or * 0 if the client was not subscribed to the specified channel. */ /* 取消訂閱Client中的Channel */ int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { struct dictEntry *de; list *clients; listNode *ln; int retval = 0; /* Remove the channel from the client -> channels hash table */ incrRefCount(channel); /* channel may be just a pointer to the same object we have in the hash tables. Protect it... */ //字典刪除Client中pubsub_channels中的Channel if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { retval = 1; /* Remove the client from the channel -> clients list hash table */ //再移除Channel對應的Client列表 de = dictFind(server.pubsub_channels,channel); redisAssertWithInfo(c,NULL,de != NULL); clients = dictGetVal(de); ln = listSearchKey(clients,c); redisAssertWithInfo(c,NULL,ln != NULL); listDelNode(clients,ln); if (listLength(clients) == 0) { /* Free the list and associated hash entry at all if this was * the latest client, so that it will be possible to abuse * Redis PUBSUB creating millions of channels. */ dictDelete(server.pubsub_channels,channel); } } /* Notify the client */ if (notify) { addReply(c,shared.mbulkhdr[3]); addReply(c,shared.unsubscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,dictSize(c->pubsub_channels)+ listLength(c->pubsub_patterns)); } decrRefCount(channel); /* it is finally safe to release it */ return retval; }裡面還有對應的模式的訂閱和取消訂閱的操作,原理和channel完全一致,二者的區別在於,pattern是用來匹配的Channel的,這個是什麼意思呢。在後面會做出答案,接著看。最後看一個最最核心的方法,客戶端發步消息方法:
/* Publish a message */ /* 為所有訂閱了Channel的Client發送消息message */ int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; struct dictEntry *de; listNode *ln; listIter li; /* Send to clients listening for that channel */ //找到Channel所對應的dictEntry de = dictFind(server.pubsub_channels,channel); if (de) { //獲取此Channel對應的客戶單列表 list *list = dictGetVal(de); listNode *ln; listIter li; listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { //依次取出List中的客戶單,添加消息回復 redisClient *c = ln->value; addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,channel); //添加消息回復 addReplyBulk(c,message); receivers++; } } /* Send to clients listening to matching channels */ /* 發送給嘗試匹配該Channel的客戶端消息 */ if (listLength(server.pubsub_patterns)) { listRewind(server.pubsub_patterns,&li); channel = getDecodedObject(channel); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; //客戶端的模式如果匹配了Channel,也會發送消息 if (stringmatchlen((char*)pat->pattern->ptr, sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { addReply(pat->client,shared.mbulkhdr[4]); addReply(pat->client,shared.pmessagebulk); addReplyBulk(pat->client,pat->pattern); addReplyBulk(pat->client,channel); addReplyBulk(pat->client,message); receivers++; } } decrRefCount(channel); } return receivers; }pattern的作用就在上面體現了,如果某種pattern匹配了Channel頻道,則模式的客戶端也會接收消息。在server->pubsub_patterns中,pubsub_patterns是一個list列表,裡面的每一個pattern只對應一個Client,就是上面的pat->client,這一點和Channel還是有本質的區別的。講完發布訂閱模式的基本操作後,順便把與此相關的notify通知類也稍稍講講,通知只有3個方法,
/* ----------------- API ------------------- */ int keyspaceEventsStringToFlags(char *classes) /* 鍵值字符類型轉為對應的Class類型 */ sds keyspaceEventsFlagsToString(int flags) /* 通過輸入的flag值類,轉為字符類型*/ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) /* 發布通知方法,分為2類,keySpace的通知,keyEvent的通知 */涉及到string To flag 和flag To String 的轉換,也不知道這個會在哪裡用到;
/* Turn a string representing notification classes into an integer * representing notification classes flags xored. * * The function returns -1 if the input contains characters not mapping to * any class. */ /* 鍵值字符類型轉為對應的Class類型 */ int keyspaceEventsStringToFlags(char *classes) { char *p = classes; int c, flags = 0; while((c = *p++) != '\0') { switch(c) { case 'A': flags |= REDIS_NOTIFY_ALL; break; case 'g': flags |= REDIS_NOTIFY_GENERIC; break; case '$': flags |= REDIS_NOTIFY_STRING; break; case 'l': flags |= REDIS_NOTIFY_LIST; break; case 's': flags |= REDIS_NOTIFY_SET; break; case 'h': flags |= REDIS_NOTIFY_HASH; break; case 'z': flags |= REDIS_NOTIFY_ZSET; break; case 'x': flags |= REDIS_NOTIFY_EXPIRED; break; case 'e': flags |= REDIS_NOTIFY_EVICTED; break; case 'K': flags |= REDIS_NOTIFY_KEYSPACE; break; case 'E': flags |= REDIS_NOTIFY_KEYEVENT; break; default: return -1; } } return flags; }應該是響應鍵盤輸入的類型和Redis類型之間的轉換。在notify的方法還有一個event事件的通知方法:
/* The API provided to the rest of the Redis core is a simple function: * * notifyKeyspaceEvent(char *event, robj *key, int dbid); * * 'event' is a C string representing the event name. * 'key' is a Redis object representing the key name. * 'dbid' is the database ID where the key lives. */ /* 發布通知方法,分為2類,keySpace的通知,keyEvent的通知 */ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) { sds chan; robj *chanobj, *eventobj; int len = -1; char buf[24]; /* If notifications for this class of events are off, return ASAP. */ if (!(server.notify_keyspace_events & type)) return; eventobj = createStringObject(event,strlen(event)); //2種的通知形式,略有差別 /* __keyspace@有keySpace和keyEvent的2種事件通知。具體怎麼用,等後面碰到的時候在看看。__: notifications. */ if (server.notify_keyspace_events & REDIS_NOTIFY_KEYSPACE) { chan = sdsnewlen("__keyspace@",11); len = ll2string(buf,sizeof(buf),dbid); chan = sdscatlen(chan, buf, len); chan = sdscatlen(chan, "__:", 3); chan = sdscatsds(chan, key->ptr); chanobj = createObject(REDIS_STRING, chan); //上述幾步操作,組件格式字符串,最後發布消息,下面keyEvent的通知同理 pubsubPublishMessage(chanobj, eventobj); decrRefCount(chanobj); } /* __keyevente@ __: notifications. */ if (server.notify_keyspace_events & REDIS_NOTIFY_KEYEVENT) { chan = sdsnewlen("__keyevent@",11); if (len == -1) len = ll2string(buf,sizeof(buf),dbid); chan = sdscatlen(chan, buf, len); chan = sdscatlen(chan, "__:", 3); chan = sdscatsds(chan, eventobj->ptr); chanobj = createObject(REDIS_STRING, chan); pubsubPublishMessage(chanobj, key); decrRefCount(chanobj); } decrRefCount(eventobj); }