libev使用方法

1. libev簡介

libev是個高性能跨平臺的事件驅動框架,支持io事件,超時事件,子進程狀態改變通知,信號通知,文件狀態改變通知,還能用來實現wait/notify機制。libev對每種監聽事件都用一個ev_type類型的數據結構表示,如ev_io, ev_timer, ev_child, ev_async分別用來表示文件監聽器, timeout監聽器, 子進程狀態監聽器, 同步事件監聽器.linux

libev支持優先級, libev一次loop收集的事件按優先級先排序, 優先級高的事件回調先執行, 優先級低的後執行, 相同優先級則按事件到達順序執行. libev優先級從[-2, 2], 默認優先級爲0,libev註冊watcher的流程以下:shell

static void type_cb(EV_P_ ev_type *watcher, int revents)
{
	// callback
}

static void ev_test()
{
#ifdef EV_MULTIPLICITY
	struct ev_loop *loop;
#else
	int loop;
#endif
	ev_type *watcher;

	loop = ev_default_loop(0);
	watcher = (ev_type *)calloc(1, sizeof(*watcher));
	assert(loop && watcher);

	ev_type_init(watcher, type_cb, ...);
	ev_start(EV_A_ watcher);

	ev_run(EV_A_ 0);

	/* 資源回收 */
	ev_loop_destroy(EV_A);
	free(watcher);
}

libev註冊watcher能夠分爲四個步驟:後端

  1. 建立一個loop和watcher
  2. 初始化watcher,主要設置callback函數和定義watcher的參數
  3. 激活watcher
  4. 啓動libev,開始loop收集事件

2. ev_io

libev內部使用後端select, poll, epoll(linux專有), kqueue(drawin), port(solaris10)實現io事件監聽, 用戶能夠指定操做系統支持的後端或者由libev自動選擇使用哪一個後端,如linux平臺上用戶能夠強制指定libev使用select做爲後端。libev支持單例模式和多例模式, 假設咱們連接的是多例模式的libev庫, 且watcher使用默認優先級0.服務器

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>

#include <ev.h>

static void io_cb(struct ev_loop *loop, ev_io *watcher, int revents)
{
	char buf[1024] = {0};

	/* 參數watcher即註冊時的watcher */
	read(watcher->fd, buf, sizeof(buf) - 1);
	fprintf(stdout, "%s\n", buf);

	ev_break(loop, EVBREAK_ALL);
}

static void io_test()
{
	struct ev_loop *loop;
	ev_io *io_watcher;

	/* 指定libev使用epoll機制,關閉環境變量對libev影響 */
	loop = ev_default_loop(EVFLAG_NOENV | EVBACKEND_EPOLL);
	io_watcher = (ev_io *)calloc(1, sizeof(*io_watcher));
	assert(("can not alloc memory", loop && io_watcher));

	/* 設置監聽標準輸入的可讀事件和回調函數 */
	ev_io_init(io_watcher, io_cb, STDIN_FILENO, EV_READ);
	ev_io_start(loop, io_watcher);

	/* libev開啓loop */
	ev_run(loop, 0);

	/* 資源回收 */
	ev_loop_destroy(loop);
	free(io_watcher);
}

int main(void)
{
	io_test();

	return 0;
}

ev_io_init(ev, cb, fd, event)

  • ev:ev_io
  • cb: 回調函數
  • fd:socket,pipe等句柄
  • event:事件類型(EV_READ/EV_WRITE)

Makefile數據結構

target := main
CC := clang
CFLAGS += -g -Wall -fPIC
LDFLAGS += -lev

src += \
	main.c
obj := $(patsubst %.c, %.o, $(src))

$(target):$(obj)
	$(CC) -o $@ $^ $(LDFLAGS)

%.o:%.c
	$(CC) -o $@ -c $< $(CFLAGS)

.PHONY:clean

clean:
	@rm -rf $(target) *.o

