libev是個高性能跨平臺的事件驅動框架,支持io事件,超時事件,子進程狀態改變通知,信號通知,文件狀態改變通知,還能用來實現wait/notify機制。libev對每種監聽事件都用一個ev_type類型的數據結構表示,如ev_io, ev_timer, ev_child, ev_async分別用來表示文件監聽器, timeout監聽器, 子進程狀態監聽器, 同步事件監聽器.linux
libev支持優先級, libev一次loop收集的事件按優先級先排序, 優先級高的事件回調先執行, 優先級低的後執行, 相同優先級則按事件到達順序執行. libev優先級從[-2, 2], 默認優先級爲0,libev註冊watcher的流程以下:shell
static void type_cb(EV_P_ ev_type *watcher, int revents) { // callback } static void ev_test() { #ifdef EV_MULTIPLICITY struct ev_loop *loop; #else int loop; #endif ev_type *watcher; loop = ev_default_loop(0); watcher = (ev_type *)calloc(1, sizeof(*watcher)); assert(loop && watcher); ev_type_init(watcher, type_cb, ...); ev_start(EV_A_ watcher); ev_run(EV_A_ 0); /* 資源回收 */ ev_loop_destroy(EV_A); free(watcher); }
libev註冊watcher能夠分爲四個步驟:後端
libev內部使用後端select, poll, epoll(linux專有), kqueue(drawin), port(solaris10)實現io事件監聽, 用戶能夠指定操做系統支持的後端或者由libev自動選擇使用哪一個後端,如linux平臺上用戶能夠強制指定libev使用select做爲後端。libev支持單例模式和多例模式, 假設咱們連接的是多例模式的libev庫, 且watcher使用默認優先級0.服務器
#include <stdio.h> #include <stdlib.h> #include <assert.h> #include <unistd.h> #include <ev.h> static void io_cb(struct ev_loop *loop, ev_io *watcher, int revents) { char buf[1024] = {0}; /* 參數watcher即註冊時的watcher */ read(watcher->fd, buf, sizeof(buf) - 1); fprintf(stdout, "%s\n", buf); ev_break(loop, EVBREAK_ALL); } static void io_test() { struct ev_loop *loop; ev_io *io_watcher; /* 指定libev使用epoll機制,關閉環境變量對libev影響 */ loop = ev_default_loop(EVFLAG_NOENV | EVBACKEND_EPOLL); io_watcher = (ev_io *)calloc(1, sizeof(*io_watcher)); assert(("can not alloc memory", loop && io_watcher)); /* 設置監聽標準輸入的可讀事件和回調函數 */ ev_io_init(io_watcher, io_cb, STDIN_FILENO, EV_READ); ev_io_start(loop, io_watcher); /* libev開啓loop */ ev_run(loop, 0); /* 資源回收 */ ev_loop_destroy(loop); free(io_watcher); } int main(void) { io_test(); return 0; }
Makefile數據結構
target := main CC := clang CFLAGS += -g -Wall -fPIC LDFLAGS += -lev src += \ main.c obj := $(patsubst %.c, %.o, $(src)) $(target):$(obj) $(CC) -o $@ $^ $(LDFLAGS) %.o:%.c $(CC) -o $@ -c $< $(CFLAGS) .PHONY:clean clean: @rm -rf $(target) *.o
libev內部使用一個大的循環來收集各類watcher註冊的事件,若是沒有註冊ev_timer和ev_periodic,則libev內部使用的後端採用59.743s做爲超時事件,若是select做爲後端,則select的超時設置爲59.743s,這樣能夠下降cpu佔用率,對一個fd能夠註冊的watcher數量不受限(或者說只受內存限制),好比能夠對標準輸入的可讀事件註冊100個watcher,當有用戶輸入時全部100個watcher的回調都能執行(固然回調中仍是隻有一個read操做成功)。框架
ev_timer能夠用來實現定時器, ev_timer不受牆上時間影響,如設置一個1小時定時器,把當前系統時間調快1小時不能讓定時器馬上超時,超時依舊發生在1小時後,以下是一個簡單的例子:socket
static void timer_cb(struct ev_loop *loop, ev_timer *w, int revents) { fprintf(stdout, "%fs timeout\n", w->repeat); ev_break(loop, EVBREAK_ALL); } static void timer_test() { struct ev_loop *loop; ev_timer *timer_watcher; loop = ev_default_loop(EVFLAG_NOENV); timer_watcher = calloc(1, sizeof(*timer_watcher)); assert(("can not alloc memory", loop && timer_watcher)); ev_timer_init(timer_watcher, timer_cb, 0., 3600.); ev_timer_start(loop, timer_watcher); ev_run(loop, 0); ev_loop_destroy(loop); free(timer_watcher); }
若是ival參數爲0,則timer是一次性的定時器,超時後libev自動stop timer
能夠在回調中從新設置timer超時,並從新啓動timer。async
static void timer_cb(struct ev_loop *loop, ev_timer *watcher, int revents) { fprintf(stdout, "%fs timeout\n", watcher->repeat); watcher->repeat += 5.; ev_timer_again(loop, watcher); }
上面介紹了libev是在一個大的循環中監聽全部watcher的事件,只有ev_io類型的watcher時,libev後端以59.743s做爲超時(如select超時),這時用戶註冊一個3s的timer,那麼libev會不會由於後端超時太長致使定時器檢測很是不許呢?答案是不會,libev保證後端超時時間不大於定時器超時時間,註冊一個3s timer,則libev自動調整到3s之內loop一次,這樣保證timer超時能及時被檢測到,同時也帶來更高的cpu佔用率。函數
ev_timer作爲定時器很方便,可是對應指定到某時刻發生超時就比較困難,好比天天00:00:00觸發超時,天天08:00開燈,18:00關燈等等,ev_periodic能夠很好的應付這種場景,ev_periodic基於牆上時間,因此受牆上時間影響,如註冊1小時後超時的ev_peroidic,同時系統時間調快1小時,ev_periodic立馬能超時。以下例子指定天天凌晨發生超時:oop
static void periodic_cb(struct ev_loop *loop, ev_periodic *watcher, int revents) { fprintf(stdout, "00:00:00 now, time to sleep"); } ev_tstamp my_schedule(ev_periodic *watcher, ev_tstamp now) { time_t cur; struct tm tm; time(&cur); localtime_r(&cur, &tm); tm.tm_wday += 1; tm.tm_hour = 0; tm.tm_sec = 0; tm.tm_min = 0; tm.tm_mon -= 1; tm.tm_year -= 1900; return mktime(&tm); } static void periodic_test() { struct ev_loop *loop; ev_periodic *periodic_watcher; loop = ev_default_loop(0); periodic_watcher = (ev_periodic *)calloc(1, sizeof(*periodic_watcher)); assert(("can not alloc memory", loop && periodic_watcher)); ev_periodic_init(periodic_watcher, periodic_cb, 0, 0, my_schedule); ev_periodic_start(loop, periodic_watcher); ev_run(loop, 0); ev_loop_destroy(loop); free(periodic_watcher); }
libev支持監聽子進程狀態變化, 如子進程退出, 內部用waitpid去實現, libev限制只能用default loop去監聽子進程狀態變化, 若是以ev_loop_new()建立的loop則不行, 經過ev_default_loop()建立default loop時libev內部自動註冊了SIGCHILD信號處理函數, 須要在本身代碼處理SIGCHILD的話, 能夠在ev_default_loop()以後註冊SIGCHIL處理以覆蓋libev中的默認處理, 以下是一個簡單的例子:
static void child_cb(struct ev_loop *loop, ev_child *watcher, int revents) { fprintf(stdout, "pid:%d exit, status:%d\n", watcher->rpid, watcher->rstatus); } static void child_test() { pid_t pid; struct ev_loop *loop; ev_child *child_watcher; switch (pid = fork()) { case 0: sleep(5); fprintf(stdout, "child_pid:%d\n", getpid()); exit(EXIT_SUCCESS); default: { loop = ev_default_loop(0); child_watcher = (ev_child*)calloc(1, sizeof(*child_watcher)); assert(("can not alloc memory", loop && child_watcher)); ev_child_init(child_watcher, child_cb, 0, 1); ev_child_start(loop, child_watcher); ev_run(loop, 0); /* 資源回收 */ ev_loop_destroy(loop); free(child_watcher); } } }
能夠經過libev的async來實現wait/notify機制, 用戶註冊多個ev_async監聽器, 在其餘地方調用ev_async_send()便可觸發ev_async註冊的回調, libev內部用eventfd(linux平臺)和pipe(win32)實現, 我的以爲linux平臺上直接用eventfd更完美, 以下是簡單例子.
static void *routine(void *args) { static size_t count = 0; ev_async *watcher = (ev_async *)args; struct ev_loop *loop = (struct ev_loop *)watcher->data; while (count++ < 10) { ev_async_send(loop, watcher); sleep(1); } return NULL; } static void async_cb(struct ev_loop *loop, ev_async *watcher, int revents) { fprintf(stdout, "get the order, start move...\n"); } static void async_test() { pthread_t pid; struct ev_loop *loop; ev_async *async_watcher; loop = ev_default_loop(0); async_watcher = (ev_async *)calloc(1, sizeof(*async_watcher)); assert(("can not alloc memory", loop && async_watcher)); ev_async_init(async_watcher, async_cb); ev_async_start(loop, async_watcher); async_watcher->data = loop; pthread_create(&pid, NULL, routine, async_watcher); ev_run(loop, 0); /* 資源回收 */ ev_loop_destroy(loop); free(async_watcher); }
libev每次loop收集各類事件以前都會先調用ev_prepare的回調函數(若是有的話), 若是存在比ev_idle優先級更高的監聽有事件待處理, 則ev_idle的事件不會處理, 如存在優先級1,2的事件待處理, 則優先級爲1的ev_idle的事件不會被處理, 只有在優先級1,2的全部事件都處理完後纔會把ev_idle的事件添加到帶處理的事件隊列中去.
static void idle_cb(struct ev_loop *loop, ev_idle *watcher, int revents) { fprintf(stdout, "no one has higher priority than me now\n"); ev_idle_stop(loop, watcher); } static void prepare_cb(struct ev_loop *loop, ev_prepare *watcher, int revents) { fprintf(stdout, "prepare_cb\n"); } static void ev_test() { struct ev_loop *loop; ev_io *io_watcher; ev_idle *idle_watcher; ev_prepare *prepare_watcher; loop = ev_default_loop(0); prepare_watcher = (ev_prepare *)calloc(1, sizeof(*prepare_watcher)); idle_watcher = (ev_idle *)calloc(1, sizeof(*idle_watcher)); io_watcher = (ev_io *)calloc(1, sizeof(*io_watcher)); assert(("can not alloc memory", loop && prepare_watcher && io_watcher && idle_watcher)); ev_io_init(io_watcher, io_cb, STDIN_FILENO, EV_READ); ev_prepare_init(prepare_watcher, prepare_cb); ev_idle_init(idle_watcher, idle_cb); ev_prepare_start(loop, prepare_watcher); ev_io_start(loop, io_watcher); ev_idle_start(loop, idle_watcher); ev_run(loop, 0); ev_loop_destroy(loop); free(io_watcher); free(idle_watcher); free(prepare_watcher); }
能夠看到每次輸入前都先有ev_prepare的回調,只有不存在優先級別idle高的時間待處理時纔會處理idle的回調。
不建議使用,還不如用個定時器本身去檢測文件是否改動
註冊ev_fork,在libev自動檢測到fork調用(開啓了EVFLAG_FORKCHECK),或者用戶調用ev_loop_fork()通知libev有fork調用時ev_fork回調被觸發
註冊ev_cleanup的watcher,在libev銷燬時調用ev_cleanup的回調,用來作一些清理工做
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <strings.h> #include <assert.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <sys/socket.h> #include <arpa/inet.h> #include <ev.h> /* client number limitation */ #define MAX_CLIENTS 1000 /* message length limitation */ #define MAX_MESSAGE_LEN (256) #define err_message(msg) \ do {perror(msg); exit(EXIT_FAILURE);} while(0) /* record the number of clients */ static int client_number; static int create_serverfd(char const *addr, uint16_t u16port) { int fd; struct sockaddr_in server; fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) err_message("socket err\n"); server.sin_family = AF_INET; server.sin_port = htons(u16port); inet_pton(AF_INET, addr, &server.sin_addr); if (bind(fd, (struct sockaddr *)&server, sizeof(server)) < 0) err_message("bind err\n"); if (listen(fd, 10) < 0) err_message("listen err\n"); return fd; } static void read_cb(EV_P_ ev_io *watcher, int revents) { ssize_t ret; char buf[MAX_MESSAGE_LEN] = {0}; ret = recv(watcher->fd, buf, sizeof(buf) - 1, MSG_DONTWAIT); if (ret > 0) { write(watcher->fd, buf, ret); } else if ((ret < 0) && (errno == EAGAIN || errno == EWOULDBLOCK)) { return; } else { fprintf(stdout, "client closed (fd=%d)\n", watcher->fd); --client_number; ev_io_stop(EV_A_ watcher); close(watcher->fd); free(watcher); } } static void accept_cb(EV_P_ ev_io *watcher, int revents) { int connfd; ev_io *client; connfd = accept(watcher->fd, NULL, NULL); if (connfd > 0) { if (++client_number > MAX_CLIENTS) { close(watcher->fd); } else { client = calloc(1, sizeof(*client)); ev_io_init(client, read_cb, connfd, EV_READ); ev_io_start(EV_A_ client); } } else if ((connfd < 0) && (errno == EAGAIN || errno == EWOULDBLOCK)) { return; } else { close(watcher->fd); ev_break(EV_A_ EVBREAK_ALL); /* this will lead main to exit, no need to free watchers of clients */ } } static void start_server(char const *addr, uint16_t u16port) { int fd; #ifdef EV_MULTIPLICITY struct ev_loop *loop; #else int loop; #endif ev_io *watcher; fd = create_serverfd(addr, u16port); loop = ev_default_loop(EVFLAG_NOENV); watcher = calloc(1, sizeof(*watcher)); assert(("can not alloc memory\n", loop && watcher)); /* set nonblock flag */ fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); ev_io_init(watcher, accept_cb, fd, EV_READ); ev_io_start(EV_A_ watcher); ev_run(EV_A_ 0); ev_loop_destroy(EV_A); free(watcher); } static void signal_handler(int signo) { switch (signo) { case SIGPIPE: break; default: // unreachable break; } } int main(void) { signal(SIGPIPE, signal_handler); start_server("127.0.0.1", 10009); return 0; }
客戶端:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <strings.h> #include <assert.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <sys/socket.h> #include <arpa/inet.h> #include <pthread.h> #define err_message(msg) \ do {perror(msg); exit(EXIT_FAILURE);} while(0) static int create_clientfd(char const *addr, uint16_t u16port) { int fd; struct sockaddr_in server; fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) err_message("socket err\n"); server.sin_family = AF_INET; server.sin_port = htons(u16port); inet_pton(AF_INET, addr, &server.sin_addr); if (connect(fd, (struct sockaddr *)&server, sizeof(server)) < 0) perror("connect err\n"); return fd; } static void *routine(void *args) { int fd; char buf[128]; fd = create_clientfd("127.0.0.1", 10009); for (; ;) { write(fd, "Hello", strlen("hello")); memset(buf, '\0', sizeof(buf)); read(fd, buf, sizeof(buf) - 1); fprintf(stdout, "pthreadid:%ld %s\n", pthread_self(), buf); usleep(100 * 1000); } } int main(void) { pthread_t pids[4]; for (int i = 0; i < sizeof(pids)/sizeof(pthread_t); ++i) { pthread_create(pids + i, NULL, routine, 0); } for (int i = 0; i < sizeof(pids)/sizeof(pthread_t); ++i) { pthread_join(pids[i], 0); } return 0; }
Makefile
all:server client server_src += \ server.c server_obj := $(patsubst %.c, %.o, $(server_src)) client_src += \ client.c client_obj:= $(patsubst %.c, %.o, $(client_src)) CC := clang CFLAGS += -Wall -fPIC server:$(server_obj) $(CC) -o $@ $^ -lev %.o:%.c $(CC) -o $@ -c $< $(CFLAGS) client:$(client_obj) $(CC) -o $@ $^ -lpthread %.o:%.c $(CC) -o $@ -c $< $(CFLAGS) .PHONY:clean all clean: @rm -rf server client *.o