本文分析下Android的消息處理機制,主要是針對Handler、Looper、MessageQueue組成的異步消息處理模型,先主觀想一下這個模型須要的材料:javascript
上面的三個部分能夠簡單的歸結爲以下圖:java
APP端UI線程都是Looper線程,每一個Looper線程中維護一個消息隊列,其餘線程好比Binder線程或者自定義線程,都能經過Handler對象向Handler所依附消息隊列線程發送消息,好比點擊事件,都是經過InputManagerService處理後,經過binder通訊,發送到App端Binder線程,再由Binder線程向UI線程發送送Message,其實就是經過Handler向UI的MessageQueue插入消息,與此同時,其餘線程也能經過Handler向UI線程發送消息,顯然這裏就須要同步,以上就是Android消息處理模型的簡單描述,以後跟蹤源碼,淺析一下具體的實現,以及裏面的一些小手段,首先,從Handler的常見用法入手,分析其實現原理,android
<關鍵點1>
Handler hanlder=new Handler();
<關鍵點2> hanlder.post(new Runnable() { @Override public void run() { //TODO } });複製代碼
這裏有兩個點須要注意,先看關鍵點1,Handler對象的建立,直觀來看可能感受不到有什麼注意的地方,可是若是你在普通線程建立Handler,就會遇到異常,由於普通線程是不能建立Handler對象的,必須是Looper線程才能建立,纔有意義,能夠看下其構造函數: 緩存
public Handler(Callback callback, boolean async) {
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}複製代碼
從上面的代碼能夠看出,Looper.myLooper()必須非空,不然就會拋出 RuntimeException異常,Looper.myLooper()何時纔會非空?多線程
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}複製代碼
上面的兩個函數牽扯到稍微擰巴的數據存儲模型,不分析,只要記住只有調用過Looper.prepare的線程,纔會生成一個線程單利的Looper對象,Looper.prepare只能調用一次,再次調用會拋出異常。其實prepare的做用就是新建一個Looper對象,而在new Looper對象的時候,會建立關鍵的消息隊列對象:異步
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}複製代碼
以後,一個線程就有了MessageQueue,雖然尚未調用Loop.loop()將線程變成loop線程,可是new Handler已經沒問題。接着看hanlder.post函數,它將會建立一個Message(若是須要),並將Message插入到MessageQueue,供loop線程摘取並執行。async
public final boolean post(Runnable r)
{
return sendMessageDelayed(getPostMessage(r), 0);
}
private static Message getPostMessage(Runnable r) {
Message m = Message.obtain();
m.callback = r;
return m;
}
// 靜態方法,同步
public static Message obtain() {
synchronized (sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
return new Message();
}複製代碼
上面的Message新建流程,其實主要是涉及了一個Message線程池,默認線程池大小50,固然,不採用線程池,所有新建Message也是能夠的,採用線程池主要是爲了提升效率,避免重複建立對象,由於Handler與Message的時候實在是太頻繁了,Message線程池消息池經常使用的方法有兩個:obtain()和recycle(),前者是用於從線程池取出一個乾淨的Message,然後者是用於將使用完的Message清理乾淨,並放回線程池,固然以上方法都是須要同步的。以後,經過Looper對象將Message插入到MessageQueue,Handler發消息最終都會調用sendMessageAtTime函數ide
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}複製代碼
mAsynchronous能夠先不關心,咱們使用的通常是mAsynchronous=false的,能夠看到,Handler最後經過MessageQueue的enqueueMessage函數來進行插入,函數
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
// 須要同步
synchronized (this) {
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
<!--關鍵點1-->
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
<!--關鍵點2-->
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
<!--關鍵點3-->
if (needWake) {
nativeWake(mPtr);
} }
return true; }複製代碼
很明顯enqueueMessage須要同步,由於存在多個線程往一個Loop線程的MessageQueue中插入消息的場景。 這裏實際上是將Message根據延時插入到特定的地方,先看下關鍵點1,mMessages其實表明消息隊列的頭部,若是mMessages爲空,說明尚未消息,若是當前插入的消息不須要延時,或者說延時比mMessages頭消息的延時要小,那麼當前要插入的消息就須要放在頭部,至因而否須要喚醒隊列,則須要根據當前的Loop線程的狀態來判斷,後面講Loop線程的時候再回過頭說;再來看下關鍵點2,這個時候須要將消息插入到隊列中間,其實就是找到第一個Delay事件小於當前Message的非空Message,並插入到它的前面,往隊列中插入消息時,若是Loop線程在睡眠,是不該該喚醒的,異步消息的處理會更加特殊一些,先不討論。最後看關鍵點3,若是須要喚醒Loop線程,經過nativeWake喚醒,以上,普通消息的插入算結束了,接下來看一下消息的執行。oop
在消息的發送部分已經消息模型的兩個必要條件:消息隊裏+互斥機制,接下來看一下其餘兩個條件,Loop線程+消費者模型的同步機制。MessageQueue只有同Loop線程(死循環線程)配合起來纔有意義,普通線程必須能夠經過Looper的loop函數變成Loop線程,loop函數除了是個死循環,還包含了從MessageQueue摘取消息並執行的邏輯。看一下這個函數:
public static void loop() {
`<!--關鍵點1 確保MessageQueue準備好--> final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } ... <!--關鍵點2--> for (;;) { <!--關鍵點3 獲取一個消息,若是隊列爲空,阻塞等待--> Message msg = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } <!--關鍵點4 執行消息回調--> msg.target.dispatchMessage(msg); ... <!--關鍵點5 清理,回收到緩存池--> msg.recycleUnchecked(); } }複製代碼
先看下關鍵點1,它要確保當前線程已經調用過Looper.prepare函數,而且準備好了MessageQueue消息隊列;再看關鍵點2,其實就是將線程化身成Looper線程,變成死循環,不斷的讀取執行消息;關鍵點3,就是從MessageQueue摘取消息的函數,若是當前消息隊列上沒有消息,Loop線程就會進入阻塞,直到其餘線程插入消息,喚醒當前線程。若是消息讀取成功,就走到關鍵點4,執行target對象的回調函數,執行完畢,進入關鍵點5,回收清理Message對象,放入Message緩存池。直接看關鍵點3,消息的摘取與阻塞:
Message next() {
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
<!--關鍵點1 是否須要阻塞等待,第一次必定不阻塞-->
nativePollOnce(ptr, nextPollTimeoutMillis);
<!--關鍵點2 同步互斥--> synchronized (this) { final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; <!--關鍵點3 是否存在barier--> if (msg != null && msg.target == null) { do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } <!--關鍵點4 第一個消息是否須要阻塞等待,並計算出阻塞等待時間--> if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; msg.markInUse(); return msg; } } else { <!--關鍵點5 須要無限等待--> nextPollTimeoutMillis = -1; } <!--關鍵點6 沒有能夠即刻執行的Message,查看是否存在須要處理的IdleHandler,若是不存在,則返回,阻塞等待,若是存在則執行IdleHandler--> if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } <!--關鍵點7處理IdleHandler--> for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf(TAG, "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } <!--處理完IdleHandler ,須要從新判斷Message隊列 nextPollTimeoutMillis賦值爲0--> pendingIdleHandlerCount = 0; nextPollTimeoutMillis = 0; } }複製代碼
先看下關鍵點1 nativePollOnce,這是個native函數,其主要做用是設置一個定時的睡眠,其參數timeoutMillis,不一樣的值意義不一樣
next函數中,nextPollTimeoutMillis初始值=0 ,因此for循環第一次是必定不會阻塞的,若是能找到一個Delay倒計時結束的消息,就返回該消息,不然,執行第二次循環,睡眠等待,直到頭部第一個消息Delay時間結束,因此next函數必定會返回一個Message對象。再看MessageQueue的nativePollOnce函數以前,先走通整個流程,接着看關鍵點2,這裏實際上是牽扯到一個互斥的問題,防止多個線程同時從消息隊列取消息,關鍵點3主要是看看是否須要處理異步消息,關鍵點4,是經常使用的入口,看取到的消息是否是須要當即執行,須要當即執行的就返回當前消息,若是須要等待,計算出等待時間。最後,若是須要等待,還要查看,IdleHandler列表是否爲空,不爲空的話,須要處理IdleHandler列表,最後,從新計算一遍。
接着分析nativePollOnce函數,該函數能夠看作睡眠阻塞的入口,該函數是一個native函數,牽扯到native層的Looper與MessageQueue,由於java層的MessageQueue只是一個簡單的類,沒有處理睡眠與喚醒的機制,首先看一下Java層MessageQueue構造函數,這裏牽扯到後面的線程阻塞原理:
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}複製代碼
MessageQueue的nativeInit函數在Native層建立了NativeMessageQueue與Looper,不過對於Java層來講,Native層的NativeMessageQueue只用來處理線程的睡眠與喚醒,Java層發送的消息仍是在Java層被處理:
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
<!--關鍵點1-->
<!-- eventfd 這個函數會建立一個 事件對象 老版本用管道來實現--> mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); AutoMutex _l(mLock); rebuildEpollLocked(); } void Looper::rebuildEpollLocked() { if (mEpollFd >= 0) { close(mEpollFd); } mEpollFd = epoll_create(EPOLL_SIZE_HINT); struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.data.fd = mWakeEventFd; int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s", request.fd, strerror(errno)); } }複製代碼
}
看一下關鍵點1,這裏實際上是採用了Linux的新API,這裏用的是7.0的源碼,eventfd函數會建立一個eventfd,這是一個計數器相關的fd,計數器不爲零是有可讀事件發生,read之後計數器清零,write遞增計數器;返回的fd能夠進行以下操做:read、write、select(poll、epoll)、close,如今咱們知道了,Native層有也有一套MessageQueue與Looper,簡單看一下Java層如何使用Native層對象的,接着走nativePollOnce
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
}複製代碼
因此最終調用Looper::pollOnce,Java層有本身的消息隊列,pollOnce也沒有更新Java層對象,那麼Native層的消息隊裏對於Java層有什麼用呢,其實只有睡眠與喚醒的做用,好比2.3以前的版本,Native層的MessageQueue都不具有發送消息的能力。不事後來Native添加了發送消息的功能,可是平常開發咱們用不到,不過若是native層若是有消息,必定會優先執行native層的消息
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
...
result = pollInner(timeoutMillis);
}
}複製代碼
pollInner 函數比較長,主要是經過利用epoll_wait監聽上面的管道或者eventfd,等待超時或者其餘線程的喚醒,不過多分析
int Looper::pollInner(int timeoutMillis) {
mPolling = true;
<!--關鍵點1--> struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); <!--關鍵點2--> mPolling = false; mLock.lock(); <!--關鍵點3 查看那個fd上又寫入操做--> for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; <!--關鍵點5 喚醒fd 上有寫入操做 返回Java層繼續執行--> if (fd == mWakeEventFd) { if (epollEvents & EPOLLIN) { awoken(); } else { } } else { <!--關鍵點6 本地MessageQueue有消息,執行本地消息--> } }複製代碼
以上牽扯到Linux中的epoll機制:epoll_create、epoll_ctl、epoll_wait、close等, 用一句話歸納:線程阻塞監聽多個fd句柄,其中一個fd有寫入操做,當前線程就被喚醒。這裏不用太過於糾結,只要理解,這是線程間通訊的一種方式,爲了處理多線程間生產者與消費者通訊模型用的,看下7.0源碼中native層實現的同步邏輯:
在更早的Android版本中,同步邏輯是利用管道通訊實現的,不過思想是一致的,看一下4.3的代碼
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);
mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
}複製代碼