libev內部使用一個大的循環來收集各類watcher註冊的事件,若是沒有註冊ev_timer和ev_periodic,則libev內部使用的後端採用59.743s做爲超時事件,若是select做爲後端,則select的超時設置爲59.743s,這樣能夠下降cpu佔用率,對一個fd能夠註冊的watcher數量不受限(或者說只受內存限制),好比能夠對標準輸入的可讀事件註冊100個watcher,當有用戶輸入時全部100個watcher的回調都能執行(固然回調中仍是隻有一個read操做成功)。框架

3. ev_timer

ev_timer能夠用來實現定時器, ev_timer不受牆上時間影響,如設置一個1小時定時器,把當前系統時間調快1小時不能讓定時器馬上超時,超時依舊發生在1小時後,以下是一個簡單的例子:socket

static void timer_cb(struct ev_loop *loop, ev_timer *w, int revents)
{
	fprintf(stdout, "%fs timeout\n", w->repeat);
	ev_break(loop, EVBREAK_ALL);
}

static void timer_test()
{
	struct ev_loop *loop;
	ev_timer *timer_watcher;

	loop = ev_default_loop(EVFLAG_NOENV);
	timer_watcher = calloc(1, sizeof(*timer_watcher));
	assert(("can not alloc memory", loop && timer_watcher));

	ev_timer_init(timer_watcher, timer_cb, 0., 3600.);
	ev_timer_start(loop, timer_watcher);

	ev_run(loop, 0);

	ev_loop_destroy(loop);
	free(timer_watcher);
}

ev_timer_init(ev, cb, ofs, iva)

  • ev: ev_timer
  • cb: 回調函數
  • ofs,iva: 超時事件爲(now + ofs + ival * N), now爲當前時間,N爲正整數

若是ival參數爲0,則timer是一次性的定時器,超時後libev自動stop timer
能夠在回調中從新設置timer超時,並從新啓動timer。async

static void timer_cb(struct ev_loop *loop, ev_timer *watcher, int revents)
{
	fprintf(stdout, "%fs timeout\n", watcher->repeat);
	watcher->repeat += 5.;
	ev_timer_again(loop, watcher);
}

上面介紹了libev是在一個大的循環中監聽全部watcher的事件,只有ev_io類型的watcher時,libev後端以59.743s做爲超時(如select超時),這時用戶註冊一個3s的timer,那麼libev會不會由於後端超時太長致使定時器檢測很是不許呢?答案是不會,libev保證後端超時時間不大於定時器超時時間,註冊一個3s timer,則libev自動調整到3s之內loop一次,這樣保證timer超時能及時被檢測到,同時也帶來更高的cpu佔用率。函數

4. ev_periodic

ev_timer作爲定時器很方便,可是對應指定到某時刻發生超時就比較困難,好比天天00:00:00觸發超時,天天08:00開燈,18:00關燈等等,ev_periodic能夠很好的應付這種場景,ev_periodic基於牆上時間,因此受牆上時間影響,如註冊1小時後超時的ev_peroidic,同時系統時間調快1小時,ev_periodic立馬能超時。以下例子指定天天凌晨發生超時:oop

static void periodic_cb(struct ev_loop *loop, ev_periodic *watcher, int revents)
{
	fprintf(stdout, "00:00:00 now, time to sleep");
}

ev_tstamp my_schedule(ev_periodic *watcher, ev_tstamp now)
{
	time_t cur;
	struct tm tm;

	time(&cur);
	localtime_r(&cur, &tm);

	tm.tm_wday += 1;
	tm.tm_hour = 0;
	tm.tm_sec = 0;
	tm.tm_min = 0;
	tm.tm_mon -= 1;
	tm.tm_year -= 1900;

	return mktime(&tm);
}

static void periodic_test()
{
	struct ev_loop *loop;
	ev_periodic *periodic_watcher;

	loop = ev_default_loop(0);
	periodic_watcher = (ev_periodic *)calloc(1, sizeof(*periodic_watcher));
	assert(("can not alloc memory", loop && periodic_watcher));

	ev_periodic_init(periodic_watcher, periodic_cb, 0, 0, my_schedule);
	ev_periodic_start(loop, periodic_watcher);

	ev_run(loop, 0);

	ev_loop_destroy(loop);
	free(periodic_watcher);
}

