teamtalk的conn框架簡介及netlib線程安全問題

最近把teamtalk的conn_map改爲了智能指針,但改了總要多方面試試有沒有問題,總不能編譯經過,能正常啓動就萬事大吉了。因此就寫了一個shell client客戶端來進行功能的測試。 android

tt的官方上一次發佈版本里有一個test目錄,裏面寫了一個簡易的測試客戶端。不過這個test根本不可用,由於不是代碼寫的有錯誤,就是功能缺失,因此我只好親自動手重作了。 ios

在作的過程當中老是發現client發起的connect偶爾會有鏈接不上,這個偶爾的機率很是低,但既然發生了,那確定是有問題的。因而就查啊查啊。。。 面試

test的測試客戶端至關因而一個命令行shell的客戶端,也就是沒有圖形界面,你的功能經過在終端上輸入命令來完成。目前我僅實現了註冊和登錄。將來打算把聊天等各類功能也作了,這樣就差很少至關於實現了一個命令行式的客戶端。有人也許會問,TT有windows,mac,ios,android全平臺客戶端,作個命令行的客戶端有什麼用?固然有用了,測試功能方便啊,你不用考慮折騰界面就能把各類功能給測了。將來添加功能也方便寫測試,好比我如今新增一個註冊功能,在這個命令行上面輸入reg xx oo,那麼一個用戶名叫xx的用戶便以密碼爲oo註冊進了數據庫。對命令的解析可比作界面的事件響應函數方便多了。 shell

好了,如今問題來了,shell命令的輸入是須要一個死循環來反覆等待用戶輸入的,而tt的異步網絡框架又須要另外一個死循環,若是兩個死循環放在同一線程裏顯然不行,因此就把接受用戶輸入的死循環放到了另外一個線程裏面。那麼當用戶輸入reg xx oo時,將這條命令解析出用戶名和密碼,並開始啓動註冊流程,這一切都是在另外一個線程裏作的。 數據庫

註冊流程是怎麼樣的呢?這裏先講一講TT的conn框架。 windows

TT的底層異步網絡庫是將socket和epoll封裝成一個netlib庫,你要作的任何有關異步網絡的操做都是經過調用netlib來實現的。但netlib只是一個原始的對tcp報文發送接收的異步庫,你要作即時通信,還須要在此基礎上實現一套通信協議,而且封裝一組接口來完成對這些協議的操做。 服務器

因而TT就定義了一個叫 網絡

CImConn的類,這個類定義在imconn.h裏面。 多線程

爲了方便你們閱讀,這裏摘入部分代碼 框架

class CImConn : public CRefObject
{
public:
	CImConn();
	virtual ~CImConn();
	int Send(void* data, int len);
	virtual void OnRead();
	virtual void OnWrite();
	
	bool IsBusy() { return m_busy; }
	int SendPdu(CImPdu* pPdu) { return Send(pPdu->GetBuffer(), pPdu->GetLength()); }

