負載均衡通信轉發分發器(G5)源代碼分析

負載均衡通信轉發分發器(G5)源代碼分析 (以版本v1.1.0爲準)     G5源代碼文件只有.c(2400行)和.h(260行)兩個源文件,行數雖然很少,可是技術密集度較高,分析源碼主要從基於epoll(ET)事件處理應用層框架和轉發會話結構管理兩方面入手。     先來看數據結構。主要的數據結構有服務器環境大結構,裏面包含了全部參數配置和運行時狀態數據,是全部內部函數傳遞的第一個參數。 [code=c] /* 服務器環境大結構 */ struct ServerEnv {     struct CommandParam        cmd_para ;          struct ForwardRule        *forward_rule ;     unsigned long            forward_rule_count ;          int                event_env ;     struct ForwardSession        *forward_session ;     unsigned long            forward_session_maxcount ;     unsigned long            forward_session_count ;     unsigned long            forward_session_use_offsetpos ;          struct ServerCache        server_cache ; } ; [/code]     包含的衆多成員中最應關注的是轉發規則結構和轉發會話結構 [code=c] /* 轉發規則結構 */ struct ForwardRule {     char                rule_id[ RULE_ID_MAXLEN + 1 ] ;     char                rule_mode[ RULE_MODE_MAXLEN + 1 ] ;          unsigned long            timeout ;          struct ClientNetAddress        client_addr[ RULE_CLIENT_MAXCOUNT ] ;     unsigned long            client_count ;          struct ForwardNetAddress    forward_addr[ RULE_FORWARD_MAXCOUNT ] ;     unsigned long            forward_count ;          struct ServerNetAddress        server_addr[ RULE_SERVER_MAXCOUNT ] ;     unsigned long            server_count ;     unsigned long            select_index ;          union     {         struct         {             unsigned long    server_unable ;         } RR[ RULE_SERVER_MAXCOUNT ] ;         struct         {             unsigned long    server_unable ;         } LC[ RULE_SERVER_MAXCOUNT ] ;         struct         {             unsigned long    server_unable ;             struct timeval    tv1 ;             struct timeval    tv2 ;             struct timeval    dtv ;         } RT[ RULE_SERVER_MAXCOUNT ] ;     } status ; } ; [/code]     轉發規則結構裏的數據主要來源於配置文件,還有一些是由配置處理獲得。     最後一塊數據status則是用於負載均衡的內部狀態跟蹤。 [code=c] /* 轉發會話結構 */ struct ForwardSession {     char                forward_session_type ;          struct ClientNetAddress        client_addr ;     struct ListenNetAddress        listen_addr ;     struct ServerNetAddress        server_addr ;     unsigned long            client_session_index ;     unsigned long            server_session_index ;          struct ForwardRule        *p_forward_rule ;     unsigned long            client_index ;          char                connect_status ;     unsigned long            try_connect_count ;          unsigned long            active_timestamp ;          char                io_buffer[ IO_BUFSIZE + 1 ] ;     long                io_buflen ; } ; [/code]     轉發會話結構在轉發鏈接創建時建立,鏈接斷開時銷燬。     一個數據傳輸方向對應一個會話,通常一個鏈接請求對應兩段TCP鏈接也對應兩個會話結構。成員forward_session_type表示是客戶端到服務端數據傳輸方向仍是服務端到客戶端數據傳輸方向。獲得一個會話結構能夠經過成員client_session_index和server_session_index獲得另外一個會話結構。     成員p_forward_rule指向該會話使用的轉發規則結構。     成員client_addr、listen_addr和分別對應客戶端、轉發端(本地偵聽端)和服務端信息。     成員connect_status用於跟蹤異步鏈接模式下的創建狀態。     成員io_buffer用於會話數據傳輸時的緩衝區,便於異步處理。          瞭解了幾個主要的數據結構後咱們開始分析源代碼。          main函數主要解析命令行參數和顯示語法,調用G5函數          G5函數主要是建立epoll池、裝載配置文件、調用服務器主工做循環(ServerLoop函數)、銷燬epoll池     pse->event_env = epoll_create( pse->forward_session_maxcount ) ;     nret = LoadConfig( pse ) ;     nret = ServerLoop( pse ) ;     close( pse->event_env );          ServerLoop函數負責服務器主循環,即批量等待epoll事件,有epoll事件發生時,處理之     sock_count = epoll_wait( pse->event_env , events , WAIT_EVENTS_COUNT , 1000 ) ;         若是是偵聽端口事件         if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_LISTEN )             管理端口事件,調用函數AcceptManageSocket             if( strcmp( p_forward_session->listen_addr.rule_mode , FORWARD_RULE_MODE_G ) == 0 )                 nret = AcceptManageSocket( pse , p_event , p_forward_session ) ;             轉發端口事件,調用函數AcceptForwardSocket             else                 nret = AcceptForwardSocket( pse , p_event , p_forward_session ) ;         若是是輸入事件         else if( p_event->events & EPOLLIN )             管理端口輸入事件,調用函數ReceiveOrProcessManageData             if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_MANAGE )                 nret = ReceiveOrProcessManageData( pse , p_event , p_forward_session ) ;             轉發端口輸入事件,調用函數TransferSocketData             else if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_CLIENT || p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_SERVER )                 nret = TransferSocketData( pse , p_event , p_forward_session ) ;         若是是輸出事件         else if( p_event->events & EPOLLOUT )             異步鏈接創建響應事件,調用函數SetSocketConnected             if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_SERVER && p_forward_session->connect_status == CONNECT_STATUS_CONNECTING )                 nret = SetSocketConnected( pse , p_event , p_forward_session ) ;             異步發送sock可寫事件,調用函數ContinueToWriteSocketData             else if( p_forward_session->connect_status == CONNECT_STATUS_CONNECTED )                 nret = ContinueToWriteSocketData( pse , p_event , p_forward_session ) ;         若是是錯誤事件         else if( p_event->events & EPOLLERR )             調用函數ResolveSocketError             nret = ResolveSocketError( pse , p_event , p_forward_session ) ;          函數AcceptForwardSocket用於在有客戶端接入時,查詢轉發規則,轉連到對應服務端上         循環接受轉發端口鏈接,epoll(ET)邊緣觸發時必須把一口氣把全部事件都處理掉,包含客戶端接入事件             接受客戶端接入             client_addr.sock = accept( p_forward_session->listen_addr.sock , (struct sockaddr *) & (client_addr.netaddr.sockaddr) , & addr_len ) ;             查詢轉發規則             nret = MatchForwardRule( pse , & client_addr , & (p_forward_session->listen_addr) , & p_forward_rule , & client_index ) ;             鏈接服務端(函數ConnectToRemote)             nret = ConnectToRemote( pse , p_event , p_forward_session , p_forward_rule , client_index , & client_addr , TRY_CONNECT_MAXCOUNT ) ;          函數ConnectToRemote用於有客戶端接入時轉連服務端         建立轉連的本地客戶端sock         server_addr.sock = socket( AF_INET , SOCK_STREAM , IPPROTO_TCP );         設置非堵塞模式         SetNonBlocking( server_addr.sock );         根據轉發規則,選擇目標網絡地址(函數SelectServerAddress)。若是有多個服務端地址的話按負載均衡算法選擇         nret = SelectServerAddress( pse , p_client_addr , p_forward_rule , server_addr.netaddr.ip , server_addr.netaddr.port ) ;         鏈接目標網絡地址         nret = connect( server_addr.sock , ( struct sockaddr *) & (server_addr.netaddr.sockaddr) , addr_len );         若是鏈接創建中         if( nret < 0 ) if( errno != EINPROGRESS )             登記服務端轉發會話到會話池中(會話鏈接狀態爲正在鏈接中),登記sock到epoll池             epoll_ctl( pse->event_env , EPOLL_CTL_ADD , p_forward_session_server->server_addr.sock , & server_event );         若是鏈接創建完成(本地鏈接本身時大機率發生)             登記客戶端轉發會話到會話池中(會話鏈接狀態爲鏈接成功),登記sock到epoll池             epoll_ctl( pse->event_env , EPOLL_CTL_ADD , p_forward_session_client->client_addr.sock , & client_event );             登記服務端轉發會話到會話池中(會話鏈接狀態爲鏈接成功),登記sock到epoll池             epoll_ctl( pse->event_env , EPOLL_CTL_ADD , p_forward_session_server->server_addr.sock , & server_event );          函數SetSocketConnected用於以前正在鏈接服務端的會話當鏈接成功事件發生時處理         登記客戶端轉發會話到會話池中(會話鏈接狀態爲鏈接成功),登記sock到epoll池         p_forward_session_client->connect_status = CONNECT_STATUS_CONNECTED ;         epoll_ctl( pse->event_env , EPOLL_CTL_ADD , p_forward_session_client->client_addr.sock , & client_event );         更新服務端轉發會話鏈接狀態鏈接成功         p_forward_session_server->connect_status = CONNECT_STATUS_CONNECTED ;         epoll_ctl( pse->event_env , EPOLL_CTL_MOD , p_forward_session_server->server_addr.sock , & server_event );         (等鏈接創建後,一對會話進入等待數據輸入事件EPOLLIN)          函數TransferSocketData用於從一個準備好數據輸入的會話中接收數據,並轉發給服務端         循環從客戶端sock中接收數據,epoll(ET)邊緣觸發時必須把一口氣把全部事件都處理掉             接收通信數據             p_out_forward_session->io_buflen = recv( in_sock , p_out_forward_session->io_buffer , IO_BUFSIZE , 0 ) ;             若是沒有接收數據了,跳出循環             if( p_out_forward_session->io_buflen < 0 ) if( errno == EAGAIN )             若是接收失敗,關閉客戶端和服務端sock、刪除epoll池中一對sock,刪除轉發會話             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->client_addr.sock , NULL );             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );             SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );             若是接收到輸入端斷開鏈接事件,關閉客戶端和服務端sock、刪除epoll池中一對sock,刪除轉發會話             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->client_addr.sock , NULL );             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );             SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );             循環發送數據到服務端sock                 發送通信數據                 len = send( out_sock , p_out_forward_session->io_buffer , p_out_forward_session->io_buflen , 0 ) ;                 若是底層發送緩衝區滿了,則暫停客戶端sock輸入事件EPOLLIN監控,啓用服務端sock輸出事件EPOLLOUT監控,跳出循環發送                 if( len < 0 ) if( errno == EAGAIN )                     epoll_ctl( pse->event_env , EPOLL_CTL_MOD , in_sock , & in_event );                     epoll_ctl( pse->event_env , EPOLL_CTL_MOD , out_sock , & out_event );                 若是發送出錯,關閉客戶端和服務端sock、刪除epoll池中一對sock,刪除轉發會話                 epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->client_addr.sock , NULL );                 epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );                 SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );                 若是發送完了,跳出循環                 else if( len == p_out_forward_session->io_buflen )                     break;                 不然繼續發送未發送數據                 else                     p_out_forward_session->io_buflen -= len ;                     memmove( p_out_forward_session->io_buffer , p_out_forward_session->io_buffer + len , p_out_forward_session->io_buflen );          函數ContinueToWriteSocketData用於處理以前底層發送緩衝區滿了引起的異步發送機制         循環繼續發送未發送數據到服務端sock             發送通信數據             len = send( out_sock , p_out_forward_session->io_buffer , p_out_forward_session->io_buflen , 0 ) ;             若是底層發送緩衝區滿了,跳出循環發送             if( len < 0 ) if( errno == EAGAIN )                 break;             若是發送出錯,關閉客戶端和服務端sock、刪除epoll池中一對sock,刪除轉發會話                 epoll_ctl( pse->event_env , EPOLL_CTL_DEL , in_sock , NULL );                 epoll_ctl( pse->event_env , EPOLL_CTL_DEL , out_sock , NULL );                 SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );             若是發送完了,恢復客戶端sock輸入事件EPOLLIN監控,暫停服務端sock輸出事件EPOLLOUT監控,跳出循環             else if( len == p_out_forward_session->io_buflen )                 epoll_ctl( pse->event_env , EPOLL_CTL_MOD , in_sock , & in_event );                 epoll_ctl( pse->event_env , EPOLL_CTL_MOD , out_sock , & out_event );                 break;             不然繼續發送             else                 p_out_forward_session->io_buflen -= len ;                 memmove( p_out_forward_session->io_buffer , p_out_forward_session->io_buffer + len , p_out_forward_session->io_buflen );          函數ResolveSocketError用於全部sock出錯時,關閉客戶端和服務端sock、刪除epoll池中一對sock,刪除轉發會話         若是是異步鏈接事件         if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_SERVER && p_forward_session->connect_status == CONNECT_STATUS_CONNECTING )             設置以前選擇目標服務器不可用狀態             nret = OnServerUnable( pse , p_forward_session->p_forward_rule ) ;             選擇新目標服務器,鏈接之             nret = ConnectToRemote( pse , p_event , p_forward_session , p_forward_session->p_forward_rule , p_forward_session->client_index , & (p_forward_session->client_addr) , --p_forward_session->try_connect_count ) ;             清理以前臨時建立的服務端會話結構             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );             SetForwardSessionUnitUnused( p_forward_session );         若是是數據交換報錯事件             關閉客戶端和服務端sock、刪除epoll池中一對sock,刪除轉發會話             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->client_addr.sock , NULL );             epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );             SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );          epoll(ET)事件處理應用層框架源代碼主線分析完成,但願經過分析其源代碼,能夠幫助讀者理解其設計思路和代碼結構,審覈做者編碼缺陷,提升使用準確性,也有助於讀者在許可證容許條件下fork本身的版本。     若有意見或建議歡迎及時聯繫我     開源項目首頁 : http://git.oschina.net/calvinwilliams/G5     做者郵箱 : calvinwilliams.c@gmail.com
相關文章
相關標籤/搜索