微信開源mars源碼分析5—底層核心mars分析(續2)

最近回顧以前的文章,發現最後一篇有些着急了,不少地方沒有敘述清楚。這裏先作個銜接吧。
咱們仍是以長鏈接爲例,從longlink.cc看起。首先是那個線程函數__Run:
/mars-master/mars/stn/src/longlink.ccjava

void LongLink::__Run() {
    ......
    // 執行鏈接
    SOCKET sock = __RunConnect(conn_profile);
    
    // 無效的socket,更新描述文件,記錄失敗的時間節點,返回
    if (INVALID_SOCKET == sock) {
        conn_profile.disconn_time = ::gettickcount();
        conn_profile.disconn_signal = ::getSignal(::getNetInfo() == kWifi);
        __UpdateProfile(conn_profile);
        return;
    }
    ......
    // 執行讀寫
    __RunReadWrite(sock, errtype, errcode, conn_profile);
}

實際上核心的就2個,鏈接和讀寫,咱們分別看下。
/mars-master/mars/stn/src/longlink.ccc++

SOCKET LongLink::__RunConnect(ConnectProfile& _conn_profile) {
    std::vector<IPPortItem> ip_items;
    std::vector<socket_address> vecaddr;
    ......
    // 賦值填充ip_items地址端口數組
    netsource_.GetLongLinkItems(ip_items, dns_util_);
    ......
    // 根據ip_items建立socket_address並加入vecaddr中
    for (unsigned int i = 0; i < ip_items.size(); ++i) {
        vecaddr.push_back(socket_address(ip_items[i].str_ip.c_str(), ip_items[i].port).v4tov6_address(isnat64));
    }
    ......
    // 建立觀察者和ComplexConnect鏈接核心,而後開始執行鏈接
    LongLinkConnectObserver connect_observer(*this, ip_items);
    ComplexConnect com_connect(kLonglinkConnTimeout, kLonglinkConnInteral, kLonglinkConnInteral, kLonglinkConnMax);
    SOCKET sock = com_connect.ConnectImpatient(vecaddr, connectbreak_, &connect_observer);
    
    // 返回socket
    return sock;
}

1.建立2個數組,地址端口item和socket_address;
2.調用netsource_.GetLongLinkItems(ip_items, dns_util_);填充IPPortItem數組;
3.根據填充好的前者數組生成socket_address填充後者數組;
4.建立鏈接觀察者;
5.開始執行鏈接;
首先看看netsource_.GetLongLinkItems是如何填充的:
/mars-master/mars/stn/src/net_source.cc數組

bool NetSource::GetLongLinkItems(std::vector<IPPortItem>& _ipport_items, DnsUtil& _dns_util) {
    
    ScopedLock lock(sg_ip_mutex);

    if (__GetLonglinkDebugIPPort(_ipport_items)) {
        return true;
    }
    
    lock.unlock();

     std::vector<std::string> longlink_hosts = NetSource::GetLongLinkHosts();
     if (longlink_hosts.empty()) {
         xerror2("longlink host empty.");
         return false;
     }

     __GetIPPortItems(_ipport_items, longlink_hosts, _dns_util, true);

    return !_ipport_items.empty();
}

能夠看到debug的優先,這裏增長了調試的ip。再往下就先不貼代碼了,基本上就是以前經過SetLongLink設置進去的sg_longlink_hosts(長鏈接主機列表),再往上倒騰就是在MarsServiceNative.java的onCreate中經過描述文件profile設置進去的主機列表。也就是說以前早就設置好的主機列表已經存在了。
下面咱們仍然要進入到上一篇提到的ComplexConnect::ConnectImpatient這個核心函數中看看。
/mars-master/mars/comm/socket/complexconnect.ccapp

