OceanBase server處理網絡包的回調邏輯

OceanBase處理網絡包的邏輯仍是蠻繞的,這裏以UPS爲例,做爲給本身的備忘。網絡

 

UPS代碼的main.cpp中調用ObUpdateServerMain的start啓動server。start函數會調用ObUpdateServerMain的do_work函數,此函數調用ObUpdateServer類的start啓動UPS。數據結構

 

ObUpdateServer繼承自以下幾個類:函數

1)common::ObBaseServer,基礎的server類,要求派生類實現handlePacket和handleBatchPacket方法(後一個UPS不會使用),這兩個是純虛函數,同時有個虛函數start_service,派生類能夠實現此方法用於初始化時作些server相關的邏輯;this

2)ObPacketQueueHandler,  要求派生類實現handlePacketQueue接口;線程

3)common::IBatchPacketQueueHandler,要求派生類實現handleBatchPacketQueue接口;code

 

ObUpdateServer類的start函數實際上調用的ObBaseServer的start函數,此函數作以下幾件事:server

1)初始化libeasy相關的一系列數據結構, 在特定的網絡端口上監聽等;對象

2)調用initialize函數(須要派生類實現);繼承

3)調用start_service函數(派生類可本身實現);接口

 

ObUpdateServer的initialize函數,作一系列啓動UPS內部所需的參數的初始化,除了必要的參數外,有幾個須要特別注意下:

1)server_handler_,此變量的類型是easy_io_handler_pt,此處註冊一系列回調,包括收包時的編解碼(server_handler_.encode/server_handler_.decode),收包時的處理邏輯(server_handler_.process),斷開鏈接時的處理等(server_handler_.on_disconnect)等;編解碼相關等都是使用的全部OB server共用的ObTbnetCallback類的方法,版本升級時包的格式變化只須要統一改此common類的方法便可;包的處理邏輯調用的UPS本身的ObUpdateCallback::process方法;

2)client_manager_的初始化,後續server發包都須要用到client_manager_;

3)rpc相關的初始化;

4)一系列worker線程的初始化(如read_thread_queue_/write_thread_queue_/lease_thread_queue_等,具體看代碼,指定不一樣類型線程的線程數,將this傳給各個線程用做handler回調等);

 

ObUpdateServer的start_service函數,調用start_threads(此函數調用上述一系列worker線程的start方法,等候收到任務處理),start_timer_schedule設定一系列ObTimer設定一些定時器相關的處理邏輯。

 

如上和網絡包處理相關的一系列函數說清楚了,而後具體說說網絡包收到後是怎麼處理的。

libeasy收到包會調用註冊的回調方法ObUpdateCallback::process,此函數會調用ObUpdateServer的handlePacket方法,此函數由各server本身實現,如UPS,會根據packet_code不一樣,或者進行本地處理(由於libeasy有多個IO poll thread,因此本地能夠處理一些耗時少的操做包),或是扔給不一樣的worker類處理。這裏分別舉個例子:

1)packet_code爲OB_SET_OBI_ROLE類型的包,調用read_thread_queue_的push方法,交由read_thread_queue_線程處理;

2)packet_code爲OB_UPS_CLEAR_FATAL_STATUS類型的包,調用lease_thread_queue_的push方法,交由lease_thread_queue_線程處理;

3)OB_WRITE/OB_MS_MUTATE/OB_FAKE_WRITE_FOR_KEEP_ALIVE/OB_UPS_PHY_PLAN_EXECUTE/OB_START_TRANSACTION/OB_END_TRANSACTION/OB_WRITE_DUMMY_LOG類型的包,直接在IO線程中調用trans_executor_的handle_packet函數處理;這裏注意下,trans_executor_的handle_packet也會進一步判斷包的類型,肯定是直接在此函數中處理(transe_executor_類對不一樣類型的包實現了一系列的handle函數),仍是經過調用push_task_交由trans_executor本身的一系列worker線程處理。

 

read_thread_queue_等worker線程是common::ObPacketQueueThread類型的(或者其餘相似類型)實例,此類類型都是tbsys::CDefaultRunnable的派生類,其內部含有一個阻塞隊列等消息到來,同時有一個用於回調的ObPacketQueueHandler類型的handler(注意ObUpdateServer就繼承自ObPacketQueueHandler,實際上此處註冊的handler就是ObUpdateServer類的對象)。此類worker線程start後真正調用的是ObPacketQueueThread::run函數,此函數即從內部隊列中取消息,而後調用ObUpdateServer的handlePacketQueue函數,handlePacketQueue函數再根據包的類型調用ObUpdateServer類實現的各種方法對包進行處理。此處因爲是worker線程,全部包都是線程本地處理的。

 

至此,整個處理包的邏輯應該被串起來了。其實主要是初看代碼時handlePacket/handlePacketQueue/handleBatchPacket/handleBatchPacketQueue各由誰調用,以及在哪一個線程空間中被調用不太好理解。

相關文章
相關標籤/搜索