ev_periodic_init(ev, cb, ofs, ival,schedule)

  • ev:ev_periodic
  • cb:回調函數
  • ofs,ival:ofs + ceil((now - ofs) / ival) * ival // now表示當前時間戳
  • schedule:用戶自定義函數,該函數返回下一次ev_periodic超時時間
  1. 若是schedule不爲空,則ev_periodic超時時間爲:ofs + ceil((now - ofs) / ival) * ival,表示從當前時間開始,通過ofs時間後全部能被ival整數的點,註冊ev_periodic,ifs = 1, ival = 10, 當前時間爲1604649458(2020-10-615:57:38),則ev_periodic第一次通過2s就發生超時了。
  2. 若是schedule存在,則ofs,ival參數被忽略,ev_periodic超時時間由schedule()返回值指定

5. ev_child

libev支持監聽子進程狀態變化, 如子進程退出, 內部用waitpid去實現, libev限制只能用default loop去監聽子進程狀態變化, 若是以ev_loop_new()建立的loop則不行, 經過ev_default_loop()建立default loop時libev內部自動註冊了SIGCHILD信號處理函數, 須要在本身代碼處理SIGCHILD的話, 能夠在ev_default_loop()以後註冊SIGCHIL處理以覆蓋libev中的默認處理, 以下是一個簡單的例子:

static void child_cb(struct ev_loop *loop, ev_child *watcher, int revents)
{
	fprintf(stdout, "pid:%d exit, status:%d\n", watcher->rpid, watcher->rstatus);	
}

static void child_test()
{
	pid_t pid;
	struct ev_loop *loop;
	ev_child *child_watcher;

	switch (pid = fork()) {
		case 0:
			sleep(5);
			fprintf(stdout, "child_pid:%d\n", getpid());
			exit(EXIT_SUCCESS);
		default:
			{
				loop = ev_default_loop(0);
				child_watcher = (ev_child*)calloc(1, sizeof(*child_watcher));
				assert(("can not alloc memory", loop && child_watcher));

				ev_child_init(child_watcher, child_cb, 0, 1);
				ev_child_start(loop, child_watcher);

				ev_run(loop, 0);

				/* 資源回收 */
				ev_loop_destroy(loop);
				free(child_watcher);
			}
	}
}

ev_child_init(ev, cb, pid, trace)

  • ev:ev_child
  • cb:回調函數
  • pid:子進程pid
  • trace:設置爲1
    我的以爲libev的default loop默認註冊SIGCHILD處理並很差,最好仍是在本身代碼中作處理。

6. ev_async

能夠經過libev的async來實現wait/notify機制, 用戶註冊多個ev_async監聽器, 在其餘地方調用ev_async_send()便可觸發ev_async註冊的回調, libev內部用eventfd(linux平臺)和pipe(win32)實現, 我的以爲linux平臺上直接用eventfd更完美, 以下是簡單例子.

static void *routine(void *args)
{
	static size_t count = 0;
	ev_async *watcher = (ev_async *)args;
	struct ev_loop *loop = (struct ev_loop *)watcher->data;

	while (count++ < 10) {
		ev_async_send(loop, watcher);
		sleep(1);
	}

	return NULL;
}

static void async_cb(struct ev_loop *loop, ev_async *watcher, int revents)
{
	fprintf(stdout, "get the order, start move...\n");
}

static void async_test()
{
	pthread_t pid;
	struct ev_loop *loop;
	ev_async *async_watcher;

	loop = ev_default_loop(0);
	async_watcher = (ev_async *)calloc(1, sizeof(*async_watcher));
	assert(("can not alloc memory", loop && async_watcher));

	ev_async_init(async_watcher, async_cb);
	ev_async_start(loop, async_watcher);

	async_watcher->data = loop;
	pthread_create(&pid, NULL, routine, async_watcher);

	ev_run(loop, 0);

	/* 資源回收 */
	ev_loop_destroy(loop);
	free(async_watcher);
}

