三思系列:Android的消息機制,一文吃透

三思系列是我最新的學習、總結形式,着重於:問題分析技術積累視野拓展關於三思系列java

此次,真的能夠一文吃透:android

  • Java層消息隊列的設計
  • Java層Looper分發
  • Native層消息隊列和Java層消息隊列的關係
  • Native層Looper分發
  • 消息
  • epoll

前言

做爲Android中 相當重要 的機制之一,十多年來,分析它的文章不斷,大量的內容已經被挖掘過了。因此:git

  • 已經對這一機制比較 熟稔 的讀者,在這篇文章中,看不到 新東西 了。
  • 還不太熟悉消息機制的讀者,能夠在文章的基礎上,繼續挖一挖。

可是,通過簡單的檢索和分析,大部分 的文章是圍繞:github

  • Handler,Looper,MQ的關係
  • 上層的Handler,Looper、MQ 源碼分析

展開的。單純的從這些角度學習的話,並不能 徹底理解 消息機制。web

這篇文章本質仍是 一次腦暴 ,一來 避免腦暴跑偏 ,二來幫助讀者 捋清內容脈絡 。先放出腦圖:shell

guide.png

腦暴:OS解決進程間通訊問題

程序世界中,存在着大量的 通訊 場景。搜索咱們的知識,解決 進程間通訊 問題有如下幾種方式:編程

這段內容能夠泛讀,瞭解就行,不影響往下閱讀markdown

  • 管道
    • 普通管道pipe:一種 半雙工 的通訊方式,數據只能 單向流動 ,並且只能在具備 親緣關係 的進程間使用。網絡

    • 命令流管道s_pipe: 全雙工,能夠同時雙向傳輸數據結構

    • 命名管道FIFO:半雙工 的通訊方式,容許無親緣關係 的進程間通訊。

  • 消息隊列 MessageQueue:

消息的鏈表存放在內核 中 並由 消息隊列標識符 標識。 消息隊列克服了 信號傳遞信息少管道 只能承載 無格式字節流 以及 緩衝區大小受限 等缺點。

  • 共享存儲 SharedMemory:

映射一段 能被其餘進程所訪問 的內存,這段共享內存由 一個進程建立,但 多個進程均可以訪問。 共享內存是 最快的 IPC 方式,它是針對 其餘 進程間通訊方式 運行效率低 而專門設計的。 每每與其餘通訊機制一同使用,如 信號量 配合使用,來實現進程間的同步和通訊。

  • 信號量 Semaphore:

是一個 計數器 ,能夠用來控制多個進程對共享資源的訪問。它常做爲一種 鎖機制,防止某進程正在訪問共享資源時, 其餘進程也訪問該資源,實現 資源的進程獨佔。所以,主要做爲 進程間 以及 同一進程內線程間 的同步手段。

  • 套接字Socket:

與其餘通訊機制不一樣的是,它能夠 經過網絡 ,在 不一樣機器之間 進行進程通訊。

  • 信號 signal:

用於通知接收進程 某事件已發生。機制比較複雜。

咱們能夠想象,Android之間也有大量的 進程間通訊場景,OS必須採用 至少一種 機制,以實現進程間通訊。

仔細研究下去,咱們發現,Android OS用了不止一種方式。並且,Android 還基於 OpenBinder 開發了 Binder 用於 用戶空間 內的進程間通訊。

關於 爲何不直接使用Linux中現有的進程間通訊方式 ,能夠看看這篇知乎問答

這篇文章 也簡單探討了 "內核空間內的消息隊列"

這裏咱們留一個問題之後探究:

Android 有沒有使用 Linux內核中的MessageQueue機制 幹事情

基於消息隊列的消息機制設計有不少優點,Android 在不少通訊場景內,採用了這一設計思路。

消息機制的三要素

無論在哪,咱們談到消息機制,都會有這三個要素:

  • 消息隊列
  • 消息循環(分發)
  • 消息處理

消息隊列 ,是 消息對象 的隊列,基本規則是 FIFO

消息循環(分發), 基本是通用的機制,利用 死循環 不斷的取出消息隊列頭部的消息,派發執行

