死磕Handler

ActivityThread 的啓動

ActivityThread 是 android 應用程序的「主線程」,它是由 ActivityManagerService 調用 Process 進程類交由 Zygote 孵化出一個新進程的時候作爲 entryPoint 參數指定的。java

/* framework/base/services/core/com/android/server/am/ActivityManagerService.java */
private final boolean startProcessLocked(ProcessRecord app, String hostingType,
        String hostingNameStr, boolean disableHiddenApiChecks, String abiOverride) {
    ...
    final String entryPoint = "android.app.ActivityThread";
    return startProcessLocked(hostingType, hostingNameStr, entryPoint, app, uid, gids,
                runtimeFlags, mountExternal, seInfo, requiredAbi, instructionSet, invokeWith,
                startTime);
}
// 普通 app 進程的 startProcessLocked 會執行到下面的函數上去
private ProcessStartResult startProcess(String hostingType, String entryPoint,
        ProcessRecord app, int uid, int[] gids, int runtimeFlags, int mountExternal,
        String seInfo, String requiredAbi, String instructionSet, String invokeWith,
        long startTime) {
        ...
        // Process 進程類會間接調用 Zygote 孵化出一個新的進程
        startResult = Process.start(entryPoint,
                        app.processName, uid, uid, gids, runtimeFlags, mountExternal,
                        app.info.targetSdkVersion, seInfo, requiredAbi, instructionSet,
                        app.info.dataDir, invokeWith,
                        new String[] {PROC_START_SEQ_IDENT + app.startSeq});
        eturn startResult;                
}                      
複製代碼

Looper、MessageQueue、Handler

目標進程孵化成功後,會觸發 ActivityThread 的 main() 方法的執行,首先她會進行一些環境配置的準備,接着會調用 prepareMainLooper() 初始化 Looper 對象,並將這個 Looper 對象與當前線程一一綁定;在建立 Looper 對象的時候,也伴隨着 MessageQueue 對象的建立,Looper 與 MessageQueue 也是一一對應的。linux

/* frameworks/base/core/java/android/app/ActivityThread.java*/

static volatile Handler sMainThreadHandler;  // set once in main()
final H mH = new H();

public static void main(String[] args) {
    ...
    Looper.prepareMainLooper();

    ActivityThread thread = new ActivityThread();
    thread.attach(false, startSeq);

    if (sMainThreadHandler == null) {
        sMainThreadHandler = thread.getHandler();
    }
    
    Looper.loop();

    throw new RuntimeException("Main thread loop unexpectedly exited");
}


final Handler getHandler() {
    return mH;
}
  
// H 用於分發或處理 android 封裝後的 events 事件 
class H extends Handler {
    ...
}
複製代碼

sThreadLocal 是一個靜態類型變量,意味着一旦 import 了 Looper 後,sThreadLocal 就已經存在並構建完畢。ThreadLocal 是一個本地線程副本變量工具類,這個類能使線程中的某個值與保存值的對象關聯起來。ThreadLocal 提供了 get 與 set 等訪問接口或方法,這些方法爲每一個使用該變量的線程都存有一份獨立的副本,所以 get 老是返回由當前執行線程在調用 set 時設置的值。android

Looper 正式利用 ThreadLocal 的這種特性來將 thread、Looper、MessageQueue 一一關聯起來的。segmentfault

/* frameworks/base/core/java/android/os/Looper.java*/

@UnsupportedAppUsage
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
public static void prepareMainLooper() {
    prepare(false);
    synchronized (Looper.class) {
        if (sMainLooper != null) {
            throw new IllegalStateException("The main Looper has already been prepared.");
        }
        sMainLooper = myLooper();
    }
}

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}
    
private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}

複製代碼

這裏咱們能夠看出數組

  • 每一個 Thread 只對應一個 Looper
  • 每一個 Looper 只對應一個 MessageQueue

MessageQueue 看名字就能猜到,是用來保存 Message 的隊列。準確來講,Runnable (做爲 message 的 callback 成員) 和 Message 均可以被壓入 MessageQueue 中,造成一個集合。MessageQueue 的構造函數中會調用 nativeInit() 經過 jni 調用進入到 C++ framework 層bash

/*frameworks/base/core/java/android/os/MessageQueue.java */
private final boolean mQuitAllowed;
private long mPtr; // 能夠理解爲對 Native 層 MessageQueue 的引用

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}

private native static long nativeInit();
複製代碼

nativeInit 在 native 層一樣建立了 MessageQueue 、Looper 對象,這與 java 層很是類似。由此咱們能夠猜測,在 native 層也應該實現了一套消息傳遞機制,在 java 層,MessageQueue 中的一條條 message 最終會交由 handler 去處理,在 native 層是否也是如此?併發