SOCKET ComplexConnect::ConnectImpatient(const std::vector<socket_address>& _vecaddr, SocketSelectBreaker& _breaker, MComplexConnect* _observer) {
    ......
    // 根據地址列表,生成ConnectCheckFSM鏈接列表
    std::vector<ConnectCheckFSM*> vecsocketfsm;

    for (unsigned int i = 0; i < _vecaddr.size(); ++i) {
        xinfo2(TSF"complex.conn %_", _vecaddr[i].url());

        ConnectCheckFSM* ic = new ConnectCheckFSM(_vecaddr[i], timeout_, i, _observer);
        vecsocketfsm.push_back(ic);
    }
    // 下面就是對這個鏈接列表的各類操做了
    do {
        ......
        // 生成socketselect的封裝對象,並執行PreSelect前期準備工做
        SocketSelect sel(_breaker);
        sel.PreSelect();
        
        ......
        // 執行鏈接
        for (unsigned int i = 0; i < index; ++i) {
            if (NULL == vecsocketfsm[i]) continue;

            xgroup2_define(group);
            vecsocketfsm[i]->PreSelect(sel, group);
            xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;
            timeout = std::min(timeout, vecsocketfsm[i]->Timeout());
        }
        
        ......
        
        for (unsigned int i = 0; i < index; ++i) {
            if (NULL == vecsocketfsm[i]) continue;

            xgroup2_define(group);
            vecsocketfsm[i]->AfterSelect(sel, group);
            xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;

            if (TcpClientFSM::EEnd == vecsocketfsm[i]->Status()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                vecsocketfsm[i]->Close();
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                lasterror = -1;
                continue;
            }

            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckFail == vecsocketfsm[i]->CheckStatus()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                vecsocketfsm[i]->Close();
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                lasterror = -1;
                continue;
            }

            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckOK == vecsocketfsm[i]->CheckStatus()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                xinfo2(TSF"index:%_, sock:%_, suc ConnectImpatient:%_:%_, RTT:(%_, %_), @%_", i, vecsocketfsm[i]->Socket(),
                       vecsocketfsm[i]->IP(), vecsocketfsm[i]->Port(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), this);
                retsocket = vecsocketfsm[i]->Socket();
                index_ = i;
                index_conn_rtt_ = vecsocketfsm[i]->Rtt();
                index_conn_totalcost_ = vecsocketfsm[i]->TotalRtt();
                vecsocketfsm[i]->Socket(INVALID_SOCKET);
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                break;
            }
        }
        
    } while (true);
}

1.數組中的每一個長鏈接地址依次執行鏈接;
2.數組中的每一個鏈接分別作後續處理(一個for循環中的三段處理);socket

咱們首先看看vecsocketfsm[i]->PreSelect(sel, group);這句話,是由ConnectCheckFSM的父類TcpClientFSM實現的:
/mars-master/mars/comm/socket/tcpclient_fsm.ccasync

void TcpClientFSM::PreSelect(SocketSelect& _sel, XLogger& _log) {
    
    switch(status_) {
        case EStart: {
            PreConnectSelect(_sel, _log);
            break;
        }
        case EConnecting: {
            _sel.Write_FD_SET(sock_);
            _sel.Exception_FD_SET(sock_);
            break;
        }
        case EReadWrite: {
            PreReadWriteSelect(_sel, _log);
            break;
        }
        default:
            xassert2(false, "preselect status error");
    }
}

這裏是根據這個鏈接的當前狀態決定前置操做的行爲(開始鏈接、讀寫、鏈接中)。再往下看就是進行socket的connect。以PreConnectSelect爲例,這裏生產了socket,並執行了connect,最後將成功鏈接的socket執行_sel.Write_FD_SET(sock_);保存在了SocketSelect中。
咱們來看下代碼:
/mars-master/mars/comm/socket/tcpclient_fsm.cctcp