	virtual void OnConnect(net_handle_t handle) { m_handle = handle; }
	virtual void OnConfirm(){}
	virtual void OnClose(){}
	virtual void OnTimer(uint64_t){}
    virtual void OnWriteCompelete(){}
	virtual void HandlePdu(CImPdu*){}



接口的含義是顯而易見的,OnConnect就是有鏈接接入事件的響應函數, OnConfirm這個含義有點含糊,其實就是你發起netlib_connect,當這個connect鏈接創建完成時調用的函數。

這裏爲了方便你理解,作個類比,若是你作過android開發,想一下每次你寫一個應用的最經常使用的流程是什麼樣的?定義一個類繼承Activity,而後override裏面的onCreate等xx方法,是否是很類似?固然若是你沒有安卓開發經驗,類比一下ios吧,ios也是這樣的,若是ios也沒作過,那也不要緊,繼續往下看。

這裏CImConn其實就是留給你繼承的,當你繼承後,請實現裏面對應的成員函數。

若是你作的是服務端,那麼須要實現OnConnect來響應用戶的接入,若是是客戶端,就須要OnConfirm來定義鏈接上服務器後的操做。其餘幾個接口服務端和客戶端是通用的。

因此,看完這裏你就會理解msg_server目錄下爲何有DBServConn,FileServConn, LoginServConn, RouteServConn, PushServConn以及MsgConn。

前面幾個都是消息服務器主動向其餘幾個服務器發起的客戶端鏈接,最後一個是消息服務器本身的服務端Conn,用來等待用戶接入,因此須要實現OnConnect函數。

而login_server裏面的HttpConn和LoginConn含義也顯而易見了,一個是用來響應http請求的,另外一個是響應消息服務器login信息登記請求的。其餘幾個服務器裏的conn也以此類推。

以前對TT感到很凌亂的朋友是否是忽然感受本身頓悟了?感謝我吧。

另一個疑問,TT的imconn框架是如何把這個CImConn和netlib鏈接起來的?

這裏以DBServConn爲例作一個解釋,看代碼

void CDBServConn::Connect(const char* server_ip, uint16_t server_port, uint32_t serv_idx)
{
	log("Connecting to DB Storage Server %s:%d ", server_ip, server_port);

	m_serv_idx = serv_idx;
	m_handle = netlib_connect(server_ip, server_port, imconn_callback, (void*)&g_db_server_conn_map);

	if (m_handle != NETLIB_INVALID_HANDLE) {
		g_db_server_conn_map.insert(make_pair(m_handle, this));
	}
}



CDBServConn是消息服務器像數據庫代理髮起鏈接時須要繼承的一個CImConn類,裏面的Connect函數是發起鏈接時調用的。看裏面有調到netlib_connect,並傳入imconn_callback和g_db_server_conn_map。

這兩個參數就是鏈接imconn和netlib的關鍵。g_db_server_conn_map是定義在CDBServConn裏的一個static全局map映射表,用來保存什麼呢?下面一句

g_db_server_conn_map.insert(make_pair(m_handle, this))
很明顯,這個映射表保存了每次鏈接的socket句柄(m_handle)和imconn對象(this)的映射關係。

當TT底層的事件分發器產生事件後,便會調用imconn_callback,裏面有一個FindImConn會反查到對應的Conn,而後再調用Conn對象的OnConfirm等函數,這些函數就是你以前繼承CImConn本身實現的。運行時多態有木有?是否是以爲TT的框架作的還挺不錯的。conn對象的OnRead實際上是最重要的一個函數,由於你的業務代碼都將在這裏面自行實現。

void imconn_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{
	NOTUSED_ARG(handle);
	NOTUSED_ARG(pParam);

	if (!callback_data)
		return;

	ConnMap_t* conn_map = (ConnMap_t*)callback_data;
	CImConn* pConn = FindImConn(conn_map, handle); //這裏將會經過socket句柄反查到對於的imconn
	if (!pConn)
		return;

	//log("msg=%d, handle=%d ", msg, handle);

	switch (msg)
	{
	case NETLIB_MSG_CONFIRM:
		pConn->OnConfirm();  //connect鏈接成功後會調此pConn的OnConfirm()函數
		break;
	case NETLIB_MSG_READ:
		pConn->OnRead(); //業務代碼會在這裏面執行
		break;
	case NETLIB_MSG_WRITE:
		pConn->OnWrite();
		break;
	case NETLIB_MSG_CLOSE:
		pConn->OnClose();
		break;
	default:
		log("!!!imconn_callback error msg: %d ", msg);
		break;
	}

	pConn->ReleaseRef();
}

看看OnRead代碼,裏面有一個HandlePdu

void CImConn::OnRead()
{
	for (;;)
	{
		uint32_t free_buf_len = m_in_buf.GetAllocSize() - m_in_buf.GetWriteOffset();
		if (free_buf_len < READ_BUF_SIZE)
			m_in_buf.Extend(READ_BUF_SIZE);

		int ret = netlib_recv(m_handle, m_in_buf.GetBuffer() + m_in_buf.GetWriteOffset(), READ_BUF_SIZE);
		if (ret <= 0)
			break;

		m_recv_bytes += ret;
		m_in_buf.IncWriteOffset(ret);

		m_last_recv_tick = get_tick_count();
	}

    CImPdu* pPdu = NULL;
	try
    {
		while ( ( pPdu = CImPdu::ReadPdu(m_in_buf.GetBuffer(), m_in_buf.GetWriteOffset()) ) )
		{
            uint32_t pdu_len = pPdu->GetLength();
            
			HandlePdu(pPdu);  //這裏面將會完成各類業務代碼

			m_in_buf.Read(NULL, pdu_len);
			delete pPdu;
            pPdu = NULL;
//			++g_recv_pkt_cnt;
		}
	} catch (CPduException& ex) {
		log("!!!catch exception, sid=%u, cid=%u, err_code=%u, err_msg=%s, close the connection ",
				ex.GetServiceId(), ex.GetCommandId(), ex.GetErrorCode(), ex.GetErrorMsg());
        if (pPdu) {
            delete pPdu;
            pPdu = NULL;
        }
        OnClose();
	}
}



摘一段CDBServConn的HandlePdu
void CDBServConn::HandlePdu(CImPdu* pPdu)
{
	switch (pPdu->GetCommandId()) {
        case CID_OTHER_HEARTBEAT:
            break;
        case CID_OTHER_VALIDATE_RSP:
            _HandleValidateResponse(pPdu );
            break;
        case CID_LOGIN_RES_DEVICETOKEN:
            _HandleSetDeviceTokenResponse(pPdu);
            break;
        case CID_MSG_UNREAD_CNT_RESPONSE:
            _HandleUnreadMsgCountResponse( pPdu );
            break;
        case CID_MSG_LIST_RESPONSE:
            _HandleGetMsgListResponse(pPdu);
            break;
        case CID_MSG_GET_BY_MSG_ID_RES:
            _HandleGetMsgByIdResponse(pPdu);
            break;
        case CID_MSG_DATA:
            _HandleMsgData(pPdu);
            break;
        case CID_MSG_GET_LATEST_MSG_ID_RSP:
            _HandleGetLatestMsgIDRsp(pPdu);
            break;



裏面的handler就是對應不一樣協議的處理器。因此到此,你就會差很少明白,大部分時候,你要作的就是繼承CImConn而後寫handler。

TT的conn框架簡介就到此爲止了,其實還有不少細節須要你本身去摳代碼,慢慢來。

如今回到一開始說的在另外一個線程裏發起註冊流程,你應該會很清楚整個過程是怎麼作的了,其實就是繼承CImConn,而後在裏面發起鏈接和接受鏈接處理。這裏摘一段我代碼

net_handle_t CClientConn::Connect(const char* ip, uint16_t port, uint32_t idx)
{
	m_handle = netlib_connect(ip, port, imconn_callback_sp, (void*)&s_client_conn_map);
	log("connect handle %d", m_handle);
	if (m_handle != NETLIB_INVALID_HANDLE) {
	    log("in invalid %d", m_handle);
        s_client_conn_map.insert(make_pair(m_handle, shared_from_this()));//這裏!!!
	}
    return  m_handle;
}



注意這裏我本身的代碼跟以前給出的TT源碼略有不一樣,imconn_callback_sp是我改爲智能指針的版本,插入conn_map表的不是原始this指針,而是shared_ptr。

這個操做是在子線程裏進行的,因此netlib_connect會把imconn_callback_sp加入到底層事件分發器裏進行監聽,而事件分發器是在主線程裏運行的一個循環,這個循環會在socket文件句柄發生讀寫事件後對你加入的函數進行回調。因此netlib_connect會裏面把imconn_callback加入主線程的監聽器,主線程一旦監聽到事件發生就會馬上調用此函數,而此函數裏的

CImConn* pConn = FindImConn(conn_map, handle);

conn_map是在netlib_connect後insert的,因此就有可能出現FindImConn時,conn_map裏面尚未來得及insert這對關係,也就形成了偶爾會發生connect後沒有繼續調用後續的OnConfirm函數,而你跑到服務端看,connect確實成功的奇怪現象。多線程真要命啊。。。

那麼如何解決這個問題呢?這個不是本文的要講的,各位有興趣請自行考慮解決的方法,這裏友情提示,加鎖是沒有用的。

相關文章
相關標籤/搜索