/* frameworks/base/core/jni/android_os_MessageQueue.cpp */
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env); // 對 env 的強弱引用執行加1
    return reinterpret_cast<jlong>(nativeMessageQueue); // 將 nativeMessageQueue 轉換成 long 類型來表示
}


class NativeMessageQueue : public MessageQueue, public LooperCallback {
    ...
    
    NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
        mLooper = Looper::getForThread();
        if (mLooper == NULL) {
            mLooper = new Looper(false); // 這裏的 Looper 是 native 的Looper
            Looper::setForThread(mLooper);
        }
    
    ....
    }
}
複製代碼

接着回到 ActivityThread,當執行完上面的步驟後,分別在 java 層和 native 層都各自初始化了她們的 Looper 與 MessageQueue 對象,至此,prepareMainLooper() 執行完畢。接着便會 new 一個 ActivityThread 對象,隨即經過 thread.attach(false, startSeq) 方法將 ActivityThread 與當前線程綁定,並執行 application、activity、services 等的啓動流程。這部分流程與本篇內容關係不大,故不作展開。attach()執行以後緊接着會經過這個 ActivityThread 對象的 getHandler() 方法獲取到一個 Handler 對象,並將其標賦值爲sMainThreadHandler(主線程 Handler),這個 sMainThreadHandler 僅會在 ActivityThread 初始化的時候執行一次。app

/* frameworks/base/core/java/android/app/ActivityThread.java */
public static void main(String[] args) {
  ...
  ActivityThread thread = new ActivityThread();
  thread.attach(false, startSeq);
  if (sMainThreadHandler == null) {
      sMainThreadHandler = thread.getHandler();
  }
  ...
}
複製代碼

至此,Handler 對象出如今了咱們視野。那麼 Handler 與線程 Thread 又有什麼關係呢?異步

表面上看確實沒什麼關聯,可是因爲每一個 Message 最多指定一個 Handler 來處理事件,而每一個 MessageQueue 中能夠有 N 個 Message,結合前面 MessageQueue 、Looper、與 Thread 的一一對應關係,不難推導出,Thread 與 Handler 是一對多的關係socket

sMainThreadHandler 初始化後,主線程便經過 Looper.loop()方法開始了永不停歇的循環。若是遇到某些異常致使 loop 退出,主線程也將拋出異常,程序崩潰。

/* frameworks/base/core/java/android/os/Looper.java */

public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue;

    for (;;) {
        ...
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        ...
        try {
            msg.target.dispatchMessage(msg);
        } finally {
            if (traceTag != 0) {
                Trace.traceEnd(traceTag);
            }
        }
        ...
        msg.recycleUnchecked();
    }
}

 public static @Nullable Looper myLooper() {
     return sThreadLocal.get();
 }
複製代碼

首先會經過 myLooper() 拿到保存在 ThreadLocal 中的 Looper 對象,若是該線程沒有對 Looper 初始化,則會拋出異常。拿到 Looper 對象後再經過這個 Looper 就能夠拿到 MessageQueue ,而後開始執行死循環,經過 queue.next()不斷從 MessageQueue 中取出 Message 來處理。

任一線程,只要初始化過 Looper ,就能夠調用loop()來處理事件,這是由於 Looper 是保存在 sThreadLocal 中的,主線程只是經過Looper.prepareMainLooper()已經幫咱們處理了這一過程。對於子線程來講,也能夠經過Looper.prepare()來達到這一目的,而後就能夠經過Looper.loop()開啓循環,處理事件了。

既然是一個死循環在不斷經過queue.next()讀取 message 來處理,爲何主線程不至於卡死?或者是 ANR 呢?從源碼註釋來看queue.next()是可能形成線程阻塞的。因此玄機也應該在這個方法裏面。

