Handler機制實現原理(二)MessageQueue的源碼分析

看源碼有一段時間了,愈來愈能從代碼中感受到工程師們滿滿的激情,不管是基礎Java語法仍是高級的語言特性都被髮揮的淋漓盡致,寫的恰到好處。分析源碼的過程,未嘗不是與大神們進行靈魂溝的過程。數組

MessageQueue屬於低層類且依附於Looper,Looper外其餘類不該該單首創建它,若是想使用MessageQueue能夠從Looper類中獲得它。緩存

消息隊列存儲原理

再上一章Message源碼分析中咱們知道了Message內部維持了一個鏈表緩存池來避免重複建立Message對象形成的額外消耗,以靜態屬性private static Message sPool做爲緩存池鏈表頭,Message next;做爲鏈表的next指針。bash

有意思的是Message對象中next指針的不止用於鏈表緩存池,在MessageQueue中也採用一樣的方法存儲消息對象:異步

public final class MessageQueue {
    ......
    Message mMessages;
    ......
}
複製代碼

上面代碼中的mMessages就是MessageQueue用來維持消息隊列的鏈表頭,至於它是如何存儲的,後面再說。oop

使用JNI實現的native方法

MessageQueue的源碼調用了多個的C/C++方法,這類方法使用前都會用關鍵字native聲明一下。源碼分析

這些方法所屬的底層C++代碼建立了屬於native層本身的NativeMessageQueueNativeLooper消息模型。它們對Java層做用其實就是控制線程是否阻塞post

當咱們想要從MessageQueue中取出消息時,碰巧隊列是空的或即將取出的消息還沒到被處理時間,那麼咱們就須要將線程阻塞掉等待隊列中有消息時再取出。ui

下面就是MessageQueue中的native方法:this

// 初始化
    private native static long nativeInit();
    // 註銷
    private native static void nativeDestroy(long ptr);
    // 讓線程阻塞timeoutMillis毫秒
    private native void nativePollOnce(long ptr, int timeoutMillis); 
    // 馬上喚醒線程
    private native static void nativeWake(long ptr);
    // 線程是否處於阻塞狀態
    private native static boolean nativeIsPolling(long ptr);
    // 設置文件描述符
    private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
複製代碼

文件描述符與ePoll指令spa

消息隊列裏控制線程阻塞狀態的native代碼本質是用Linux指令ePoll完成的,在這以前須要先了解一點,Linux內核依靠「文件描述符(file descriptor)」來完成全部的文件讀寫訪問操做,它更像是一個文件的索引值。而咱們用到的ePoll指令就是用來監聽文件描述符是否有可I/O操做的。(這都是一些Linux相關知識)

建立與銷燬

上面說了MessageQueue是依附於Looper的,因此本節分析的建立與銷燬方法其實都是給Looper調用的,MessageQueue只提供了一個帶參的構造方法來建立對象:

// 當前MessageQueue是否能夠退出
    private final boolean mQuitAllowed;

    // native層中NativeMessageQueue隊列指針的地址
    // mPtr等於0時表示退出隊列
    private long mPtr;

    // native層代碼,建立native層中NativeMessageQueue
    private native static long nativeInit();

    // 構造方法
    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        // 執行native層方法
        mPtr = nativeInit();
    }
複製代碼

構造方法中boolean quitAllowed參數的意思是當前這個MessageQueue是否能夠手動退出,爲何要控制可否手動退出呢?這裏先說一個結論:Android 系統中要求UI線程不可手動退出,而其餘Worker線程則所有都是能夠的。(具體的操做在Looper和UI線程中)

那麼退出是什麼意思呢?

退出就是當前這個MessageQueue中止服務,將隊列中已存在的全部消息所有清空,看看源碼中退出方法都作了什麼:

