Redis事件驱动框架
经常会遇到这样一道问题:Redis的网络框架是实现了 Reactor
模型吗?
(1) Reactor模型是什么
Reactor模型是网络服务器端用来处理高并发网络IO请求的一种编程模型。
Reactor的核心思想:Reactor模式 也叫 Dispatcher 模式,将关注的IO事件注册到多路复用器上,一旦有IO事件触发,将事件分发到事件处理器中,执行就绪IO事件对应的处理函数中。
(1.1) 角色
Reactor模型中定义的三种角色:
角色 | 职责 |
---|---|
Reactor | 负责监听和分配事件,将I/O事件分派给对应的Handler。 |
Acceptor | 处理客户端新连接,并分派请求到处理器链中。 |
Handler | 将自身与事件绑定,执行非阻塞读/写任务,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。可用资源池来管理。 |
(1.2) 事件
Reactor 模型处理的是客户端和服务器端的交互过程,而这三类事件正好对应了客户端和服务器端交互过程中,不同类请求在服务器端引发的待处理事件:
事件类型 | 解释 | 备注 |
---|---|---|
连接事件 | 客户端和服务器建立连接 | 对应connect()函数 |
写事件 | 连接建立后,客户端会给服务器端发送读请求数据;\n 或 服务器端处理完客户端请求后写数据。 | 对应write()函数 |
读事件 | 服务器端读取请求数据。 | 对应 read()函数 |
(1.3) Reactor处理请求的流程
连接事件由 acceptor 来处理,负责接收连接;acceptor 在接收连接后,会创建 handler,用于网络连接上对后续读写事件的处理;
读写事件由 handler 处理;
在高并发场景中,连接事件、读写事件会同时发生,需要有一个角色专门监听和分配事件,这就是 reactor 角色。当有连接请求时,reactor 将产生的连接事件交由 acceptor 处理;当有读写请求时,reactor 将读写事件交由 handler 处理。
1、单 Reactor 单线程;
2、单 Reactor 多线程;
3、主从 Reactor 多线程。
(2) 为什么要用Reactor模型
(2.1) BIO模型的优缺点
优点:
1、使用简单,容易编程
2、在多核系统下,能够充分利用了多核CPU的资源。
缺点:
该模式的本质问题在于严重依赖线程,随着并发访问量的增加,线程数量的不断膨胀将服务端的性能将急剧下降。
1、线程的创建与销毁都需要调用系统函数,开销比较高。
2、资源消耗大。大量空闲的线程会占用许多内存;并发量大时,线程资源争抢严重,CPU性能可能会下降。
3、线程的切换成本高。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。线程数过高,会带来许多无用的上下文切换,可能导致执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统负载偏高、CPU sy(系统CPU)使用率特别高。
4、客户端和服务器端的连接会一直保留着,可能会存在大量线程在大量时间内都处于空置状态的情况。
(2.2) Reactor模型的优缺点
Reactor模型具有如下的优点:
响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
可扩展性,可以方便地通过增加Reactor实例个数来充分利用CPU资源;
可复用性,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性。
(3) 如何按照Reactor模型实现的
Redis 为了实现事件驱动框架,相应地定义了事件的数据结构、框架主循环函数、事件捕获分发函数、事件和 handler 注册函数。
(4) 源码解析
(4.1) 事件的数据结构
Redis 的事件驱动框架定义了两类事件:IO事件和时间事件,分别对应了客户端发送的网络请求和 Redis自身的周期性操作。
// file: ae.h
/* 文件事件结构 */
typedef struct aeFileEvent {
int mask; // 标记 可读/可写/屏障
aeFileProc *rfileProc; // 写事件函数
aeFileProc *wfileProc; // 读事件函数
void *clientData; //
} aeFileEvent;
// file: ae.h
/* 时间事件结构 */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */
} aeTimeEvent;
(4.2) 事件执行入口-main函数
// file: src/ae.c
/*
* 循环处理事件
*
* @param *eventLoop
*/
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
// 循环处理事件
while (!eventLoop->stop) {
// 处理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
(4.3) 获取就绪事件并分发
// file: src/ae.c
/*
* 处理事件
*
* @param *eventLoop
* @param flags
*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// 省略部分细节...
// 调用多路复用API获取就绪事件
numevents = aeApiPoll(eventLoop, tvp);
// 处理写事件
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
// 处理读事件
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// file: src/ae_poll.c
/*
* 获取就绪事件
*
* @param *eventLoop
* @param *tvp
*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
// 等待事件
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// 调linux epoll_wait函数来获取已就绪socket
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
// ...
return numevents;
}
(4.4) 事件注册
// file: src/ae.c
/*
* @param *eventLoop
* @param fd
* @param mask 0:未注册事件 1:描述符可读时触发 2:描述符可写时触发 3:
* @param *proc aeFileProc类型 入参传的是 acceptTcpHandler函数 回调时会用到这个函数
* @param *clientData
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 从aeFileEvent事件数组里取出一个文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];
// 监听指定fd的指定事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
// 设置文件事件类型 以及事件的处理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc; // 设置读事件函数
if (mask & AE_WRITABLE) fe->wfileProc = proc; // 设置写事件函数
// 私有数据
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
(5) Redis里的事件
Redis事件驱动框架是如何以事件形式,处理 server 运行过程中面临的请求操作和多种任务的。
(5.1) 事件循环结构体
// file: src/ae.h
/*
* 基于事件的程序的状态
* State of an event based program
*/
typedef struct aeEventLoop {
int maxfd; // 当前注册的最大文件描述符
int setsize; // 跟踪的最大文件描述符数
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; // 注册事件数组的指针 指向aeFileEvent数组
aeFiredEvent *fired; // 就绪事件数组的指针 指向aeFiredEvent数组
aeTimeEvent *timeEventHead; // 时间事件
int stop;
void *apidata; // 指向aeApiState结构体 创建的epoll对象就在aeApiState->epfd
aeBeforeSleepProc *beforesleep; // 在事件处理前执行的函数
aeBeforeSleepProc *aftersleep; // 在事件处理后执行的函数
int flags;
} aeEventLoop;
// file: src/ae.h
/*
* 文件事件结构
* File event structure
*/
typedef struct aeFileEvent {
int mask; // 标记 可读/可写/屏障
aeFileProc *rfileProc; // 写事件回调
aeFileProc *wfileProc; // 读事件回调
void *clientData; // 扩展数据
} aeFileEvent;
(5.2) 事件对象的初始化
//file: src/server.c
void initServer(void) {
// 2.1 创建 epoll
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
// 省略部分代码
}
事件的个数对应入参里的 server.maxclients+CONFIG_FDSET_INCR
,
server.maxclients
变量的值大小,可以在 Redis 的配置文件 redis.conf
中进行定义,默认值是 1000。
CONFIG_FDSET_INCR
的大小 = 32 + 96
(5.2.1) IO多路复用模块初始化
Redis 在操作系统提供的 epoll 对象基础上又封装了一个 eventLoop 出来,所以创建的时候是先申请和创建 eventLoop。
// file: src/ae.c
/*
* 创建aeEventLoop结构体
*
* @param setsize
*/
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
// ... 省略部分代码
eventLoop = zmalloc(sizeof(*eventLoop))
// 将来的各种回调事件就都会存在这里
// eventLoop->events是一个指针 指向数组 元素类型:aeFileEvent 大小:setsize
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
// ... 省略部分代码
// 创建epoll
aeApiCreate(eventLoop)
}
(5.3) IO事件处理
Redis 的 IO 事件主要包括三类,分别是可读事件、可写事件和屏障事件。
读事件:从客户端读取数据
写事件:向客户端写入数据。
屏障事件的主要作用是用来反转事件的处理顺序。
(5.3.1) IO事件创建/注册事件
// file: src/ae.c
/*
* @param *eventLoop
* @param fd
* @param mask 0:未注册事件 1:描述符可读时触发 2:描述符可写时触发 3:
* @param *proc aeFileProc类型 入参传的是 acceptTcpHandler函数 回调时会用到这个函数
* @param *clientData
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 从aeFileEvent事件数组里取出一个文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];
// 监听指定fd的指定事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
// 设置文件事件类型 以及事件的处理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc; // 设置读事件回调
if (mask & AE_WRITABLE) fe->wfileProc = proc; // 设置写事件回调
// 私有数据
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
//file: src/ae_epoll.c
// 添加事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
// ...
// epoll_ctl 添加事件
epoll_ctl(state->epfd,op,fd,&ee);
return 0;
}
(5.3.2) 读事件处理
当 Redis server 接收到客户端的连接请求时,就会使用注册好的 acceptTcpHandler 函数进行处理。
acceptTcpHandler -> acceptCommonHandler -> createClient-> aeCreateFileEvent
aeCreateFileEvent 函数会针对已连接套接字上,创建监听事件,类型为 AE_READABLE,回调函数是 readQueryFromClient。
到这里,事件驱动框架就增加了对一个客户端已连接套接字的监听。一旦客户端有请求发送到 server,框架就会回调 readQueryFromClient 函数处理请求。这样一来,客户端请求就能通过事件驱动框架进行处理了。
// file: src/networking.c
/*
* @param *conn
*/
client *createClient(connection *conn) {
// 为用户连接创建client结构体
client *c = zmalloc(sizeof(client));
if (conn) {
// ... 处理连接
// 注册读事件处理器,等连接可读时调用 回调函数是readQueryFromClient
connSetReadHandler(conn, readQueryFromClient);
// 会把新创建的client结构体放到 conn结构体的private_data字段里
connSetPrivateData(conn, c);
}
// 设置client的一些参数
selectDb(c,0);
uint64_t client_id = ++server.next_client_id;
c->id = client_id;
c->resp = 2;
c->conn = conn;
// ...
return c;
}
(5.3.3) 写事件处理
Redis 实例在收到客户端请求后,会在处理客户端命令后,将要返回的数据写入客户端输出缓冲区。
beforeSleep -> handleClientsWithPendingWrites
(5.4) 时间事件处理
typedef struct aeTimeEvent {
long long id; // 时间事件ID
long when_sec; // 事件到达的秒级时间戳
long when_ms; // 事件到达的毫秒级时间戳
aeTimeProc *timeProc; // 时间事件触发后的处理函数
aeEventFinalizerProc *finalizerProc; // 事件结束后的处理函数
void *clientData; // 事件相关的私有数据
struct aeTimeEvent *prev; // 时间事件链表的前向指针
struct aeTimeEvent *next; // 时间事件链表的后向指针
} aeTimeEvent;
(5.4.1) 时间事件创建
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
(5.4.2) 时间事件的触发
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
// 省略部分代码
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
// 链表节点
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
// 遍历链表
while(te) {
long now_sec, now_ms;
long long id;
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
te->refcount++;
// 处理
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
参考资料
[1] Redis源码剖析与实战 - 10 | Redis事件驱动框架(中):Redis实现了Reactor模型吗?
[2] Netty中的Reactor模型详解