void TcpClientFSM::PreConnectSelect(SocketSelect& _sel, XLogger& _log) {
    xassert2(EStart == status_, "%d", status_);
    // 執行虛函數,由子類繼承實現
    _OnCreate();

    xinfo2(TSF"addr:(%_:%_), ", addr_.ip(), addr_.port()) >> _log;

    // 生成socket
    sock_ = socket(addr_.address().sa_family, SOCK_STREAM, IPPROTO_TCP);

    if (sock_ == INVALID_SOCKET) {
        error_ = socket_errno;
        last_status_ = status_;
        status_ = EEnd;
        _OnClose(last_status_, error_, false);
        xerror2(TSF"close socket err:(%_, %_)", error_, socket_strerror(error_)) >> _log;
        return;
    }

    if (::getNetInfo() == kWifi && socket_fix_tcp_mss(sock_) < 0) {
#ifdef ANDROID
        xinfo2(TSF"wifi set tcp mss error:%0", strerror(socket_errno));
#endif
    }
    if (0 != socket_ipv6only(sock_, 0)){
        xwarn2(TSF"set ipv6only failed. error %_",strerror(socket_errno));
    }
    
    if (0 != socket_set_nobio(sock_)) {
        error_ = socket_errno;
        xerror2(TSF"close socket_set_nobio:(%_, %_)", error_, socket_strerror(error_)) >> _log;
    } else {
        xinfo2(TSF"socket:%_, ", sock_) >> _log;
    }

    if (0 != error_) {
        last_status_ = status_;
        status_ = EEnd;
        return;
    }

    start_connecttime_ = gettickcount();

    // 執行鏈接
    int ret = connect(sock_, &(addr_.address()), addr_.address_length());

    if (0 != ret && !IS_NOBLOCK_CONNECT_ERRNO(socket_errno)) {
        end_connecttime_ = ::gettickcount();

        error_ = socket_errno;
        xwarn2(TSF"close connect err:(%_, %_), localip:%_", error_, socket_strerror(error_), socket_address::getsockname(sock_).ip()) >> _log;
    } else {
        xinfo2("connect") >> _log;
        // 記錄socket到SocketSelect中
        _sel.Write_FD_SET(sock_);
        _sel.Exception_FD_SET(sock_);
    }

    last_status_ = status_;

    if (0 != error_)
        status_ = EEnd;
    else
        status_ = EConnecting;

    if (0 == error_) _OnConnect();
}

須要注意的是_OnCreate的調用,其實是子類實現的,這裏也就是ConnectCheckFSM實現的:ide

virtual void _OnCreate() { if (m_observer) m_observer->OnCreated(m_index, addr_, sock_);}

這裏將觀察者與鏈接對象的生命週期綁在了一塊兒,執行了觀察者的OnCreated。那麼觀察者是誰呢?往上看,在LongLink::__RunConnect中生成的LongLinkConnectObserver。固然生命週期的回調並不止OnCreated一個。函數

回到__RunConnect中,看後續處理(for循環的三段處理)。執行AfterSelect並根據每一個鏈接的狀態決定後續處理,上篇已經講過,再也不累述。oop

那麼什麼時候終止這個do while循環呢?當for循環的三段處理完畢後,全部的鏈接過程都已經處理完畢了:

// end of loop
        bool all_invalid = true;

        for (unsigned int i = 0; i < vecsocketfsm.size(); ++i) {
            if (NULL != vecsocketfsm[i]) {
                all_invalid = false;
                break;
            }
        }

        if (all_invalid || INVALID_SOCKET != retsocket) break;

最後枚舉一遍鏈接數組,每一個元素檢查是否非空,若是還有非空的,就將all_invalid置爲false,那麼會繼續走一次do while。上面的三段處理完畢後,應該是數組中再也不有鏈接纔對,這裏的保險處理是對數組再進行檢查。至此跳出do while,算是整個鏈接過程完畢了。

能夠看到,通過了三段處理後,鏈接數組中只會命中一個檢測成功的鏈接,其餘的失敗和完成的都會置爲null。這裏從全局看就是一個地址池的淘汰篩選機制。在三段處理的for循環中清除不合格的鏈接,挑出第一個找到的合格的鏈接,而後跳出三段後,馬上檢查整個數組是否已經就剩這一個可用了,若是不是繼續執行do while,又會去執行數組中的每一個item的鏈接過程,再回到三段處理。也就是說全部的數組中的item都會鏈接一次,而後根據返回的狀態決定是否命中最終的一個socket。這是幹嗎呢這麼繞?我以前的理解恐怕還不透徹,如今感受是在找一個穩定的能夠讀寫狀態的鏈接。
第一次進入do while已經鏈接全部池中的item了,那麼在通過了三段處理後淘汰掉不合適的和失敗的,而後再進入do while再次執行vecsocketfsm[i]->PreSelect(sel, group);的時候,已經更新了狀態並執行了不一樣的調用了,再通過三段處理在新的狀態下再淘汰一批,最後通過整個運轉,留下來的只能是最持久的(穩定的)惟一的一個鏈接,返回這個。
不得不說,這裏確實巧妙,若是我寫並不會比這要好。

咱們回來到longlink.cc的線程函數__Run中,當鏈接處理完畢後,下面繼續執行的是__RunReadWrite。咱們來看看:

