本篇文章給大家?guī)砹岁P(guān)于redis的相關(guān)問題,其中主要介紹了關(guān)于事件處理示例的相關(guān)內(nèi)容,包括了redis事件介紹、事件的抽象以及事件的實現(xiàn)等內(nèi)容,下面一起來看一下,希望對大家有幫助。
千萬級數(shù)據(jù)并發(fā)如何處理?進入學(xué)習(xí)
推薦學(xué)習(xí):Redis視頻教程
1. Redis事件介紹
Redis服務(wù)器是一個事件驅(qū)動程序
,所謂事件驅(qū)動就是輸入一條命令并且按下回車,然后消息被組裝成Redis
協(xié)議的格式發(fā)送給Redis
服務(wù)器,這個時候就會產(chǎn)生一個事件,Redis
服務(wù)器會接收改命令,處理該命令和發(fā)送回復(fù),而當(dāng)我們沒有與服務(wù)器進行交互時,服務(wù)器就會處于阻塞等待狀態(tài),它會讓出CPU然后進入睡眠狀態(tài),當(dāng)事件觸發(fā)時,就會被操作系統(tǒng)喚醒.
而Redis服務(wù)器需要處理以下兩類事件:
文件事件
:Redis 服務(wù)器通過套接字與客戶端(或者其他Redis服務(wù)器)進行連接,而文件事件就是服務(wù)器對套接字操作的抽象. 服務(wù)器與客戶端(或者其他服務(wù)器)的通信會產(chǎn)生相應(yīng)的文件事件,而服務(wù)器則通過監(jiān)聽并處理這些事件來完成一系列網(wǎng)絡(luò)通信操作.
時間事件
:Redis 服務(wù)器中的一些操作(比如serverCron函數(shù))需要在給定的時間點執(zhí)行,而時間事件就是服務(wù)器對這類定時操作的抽象.
2. 事件的抽象
Redis把文件事件
和時間事件
分別抽象成一個數(shù)據(jù)結(jié)構(gòu)來管理.
2.1 文件事件結(jié)構(gòu)
typedef struct aeFileEvent { // 文件時間類型:AE_NONE,AE_READABLE,AE_WRITABLE int mask; // 可讀處理函數(shù) aeFileProc *rfileProc; // 可寫處理函數(shù) aeFileProc *wfileProc; // 客戶端傳入的數(shù)據(jù) void *clientData; } aeFileEvent; //文件事件
其中rfileProc
和wfileProc
成員分別為兩個函數(shù)指針,他們的原型為:
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
該函數(shù)是回調(diào)函數(shù)
,如果當(dāng)前文件事件所指定的事件類型發(fā)生時,則會調(diào)用對應(yīng)的回調(diào)函數(shù)
來處理該事件.
當(dāng)事件就緒的時候,我們需要知道文件事件的文件描述符還有事件類型才能對于鎖定該事件,因此定義了aeFiredEvent
結(jié)構(gòu)統(tǒng)一管理:
typedef struct aeFiredEvent { // 就緒事件的文件描述符 int fd; // 就緒事件類型:AE_NONE,AE_READABLE,AE_WRITABLE int mask; } aeFiredEvent; //就緒事件
文件事件的類型:
#define AE_NONE 0 //未設(shè)置 #define AE_READABLE 1 //事件可讀 #define AE_WRITABLE 2 //事件可寫
2.2 時間事件結(jié)構(gòu)
typedef struct aeTimeEvent { // 時間事件的id long long id; // 時間事件到達(dá)的時間的秒數(shù) long when_sec; /* seconds */ // 時間事件到達(dá)的時間的毫秒數(shù) long when_ms; /* milliseconds */ // 時間事件處理函數(shù) aeTimeProc *timeProc; // 時間事件終結(jié)函數(shù) aeEventFinalizerProc *finalizerProc; // 客戶端傳入的數(shù)據(jù) void *clientData; // 指向下一個時間事件 struct aeTimeEvent *next; } aeTimeEvent; //時間事件
可以看出,時間事件的結(jié)構(gòu)就是一個鏈表的節(jié)點,因為struct aeTimeEvent *next
是指向下一個時間事件的指針.
和文件事件一樣,當(dāng)時間事件所指定的事件發(fā)生時,也會調(diào)用對應(yīng)的回調(diào)函數(shù)
,結(jié)構(gòu)成員timeProc
和finalizerProc
都是回調(diào)函數(shù),函數(shù)原型如下:
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
雖然對文件事件和時間事件都做了抽象,Redis
仍然需要對事件做一個整體的抽象,用來描述一個事件的狀態(tài). 也就是下面要介紹的事件狀態(tài)結(jié)構(gòu):aeEventLoop
.
2.3 事件狀態(tài)結(jié)構(gòu)
typedef struct aeEventLoop { // 當(dāng)前已注冊的最大的文件描述符 int maxfd; /* highest file descriptor currently registered */ // 文件描述符監(jiān)聽集合的大小 int setsize; /* max number of file descriptors tracked */ // 下一個時間事件的ID long long timeEventNextId; // 最后一次執(zhí)行事件的時間 time_t lastTime; /* Used to detect system clock skew */ // 注冊的文件事件表 aeFileEvent *events; /* Registered events */ // 已就緒的文件事件表 aeFiredEvent *fired; /* Fired events */ // 時間事件的頭節(jié)點指針 aeTimeEvent *timeEventHead; // 事件處理開關(guān) int stop; // 多路復(fù)用庫的事件狀態(tài)數(shù)據(jù) void *apidata; /* This is used for polling API specific data */ // 執(zhí)行處理事件之前的函數(shù) aeBeforeSleepProc *beforesleep; } aeEventLoop; //事件輪詢的狀態(tài)結(jié)構(gòu)
aeEventLoop
結(jié)構(gòu)保存了一個void *
類型的萬能指針apidata
,用來保存輪詢事件的狀態(tài),也就是保存底層調(diào)用的多路復(fù)用庫的事件狀態(tài).
Redis
的 I/O
多路復(fù)用程序的所有功能都是通過包裝常見的select
、epoll
、evport
和kqueue
這些I/O
多路復(fù)用函數(shù)庫來實現(xiàn)的,每個I/O多路復(fù)用函數(shù)庫在Redis源碼中都對應(yīng)著一個單獨的文件,比如ae_select.c
、ae_epoll.c
等等.
他們在編譯階段,會根據(jù)不同的系統(tǒng)選擇性能最高的一個多路復(fù)用庫作為Redis
的多路復(fù)用程序?qū)崿F(xiàn),而且所有庫的API都是相同的,這就可以讓Redis多路復(fù)用程序的底層可以互換.
下面是具體選擇庫的源碼:
// IO復(fù)用的選擇,性能依次下降,Linux支持 "ae_epoll.c" 和 "ae_select.c" #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif
也可以通過命令INFO server
來查看當(dāng)前使用的是哪個多路復(fù)用庫:
可以看到Linux下默認(rèn)使用的是epoll
多路復(fù)用庫,那么apidata
保存的就是epoll
模型的事件狀態(tài)結(jié)構(gòu),它在ae_epoll.c
源文件中:
typedef struct aeApiState { // epoll事件的文件描述符 int epfd; // 事件表 struct epoll_event *events; } aeApiState; // 事件的狀態(tài)
epoll
模型的struct epoll_event
結(jié)構(gòu)中定義著epoll事件的類型,比如EPOLLIN
、EPOLLOUT
等等,但是Redis的文件結(jié)構(gòu)aeFileEvent
中也在mask
中定義了自己的事件類型,例如:AE_READABLE
、AE_WRITABLE
等等,于是就需要實現(xiàn)一個中間層將兩者的事件類型相聯(lián)系起來,這就是之前提到的ae_epoll.c
文件中實現(xiàn)的相同的API:
// 創(chuàng)建一個epoll實例,保存到eventLoop中 static int aeApiCreate(aeEventLoop *eventLoop) // 調(diào)整事件表的大小 static int aeApiResize(aeEventLoop *eventLoop, int setsize) // 釋放epoll實例和事件表空間 static void aeApiFree(aeEventLoop *eventLoop) // 在epfd標(biāo)識的事件表上注冊fd的事件 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) // 在epfd標(biāo)識的事件表上注刪除fd的事件 static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) // 等待所監(jiān)聽文件描述符上有事件發(fā)生 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) // 返回正在使用的IO多路復(fù)用庫的名字 static char *aeApiName(void)
這些API會講epoll
的底層函數(shù)封裝起來,Redis實現(xiàn)事件時,只需要調(diào)用這些接口即可.
我們以下面兩個API的源碼舉例:
aeApiAddEvent
該函數(shù)會向Redis事件狀態(tài)結(jié)構(gòu)aeEventLoop
的事件表event
注冊一個事件,對應(yīng)的是epoll_ctl
函數(shù).
// 在epfd標(biāo)識的事件表上注冊fd的事件 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; // EPOLL_CTL_ADD,向epfd注冊fd的上的event // EPOLL_CTL_MOD,修改fd已注冊的event // #define AE_NONE 0 //未設(shè)置 // #define AE_READABLE 1 //事件可讀 // #define AE_WRITABLE 2 //事件可寫 // 判斷fd事件的操作,如果沒有設(shè)置事件,則進行關(guān)聯(lián)mask類型事件,否則進行修改 int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; // struct epoll_event { // uint32_t events; /* Epoll events */ // epoll_data_t data; /* User data variable */ // }; ee.events = 0; // 如果是修改事件,合并之前的事件類型 mask |= eventLoop->events[fd].mask; /* Merge old events */ // 根據(jù)mask映射epoll的事件類型 if (mask & AE_READABLE) ee.events |= EPOLLIN; //讀事件 if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; //寫事件 ee.data.fd = fd; //設(shè)置事件所從屬的目標(biāo)文件描述符 // 將ee事件注冊到epoll中 if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; }
aeApiPoll
等待所監(jiān)聽文件描述符上有事件發(fā)生,對應(yīng)著底層的epoll_wait
函數(shù).
// 等待所監(jiān)聽文件描述符上有事件發(fā)生 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; // 監(jiān)聽事件表上是否有事件發(fā)生 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); // 至少有一個就緒的事件 if (retval > 0) { int j; numevents = retval; // 遍歷就緒的事件表,將其加入到eventLoop的就緒事件表中 for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; // 根據(jù)就緒的事件類型,設(shè)置mask if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; // 添加到就緒事件表中 eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } // 返回就緒的事件個數(shù) return numevents; }
3. 事件的實現(xiàn)
事件的所有源碼都定義在ae.c
源文件中,先從aeMain
函數(shù)說起.
// 事件輪詢的主函數(shù) void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; // 一直處理事件 while (!eventLoop->stop) { // 執(zhí)行處理事件之前的函數(shù) if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); //處理到時的時間事件和就緒的文件事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
可以看到,如果服務(wù)器一直處理事件,那么就是一個死循環(huán),而一個最典型的事件驅(qū)動,就是一個死循環(huán). 在循環(huán)中,程序會調(diào)用處理事件的函數(shù)aeProcessEvents()
,它的參數(shù)是一個事件狀態(tài)結(jié)構(gòu)aeEventLoop
和AE_ALL_EVENTS
.
事件類型的宏定義,在ae.h
頭文件中:
#define AE_FILE_EVENTS 1 //文件事件 #define AE_TIME_EVENTS 2 //時間事件 #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) //文件和時間事件 #define AE_DONT_WAIT 4
// 處理到時的時間事件和就緒的文件事件 // 如果flags = 0,函數(shù)什么都不做,直接返回 // 如果flags設(shè)置了 AE_ALL_EVENTS ,則執(zhí)行所有類型的事件 // 如果flags設(shè)置了 AE_FILE_EVENTS ,則執(zhí)行文件事件 // 如果flags設(shè)置了 AE_TIME_EVENTS ,則執(zhí)行時間事件 // 如果flags設(shè)置了 AE_DONT_WAIT ,那么函數(shù)處理完事件后直接返回,不阻塞等待 // 函數(shù)返回執(zhí)行的事件個數(shù) int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; // 如果什么事件都沒有設(shè)置則直接返回 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; // 請注意,既然我們要處理時間事件,即使沒有要處理的文件事件,我們?nèi)砸{(diào)用select(),以便在下一次事件準(zhǔn)備啟動之前進行休眠 // 當(dāng)前還沒有要處理的文件事件,或者設(shè)置了時間事件但是沒有設(shè)置不阻塞標(biāo)識 if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; // 如果設(shè)置了時間事件而沒有設(shè)置不阻塞標(biāo)識 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) // 獲取最近到時的時間事件 shortest = aeSearchNearestTimer(eventLoop); // 獲取到了最早到時的時間事件 if (shortest) { long now_sec, now_ms; // 獲取當(dāng)前時間 aeGetTime(&now_sec, &now_ms); tvp = &tv; // 等待該時間事件到時所需要的時長 long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms; // 如果沒到時 if (ms > 0) { // 保存時長到tvp中 tvp->tv_sec = ms/1000; tvp->tv_usec = (ms % 1000)*1000; // 如果已經(jīng)到時,則將tvp的時間設(shè)置為0 } else { tvp->tv_sec = 0; tvp->tv_usec = 0; } // 沒有獲取到了最早到時的時間事件,時間事件鏈表為空 } else { // 如果設(shè)置了不阻塞標(biāo)識 if (flags & AE_DONT_WAIT) { // 將tvp的時間設(shè)置為0,就不會阻塞 tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { // 阻塞到第一個時間事件的到來 /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } // 等待所監(jiān)聽文件描述符上有事件發(fā)生 // 如果tvp為NULL,則阻塞在此,否則等待tvp設(shè)置阻塞的時間,就會有時間事件到時 // 返回了就緒文件事件的個數(shù) numevents = aeApiPoll(eventLoop, tvp); // 遍歷就緒文件事件表 for (j = 0; j < numevents; j++) { // 獲取就緒文件事件的地址 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; // 獲取就緒文件事件的類型,文件描述符 int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; // 如果是文件可讀事件發(fā)生 if (fe->mask & mask & AE_READABLE) { // 設(shè)置讀事件標(biāo)識 且 調(diào)用讀事件方法處理讀事件 rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 如果是文件可寫事件發(fā)生 if (fe->mask & mask & AE_WRITABLE) { // 讀寫事件的執(zhí)行發(fā)法不同,則執(zhí)行寫事件,避免重復(fù)執(zhí)行相同的方法 if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; //執(zhí)行的事件次數(shù)加1 } } /* Check time events */ // 執(zhí)行時間事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
Redis服務(wù)器在沒有被事件觸發(fā)時,如果沒有設(shè)置AE_DONT_WAIT
標(biāo)識,就會開始阻塞等待. 但是它不會死等待,因為還需要處理時間事件,所以在調(diào)用aeApiPoll
進行監(jiān)聽之前,會先從時間事件表中獲取一個最近到達(dá)的時間,根據(jù)需要等待的時間構(gòu)建一個struct timeval tv, *tvp
結(jié)構(gòu)的變量,這個變量保存著服務(wù)器阻塞等待文件事件的最長時間,一旦時間到達(dá)而沒有觸發(fā)文件事件aeApiPoll
函數(shù)就會停止阻塞,進而調(diào)用processTimeEvents
函數(shù)處理時間事件.
如果在阻塞等待的最長時間之間,觸發(fā)了文件事件,就會先執(zhí)行文件事件,后執(zhí)行時間事件,因此處理時間事件通常比預(yù)設(shè)的會晚一點.
而執(zhí)行文件事件rfileProc
和wfileProc
也是調(diào)用了回調(diào)函數(shù),Redis將文件事件的處理分為了好幾種,用于處理不同的網(wǎng)絡(luò)通信需求:
acceptTcpHandler
:用于accept
client的connect
.acceptUnixHandler
:用于accept
client的本地connect
.sendReplyToClient
:用于向client發(fā)送命令回復(fù).readQueryFromClient
:用于讀入client發(fā)送的請求.
然后我們來看一下獲取最快達(dá)到時間事件的函數(shù)aeSearchNearestTimer
實現(xiàn):
// 尋找第一個快到時的時間事件 // 這個操作是有用的知道有多少時間可以選擇該事件設(shè)置為不用推遲任何事件的睡眠中。 // 如果事件鏈表沒有時間將返回NULL。 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) { // 時間事件頭節(jié)點地址 aeTimeEvent *te = eventLoop->timeEventHead; aeTimeEvent *nearest = NULL; // 遍歷所有的時間事件 while(te) { // 尋找第一個快到時的時間事件,保存到nearest中 if (!nearest || te->when_sec < nearest->when_sec || (te->when_sec == nearest->when_sec && te->when_ms < nearest->when_ms)) nearest = te; te = te->next; } return nearest; }
該函數(shù)就是遍歷時間事件鏈表,然后找到最小值.
我們重點看執(zhí)行時間事件的函數(shù)processTimeEvents
函數(shù)的實現(xiàn):
// 執(zhí)行時間事件 static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te, *prev; long long maxId; time_t now = time(NULL); // 這里嘗試發(fā)現(xiàn)時間混亂的情況,上一次處理事件的時間比當(dāng)前時間還要大 // 重置最近一次處理事件的時間 if (now < eventLoop->lastTime) { te = eventLoop->timeEventHead; while(te) { te->when_sec = 0; te = te->next; } } // 設(shè)置上一次時間事件處理的時間為當(dāng)前時間 eventLoop->lastTime = now; prev = NULL; te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; //當(dāng)前時間事件表中的最大ID // 遍歷時間事件鏈表 while(te) { long now_sec, now_ms; long long id; /* Remove events scheduled for deletion. */ // 如果時間事件已被刪除了 if (te->id == AE_DELETED_EVENT_ID) { aeTimeEvent *next = te->next; // 從事件鏈表中刪除事件的節(jié)點 if (prev == NULL) eventLoop->timeEventHead = te->next; else prev->next = te->next; // 調(diào)用時間事件終結(jié)方法清除該事件 if (te->finalizerProc) te->finalizerProc(eventLoop, te->clientData); zfree(te); te = next; continue; } // 確保我們不處理在此迭代中由時間事件創(chuàng)建的時間事件. 請注意,此檢查目前無效:我們總是在頭節(jié)點添加新的計時器,但是如果我們更改實施細(xì)節(jié),則該檢查可能會再次有用:我們將其保留在未來的防御 if (te->id > maxId) { te = te->next; continue; } // 獲取當(dāng)前時間 aeGetTime(&now_sec, &now_ms); // 找到已經(jīng)到時的時間事件 if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; // 調(diào)用時間事件處理方法 retval = te->timeProc(eventLoop, id, te->clientData); // 時間事件次數(shù)加1 processed++; // 如果不是定時事件,則繼續(xù)設(shè)置它的到時時間 if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); // 如果是定時時間,則retval為-1,則將其時間事件刪除,惰性刪除 } else { te->id = AE_DELETED_EVENT_ID; } } // 更新前驅(qū)節(jié)點指針和后繼節(jié)點指針 prev = te; te = te->next; } return processed; //返回執(zhí)行事件的次數(shù) }
如果時間事件不存在,則就調(diào)用finalizerProc
指向的回調(diào)函數(shù),刪除當(dāng)前的時間事件. 如果存在,就調(diào)用timeProc
指向的回調(diào)函數(shù)處理時間事件. Redis的時間事件分為兩類:
- 定時事件:讓一段程序在指定的時間后執(zhí)行一次.
- 周期性事件:讓一段程序每隔指定的時間后執(zhí)行一次.
如果當(dāng)前的時間事件是周期性,那么就會在將時間周期添加到周期事件的到時時間中. 如果是定時事件,則將該時間事件刪除.
推薦學(xué)習(xí):Redis視頻教程