接着上篇的雪崩檢測,回顧下LongLinkTaskManager::__RunOnStartTask:
/mars-master/mars/stn/src/longlink_task_manager.cc
// Walks the pending-task list and hands each ready task to the long link.
// For every task: serialize it (Req2Buf), run the anti-avalanche check,
// make sure the link is connected, compute timeouts, then enqueue the
// packet via longlink_->Send().  NOTE: the "......" markers below are
// elisions in this excerpt, not real code.
void LongLinkTaskManager::__RunOnStartTask() {
    std::list<TaskProfile>::iterator first = lst_cmd_.begin();
    std::list<TaskProfile>::iterator last = lst_cmd_.end();
    bool ismakesureauthruned = false;
    bool ismakesureauthsuccess = false;
    uint64_t curtime = ::gettickcount();
    // retries are only allowed once retry_interval_ has elapsed since the
    // last batch error
    bool canretry = curtime - lastbatcherrortime_ >= retry_interval_;
    bool canprint = true;
    int sent_count = 0;

    while (first != last) {
        // advance via a saved `next` because handlers may erase `first`
        std::list<TaskProfile>::iterator next = first;
        ++next;

        ......

        if (!first->antiavalanche_checked) {
            // serialize the task payload; on encode failure the task ends here
            if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, error_code, Task::kChannelLong)) {
                __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }

            // anti-avalanche check
            xassert2(fun_anti_avalanche_check_);

            if (!fun_anti_avalanche_check_(first->task, bufreq.Ptr(), (int)bufreq.Length())) {
                __SingleRespHandle(first, kEctLocal, kEctLocalAntiAvalanche, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }

            first->antiavalanche_checked = true;
        }

        // stop the whole batch if the long link cannot be brought up
        if (!longlinkconnectmon_->MakeSureConnected()) {
            break;
        }

        // payload may still be empty (task was checked on an earlier pass);
        // serialize it now and re-run the anti-avalanche check if needed
        if (0 == bufreq.Length()) {
            if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, error_code, Task::kChannelLong)) {
                __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }

            // anti-avalanche check
            xassert2(fun_anti_avalanche_check_);

            if (!first->antiavalanche_checked && !fun_anti_avalanche_check_(first->task, bufreq.Ptr(), (int)bufreq.Length())) {
                __SingleRespHandle(first, kEctLocal, kEctLocalAntiAvalanche, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }
        }

        // fill in the transfer profile: timeouts scale with payload size,
        // queue position (sent_count) and the dynamic-timeout status
        first->transfer_profile.loop_start_task_time = ::gettickcount();
        first->transfer_profile.first_pkg_timeout = __FirstPkgTimeout(first->task.server_process_cost, bufreq.Length(), sent_count, dynamic_timeout_.GetStatus());
        first->current_dyntime_status = (first->task.server_process_cost <= 0) ? dynamic_timeout_.GetStatus() : kEValuating;
        first->transfer_profile.read_write_timeout = __ReadWriteTimeout(first->transfer_profile.first_pkg_timeout);
        first->transfer_profile.send_data_size = bufreq.Length();

        // the actual hand-off to the long link send queue
        first->running_id = longlink_->Send((const unsigned char*) bufreq.Ptr(), (unsigned int)bufreq.Length(), first->task.cmdid, first->task.taskid, first->task.send_only ? "":first->task.cgi);

        if (!first->running_id) {
            xwarn2(TSF"task add into longlink readwrite fail cgi:%_, cmdid:%_, taskid:%_", first->task.cgi, first->task.cmdid, first->task.taskid);
            first = next;
            continue;
        }

        xinfo2(TSF"task add into longlink readwrite suc cgi:%_, cmdid:%_, taskid:%_, size:%_, timeout(firstpkg:%_, rw:%_, task:%_), retry:%_", first->task.cgi, first->task.cmdid, first->task.taskid, first->transfer_profile.send_data_size, first->transfer_profile.first_pkg_timeout / 1000, first->transfer_profile.read_write_timeout / 1000, first->task_timeout / 1000, first->remain_retry_count);

        // fire-and-forget tasks are completed as soon as they are queued
        if (first->task.send_only) {
            __SingleRespHandle(first, kEctOK, 0, kTaskFailHandleNoError, longlink_->Profile());
        }

        ++sent_count;
        first = next;
    }
}
其實後面就剩下一個longlink_->Send,這個纔是真正的發送函數,前面的是一堆參數的設定。好吧,咱們來看看:
/mars-master/mars/stn/src/longlink.cc
bool LongLink::Send(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid, const std::string& _task_info) { ScopedLock lock(mutex_); if (kConnected != connectstatus_) return false; return __Send(_pbuf, _len, _cmdid, _taskid, _task_info); } bool LongLink::__Send(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid, const std::string& _task_info) { lstsenddata_.push_back(LongLinkSendData()); lstsenddata_.back().cmdid = _cmdid; lstsenddata_.back().taskid = _taskid; longlink_pack(_cmdid, _taskid, _pbuf, _len, lstsenddata_.back().data); lstsenddata_.back().data.Seek(0, AutoBuffer::ESeekStart); lstsenddata_.back().task_info = _task_info; readwritebreak_.Break(); return true; }
能夠直接看__Send方法了,就是將須要傳輸的數據以LongLinkSendData爲載體壓入隊列中,而後執行了SocketSelectBreaker::Break:
/mars-master/mars/comm/windows/SocketSelect/SocketSelect.cpp
bool SocketSelectBreaker::Break() { ScopedLock lock(m_mutex); if (m_broken) return true; char dummy[] = "1"; int ret = sendto(m_socket_w, &dummy, strlen(dummy), 0, (sockaddr*)&m_sendin, m_sendinlen); m_broken = true; if (ret < 0 || ret != strlen(dummy)) { xerror2(TSF"sendto Ret:%_, errno:(%_, %_)", ret, errno, WSAGetLastError()); m_broken = false; ReCreate(); } // Ret = WSAGetLastError(); return m_broken; }
這裏能夠看到,實際上只發送了一個字符「1」。這一個字節是發往 breaker 自身的 socket 的,目的是喚醒阻塞在 select 上的讀寫線程(相似 self-pipe 的技巧),讓它及時處理剛入隊的待發送數據,並非真正意義上的心跳或通道檢測。
也就是說,每次入隊一個待發送數據時,都會喚醒一次讀寫線程。那麼後面確定有隊列的自我運轉機制來進行真實的數據發送。那麼咱們來找找線索吧。
在LongLink的構造時候,已經將LongLink::__Run經過boost::bind賦值給了thread_。那麼LongLink::MakeSureConnected裏面又執行了thread_.start(&newone);能夠看到是個線程在運轉着__Run函數。那麼在哪裏調用的LongLink::MakeSureConnected,找到的一個線索鏈:StnLogic.java::makesureLongLinkConnected->stn_logic.cc::MakesureLonglinkConnected->NetCore::MakeSureLongLinkConnect->LongLink::MakeSureConnected。咱們把這個調用線索代碼貼到下面:
// Call chain for ensuring the long link is connected, from the Java API
// down to the native LongLink (Java binding first, then the C++ layers).
public class StnLogic {
    /**
     * Checks the long-link state; if not connected, tries to reconnect.
     */
    public static native void makesureLongLinkConnected();
}

// stn_logic.cc — forwards the JNI call into the STN core.
void MakesureLonglinkConnected() {
    xinfo2(TSF "make sure longlink connect");
    STN_WEAK_CALL(MakeSureLongLinkConnect());
}

// NetCore routes the request to the long-link channel owned by the task manager.
void NetCore::MakeSureLongLinkConnect() {
#ifdef USE_LONG_LINK
    longlink_task_manager_->LongLinkChannel().MakeSureConnected();
#endif
}

// Starts the connect thread when needed.  Returns true only when the link
// is already connected; otherwise kicks off (or reuses) the __Run thread
// and returns false.  *_newone reports whether a fresh thread was started.
bool LongLink::MakeSureConnected(bool* _newone) {
    if (_newone) *_newone = false;

    ScopedLock lock(mutex_);

    if (kConnected == ConnectStatus()) return true;

    bool newone = false;
    thread_.start(&newone);  // runs LongLink::__Run if not already running

    if (newone) {
        // fresh thread: reset per-connection state before __Run uses it
        connectstatus_ = kConnectIdle;
        conn_profile_.Reset();
        identifychecker_.Reset();
        disconnectinternalcode_ = kNone;
        readwritebreak_.Clear();
        connectbreak_.Clear();
    }

    if (_newone) *_newone = newone;

    return false;
}
最後會被上層samples的MarsServiceNative調用:
@Override public void onCreate() { super.onCreate(); final MarsServiceProfile profile = gFactory.createMarsServiceProfile(); stub = new MarsServiceStub(this, profile); // set callback AppLogic.setCallBack(stub); StnLogic.setCallBack(stub); SdtLogic.setCallBack(stub); // Initialize the Mars PlatformComm Mars.init(getApplicationContext(), new Handler(Looper.getMainLooper())); // Initialize the Mars StnLogic.setLonglinkSvrAddr(profile.longLinkHost(), profile.longLinkPorts()); StnLogic.setShortlinkSvrAddr(profile.shortLinkPort()); StnLogic.setClientVersion(profile.productID()); Mars.onCreate(true); // !!!這裏調用!!! StnLogic.makesureLongLinkConnected(); // Log.d(TAG, "mars service native created"); }
總之就是最後啓動一個線程來執行,線程函數是LongLink::__Run:
/mars-master/mars/stn/src/longlink.cc
// Connection-thread entry point: (re)builds the long link, then runs the
// blocking read/write loop until the connection dies, keeping the
// connection profile and status up to date throughout.
void LongLink::__Run() {
    // sync to MakeSureConnected data reset
    {
        ScopedLock lock(mutex_);
    }

    uint64_t cur_time = gettickcount();
    xinfo_function(TSF"LongLink Rebuild span:%_, net:%_", conn_profile_.disconn_time != 0 ? cur_time - conn_profile_.disconn_time : 0, getNetInfo());

    // Fresh profile for this attempt; the previous disconnect error code
    // is carried over as the reason for reconnecting.
    ConnectProfile conn_profile;
    conn_profile.start_time = cur_time;
    conn_profile.conn_reason = conn_profile_.disconn_errcode;
    getCurrNetLabel(conn_profile.net_type);
    conn_profile.tid = xlogger_tid();
    __UpdateProfile(conn_profile);

#ifdef ANDROID
    // Hold a wakelock while connecting so the device does not doze off.
    wakelock_.Lock(30 * 1000);
#endif
    SOCKET sock = __RunConnect(conn_profile);
#ifdef ANDROID
    wakelock_.Lock(1000);
#endif

    if (INVALID_SOCKET == sock) {
        // Connect failed: record disconnect info and bail out.
        conn_profile.disconn_time = ::gettickcount();
        conn_profile.disconn_signal = ::getSignal(::getNetInfo() == kWifi);
        __UpdateProfile(conn_profile);
        return;
    }

    // Blocks here servicing reads/writes until the link breaks.
    ErrCmdType errtype = kEctOK;
    int errcode = 0;
    __RunReadWrite(sock, errtype, errcode, conn_profile);

    socket_close(sock);

    conn_profile.disconn_time = ::gettickcount();
    conn_profile.disconn_errtype = errtype;
    conn_profile.disconn_errcode = errcode;
    conn_profile.disconn_signal = ::getSignal(::getNetInfo() == kWifi);

    __ConnectStatus(kDisConnected);
    __UpdateProfile(conn_profile);

    // Propagate abnormal termination to the error handler.
    if (kEctOK != errtype) __RunResponseError(errtype, errcode, conn_profile);

#ifdef ANDROID
    wakelock_.Lock(1000);
#endif
}
咱們只看重點吧:
1.__RunConnect,鏈接;
2.__RunReadWrite,執行讀寫(阻塞不斷執行);
__RunConnect的代碼就不貼了,核心的就是com_connect.ConnectImpatient。
/mars-master/mars/comm/socket/complexconnect.cc
// Races connection attempts against every candidate address and returns
// the first socket that both connects and passes the verification check,
// or INVALID_SOCKET if none succeeds.  NOTE: the "......" markers are
// elisions in this excerpt, not real code.
SOCKET ComplexConnect::ConnectImpatient(const std::vector<socket_address>& _vecaddr, SocketSelectBreaker& _breaker, MComplexConnect* _observer) {
    ......

    // build one ConnectCheckFSM per candidate address
    for (unsigned int i = 0; i < _vecaddr.size(); ++i) {
        xinfo2(TSF"complex.conn %_", _vecaddr[i].url());
        ConnectCheckFSM* ic = new ConnectCheckFSM(_vecaddr[i], timeout_, i, _observer);
        vecsocketfsm.push_back(ic);
    }

    ......

    do {
        ......

        // pre-select setup
        SocketSelect sel(_breaker);
        sel.PreSelect();

        ......

        // let each live FSM register its fd interest (and drive its connect);
        // the overall select timeout is the smallest FSM timeout
        for (unsigned int i = 0; i < index; ++i) {
            if (NULL == vecsocketfsm[i]) continue;
            xgroup2_define(group);
            vecsocketfsm[i]->PreSelect(sel, group);
            xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;
            timeout = std::min(timeout, vecsocketfsm[i]->Timeout());
        }

        ......

        // run select, bounded by the computed timeout if there is one
        if (INT_MAX == timeout) {
            ret = sel.Select();
        } else {
            timeout = std::max(0, timeout);
            ret = sel.Select(timeout);
        }

        ......

        for (unsigned int i = 0; i < index; ++i) {
            if (NULL == vecsocketfsm[i]) continue;
            xgroup2_define(group);
            vecsocketfsm[i]->AfterSelect(sel, group);
            xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;

            // FSM finished (closed/failed): drop this candidate, keep going.
            if (TcpClientFSM::EEnd == vecsocketfsm[i]->Status()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));
                vecsocketfsm[i]->Close();
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                lasterror = -1;
                continue;
            }

            // connected but failed the verification handshake: drop it too.
            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckFail == vecsocketfsm[i]->CheckStatus()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));
                vecsocketfsm[i]->Close();
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                lasterror = -1;
                continue;
            }

            // connected AND verified: this candidate wins.  The fd is
            // detached via Socket(INVALID_SOCKET) before delete —
            // presumably so destruction won't close the returned socket
            // (TODO confirm against TcpClientFSM).  break, not continue:
            // the remaining candidates need not be tried.
            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckOK == vecsocketfsm[i]->CheckStatus()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));
                xinfo2(TSF"index:%_, sock:%_, suc ConnectImpatient:%_:%_, RTT:(%_, %_), @%_", i, vecsocketfsm[i]->Socket(), vecsocketfsm[i]->IP(), vecsocketfsm[i]->Port(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), this);
                retsocket = vecsocketfsm[i]->Socket();
                index_ = i;
                index_conn_rtt_ = vecsocketfsm[i]->Rtt();
                index_conn_totalcost_ = vecsocketfsm[i]->TotalRtt();
                vecsocketfsm[i]->Socket(INVALID_SOCKET);
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                break;
            }
        }

        ......
    } while (true);
}
1.根據傳遞進來的一個地址數組,來生成ConnectCheckFSM的一個數組;
2.進入一個do while的死循環;
3.根據入口的SocketSelectBreaker建立SocketSelect,並執行PreSelect方法,執行一個前期準備工做;
4.對地址池中的每一個ConnectCheckFSM進行鏈接,若是狀態不是要進行鏈接,則執行別的前置操做。在這個過程當中,會將鏈接的socket保存在SocketSelect中(這裏有必要在後面看下深刻的代碼);
5.執行鏈接的select操做,異步檢測是否有數據可從通道上讀取;
6.以後的for循環,作select後的數據讀取等事情,將地址集對應的ConnectCheckFSM全部對象都執行一下AfterSelect,並根據返回的狀態,調用回調通知觀察者;
下面咱們來看一下TcpClientFSM::AfterSelect:
/mars-master/mars/comm/socket/tcpclient_fsm.cc
// Dispatches post-select handling according to the current FSM state,
// then fires the close callback once if a handler drove the FSM to EEnd.
void TcpClientFSM::AfterSelect(SocketSelect& _sel, XLogger& _log) {
    switch (status_) {
        case EConnecting:
            AfterConnectSelect(_sel, _log);
            break;
        case EReadWrite:
            AfterReadWriteSelect(_sel, _log);
            break;
        default:
            break;
    }

    // The handlers above may have just transitioned us to EEnd; notify
    // while the socket is still open.
    if (EEnd == status_ && INVALID_SOCKET != sock_) {
        _OnClose(last_status_, error_, false);
    }
}
根據狀態的不一樣調用不一樣的函數執行,若是是鏈接,調用AfterConnectSelect,若是是讀寫,調用AfterReadWriteSelect。
下面看下AfterConnectSelect:
// Evaluates the outcome of a non-blocking connect() after select() returns:
// exception set or pending socket error -> EEnd, writable with no error ->
// EReadWrite (connected), otherwise time out once the budget is spent.
void TcpClientFSM::AfterConnectSelect(const SocketSelect& _sel, XLogger& _log) {
    xassert2(EConnecting == status_, "%d", status_);

    int timeout = ConnectTimeout();
    xinfo2(TSF"sock:%_, (%_:%_), ", sock_, addr_.ip(), addr_.port()) >> _log;

    // Socket flagged in the exception set: fetch SO_ERROR and end the FSM.
    if (_sel.Exception_FD_ISSET(sock_)) {
        socklen_t len = sizeof(error_);
        if (0 != getsockopt(sock_, SOL_SOCKET, SO_ERROR, &error_, &len)) {
            error_ = socket_errno;
        }

        xwarn2(TSF"close connect exception: (%_, %_)", sock_, error_, socket_strerror(error_)) >> _log;

        end_connecttime_ = gettickcount();
        last_status_ = status_;
        status_ = EEnd;
        return;
    }

    // Pending socket error (e.g. connection refused): end the FSM.
    error_ = socket_error(sock_);

    if (0 != error_) {
        xwarn2(TSF"close connect error:(%_, %_), ", error_, socket_strerror(error_)) >> _log;
        end_connecttime_ = gettickcount();
        last_status_ = status_;
        status_ = EEnd;
        return;
    }

    // Writable with no error: the connect completed; notify the subclass.
    if (0 == error_ && _sel.Write_FD_ISSET(sock_)){
        end_connecttime_ = gettickcount();
        last_status_ = status_;
        status_ = EReadWrite;
        xinfo2(TSF"connected Rtt:%_, ", Rtt()) >> _log;
        _OnConnected(Rtt());
        return;
    }

    // Still not connected and out of time: fail the attempt as a timeout.
    if (0 >= timeout) {
        end_connecttime_ = gettickcount();
        xwarn2(TSF"close connect timeout:(%_, %_), (%_, %_)", ConnectAbsTimeout(), -timeout, SOCKET_ERRNO(ETIMEDOUT), socket_strerror(SOCKET_ERRNO(ETIMEDOUT))) >> _log;
        error_ = SOCKET_ERRNO(ETIMEDOUT);
        last_status_ = status_;
        status_ = EEnd;
        return;
    }
}
若是成功,調用_OnConnected。而後經過它會回調到繼承者的同名虛函數中,在這裏就是ConnectCheckFSM:
virtual void _OnConnected(int _rtt) { m_checkfintime = ::gettickcount(); if (!m_observer) return; m_observer->OnConnected(m_index, addr_, sock_, 0, _rtt); if (ECheckOK == CheckStatus()) { return; } if (!m_observer->OnVerifySend(m_index, addr_, sock_, send_buf_)) { m_check_status = ECheckFail; } }
這裏首先調用了觀察者的OnConnected,這個觀察者就是LongLinkConnectObserver。
咱們回來看ConnectImpatient,在循環裏執行了AfterSelect,以後根據每一個ConnectCheckFSM的狀態更新vecsocketfsm數組。在for循環的下面會有3段代碼來作這個根據狀態更新數組的操做,前兩段是若是鏈接已經關閉的處理和錯誤的狀況處理,都須要從數組中將該項目置爲null。第三段是成功完成的狀況處理。注意,前兩段是continue,而第三段是break。怎麼理解這裏呢?個人解釋是,自己是有個地址池的鏈接方式,若是其中一個可以成功鏈接上而且可以正常收發,那麼其他的就不須要再嘗試了,所以這裏作了break處理。能夠看到這裏的3種狀況處理了TcpClientFSM::EEnd、TcpClientFSM::EReadWrite,那麼若是是ESstart和EConnecting的狀況下,是不會清除這個數組元素的。再接着看,是這個for循環以後的處理,循環判斷全部的鏈接是否都是無效的,若是都是無效的,繼續執行這個while死循環,不然若是有一個是有效的,那麼跳出來。也就是說,再次執行的時候index也會進行上面的自增++運算,那麼繼續日後嘗試下一個鏈接。再日後看,是跳出了while死循環的狀況,又把這些鏈接依次close,而後清除了數組。再而後是返回了retsocket。這玩意兒的惟一賦值是在上面的for循環中的第三段斷定中,這段斷定纔是一個關鍵,就是說一個可用的鏈接出現了。那麼直接帶來的就是返回一個可用的socket,不然返回的將是個INVALID_SOCKET。稍微總結下這裏,仍是挺巧妙的,能夠理解爲從地址池中找到可用的鏈接,不是漫無目的的嘗試,而是遞進式,而且將無效的隨時置爲null。不過說實話,應當能夠寫的更簡潔,這裏實在是有些晦澀。至此,鏈接部分分析完畢。