void LongLink::__RunReadWrite(SOCKET _sock, ErrCmdType& _errtype, int& _errcode, ConnectProfile& _profile) {
    // Alarm消息觸發處理綁定在__OnAlarm上
    Alarm alarmnoopinterval(boost::bind(&LongLink::__OnAlarm, this), false);
    Alarm alarmnooptimeout(boost::bind(&LongLink::__OnAlarm, this), false);
}

首先是2個Alarm,這裏要理解就須要看看這個Alarm是個什麼東西:
/mars-master/mars/comm/alarm.h

template<class T>
    explicit Alarm(const T& _op, bool _inthread = true)
        : target_(detail::transform(_op))
        , reg_async_(MessageQueue::InstallAsyncHandler(MessageQueue::GetDefMessageQueue()))
        , runthread_(boost::bind(&Alarm::__Run, this), "alarm")
        , inthread_(_inthread)
        , seq_(0), status_(kInit)
        , after_(0) , starttime_(0) , endtime_(0)
        , reg_(MessageQueue::InstallMessageHandler(boost::bind(&Alarm::OnAlarm, this, _1, _2), true))
#ifdef ANDROID
        , wakelock_(NULL)
#endif
    {}

構造函數。這裏須要逐句分析,首先是target_(detail::transform(_op))。簡單看是個賦值語句,後面的參數須要看這個:
/mars-master/mars/comm/thread/runnable.h

// base template for no argument functor
template <class T>
struct TransformImplement {
    static Runnable* transform(const T& t) {
        return new RunnableFunctor<T>(t);
    }
};

template <class T>
inline Runnable* transform(const T& t) {
    return TransformImplement<T>::transform(t);
}

1.這裏使用的是c++魔板,直接new了一個RunnableFunctor對象,這個對象是個runnable,其實就是將這個傳遞進來的參數t包裝成了一個runnable,在適當的時候調用他的run方法的時候就會調用這個t了。那麼帶入到具體的內容中,這個t是_op,就是boost::bind(&LongLink::__OnAlarm, this)。這裏又使用了c++的boost庫,作了bind操做,綁定了參數this也就是LongLink與函數體LongLink::__OnAlarm。好了,如今target_是個包裝好的runnable了,在適當的時候能夠執行LongLink::__OnAlarm。

2.reg_async_(MessageQueue::InstallAsyncHandler(MessageQueue::GetDefMessageQueue()))。首先看MessageQueue::InstallAsyncHandler:
/mars-master/mars/comm/messagequeue/message_queue.cc

MessageHandler_t InstallAsyncHandler(const MessageQueue_t& id) {
    ASSERT(0 != id);
    return InstallMessageHandler(__AsyncInvokeHandler, false, id);
}

MessageHandler_t InstallMessageHandler(const MessageHandler& _handler, bool _recvbroadcast, const MessageQueue_t& _messagequeueid) {
    ASSERT(bool(_handler));

    ScopedLock lock(sg_messagequeue_map_mutex);
    const MessageQueue_t& id = _messagequeueid;

    if (sg_messagequeue_map.end() == sg_messagequeue_map.find(id)) {
        ASSERT2(false, "%" PRIu64, id);
        return KNullHandler;
    }

    HandlerWrapper* handler = new HandlerWrapper(_handler, _recvbroadcast, _messagequeueid, __MakeSeq());
    sg_messagequeue_map[id].lst_handler.push_back(handler);
    return handler->reg;
}

struct HandlerWrapper {
    HandlerWrapper(const MessageHandler& _handler, bool _recvbroadcast, const MessageQueue_t& _messagequeueid, unsigned int _seq)
        : handler(_handler), recvbroadcast(_recvbroadcast) {
        reg.seq = _seq;
        reg.queue = _messagequeueid;
    }

    MessageHandler_t reg;
    MessageHandler handler;
    bool recvbroadcast;
};

生成了一個HandlerWrapper,並將其保留在了一個map中,隨後返回了MessageHandler_t,其中保存了_seq與_messagequeueid。這裏個人感受是這個handler就是個相似句柄的東西,保存MessageHandler的一個關聯關係,即消息隊列與seq碼(這裏是個自增的靜態變量)。實際上調用者只要有這個MessageHandler_t就能夠了。最後將這個MessageHandler_t賦值給了reg_async_。這裏又有一個對象ScopeRegister是個MessageHandler_t的包裝對象,裏面統一封裝了方法來操做MessageHandler_t。

