欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 运维知识 > 数据库 >内容正文

数据库

redis 发布订阅实际案例_Redis源码分析之发布订阅+慢查询+排序以及监视器

发布时间:2025/4/5 数据库 48 豆豆
生活随笔 收集整理的这篇文章主要介绍了 redis 发布订阅实际案例_Redis源码分析之发布订阅+慢查询+排序以及监视器 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

发布订阅

发布订阅就是一个经典的观察者模式,其中通道是指channel字符串本身,而模式是指正则表达式,进行匹配。结合Redis设计与实现一书

数据结构
基本数据结构

在client对象中,分别记录了,当前client订阅的通道和模式。

1234struct client{dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */};

当然当开启订阅后,客户端的flag会设置对应标记:

12345678void subscribeCommand(client *c) { int j; for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j]); c->flags |= CLIENT_PUBSUB;}

在redisServer对象中,分别记录了,当前订阅的通道和模式所对应的客户端client。

1234struct redisServer { dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */};

模式要比通道多一个match内容,因此redis使用pubsubPattern对象进行记录,差异体现在redisServer的记录中。

1234typedef struct pubsubPattern { client *client; robj *pattern;} pubsubPattern;

Redis设计与实现一书中一张图,很直观:

引用计数

一个通道或模式可能被多个客户端引用,因此发布消息之后,只有等到引用计数为0时,才是真正的释放对象。

