撥雲見日---android異步消息機制源碼分析(二)

在撥雲見日---android異步消息機制源碼分析(一)(http://my.oschina.net/u/1155515/blog/378460)中,咱們瞭解了Java層異步消息機制的基本流程,可能細心的同窗會發現java層中有不少native調用,其實java層僅僅是一個殼子,具體的實現全在native層,經過本篇文章,讓咱們繼續抽絲剝繭,一步步揭開Android異步消息的面紗,分析很差之處,還請熟悉的童鞋指教java

本文從實際使用角度出發,共分爲2個部分android

一、從子線程添加消息到消息線程時,從Java層到native的處理流程異步

二、消息線程消費消息的處理流程函數

1、從子線程添加消息到消息線程時,從Java層到native的處理流程oop

在子線程中,咱們能夠經過handler.sendMessage或Handler.postXXX(如post(Runable r)或postDelayed(Runnable r, long delayMillis))向消息線程發送消息源碼分析

而最終會調用到Java層MessageQueue.enqueueMessage把消息放入消息線程的MessageQueue(源碼路徑:android.os.MessageQueue)post

boolean enqueueMessage(Message msg, long when) {
        if (msg.isInUse()) {
            throw new AndroidRuntimeException(msg + " This message is already in use.");
        }
        if (msg.target == null) {
            throw new AndroidRuntimeException("Message must have a target.");
        }

        synchronized (this) {
            if (mQuitting) {
                RuntimeException e = new RuntimeException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w("MessageQueue", e.getMessage(), e);
                return false;
            }

            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            //判斷入隊消息是否須要當即處理
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;//若是消息循環線程處於阻塞中,則須要喚醒消息線程
            } else {
                //入隊消息不須要當即處理,則根據消息處理時間插入到鏈表中合適位置
                .........
                
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    .............
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
            //喚醒
                nativeWake(mPtr);
            }
        }
        return true;
    }

經過上面代碼,當入隊消息須要當即處理而且消息線程處於阻塞時,會調用native函數nativeWake喚醒消息線程來處理消息ui

經過JNI定義,讓咱們看看nativeWake對應的native函數是什麼this

代碼路徑: frameworks\base\core\jni\android_os_MessageQueue.cppspa

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    .......
    mLooper->pollOnce(timeoutMillis);
    .......
}

void NativeMessageQueue::wake() {
    mLooper->wake();
}

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}