3.runthread_(boost::bind(&Alarm::__Run, this), "alarm")。一個線程對象,線程函數是Alarm::__Run。沒事什麼好解釋的。

4.inthread_(_inthread), seq_(0), status_(kInit), after_(0) , starttime_(0) , endtime_(0)。都是簡單賦值,暫時不去管它。

5.reg_(MessageQueue::InstallMessageHandler(boost::bind(&Alarm::OnAlarm, this, _1, _2), true))。相似2。

好了,這個Alarm能夠看作是個消息處理,在有消息觸發的狀況下會調用到具體的函數中,通常是__OnAlarm。

回到__RunReadWrite,往下看。首先是個while的死循環,咱們單獨摘錄以下:

while (true) {
        ......
        if (!alarmnoopinterval.IsWaiting()) {
            ......
            if (__NoopReq(noop_xlog, alarmnooptimeout, has_late_toomuch)) {
                is_noop = true;
                __NotifySmartHeartbeatHeartReq(_profile, last_noop_interval, last_noop_actual_interval);
            }
            ......
        }
        
        ......
        // socket處理
        SocketSelect sel(readwritebreak_, true);
        sel.PreSelect();
        sel.Read_FD_SET(_sock);
        sel.Exception_FD_SET(_sock);
        
        ScopedLock lock(mutex_);
        
        if (!lstsenddata_.empty()) sel.Write_FD_SET(_sock);
        
        lock.unlock();
        
        int retsel = sel.Select(10 * 60 * 1000);
        ......
        // 處理髮送(寫入)
        if (sel.Write_FD_ISSET(_sock) && !lstsenddata_.empty()) {
            ......
            ssize_t writelen = ::send(_sock, lstsenddata_.begin()->data.PosPtr(), lstsenddata_.begin()->data.PosLength(), 0);
            ......
            while (it != lstsenddata_.end() && 0 < writelen) {
                if (0 == it->data.Pos()) OnSend(it->taskid);
                
                if ((size_t)writelen >= it->data.PosLength()) {
                    xinfo2(TSF"sub send taskid:%_, cmdid:%_, %_, len(S:%_, %_/%_), ", it->taskid, it->cmdid, it->task_info, it->data.PosLength(), it->data.PosLength(), it->data.Length()) >> xlog_group;
                    writelen -= it->data.PosLength();
                    if (!it->task_info.empty()) sent_taskids[it->taskid] = it->task_info;
                    LongLinkNWriteData nwrite(it->taskid, it->data.PosLength(), it->cmdid, it->task_info);
                    nsent_datas.push_back(nwrite);
                    
                    it = lstsenddata_.erase(it);
                } else {
                    xinfo2(TSF"sub send taskid:%_, cmdid:%_, %_, len(S:%_, %_/%_), ", it->taskid, it->cmdid, it->task_info, writelen, it->data.PosLength(), it->data.Length()) >> xlog_group;
                    it->data.Seek(writelen, AutoBuffer::ESeekCur);
                    writelen = 0;
                }
            }
            
        }
        
        ......
        // 處理接收(讀取)
        if (sel.Read_FD_ISSET(_sock)) {
            bufrecv.AllocWrite(64 * 1024, false);
            ssize_t recvlen = recv(_sock, bufrecv.PosPtr(), 64 * 1024, 0);
            ......
            while (0 < bufrecv.Length()) {
                uint32_t cmdid = 0;
                uint32_t taskid = Task::kInvalidTaskID;
                size_t packlen = 0;
                AutoBuffer body;
                
                int unpackret = longlink_unpack(bufrecv, cmdid, taskid, packlen, body);
                
                if (LONGLINK_UNPACK_FALSE == unpackret) {
                    xerror2(TSF"task socket recv sock:%0, unpack error dump:%1", _sock, xdump(bufrecv.Ptr(), bufrecv.Length()));
                    _errtype = kEctNetMsgXP;
                    _errcode = kEctNetMsgXPHandleBufferErr;
                    goto End;
                }
                
                xinfo2(TSF"task socket recv sock:%_, pack recv %_ taskid:%_, cmdid:%_, %_, packlen:(%_/%_)", _sock, LONGLINK_UNPACK_CONTINUE == unpackret ? "continue" : "finish", taskid, cmdid, sent_taskids[taskid], LONGLINK_UNPACK_CONTINUE == unpackret ? bufrecv.Length() : packlen, packlen);
                lastrecvtime_.gettickcount();
                
                if (LONGLINK_UNPACK_CONTINUE == unpackret) {
                    OnRecv(taskid, bufrecv.Length(), packlen);
                    break;
                } else {
                    
                    sent_taskids.erase(taskid);
                    
                    bufrecv.Move(-(int)(packlen));
                    
                    if (__NoopResp(cmdid, taskid, body, alarmnooptimeout, _profile)) {
                        xdebug2(TSF"noopresp span:%0", alarmnooptimeout.ElapseTime());
                        is_noop = false;
                    } else {
                        OnResponse(kEctOK, 0, cmdid, taskid, body, _profile);
                    }
                }
            }
        }
    }

