如下代碼在vs 2010編譯經過,使用的libevent版本是:libevent-2.0.22,win7環境測試經過。linux
服務器實現:服務器
1 流程圖:併發
2 代碼:app
// my_telnet.cpp : Defines the entry point for the console application. // #include "stdafx.h" #include <string.h> #include <errno.h> #include <stdio.h> #include <signal.h> #ifndef WIN32 #include <netinet/in.h> # ifdef _XOPEN_SOURCE_EXTENDED # include <arpa/inet.h> # endif #include <sys/socket.h> #endif #include "event2/bufferevent.h" #include "event2/buffer.h" #include "event2/listener.h" #include "event2/util.h" #include "event2/event.h" #include "event2/util.h" #include <WinSock2.h> //#include "MySimpleLog.h" static const int PORT = 9995; //讀取一行輸入並返回 static void conn_readcb(struct bufferevent *bev, void *user_data) { //MY_SIMPLE_LOG_DEBUG("conn_readcb!\n"); //只有讀到一行完整行才進行處理 因此這裏須要檢查緩衝區是否存在換行符 //若是存在才往下走 struct evbuffer *input = bufferevent_get_input(bev); struct evbuffer_ptr ptrInput; if ((ptrInput = evbuffer_search(input , "\n", 1, NULL)).pos == -1) { return; } char line[1024] = {0}; int nRead = 0; //已讀取字節數 int nExpect = ptrInput.pos + 1; //指望讀取的字節數 //把這一行讀取出來(若是大於1024則須要分次讀取) while (nRead < nExpect) { int nLeft = nExpect - nRead; int nReadOnce = min(nLeft, sizeof(line) - 1); int n = bufferevent_read(bev, line, nReadOnce); if (n <= 0) { //MY_SIMPLE_LOG_ERROR("expect to read %d bytes,but get %d.", nReadOnce, n); break; } line[n] = '\0'; //把待發送數據添加到發送緩衝中,這裏不會當即發送數據,要等conn_readcb返回後纔會發送的 bufferevent_write(bev, line, n); nRead += nReadOnce; //MY_SIMPLE_LOG_DEBUG("n = %d nRead = %d nExpect = %d! line = [%s]", n, nRead, nExpect, line); } //啓動寫事件,發送緩衝內容 //寫事件是默認開啓的,這裏不須要額外設置 //bufferevent_enable(bev, EV_WRITE | EV_PERSIST); //爲避免粘包,這裏要判斷緩衝裏面是否還有剩餘數據,若是有要特別處理 //這是由於libevent是邊緣觸發的,而不是水平觸發 if (evbuffer_get_length(input) != 0) { //原本是想直接激活讀事件 //可是發現要訪問bufferevent未公開的成員,就不這樣搞了 //event_active(&(bev->ev_read), EV_READ, 1); conn_readcb(bev, user_data); } } static void conn_writecb(struct bufferevent *bev, void *user_data) { //MY_SIMPLE_LOG_DEBUG("conn_writecb!"); struct evbuffer *output = bufferevent_get_output(bev); if (evbuffer_get_length(output) == 0) { //MY_SIMPLE_LOG_DEBUG("buffer writed\n"); //bufferevent_free(bev); } } static void conn_writecb_once(struct bufferevent *bev, void *user_data) { //MY_SIMPLE_LOG_DEBUG("conn_writecb_once!"); struct evbuffer *output = bufferevent_get_output(bev); if (evbuffer_get_length(output) == 0) { //MY_SIMPLE_LOG_DEBUG("flushed free\n"); bufferevent_free(bev); } } static void conn_eventcb(struct bufferevent *bev, short events, void *user_data) { //MY_SIMPLE_LOG_DEBUG("conn_eventcb!\n"); //判斷是由於什麼緣由調用了此函數 if (events & BEV_EVENT_EOF) { //MY_SIMPLE_LOG_DEBUG("Connection closed.\n"); } else if (events & BEV_EVENT_ERROR) { //MY_SIMPLE_LOG_DEBUG("Got an error on the connection: %s\n", // strerror(errno));/*XXX win32*/ } else if ((events & BEV_EVENT_TIMEOUT) && (events & BEV_EVENT_READING)) { //發生讀超時 //超時事件發生時,會禁止相應讀/寫事件,須要從新註冊 //這裏因爲業務邏輯上認爲超時就要關閉socket,就不必這樣作了 //bufferevent_enable(bev, EV_READ | EV_PERSIST); //發送一個超時消息給客戶端,而後就關閉bufferevent const char time_out_msg[] = "time out!\n"; //MY_SIMPLE_LOG_INFOR(time_out_msg); int n = bufferevent_write(bev, time_out_msg, sizeof(time_out_msg) - 1); //MY_SIMPLE_LOG_DEBUG("bufferevent_write ret %d send %d", n, int(sizeof(time_out_msg) - 1)); // 聽說這個版本bufferevent_flush沒實現 實測老是返回0 // n = bufferevent_flush(bev, EV_WRITE, BEV_FINISHED); // //MY_SIMPLE_LOG_DEBUG("bufferevent_flush ret %d", n); //等寫完超時信息就退出 bufferevent_setcb(bev, NULL, conn_writecb_once, NULL, user_data); return; } /* None of the other events can happen here, since we haven't enabled * timeouts */ bufferevent_free(bev); } //ctrl + c處理函數 static void signal_cb(evutil_socket_t sig, short events, void *user_data) { //MY_SIMPLE_LOG_DEBUG("signal_cb!\n"); printf("to exit in 2 seconds.\n"); struct event_base *base = (struct event_base *)user_data; struct timeval delay = { 2, 0 }; //MY_SIMPLE_LOG_DEBUG("Caught an interrupt signal; exiting cleanly in two seconds.\n"); event_base_loopexit(base, &delay); } //新鏈接到來處理函數 static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) { //MY_SIMPLE_LOG_DEBUG("listener_cb!\n"); struct event_base *base = (struct event_base *)user_data; struct bufferevent *bev; //將該鏈接設置成非阻塞 evutil_make_socket_nonblocking(fd); //BEV_OPT_CLOSE_ON_FREE參數讓bufferevent被刪除時會自動清理對應的socket bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); if (!bev) { //MY_SIMPLE_LOG_ERROR("Error constructing bufferevent!"); event_base_loopbreak(base); return; } //MY_SIMPLE_LOG_DEBUG("new connect!\n"); //開始設置各個事件的回調函數 //最後一個參數是各個回調函數的user_data 這裏不須要用到,填NULL //除了讀寫事件外,其它不少事件都是會回調conn_eventcb bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, NULL); //EV_PERSIST不設置的話讀事件只會觸發一次 bufferevent_enable(bev, EV_READ | EV_PERSIST); //默認狀況下bufferevent自動監聽可讀事件,若有須要能夠關閉 //關閉的狀況下bufferevent_write調用後,不會真正往socket寫,只保留在緩衝區 //bufferevent_disable(bev, EV_WRITE); struct timeval delay = { 10, 0 }; //讀超時10秒 寫超時無限 bufferevent_set_timeouts(bev, &delay, NULL); } /* 基本流程: 1 新建event_base 2 生成監聽socket 3 將 */ int main(int argc, char **argv) { struct event_base *base; struct evconnlistener *listener; struct event *signal_event; struct sockaddr_in sin; #ifdef WIN32 WSADATA wsa_data; WSAStartup(0x0201, &wsa_data); #endif //MY_SIMPLE_LOG_INIT(//MY_SIMPLE_LOG_LEVEL_DEBUG); //初始化event_base 這個是全局惟一的 base = event_base_new(); if (!base) { //MY_SIMPLE_LOG_ERROR("Could not initialize libevent!\n"); return 1; } //MY_SIMPLE_LOG_DEBUG("all inited"); //往event_base新增一個event,監聽鏈接事件 memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(PORT); listener = evconnlistener_new_bind(base, listener_cb, (void *)base, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, (struct sockaddr*)&sin, sizeof(sin)); if (!listener) { //MY_SIMPLE_LOG_ERROR("Could not create a listener!\n"); event_base_free(base); return 1; } //MY_SIMPLE_LOG_INFOR("listener created"); //evutil_make_socket_nonblocking(listener); //監聽socket不該該設置成非阻塞 //監聽ctrl+c信號 signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base); if (!signal_event || event_add(signal_event, NULL)<0) { //MY_SIMPLE_LOG_ERROR("Could not create/add a signal event!\n"); evconnlistener_free(listener); event_base_free(base); return 1; } //MY_SIMPLE_LOG_INFOR("signal_event created"); //MY_SIMPLE_LOG_INFOR("beging to dispatch"); printf("beging to dispatch...\n"); //開始監聽鏈接 這是一個內置循環的函數 event_base_loopexit函數調用纔會返回 event_base_dispatch(base); //MY_SIMPLE_LOG_INFOR("end dispatch\n"); //清理資源 evconnlistener_free(listener); event_free(signal_event); event_base_free(base); //MY_SIMPLE_LOG_DEBUG("done\n"); return 0; }
客戶端實現:socket
客戶端實現較簡單,就不附流程圖了,直接貼代碼tcp
// my_telnet.cpp : Defines the entry point for the console application. // #include "stdafx.h" #include <string.h> #include <errno.h> #include <stdio.h> #include <io.h> #include <signal.h> #ifndef WIN32 #include <netinet/in.h> # ifdef _XOPEN_SOURCE_EXTENDED # include <arpa/inet.h> # endif #include <sys/socket.h> #else #include <Ws2tcpip.h> #endif #include "event2/bufferevent.h" #include "event2/buffer.h" #include "event2/listener.h" #include "event2/util.h" #include "event2/event.h" #include "event2/util.h" #include <WinSock2.h> #include <conio.h> // _kbhit(), _getch() //#include "MySimpleLog.h" #include <assert.h> static const int PORT = 9995; int tcp_connect_server(const char* server_ip, int port); void cmd_msg_cb(int fd, short events, void* arg); void server_msg_cb(struct bufferevent* bev, void* arg); void event_cb(struct bufferevent *bev, short event, void *arg); void signal_cb(evutil_socket_t sig, short events, void *user_data); typedef struct { struct bufferevent* bev; struct evbuffer *buf; }MyTelnetParam; int main(int argc, char** argv) { if( argc < 3 ) { //兩個參數依次是服務器端的IP地址、端口號 fprintf(stderr, "please input 2 parameter\n"); return -1; } #ifdef WIN32 WSADATA wsa_data; WSAStartup(0x0201, &wsa_data); #endif //MY_SIMPLE_LOG_INIT(//MY_SIMPLE_LOG_LEVEL_DEBUG); struct event_base *base = event_base_new(); //建立bufferevent 暫時不設置對應的socket struct bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); assert(bev != NULL); /* //監聽終端輸入事件 這是linux的作法 win上面不能這樣玩 struct event* ev_cmd = event_new(base, _fileno(stdin), EV_READ | EV_PERSIST, cmd_msg_cb, (void*)bev); event_add(ev_cmd, NULL); */ struct evbuffer *buf = evbuffer_new(); assert(buf != NULL); MyTelnetParam param = {bev, buf}; //監聽終端輸入事件 用超時事件 這裏不用evtimer_new 由於那個事件只能觸發一次 struct event *ev_cmd = event_new(base, -1, EV_PERSIST, cmd_msg_cb, (void *)¶m); assert(event_new != NULL); //每100微秒檢測一次鍵盤輸入狀況 若是有輸入則讀取輸入併發送給服務器 //時間過短CPU佔用過高,太長用戶感受有延遲 struct timeval wait_time = {0, 100}; event_add(ev_cmd, &wait_time); //監聽ctrl+c信號 struct event *signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base); assert(signal_event != NULL); event_add(signal_event, NULL); //服務器ip信息初始化 struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr) ); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(atoi(argv[2])); //inet_aton(argv[1], &server_addr.sin_addr); server_addr.sin_addr.s_addr = inet_addr(argv[1]); //鏈接服務器,並將socket設置到bufferevent bufferevent_socket_connect(bev, (struct sockaddr *)&server_addr, sizeof(server_addr)); bufferevent_setcb(bev, server_msg_cb, NULL, event_cb, NULL); bufferevent_enable(bev, EV_READ | EV_PERSIST); event_base_dispatch(base); //這將自動close套接字和free讀寫緩衝區 bufferevent_free(bev); event_free(ev_cmd); event_free(signal_event); evbuffer_free(buf); event_base_free(base); #ifdef WIN32 WSACleanup(); #endif //MY_SIMPLE_LOG_DEBUG("finished \n"); return 0; } void cmd_msg_cb(int fd, short events, void* arg) { MyTelnetParam *param = (MyTelnetParam *)arg; struct bufferevent *bev = param->bev; struct evbuffer *buf = param->buf; //若是有鍵盤輸入,則讀取一個字符 若是不加這判斷直接getch有可能會阻塞回調 if (_kbhit()) { char cInput = EOF; do { //讀取鍵盤輸入並回顯 不用getchar 由於getchar要等用戶按回車才能獲取 int nInput = (char) _getch(); cInput = (char) nInput; evbuffer_add(buf, &cInput, 1); //這裏最好用putch 不用putchar 由於有時是讀取到非可視化字符,如方向鍵 putch(nInput); //讀滿一行就先返回 同時換行 if (cInput == '\r') { //實際測試發現,按回車時用getch只能讀取到一次按鍵\r cInput = '\n'; evbuffer_add(buf, &cInput, 1); putch(cInput); break; } } while (_kbhit()); //將buf整理成連續的內存 //size_t nLen = evbuffer_get_length(buf); //evbuffer_pullup(buf, nLen); //往socket輸出讀取到的字節並移除buf現有字節 bufferevent_write_buffer(bev, buf); evbuffer_drain(buf, evbuffer_get_length(buf)); } } //ctrl + c處理函數 static void signal_cb(evutil_socket_t sig, short events, void *user_data) { //MY_SIMPLE_LOG_DEBUG("signal_cb!\n"); printf("to exit in 2 seconds.\n"); struct event_base *base = (struct event_base *)user_data; struct timeval delay = { 2, 0 }; //MY_SIMPLE_LOG_DEBUG("Caught an interrupt signal; exiting cleanly in two seconds.\n"); event_base_loopexit(base, &delay); } void server_msg_cb(struct bufferevent* bev, void* arg) { char msg[1024 * 4]; size_t len = bufferevent_read(bev, msg, sizeof(msg) - 1); msg[len] = '\0'; printf("recv %d:[%s]\n", (int)len, msg); } void event_cb(struct bufferevent *bev, short event, void *arg) { if (event & BEV_EVENT_EOF) printf("connection closed\n"); else if (event & BEV_EVENT_ERROR) printf("some other error\n"); else if( event & BEV_EVENT_CONNECTED) { printf("the client has connected to server\n"); return ; } // 不等目前隊列中全部回調事件完成,當即退出循環 // 若是等待回調事件完成再退出要用event_base_loopexit // 這裏不能用event_base_loopexit 否則若是用戶操做太快,cmd_msg_cb會報錯 event_base_loopbreak(bufferevent_get_base(bev)); }