消息處理,這裏不得不提到 消息 有兩種形式:

  • Enrichment 自身信息完備
  • Query-Back 自身信息不完備,須要回查

這二者的取捨,主要看系統中 生成消息的開銷回查信息的開銷 二者的博弈。

在信息完備後,接收者便可處理消息。

Android Framework中的消息隊列

Android 的Framework中的消息隊列有兩個:

  • Java層 frameworks/base/core/java/android/os/MessageQueue.java
  • Native層 frameworks/base/core/jni/android_os_MessageQueue.cpp

Java層的MQ並非 List 或者 Queue 之類的 Jdk內的數據結構實現。

Native層的源碼我下載了一份 Android 10 的 源碼 ,並不長,你們能夠完整的讀一讀。

並不難理解:用戶空間 會接收到來自 內核空間消息 , 從 下圖 咱們可知,這部分消息先被 Native層 獲知,因此:

  • 經過 Native層 創建消息隊列,它擁有消息隊列的各類基本能力
  • 利用JNI 打通 Java層Native層Runtime屏障,在Java層 映射 出消息隊列
  • 應用創建在Java層之上,在Java層中實現消息的 分發處理

PS:在Android 2.3那個時代,消息隊列的實現是在Java層的,至於10年前爲什麼改爲了 native實現, 推測和CPU空轉有關,筆者沒有繼續探究下去,若是有讀者瞭解,但願能夠留言幫我解惑。

PS:還有一張經典的 系統啓動架構圖 沒有找到,這張圖更加直觀

代碼解析

咱們簡單的 閱讀、分析 下Native中的MQ源碼

Native層消息隊列的建立:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
複製代碼

很簡單,建立一個Native層的消息隊列,若是建立失敗,拋異常信息,返回0,不然將指針轉換爲Java的long型值返回。固然,會被Java層的MQ所持有

NativeMessageQueue 類的構造函數

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
複製代碼

這裏的Looper是native層Looper,經過靜態方法 Looper::getForThread() 獲取對象實例,若是未獲取到,則建立實例,並經過靜態方法設置。

看一下Java層MQ中會使用到的native方法

class MessageQueue {
    private long mPtr; // used by native code

    private native static long nativeInit();

    private native static void nativeDestroy(long ptr);

    private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/

    private native static void nativeWake(long ptr);

    private native static boolean nativeIsPolling(long ptr);

    private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
}
複製代碼

對應簽名:

static const JNINativeMethod gMessageQueueMethods[] = {
    /* name, signature, funcPtr */
    { "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
    { "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
    { "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
    { "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
    { "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
    { "nativeSetFileDescriptorEvents", "(JII)V",
            (void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};
複製代碼

mPtr 是Native層MQ的內存地址在Java層的映射。

Java層判斷MQ是否還在工做:

private boolean isPollingLocked() {
    // If the loop is quitting then it must not be idling.
    // We can assume mPtr != 0 when mQuitting is false.
    return !mQuitting && nativeIsPolling(mPtr);
}
複製代碼
static jboolean android_os_MessageQueue_nativeIsPolling(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    return nativeMessageQueue->getLooper()->isPolling();
}
複製代碼
/** * Returns whether this looper's thread is currently polling for more work to do. * This is a good signal that the loop is still alive rather than being stuck * handling a callback. Note that this method is intrinsically racy, since the * state of the loop can change before you get the result back. */
bool isPolling() const;
複製代碼

喚醒 Native層MQ:

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
    mLooper->wake();
}
複製代碼

Native層Poll:

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
複製代碼

這裏比較重要,咱們先大概看下 Native層的Looper是 如何分發消息

//Looper.h

int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
    return pollOnce(timeoutMillis, NULL, NULL, NULL);
}

//實現

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}

複製代碼

先處理Native層滯留的Response,而後調用pollInner。這裏的細節比較複雜,稍後咱們在 Native Looper解析 中進行腦暴。

先於此處細節分析,咱們知道,調用一個方法,這是阻塞的 ,用大白話描述即在方法返回前,調用者在 等待

Java層調動 native void nativePollOnce(long ptr, int timeoutMillis); 過程當中是阻塞的。

此時咱們再閱讀下Java層MQ的消息獲取:代碼比較長,直接在代碼中進行要點註釋。

在看以前,咱們先單純從 TDD的角度 思考下,有哪些 主要場景固然,這些場景不必定都合乎Android現有的設計

  • 消息隊列是否在工做中
    • 工做中,指望返回消息
    • 不工做,指望返回null
  • 工做中的消息隊列 當前 是否有消息
    • 不存在消息,阻塞 or 返回null?-- 若是返回null,則在外部須要須要 保持空轉 或者 喚醒機制,以支持正常運做。從封裝角度出發,應當 保持空轉,本身解決問題
    • 存在消息
      • 特殊的 內部功能性消息,指望MQ內部自行處理
      • 已經處處理時間的消息, 返回消息
      • 未處處理時間,若是都是排過序的,指望 空轉保持阻塞 or 返回靜默並設置喚醒? 按照前面的討論,是指望 保持空轉
class MessageQueue {
    Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        // 1. 若是 native消息隊列指針映射已經爲0,即虛引用,說明消息隊列已經退出,沒有消息了。
        // 則返回 null
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        
        // 2. 死循環,當爲獲取到須要 `分發處理` 的消息時,保持空轉
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }

            // 3. 調用native層方法,poll message,注意,消息還存在於native層
            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                // Try to retrieve the next message. Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                
                //4. 若是發現 barrier ,即同步屏障,則尋找隊列中的下一個可能存在的異步消息
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier. Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                
                if (msg != null) {
                    // 5. 發現了消息,
                    // 若是是尚未到約定時間的消息,則設置一個 `下次喚醒` 的最大時間差
                    // 不然 `維護單鏈表信息` 並返回消息
                    
                    if (now < msg.when) {
                        // Next message is not ready. Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // 尋找到了 `處處理時間` 的消息。 `維護單鏈表信息` 並返回消息
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }

                // 處理 是否須要 中止消息隊列 
                // Process the quit message now that all pending messages have been handled.
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // 維護 接下來須要處理的 IDLEHandler 信息,
                // 若是沒有 IDLEHandler,則直接進入下一輪消息獲取環節
                // 不然處理 IDLEHandler
                // If first time idle, then get the number of idlers to run.
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run. Loop and wait some more.
                    mBlocked = true;
                    continue;
                }

                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            // 處理 IDLEHandler
            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

            // Reset the idle handler count to 0 so we do not run them again.
            pendingIdleHandlerCount = 0;

            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            nextPollTimeoutMillis = 0;
        }
    }
}
複製代碼

Java層壓入消息

這就比較簡單了,當消息自己合法,且消息隊列還在工做中時。 依舊從 TDD角度 出發:

  • 若是消息隊列沒有頭,指望直接做爲頭
  • 若是有頭
    • 消息處理時間 先於 頭消息 或者是須要當即處理的消息,則做爲新的頭
    • 不然按照 處理時間 插入到合適位置
boolean enqueueMessage(Message msg, long when) {
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }

        synchronized (this) {
            if (msg.isInUse()) {
                throw new IllegalStateException(msg + " This message is already in use.");
            }

            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }

            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue. Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
複製代碼

同步屏障 barrier後面單獨腦暴, 其餘部分就先不看了

Java層消息分發

這一節開始,咱們腦暴消息分發,前面咱們已經看過了 MessageQueue ,消息分發就是 不停地MessageQueue 中取出消息,並指派給處理者。 完成這一工做的,是Looper。

在前面,咱們已經知道了,Native層也有Looper,可是不難理解

  • 消息隊列須要 橋樑 連通 Java層和Native層
  • Looper只須要 在本身這一端,處理本身的消息隊列分發便可

因此,咱們看Java層的消息分發時,看Java層的Looper便可。

關注三個主要方法:

  • 出門上班
  • 工做
  • 下班回家

出門上班 prepare

class Looper {

    public static void prepare() {
        prepare(true);
    }

    private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
    }
}
複製代碼

這裏有兩個注意點:

  • 已經出了門,除非再進門,不然無法再出門了。一樣,一個線程有一個Looper就夠了,只要它還活着,就不必再建一個。
  • 責任到人,一個Looper服務於一個Thread,這須要 註冊 ,表明着 某個Thread 已經由本身服務了。利用了ThreadLocal,由於多線程訪問集合,`總須要考慮

競爭,這很不人道主義,乾脆分家,每一個Thread操做本身的內容互不干擾,也就沒有了競爭,因而封裝了 ThreadLocal`

