異步網絡消息處理框架

最近一段時間將原來寫的kendynet網絡框架重寫了大部分的代碼,讓提供的接口更清晰,對用戶更友好。linux

整個框架的架構分層3層:git

1)單線程,基於原始數據流的網絡接口,在這一層上,沒有提供封包的處理,定時器事件等等。使用者能夠在此之上按本身的需求作進一步的封裝。github

2)單線程,提供connection,封包處理,接收發送超時處理。安全

3)網絡邏輯分離的異步網絡框架,抽象出三個主要的類型:asynnet_t,sock_ident和msgdisp_t.服務器

asynnet_t:網絡處理引擎,使用者建立實例的時候能夠傳入pollercount參數,其中每個poller都會在單獨的線程中運行.網絡

sock_ident:邏輯層操做的套接口封裝,能夠安全的在多線程環境下使用.多線程

msgdisp_t:消息分離器,每一個分離器有一個對應的消息隊列用於接收從網絡線程傳遞過來的消息.架構

msgdisp_t提供了兩種使用模式:框架

第一種:典型的線程池模式。在這種模式下,能夠建立一個消息分離器,多個邏輯線程對這個消息分離器調用異步

msg_loop.

第二種:共用網絡層模式。在一個進程中啓動N個線程,每一個線程運行一個不一樣服務,全部這些服務共用網絡通訊層.

在這種狀況下,網絡消息須要路由到正確的服務那裏.能夠每一個線程都建立一個消息分離器,各線程在本身的消息分離器上

調用msg_loop處理只屬於本身的消息.

下面是一個異步網絡服務器的示例:

#include <stdio.h>
#include <stdlib.h>
#include "core/msgdisp.h"
#include "testcommon.h"

uint32_t recvsize = 0;
uint32_t recvcount = 0;

///int32_t asynnet_bind(msgdisp_t disp,sock_ident sock,void *ud,int8_t raw,uint32_t send_timeout,uint32_t recv_timeout)
void asynconnect(msgdisp_t disp,sock_ident sock,const char *ip,int32_t port)
{
    printf("asynconnect\n");
    disp->bind(disp,sock,1,0,30*1000);
}

void asynconnected(msgdisp_t disp,sock_ident sock,const char *ip,int32_t port)
{
    printf("asynconnected\n");
   ++client_count;
}

void asyndisconnected(msgdisp_t disp,sock_ident sock,const char *ip,int32_t port,uint32_t err)
{
    --client_count;
}


int32_t asynprocesspacket(msgdisp_t disp,sock_ident sock,rpacket_t rpk)
{
    recvsize += rpk_len(rpk);
    recvcount++;
    asyn_send(sock,wpk_create_by_other((struct packet*)rpk));
    return 1;
}


void asynconnectfailed(msgdisp_t disp,const char *ip,int32_t port,uint32_t reason)
{

}


int main(int argc,char **argv)
{
    setup_signal_handler();
    InitNetSystem();
    asynnet_t asynet = asynnet_new(1);
    msgdisp_t  disp = new_msgdisp(asynet,
                                  asynconnect,
                                  asynconnected,
                                  asyndisconnected,
                                  asynprocesspacket,
                                  asynconnectfailed);
    int32_t err = 0;
    disp->listen(disp,argv[1],atoi(argv[2]),&err);
    uint32_t tick,now;
    tick = now = GetSystemMs();
    while(!stop){
        msg_loop(disp,50);
        now = GetSystemMs();
        if(now - tick > 1000)
        {
            uint32_t elapse = now-tick;
            recvsize = (recvsize/elapse)/1000;
            printf("client_count:%d,recvsize:%d,recvcount:%d\n",client_count,recvsize,recvcount);
            tick = now;
            packet_send_count = 0;
            recvcount = 0;
            recvsize = 0;
        }
    }
    CleanNetSystem();
    return 0;
}

項目代碼在:https://github.com/sniperHW/luanet

目前只實現了對linux,tcp的網絡支持,後續將會先完善這部分代碼,並在此之上提供基於user level thread的RPC支持.

相關文章
相關標籤/搜索