libevent(2)

client.cppios

// App02.cpp : 定義控制檯應用程序的入口點。
//
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#include <iostream>
#include <thread>
#include <atomic>

#ifndef WIN32
#include <netinet/in.h>
# ifdef _XOPEN_SOURCE_EXTENDED
#  include <arpa/inet.h>
# endif
#include <sys/socket.h>
#endif

#include <event2/thread.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
#include <event2/bufferevent_struct.h>

std::atomic_bool write_over;

void eventcb(struct bufferevent *bev, short events, void *ptr)
{
    if (events & BEV_EVENT_CONNECTED) 
    {
         /* We're connected to 127.0.0.1:8080.   Ordinarily we'd do
            something here, like start reading or writing. */
    } 
    else if (events & BEV_EVENT_ERROR) 
    {
         /* An error occured while connecting. */
    }
}

//接收Server來的消息
static void conn_readcb(struct bufferevent *bev, void *user_data)//arg1:發生了事件的bufferevent,最後一個是用戶提供的參數,能夠經過這個向回調傳遞參數
{
    char buf[50] = {0};
    int len = bufferevent_read(bev, buf, sizeof(buf));
    std::cout<<"來自Server:"<< buf << std::endl;
}

static void conn_writecb(struct bufferevent *bev, void *user_data)
{
    write_over = true;
    //int ret = bufferevent_write(bev, "我是一個客戶端!", 20);
}
   
struct event_base *g_base;//管理事件


int main()
{
    write_over = false;
    struct bufferevent *bev;
    struct sockaddr_in sin;
#ifdef WIN32
    WSADATA wsa_data;
    WSAStartup(0x0201, &wsa_data);
#endif
    //event支持多線程的初始化函數
    if(-1 == evthread_use_windows_threads())
        return false;

    g_base = event_base_new();

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
    sin.sin_port = htons(9527); /* Port 9527 */

    bev = bufferevent_socket_new(g_base, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);//建立基於套接字的bufferevent

 //   bufferevent_setcb(bev, NULL, NULL, eventcb, NULL);

    if (bufferevent_socket_connect(bev,//connect套接字
        (struct sockaddr *)&sin, sizeof(sin)) < 0) 
    {
        /* Error starting connection */
        std::cout<<"鏈接失敗!\n";
        bufferevent_free(bev);
        return -1;
    }
    bufferevent_setcb(bev, conn_readcb, /*conn_writecb*/nullptr, eventcb, "回調參數");//修改回調
    bufferevent_enable(bev, EV_READ | EV_WRITE );

    std::thread th([]
    {
        event_base_dispatch(g_base);//循環處理事件
    });
    
    int ret = bufferevent_write(bev, "我是客戶端-1", 20);

    std::cout<<"輸入你想發送的內容:\n";
    int count = 0;
    while(1)
    {
    //    if(write_over)
        {
            write_over = false;
            char msg[50] = {0};
            sprintf(msg, "第%d次發送信息!", ++count);
            bufferevent_write(bev, msg, strlen(msg) + 1);
            Sleep(500);
        }
    //    Sleep(50);
    }

    th.join();
    std::cout<<" over!\n";
    getchar();
    return 0;
}

server.cppwindows

// App01.cpp : 定義控制檯應用程序的入口點。
//
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#ifndef WIN32
#include <netinet/in.h>
# ifdef _XOPEN_SOURCE_EXTENDED
#  include <arpa/inet.h>
# endif
#include <sys/socket.h>
#endif

#include <event2/thread.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
#include <event2/bufferevent_struct.h>
#include <thread>
#include <iostream>
#include <vector>

/* 設置計數,只容許echo_write_cb調用一次 */
static int count = 1;
static std::vector<struct bufferevent *> g_bev;//bufferevent緩衝區

/*當有數據可讀的時候,會調用這個函數 */
//讀取回調函數
static void echo_read_cb(struct bufferevent *bev, void *ctx)
{
    printf("讀:echo_read_cb is called\n");
    char bufs[1000] = {0};
    bufferevent_read(bev, bufs, sizeof(bufs));
    std::cout<< bufs <<"\n";
    return;

    printf("讀:echo_read_cb is called\n");
    struct evbuffer *input = bufferevent_get_input(bev);
    struct evbuffer *output = bufferevent_get_output(bev);
    
    size_t len = evbuffer_get_length(input);
    printf("evbuffer input length is: %lu\n", (unsigned long)len);

    //evbuffer_add_buffer(output, input);

    char buf[1024];
    int n;
    n = evbuffer_remove(input, buf, sizeof(buf));
    printf("copy bytes == %d\n", n);
    printf("copy buf: %s\n", buf);
}