// 收尾,整個looper退出
End:

從while中的代碼可以看出,基本上就是上面摘錄的幾塊,以下所示:
1.__NoopReq調用,無數據狀態處理;
2.socket的select處理;
3.處理髮送寫入部分;
4.處理接收讀取部分;

這裏須要逐個分析了:
1.__NoopReq:
先看代碼,並不長:

bool LongLink::__NoopReq(XLogger& _log, Alarm& _alarm, bool need_active_timeout) {
    AutoBuffer buffer;
    uint32_t req_cmdid = 0;
    bool suc = false;
    
    if (identifychecker_.GetIdentifyBuffer(buffer, req_cmdid)) {
        suc = Send((const unsigned char*)buffer.Ptr(), (int)buffer.Length(), req_cmdid, Task::kLongLinkIdentifyCheckerTaskID);
        identifychecker_.SetSeq(Task::kLongLinkIdentifyCheckerTaskID);
        xinfo2(TSF"start noop synccheck taskid:%0, cmdid:%1, ", Task::kLongLinkIdentifyCheckerTaskID, req_cmdid) >> _log;
    } else {
        AutoBuffer body;
        longlink_noop_req_body(body);
        suc = SendWhenNoData((const unsigned char*) body.Ptr(), body.Length(), longlink_noop_cmdid(), Task::kNoopTaskID);
        xinfo2(TSF"start noop taskid:%0, cmdid:%1, ", Task::kNoopTaskID, longlink_noop_cmdid()) >> _log;
    }
    
    if (suc) {
        _alarm.Cancel();
        _alarm.Start(need_active_timeout ? (2* 1000) : (10 * 1000));
    } else {
        xerror2("send noop fail");
    }
    
    return suc;
}

說實話,這裏看的不是很清晰 ,由於以前確定有忽略的部分,個人猜想是在走了一個發送信令的校驗後,根據返回的值的不一樣決定是執行send發送數據(使用校驗填充好的buffer),仍是走SendWhenNoData發送(自行填充請求體)沒有數據的狀況。暫時先往下看一步,看看Send:

bool LongLink::__Send(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid, const std::string& _task_info) {
    lstsenddata_.push_back(LongLinkSendData());

    lstsenddata_.back().cmdid = _cmdid;
    lstsenddata_.back().taskid = _taskid;
    longlink_pack(_cmdid, _taskid, _pbuf, _len, lstsenddata_.back().data);
    lstsenddata_.back().data.Seek(0, AutoBuffer::ESeekStart);
    lstsenddata_.back().task_info = _task_info;

    readwritebreak_.Break();
    return true;
}

這裏可以清晰的看到,在使用lstsenddata_這個隊列,來進行發送的請求,實際上就是向隊列中增長一項。那麼如今的問題就在於這個發送的數據時怎麼來的了。這就須要咱們弄懂LongLinkIdentifyChecker這個玩意兒。
/mars-master/mars/stn/src/longlink_identify_checker.cc

