微信開源mars源碼分析3—底層核心mars分析

接續上篇的上層分析,我們這次深入到底層的mars內部來看看。它在另一個工程mars中。我們打開它,直接先看初始化:在上層的MarsServiceNative的onCreate中調用了Mars.init(getApplicationContext(), new Handler(Looper.getMainLooper()));

/mars-master/mars/libraries/mars_android_sdk/src/main/java/com/tencent/mars/Mars.java

/**
     * Initializes the platform callbacks when the app is created. Must be called before onCreate.
     *
     * <p>Forwards the context and handler to {@code PlatformComm.init}, then marks Mars as
     * initialized; {@code onCreate(true)} refuses to run on first startup unless this flag is set.
     *
     * @param _context context forwarded to {@code PlatformComm.init}
     * @param _handler handler forwarded to {@code PlatformComm.init}
     */
    public static void init(Context _context, Handler _handler) {
        PlatformComm.init(_context, _handler);
        hasInitialized = true;
    }

調用了PlatformComm.init,並將自身的初始化狀態修改為true。繼續:
/mars-master/mars/libraries/mars_android_sdk/src/main/java/com/tencent/mars/comm/PlatformComm.java

/**
 * Keeps the context and handler for later platform callbacks, then starts
 * signal-strength monitoring through {@code NetworkSignalUtil}.
 *
 * @param ncontext context to store (passed in by {@code Mars.init})
 * @param nhandler handler to store; NOTE(review): per the caller it is built on the main Looper — confirm
 */
public static void init(Context ncontext, Handler nhandler) {
        context = ncontext;
        handler = nhandler;

        NetworkSignalUtil.InitNetworkSignalUtil(ncontext);
    }

保存了context和handler,然後初始化網絡信號強度監聽單元。注意:這個handler是用上層服務傳遞過來的主線程Looper生成的。

下面看看網絡單元的初始化:
/mars-master/mars/libraries/mars_android_sdk/src/main/java/com/tencent/mars/comm/NetworkSignalUtil.java

/**
 * Starts listening for signal-strength changes so that {@code calSignalStrength}
 * keeps the cached strength value up to date.
 *
 * <p>If the telephony service is unavailable, monitoring is skipped instead of
 * crashing initialization; the cached strength then simply keeps its previous value.
 *
 * @param ncontext context used to look up the {@link TelephonyManager}; also cached in {@code context}
 */
public static void InitNetworkSignalUtil(Context ncontext) {
    context = ncontext;
    TelephonyManager mgr = (TelephonyManager) context.getSystemService(Context.TELEPHONY_SERVICE);
    if (mgr == null) {
        // Context.getSystemService may return null when the service is not available.
        return;
    }
    mgr.listen(new PhoneStateListener() {
        @Override
        public void onSignalStrengthsChanged(SignalStrength signalStrength) {
            super.onSignalStrengthsChanged(signalStrength);
            calSignalStrength(signalStrength);
        }
    }, PhoneStateListener.LISTEN_SIGNAL_STRENGTHS);
}

得到TelephonyManager,並監聽信號強度的改變。當信號強度改變時,調用calSignalStrength計算並記錄信號強度:

/**
 * Converts a {@link SignalStrength} reading into a value in [0, 100] stored in {@code strength}.
 *
 * <p>GSM readings are used as-is, and the GSM value 99 maps to 0. CDMA dBm is shifted onto the
 * same scale with {@code (dbm + 113) / 2}. The level is then scaled by 100/31 and clamped
 * to [0, 100].
 */
private static void calSignalStrength(SignalStrength sig) {
    final boolean gsm = sig.isGsm();
    final int level = gsm ? sig.getGsmSignalStrength() : (sig.getCdmaDbm() + 113) / 2;

    if (gsm && level == 99) {
        // 99 is the GSM "unknown" value.
        strength = 0;
        return;
    }

    final long scaled = (long) (level * ((float) 100 / (float) 31));
    strength = Math.max(0, Math.min(100, scaled));
}

其實就是監控信號強度:GSM直接取信號等級(值為99時記為0),CDMA則把dBm換算到相同的刻度,最後統一按100/31的比例換算成0~100的值。
然後我們來看看Mars.onCreate,它是緊接着Mars.init調用的:

/**
 * Notifies the native layer that the application has been created.
 *
 * <p>On the very first startup {@code init} must already have run; on later startups
 * the native {@code BaseEvent.onCreate} is invoked unconditionally.
 *
 * @param isFirstStartup whether this is the application's first startup
 * @throws IllegalStateException if this is the first startup and {@code init} was not called
 */
public static void onCreate(boolean isFirstStartup) {
    final boolean missingInit = isFirstStartup && !hasInitialized;
    if (missingInit) {
        // First startup without init: BaseEvent cannot be created yet.
        throw new IllegalStateException("function MarsCore.init must be executed before Mars.onCreate when application firststartup.");
    }
    BaseEvent.onCreate();
}

這裏調用了BaseEvent.onCreate(首次啟動時必須先調用過Mars.init,否則拋出IllegalStateException):

// Native hook implemented in the JNI layer; Mars.onCreate calls BaseEvent.onCreate().
public static native void onCreate();

是個native層函數。
回到Mars中,還可以看到loadDefaultMarsLibrary,它負責加載幾個so文件,是在StnLogic的靜態初始化塊中一開始就調用的:
/mars-master/mars/libraries/mars_android_sdk/src/main/java/com/tencent/mars/stn/StnLogic.java

/**
 * Java facade of the STN (network task) module; its work is done by native methods.
 * Excerpt: only the class header and static initializer are shown here.
 */
public class StnLogic {

public static final String TAG = "mars.StnLogic";

// Load the Mars native libraries as soon as the class is initialized,
// i.e. before any native method of this class can be invoked.
static {
    Mars.loadDefaultMarsLibrary();
}

剩下的工作都要在StnLogic中進行,包括發送最終也是跟它打交道。先不着急看這個StnLogic,我們繼續深入下去看看BaseEvent在幹什麼:
/mars-master/mars/libraries/mars_android_sdk/src/main/java/com/tencent/mars/BaseEvent.java

/**
 * Java-side entry points of the native Mars base-event module.
 *
 * <p>The static native methods forward application lifecycle, network and crash
 * events to native code. {@link ConnectionReceiver} turns connectivity broadcasts
 * into {@link #onNetworkChange()} calls.
 */
public class BaseEvent {

    public static native void onCreate();
    public static native void onDestroy();
    public static native void onNetworkChange();
    public static native void onForeground(final boolean forground);
    public static native void onSingalCrash(int sig);
    public static native void onExceptionCrash();

    /**
     * Network-switch listener: the client registers this broadcast receiver so that
     * Mars STN is notified when the network changes.
     *
     * <p>The intent payload is ignored. On every broadcast the active network is
     * re-read from {@link ConnectivityManager} and compared with the last network
     * recorded in the static fields below, so repeated broadcasts for the same
     * network do not reach native code.
     */
    public static class ConnectionReceiver extends BroadcastReceiver {

        public static NetworkInfo lastActiveNetworkInfo = null;
        public static WifiInfo lastWifiInfo = null;
        public static boolean lastConnected = true;

        public static String TAG = "mars.ConnectionReceiver";

        @Override
        public void onReceive(Context context, Intent intent) {

            if (context == null || intent == null) {
                return;
            }

            ConnectivityManager mgr = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
            NetworkInfo netInfo = null;
            try {
                // Any failure here (including a null manager) is treated as "no active network".
                netInfo = mgr.getActiveNetworkInfo();
            } catch (Exception e) {
                Log.i(TAG, "getActiveNetworkInfo failed.");
            }

            checkConnInfo(context, netInfo);
        }

        /**
         * Decides whether native code must be told about a network change.
         *
         * <ul>
         *   <li>No active network: forget the recorded network and always notify.</li>
         *   <li>Active but not CONNECTED: notify only on the first transition away from connected.</li>
         *   <li>CONNECTED: notify when {@link #isNetworkChange} reports a different network.</li>
         * </ul>
         *
         * @param context       context used to query Wi-Fi details
         * @param activeNetInfo current active network, or {@code null} if there is none
         */
        public void checkConnInfo(final Context context, final NetworkInfo activeNetInfo) {

            if (activeNetInfo == null) {
                lastActiveNetworkInfo = null;
                lastWifiInfo = null;
                BaseEvent.onNetworkChange();
            }
            else if (activeNetInfo.getDetailedState() != NetworkInfo.DetailedState.CONNECTED) {

                if (lastConnected) {
                    lastActiveNetworkInfo = null;
                    lastWifiInfo = null;
                    BaseEvent.onNetworkChange();
                }

                lastConnected = false;
            }
            else {
                if (isNetworkChange(context, activeNetInfo)) {
                    BaseEvent.onNetworkChange();
                }
                lastConnected = true;
            }

        }

        /**
         * Returns whether {@code activeNetInfo} differs from the last recorded network and,
         * if it does, records it as the new last network.
         *
         * <p>Wi-Fi networks are compared by BSSID, SSID and network id; other networks by
         * extra info, subtype and type.
         *
         * @param context       context used to obtain the {@link WifiManager}
         * @param activeNetInfo current, connected active network (must not be {@code null})
         * @return {@code true} if the network changed
         */
        public boolean isNetworkChange(final Context context, final NetworkInfo activeNetInfo) {

            boolean isWifi = (activeNetInfo.getType() == ConnectivityManager.TYPE_WIFI);
            if (isWifi) {
                WifiManager wifiManager = (WifiManager) context.getSystemService(Context.WIFI_SERVICE);
                WifiInfo wi = (wifiManager == null) ? null : wifiManager.getConnectionInfo();
                // getBSSID()/getSSID() may return null (e.g. when no network is associated),
                // so compare them null-safely instead of dereferencing lastWifiInfo's values.
                if (wi != null && lastWifiInfo != null
                        && equalsNullable(lastWifiInfo.getBSSID(), wi.getBSSID())
                        && equalsNullable(lastWifiInfo.getSSID(), wi.getSSID())
                        && lastWifiInfo.getNetworkId() == wi.getNetworkId()) {
                    Log.w(TAG, "Same Wifi, do not NetworkChanged");
                    return false;
                }
                lastWifiInfo = wi;
            } else if (lastActiveNetworkInfo != null
                    && lastActiveNetworkInfo.getExtraInfo() != null && activeNetInfo.getExtraInfo() != null
                    && lastActiveNetworkInfo.getExtraInfo().equals(activeNetInfo.getExtraInfo())
                    && lastActiveNetworkInfo.getSubtype() == activeNetInfo.getSubtype()
                    && lastActiveNetworkInfo.getType() == activeNetInfo.getType()) {
                return false;

            } else if (lastActiveNetworkInfo != null
                    && lastActiveNetworkInfo.getExtraInfo() == null && activeNetInfo.getExtraInfo() == null
                    && lastActiveNetworkInfo.getSubtype() == activeNetInfo.getSubtype()
                    && lastActiveNetworkInfo.getType() == activeNetInfo.getType()) {
                Log.w(TAG, "Same Network, do not NetworkChanged");
                return false;
            }

            if (!isWifi) {
                // Switched to a non-Wi-Fi network: forget the remembered Wi-Fi. Otherwise a later
                // switch back to the same access point would be wrongly reported as "Same Wifi".
                lastWifiInfo = null;
            }

            lastActiveNetworkInfo = activeNetInfo;

            return true;
        }

        /** Null-safe string equality: two nulls are equal, null never equals a non-null value. */
        private static boolean equalsNullable(String a, String b) {
            return (a == null) ? (b == null) : a.equals(b);
        }

    }
}

可以看到,這裏定義了一個靜態註冊的廣播接收器:這個ConnectionReceiver直接在上層wrapper的manifest裏被聲明。註釋也寫得很清楚,就是為了監聽網絡切換。另外,上層的MessageHandler裏面也會主動發送廣播來通知到這裏。
可以看到,這裏並不關心intent的內容,而是通過ConnectivityManager重新獲取當前的網絡信息,並與上一次記錄的網絡比較(WiFi比較BSSID、SSID和networkId,其他網絡比較extraInfo、subtype和type)。如果網絡狀態發生改變,就通過BaseEvent.onNetworkChange通知native層。
以此為切入點,我們進入mars的native層:
/mars-master/mars/baseevent/jni/com_tencent_mars_BaseEvent.cc

// JNI binding for the Java BaseEvent.onNetworkChange(): forwards the notification
// to mars::baseevent::OnNetworkChange(). Both JNI parameters are unused.
JNIEXPORT void JNICALL Java_com_tencent_mars_BaseEvent_onNetworkChange (JNIEnv *, jclass)
{
    mars::baseevent::OnNetworkChange();
}

最後會到達這裏:
/mars-master/mars/baseevent/src/baseprjevent.cc

// Returns the process-wide "network changed" signal. It is a function-local
// static, so it is constructed on first use and every caller (emitters and
// subscribers alike) gets the same instance.
boost::signals2::signal<void ()>& GetSignalOnNetworkChange()
{
    static boost::signals2::signal<void ()> SignalOnNetworkChange;
    return SignalOnNetworkChange;
}

這裏利用了boost這個C++庫的signals2信號槽機制,有點像Qt下的信號槽。這裏發起了一個信號,那麼這個信號在哪裏被響應的呢?
在stn_logic.cc這個文件中:
/mars-master/mars/stn/stn_logic.cc

// Connects the STN module's handlers to the base-event signals. Invoked once
// during static initialization through BOOT_RUN_STARTUP below.
static void __initbind_baseprjevent() {

#ifdef ANDROID
    // NOTE(review): presumably registers this library (kLibName) with baseevent — confirm.
    mars::baseevent::addLoadModule(kLibName);
#endif

    GetSignalOnCreate().connect(&onCreate);
    GetSignalOnDestroy().connect(&onDestroy);
    GetSignalOnSingalCrash().connect(&onSingalCrash);
    GetSignalOnExceptionCrash().connect(&onExceptionCrash);
    GetSignalOnNetworkChange().connect(&onNetworkChange);
    
    // Compile-time guard: this file must be built with XLOGGER_TAG defined.
#ifndef XLOGGER_TAG
#error "not define XLOGGER_TAG"
#endif
    
    GetSignalOnNetworkDataChange().connect(&OnNetworkDataChange);
}

// Schedules __initbind_baseprjevent to run at static-initialization time.
BOOT_RUN_STARTUP(__initbind_baseprjevent);

可以看到,這裏調用connect把onNetworkChange這個函數連接到了信號上。我們先看下BOOT_RUN_STARTUP是如何觸發這個初始化的:
/mars-master/mars/comm/bootrun.h

// Defines a namespace-scope static int whose initializer is boot_run_atstartup(func)
// (presumably invoking func), so the call happens during static initialization,
// before main or when the shared library is loaded. The variable is never read,
// hence VARIABLE_IS_NOT_USED; pasting ##func keeps its name unique per function.
#define BOOT_RUN_STARTUP(func) VARIABLE_IS_NOT_USED static int __anonymous_run_variable_startup_##func = boot_run_atstartup(func)

可以看到,它展開後是一個靜態變量。這裏利用了C++的一個技巧:全局(命名空間作用域)的靜態變量會在程序或so加載時、main之前就完成初始化,而這個變量的初始值來自boot_run_atstartup(func),因此實際上是在這裏調用了__initbind_baseprjevent這個靜態函數,從而在一開始就完成了信號與槽的連接。
那麼好,既然初始化和觸發的過程都有了,剩下的就是看看響應的部分幹了什麼吧:

// Handler connected to the base-event network-change signal. Drops the cached
// network information (stale once the network changes) and then notifies the STN
// layer through STN_WEAK_CALL(OnNetworkChange()) — a weak call, presumably skipped
// when that symbol is not linked in.
static void onNetworkChange() {
#ifdef __APPLE__
    FlushReachability();
#endif
#ifdef ANDROID
    // NOTE(review): g_NetInfo is reset before g_net_mutex is taken — confirm whether
    // it is supposed to be protected by that mutex.
    g_NetInfo = 0;

    ScopedLock lock(g_net_mutex);
    g_wifi_info.ssid.clear();
    g_wifi_info.bssid.clear();
    g_sim_info.isp_code.clear();
    g_sim_info.isp_name.clear();
    // kNoNet - 1 looks like an "unknown" sentinel — TODO confirm.
    g_apn_info.nettype = kNoNet -1;
    g_apn_info.sub_nettype = 0;
    g_apn_info.extra_info.clear();
    lock.unlock();
#endif
    STN_WEAK_CALL(OnNetworkChange());
}

加鎖,清除全局的有關網絡的一些緩存信息,因為網絡狀態發生了改變,這些信息都等於無效了。然後通過宏STN_WEAK_CALL執行了一次「弱調用」(具體的先不看),調用的是函數OnNetworkChange:
/mars-master/mars/stn/src/timing_sync.cc

// Reacts to a network change: when the sync alarm is pending, cancel it and
// re-arm it with a delay recomputed for the current active state.
void TimingSync::OnNetworkChange()
{
    if (!alarm_.IsWaiting()) return;

    alarm_.Cancel();
    alarm_.Start(GetAlarmTime(active_logic_.IsActive()));
}

這裏又引入了alarm這個東西,看起來是個定時器(鬧鐘)。如果它正在等待,就先取消,再根據當前是否活躍(active_logic_.IsActive())重新計算時間並重新啟動它。再往下看的話,有些東西看不太明白了,這裏先存疑吧。不過剛才這部分的大致流程還是可以概括出來的:
1.上層的靜態廣播接收器接收廣播事件;
2.觸發後,通過BaseEvent來通知底層native層,這裏的BaseEvent實際上承擔了java層與c層之間的通信;
3.進入到c層後,也有baseevent,不過這裏是通過信號槽的方式觸發響應函數處理;

我們再繼續看下這些BaseEvent所觸發的各個函數:

// Handler for the base-event "create" signal: prepares the STN module.
static void onCreate() {
#if !UWP && !defined(WIN32)
    // Ignore SIGPIPE so that writing to a socket closed by the peer returns an
    // error instead of terminating the process.
    signal(SIGPIPE, SIG_IGN);
#endif

    xinfo2(TSF"stn oncreate");
    // Touch both singletons up front (presumably creating them if not yet alive).
    SINGLETON_STRONG(ActiveLogic);
    NetCore::Singleton::Instance();

}

實際上主要是new出了NetCore這個單例(同時也創建了ActiveLogic單例,並在非Windows平台上先忽略了SIGPIPE信號)。這其中使用了大量的宏,配合智能指針處理,詳細過程就不在這裏敘述了。

下面我們結合上層的處理來分析一個任務的執行流程。還記得上層的send最終會調用StnLogic.startTask(_task)。那麼這就直接進入了StnLogic:
/mars-master/mars/libraries/mars_android_sdk/src/main/java/com/tencent/mars/stn/StnLogic.java

/**
 * Hands a task to the native STN layer (marked "async call" in the original source).
 *
 * @param task Java task object; the JNI side reads its fields and converts it into a native Task
 */
public static native void startTask(final Task task);

這裏已經註明是個異步調用,並且是個native函數。好的,進入c層,我們節選一部分代碼:
/mars-master/mars/stn/jni/com_tencent_mars_stn_StnLogic_Java2C.cc

DEFINE_FIND_STATIC_METHOD(KJava2C_startTask, KNetJava2C, "startTask", "(Lcom/tencent/mars/stn/StnLogic$Task;)V")
// JNI implementation of StnLogic.startTask(Task): copies every field of the Java
// task into a native Task and passes it to StartTask(). Each local reference
// obtained here is released once its contents have been copied; report_arg and
// the ArrayList class are released the same way as host, hostlist and cgi.
JNIEXPORT void JNICALL Java_com_tencent_mars_stn_StnLogic_startTask
  (JNIEnv *_env, jclass, jobject _task) {
    xverbose_function();

    //get the field value of the netcmd
    jint taskid = JNU_GetField(_env, _task, "taskID", "I").i;
    jint cmdid = JNU_GetField(_env, _task, "cmdID", "I").i;
    jint channel_select = JNU_GetField(_env, _task, "channelSelect", "I").i;
    jobject hostlist = JNU_GetField(_env, _task, "shortLinkHostList", "Ljava/util/ArrayList;").l;
    jstring cgi = (jstring)JNU_GetField(_env, _task, "cgi", "Ljava/lang/String;").l;

    jboolean send_only = JNU_GetField(_env, _task, "sendOnly", "Z").z;
    jboolean need_authed = JNU_GetField(_env, _task, "needAuthed", "Z").z;
    jboolean limit_flow = JNU_GetField(_env, _task, "limitFlow", "Z").z;
    jboolean limit_frequency = JNU_GetField(_env, _task, "limitFrequency", "Z").z;

    jint channel_strategy = JNU_GetField(_env, _task, "channelStrategy", "I").i;
    jboolean network_status_sensitive = JNU_GetField(_env, _task, "networkStatusSensitive", "Z").z;
    jint priority = JNU_GetField(_env, _task, "priority", "I").i;

    jint retrycount = JNU_GetField(_env, _task, "retryCount", "I").i;
    jint server_process_cost = JNU_GetField(_env, _task, "serverProcessCost", "I").i;
    jint total_timetout = JNU_GetField(_env, _task, "totalTimeout", "I").i;
    jstring report_arg = (jstring)JNU_GetField(_env, _task, "reportArg", "Ljava/lang/String;").l;

    //init struct Task
    struct Task task;
    task.taskid = taskid;
    task.cmdid = cmdid;
    task.channel_select = channel_select;

    task.send_only = send_only;
    task.need_authed = need_authed;
    task.limit_flow = limit_flow;
    task.limit_frequency = limit_frequency;

    task.channel_strategy = channel_strategy;
    task.network_status_sensitive = network_status_sensitive;
    task.priority = priority;

    task.retry_count = retrycount;
    task.server_process_cost = server_process_cost;
    task.total_timetout = total_timetout;

    if (NULL != report_arg) {
        task.report_arg = ScopedJstring(_env, report_arg).GetChar();
        _env->DeleteLocalRef(report_arg);
    }

    if (NULL != hostlist) {
        jclass cls_arraylist = _env->GetObjectClass(hostlist);
        //method in class ArrayList
        jmethodID arraylist_get = _env->GetMethodID(cls_arraylist,"get","(I)Ljava/lang/Object;");
        jmethodID arraylist_size = _env->GetMethodID(cls_arraylist,"size","()I");
        jint len = _env->CallIntMethod(hostlist, arraylist_size);
        for(int i = 0; i < len; i++){
            jstring host = (jstring)_env->CallObjectMethod(hostlist, arraylist_get, i);
            if (NULL != host) {
                task.shortlink_host_list.push_back(ScopedJstring(_env, host).GetChar());
                _env->DeleteLocalRef(host);
            }
        }
        _env->DeleteLocalRef(cls_arraylist);
        _env->DeleteLocalRef(hostlist);
    }

    if (NULL != cgi) {
        task.cgi = ScopedJstring(_env, cgi).GetChar();
        _env->DeleteLocalRef(cgi);
    }

    StartTask(task);
}

這裏全部都是在做Task的轉換:從StnLogic的Java Task轉換成為底層native的Task,依次讀取各個成員變量字段,並複製到新Task中。然後調用StartTask,會直接走到NetCore中去:
/mars-master/mars/stn/src/net_core.cc

// Entry point for a new task: validates it and dispatches it to the long-link or
// short-link task manager according to channel_select. Every failure path ends
// the task through OnTaskEnd with a kEctLocal error code.
// NOTE(review): ASYNC_BLOCK_START/END presumably re-post this body onto NetCore's
// own message queue so it runs asynchronously — confirm.
void NetCore::StartTask(const Task& _task) {

    ASYNC_BLOCK_START

    xgroup2_define(group);
    xinfo2(TSF"task start long short taskid:%0, cmdid:%1, need_authed:%2, cgi:%3, channel_select:%4, limit_flow:%5, ",
           _task.taskid, _task.cmdid, _task.need_authed, _task.cgi.c_str(), _task.channel_select, _task.limit_flow) >> group;
    xinfo2(TSF"host:%_, send_only:%_, cmdid:%_, server_process_cost:%_, retrycount:%_,  channel_strategy:%_, ",
            _task.shortlink_host_list.empty()?"":_task.shortlink_host_list.front(), _task.send_only, _task.cmdid, _task.server_process_cost, _task.retry_count, _task.channel_strategy) >> group;
    xinfo2(TSF" total_timetout:%_, network_status_sensitive:%_, priority:%_, report_arg:%_",  _task.total_timetout,  _task.network_status_sensitive, _task.priority, _task.report_arg) >> group;

    // Work on a copy so that __ValidAndInitDefault can fill in defaults.
    Task task = _task;
    if (!__ValidAndInitDefault(task, group)) {
        OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalTaskParam);
        return;
    }
    
    // Optional hook that may adjust the task before dispatch.
    if (task_process_hook_) {
        task_process_hook_(task);
    }

    // A task must select at least one channel.
    if (0 == task.channel_select) {
        xerror2(TSF"error channelType (%_, %_), ", kEctLocal, kEctLocalChannelSelect) >> group;
        
        OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalChannelSelect);
        return;
    }
    
    // Network-sensitive tasks fail fast when there is no network, unless (in
    // long-link builds) the long link still reports kConnected.
    if (task.network_status_sensitive && kNoNet ==::getNetInfo()
#ifdef USE_LONG_LINK
        && LongLink::kConnected != longlink_task_manager_->LongLinkChannel().ConnectStatus()
#endif
        ) {
        xerror2(TSF"error no net (%_, %_), ", kEctLocal, kEctLocalNoNet) >> group;
        OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalNoNet);
        return;
    }

    bool start_ok = false;

#ifdef USE_LONG_LINK

    // Proactively bring the long link up when it is not connected, the task may
    // use it, the app is in the foreground and the last foreground change was at
    // most 15 minutes ago (assuming gettickcount() is in milliseconds).
    if (LongLink::kConnected != longlink_task_manager_->LongLinkChannel().ConnectStatus()
            && (Task::kChannelLong & task.channel_select) && SINGLETON_STRONG(ActiveLogic)->IsForeground()

            && (15 * 60 * 1000 >= gettickcount() - SINGLETON_STRONG(ActiveLogic)->LastForegroundChangeTime()))
        longlink_task_manager_->getLongLinkConnectMonitor().MakeSureConnected();

#endif

    // Presumably emits the log lines collected in `group` above.
    xgroup2() << group;

    switch (task.channel_select) {
    case Task::kChannelBoth: {

#ifdef USE_LONG_LINK
        // Prefer the long link when it is connected; under the fast strategy fall
        // back to the short link once the long-link queue exceeds
        // kFastSendUseLonglinkTaskCntLimit.
        bool bUseLongLink = LongLink::kConnected == longlink_task_manager_->LongLinkChannel().ConnectStatus();

        if (bUseLongLink && task.channel_strategy == Task::kChannelFastStrategy) {
            xinfo2(TSF"long link task count:%0, ", longlink_task_manager_->GetTaskCount());
            bUseLongLink = bUseLongLink && (longlink_task_manager_->GetTaskCount() <= kFastSendUseLonglinkTaskCntLimit);
        }

        if (bUseLongLink)
            start_ok = longlink_task_manager_->StartTask(task);
        else
#endif
            start_ok = shortlink_task_manager_->StartTask(task);
    }
    break;
#ifdef USE_LONG_LINK

    case Task::kChannelLong:
        start_ok = longlink_task_manager_->StartTask(task);
        break;
#endif

    case Task::kChannelShort:
        start_ok = shortlink_task_manager_->StartTask(task);
        break;

    // NOTE(review): in builds without USE_LONG_LINK a kChannelLong task has no
    // case of its own and lands here.
    default:
        xassert2(false);
        break;
    }

    if (!start_ok) {
        xerror2(TSF"taskid:%_, error starttask (%_, %_)", task.taskid, kEctLocal, kEctLocalStartTaskFail);
        OnTaskEnd(task.taskid, task.user_context, kEctLocal, kEctLocalStartTaskFail);
    } else {
        // Long-link builds: tell the zombie-task manager that a task started.
#ifdef USE_LONG_LINK
        zombie_task_manager_->OnNetCoreStartTask();
#endif
    }
        
    ASYNC_BLOCK_END
}

1.一堆錯誤檢查,有錯誤隨時準備執行OnTaskEnd並返回;
2.一個switch case,用來檢查任務的執行通道類型,分為長連接+短連接、長連接、短連接。其中長連接+短連接的情況下,長連接已連上就優先走長連接(快速策略下長連接隊列過長時改走短連接);
3.最後如果任務啟動成功,並且編譯時啟用了USE_LONG_LINK,還要走一個zombie_task_manager_->OnNetCoreStartTask(),其實內部就是記錄下開始時間。字面上理解是殭屍任務管理,我理解的是對實際執行時間超長的任務的跟蹤,如果發現已經脫離並超時,準備結束和回收。
這裏我特地看了下,在net_core.cc中,長連接錯誤、斷開重連或者網絡狀況發生改變,都會觸發ZombieTaskManager.__StartTask。它會在內部檢查每個任務的時長(這些任務是通過SaveTask保存下來的),如果時長超過了,會走fun_callback_,否則再次嘗試執行任務fun_start_task_。這裏採用的是boost::bind方式,對應的函數實體是NetCore::__CallBack和NetCore::StartTask。之前在__StartTask中如果超時了,會調用到這個callback,其中會保留task,否則才走NetCore::StartTask。這裏我感覺超時的任務並非完全釋放回收,而是等待網絡狀況發生改變的時候再次嘗試執行。此處屬於粗略查看,並不能確定,不過暫時沒看到釋放任務的步驟。這裏已經偏離了主線,暫時放下。
4.每個case實際上是根據傳輸通道類型的不同,調用不同的任務管理器的StartTask。

我們以長連接為例,看下具體的執行過程:
/mars-master/mars/stn/src/longlink_task_manager.cc

bool LongLinkTaskManager::StartTask(const Task& _task) {
    xverbose_function();
    xdebug2(TSF"taskid=%0", _task.taskid);

    TaskProfile task(_task);
    task.link_type = Task::kChannelLong;

    lst_cmd_.push_back(task);
    lst_cmd_.sort(__CompareTask);

    __RunLoop();
    return true;
}

1.用傳入的Task構造TaskProfile,並設置傳輸類型為長連接;
2.添加任務到隊列中,並重新排序隊列;
3.執行__RunLoop;
看看__RunLoop:

// Drives the long-link task queue: processes timeouts, starts pending tasks and,
// while tasks remain, re-posts itself through MessageQueue::FasterMessage with
// MessageTiming(1000) (presumably a 1 s delay). On Android a 30 s wake lock is
// held while work remains; when the queue is empty the lock is re-armed with
// 500 ms, which the original comments describe as cancelling the last wake lock.
void LongLinkTaskManager::__RunLoop() {

    if (lst_cmd_.empty()) {
#ifdef ANDROID
        /*cancel the last wakeuplock*/
        wakeup_lock_->Lock(500);
#endif
        return;
    }

    __RunOnTimeout();
    __RunOnStartTask();

    if (!lst_cmd_.empty()) {
#ifdef ANDROID
        wakeup_lock_->Lock(30 * 1000);
#endif
      MessageQueue::FasterMessage(asyncreg_.Get(),
                                  MessageQueue::Message((MessageQueue::MessageTitle_t)this, boost::bind(&LongLinkTaskManager::__RunLoop, this)),
                                  MessageQueue::MessageTiming(1000));
    } else {
#ifdef ANDROID
        /*cancel the last wakeuplock*/
        wakeup_lock_->Lock(500);
#endif
    }
}

1.__RunOnTimeout檢查超時;
2.__RunOnStartTask執行任務;
3.若隊列仍不為空,通過MessageQueue::FasterMessage投遞一個延時消息(MessageTiming(1000),應是1秒後)再次執行__RunLoop,形成循環;Android下還會相應地持有喚醒鎖;
依次看下:

// Scans every queued task for timeouts. For running tasks it checks the
// first-packet, packet-to-packet and read/write timeouts, recording the last
// kind hit in socket_timeout_code. Any task past its overall task_timeout is
// failed individually. Afterwards a socket-level timeout fails the whole batch
// and reports a network error; otherwise an overall task timeout also triggers
// a batch error with kEctLongTaskTimeout.
void LongLinkTaskManager::__RunOnTimeout() {
    std::list<TaskProfile>::iterator first = lst_cmd_.begin();
    std::list<TaskProfile>::iterator last = lst_cmd_.end();

    uint64_t cur_time = ::gettickcount();
    int socket_timeout_code = 0;
    bool istasktimeout = false;

    while (first != last) {
        // Capture the successor first: __SingleRespHandle below presumably may
        // remove the current entry from lst_cmd_.
        std::list<TaskProfile>::iterator next = first;
        ++next;

        if (first->running_id && 0 < first->transfer_profile.start_send_time) {
            // First-packet timeout: nothing received yet since sending started.
            if (0 == first->transfer_profile.last_receive_pkg_time && cur_time - first->transfer_profile.start_send_time >= first->transfer_profile.first_pkg_timeout) {
                xerror2(TSF"task first-pkg timeout taskid:%_,  nStartSendTime=%_, nfirstpkgtimeout=%_",
                        first->task.taskid, first->transfer_profile.start_send_time / 1000, first->transfer_profile.first_pkg_timeout / 1000);
                socket_timeout_code = kEctLongFirstPkgTimeout;
                __SetLastFailedStatus(first);
            }

            // Packet-to-packet timeout: the limit is kWifiPackageInterval off mobile
            // networks and kGPRSPackageInterval on mobile.
            if (0 < first->transfer_profile.last_receive_pkg_time && cur_time - first->transfer_profile.last_receive_pkg_time >= ((kMobile != getNetInfo()) ? kWifiPackageInterval : kGPRSPackageInterval)) {
                xerror2(TSF"task pkg-pkg timeout, taskid:%_, nLastRecvTime=%_, pkg-pkg timeout=%_",
                        first->task.taskid, first->transfer_profile.last_receive_pkg_time / 1000, ((kMobile != getNetInfo()) ? kWifiPackageInterval : kGPRSPackageInterval) / 1000);
                socket_timeout_code = kEctLongPkgPkgTimeout;
            }
        }


        // Read/write timeout measured from the start of sending.
        if (first->running_id && 0 < first->transfer_profile.start_send_time && cur_time - first->transfer_profile.start_send_time >= first->transfer_profile.read_write_timeout) {
            xerror2(TSF"task read-write timeout, taskid:%_, , nStartSendTime=%_, nReadWriteTimeOut=%_",
                    first->task.taskid, first->transfer_profile.start_send_time / 1000, first->transfer_profile.read_write_timeout / 1000);
            socket_timeout_code = kEctLongReadWriteTimeout;
        }

        // Overall task timeout, measured from when the task was started.
        if (cur_time - first->start_task_time >= first->task_timeout) {
            __SingleRespHandle(first, kEctLocal, kEctLocalTaskTimeout, kTaskFailHandleTaskTimeout, longlink_->Profile());
            istasktimeout = true;
        }

        first = next;
    }

    if (0 != socket_timeout_code) {
        dynamic_timeout_.CgiTaskStatistic("", kDynTimeTaskFailedPkgLen, 0);
        __BatchErrorRespHandle(kEctNetMsgXP, socket_timeout_code, kTaskFailHandleDefault, 0, longlink_->Profile());
        xassert2(fun_notify_network_err_);
        fun_notify_network_err_(__LINE__, kEctNetMsgXP, socket_timeout_code, longlink_->Profile().ip,  longlink_->Profile().port);
    } else if (istasktimeout) {
        __BatchErrorRespHandle(kEctNetMsgXP, kEctLongTaskTimeout, kTaskFailHandleDefault, 0, longlink_->Profile());
        //        xassert2(funNotifyNetworkError);
        //        funNotifyNetworkError(__LINE__, ectNetMsgXP, ectNetMsgXP_TaskTimeout, longlink_->IP(),  longlink_->Port());
    }
}

循環檢查任務列表lst_cmd_中的每一項,判斷是socket層面的超時(首包超時、包間超時、讀寫超時)還是整個任務超時。這部分暫時不作為重點查看。
看下__RunOnStartTask:

// Walks the queue in order and tries to send every task that is not already
// running. Per task:
//   - retries are held back until retry_interval_ has passed since the last batch error;
//   - need_authed tasks require MakesureAuthed(), which runs at most once per pass;
//   - the request buffer is built and the anti-avalanche check is run once per task;
//   - the long link must be connected, otherwise the pass stops;
//   - finally the buffer is handed to longlink_->Send().
// sent_count (tasks in flight plus tasks sent in this pass) feeds the first-packet timeout.
void LongLinkTaskManager::__RunOnStartTask() {
    std::list<TaskProfile>::iterator first = lst_cmd_.begin();
    std::list<TaskProfile>::iterator last = lst_cmd_.end();

    bool ismakesureauthruned = false;
    bool ismakesureauthsuccess = false;
    uint64_t curtime = ::gettickcount();

    bool canretry = curtime - lastbatcherrortime_ >= retry_interval_;
    bool canprint = true;
    int sent_count = 0;

    while (first != last) {
        std::list<TaskProfile>::iterator next = first;
        ++next;

        // Skip tasks that are already running (they still count as in flight).
        if (first->running_id) {
            ++sent_count;
            first = next;
            continue;
        }

        // Retry interval: only throttles retries, never a task's first send.
        if (first->task.retry_count > first->remain_retry_count && !canretry) {
            xdebug2_if(canprint, TSF"retry interval:%0, curtime:%1, lastbatcherrortime_:%2, curtime-m_lastbatcherrortime:%3",
                       retry_interval_, curtime, lastbatcherrortime_, curtime - lastbatcherrortime_);
            
            canprint = false;
            first = next;
            continue;
        }

        // make sure login (the MakesureAuthed() result is cached for the rest of this pass)
        if (first->task.need_authed) {
            if (!ismakesureauthruned) {
                ismakesureauthruned = true;
                ismakesureauthsuccess = MakesureAuthed();
            }

            if (!ismakesureauthsuccess) {
                xinfo2_if(curtime % 3 == 0, TSF"makeSureAuth retsult=%0", ismakesureauthsuccess);
                first = next;
                continue;
            }
        }

        AutoBuffer bufreq;
        int error_code = 0;

        if (!first->antiavalanche_checked) {
            if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, error_code, Task::kChannelLong)) {
                __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }
            // Anti-avalanche check; a rejected task is handled with kTaskFailHandleTaskEnd.
            xassert2(fun_anti_avalanche_check_);
            if (!fun_anti_avalanche_check_(first->task, bufreq.Ptr(), (int)bufreq.Length())) {
                __SingleRespHandle(first, kEctLocal, kEctLocalAntiAvalanche, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }
            
            first->antiavalanche_checked = true;
        }

        // No long link: stop this pass; remaining tasks wait for the next loop.
        if (!longlinkconnectmon_->MakeSureConnected()) {
            break;
        }

        // Buffer not built in this pass (task was checked in an earlier pass): build it now.
        if (0 == bufreq.Length()) {
            if (!Req2Buf(first->task.taskid, first->task.user_context, bufreq, error_code, Task::kChannelLong)) {
                __SingleRespHandle(first, kEctEnDecode, error_code, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }
            // Anti-avalanche check.
            // NOTE(review): antiavalanche_checked is always true at this point (set
            // above or in an earlier pass), so this check never runs — confirm intent.
            xassert2(fun_anti_avalanche_check_);
            if (!first->antiavalanche_checked && !fun_anti_avalanche_check_(first->task, bufreq.Ptr(), (int)bufreq.Length())) {
                __SingleRespHandle(first, kEctLocal, kEctLocalAntiAvalanche, kTaskFailHandleTaskEnd, longlink_->Profile());
                first = next;
                continue;
            }
        }

        // Compute timeouts for this send and hand the buffer to the long link.
        first->transfer_profile.loop_start_task_time = ::gettickcount();
        first->transfer_profile.first_pkg_timeout = __FirstPkgTimeout(first->task.server_process_cost, bufreq.Length(), sent_count, dynamic_timeout_.GetStatus());
        first->current_dyntime_status = (first->task.server_process_cost <= 0) ? dynamic_timeout_.GetStatus() : kEValuating;
        first->transfer_profile.read_write_timeout = __ReadWriteTimeout(first->transfer_profile.first_pkg_timeout);
        first->transfer_profile.send_data_size = bufreq.Length();
        first->running_id = longlink_->Send((const unsigned char*) bufreq.Ptr(), (unsigned int)bufreq.Length(), first->task.cmdid, first->task.taskid,
                                      first->task.send_only ? "":first->task.cgi);

        if (!first->running_id) {
            xwarn2(TSF"task add into longlink readwrite fail cgi:%_, cmdid:%_, taskid:%_", first->task.cgi, first->task.cmdid, first->task.taskid);
            first = next;
            continue;
        }

        xinfo2(TSF"task add into longlink readwrite suc cgi:%_, cmdid:%_, taskid:%_, size:%_, timeout(firstpkg:%_, rw:%_, task:%_), retry:%_",
               first->task.cgi, first->task.cmdid, first->task.taskid, first->transfer_profile.send_data_size, first->transfer_profile.first_pkg_timeout / 1000,
               first->transfer_profile.read_write_timeout / 1000, first->task_timeout / 1000, first->remain_retry_count);

        // Send-only tasks expect no response: complete them right after sending.
        if (first->task.send_only) {
            __SingleRespHandle(first, kEctOK, 0, kTaskFailHandleNoError, longlink_->Profile());
        }

        ++sent_count;
        first = next;
    }
}

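仍然是循環任務列表,裏面做的事情:
1.跳過正在執行的任務;
2.如果該任務是一次重試(retry_count > remain_retry_count),且距上次批量出錯還沒超過重試間隔retry_interval_,則暫時跳過這個任務,繼續下一個;首次發送的任務不受影響;
3.確認登錄驗證:對於需要驗證的任務,每輪循環最多執行一次MakesureAuthed,驗證不通過則跳過。這個內部最終也是調用到了上層java層StnLogic的ICallBack.makesureAuthed方法上,上層可以在此處插入一個驗證機制,注意這裏是個同步的過程;
4.通過Req2Buf把請求打包到緩衝區,並做雪崩檢查(每個任務只檢查一次);
5.確認長連接已連上(MakeSureConnected,否則結束本輪循環)後,執行longlink_->Send真正的發送;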

來看看雪崩檢測吧。這裏的雪崩指的是客戶端反覆retry太多,在很短的時間內大量重發數據而導致負載過大的問題。一般情況下出現在網絡不穩定或者服務器處理出現問題的時候,嚴重的情況下會拖慢客戶端的效能,並且導致服務器負載過高甚至癱瘓,有點像DoS攻擊這類的效果。

它是通過函數fun_anti_avalanche_check_檢查的,實際上是AntiAvalanche::Check。
/mars-master/mars/stn/src/anti_avalanche.cc

bool AntiAvalanche::Check(const Task& _task, const void* _buffer, int _len) {
    xverbose_function();

    unsigned int span = 0;
    if (!frequency_limit_->Check(_task, _buffer, _len, span)){
        ReportTaskLimited(kFrequencyLimit, _task, span);
        return false;
    }

    if (kMobile == getNetInfo() && !flow_limit_->Check(_task, _buffer, _len)) {
        ReportTaskLimited(kFlowLimit, _task, (unsigned int&)_len);
        return false;
    }

    return true;
}

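這裏有2種情況返回false:一是FrequencyLimit::Check(頻率限制)不通過,二是在移動網絡下FlowLimit::Check(流量限制)不通過。false正是調用者判斷的依據:在__RunOnStartTask中,一旦返回false,會以kEctLocalAntiAvalanche錯誤調用__SingleRespHandle(處理方式為kTaskFailHandleTaskEnd,從名字看是直接結束這個任務),然後continue轉而處理下一個任務。先看前者: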
/mars-master/mars/stn/src/frequency_limit.cc

// Frequency half of the anti-avalanche check. It limits how often the same
// request, identified by the adler32 hash of its buffer, may be sent. Returns
// false when the matching record fails __CheckRecord; _span then receives the
// time since that record's last update. Tasks without limit_frequency always pass.
bool FrequencyLimit::Check(const mars::stn::Task& _task, const void* _buffer, int _len, unsigned int& _span) {
    xverbose_function();

    if (!_task.limit_frequency) return true;

    // Time elapsed since the record table was last cleared (tick counts,
    // presumably milliseconds).
    // NOTE(review): gettickcount() is stored in uint64_t elsewhere; unsigned long is
    // 32 bits on 32-bit targets and would truncate it — confirm the member types.
    unsigned long time_cur = ::gettickcount();
    xassert2(time_cur >= itime_record_clear_);
    unsigned long interval = time_cur - itime_record_clear_;

    // Once RUN_CLEAR_RECORDS_INTERVAL_MINUTE has elapsed, drop every record and
    // start a new window from now. The constant is reportedly 60*60*1000, i.e. one
    // hour in milliseconds despite its name — confirm against its definition.
    if (RUN_CLEAR_RECORDS_INTERVAL_MINUTE <= interval) {
        xdebug2(TSF"__ClearRecord interval=%0, timeCur=%1, itimeRecordClear=%2", interval, time_cur, itime_record_clear_);
        itime_record_clear_ = time_cur;
        __ClearRecord();
    }

    // Hash the request bytes and look for an existing record with that hash.
    unsigned long hash = ::adler32(0, (const unsigned char*)_buffer, _len);
    int find_index = __LocateIndex(hash);

    if (0 <= find_index) {
        // Seen before in this window: report the time since its last update,
        // then update the record (which bumps its count).
        _span = __GetLastUpdateTillNow(find_index);
        __UpdateRecord(find_index);
        // Reject once the record no longer passes __CheckRecord (the count limit
        // is said to be 105). xassert2(false) flags this as unexpected.
        if (!__CheckRecord(find_index)) {
            xerror2(TSF"Anti-Avalanche had Catch Task, Task Info: ptr=%0, cmdid=%1, need_authed=%2, cgi:%3, channel_select=%4, limit_flow=%5",
                    &_task, _task.cmdid, _task.need_authed, _task.cgi, _task.channel_select, _task.limit_flow);
            xerror2(TSF"apBuffer Len=%0, Hash=%1, Count=%2, timeLastUpdate=%3",
                    _len, iarr_record_[find_index].hash_, iarr_record_[find_index].count_, iarr_record_[find_index].time_last_update_);
            xassert2(false);

            return false;
        }
    } else {
        // First occurrence in this window: add a new record.
        xdebug2(TSF"InsertRecord Task Info: ptr=%0, cmdid=%1, need_authed=%2, cgi:%3, channel_select=%4, limit_flow=%5",
                &_task, _task.cmdid, _task.need_authed, _task.cgi, _task.channel_select, _task.limit_flow);

        __InsertRecord(hash);
    }

    return true;
}

1.計算一個時間差,是從上次清除所有記錄開始到當前時間點的;然後判斷時間差是否超出RUN_CLEAR_RECORDS_INTERVAL_MINUTE這個邊界(按註釋是60*60*1000毫秒,即1小時,而不是1分鐘),如果是,清除所有記錄,並將當前時間作為最後清除時間;
2.根據傳遞進來的buffer計算adler32 hash值,並調用__LocateIndex判斷當前記錄集中是否有相同hash值的記錄存在;
3.根據__LocateIndex的返回值(小於0表示沒有相同的記錄,>=0表示有)執行不同的分支;
4.有相同的存在,先__UpdateRecord增加計數,再執行__CheckRecord檢查這條記錄的count_是否<=105,如果超出,表示到達上限,返回false;
5.沒有相同的記錄存在,插入該條記錄到記錄集中;

這裏稍微總結一下,可以看出,完全是通過時間和次數來確定是否雪崩的。時間用來劃定一個檢查週期,不會太頻繁也不會太長,每過1小時清除一次記錄再重新來。次數在__UpdateRecord時增加,並且按請求數據的hash分別統計:同一份請求數據在一個週期內被檢查得越多,次數的值就越大,直到超出上限後認為觸發了雪崩效應,返回false。看起來怎麼樣,比較清晰和明確,而且這部分是獨立的,耦合性也不高,寫得挺好。
順便說下,__InsertRecord插入並非直接插入就完了,裏面會執行一個檢查:如果當前的記錄集大小已經達到了最大容量(30),會先找到更新時間最早的那一條記錄,刪除它,給新的記錄騰個地方(若已經超過最大容量,則視為異常,斷言後直接返回,不插入)。

void FrequencyLimit::__InsertRecord(unsigned long _hash) {
    if (MAX_RECORD_COUNT < iarr_record_.size()) {
        xassert2(false);
        return;
    }

    STAvalancheRecord temp;
    temp.count_ = 1;
    temp.hash_ = _hash;
    temp.time_last_update_ = ::gettickcount();

    // 若是超出邊界,刪除時間最先的那一條記錄
    if (MAX_RECORD_COUNT == iarr_record_.size()) {
        unsigned int del_index = 0;

        for (unsigned int i = 1; i < iarr_record_.size(); i++) {
            if (iarr_record_[del_index].time_last_update_ > iarr_record_[i].time_last_update_) {
                del_index = i;
            }
        }

        std::vector<STAvalancheRecord>::iterator it = iarr_record_.begin();
        it += del_index;
        iarr_record_.erase(it);
    }

    iarr_record_.push_back(temp);
}

ok,上面的只是雪崩檢查的一部分,還有另一部分在FlowLimit::Check:
/mars-master/mars/stn/src/flow_limit.cc

// Flow half of the anti-avalanche check. For tasks with limit_flow set, it first
// refreshes the current volume via __FlashCurVol(). It then rejects the task if
// adding its _len bytes would exceed kMaxVol; otherwise the bytes are added.
// Tasks without limit_flow always pass.
bool FlowLimit::Check(const mars::stn::Task& _task, const void* _buffer, int _len) {
    xverbose_function();

    if (!_task.limit_flow) {
        return true;
    }

    __FlashCurVol();

    if (cur_funnel_vol_ + _len > kMaxVol) {
        // Exactly one argument per placeholder (10 each), in placeholder order.
        xerror2(TSF"Task Info: ptr=%_, cmdid=%_, need_authed=%_, cgi:%_, channel_select=%_, limit_flow=%_, cur_funnel_vol_(%_)+_len(%_)=%_,MAX_VOL:%_ ",
                &_task, _task.cmdid, _task.need_authed, _task.cgi, _task.channel_select, _task.limit_flow, cur_funnel_vol_, _len, cur_funnel_vol_ + _len, kMaxVol);

        return false;
    }

    cur_funnel_vol_ += _len;
    return true;
}

先回顧下上面的AntiAvalanche::Check調用FlowLimit::Check的時候,還有一個條件,就是kMobile == getNetInfo(),也就是只在非WiFi的移動網絡下開啟這個檢測。再回來看這裏,單獨看cur_funnel_vol_ + _len > kMaxVol這句話,感覺是一個數據量上的檢測,如果大小超出kMaxVol(2 * 1024)這個邊界,就會認為觸發雪崩。好吧,關鍵看下__FlashCurVol:

// Drains the funnel: removes funnel_speed_ units for every whole second elapsed
// since the last drain, never going below 0. Assumes funnel_speed_ >= 0 and
// gettickcount() in milliseconds (the elapsed time is divided by 1000).
void FlowLimit::__FlashCurVol() {
    uint64_t timeCur = ::gettickcount();
    xassert2(timeCur >= time_lastflow_computer_, TSF"%_, %_", timeCur, time_lastflow_computer_);
    // Whole seconds elapsed since the last drain.
    uint64_t interval = (timeCur - time_lastflow_computer_) / 1000;

    xdebug2(TSF"iCurFunnelVol=%0, iFunnelSpeed=%1, interval=%2", cur_funnel_vol_, funnel_speed_, interval);
    // Compute the drain in 64-bit and clamp. Subtracting the uint64 product directly
    // from the int volume can wrap after a long idle period instead of reaching 0.
    uint64_t drained = interval * (uint64_t)funnel_speed_;
    if (cur_funnel_vol_ <= 0 || drained >= (uint64_t)cur_funnel_vol_) {
        cur_funnel_vol_ = 0;
    } else {
        cur_funnel_vol_ -= (int)drained;
    }
    xdebug2(TSF"iCurFunnelVol=%0", cur_funnel_vol_);

    // Advance only by the whole seconds actually drained. Resetting to timeCur would
    // discard the sub-second remainder, so calls less than 1 s apart would never drain.
    time_lastflow_computer_ += interval * 1000;
}

開始仍然是時間差的計算:用當前時間和上次計算流量的時間做差,並換算出以秒為單位的差值。然後用一個增量單位因子funnel_speed_乘以時間差,再用當前的值cur_funnel_vol_減去它(這個值可以認為是一個估值6 * 1024 * 1024);如果結果仍然大於0則保留,否則置為0。後面回到Check裏面,進行了cur_funnel_vol_ + _len > kMaxVol的判斷。那麼怎麼理解呢?我的理解是:在移動網絡情況下,這裏在對假設的發送數據量做追蹤。先算出在這段時間差裏,按假設的速度應當「消化」掉多少流量,再用累計量減去它,得到的就是當前仍「佔用」的流量。如果這個值加上當前要傳輸的_len超出了極限值,就認為是過載了,觸發雪崩保護。說白了,這就是經典的漏桶(leaky bucket)算法:cur_funnel_vol_是桶中當前的水量,每秒以funnel_speed_的速度漏出,每次發送把_len倒進桶裏,溢出就拒絕。被攔截的任務會以kEctLocalAntiAvalanche錯誤結束(處理方式為kTaskFailHandleTaskEnd),然後繼續處理後面的任務。因此,數據量大的任務容易被攔截,數據量小的任務仍然可以發送。
總結一下吧,只能說這裏的雪崩檢查還是比較有技術含量的,煞費苦心地做了2個檢查:一個是單個任務(相同請求數據)的發送次數限制,一個是流量的限制。只要有一個超出限制,就認為是雪崩,結束當前任務並繼續處理下一個。這裏看起來還是比較費勁一些,需要猜測用途的真正含義。這裏的檢查涵蓋得比較全面,可以看到微信為了解決移動網絡下的網絡傳輸雪崩,下了一些功夫,挺不錯的!

此文有點長,我們把雪崩檢查之後的流程放到下一篇文章去分析吧。

相關文章
相關標籤/搜索