上班 loop

注意工做性質是 分發,並不須要本身處理

  • 沒有 註冊 天然就找不到負責這份工做的人。
  • 已經在工做了就不要催,催了會致使工做出錯,順序出現問題。
  • 工做就是不斷的取出 老闆-- MQ指令 -- Message,並交給 相關負責人 -- Handler 去處理,並記錄信息
  • 007,不眠不休,當MQ不再發出消息了,沒活幹了,你們都散了吧,下班回家
class Looper {
    public static void loop() {
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        if (me.mInLoop) {
            Slog.w(TAG, "Loop again would have the queued messages be executed"
                    + " before this one completed.");
        }

        me.mInLoop = true;
        final MessageQueue queue = me.mQueue;

        // Make sure the identity of this thread is that of the local process,
        // and keep track of what that identity token actually is.
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();

        // Allow overriding a threshold with a system prop. e.g.
        // adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
        final int thresholdOverride =
                SystemProperties.getInt("log.looper."
                        + Process.myUid() + "."
                        + Thread.currentThread().getName()
                        + ".slow", 0);

        boolean slowDeliveryDetected = false;

        for (;;) {
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }

            // This must be in a local variable, in case a UI event sets the logger
            final Printer logging = me.mLogging;
            if (logging != null) {
                logging.println(">>>>> Dispatching to " + msg.target + " " +
                        msg.callback + ": " + msg.what);
            }
            // Make sure the observer won't change while processing a transaction.
            final Observer observer = sObserver;

            final long traceTag = me.mTraceTag;
            long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
            long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs;
            if (thresholdOverride > 0) {
                slowDispatchThresholdMs = thresholdOverride;
                slowDeliveryThresholdMs = thresholdOverride;
            }
            final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0);
            final boolean logSlowDispatch = (slowDispatchThresholdMs > 0);

            final boolean needStartTime = logSlowDelivery || logSlowDispatch;
            final boolean needEndTime = logSlowDispatch;

            if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
                Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
            }

            final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
            final long dispatchEnd;
            Object token = null;
            if (observer != null) {
                token = observer.messageDispatchStarting();
            }
            long origWorkSource = ThreadLocalWorkSource.setUid(msg.workSourceUid);
            try {
                //注意這裏
                msg.target.dispatchMessage(msg);
                if (observer != null) {
                    observer.messageDispatched(token, msg);
                }
                dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
            } catch (Exception exception) {
                if (observer != null) {
                    observer.dispatchingThrewException(token, msg, exception);
                }
                throw exception;
            } finally {
                ThreadLocalWorkSource.restore(origWorkSource);
                if (traceTag != 0) {
                    Trace.traceEnd(traceTag);
                }
            }
            if (logSlowDelivery) {
                if (slowDeliveryDetected) {
                    if ((dispatchStart - msg.when) <= 10) {
                        Slog.w(TAG, "Drained");
                        slowDeliveryDetected = false;
                    }
                } else {
                    if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery",
                            msg)) {
                        // Once we write a slow delivery log, suppress until the queue drains.
                        slowDeliveryDetected = true;
                    }
                }
            }
            if (logSlowDispatch) {
                showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg);
            }

            if (logging != null) {
                logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
            }

            // Make sure that during the course of dispatching the
            // identity of the thread wasn't corrupted.
            final long newIdent = Binder.clearCallingIdentity();
            if (ident != newIdent) {
                Log.wtf(TAG, "Thread identity changed from 0x"
                        + Long.toHexString(ident) + " to 0x"
                        + Long.toHexString(newIdent) + " while dispatching to "
                        + msg.target.getClass().getName() + " "
                        + msg.callback + " what=" + msg.what);
            }

            msg.recycleUnchecked();
        }
    }
}

複製代碼

下班 quit/quitSafely

這是比較粗暴的行爲,MQ離開了Looper就無法正常工做了,即下班即意味着辭職