bool LongLinkIdentifyChecker::GetIdentifyBuffer(AutoBuffer &_buffer, uint32_t &_cmdid)
{
    if (has_checked_) return false;
    
    hash_code_buffer_.Reset();
    _buffer.Reset();

    IdentifyMode mode = (IdentifyMode)GetLonglinkIdentifyCheckBuffer(_buffer, hash_code_buffer_, (int&)_cmdid);

    switch (mode)
    {
    case kCheckNever:
        {
            has_checked_ = true;
        }
        break;
    case kCheckNext:
        {
            has_checked_ = false;
        }
        break;
    case kCheckNow:
        {
            cmd_id_ = _cmdid;
            return true;
        }
        break;
    default:
        xassert2(false);
    }
    
    return false;
}

調用了GetLonglinkIdentifyCheckBuffer,咱們追溯到stn_logic.cc中:

int  GetLonglinkIdentifyCheckBuffer(AutoBuffer& identify_buffer, AutoBuffer& buffer_hash, int32_t& cmdid) {
        xassert2(sg_callback != NULL);
        return sg_callback->GetLonglinkIdentifyCheckBuffer(identify_buffer, buffer_hash, cmdid);
    }

其實是對sg_callback這個回調的調用。最終我找到的線索是在MarsServiceNative.java上層的onCreate中設置了回調:

// set callback
        AppLogic.setCallBack(stub);
        StnLogic.setCallBack(stub);
        SdtLogic.setCallBack(stub);

再接着找到了MarsServiceStub.java中的getLongLinkIdentifyCheckBuffer:

@Override
    public int getLongLinkIdentifyCheckBuffer(ByteArrayOutputStream identifyReqBuf, ByteArrayOutputStream hashCodeBuffer, int[] reqRespCmdID) {
        // Send identify request buf to server
        // identifyReqBuf.write();

        return ECHECK_NEVER;
    }

返回的是ECHECK_NEVER,沒有填充buffer。也便是說has_checked_ = true,而後返回false。其實看到這一刻我是崩潰的,真心不知道是想幹嗎。咱們只能這麼理解,只要進入__NoopReq其實都是在走SendWhenNoData發送無數據狀態。好吧,咱們從新回到__RunReadWrite中看一下。每次在while循環中一上來只要不是alarmnoopinterval正在等待的狀態,那麼就走一個發送無數據狀態。看看發送無數據的代碼:

bool LongLink::SendWhenNoData(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid) {
    ScopedLock lock(mutex_);

    if (kConnected != connectstatus_) return false;
    if (!lstsenddata_.empty()) return false;

    return __Send(_pbuf, _len, _cmdid, _taskid, "");
}

實際上是檢查lstsenddata_是否有內容,若是沒有才發送。那麼好吧,這裏總體理解就是每次whie循環開始都會檢查若是發送隊列中沒有數據的時候,發送一個特定的無數據狀態來確認鏈接。可是這裏寫的比較複雜,還須要回調回上層java的代碼中,讓其來控制狀態,從而根據狀態控制流程,只能說考慮的很周全,任何狀況在任何節點均可以有處理。吐槽下若是咱們本身寫來規劃這部分的時候大多數時候都是最對無數據檢測放在下層,而後直接就發送了,不會讓上層這裏進行什麼干涉吧。其實這裏還有些點沒有詳細的分析很清楚,原諒文章有限,畢竟不能偏離主線太多。

2.socket的select操做。
這裏倒沒什麼可說的,前面的設置,爲後面的sel.Select(10 60 1000)作準備,內部採用poll來運做。

3.發送過程。
先是判斷若是發送隊列裏面有內容,執行下面的::send(_sock, lstsenddata_.begin()->data.PosPtr(), lstsenddata_.begin()->data.PosLength(), 0)。這裏注意,參數給定的是隊列的第一個的data,也就是說這裏是取出第一個執行發送。
下面就是一個while循環,將發送隊列過了一遍。若是剛纔發送的數據大小與待發送的實際數據長度相等,那麼認爲是發送完了這一個,從隊列中移除這一個,而後下一次while會自動取下一個了。若是沒有;認爲是沒發完,位移數據,下次while仍然獲取到這個item,可是數據位移已經變了,所以繼續發送下面的數據。通過這個while以後,全部的發送隊列中的數據都應當發送完畢了。