ev_async_init(ev,cb)

  • ev:ev_async
  • cb:回調函數

7. ev_prepare/ev_idle

libev每次loop收集各類事件以前都會先調用ev_prepare的回調函數(若是有的話), 若是存在比ev_idle優先級更高的監聽有事件待處理, 則ev_idle的事件不會處理, 如存在優先級1,2的事件待處理, 則優先級爲1的ev_idle的事件不會被處理, 只有在優先級1,2的全部事件都處理完後纔會把ev_idle的事件添加到帶處理的事件隊列中去.

static void idle_cb(struct ev_loop *loop, ev_idle *watcher, int revents)
{
	fprintf(stdout, "no one has higher priority than me now\n");
	ev_idle_stop(loop, watcher);
}

static void prepare_cb(struct ev_loop *loop, ev_prepare *watcher, int revents)
{
	fprintf(stdout, "prepare_cb\n");
}

static void ev_test()
{
	struct ev_loop *loop;
	ev_io *io_watcher;
	ev_idle *idle_watcher;
	ev_prepare *prepare_watcher;

	loop = ev_default_loop(0);
	prepare_watcher = (ev_prepare *)calloc(1, sizeof(*prepare_watcher));
	idle_watcher = (ev_idle *)calloc(1, sizeof(*idle_watcher));
	io_watcher = (ev_io *)calloc(1, sizeof(*io_watcher));
	assert(("can not alloc memory", loop && prepare_watcher && io_watcher && idle_watcher));

	ev_io_init(io_watcher, io_cb, STDIN_FILENO, EV_READ);
	ev_prepare_init(prepare_watcher, prepare_cb);
	ev_idle_init(idle_watcher, idle_cb);

	ev_prepare_start(loop, prepare_watcher);
	ev_io_start(loop, io_watcher);
	ev_idle_start(loop, idle_watcher);

	ev_run(loop, 0);

	ev_loop_destroy(loop);
	free(io_watcher);
	free(idle_watcher);
	free(prepare_watcher);
}

ev_prepare(ev, cb)/ev_idle(ev, cb)

  • ev:ev_prepare/ev_idle
  • cb:回調函數

能夠看到每次輸入前都先有ev_prepare的回調,只有不存在優先級別idle高的時間待處理時纔會處理idle的回調。

8. ev_stat

不建議使用,還不如用個定時器本身去檢測文件是否改動

9. ev_fork

註冊ev_fork,在libev自動檢測到fork調用(開啓了EVFLAG_FORKCHECK),或者用戶調用ev_loop_fork()通知libev有fork調用時ev_fork回調被觸發

10. ev_cleanup

註冊ev_cleanup的watcher,在libev銷燬時調用ev_cleanup的回調,用來作一些清理工做

11. 簡單回顯服務器

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>

#include <ev.h>

/* client number limitation */
#define MAX_CLIENTS 1000

/* message length limitation */
#define MAX_MESSAGE_LEN (256)

#define err_message(msg) \
	do {perror(msg); exit(EXIT_FAILURE);} while(0)

/* record the number of clients */
static int client_number;

static int create_serverfd(char const *addr, uint16_t u16port)
{
	int fd;
	struct sockaddr_in server;

	fd = socket(AF_INET, SOCK_STREAM, 0);
	if (fd < 0) err_message("socket err\n");

	server.sin_family = AF_INET;
	server.sin_port = htons(u16port);
	inet_pton(AF_INET, addr, &server.sin_addr);

	if (bind(fd, (struct sockaddr *)&server, sizeof(server)) < 0) err_message("bind err\n");

	if (listen(fd, 10) < 0) err_message("listen err\n");

	return fd;
}

static void read_cb(EV_P_ ev_io *watcher, int revents)
{
	ssize_t ret;
	char buf[MAX_MESSAGE_LEN] = {0};

	ret = recv(watcher->fd, buf, sizeof(buf) - 1, MSG_DONTWAIT);	
	if (ret > 0) {
		write(watcher->fd, buf, ret);

	} else if ((ret < 0) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
		return;

	} else {
		fprintf(stdout, "client closed (fd=%d)\n", watcher->fd);
		--client_number;
		ev_io_stop(EV_A_ watcher);
		close(watcher->fd);
		free(watcher);
	}
}