//寫入回調函數
static void echo_write_cb(struct bufferevent *bev, void *ctx)
{

    printf("寫:echo_write_cb is called\n");
    char sendbuffer[] = "yes, i recv your message!\n";    
    return;

    struct evbuffer *output = bufferevent_get_output(bev);    
    
    if(count == 1)
    {
        int result = evbuffer_add(output, sendbuffer, strlen(sendbuffer));
        printf("evbuffer_add result = %d\n", result);    
    }
    
    count++;
    int len = evbuffer_get_length(output);
    evbuffer_drain(output, len);    
}


/*當客戶端結束的時候,確定會調用這個函數  */
//事件回調函數
static void echo_event_cb(struct bufferevent *bev, short events, void *ctx)
{
    printf("狀態:echo_event_cb is called\n");
    if(events & BEV_EVENT_ERROR)
        perror("Error from bufferevent");
    if(events & BEV_EVENT_EOF | BEV_EVENT_ERROR)
    {
        bufferevent_free(bev);
        printf("bufferevent_free is called\n");
    }
    printf("-------------------------------\n\n");
    
}    


//client鏈接回調
static void accept_conn_cb(struct evconnlistener *listener, evutil_socket_t fd, 
    struct sockaddr*address, int socklen, void *ctx)
{
    printf("鏈接:Accept_conn_cb is called\n");
    struct event_base *base = evconnlistener_get_base(listener);//返回監聽器關聯的event_base
    struct bufferevent *bev = bufferevent_socket_new(base, fd,//建立基於套接字的bufferevent
        BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
    if (!bev) 
    {
        fprintf(stderr, "Error constructing bufferevent!");
        event_base_loopbreak(base);
        return ;
    }
    g_bev.push_back(bev);

    bufferevent_setcb(bev, echo_read_cb, NULL, echo_event_cb, "Server參數");//設置讀回調,錯誤事件回調,null表示禁止回調,cbarg向回調傳遞參數
    
    bufferevent_enable(bev, EV_READ|EV_WRITE);    //開啓緩衝區上的讀,寫事件
}

//鏈接監聽器錯誤回調函數
static void accept_error_cb(struct evconnlistener *listener, void *ctx)
{    
    printf("監聽錯誤:Accept_error_cb is called\n");
    struct event_base *base = evconnlistener_get_base(listener);
    int err = EVUTIL_SOCKET_ERROR();
    fprintf(stderr, "Got an error");
    
    event_base_loopexit(base, NULL);
    
}

struct event_base *g_base;

int main(int argc, char **argv)
{
    struct evconnlistener *listener;
    struct sockaddr_in sin;
    
    int port = 9527;
    
    if(argc > 1)
    {
        port = atoi(argv[1]);
    }
    if(port <= 0 || port > 65535)
    {
        puts("Invalid port");
        return 1;
    }
#ifdef WIN32
    WSADATA wsa_data;
    WSAStartup(0x0201, &wsa_data);//啓動異步socket
#endif
    if(-1 == evthread_use_windows_threads())//event多線程支持
        return false;

    g_base = event_base_new();//建立一個event_base
    if(!g_base)
    {
        puts("could't open event_base");
        return 1;
    }

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(0);
    sin.sin_port = htons(port);
    //分配返回一個監聽器對象,有鏈接會回調accept_conn_cb函數,綁定iP,port
    listener = evconnlistener_new_bind(g_base, accept_conn_cb, NULL,
        LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1,
        (struct sockaddr*)&sin, sizeof(sin));
    if(!listener)
    {
        perror("could't not create listener");
        return 1;
    }
    
    evconnlistener_set_error_cb(listener, accept_error_cb);//偵聽錯誤
    std::thread th([]
    {
        event_base_dispatch(g_base);//程序進入無線循環,等待就緒事件並執行事件處理
    });
    int count = 0;
    for(;;)
    {
        if(!g_bev.empty())
        {
            for(auto it : g_bev)
            {
                char msg[50] = {0};
                sprintf(msg, "歡迎!Server send!%d", count);
                bufferevent_write(it, msg, 50);//想緩衝區添加數據
            }
            count++;
        }
        Sleep(50);
    }
    th.join();
    return 0;
}

 

//每一個bufferevent有兩個數據相關的回調:一個讀取回調和一個寫入回調。
//默認狀況下,從底層傳輸端口讀取了任意量的數據以後會調用讀取回調;
//輸出緩衝區中足夠量的數據被清空到底層傳輸端口後寫入回調會被調用。
//經過調整bufferevent的讀取和寫入「水位(watermarks)」能夠覆蓋這些函數的默認行爲。多線程

本站公眾號
   歡迎關注本站公眾號,獲取更多信息