赞
踩
在server.c文件的initServer函数中,对aeEventLoop进行了初始化:
void initServer(void) { // 创建aeEventLoop结构体 server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); if (server.el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", strerror(errno)); exit(1); } // 省略其他代码... for (j = 0; j < server.ipfd_count; j++) { // 注册监听事件,server.ipfd是TCP文件描述符,AE_READABLE可读事件,acceptTcpHandler事件处理回调函数 if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } // 省略其他代码... }
在aeCreateEventLoop函数调用时,传入的最大文件描述符个数为客户端最大连接数+宏定义CONFIG_FDSET_INCR的大小,CONFIG_FDSET_INCR的定义在server.h中:
#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)
#define CONFIG_MIN_RESERVED_FDS 32
aeEventLoop结构体定义,在ae.h中:
typedef struct aeEventLoop {
int maxfd; /* 记录最大的文件描述符 */
int setsize; /* 最大文件描述符个数 */
long long timeEventNextId;
time_t lastTime;
aeFileEvent *events; /* IO事件集合,记录了每个文件描述符产生事件时的回调函数 */
aeFiredEvent *fired; /* 记录已触发的事件 */
aeTimeEvent *timeEventHead; /* 时间事件 */
int stop;
void *apidata; /* IO多路复用API接口相关数据 */
aeBeforeSleepProc *beforesleep;/* 进入事件循环流程前的执行函数 */
aeBeforeSleepProc *aftersleep;/* 退出事件循环流程后的执行函数 */
} aeEventLoop;
aeCreateEventLoop
aeEventLoop结构体创建在aeCreateEventLoop函数中(ae.c文件):
aeEventLoop *aeCreateEventLoop(int setsize) { // aeEventLoop结构体 aeEventLoop *eventLoop; int i; // 分配eventLoop内存 if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; // 分配IO事件内存 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; // 创建poll实例 if (aeApiCreate(eventLoop) == -1) goto err; for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; // 初始化为空事件 return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }
aeApiState结构体定义,在ae_epoll.c中:
epfd:创建的epoll实例文件描述符
events:记录文件描述符产生的事件
typedef struct aeApiState {
int epfd; // epoll实例文件描述符
struct epoll_event *events; // 记录就绪的事件
} aeApiState;
aeApiCreate
epoll实例的的创建在aeApiCreate函数(ae_epoll.c文件)中,处理逻辑如下:
为aeApiState结构体分配内存空间
为aeApiState中的events分配内存空间,events数组个数为eventLoop中的最大文件描述个数
调用epoll_create函数创建epoll实例,将返回的epoll文件描述符保存在epfd中
将eventLoop的apidata指向创建的aeApiState,之后就可以通过eventLoop获取到epoll实例并且注册监听事件了
static int aeApiCreate(aeEventLoop *eventLoop) { // 分配内存 aeApiState *state = zmalloc(sizeof(aeApiState)); if (!state) return -1; // 为epoll事件分配内存 state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); if (!state->events) { zfree(state); return -1; } // epoll_create创建epoll实例,返回文件描述符,保存在state的epfd中 state->epfd = epoll_create(1024); if (state->epfd == -1) { zfree(state->events); zfree(state); return -1; } // 将aeApiState设置到eventLoop的apidata eventLoop->apidata = state; return 0; }
IO 事件的数据结构是 aeFileEvent 结构体,在ae.c中定义:
mask:事件类型掩码,共有READABLE、WRITABLE、BARRIER三种事件,分别为可读事件、可写事件和屏障事件
rfileProc:写事件回调函数
wfileProc:读事件回调函数
typedef struct aeFileEvent {
int mask; /* 事件类型掩码 READABLE|WRITABLE|BARRIER */
aeFileProc *rfileProc; /* 写事件回调函数 */
aeFileProc *wfileProc; /* 读事件回调函数 */
void *clientData; /* 客户端数据 */
} aeFileEvent;
aeCreateFileEvent
aeCreateFileEvent函数在ae.c文件中,主要处理逻辑如下:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } // 根据传入的文件描述符获取对应的IO事件 aeFileEvent *fe = &eventLoop->events[fd]; // 注册要监听的事件,让内核可以监听到当前文件描述符上的IO事件 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; // 设置写事件的回调函数 if (mask & AE_WRITABLE) fe->wfileProc = proc; // 设置读事件的回调函数 fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
aeApiAddEvent
aeApiAddEvent用于注册事件(ae_epoll.c文件中):
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { // 获取aeApiState aeApiState *state = eventLoop->apidata; // 创建epoll_event类型的变量ee,添加监听事件的时候使用 struct epoll_event ee = {0}; /* avoid valgrind warning */ /* 如果fd文件描述符还未设置监听事件, 类型设置为添加,否则设置为修改,简言之就是根据掩码判断是添加还是修改监听事件 */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; // 如果是可读事件,转换为epoll的读事件监听类型EPOLLIN,并设置到ee的events中 if (mask & AE_READABLE) ee.events |= EPOLLIN; // 如果是可写事件,转换为epoll的写事件监听类型EPOLLOUT,并设置到ee的events中 if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; // 记录要监听的文件描述符 ee.data.fd = fd; // 调用epoll_ctl函数向epoll添加监听事件,参数分别为epoll实例、操作类型、要监听的文件描述符、epoll_event类型变量ee if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; }
总结
Redis在启动时,调用aeCreateEventLoop创建aeEventLoop结构体和epoll实例,之后调用aeCreateFileEvent函数向内核注册TCP文件描述符的监听事件,当有客户端连接Redis服务时,TCP文件描述符产生可读事件,通过epoll可以获取产生事件的文件描述符,Redis就可以对连接请求进行处理。
// server.el是eventLoop
// server.ipfd[j]是监听socket的文件描述符
// AE_READABLE是读事件
// acceptTcpHandler是事件产生时的回调函数
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL)
aeMain函数在ae.c文件中,里面是一个while循环,它的处理逻辑如下:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 调用了aeProcessEvents处理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
aeProcessEvents
aeProcessEvents函数在ae.c文件中,处理逻辑如下:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* 如果没有事件 */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* 如果有IO事件或者时间事件 */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; // 省略代码... // 等待事件,返回就绪文件描述符的数量 numevents = aeApiPoll(eventLoop, tvp); /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop); // 处理就绪的事件 for (j = 0; j < numevents; j++) { // aeApiPoll中已将就绪的事件放在了fired中,通过fired可以获取到产生事件的文件描述符fd // 根据文件描述符fd获取对应的事件aeFileEvent,aeFileEvent中记录了事件的回调函数 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; // 获取文件描述符 int fd = eventLoop->fired[j].fd; int fired = 0; /* Number of events fired for current fd. */ /* 判断屏障 */ int invert = fe->mask & AE_BARRIER; /* 处理可读事件 */ if (!invert && fe->mask & mask & AE_READABLE) { // 如果是可读事件,调用可读事件的回调函数,参数分别为eventLoop、文件描述符、aeFileEvent的clientData、事件类型掩码 fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } /* 处理可写事件 */ if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { // 如果是写事件,调用写事件的回调函数,参数分别为eventLoop、文件描述符、aeFileEvent的clientData、事件类型掩码 fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } /* If we have to invert the call, fire the readable event now * after the writable one. */ if (invert && fe->mask & mask & AE_READABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } /* 如果有时间事件 */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
aeApiPoll
aeApiPoll处理就绪的事件:
调用IO多路复用epoll_wait函数等待事件的产生,epoll_wait函数需要传入epoll实例、记录就绪事件集合的epoll_event,这两个参数分别在aeApiState的epfd和events中,当监听的文件描述符有事件产生时,epoll_wait返回就绪的文件描述符个数
对epoll_wait返回的就绪事件进行处理,事件记录在events变量中,遍历每一个就绪的事件,将事件对应的文件描述符设置在eventLoop的fire中,后续通过fire对事件进行处理
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { // 获取aeApiState,aeApiState记录了epoll实例,events记录了产生的事件 aeApiState *state = eventLoop->apidata; int retval, numevents = 0; // 等待事件的产生,epoll_wait返回就绪的文件描述符个数,就绪的事件记录在state->events中 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; // 处理返回的就绪事件 for (j = 0; j < numevents; j++) { int mask = 0; // 获取每一个就绪的事件 struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; // 将就绪事件的文件描述符设置到已触发的事件fired的fd中 eventLoop->fired[j].fd = e->data.fd; // 设置事件类型掩码 eventLoop->fired[j].mask = mask; } } return numevents; }
acceptTcpHandler
由上面的调用可知,Redis在启动时,注册了AE_READABLE读事件,回调函数为acceptTcpHandler(network.c文件中)用于处理客户端连接,当有客户端与Redis连接时,epoll返回就绪的文件描述符,Redis在处理就绪的事件时调用acceptTcpHandler进行处理:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata); while(max--) { // 建立连接,返回已连接的套接字文件描述符cfd cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); // 调用acceptCommonHandler处理连接,这里传入的文件描述符为已连接的套接字 acceptCommonHandler(cfd,0,cip); } } static void acceptCommonHandler(int fd, int flags, char *ip) { client *c; // 调用createClient if ((c = createClient(fd)) == NULL) { serverLog(LL_WARNING, "Error registering fd event for the new client: %s (fd=%d)", strerror(errno),fd); close(fd); /* May be already closed, just ignore errors */ return; } // .. }
createClient
createClient函数中调用了aeCreateFileEvent方法向内核中注册可读事件,上文可知传入的描述符是已连接套接字cfd,回调函数为readQueryFromClient,此时事件驱动框架增加了对客户端已连接套接字的监听,当客户端有数据发送到服务端时,Redis调用readQueryFromClient函数处理读事件:
client *createClient(int fd) { client *c = zmalloc(sizeof(client)); if (fd != -1) { anetNonBlock(NULL,fd); anetEnableTcpNoDelay(NULL,fd); if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); // 注册已连接套接字的可读事件,回调函数为readQueryFromClient if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } // ... }
readQueryFromClient
readQueryFromClient函数在network.c文件中,是可读事件的回调函数,用于处理已连接套接字上的读事件,处理逻辑如下:
// aeProcessEvents中调用回调函数时,传入的参数分别为aeEventLoop、已连接套接字的文件描述符、aeFileEvent的clientData私有数据、事件类型掩码 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata; int nread, readlen; size_t qblen; UNUSED(el); UNUSED(mask); readlen = PROTO_IOBUF_LEN; if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); if (remaining > 0 && remaining < readlen) readlen = remaining; } qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 从已连接的套接字中读取客户端的请求数据到输入缓冲区 nread = read(fd, c->querybuf+qblen, readlen); if (nread == -1) { if (errno == EAGAIN) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); freeClient(c); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); freeClient(c); return; } else if (c->flags & CLIENT_MASTER) { c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread); } // 省略... /* 处理输入缓冲区数据 */ processInputBufferAndReplicate(c); }
在aeMain调用aeProcessEvents之前,先调用了beforeSleep方法,beforeSleep中又调用了handleClientsWithPendingWrites,它会将Redis Server缓冲区的数据写回到客户端:
void beforeSleep(struct aeEventLoop *eventLoop) {
// 省略...
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
// 省略...
}.
handleClientsWithPendingWrites
Redis Server收到客户端的请求命令后,需要处理请求,然后将要返回的数据写回到客户端,写回到客户端的逻辑在handleClientsWithPendingWrites函数中,处理逻辑如下:
int handleClientsWithPendingWrites(void) { listIter li; listNode *ln; int processed = listLength(server.clients_pending_write); // 获取待写回数据的客户端列表 listRewind(server.clients_pending_write,&li); // 遍历每一个待写回数据的客户端 while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; listDelNode(server.clients_pending_write,ln); if (c->flags & CLIENT_PROTECTED) continue; /* 将缓冲区的数据写到客户端socket中 */ if (writeToClient(c->fd,c,0) == C_ERR) continue; /* 如果数据未全部写回到客户端 */ if (clientHasPendingReplies(c)) { int ae_flags = AE_WRITABLE; if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_ALWAYS) { ae_flags |= AE_BARRIER; } // 调用aeCreateFileEvent方法,向内核注册客户端文件描述符的可写事件监听,交由回调函数sendReplyToClient处理 if (aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) { freeClientAsync(c); } } } return processed; }
clientHasPendingReplies
有时由于网络原因或者其他原因,可能只发出去了部分数据,客户端如果一直未从缓冲区读取数据,在缓冲区已满的情况,服务端将无法往客户端发送数据,所以调用clientHasPendingReplies函数判断数据是否写回完毕,如果未写回完毕交由事件循环驱动处理,提高处理效率。
整体流程图
总结
参考
【osc_avxkth26】Redis 网络通信模块源码分析(3)
Redis版本:redis-5.0.8
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。