微信開源mars源碼分析4—底層核心mars分析(續1)

接著上篇的雪崩檢測,再回顧一下 LongLinkTaskManager::__RunOnStartTask:
/mars-master/mars/stn/src/longlink_task_manager.cc

// Walks the pending task list (lst_cmd_) and hands each eligible task to the
// long link for sending. For every task it serializes the request (Req2Buf),
// runs the anti-avalanche check once, requires the long link to be connected,
// fills in the per-task timeout/size profile and finally enqueues the buffer
// via longlink_->Send(). Tasks whose serialization or anti-avalanche check
// fails are finished immediately through __SingleRespHandle.
// NOTE(review): part of the loop body is elided in this excerpt ("......");
// bufreq and error_code are presumably declared there, and the unused-looking
// locals below (ismakesureauth*, canretry, canprint) are presumably used there.
void LongLinkTaskManager::__RunOnStartTask() {
    std::list<TaskProfile>::iterator first = lst_cmd_.begin();
    std::list<TaskProfile>::iterator last = lst_cmd_.end();

    bool ismakesureauthruned = false;
    bool ismakesureauthsuccess = false;
    uint64_t curtime = ::gettickcount();

    // Batch-error back-off: retries allowed only after retry_interval_ has elapsed.
    bool canretry = curtime - lastbatcherrortime_ >= retry_interval_;
    bool canprint = true;
    // Number of tasks handed to the long link in this pass; feeds __FirstPkgTimeout.
    int sent_count = 0;

    while (first != last) {
        // Remember the successor up front; presumably __SingleRespHandle may
        // remove `first` from lst_cmd_, invalidating the iterator.
        std::list<TaskProfile>::iterator next = first;
        ++next;

        ......

        // First time this task is considered: serialize it and run the
        // anti-avalanche check exactly once.
        if (!first->antiavalanche_checked) {
            if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, error_code, Task::kChannelLong)) {
                __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }
            // Anti-avalanche check: reject the task locally if it is flagged.
            xassert2(fun_anti_avalanche_check_);
            if (!fun_anti_avalanche_check_(first->task, bufreq.Ptr(), (int)bufreq.Length())) {
                __SingleRespHandle(first, kEctLocal, kEctLocalAntiAvalanche, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }
            
            first->antiavalanche_checked = true;
        }

        // Stop scheduling the remaining tasks while the long link is not connected.
        if (!longlinkconnectmon_->MakeSureConnected()) {
            break;
        }

        // Buffer still empty (e.g. the block above was skipped because the task
        // was already checked in an earlier pass): serialize it now.
        if (0 == bufreq.Length()) {
            if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, error_code, Task::kChannelLong)) {
                __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }
            // Anti-avalanche check (only if it has not already passed).
            // NOTE(review): reaching this point requires antiavalanche_checked to
            // be true (either it already was, or the block above set it), so this
            // check appears never to run — confirm against the elided code.
            xassert2(fun_anti_avalanche_check_);
            if (!first->antiavalanche_checked && !fun_anti_avalanche_check_(first->task, bufreq.Ptr(), (int)bufreq.Length())) {
                __SingleRespHandle(first, kEctLocal, kEctLocalAntiAvalanche, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }
        }

        // Record timing, timeout and size parameters for this send attempt.
        first->transfer_profile.loop_start_task_time = ::gettickcount();
        first->transfer_profile.first_pkg_timeout = __FirstPkgTimeout(first->task.server_process_cost, bufreq.Length(), sent_count, dynamic_timeout_.GetStatus());
        first->current_dyntime_status = (first->task.server_process_cost <= 0) ? dynamic_timeout_.GetStatus() : kEValuating;
        first->transfer_profile.read_write_timeout = __ReadWriteTimeout(first->transfer_profile.first_pkg_timeout);
        first->transfer_profile.send_data_size = bufreq.Length();
        // Enqueue on the long link; send-only tasks pass an empty task_info.
        first->running_id = longlink_->Send((const unsigned char*) bufreq.Ptr(), (unsigned int)bufreq.Length(), first->task.cmdid, first->task.taskid,
                                      first->task.send_only ? "":first->task.cgi);

        // A zero/false running_id means Send() refused the packet (it does so
        // when the link is not in kConnected); leave the task queued and move on.
        if (!first->running_id) {
            xwarn2(TSF"task add into longlink readwrite fail cgi:%_, cmdid:%_, taskid:%_", first->task.cgi, first->task.cmdid, first->task.taskid);
            first = next;
            continue;
        }

        xinfo2(TSF"task add into longlink readwrite suc cgi:%_, cmdid:%_, taskid:%_, size:%_, timeout(firstpkg:%_, rw:%_, task:%_), retry:%_",
               first->task.cgi, first->task.cmdid, first->task.taskid, first->transfer_profile.send_data_size, first->transfer_profile.first_pkg_timeout / 1000,
               first->transfer_profile.read_write_timeout / 1000, first->task_timeout / 1000, first->remain_retry_count);

        // Send-only tasks expect no response: complete them successfully right away.
        if (first->task.send_only) {
            __SingleRespHandle(first, kEctOK, 0, kTaskFailHandleNoError, longlink_->Profile());
        }

        ++sent_count;
        first = next;
    }
}

其實後面就只剩下 longlink_->Send 了,這才是真正的發送函數,前面都是一堆參數的設定。好吧,我們來看看:
/mars-master/mars/stn/src/longlink.cc

bool LongLink::Send(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid, const std::string& _task_info) {
    ScopedLock lock(mutex_);

    if (kConnected != connectstatus_) return false;

    return __Send(_pbuf, _len, _cmdid, _taskid, _task_info);
}

    bool LongLink::__Send(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid, const std::string& _task_info) {
    lstsenddata_.push_back(LongLinkSendData());

    lstsenddata_.back().cmdid = _cmdid;
    lstsenddata_.back().taskid = _taskid;
    longlink_pack(_cmdid, _taskid, _pbuf, _len, lstsenddata_.back().data);
    lstsenddata_.back().data.Seek(0, AutoBuffer::ESeekStart);
    lstsenddata_.back().task_info = _task_info;

    readwritebreak_.Break();
    return true;
}

可以直接看 __Send 方法了:它將需要傳輸的數據以 LongLinkSendData 為載體壓入隊列 lstsenddata_ 中,然後對 readwritebreak_ 執行了 SocketSelectBreaker::Break:
/mars-master/mars/comm/windows/SocketSelect/SocketSelect.cpp

// Signals the breaker by sending a one-byte datagram from m_socket_w to
// m_sendin (presumably the socket that the associated SocketSelect watches,
// so a thread blocked in select() wakes up).
// Repeated calls are coalesced: while m_broken is set no further byte is sent.
// Returns true if the breaker is in the broken (signalled) state afterwards;
// if sendto fails, the error is logged, the breaker is recreated and false
// is returned.
bool SocketSelectBreaker::Break() {
    ScopedLock lock(m_mutex);

    // Already signalled and not yet cleared: nothing more to do.
    if (m_broken) return true;

    const char dummy[] = "1";
    const int dummy_len = static_cast<int>(strlen(dummy));

    // Pass the array itself (it decays to const char*), not &dummy: Winsock's
    // sendto takes `const char*`, and a `char (*)[2]` does not convert to it.
    int ret = sendto(m_socket_w, dummy, dummy_len, 0, (sockaddr*)&m_sendin, m_sendinlen);
    m_broken = true;

    // ret != dummy_len also covers ret < 0 (SOCKET_ERROR); comparing int with
    // int avoids the signed/unsigned mismatch against strlen's size_t.
    if (ret != dummy_len) {
        xerror2(TSF"sendto Ret:%_, errno:(%_, %_)", ret, errno, WSAGetLastError());
        m_broken =  false;
        ReCreate();
    }

    return m_broken;
}

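這裡可以看到,實際上只發送了一個字符 "1"。這個字符並不是業務數據,也不是在長連接上做的通道檢測:它是通過 SocketSelectBreaker 自己的 socket(m_socket_w)發往 m_sendin 的,作用是打斷(喚醒)正阻塞在 select 上的線程。可以把它理解為一個「敲門」信號,而不是定時心跳。m_broken 標記保證在被清除之前,重複調用 Break 不會重複發送。
也就是說,每次入隊一個待發送數據時,都會喚醒一次讀寫線程。那麼後面肯定有隊列自身的運轉機制來進行真正的數據發送。我們來找找線索吧。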
在 LongLink 構造的時候,已經通過 boost::bind 將 LongLink::__Run 賦值給了 thread_。而 LongLink::MakeSureConnected 裡面又執行了 thread_.start(&newone);,可以看到是一個線程在運轉著 __Run 函數。那麼是在哪裡調用的 LongLink::MakeSureConnected 呢?找到的一條調用鏈是:StnLogic.java::makesureLongLinkConnected -> stn_logic.cc::MakesureLonglinkConnected -> NetCore::MakeSureLongLinkConnect -> LongLink::MakeSureConnected。我們把這條調用鏈上的代碼貼到下面:

public class StnLogic {
    /**
     * Checks the state of the long link; if it is not connected, a reconnect is attempted.
     * Implemented in native code (stn_logic.cc::MakesureLonglinkConnected).
     */
    public static native void makesureLongLinkConnected();
}

// stn_logic.cc
// Native side of StnLogic.makesureLongLinkConnected(): logs the request and
// forwards it to NetCore::MakeSureLongLinkConnect through STN_WEAK_CALL.
// NOTE(review): STN_WEAK_CALL presumably only performs the call while the
// NetCore instance is still alive — confirm against the macro definition.
void MakesureLonglinkConnected() {
    xinfo2(TSF "make sure longlink connect");
   STN_WEAK_CALL(MakeSureLongLinkConnect());
}

// Asks the long-link channel owned by longlink_task_manager_ to make sure it is
// connected (LongLink::MakeSureConnected starts its worker thread if needed).
// The boolean "already connected" result is intentionally discarded here.
// Compiles to an empty function unless USE_LONG_LINK is defined.
void NetCore::MakeSureLongLinkConnect() {
#ifdef USE_LONG_LINK
    longlink_task_manager_->LongLinkChannel().MakeSureConnected();
#endif
}

// Ensures the long link is connected, or that a worker thread (thread_, which
// runs LongLink::__Run) is trying to connect it.
//
// _newone: optional out-parameter; set to true only when this call actually
//          started a new worker thread, false otherwise.
// Returns true if the link is already connected, false otherwise (whether or
// not a new thread was started by this call).
bool LongLink::MakeSureConnected(bool* _newone) {
    if (_newone) *_newone = false;

    ScopedLock guard(mutex_);

    if (ConnectStatus() == kConnected) {
        return true;
    }

    bool started = false;
    thread_.start(&started);

    if (started) {
        // A fresh worker was launched: reset per-connection state while still
        // holding mutex_. __Run takes mutex_ first as a barrier, so it only
        // proceeds after this reset is complete.
        connectstatus_ = kConnectIdle;
        conn_profile_.Reset();
        identifychecker_.Reset();
        disconnectinternalcode_ = kNone;
        readwritebreak_.Clear();
        connectbreak_.Clear();
    }

    if (_newone) *_newone = started;

    return false;
}

最後它會被上層 samples 中的 MarsServiceNative 調用:

/**
 * Service creation. Registers the MarsServiceStub as callback for AppLogic,
 * StnLogic and SdtLogic, initializes the Mars platform layer, configures the
 * STN (network) module from the service profile (long/short link addresses,
 * client version), starts Mars, and finally asks STN to bring up the long link.
 * The calls are order-dependent: callbacks and addresses are set before the
 * long-link connect is requested.
 */
@Override
public void onCreate() {
    super.onCreate();

    final MarsServiceProfile profile = gFactory.createMarsServiceProfile();
    stub = new MarsServiceStub(this, profile);

    // set callback
    AppLogic.setCallBack(stub);
    StnLogic.setCallBack(stub);
    SdtLogic.setCallBack(stub);

    // Initialize the Mars PlatformComm
    Mars.init(getApplicationContext(), new Handler(Looper.getMainLooper()));

    // Initialize the Mars
    StnLogic.setLonglinkSvrAddr(profile.longLinkHost(), profile.longLinkPorts());
    StnLogic.setShortlinkSvrAddr(profile.shortLinkPort());
    StnLogic.setClientVersion(profile.productID());
    Mars.onCreate(true);

    // !!! The long-link connect is triggered here !!!
    StnLogic.makesureLongLinkConnected();

    //
    Log.d(TAG, "mars service native created");
}

總之,最後就是啟動一個線程來執行,線程函數是 LongLink::__Run:
/mars-master/mars/stn/src/longlink.cc

// Body of the long-link worker thread (thread_, started by MakeSureConnected).
// It records a new ConnectProfile, connects (__RunConnect), then blocks in
// __RunReadWrite until the read/write loop ends. It closes the socket, marks
// the link kDisConnected, stores the disconnect details in the profile and
// reports any non-OK termination via __RunResponseError. On connect failure
// it only records the disconnect time/signal and returns.
void LongLink::__Run() {
    // sync to MakeSureConnected data reset
    // (empty critical section: waits until MakeSureConnected has released
    // mutex_ after resetting the per-connection state)
    {
        ScopedLock lock(mutex_);
    }
    
    uint64_t cur_time = gettickcount();
    // NOTE(review): conn_profile_ is read here (and below) without holding
    // mutex_; confirm no other thread can write it concurrently.
    xinfo_function(TSF"LongLink Rebuild span:%_, net:%_", conn_profile_.disconn_time != 0 ? cur_time - conn_profile_.disconn_time : 0, getNetInfo());
    
    // New profile for this connection attempt; the reason is the previous
    // connection's disconnect error code.
    ConnectProfile conn_profile;
    conn_profile.start_time = cur_time;
    conn_profile.conn_reason = conn_profile_.disconn_errcode;
    getCurrNetLabel(conn_profile.net_type);
    conn_profile.tid = xlogger_tid();
    __UpdateProfile(conn_profile);
    
    // Android only: hold a wakelock (30 * 1000, presumably ms) while connecting,
    // then re-arm it with a short 1000 value once connecting is over.
#ifdef ANDROID
    wakelock_.Lock(30 * 1000);
#endif
    SOCKET sock = __RunConnect(conn_profile);
#ifdef ANDROID
    wakelock_.Lock(1000);
#endif
    
    // Connect failed: record disconnect time and signal, then end the thread.
    if (INVALID_SOCKET == sock) {
        conn_profile.disconn_time = ::gettickcount();
        conn_profile.disconn_signal = ::getSignal(::getNetInfo() == kWifi);
        __UpdateProfile(conn_profile);
        return;
    }
    
    // Blocks until the read/write loop ends; errtype/errcode report why.
    ErrCmdType errtype = kEctOK;
    int errcode = 0;
    __RunReadWrite(sock, errtype, errcode, conn_profile);
    
    socket_close(sock);
    
    conn_profile.disconn_time = ::gettickcount();
    conn_profile.disconn_errtype = errtype;
    conn_profile.disconn_errcode = errcode;
    conn_profile.disconn_signal = ::getSignal(::getNetInfo() == kWifi);
    
    __ConnectStatus(kDisConnected);
    __UpdateProfile(conn_profile);

    // Propagate abnormal termination to the error handler.
    if (kEctOK != errtype) __RunResponseError(errtype, errcode, conn_profile);
    
#ifdef ANDROID
    wakelock_.Lock(1000);
#endif
}

我們只看重點吧:
1. __RunConnect,建立鏈接;
2. __RunReadWrite,執行讀寫(阻塞並持續執行,直到鏈接斷開或出錯才返回);

__RunConnect 的代碼就不貼了,核心就是 com_connect.ConnectImpatient。
/mars-master/mars/comm/socket/complexconnect.cc

// Tries to connect to a list of candidate addresses and returns the socket of
// the first candidate that both connects and passes the ConnectCheckFSM
// verification step. Each finished attempt is reported via
// _observer->OnFinished. _breaker is handed to the SocketSelect so the select
// can presumably be interrupted from another thread.
// NOTE(review): this excerpt elides several parts ("......"), including the
// declarations of index/timeout/ret/retsocket/lasterror/starttime, the
// progression of `index`, the loop exit conditions and the final cleanup and
// return. The article's prose states that INVALID_SOCKET is returned when no
// candidate succeeds.
SOCKET ComplexConnect::ConnectImpatient(const std::vector<socket_address>& _vecaddr, SocketSelectBreaker& _breaker, MComplexConnect* _observer) {
    ......
    // Build one ConnectCheckFSM per candidate address.
    for (unsigned int i = 0; i < _vecaddr.size(); ++i) {
    xinfo2(TSF"complex.conn %_", _vecaddr[i].url());

        ConnectCheckFSM* ic = new ConnectCheckFSM(_vecaddr[i], timeout_, i, _observer);
        vecsocketfsm.push_back(ic);
    }
    ......
    do {
        ......
        // Preparation: a fresh SocketSelect bound to the caller's breaker.
        SocketSelect sel(_breaker);
        sel.PreSelect();
        ......
        // Let every still-active FSM among the first `index` candidates do its
        // pre-select work on `sel` (e.g. start connecting), and keep the
        // smallest timeout reported by any of them.
        for (unsigned int i = 0; i < index; ++i) {
            if (NULL == vecsocketfsm[i]) continue;

            xgroup2_define(group);
            vecsocketfsm[i]->PreSelect(sel, group);
            xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;
            timeout = std::min(timeout, vecsocketfsm[i]->Timeout());
        }
        ......
        // Run select: wait without a limit if no FSM reported a timeout
        // (INT_MAX), otherwise wait at most the smallest timeout clamped to >= 0.
        if (INT_MAX == timeout) {
            ret = sel.Select();
        } else {
            timeout = std::max(0, timeout);
            ret = sel.Select(timeout);
        }
        ......
        // Post-select processing for every still-active FSM.
        for (unsigned int i = 0; i < index; ++i) {
            if (NULL == vecsocketfsm[i]) continue;

            xgroup2_define(group);
            vecsocketfsm[i]->AfterSelect(sel, group);
            xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;

            // Case 1: the FSM reached EEnd (connect failed or closed):
            // report it, then close, delete and forget it.
            if (TcpClientFSM::EEnd == vecsocketfsm[i]->Status()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                vecsocketfsm[i]->Close();
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                lasterror = -1;
                continue;
            }

            // Case 2: connected but verification failed (ECheckFail):
            // report it, then close, delete and forget it.
            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckFail == vecsocketfsm[i]->CheckStatus()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                vecsocketfsm[i]->Close();
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                lasterror = -1;
                continue;
            }

            // Case 3: connected and verified (ECheckOK): this is the winner.
            // Keep its socket and RTT statistics, detach the socket from the FSM
            // (Socket(INVALID_SOCKET), presumably so deleting the FSM does not
            // close it), and stop scanning the remaining candidates.
            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckOK == vecsocketfsm[i]->CheckStatus()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                xinfo2(TSF"index:%_, sock:%_, suc ConnectImpatient:%_:%_, RTT:(%_, %_), @%_", i, vecsocketfsm[i]->Socket(),
                       vecsocketfsm[i]->IP(), vecsocketfsm[i]->Port(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), this);
                retsocket = vecsocketfsm[i]->Socket();
                index_ = i;
                index_conn_rtt_ = vecsocketfsm[i]->Rtt();
                index_conn_totalcost_ = vecsocketfsm[i]->TotalRtt();
                vecsocketfsm[i]->Socket(INVALID_SOCKET);
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                break;
            }
        }
        ......
    } while (true);
}

1. 根據傳遞進來的地址數組,生成一個 ConnectCheckFSM 數組;
2. 進入一個 do while 死循環;
3. 用入口參數中的 SocketSelectBreaker 建立 SocketSelect,並執行 PreSelect 方法,做一些前期準備工作;
4. 對前 index 個地址對應的每一個 ConnectCheckFSM 執行 PreSelect:需要鏈接的就發起鏈接,否則執行其他的前置操作。在這個過程中,會把相應的 socket 登記到 SocketSelect 中(這裡有必要在後面深入看一下代碼),同時取所有 FSM 中最小的超時時間作為 select 的超時;
5. 執行 select 操作,等待這些 socket 上的事件(鏈接完成、可讀寫或異常);
6. 之後的 for 循環做 select 之後的處理:對地址集對應的所有 ConnectCheckFSM 對象都執行一次 AfterSelect,並根據其狀態調用回調通知觀察者;

下面我們來看一下 TcpClientFSM::AfterSelect:
/mars-master/mars/comm/socket/tcpclient_fsm.cc

// Post-select processing, dispatched on the current state:
//   EConnecting -> AfterConnectSelect
//   EReadWrite  -> AfterReadWriteSelect
//   other states are left untouched.
// If that processing moved the FSM to EEnd while it still owns a valid socket,
// _OnClose is invoked with the previous state and the recorded error.
void TcpClientFSM::AfterSelect(SocketSelect& _sel, XLogger& _log) {
    switch (status_) {
    case EConnecting:
        AfterConnectSelect(_sel, _log);
        break;
    case EReadWrite:
        AfterReadWriteSelect(_sel, _log);
        break;
    default:
        break;
    }

    const bool ended_with_socket = (status_ == EEnd) && (sock_ != INVALID_SOCKET);
    if (ended_with_socket) {
        _OnClose(last_status_, error_, false);
    }
}

根據狀態的不同調用不同的函數:如果正在鏈接(EConnecting),調用 AfterConnectSelect;如果處於讀寫狀態(EReadWrite),調用 AfterReadWriteSelect。處理完之後,如果狀態變成了 EEnd 而 socket 仍然有效,再調用 _OnClose。

下面看下AfterConnectSelect:

void TcpClientFSM::AfterConnectSelect(const SocketSelect& _sel, XLogger& _log) {
    xassert2(EConnecting == status_, "%d", status_);

    int timeout = ConnectTimeout();
    xinfo2(TSF"sock:%_, (%_:%_), ", sock_, addr_.ip(), addr_.port()) >> _log;

    if (_sel.Exception_FD_ISSET(sock_)) {
        socklen_t len = sizeof(error_);

        if (0 != getsockopt(sock_, SOL_SOCKET, SO_ERROR, &error_, &len)) { error_ = socket_errno; }

        xwarn2(TSF"close connect exception: (%_, %_)", sock_, error_, socket_strerror(error_)) >> _log;

        end_connecttime_ = gettickcount();
        last_status_ = status_;
        status_ = EEnd;
        return;
    }

    error_ = socket_error(sock_);
    
    if (0 != error_) {
        xwarn2(TSF"close connect error:(%_, %_), ", error_, socket_strerror(error_)) >> _log;
        end_connecttime_ = gettickcount();
        last_status_ = status_;
        status_ = EEnd;
        return;
    }
    
    if (0 == error_ && _sel.Write_FD_ISSET(sock_)){
        end_connecttime_ = gettickcount();
        last_status_ = status_;
        status_ = EReadWrite;
        xinfo2(TSF"connected Rtt:%_, ", Rtt()) >> _log;
        _OnConnected(Rtt());
        return;
    }

    if (0 >= timeout) {
        end_connecttime_ = gettickcount();
        xwarn2(TSF"close connect timeout:(%_, %_), (%_, %_)", ConnectAbsTimeout(), -timeout, SOCKET_ERRNO(ETIMEDOUT), socket_strerror(SOCKET_ERRNO(ETIMEDOUT))) >> _log;

        error_ = SOCKET_ERRNO(ETIMEDOUT);
        last_status_ = status_;
        status_ = EEnd;
        return;
    }
}

如果鏈接成功(沒有錯誤且 socket 可寫),狀態切換為 EReadWrite 並調用 _OnConnected。這是一個虛函數,會調用到子類的同名實現,在這裡就是 ConnectCheckFSM:

// ConnectCheckFSM hook invoked once the TCP connect has completed.
// Always stamps m_checkfintime. With an observer present it then reports the
// connection via OnConnected; unless the check status is already ECheckOK it
// also calls OnVerifySend with send_buf_ (presumably so the observer can fill
// in verification data), and a false result marks the check as ECheckFail.
virtual void _OnConnected(int _rtt) {
    m_checkfintime = ::gettickcount();

    if (!m_observer) {
        return;
    }

    m_observer->OnConnected(m_index, addr_, sock_, 0, _rtt);

    const bool needs_verify = (CheckStatus() != ECheckOK);
    if (needs_verify && !m_observer->OnVerifySend(m_index, addr_, sock_, send_buf_)) {
        m_check_status = ECheckFail;
    }
}

這裡首先調用了觀察者的 OnConnected,這個觀察者就是 LongLinkConnectObserver。如果校驗狀態還不是 ECheckOK,接著會調用觀察者的 OnVerifySend;它返回 false 時,校驗狀態被置為 ECheckFail。

我們回來看 ConnectImpatient。在循環裡執行了 AfterSelect 之後,會根據每個 ConnectCheckFSM 的狀態更新 vecsocketfsm 數組。for 循環的下半部分有 3 段代碼來做這個更新:前兩段分別處理鏈接已經結束(EEnd)和校驗失敗(ECheckFail)的情況,都需要把該項刪除並在數組中置為 NULL;第三段處理鏈接成功且校驗通過(ECheckOK)的情況。注意,前兩段是 continue,而第三段是 break。怎麼理解這裡呢?我的解釋是,這本身是一種基於地址池的鏈接方式:如果其中一個能夠成功連上並且校驗通過,那麼其餘的就不需要再嘗試了,因此這裡做了 break 處理。可以看到,這 3 段只處理了 TcpClientFSM::EEnd 和 TcpClientFSM::EReadWrite,所以處於 EStart 和 EConnecting 狀態的項是不會被清除的。再接著看 for 循環之後的處理:循環判斷所有的鏈接是否都是無效的,如果都是無效的,就繼續執行這個 while 死循環;否則只要有一個有效,就跳出來。也就是說,再次執行的時候 index 也會進行上面的自增(++)運算,從而繼續往後嘗試下一個鏈接。再往後看,是跳出 while 死循環之後的情況:又把這些鏈接依次 close,然後清除了數組,最後返回 retsocket。retsocket 唯一的賦值就在上面 for 循環的第三段判斷裡,這段判斷才是關鍵:它意味著一個可用的鏈接出現了,直接的結果就是返回一個可用的 socket,否則返回的將是 INVALID_SOCKET。稍微總結一下,這裡還是挺巧妙的:可以理解為從地址池中尋找可用的鏈接,不是漫無目的地嘗試,而是遞進式地嘗試,並且隨時把無效的項置為 NULL。不過說實話,應當可以寫得更簡潔,這裡實在是有些晦澀。至此,鏈接部分分析完畢。

相關文章
相關標籤/搜索