// 是否已經退出了
    private boolean mQuitting;

    // native方法,退出隊列
    private native static void nativeDestroy(long ptr);


    // 退出隊列
    void quit(boolean safe) {
        
        // 若是不是能夠手動退出的,拋出異常
        if (!mQuitAllowed) {
            throw new IllegalStateException("Main thread not allowed to quit.");
        }

        synchronized (this) {
            // 若是已經退出了直接結束方法
            if (mQuitting) {
                return;
            }
            // 標記爲已退出狀態
            mQuitting = true;

            // 兩種清除隊列中消息的方法
            if (safe) {
                removeAllFutureMessagesLocked();
            } else {
                removeAllMessagesLocked();
            }

            // 註銷
            nativeWake(mPtr);
        }
    }

複製代碼

方法quit(boolean safe)中的參數safe決定了到底執行哪一種清除消息的方法:

  • removeAllMessagesLocked(),簡單暴力直接清除掉隊列中全部的消息。
  • removeAllFutureMessagesLocked(),清除掉可能尚未被處理的消息。

removeAllMessagesLocked()方法的邏輯很簡單,從隊列頭中取消息,有一個算一個,所有拿出來回收掉。:

private void removeAllMessagesLocked() {
        Message p = mMessages;
        while (p != null) {
            Message n = p.next;
            p.recycleUnchecked();
            p = n;
        }
        mMessages = null;
    }
複製代碼

然而,removeAllFutureMessagesLocked()方法的邏輯稍微多一點:

private void removeAllFutureMessagesLocked() {
    
        // 獲取當前系統時間
        final long now = SystemClock.uptimeMillis();
        Message p = mMessages;
        if (p != null) {
            // 判斷當前消息對象的預處理時間是否晚於當前時間
            if (p.when > now) {
                // 若是當前消息對象的預處理時間晚於當前時間直接所有暴力清除
                removeAllMessagesLocked();
            } else {
                Message n;
                // 若是當前消息對象的預處理時間並不晚於當前時間
                // 說明有可能這個消息正在被分發處理
                // 那麼就跳過這個消息日後找晚於當前時間的消息
                for (;;) {
                    n = p.next;
                    if (n == null) {
                        return;
                    }
                    if (n.when > now) {
                        // 若是找到了晚於當前時間的消息結束循環
                        break;
                    }
                    p = n;
                }
                p.next = null;
                do {
                    // n就是那個晚於當前時間的消息
                    // 從n開始以後的消息所有回收
                    p = n;
                    n = p.next;
                    p.recycleUnchecked();
                } while (n != null);
            }
        }
    }

複製代碼

這麼看來這個方法名字起的還挺靠譜的,很好的解釋了是要刪除尚未被處理的消息。

消息入隊管理enqueueMessage()方法