class Looper {
    public void quit() {
        mQueue.quit(false);
    }
    
    public void quitSafely() {
        mQueue.quit(true);
    }
}
複製代碼

消息處理 Handler

這裏就比較清晰了。API基本分爲如下幾類:

面向使用者:

  • 建立Message,經過Message的 享元模式
  • 發送消息,注意postRunnable也是一個消息
  • 移除消息,
  • 退出等

面向消息處理:

class Handler {
    /** * Subclasses must implement this to receive messages. */
    public void handleMessage(@NonNull Message msg) {
    }

    /** * Handle system messages here. * Looper分發時調用的API */
    public void dispatchMessage(@NonNull Message msg) {
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }
}
複製代碼

若是有 Handler callback,則交給callback處理,不然本身處理,若是沒覆寫 handleMessage ,消息至關於被 drop 了。

消息發送部分能夠結合下圖梳理:

send_msg.webp


階段性小結,至此,咱們已經對 Framework層的消息機制 有一個完整的瞭解了。 前面咱們梳理了:

  • Native層 和 Java層均有消息隊列,而且經過JNI和指針映射,存在對應關係
  • Native層 和 Java層MQ 消息獲取時的大體過程
  • Java層 Looper 如何工做
  • Java層 Handler 大體概覽

根據前面梳理的內容,能夠總結:從 Java Runtime 看:

  • 消息隊列機制服務於 線程級別,即一個線程有一個工做中的消息隊列便可,固然,也能夠沒有。

即,一個Thread 至多有 一個工做中的Looper。

  • Looper 和 Java層MQ 一一對應
  • Handler 是MQ的入口,也是 消息 的處理者
  • 消息-- Message 應用了 享元模式,自身信息足夠,知足 自洽,建立消息的開銷性對較大,因此利用享元模式對消息對象進行復用。

下面咱們再繼續探究細節,解決前面語焉不詳處留下的疑惑:

  • 消息的類型和本質
  • Native層Looper 的pollInner

消息的類型和本質

message中的幾個重要成員變量:

class Message {
   
    public int what;
    
    public int arg1;
    
    public int arg2;
    
    public Object obj;

    public Messenger replyTo;

    /*package*/ int flags;
    
    public long when;

    /*package*/ Bundle data;

    /*package*/ Handler target;

    /*package*/ Runnable callback;

}
複製代碼

其中 target是 目標,若是沒有目標,那就是一個特殊的消息: 同步屏障barrier

what 是消息標識 arg1 和 arg2 是開銷較小的 數據,若是 不足以表達信息 則能夠放入 Bundle data 中。

replyTo 和 obj 是跨進程傳遞消息時使用的,暫且不看。

flags 是 message 的狀態標識,例如 是否在使用中是不是同步消息

上面提到的同步屏障,即 barrier,其做用是攔截後面的 同步消息 不被獲取,在前面閱讀Java層MQ的next方法時讀到過。

咱們還記得,next方法中,使用死循環,嘗試讀出一個知足處理條件的消息,若是取不到,由於死循環的存在,調用者(Looper)會被一直阻塞。

此時能夠印證一個結論,消息按照 功能分類 能夠分爲 三種

  • 普通消息
  • 同步屏障消息
  • 異步消息

其中同步消息是一種內部機制。設置屏障以後須要在合適時間取消屏障,不然會致使 普通消息永遠沒法被處理,而取消時,須要用到設置屏障時返回的token。

Native層Looper

相信你們都對 Native層 的Looper產生興趣了,想看看它在Native層都幹些什麼。

對完整源碼感興趣的能夠看 這裏 ,下面咱們節選部分進行閱讀。

前面提到了Looper的pollOnce,處理完擱置的Response以後,會調用pollInner獲取消息

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
                this, mNextMessageUptime - now, timeoutMillis);
#endif
    }

    // Poll.
    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    
    //注意 1
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // Acquire lock.
    mLock.lock();

// 注意 2
    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = ALOOPER_POLL_ERROR;
        goto Done;
    }

// 注意 3
    // Check for poll timeout.
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = ALOOPER_POLL_TIMEOUT;
        goto Done;
    }

//注意 4
    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

// 注意 5
    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes. Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = ALOOPER_POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

