線程間通訊機制是什麼?怎麼完成線程間通訊的?java
由什麼組成?android
調度策略是什麼樣的?消息循環機制,消息分發機制?app
爲何這麼設計?less
Handler 是典型的生產者-消費者模型。是線程之間進行通訊的媒介。異步
線程之間內存是不共享的,那麼怎麼完成的線程間通訊?async
設計通訊機制的思路:ide
進行消息封裝 ---- 拿到目標線程句柄 --- 發送消息 --- 目標線程進行消息調度(優先級、異步、阻塞?)函數
實際上Handler也是按照這個思路進行,並有更多優秀的思考。好比消息池機制、消息屏障、IdleHandler、epoll等oop
首先看Handler的建立過程:post
public Handler(@Nullable Callback callback, boolean async) {
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
// 是匿名類 或 成員類 或 局部類 ,但字段修飾符不是靜態的時候, 有可能引發內存泄露。
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
// 獲取Looper 若是當前線程沒有Looper 拋出異常。
// 1. 由於沒有Looper 就執行不了下面mQueue的初始化
// 2. 沒有Looper將沒法完成消息調度
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
+ " that has not called Looper.prepare()");
}
//這句。。 mQueue 是跨線程的句柄、
// 若是不能初始化 也就不能傳遞消息
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
複製代碼
消息是怎麼封裝的?
public final class Message implements Parcelable {
// 執行等待時長, 在MessageQueue中,以此爲依據排序
public long when;
// 發送的數據
/*package*/ Bundle data;
// 目標線程的 Handler對象,用於發送 和 處理消息 跨線程
/*package*/ Handler target;
// Message Pool 消息池,回收和複用Message對象 MAX_POOL_SIZE = 50;
private static Message sPool;
}
複製代碼
消息怎麼調度的? Looper.loop()
/** * Run the message queue in this thread. Be sure to call * {@link #quit()} to end the loop. */
public static void loop() {
// sThreadLocal.get();
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
// 從本地線程的ThreadLocal 中找到Looper對象, 並從其中拿到 MessageQueue
// 那這個Looper對象是何時放進去的呢? 沒錯就是 Looper.prepare()
// sThreadLocal.set(new Looper(quitAllowed));
// 這也就是子線程不調用 prepare 就不能建立Handler、也不能運行Looper.loop()
final MessageQueue queue = me.mQueue;
for (;;) {
// 若是沒有消息這裏不會返回
Message msg = queue.next(); // might block
// 返回null 這裏會退出循環
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
// 非空的消息 執行分發操做
try {
msg.target.dispatchMessage(msg);
} finally {
}
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
// 放回消息池
msg.recycleUnchecked();
}
}
// Looper 並無實際的調度,那麼問題就在於這個next何時返回了。
// 因此這個調度測量也就是next的返回策略。
// MessageQueue.java
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
// 消息不爲空 可是Target是 null , 這說明什麼問題?
// 能夠看下這個函數實現 private int postSyncBarrier(long when) {}
// 在Post這個名字SyncBarrier的消息的時候沒有賦值msg.target
// 這就是咱們所說的消息屏障。
for (;;) {
// 這裏進入休眠,下次何時喚醒呢?
// 1.
// boolean enqueueMessage(Message msg, long when) {}
// 這個函數裏面nativeWake(mPtr) , 添加消息的時候會對messageQueue排序順便會檢查要不要喚醒
// 這些條件(p == null || when == 0 || when < p.when) 知足,而且當前是阻塞狀態,就會調用nativeWake(mPtr)
// 這確定不是惟一的喚醒方式,不然若是一直沒有消息進來不是要死在這裏了。。
// 2.
// Looper.cpp-->Looper::pollOnce()
// 這裏面使用了epoll機制, 超時返回。
nativePollOnce(ptr, nextPollTimeoutMillis);
// 消息屏障
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
// 消息屏障存在的狀況 就會搜索msgQueue中的isAsynchronous 異步消息,拿出來執行 。 直到隊列末尾
} while (msg != null && !msg.isAsynchronous());
}
// 到這基本流程就結束了
// 可是還有一個IdleHandler機制
}
複製代碼
Handler 發送的消息只有Message
一種,怎麼就出現了三種消息類型?
前面分析Handler建立的時候,能夠看到最後一個參數是async,這裏就註定這個Handler是發送出去同步消息仍是異步消息。
public Handler(@Nullable Callback callback, boolean async) {
/// async
mAsynchronous = async;
}
// post 方法最終調用到的MessageQueue的入隊方法。在這裏設置了Message中一個屬性 msg.setAsynchronous(true);
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg, long uptimeMillis) {
// 這裏一定會設置Target
msg.target = this;
msg.workSourceUid = ThreadLocalWorkSource.getUid();
// 若是是異步的Handler,發出去的消息Message 就會帶有 異步標記
// 這個標記的 就區分出了兩種消息, 同步消息和異步消息。
// 什麼區別呢?前面調度分析的時候,同步消息會被消息屏障阻塞, 而異步不會。
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
複製代碼
好像只有這兩種消息類型,消息屏障又是什麼? 準確的說叫同步消息屏障,也就是說只會阻塞同步消息,一般是和異步消息一塊兒使用。
消息屏障有什麼特殊嗎?
@TestApi
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
/// ******注意看這個消息的建立過程
// 從消息池中拿到了這個消息對象以後,設置了when、 arg1 就加入到消息隊列的單鏈表中了。
// !!! 沒設置 target 。 能夠對比上面的 代碼 msg.target = this;
// 什麼是消息屏障? Looper調度的時候,若是發現沒有target的消息,那麼就是消息屏障了
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
// *********
Message prev = null;
Message p = mMessages;
if (when != 0) {
// 找到時間合適的那個節點
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
/// 加入進去
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
// 返回了一個 token 。 實際上就是一個計數
// 刪除的時候就據此token
return token;
}
}
複製代碼
刪除消息屏障的過程。
@TestApi
public void removeSyncBarrier(int token) {
// Remove a sync barrier token from the queue.
// If the queue is no longer stalled by a barrier then wake it.
synchronized (this) {
Message prev = null;
Message p = mMessages;
// 找到 消息target爲空 arg1 是 token的那個Message對象
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
// 到末尾都沒找到那麼 就拋出異常了
if (p == null) {
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
// 從節點中刪除
final boolean needWake;
if (prev != null) {
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
// 將刪除的節點回收,放入消息池中
p.recycleUnchecked();
// If the loop is quitting then it is already awake.
// We can assume mPtr != 0 when mQuitting is false.
if (needWake && !mQuitting) {
// 有消息須要喚醒 nativePollOnce的阻塞 epoll_wait() 實現
nativeWake(mPtr);
}
}
}
複製代碼
在向Choreography訂閱了VSync信號接收消息以後,VSync回調會觸發重繪操做。
顯然,此次界面刷新的優先級最高,所以使用了消息屏障以阻塞消息隊列中不重要的同步消息。並建立一個異步刷新,加入到主線程Loop中的MessageQueue隊列中。
ViewRootImpl.java
// Post消息屏障到消息隊列
@UnsupportedAppUsage
void scheduleTraversals() {
if (!mTraversalScheduled) {
mTraversalScheduled = true;
// 設置消息屏障
// 這裏會返回Token , 刪除的時候會以此爲依據進行刪除
mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
mChoreographer.postCallback(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
if (!mUnbufferedInputDispatch) {
scheduleConsumeBatchedInput();
}
notifyRendererOfFramePending();
pokeDrawLockIfNeeded();
}
}
// 刪除 消息屏障
void unscheduleTraversals() {
if (mTraversalScheduled) {
mTraversalScheduled = false;
// mTraversalBarrier 拿Token 去MessageQueue中刪除屏障
// 刪除消息屏障
mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);
mChoreographer.removeCallbacks(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
}
}
複製代碼
// 從開始加入消息開始看起
// Handler 中兩種消息延遲的方法
// 1. 定時執行
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
...
return enqueueMessage(queue, msg, uptimeMillis);
}
// 2. 延遲執行
public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
// uptimeMillis 消息延遲執行的時間點
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
// 在上節三種消息類型 中提到的queueMessage
// queue.enqueueMessage 方法(msg, uptimeMillis);
// 這裏最終調用的是MessageQueue的 enqueueMessage 方法
return enqueueMessage(queue, msg, uptimeMillis);
}
// 至此並無找到爲何Handler 會延遲處理消息,只是在Message的when中增長了一個時間戳。
// 那麼就只剩消息隊列,和消息調度環節了。
複製代碼
boolean enqueueMessage(Message msg, long when) {
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
//插入隊列 兩種狀況
// 1. 消息隊列爲空\待插入時間爲0\第一條執行的when時間比待插入的時間更長
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
//將msg 插入到頭部
msg.next = p;
mMessages = msg;
// 插入到頭部要喚醒
needWake = mBlocked;
// 2. 其餘狀況
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
// 找到比當msg 執行時間更晚的消息
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
// 插入到查找到的消息位置
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// 非頭部不須要喚醒
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
複製代碼
Handler 是Android的核心基礎之一, 可以適應複雜的代碼環境,被普遍的應用在Android系統代碼中。
Handler設計策略是典型的生產消費者模型,調度策略巧妙的解決了生產消費模型中的執行順序和優先級的問題。
被普遍使用另外一緣由是高效,使用epoll機制,完成跨線程和超時喚醒,使Handler在消耗極少CPU資源的狀況下準確的完成調度工做。
同時Handler設計中包含了不少的優化策略,好比消息池機制是比較經常使用的內存優化策略。再好比內存泄漏的檢測,而且給出了友好的提示。同時Handler 提供了比較完善的監測機制,能夠參考Looper 的代碼進行詳細的分析。
消息喚醒機制 eventFd
Handler中的監控機制設計