4.接收過程。
前面沒什麼好說的,無非是開闢buffer空間,而後執行recv調用。以後進入一個while循環,條件是讀取的buffer有數據。
首先走一個解包調用,內部走的是__unpack_test,具體內容就不貼了,我簡單看了下,基本上就是解開頭部,頭部的信息標識了本次傳遞的基本信息,包括了版本號等內容,一個結構體,仍是比較標準的。這裏是嘗試解包,若是本次接收到的大小連頭部都不夠,那確定返回錯誤,須要繼續接收了。那麼從這個可以看出,每次傳遞的數據都是帶有一個頭部的__STNetMsgXpHeader。這東西里面塞入的內容能夠和客戶端的版本,當前這個信令的id等關聯起來。
再下去看到的就是對解包返回值的判斷了,若是一切順利,就走到sent_taskids.erase(taskid);這裏須要注意,這個sent_taskids是個發送的taskid的map,這裏能夠推測發送和接受實際上是關聯的,這裏接收完畢移除這個保留項。而後走的__NoopResp這個調用。若是返回false表示不是空的信令返回,那麼就走OnResponse。這個函數其實是在LongLinkTaskManager中綁定了longlink_->OnResponse = boost::bind(&LongLinkTaskManager::__OnResponse, this, _1, _2, _3, _4, _5, _6);綁定在了LongLinkTaskManager::__OnResponse這裏。

void LongLinkTaskManager::__OnResponse(ErrCmdType _error_type, int _error_code, uint32_t _cmdid, uint32_t _taskid, AutoBuffer& _body, const ConnectProfile& _connect_profile) {
    copy_wrapper<AutoBuffer> body(_body);
    RETURN_LONKLINK_SYNC2ASYNC_FUNC(boost::bind(&LongLinkTaskManager::__OnResponse, this, _error_type, _error_code, _cmdid, _taskid, body, _connect_profile));

    ......
    
    int err_code = 0;
    int handle_type = Buf2Resp(it->task.taskid, it->task.user_context, body, err_code, Task::kChannelLong);
    
    switch(handle_type){
        case kTaskFailHandleNoError:
        {
            dynamic_timeout_.CgiTaskStatistic(it->task.cgi, (unsigned int)it->transfer_profile.send_data_size + (unsigned int)body->Length(), ::gettickcount() - it->transfer_profile.start_send_time);
            __SingleRespHandle(it, kEctOK, err_code, handle_type, _connect_profile);
            xassert2(fun_notify_network_err_);
            fun_notify_network_err_(__LINE__, kEctOK, err_code, _connect_profile.ip, _connect_profile.port);
        }
            break;
        ......
    }

}

其實就2件事,經過Buf2Resp底層回包返回給上層解析。若是沒有錯誤kTaskFailHandleNoError,會執行__SingleRespHandle:

bool LongLinkTaskManager::__SingleRespHandle(std::list<TaskProfile>::iterator _it, ErrCmdType _err_type, int _err_code, int _fail_handle, const ConnectProfile& _connect_profile) {
    ......
    int cgi_retcode = fun_callback_(_err_type, _err_code, _fail_handle, _it->task, (unsigned int)(curtime - _it->start_task_time));
    ......
}

這裏的關鍵點就這一個,調用回調,回調的綁定在net_core.cc中的NetCore構造裏,longlink_task_manager_->fun_callback_ = boost::bind(&NetCore::__CallBack, this, (int)kCallFromLong, _1, _2, _3, _4, _5);,最終執行的是NetCore::__CallBack:

int NetCore::__CallBack(int _from, ErrCmdType _err_type, int _err_code, int _fail_handle, const Task& _task, unsigned int _taskcosttime) {

    if (task_callback_hook_ && 0 == task_callback_hook_(_from, _err_type, _err_code, _fail_handle, _task)) {
        xwarn2(TSF"task_callback_hook let task return. taskid:%_, cgi%_.", _task.taskid, _task.cgi);
        return 0;
    }

    if (kEctOK == _err_type || kTaskFailHandleTaskEnd == _fail_handle)
        return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code);

    if (kCallFromZombie == _from) return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code);

#ifdef USE_LONG_LINK
    if (!zombie_task_manager_->SaveTask(_task, _taskcosttime))
#endif
        return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code);

    return 0;
}

看到了吧,走了OnTaskEnd,任務結束。

此文從中間部分開始粗糙了,前面鋪墊的東西后面沒有講到,心不靜的時候分析東西效果確實不大好。總而言之既然堅持寫完了,這裏仍是留個記錄吧,往後有機會的時候會回顧把這部分完善好。

相關文章
相關標籤/搜索