static const JNINativeMethod gMessageQueueMethods[] = {
    /* name, signature, funcPtr */
    { "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
    { "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
    { "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
    { "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
    { "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
    { "nativeSetFileDescriptorEvents", "(JII)V",
            (void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};

int register_android_os_MessageQueue(JNIEnv* env) {
    int res = RegisterMethodsOrDie(env, "android/os/MessageQueue", gMessageQueueMethods,
                                   NELEM(gMessageQueueMethods));

    jclass clazz = FindClassOrDie(env, "android/os/MessageQueue");
    gMessageQueueClassInfo.mPtr = GetFieldIDOrDie(env, clazz, "mPtr", "J");
    gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie(env, clazz,
            "dispatchEvents", "(II)I");

    return res;
}

調用關係鏈以下:

java層nativeWake->nativeMessageQueue.wake()->Looper.wake()

讓咱們經過源碼一探Looper.wake()

代碼路徑: frameworks\native\jb-dev\libs\utils\Looper.cpp

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    ssize_t nWrite;
    do {
        nWrite = write(mWakeWritePipeFd, "W", 1);
    } while (nWrite == -1 && errno == EINTR);

    if (nWrite != 1) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}

mWakeWritePipeFd是個什麼鬼?

爲何會向mWakeWritePipeFd寫入一個字符?

別急,帶着這些疑問咱們繼續日後面看,看了後面,前面的問題也就迎刃而解

如今咱們簡單的總結一下消息發送的流程:

一、在Java層經過Handler對象發送消息後,消息被放入消息線程的MessageQueue

二、消息入隊後,會判斷是否須要當即處理消息

三、若是須要當即處理消息且消息線程處於阻塞中,則喚醒消息線程

2、消息線程消費消息的處理流程

上面咱們瞭解從子線程發送消息的流程,那麼發送了消息後,消息線程是如何消費消息?

繼續咱們的腳步,讓咱們來一探究竟

經過前一篇文章(http://my.oschina.net/u/1155515/blog/378460)咱們瞭解到消息線程經過調用Java層Looper.loop()進入消息循環,在消息循環中,又經過調用MessageQueue.next()不斷的獲取消息或者沒有消息時阻塞

Message next() {
        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            ...............

           //先調用native方法獲阻塞到超時(超時時間由nextPollTimeoutMillis指定)或者被主動喚醒
            nativePollOnce(mPtr, nextPollTimeoutMillis);

            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                
                ...................
                
                //判斷是有新消息到達仍是超時
                if (msg != null) {
                    //判斷是否須要當即處理消息
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        //
                        //消息須要當即處理,則返回消息
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (false) Log.v("MessageQueue", "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } 
                
                ....................

                // Process the quit message now that all pending messages have been handled.
                if (mQuitting) {
                    dispose();
                    return null;
                }

                ..................
                
            nextPollTimeoutMillis = 0;
        }
    }

根據上面第一部分native層源碼中JNI函數的定義,能夠看到調用關係鏈以下:

java層nativePollOnce->nativeMessageQueue.pollOnce->Looper.pollOnce

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
    
        ...........

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}

Looper.pollOnce最終調用了Looper.pollInner

int Looper::pollInner(int timeoutMillis) {
    ...............

    // Poll.
    int result = ALOOPER_POLL_WAKE;
    
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    //調用epoll_wait系統調用監聽fd上事件,或者直到超時返回
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    .............

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = ALOOPER_POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = ALOOPER_POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        ////判斷是不是主動喚醒,
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } 
        .............
    }
Done: ;
    ...................
    return result;
}

這裏又出現了mWakeReadPipeFd,讓咱們經過Looper構造函數看看mWakeReadPipeFd是個什麼鬼

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
        
    int wakeFds[2];
    //建立管道
    int result = pipe(wakeFds);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);

    mWakeReadPipeFd = wakeFds[0];
    mWakeWritePipeFd = wakeFds[1];

    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
            errno);

    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
            errno);

    // Allocate the epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
    
    //把讀端管道添加到epoll監控列表並監聽讀端事件
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeReadPipeFd;
    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
            errno);
}

原來mWakeReadPipeFd只是管道的讀端fd,可能童鞋們這時又有疑問

一、爲何要建立一個管道並監聽讀端事件?

二、爲何消息入隊喚醒消息線程時,僅僅是向讀端寫一個字符?

經過上面的源碼,咱們知道當消息線程沒有消息,則會一直阻塞到超時結束;可是若阻塞過程當中,子線程發送一條消息,而這時消息線程還在阻塞中呢,那隻能等消息線程阻塞結束才能處理消息,這樣會形成消息處理延遲

可能聰明的童鞋會說,那我超時時間設置短一點行不行,這樣看起來沒問題,可是太短的超時時間基本上等於輪詢,效率低不說還浪費CPU浪費電

因此經常使用的作法是:

一、建立一個pipe

二、pipe的讀取端放入epoll監聽隊列

三、當須要當即喚醒消息線程時,子線程僅僅往讀取端管道寫一個字符就行

經過上面的分析,如今讓咱們總結一下

消息線程消費消息:

一、消息線程建立MessageQueue時,會在native層建立一個NativeMessageQueue

二、消息線程經過native調用進入阻塞狀態,直到當有新消息到達被主動喚醒或者阻塞超時

三、消息線程被喚醒後,會判斷是否有消息須要處理,若是有則返回消息處理,不然會繼續步驟2直到退出

子線程傳遞消息流程:

一、MessageQueue中,消息按照執行時間從早到晚排列,當子線程發送消息後,根據消息執行時間判斷是否須要當即喚醒消息線程

二、若是須要當即喚醒消息線程,則經過native端喚醒消息線程

三、消息線程被喚醒後,判斷是否有消息須要處理

四、若是有新消息須要處理則返回,不然繼續進入阻塞狀態

一點思考:

Java層消息傳遞與消費爲何不用wait和Notify來實現,Native層僅僅是阻塞消息線程或者喚醒消息線程(Native層消息隊列要強大得多,能夠監聽FD)有種殺雞焉用牛刀的感受

相關文章
相關標籤/搜索