12345678910111213141516171819void decrRefCount(robj *o) { if (o->refcount == 1) { switch(o->type) { case OBJ_STRING: freeStringObject(o); break; case OBJ_LIST: freeListObject(o); break; case OBJ_SET: freeSetObject(o); break; case OBJ_ZSET: freeZsetObject(o); break; case OBJ_HASH: freeHashObject(o); break; case OBJ_MODULE: freeModuleObject(o); break; case OBJ_STREAM: freeStreamObject(o); break; default: serverPanic("Unknown object type"); break; } zfree(o); } else { if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0"); if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--; }}
订阅消息

订阅消息实际很简单,就是设置对应的client和server中的值:

  • 客户端就是将对应的模式或者通道添加至对应的客户端中

123456789101112131415161718192021222324252627282930/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */int pubsubSubscribeChannel(client *c, robj *channel) { dictEntry *de; list *clients = NULL; int retval = 0; /* Add the channel to the client -> channels hash table */ if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { retval = 1; incrRefCount(channel); /* Add the client to the channel -> list of clients hash table */ de = dictFind(server.pubsub_channels,channel); if (de == NULL) { clients = listCreate(); dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else { clients = dictGetVal(de); } listAddNodeTail(clients,c); } /* Notify the client */ addReply(c,shared.mbulkhdr[3]); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,clientSubscriptionsCount(c)); return retval;}
模式

模式其实比较特殊,因为他支持正则表达式,所以没法放入hash中,所以直接用list来保存。

12345678910111213141516171819202122/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */int pubsubSubscribePattern(client *c, robj *pattern) { int retval = 0; if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { retval = 1; pubsubPattern *pat; listAddNodeTail(c->pubsub_patterns,pattern); incrRefCount(pattern); pat = zmalloc(sizeof(*pat)); pat->pattern = getDecodedObject(pattern); pat->client = c; listAddNodeTail(server.pubsub_patterns,pat); } /* Notify the client */ addReply(c,shared.mbulkhdr[3]); addReply(c,shared.psubscribebulk); addReplyBulk(c,pattern); addReplyLongLong(c,clientSubscriptionsCount(c)); return retval;}

这里的match是函数指针,而match的设置是在创建client时候设置的,见createClient函数:

需要注意的是模式情况下,判断某个客户端是否已经包含了该模式,是直接通过listMatchObjects函数,功能比较简单(注意要与后文中的发布消息时,那个匹配分开):

123int listMatchObjects(void *a, void *b) { return equalStringObjects(a,b);}

再看看equalStringObjects函数,算了我们直接到最底层的函数吧:

1234567891011121314151617181920212223242526272829303132333435int compareStringObjects(robj *a, robj *b) { return compareStringObjectsWithFlags(a,b,REDIS_COMPARE_BINARY);}int compareStringObjectsWithFlags(robj *a, robj *b, int flags) { serverAssertWithInfo(NULL,a,a->type == OBJ_STRING && b->type == OBJ_STRING); char bufa[128], bufb[128], *astr, *bstr; size_t alen, blen, minlen; if (a == b) return 0; if (sdsEncodedObject(a)) { astr = a->ptr; alen = sdslen(astr); } else { alen = ll2string(bufa,sizeof(bufa),(long) a->ptr); astr = bufa; } if (sdsEncodedObject(b)) { bstr = b->ptr; blen = sdslen(bstr); } else { blen = ll2string(bufb,sizeof(bufb),(long) b->ptr); bstr = bufb; } if (flags & REDIS_COMPARE_COLL) { return strcoll(astr,bstr); } else { int cmp; minlen = (alen < blen) ? alen : blen; cmp = memcmp(astr,bstr,minlen); if (cmp == 0) return alen-blen; return cmp; }}

害,最后居然是memcmp。

发布消息

发布消息就是读取该模式或者主题下锁监听的客户端,向他们发送消息

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849/* Publish a message */int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; dictEntry *de; listNode *ln; listIter li; /* Send to clients listening for that channel */ de = dictFind(server.pubsub_channels,channel); if (de) { list *list = dictGetVal(de); listNode *ln; listIter li; listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { client *c = ln->value; addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,channel); addReplyBulk(c,message); receivers++; } } /* Send to clients listening to matching channels */ if (listLength(server.pubsub_patterns)) { listRewind(server.pubsub_patterns,&li); channel = getDecodedObject(channel); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; if (stringmatchlen((char*)pat->pattern->ptr, sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { addReply(pat->client,shared.mbulkhdr[4]); addReply(pat->client,shared.pmessagebulk); addReplyBulk(pat->client,pat->pattern); addReplyBulk(pat->client,channel); addReplyBulk(pat->client,message); receivers++; } } decrRefCount(channel); } return receivers;}
模式

模式总是特殊的,模式采用了Glob风格的正则表达式来进行匹配。

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123/* Glob-style pattern matching. */int stringmatchlen(const char *pattern, int patternLen, const char *string, int stringLen, int nocase){ while(patternLen && stringLen) { switch(pattern[0]) { case '*': while (pattern[1] == '*') { pattern++; patternLen--; } if (patternLen == 1) return 1; /* match */ while(stringLen) { if (stringmatchlen(pattern+1, patternLen-1, string, stringLen, nocase)) return 1; /* match */ string++; stringLen--; } return 0; /* no match */ break; case '?': if (stringLen == 0) return 0; /* no match */ string++; stringLen--; break; case '[': { int not, match; pattern++; patternLen--; not = pattern[0] == '^'; if (not) { pattern++; patternLen--; } match = 0; while(1) { if (pattern[0] == '\\' && patternLen >= 2) { pattern++; patternLen--; if (pattern[0] == string[0]) match = 1; } else if (pattern[0] == ']') { break; } else if (patternLen == 0) { pattern--; patternLen++; break; } else if (pattern[1] == '-' && patternLen >= 3) { int start = pattern[0]; int end = pattern[2]; int c = string[0]; if (start > end) { int t = start; start = end; end = t; } if (nocase) { start = tolower(start); end = tolower(end); c = tolower(c); } pattern += 2; patternLen -= 2; if (c >= start && c <= end) match = 1; } else { if (!nocase) { if (pattern[0] == string[0]) match = 1; } else { if (tolower((int)pattern[0]) == tolower((int)string[0])) match = 1; } } pattern++; patternLen--; } if (not) match = !match; if (!match) return 0; /* no match */ string++; stringLen--; break; } case '\\': if (patternLen >= 2) { pattern++; patternLen--; } /* fall through */ default: if (!nocase) { if (pattern[0] != string[0]) return 0; /* no match */ } else { if (tolower((int)pattern[0]) != tolower((int)string[0])) return 0; /* no match */ } string++; stringLen--; break; } pattern++; patternLen--; if (stringLen == 0) { while(*pattern == '*') { pattern++; patternLen--; } break; } } if (patternLen == 0 && stringLen == 0) return 1; return 0;}

排序

数据结构
123456789101112131415typedef struct _redisSortObject { //存储的是对象指针,因此不会深复制 robj *obj; union { //排序权重,适合数值类型 double score; //自定义排序时使用的比较对象 robj *cmpobj; } u;} redisSortObject;typedef struct _redisSortOperation { int type; robj *pattern;} redisSortOperation;

可以看出他支持模式匹配。

限制

参数
ALPHA

ASC和DESC

BY和alpha

limit

Get

store

慢查询

数据结构

在server中会有list进行记录相应的慢查询日志。

1234list *slowlog; /* SLOWLOG list of commands */ long long slowlog_entry_id; /* SLOWLOG current entry ID */ long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */

list中记录着慢查询日志实体,其结构如下:

1234567891011121314#define SLOWLOG_ENTRY_MAX_ARGC 32#define SLOWLOG_ENTRY_MAX_STRING 128/* This structure defines an entry inside the slow log list */typedef struct slowlogEntry { robj **argv; int argc; long long id; /* Unique entry identifier. */ long long duration; /* Time spent by the query, in microseconds. */ time_t time; /* Unix time at which the query was executed. */ sds cname; /* Client name. */ sds peerid; /* Client network address. */} slowlogEntry;
配置文件

操作
插入
1234567891011121314/* Push a new entry into the slow log. * This function will make sure to trim the slow log accordingly to the * configured max length. */void slowlogPushEntryIfNeeded(client *c, robj **argv, int argc, long long duration) { if (server.slowlog_log_slower_than < 0) return; /* Slowlog disabled */ if (duration >= server.slowlog_log_slower_than) listAddNodeHead(server.slowlog, slowlogCreateEntry(c,argv,argc,duration)); /* Remove old entries if needed. */ while (listLength(server.slowlog) > server.slowlog_max_len) listDelNode(server.slowlog,listLast(server.slowlog));}
  • 新插入的慢查询日志放置在链表的表头

  • 受参数限制,会更新慢查询队列

查询

支持获取指定个数的慢查询日志,如果不指定默认值为10.

监视器

当在客户端执行monitor命令后,会将该客户端加入到monitor的链表中。其组成为:

格式为:时间戳 +数据库id号+ (客户端ip+port)+ 命令+命令参数

在call函数中,会执行replicationFeedMonitors函数,向所有的monitors发送指令.

秒杀小玩意

发现好多地方介绍秒杀很复杂,我感觉不需要那么复杂吧。不就下面几个点吗?

总结

以上是生活随笔为你收集整理的redis 发布订阅实际案例_Redis源码分析之发布订阅+慢查询+排序以及监视器的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。