//注意 6
    // Invoke all response callbacks.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == ALOOPER_POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd);
            }
            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = ALOOPER_POLL_CALLBACK;
        }
    }
    return result;
}
複製代碼

上面標記了注意點

  • 1 epoll機制,等待 mEpollFd 產生事件, 這個等待具備超時時間。
  • 2,3,4 是等待的三種結果,goto 語句能夠直接跳轉到 標記
  • 2 檢測poll 是否出錯,若是有,跳轉到 Done
  • 3 檢測pool 是否超時,若是有,跳轉到 Done
  • 4 處理epoll後全部的事件
  • 5 處理 pending 消息的回調
  • 6 處理 全部 Response的回調

而且咱們能夠發現返回的結果有如下幾種:

  • ALOOPER_POLL_CALLBACK

pending message 或者 request.ident 值爲 ALOOPER_POLL_CALLBACK 的 Response被處理了。 若是沒有:

  • ALOOPER_POLL_WAKE 正常喚醒
  • ALOOPER_POLL_ERROR epoll錯誤
  • ALOOPER_POLL_TIMEOUT epoll超時

查找了一下枚舉值:

ALOOPER_POLL_WAKE = -1,
ALOOPER_POLL_CALLBACK = -2,
ALOOPER_POLL_TIMEOUT = -3,
ALOOPER_POLL_ERROR = -4
複製代碼

階段性小結, 咱們對 消息Native層的pollInner 進行了一次腦暴,引出了epoll機制。

其實Native層的 Looper分發還有很多值得腦暴的點,但咱們先緩緩,已經火燒眉毛的要對 epoll機制進行腦暴了。


##腦暴:Linux中的I/O模型

這部份內容,推薦一篇文章:使用 libevent 和 libev 提升網絡應用性能——I/O模型演進變化史 做者 hguisu

PS:本段中,存在部分圖片直接引用自該文,我偷了個懶,沒有去找原版內容並標記出處

阻塞I/O模型圖:在調用recv()函數時,發生在內核中等待數據和複製數據的過程

阻塞I/O模型

實現很是的 簡單,可是存在一個問題,阻塞致使線程沒法執行其餘任何計算,若是是在網絡編程背景下,須要使用多線程提升處理併發的能力。

注意,不要用 Android中的 點擊屏幕等硬件被觸發事件 去對應這裏的 網絡併發,這是兩碼事。

若是採用了 多進程 或者 多線程 實現 併發應答,模型以下:

應答模型

到這裏,咱們看的都是 I/O 阻塞 模型。

腦暴,阻塞爲調用方法後一直在等待返回值,線程內執行的內容就像 卡頓 在這裏。

若是要消除這種卡頓,那就不能調用方法等待I/O結果,而是要 當即返回

舉個例子:

  • 去西裝店定製西裝,肯定好款式和尺寸後,你坐在店裏一直等着,等到作好了拿給你,這就是阻塞型的,這能等死你;
  • 去西裝店定製西裝,肯定好款式和尺寸後,店員告訴你別乾等着,好多天呢,等你有空了來看看,這就是非阻塞型的。

改變爲非阻塞模型後,應答模型以下:

應答模型——非阻塞

不難理解,這種方式須要顧客去 輪詢 。對客戶不友好,可是對店家但是一點損失都沒有,還讓等候區沒那麼擠了。

有些西裝店進行了改革,對客戶更加友好了:

去西裝店定製西裝,肯定好款式和尺寸後,留下聯繫方式,等西服作好了聯繫客戶,讓他來取。

這就變成了 select or poll 模型:

select模型

注意:進行改革的西裝店須要增長一個員工,圖中標識的用戶線程,他的工做是:

  • 在前臺記錄客戶訂單和聯繫方式
  • 拿記錄着 訂單 的小本子去找製做間,不斷檢查 訂單是否完工,完工的就能夠提走並聯系客戶了。

並且,他去看訂單完工時,沒法在前臺記錄客戶信息,這意味他 阻塞 了,其餘工做只能先擱置着。

這個作法,對於製做間而言,和 非阻塞模型 並無多大區別。還增長了一個店員,可是,用 一個店員 就解決了以前 不少店員 都會跑去 製做間 幫客戶問"訂單好了沒有?" 的問題。