// 消息入隊
    // 參數when就是此消息應該被處理的時間
    boolean enqueueMessage(Message msg, long when) {
        // 若是此消息的target也就是宿主handler是空的拋異常
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }

        // 若是此消息是in-use狀態拋異常,in-use的消息不可拿來使用
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }

        //上鎖
        synchronized (this) {

            // 若是當前MessageQueue已經退出了拋異常並釋放掉此消息
            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }
            
            // 將消息標記爲in-use狀態
            msg.markInUse();
            // 設置應該被處理的時間
            msg.when = when;
            // 拿到隊列頭
            Message p = mMessages;


            // 是否須要喚醒線程
            boolean needWake;

            // p等於空說明隊列是空的
            // when等於0表示強制把此消息插入隊列頭部,最早處理
            // when小於隊列頭的when說明此消息應該被處理的時間比隊列中第一個要處理的時間還早
            // 以上狀況知足任意一種直接將消息插入隊列頭部
            if (p == null || when == 0 || when < p.when) {
                // 將消息插入隊列頭部
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                
                // 線程已經被阻塞&&消息存在宿主Handler&&消息是異步的
                needWake = mBlocked && p.target == null && msg.isAsynchronous();

            
                //若是上述條件都不知足就要按照消息應該被處理的時間插入隊列中    
                Message prev;
                for (;;) {
                    // 兩根相鄰的引用一前一後從隊列頭開始依次向後移動
                    prev = p;
                    p = p.next;
                    // 若是隊列到尾部了或者找到了處理時間早於自身的消息就結束循環
                    if (p == null || when < p.when) {
                        break;
                    }

                    // 若是入隊的消息是異步的而排在它前面的消息有異步的就不須要喚醒
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                // 將新消息插在這一前一後兩個引用中間,完成入隊
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // 判斷是否須要喚醒線程
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }

複製代碼

總結一下消息入隊的邏輯大體分爲以下幾步:

  1. 檢查消息合法性,包括宿主target是否爲空,是否爲in-use狀態,隊列是否還存活。

  2. 若是知足條件【隊列爲空、when等於0、此消息應被處理的時間比隊列中第一個要處理的時間還早】中的任意一個直接將此消息插在隊列頭部最早被處理。

  3. 若是以上三個條件均不知足,那麼就從頭遍歷隊列根據被處理時間找到它的位置。

同步消息攔截器

除了enqueueMessage()方法能夠向隊列中添加消息外,還有一個postSyncBarrier()方法也能夠向隊列添加消息,但它不是添加普通的消息,咱們將它添加的特殊Message稱爲同步消息攔截器

顧名思義,該攔截器只會影響同步消息。複習一下上節中分析到的東西,咱們默認發送的消息都是同步的,只有某個Message被調用了setAsynchronous(true)後纔是異步消息。同步消息受隊列限制依次有序的等待處理,異步消息也不受限制。

消息攔截器與普通消息的差別在於攔截器的target是空的,正常咱們經過enqueueMessage()方法入隊的消息因爲限制target是不能爲空的。

// 標識攔截器的token
    private int mNextBarrierToken;

    private int postSyncBarrier(long when) {

        synchronized (this) {
            // 獲得攔截器token
            final int token = mNextBarrierToken++;
            // 實例化一個消息對象
            final Message msg = Message.obtain();
            // 將對象設置爲in-use狀態
            msg.markInUse();
            // 設置時間
            msg.when = when;
            // 將token存於消息的經常使用屬性arg1中
            msg.arg1 = token;

            Message prev = null;
            Message p = mMessages;

            // 若是when不等於0就在隊列中按時間找到它的位置
            if (when != 0) {
                while (p != null && p.when <= when) {
                    prev = p;
                    p = p.next;
                }
            }
            // 若是prev不等於空就把攔截器插入
            // 若是prev等於空直接插入隊列頭部
            if (prev != null) { // invariant: p == prev.next
                msg.next = p;
                prev.next = msg;
            } else {
                msg.next = p;
                mMessages = msg;
            }
          
            // 攔截器入隊成功,返回對應token
            return token;
        }
    }
複製代碼

整體來講添加攔截器的方法跟正常消息入隊差很少,值得一提的就是Message的target是空的,而後arg1保存着攔截器的惟一標識token

token的做用是找到對應的攔截器刪除,看看刪除攔截器的方法。

public void removeSyncBarrier(int token) {
        synchronized (this) {
            Message prev = null;
            Message p = mMessages;
            // 遍歷隊列找到指定攔截器
            // 查找條件:target爲空,arg1等於指定token值
            while (p != null && (p.target != null || p.arg1 != token)) {
                prev = p;
                p = p.next;
            }
            // 若是p等於空說明沒找到
            if (p == null) {
                throw new IllegalStateException("The specified message queue synchronization "
                        + " barrier token has not been posted or has already been removed.");
            }

            // 是否須要喚醒線程
            final boolean needWake;

            // 在隊列中移除掉攔截器
            if (prev != null) {
                prev.next = p.next;
                // 若是prev不等於空說明攔截器前面還有別的消息,就不須要喚醒
                needWake = false;
            } else {
                mMessages = p.next;
                // 攔截器在隊列頭部,移除它以後若是隊列空了或者它的下一個消息是個正常消息就須要喚醒
                needWake = mMessages == null || mMessages.target != null;
            }

            // 回收
            p.recycleUnchecked();

            // 判斷是否須要喚醒
            if (needWake && !mQuitting) {
                nativeWake(mPtr);
            }
        }
    }

複製代碼

隊列空閒處理器IdleHandler

因爲在從隊列中取出消息時隊裏多是空的,這時候就會阻塞線程等待消息到來。每次隊列中沒有消息而進入的阻塞狀態,咱們叫它爲「空閒狀態」。

講道理實際使用中隊列空閒狀態的狀況仍是很常見的,爲了更好的利用資源,也爲了更好的掌握線程的狀態,開發人員就設計了這麼一個「隊列空閒處理器IdleHandler」。

IdleHandler是MessageQueue類下的一個子接口,只包含了一個方法:

public static interface IdleHandler {
        /**
         * 當線程的MessageQueue等待更多消息時會調用該方法。
         *
         * 返回值:true表明只執行一次,false表明會一直執行它
         */
        boolean queueIdle();
    }
複製代碼

MessageQueue爲咱們提供了添加和刪除IdleHandler的方法:

//使用一個ArrayList存儲IdleHandler
   private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();

   // 添加一個IdleHandler
   public void addIdleHandler(@NonNull IdleHandler handler) {
        if (handler == null) {
            throw new NullPointerException("Can't add a null IdleHandler");
        }
        synchronized (this) {
            mIdleHandlers.add(handler);
        }
    }

    // 刪除一個IdleHandler
    public void removeIdleHandler(@NonNull IdleHandler handler) {
        synchronized (this) {
            mIdleHandlers.remove(handler);
        }
    }

複製代碼

消息出隊管理next()方法

next()方法很長,先大體看一下源碼:

private IdleHandler[] mPendingIdleHandlers;

    Message next() {

        // mPtr是從native方法中獲得的NativeMessageQueue地址
       // 若是mPtr等於0說明隊列不存在或被清除掉了
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        // 待處理的IdleHandler數量,由於表明數量,因此只有第一次初始化時爲-1
        int pendingIdleHandlerCount = -1;


        // 線程將被阻塞的時間
        // -1:一直阻塞
        // 0:不阻塞
        // >0:阻塞nextPollTimeoutMillis 毫秒
        int nextPollTimeoutMillis = 0;


        // 開始死循環,下面的代碼都是在循環中,賊長!
        for (;;) {

            // 若是nextPollTimeoutMillis 不等於0說明要阻塞線程了
            if (nextPollTimeoutMillis != 0) {
                // 爲即將長時間阻塞作準備把該釋放的對象都釋放了
                Binder.flushPendingCommands();
            }

            // 阻塞線程操做
            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                // 獲得當前時間
                final long now = SystemClock.uptimeMillis();

                Message prevMsg = null;
                Message msg = mMessages;

                // 判斷隊列頭是否是同步攔截器
                if (msg != null && msg.target == null) {
                    // 若是是攔截器就向後找一個異步消息
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
    
                // 判斷隊列是否有能夠取出的消息
                if (msg != null) {

                    if (now < msg.when) {
                        // 若是待取出的消息尚未到應該被處理的時間就讓線程阻塞到應該被處理的時間
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // 直接就能取出消息,因此不用阻塞線程
                        mBlocked = false;

                        將消息從隊列中剝離出來
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        // 讓消息脫離隊列
                        msg.next = null;

                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        
                        // 設置爲in-use狀態
                        msg.markInUse();
                        // 返回取出的消息,結束循環,結束next()方法
                        return msg;
                    }
                } else {
                    // 隊列中沒有可取出的消息,nextPollTimeoutMillis 等於-1讓線程一直阻塞
                    nextPollTimeoutMillis = -1;
                }

                // 若是隊列已經退出了直接註銷和結束方法
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // IdleHandler初始化爲-1,因此在本循環中該條件成立的次數 <= 1
                if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) {
                    // 獲得IdleHandler的數量
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }

                // pendingIdleHandlerCount 小於或等於0說明既沒有合適的消息也沒有合適的閒時處理
                if (pendingIdleHandlerCount <= 0) {
                    // 直接進入下次循環阻塞線程
                    mBlocked = true;
                    continue;
                }

                // 代碼執行到此處就說明線程中有待處理的IdleHandler
                // 那麼就從IdleHandler集合列表中取出待處理的IdleHandler
                if (mPendingIdleHandlers == null) {
                    // 初始化待處理IdleHandler數組,最小長度爲4
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
    
                // 從IdleHandler集合中獲取待處理的IdleHandler
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }
          
            // ==========到此處同步代碼塊已經結束==========


            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                // 取出一個IdleHandler
                final IdleHandler idler = mPendingIdleHandlers[i];
                // 釋放掉引用
                mPendingIdleHandlers[i] = null; 

                // IdleHandler的執行模式,true=執行一次,false=老是執行
                boolean keep = false;

                try {
                    // 執行IdleHandler的queueIdle()代碼,獲得執行模式
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                // 經過執行模式判斷是否須要移除掉對應的IdleHandler
                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

            // 處理完了全部IdleHandler把數量清0
            pendingIdleHandlerCount = 0;

            // 由於執行了IdleHandler的代碼塊,有可能已經有新的消息入隊了
            // 因此到這裏就不阻塞線程,直接去查看有沒有新消息
            nextPollTimeoutMillis = 0;
        }
    }

複製代碼

消息出隊的核心代碼的邏輯都在一個龐大的死循環for(;;)中,其流程以下:

0,循環開始。

1,根據nextPollTimeoutMillis值阻塞線程,初始值爲0:不阻塞線程。

2,將【待取出消息指針】指向隊列頭。

3,若是隊列頭是同步攔截器的話就將【待取出消息指針】指向隊列頭後面最近的一個異步消息。

4,若是【待取出消息指針】不可用(msg == null)說明隊列中沒有可取出的消息,讓nextPollTimeoutMillis 等於-1讓線程一直阻塞,等待新消息到來時喚醒它。

5,若是【待取出消息指針】可用(msg != null)再判斷一下消息的待處理時間。

  • 若是消息的待處理時間大於當前時間(now < msg.when)說明當前消息還沒到要處理的時間,讓線程阻塞到消息待處理的指定時間。

  • 若是消息的待處理時間小於當前時間(now > msg.when)就直接從隊列中取出消息返回給調用處。(此處會直接結束整個循環,結束next()方法。)

6,若是隊列已經退出了直接結束next()方法。

7,若是是第一次循環就初始化IdleHandler數量的局部變量pendingIdleHandlerCount 。

8,若是IdleHandler數量小於等於0說明沒有合適的IdleHandler,直接進入下次循環阻塞線程。(此處會直接結束本次循環。)

9,初始化IdleHandler數組,裏面保存着本地待處理的IdleHandler。

10,遍歷IdleHandler數組,執行對應的queueIdle()方法。

11,執行完全部IdleHandler以後,將IdleHandler數量清0。

12,由於執行了IdleHandler的代碼塊,有可能已經有新的消息入隊了, 因此讓nextPollTimeoutMillis 等於0不阻塞線程,直接去查看有沒有新消息。

13,本次循環結束,開始新一輪循環。

總結

  1. MessageQueue隊列消息是有序的,按消息待處理時間依次排序。

  2. 同步攔截器能夠攔截它以後的全部同步消息,直到這個攔截器被移除。

  3. 取出消息時若是沒有合適的消息線程會阻塞。

相關文章
相關標籤/搜索