redis源码客户端和服务端通信过程
最近想学习一下redis源码,先看一下redis通信流程。由于功力有限,不足之处望大家指正。服务端和客户端通信,一般都是服务端先启动,那先从服务端的源码看起。
首先启动服务端会做一些初始化动作,初始化事件处理器状态,先看一下事件处理器状态的结构
//事件处理器的状态 typedef struct aeEventLoop {// 目前已注册的最大描述符int maxfd; /* highest file descriptor currently registered */// 目前已追踪的最大描述符int setsize; /* max number of file descriptors tracked */// 用于生成时间事件 idlong long timeEventNextId;// 最后一次执行时间事件的时间time_t lastTime; /* Used to detect system clock skew */// 已注册的文件事件aeFileEvent *events; /* Registered events */// 已就绪的文件事件aeFiredEvent *fired; /* Fired events */// 时间事件aeTimeEvent *timeEventHead;// 事件处理器的开关int stop;// 多路复用库的私有数据void *apidata; /* This is used for polling API specific data */// 在处理事件前要执行的函数aeBeforeSleepProc *beforesleep;} aeEventLoop;下面要对事件处理器进行初始化。
//事件状态 typedef struct aeApiState {// epoll_event 实例描述符int epfd;// 事件槽struct epoll_event *events; } aeApiState; static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;// 初始化事件槽空间state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}// 创建 epoll 实例state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}// 赋值给 eventLoopeventLoop->apidata = state;return 0; }上面创建的epoll句柄和初始化的事件槽保存到传入的eventLoop事件对象中。这个事件对象保存在全局的一个redisserver中,redisServer中结构体成员很多,这里只展示一个
struct redisServer {//...// 事件状态aeEventLoop *el;// 一个链表,保存了所有客户端状态结构list *clients; /* List of active clients *//... };aeEventLoop *el 存储刚才创建的事件状态
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) {int s, rv;char _port[6]; /* strlen("65535") */struct addrinfo hints, *servinfo, *p;snprintf(_port,6,"%d",port);memset(&hints,0,sizeof(hints));hints.ai_family = af;hints.ai_socktype = SOCK_STREAM;hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {anetSetError(err, "%s", gai_strerror(rv));return ANET_ERR;}for (p = servinfo; p != NULL; p = p->ai_next) {if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)continue;if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;goto end;}if (p == NULL) {anetSetError(err, "unable to bind socket");goto error;}error:s = ANET_ERR; end:freeaddrinfo(servinfo);return s; }上面的函数用来打开监听端口
// 为 TCP 连接关联连接应答(accept)处理器// 用于接受并应答客户端的 connect() 调用for (j = 0; j < server.ipfd_count; j++) {if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, //使文件读关联一个函数acceptTcpHandler,NULL) == AE_ERR){redisPanic("Unrecoverable error creating server.ipfd file event.");}} /** 根据 mask 参数的值,监听 fd 文件的状态,* 当 fd 可用时,执行 proc 函数*/ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData) {if (fd >= eventLoop->setsize) {errno = ERANGE;return AE_ERR;}if (fd >= eventLoop->setsize) return AE_ERR;// 取出文件事件结构aeFileEvent *fe = &eventLoop->events[fd];// 监听指定 fd 的指定事件if (aeApiAddEvent(eventLoop, fd, mask) == -1)return AE_ERR;// 设置文件事件类型,以及事件的处理器fe->mask |= mask;if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;// 私有数据fe->clientData = clientData;// 如果有需要,更新事件处理器的最大 fdif (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK; }aeCreateFileEvent函数用来注册回调用,参数aeEventLoop *eventLoop就是前面初始化的事件处理器的状态,当AE_READABLE产生时就会调用acceptTcpHandler函数,这时是有客户端connect了。前面已经初始化了一定数量的处理器,aeApiAddEvent把所有的事件对象都注册到epoll,后面接着设置对应AE_READABLE和AE_WRITABLE对应的回调函数。
/* File event structure** 文件事件结构*/ typedef struct aeFileEvent {// 监听事件类型掩码,// 值可以是 AE_READABLE 或 AE_WRITABLE ,// 或者 AE_READABLE | AE_WRITABLEint mask; /* one of AE_(READABLE|WRITABLE) */// 读事件处理器aeFileProc *rfileProc;// 写事件处理器aeFileProc *wfileProc;// 多路复用库的私有数据void *clientData;} aeFileEvent;上面是文件事件结构的结构体,对应的读和写的回调函数都保存在aeFileEvent(文件事件)中,aeFileEvent(文件事件)就是aeEventLoop(事件处理器状态)的成员,aeEventLoop(事件处理器状态)就是redisServer结构体中aeEventLoop *el(事件状态成员),所有的这些都保存在全局的redisServer结构体中。接下来就是事件处理器主循环中
/** 事件处理器的主循环*/ void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {// 如果有需要在事件处理前执行的函数,那么运行它if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);// 开始处理事件aeProcessEvents(eventLoop, AE_ALL_EVENTS);//一直循环调用这个函数等到消息} } //处理所有已到达的时间事件,以及所有已就绪的文件事件。 int aeProcessEvents(aeEventLoop *eventLoop, int flags) {int processed = 0, numevents;/* Nothing to do? return ASAP */if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;/* Note that we want call select() even if there are no* file events to process as long as we want to process time* events, in order to sleep until the next time event is ready* to fire. */if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;// 获取最近的时间事件if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);if (shortest) {// 如果时间事件存在的话// 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间long now_sec, now_ms;/* Calculate the time missing for the nearest* timer to fire. */// 计算距今最近的时间事件还要多久才能达到// 并将该时间距保存在 tv 结构中aeGetTime(&now_sec, &now_ms);tvp = &tv;tvp->tv_sec = shortest->when_sec - now_sec;if (shortest->when_ms < now_ms) {tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;tvp->tv_sec --;} else {tvp->tv_usec = (shortest->when_ms - now_ms)*1000;}// 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)if (tvp->tv_sec < 0) tvp->tv_sec = 0;if (tvp->tv_usec < 0) tvp->tv_usec = 0;} else {// 执行到这一步,说明没有时间事件// 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度/* If we have to check for events but need to return* ASAP because of AE_DONT_WAIT we need to set the timeout* to zero */if (flags & AE_DONT_WAIT) {// 设置文件事件不阻塞tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */// 文件事件可以阻塞直到有事件到达为止tvp = NULL; /* wait forever */}}// 处理文件事件,阻塞时间由 tvp 决定numevents = aeApiPoll(eventLoop, tvp);for (j = 0; j < numevents; j++) {// 从已就绪数组中获取事件aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;/* note the fe->mask & mask & ... code: maybe an already processed* event removed an element that fired and we still didn't* processed, so we check if the event is still valid. */// 读事件if (fe->mask & mask & AE_READABLE) {// rfired 确保读/写事件只能执行其中一个rfired = 1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}// 写事件if (fe->mask & mask & AE_WRITABLE) {printf("can writable\n");if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}}/* Check time events */// 执行时间事件if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */ } /** 获取可执行事件*/ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;// 等待时间retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);//epoll_wait用于向用户进程返回ready list// 有至少一个事件就绪?if (retval > 0) {int j;// 为已就绪事件设置相应的模式// 并加入到 eventLoop 的 fired 数组中numevents = retval;for (j = 0; j < numevents; j++) {int mask = 0;struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE;eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}// 返回已就绪事件个数return numevents; }aeProcessEvents一直被循环调用用来处理就绪的文件事件(时间事件这里不考虑),通过调用aeApiPoll中的epoll_wait等待事件的促发。
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; /* A fired event** 已就绪事件*/ typedef struct aeFiredEvent {// 已就绪文件描述符int fd;// 事件类型掩码,// 值可以是 AE_READABLE 或 AE_WRITABLE// 或者是两者的或int mask;} aeFiredEvent;上面列出了epoll结构体和aeFiredEvent(已就绪事件结构体),aeFiredEvent属于事件处理器状态(aeEventLoop)成员,并循环保存就绪文件事件对应中已就绪描述符和其类型,这些又都保存在事件处理器状态(aeEventLoop)中。函数返回到aeProcessEvents中,然后走对应的回调(这时候还没讲回调关联对应的函数)。
现在假如有客户端来连接了,按前面说的,套接字变的可读,acceptTcpHandler被调用,acceptTcpHandler函数接收客户端的连接,并为客户端创建状态,并注册读取客户端命令的函数readQueryFromClient。并把创建的客户端保存在redisServer里面的list *clients里面。
/** 创建一个新客户端*/ redisClient *createClient(int fd) {printf("-----------%s--------\n",__FUNCTION__);// 分配空间redisClient *c = zmalloc(sizeof(redisClient));/* passing -1 as fd it is possible to create a non connected client.* This is useful since all the Redis commands needs to be executed* in the context of a client. When commands are executed in other* contexts (for instance a Lua script) we need a non connected client. */// 当 fd 不为 -1 时,创建带网络连接的客户端// 如果 fd 为 -1 ,那么创建无网络连接的伪客户端// 因为 Redis 的命令必须在客户端的上下文中使用,所以在执行 Lua 环境中的命令时// 需要用到这种伪终端if (fd != -1) {// 非阻塞anetNonBlock(NULL,fd);// 禁用 Nagle 算法anetEnableTcpNoDelay(NULL,fd);// 设置 keep aliveif (server.tcpkeepalive)anetKeepAlive(NULL,fd,server.tcpkeepalive);// 绑定读事件到事件 loop (开始接收命令请求)if (aeCreateFileEvent(server.el,fd,AE_READABLE, //客户端连接上之后,再为客户端关联一个读数据的函数。之前关联的建立连接readQueryFromClient, c) == AE_ERR) //没有建立连接之前关联建立函数,建立连接之后关联读数据的函数{close(fd);zfree(c);return NULL;}}// 初始化各个属性// 默认数据库selectDb(c,0);// 套接字c->fd = fd;// 名字c->name = NULL;// 回复缓冲区的偏移量c->bufpos = 0;// 查询缓冲区c->querybuf = sdsempty();// 查询缓冲区峰值c->querybuf_peak = 0;// 命令请求的类型c->reqtype = 0;// 命令参数数量c->argc = 0;// 命令参数c->argv = NULL;// 当前执行的命令和最近一次执行的命令c->cmd = c->lastcmd = NULL;// 查询缓冲区中未读入的命令内容数量c->multibulklen = 0;// 读入的参数的长度c->bulklen = -1;// 已发送字节数c->sentlen = 0;// 状态 FLAGc->flags = 0;// 创建时间和最后一次互动时间c->ctime = c->lastinteraction = server.unixtime;// 认证状态c->authenticated = 0;// 复制状态c->replstate = REDIS_REPL_NONE;// 复制偏移量c->reploff = 0;// 通过 ACK 命令接收到的偏移量c->repl_ack_off = 0;// 通过 AKC 命令接收到偏移量的时间c->repl_ack_time = 0;// 客户端为从服务器时使用,记录了从服务器所使用的端口号c->slave_listening_port = 0;// 回复链表c->reply = listCreate();// 回复链表的字节量c->reply_bytes = 0;// 回复缓冲区大小达到软限制的时间c->obuf_soft_limit_reached_time = 0;// 回复链表的释放和复制函数listSetFreeMethod(c->reply,decrRefCountVoid);listSetDupMethod(c->reply,dupClientReplyValue);// 阻塞类型c->btype = REDIS_BLOCKED_NONE;// 阻塞超时c->bpop.timeout = 0;// 造成客户端阻塞的列表键c->bpop.keys = dictCreate(&setDictType,NULL);// 在解除阻塞时将元素推入到 target 指定的键中// BRPOPLPUSH 命令时使用c->bpop.target = NULL;c->bpop.numreplicas = 0;c->bpop.reploffset = 0;c->woff = 0;// 进行事务时监视的键c->watched_keys = listCreate();// 订阅的频道和模式c->pubsub_channels = dictCreate(&setDictType,NULL);c->pubsub_patterns = listCreate();c->peerid = NULL;listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);listSetMatchMethod(c->pubsub_patterns,listMatchObjects);// 如果不是伪客户端,那么添加到服务器的客户端链表中if (fd != -1) listAddNodeTail(server.clients,c);// 初始化客户端的事务状态initClientMultiState(c);// 返回客户端return c; } /* With multiplexing we need to take per-client state.* Clients are taken in a liked list.** 因为 I/O 复用的缘故,需要为每个客户端维持一个状态。** 多个客户端状态被服务器用链表连接起来。*/ typedef struct redisClient {// 套接字描述符int fd;// 当前正在使用的数据库redisDb *db;// 当前正在使用的数据库的 id (号码)int dictid;// 客户端的名字robj *name; /* As set by CLIENT SETNAME */// 查询缓冲区sds querybuf;// 查询缓冲区长度峰值size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */// 参数数量int argc;// 参数对象数组robj **argv;// 记录被客户端执行的命令struct redisCommand *cmd, *lastcmd;// 请求的类型:内联命令还是多条命令int reqtype;// 剩余未读取的命令内容数量int multibulklen; /* number of multi bulk arguments left to read */// 命令内容的长度long bulklen; /* length of bulk argument in multi bulk request */// 回复链表list *reply;// 回复链表中对象的总大小unsigned long reply_bytes; /* Tot bytes of objects in reply list */// 已发送字节,处理 short write 用int sentlen; /* Amount of bytes already sent in the currentbuffer or object being sent. */// 创建客户端的时间time_t ctime; /* Client creation time */// 客户端最后一次和服务器互动的时间time_t lastinteraction; /* time of the last interaction, used for timeout */// 客户端的输出缓冲区超过软性限制的时间time_t obuf_soft_limit_reached_time;// 客户端状态标志int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */// 当 server.requirepass 不为 NULL 时// 代表认证的状态// 0 代表未认证, 1 代表已认证int authenticated; /* when requirepass is non-NULL */// 复制状态int replstate; /* replication state if this is a slave */// 用于保存主服务器传来的 RDB 文件的文件描述符int repldbfd; /* replication DB file descriptor */// 读取主服务器传来的 RDB 文件的偏移量off_t repldboff; /* replication DB file offset */// 主服务器传来的 RDB 文件的大小off_t repldbsize; /* replication DB file size */sds replpreamble; /* replication DB preamble. */// 主服务器的复制偏移量long long reploff; /* replication offset if this is our master */// 从服务器最后一次发送 REPLCONF ACK 时的偏移量long long repl_ack_off; /* replication ack offset, if this is a slave */// 从服务器最后一次发送 REPLCONF ACK 的时间long long repl_ack_time;/* replication ack time, if this is a slave */// 主服务器的 master run ID// 保存在客户端,用于执行部分重同步char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */// 从服务器的监听端口号int slave_listening_port; /* As configured with: SLAVECONF listening-port */// 事务状态multiState mstate; /* MULTI/EXEC state */// 阻塞类型int btype; /* Type of blocking op if REDIS_BLOCKED. */// 阻塞状态blockingState bpop; /* blocking state */// 最后被写入的全局复制偏移量long long woff; /* Last write global replication offset. */// 被监视的键list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */// 这个字典记录了客户端所有订阅的频道// 键为频道名字,值为 NULL// 也即是,一个频道的集合dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */// 链表,包含多个 pubsubPattern 结构// 记录了所有订阅频道的客户端的信息// 新 pubsubPattern 结构总是被添加到表尾list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */sds peerid; /* Cached peer ID. *//* Response buffer */// 回复偏移量int bufpos;// 回复缓冲区char buf[REDIS_REPLY_CHUNK_BYTES]; } redisClient;当客户端发送命令过来时,epoll返回,readQueryFromClient被调用,注意回调函数的转变。没建立连接之前是关联acceptTcpHandler,建立连接之后关联readQueryFromClient函数读取客户端的数据。
/** 读取客户端的查询缓冲区内容*/ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {printf("-----------%s--------\n",__FUNCTION__);redisClient *c = (redisClient*) privdata;int nread, readlen;size_t qblen;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);// 设置服务器的当前客户端server.current_client = c;// 读入长度(默认为 16 MB)readlen = REDIS_IOBUF_LEN;/* If this is a multi bulk request, and we are processing a bulk reply* that is large enough, try to maximize the probability that the query* buffer contains exactly the SDS string representing the object, even* at the risk of requiring more read(2) calls. This way the function* processMultiBulkBuffer() can avoid copying buffers to create the* Redis Object representing the argument. */if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1&& c->bulklen >= REDIS_MBULK_BIG_ARG){int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);if (remaining < readlen) readlen = remaining;}// 获取查询缓冲区当前内容的长度// 如果读取出现 short read ,那么可能会有内容滞留在读取缓冲区里面// 这些滞留内容也许不能完整构成一个符合协议的命令,qblen = sdslen(c->querybuf);// 如果有需要,更新缓冲区内容长度的峰值(peak)if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;// 为查询缓冲区分配空间c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);// 读入内容到查询缓存nread = read(fd, c->querybuf+qblen, readlen);//接收客户端发送过来的数据到// 读入出错if (nread == -1) {if (errno == EAGAIN) {nread = 0;} else {redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));freeClient(c);return;}// 遇到 EOF} else if (nread == 0) {redisLog(REDIS_VERBOSE, "Client closed connection");freeClient(c);return;}if (nread) {// 根据内容,更新查询缓冲区(SDS) free 和 len 属性// 并将 '\0' 正确地放到内容的最后sdsIncrLen(c->querybuf,nread);// 记录服务器和客户端最后一次互动的时间c->lastinteraction = server.unixtime;// 如果客户端是 master 的话,更新它的复制偏移量if (c->flags & REDIS_MASTER) c->reploff += nread;} else {// 在 nread == -1 且 errno == EAGAIN 时运行server.current_client = NULL;return;}// 查询缓冲区长度超出服务器最大缓冲区长度// 清空缓冲区并释放客户端if (sdslen(c->querybuf) > server.client_max_querybuf_len) {sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();bytes = sdscatrepr(bytes,c->querybuf,64);redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);sdsfree(ci);sdsfree(bytes);freeClient(c);return;}// 从查询缓存重读取内容,创建参数,并执行命令// 函数会执行到缓存中的所有内容都被处理完为止processInputBuffer(c);server.current_client = NULL; }收到客户端的命令之后就要分析并执行命令,然后被结果返给客户端。readQueryFromClient->processInputBuffer->processCommand->addReply->prepareClientToWrite。prepareClientToWrite这个函数就是注册回复客户端的函数sendReplyToClient。
int prepareClientToWrite(redisClient *c) {// LUA 脚本环境所使用的伪客户端总是可写的if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;// 客户端是主服务器并且不接受查询,// 那么它是不可写的,出错if ((c->flags & REDIS_MASTER) &&!(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;// 无连接的伪客户端总是不可写的if (c->fd <= 0) return REDIS_ERR; /* Fake client */// 一般情况,为客户端套接字安装写处理器到事件循环if (c->bufpos == 0 && listLength(c->reply) == 0 &&(c->replstate == REDIS_REPL_NONE ||c->replstate == REDIS_REPL_ONLINE) &&aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c) == AE_ERR) return REDIS_ERR;return REDIS_OK; }每一个阶段都关联一个回调函数,当事件触发后走回调函数。
总结
以上是生活随笔为你收集整理的redis源码客户端和服务端通信过程的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 计算机组成原理:储存系统和结构
- 下一篇: redis源码epoll用法