值得一提的是,爲了提升服務質量,這個員工每次去製做間詢問一個訂單時,都須要記錄一些信息:

  • 訂單完成度詢問時,是否被應答;
  • 應答有沒有說謊;等

有些店對每種不一樣的考覈項均準備了記錄冊,這和 select模型相似

有些店只用一本記錄冊,可是冊子上能夠利用表格記錄各類考覈項,這和 poll 模型相似

select 模型 和 poll 模型的近似度比較高。

沒多久,老闆就發現了,這個店員的工做效率有點低下,他每次都要拿着一本訂單簿,去把訂單都問一遍,倒不是員工不勤快,是這個模式有點問題。

因而老闆又進行了改革:

  • 前臺製做間 之間加一個送信管道。
  • 製做間有進度須要彙報了,就送一份信到前臺,信上寫着訂單號。
  • 前臺員工直接去問對應的訂單。

這就變成了 epoll模型解決了 select/poll 模型的遍歷效率問題。

這樣改革後,前臺員工就再也不須要按着訂單簿從上到下挨個問了。提升了效率,前臺員工只要無事發生,就能夠優雅的划水了。

咱們看一下NativeLooper的構造函數:

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    int wakeFds[2];
    int result = pipe(wakeFds);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);

    mWakeReadPipeFd = wakeFds[0];
    mWakeWritePipeFd = wakeFds[1];

    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d",
            errno);

    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
            errno);

    // Allocate the epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);

    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeReadPipeFd;
    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
            errno);
}
複製代碼

總結

相信看到這裏,你們已經本身悟透了各類問題。按照慣例,仍是要總結下,由於 這篇是腦暴,因此 思緒 是比較 跳躍 的,內容先後關係不太明顯。

咱們結合一個問題來點明內容先後關係。

Java層 Looper和MQ 會什麼使用了死循環可是 不會"阻塞"UI線程 / 沒形成ANR / 依舊能夠響應點擊事件

  • Android是基於 事件驅動 的,並創建了 完善的 消息機制
  • Java層的消息機制只是一個局部,其負責的就是面向消息隊列,處理 消息隊列管理消息分發消息處理
  • Looper的死循環保障了 消息隊列消息分發 一直處於有效運行中,不循環就中止了分發。
  • MessageQueue的 死循環 保障了 Looper能夠獲取有效的消息,保障了Looper 只要有消息,就一直運行,發現有效消息,就跳出了死循環。
  • 並且Java層MessageQueue在 next() 方法中的死循環中,經過JNI調用了 Native層MQ的 pollOnce,驅動了Native層去處理Native層消息
  • 值得一提的是,UI線程處理的事情也都是基於消息的,不管是更新UI仍是響應點擊事件等。

因此,正是Looper 進行loop()以後的死循環,保障了UI線程的各項工做正常執行。

再說的ANR,這是Android 確認主線程 消息機制 正常健康 運轉的一種檢測機制。

由於主線程Looper須要利用 消息機制 驅動UI渲染和交互事件處理, 若是某個消息的執行,或者其衍生出的業務,在主線程佔用了大量的時間,致使主線程長期阻塞,會影響用戶體驗。

因此ANR檢測採用了一種 埋定時炸彈 的機制,必須依靠Looper的高效運轉來消除以前裝的定時炸彈。而這種定時炸彈比較有意思,被發現了纔會炸

在說到 響應點擊事件,相似的事件老是從硬件出發的,在到內核,再進程間通訊到用戶空間,這些事件以消息的形式存在於Native層,通過處理後,表現出:

ViewRootImpl收到了InputManager的輸入,並進行了事件處理


這裏咱們借用一張圖總結整個消息機制流程:

圖片來自 《Android7.0 MessageQueue詳解》 做者 Gaugamela

PS:這篇文章寫得很長,內容長,耗時也長,大約花費了10天的時間,其中還有很多內容寫得未能盡興。例如: "Java層在哪些狀況下利用JNI調取Native層的喚醒,爲何這麼幹?"等等。

可是考慮到篇幅,決定再也不往下挖了。

相關文章
相關標籤/搜索