/* frameworks/base/core/java/android/os/MessageQueue.java */
Message next() {
    // 根據前面介紹,mPtr 能夠理解爲對 native MessageQueue 的引用
    // ptr == 0 ,說明 native MessageQueue 不可用,程序處於異常狀態,loop 也將結束
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }
    //正常狀況都會執行到這裏來
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            // 該方法用於將當前線程中任何待處理的 Binder 命令刷新到驅動中
            // 在執行可能阻塞很長時間的操做調用前調用頗有用
            Binder.flushPendingCommands();
        }

        nativePollOnce(ptr, nextPollTimeoutMillis); // 這個方法很重要

        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // 經過 barrier,找到要處理的異步消息
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    /* 到這裏說明 message 還未準備執行,須要等待一段時間,nextPollTimeoutMillis 將被傳入 nativePollOnce 讓線程休眠,待 nextPollTimeoutMillis 時間知足後再喚醒線程 */
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // 獲取到 message 的處理,正常流程會執行到這個分支
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    msg.markInUse(); // 標記已處理
                    return msg;
                }
            } else {
                nextPollTimeoutMillis = -1;
            }

            if (mQuitting) {
                /* 若是 ptr!=0,調用 nativeDestroy() disposes  message queue*/
                dispose(); 
                return null;
            }
          /* 處理註冊的IdleHandler,當MessageQueue中沒有Message時,
          Looper會調用IdleHandler作一些工做  */
         ...
        nextPollTimeoutMillis = 0;
    }
}
複製代碼

NativePollOnce 幹了啥

next()方法會一直等待到下一個消息的執行時間到來而後取出並返回。等待的期間線程會休眠,不佔用 cpu 資源,這部分邏輯正是在 nativePollOnce來處理的,nativePollOnce 最終是經過 native 層的 Looper 來執行 pollOnce方法的

/* frameworks/base/core/jni/android_os_MessageQueue.cpp */
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}


class NativeMessageQueue : public MessageQueue, public LooperCallback {
   ...
   void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
    ...
}

}
複製代碼

注意,這裏的 Looper.cpp L 是大寫的,源碼中也有 looper.cpp,路徑 frameworks/base/native/android/looper.cpp,裏面有 ALooper 對象等,其實也是對 Looper.cpp 的包裝,最終實現仍是經過 system/core/libutils/Looper.cpp 來實現的

/*system/core/libutils/include/utils/Looper.h */
public:
    enum {
        /**
         * Result from Looper_pollOnce() and Looper_pollAll():
         * The poll was awoken using wake() before the timeout expired
         * and no callbacks were executed and no other file descriptors were ready.
         */
        POLL_WAKE = -1,

        /**
         * Result from Looper_pollOnce() and Looper_pollAll():
         * One or more callbacks were executed.
         */
        POLL_CALLBACK = -2,

        /**
         * Result from Looper_pollOnce() and Looper_pollAll():
         * The timeout expired.
         */
        POLL_TIMEOUT = -3,

        /**
         * Result from Looper_pollOnce() and Looper_pollAll():
         * An error occurred.
         */
        POLL_ERROR = -4,
    };

    int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
    inline int pollOnce(int timeoutMillis) {
        return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
    }


private:
   struct Request {
        int fd;
        int ident;
        int events;
        int seq;
        sp<LooperCallback> callback;
        void* data;

        void initEventItem(struct epoll_event* eventItem) const;
    };
    struct Response {
        int events;
        Request request;
    };
複製代碼

pollOnce() 方法中,先處理 Response 數組中不帶 Callback的事件,再調用了 pollInner() 方法

/* system/core/libutils/Looper.cpp */
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            /* ident >= 0,表示沒有 callback,由於 POLL_CALLBACK = -2 */
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
                if (outFd != nullptr) *outFd = fd;
                if (outEvents != nullptr) *outEvents = events;
                if (outData != nullptr) *outData = data;
                return ident;
            }
        }
        // result != 0 說明
        if (result != 0) {
            if (outFd != nullptr) *outFd = 0;
            if (outEvents != nullptr) *outEvents = 0;
            if (outData != nullptr) *outData = nullptr;
            return result;
        }
        // 內部輪循
        result = pollInner(timeoutMillis);
    }
}
複製代碼

pollInner 先調用 epoll_wait() 等待事件的發生或者超時,對於 epoll_wait 的返回值 eventCount,若是小於等於0,先將 result 標記爲對應的值,直接跳轉到Done。若是 epoll 重建,也會直接跳轉 Done。進入Done標記位的代碼段,會先調用 Native 的 Handler 來處理該 Message,而後再處理 Response 數組,若是是 CALLBACK 回調,就交由具體請求方去處理(不帶 Callback 的事件 pollOnce() 已經處理了 )。

/* system/core/libutils/Looper.cpp */

static const int EPOLL_MAX_EVENTS = 16;