static void accept_cb(EV_P_ ev_io *watcher, int revents)
{
	int connfd;
	ev_io *client;

	connfd = accept(watcher->fd, NULL, NULL);
	if (connfd > 0) {
		if (++client_number > MAX_CLIENTS) {
			close(watcher->fd);

		} else {
			client = calloc(1, sizeof(*client));
			ev_io_init(client, read_cb, connfd, EV_READ);
			ev_io_start(EV_A_ client);
		}

	} else if ((connfd < 0) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
		return;

	} else {
		close(watcher->fd);
		ev_break(EV_A_ EVBREAK_ALL);
		/* this will lead main to exit, no need to free watchers of clients */
	}
}

static void start_server(char const *addr, uint16_t u16port)
{
	int fd;
#ifdef EV_MULTIPLICITY
	struct ev_loop *loop;
#else
	int loop;
#endif
	ev_io *watcher;

	fd = create_serverfd(addr, u16port);
	loop = ev_default_loop(EVFLAG_NOENV);
	watcher = calloc(1, sizeof(*watcher));
	assert(("can not alloc memory\n", loop && watcher));

	/* set nonblock flag */
	fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);

	ev_io_init(watcher, accept_cb, fd, EV_READ);
	ev_io_start(EV_A_ watcher);
	ev_run(EV_A_ 0);

	ev_loop_destroy(EV_A);
	free(watcher);
}

static void signal_handler(int signo)
{
	switch (signo) {
		case SIGPIPE:
			break;
		default:
			// unreachable
			break;
	}
}

int main(void)
{
	signal(SIGPIPE, signal_handler);
	start_server("127.0.0.1", 10009);

	return 0;
}

客戶端:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>

#include <pthread.h>

#define err_message(msg) \
	do {perror(msg); exit(EXIT_FAILURE);} while(0)

static int create_clientfd(char const *addr, uint16_t u16port)
{
	int fd;
	struct sockaddr_in server;

	fd = socket(AF_INET, SOCK_STREAM, 0);
	if (fd < 0) err_message("socket err\n");

	server.sin_family = AF_INET;
	server.sin_port = htons(u16port);
	inet_pton(AF_INET, addr, &server.sin_addr);

	if (connect(fd, (struct sockaddr *)&server, sizeof(server)) < 0) perror("connect err\n");

	return fd;
}

static void *routine(void *args)
{
	int fd;
	char buf[128];

	fd = create_clientfd("127.0.0.1", 10009);

	for (; ;) {
		write(fd, "Hello", strlen("hello"));

		memset(buf, '\0', sizeof(buf));
		read(fd, buf, sizeof(buf) - 1);
		fprintf(stdout, "pthreadid:%ld %s\n", pthread_self(), buf);
		usleep(100 * 1000);
	}
}

int main(void)
{
	pthread_t pids[4];

	for (int i = 0; i < sizeof(pids)/sizeof(pthread_t); ++i) {
		pthread_create(pids + i, NULL, routine, 0);
	}

	for (int i = 0; i < sizeof(pids)/sizeof(pthread_t); ++i) {
		pthread_join(pids[i], 0);
	}

	return 0;
}

Makefile

all:server client

server_src += \
	server.c

server_obj := $(patsubst %.c, %.o, $(server_src))

client_src += \
	client.c

client_obj:= $(patsubst %.c, %.o, $(client_src))

CC := clang

CFLAGS += -Wall -fPIC

server:$(server_obj)
	$(CC) -o $@ $^ -lev
%.o:%.c
	$(CC) -o $@ -c $< $(CFLAGS)

client:$(client_obj)
	$(CC) -o $@ $^ -lpthread
%.o:%.c
	$(CC) -o $@ -c $< $(CFLAGS)

.PHONY:clean all

clean:
	@rm -rf server client *.o
相關文章
相關標籤/搜索