int Looper::pollInner(int timeoutMillis) {
    ...
    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    //epoll_event 最大爲16個
    struct epoll_event eventItems[EPOLL_MAX_EVENTS]; 
    // epoll_wait 是 linux 中 epoll 相關的系統調用
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis); 

    // Acquire lock.
    mLock.lock();

   // epoll重建,直接跳轉Done
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();  //  epoll重建方法,內部會使用epoll_create1系統調用
        goto Done;
    }

    // epoll事件個數小於0,發生錯誤,直接跳轉Done
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        result = POLL_ERROR; // result=-4
        goto Done;
    }

    // epoll事件個數等於0,發生超時,直接跳轉Done
    if (eventCount == 0) {
        result = POLL_TIMEOUT; // result=-3
        goto Done;
    }

    // 遍歷處理事件
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd.get()) {
            if (epollEvents & EPOLLIN) {
                awoken(); //已經喚醒了,則讀取並清空管道數據
            } 
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                //將request封裝成response對象,push到mResponses隊列
                pushResponse(events, mRequests.valueAt(requestIndex)); 
            }
        }
    }
Done: ;

    mNextMessageUptime = LLONG_MAX;
    // 處理native的message 
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            { 
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
                handler->handleMessage(message); //經過handler處理消息
            } 

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // 理帶有Callback()方法的Response事件
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
            //處理請求的回調方法
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }
            // 清除callback引用
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}
複製代碼
pollOnce返回值 說明
POLL_WAKE 表示由wake()觸發,即pipe寫端的write事件觸發
POLL_CALLBACK 表示某個被監聽fd被觸發
POLL_TIMEOUT 表示等待超時
POLL_ERROR 表示等待期間發生錯誤

關於 Linux Epoll 機制

epoll 是 Linux 內核中的一套高效 IO多路複用模型。Linux 經過 socket 睡眠隊列來管理全部等待 socket 的某個事件的 process,同時經過 wakeup 機制來異步喚醒整個睡眠隊列上等待事件的 process,通知 process 相關事件發生。

I/O 多路複用(又被稱爲「事件驅動」),首先要理解的是,操做系統爲你提供了一個功能,當你的某個 socket 可讀或者可寫的時候,它能夠給你一個通知。這樣當配合非阻塞的 socket 使用時,只有當系統通知我哪一個描述符可讀了,我纔去執行 read 操做,能夠保證每次 read 都能讀到有效數據而不作純返回 -1 和 EAGAIN 的無用功。寫操做相似。操做系統的這個功能經過 select/poll/epoll/kqueue 之類的系統調用函數來使用,這些函數均可以同時監視多個描述符的讀寫就緒情況,這樣,多個描述符的 I/O 操做都能在一個線程內併發交替地順序完成,這就叫 I/O 多路複用,這裏的「複用」指的是複用同一個線程。

Epoll事件 含義
EPOLLIN 關聯的文件描述符能夠讀(包括對端SOCKET正常關閉)
EPOLLOUT 關聯的文件描述符能夠寫
EPOLLPRI 關聯的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來)
EPOLLERR 關聯的文件描述符發生錯誤
EPOLLHUP 流套接字對等關閉鏈接,或半關閉寫。(當使用邊緣觸發監視時,此標記對於編寫簡單代碼檢測對等端是否關閉特別有用。2.6.17引入)
EPOLLET 將EPOLL設爲邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來講的
EPOLLONESHOT 只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏
系統調用 函數 功能
epoll_create int epoll_create(int size) 用於建立一個epoll的句柄,size是指監聽的描述符個數, 如今內核支持動態擴展,該值的意義僅僅是初次分配的fd個數,後面空間不夠時會動態擴容。 當建立完epoll句柄後,佔用一個fd值
epoll_ctl int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) 用於對須要監聽的文件描述符(fd)執行op操做。op操做包括EPOLL_CTL_ADD、EPOLL_CTL_DEL、EPOLL_CTL_MOD
epoll定義了一個結構體 struct epoll_event 來表達監聽句柄的訴求。epoll_event 支持的事件對應上表
epoll_wait int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout) 等待事件的上報,該函數返回須要處理的事件數目,如返回0表示已超時

Epoll 的工做流程和優點,感興趣的同窗能夠參考下面的文章

小結

Java 層與 native 層各自實現了一套 handler 機制,都有各自的 Handler、Message、MessageQueue和 Looper 對象,二者的模型與工做原理類似。java 層的Looper 經過 MessageQueue 不斷調用 next() 獲取 Message 交由 Handler 去處理。若是 MessageQueue 中不存在新的事件,next() 將經過 Linux Epoll 將同時讓 java 層和 native 層的線程進入休眠態,線程被掛起,不在消耗 CPU 資源,直到新的事件觸發,fd 發生變更,Epoll 再次喚醒休眠的目標進程,繼續執行消息循環。

若是用一句簡單的話語歸納 Handler 的運行機制,那就是:

Looper 不斷獲取 MessageQueue 中的一個 Message ,而後交由 Handler 去處理。

相關文章
相關標籤/搜索