The message mechanism is a very important part of Android. In this post I want to analyze it systematically, as a summary.
There are several parts involved. By layer, it splits into a Java layer and a native layer; by class, it splits into Handler, Looper, Message, and MessageQueue.
Handler: a helper class that provides the interface for posting all kinds of messages into the message pool, and the mechanism for responding to them.
Looper: the message pump; it keeps looping over the message queue and makes sure each message is ultimately dispatched to its handler.
Message: the message body; it carries the message content.
MessageQueue: the message queue; it provides the message pool and caching.
Their relationship: Looper drives the mechanism provided by MessageQueue, Handler provides the calling interface and the callback handling, and Message is the data carrier.
Let's analyze them one by one.
prepare --- preparation and initialization
Preparation has two entry points: prepare and prepareMainLooper. The former is for ordinary threads; the latter is called for the main UI thread. Let's confirm the difference in the code:
public static void prepare() {
    prepare(true);
}

public static void prepareMainLooper() {
    prepare(false);
    synchronized (Looper.class) {
        if (sMainLooper != null) {
            throw new IllegalStateException("The main Looper has already been prepared.");
        }
        sMainLooper = myLooper();
    }
}
Both call the internal parameterized static prepare, just with different arguments: true means this looper may quit, false means it may not. prepareMainLooper additionally stores the looper in the private static field sMainLooper. Now the parameterized prepare:
private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}
First comes a read-and-set on thread-local storage (TLS), with a check: a thread may have at most one Looper. The newly created Looper is then stored in the TLS slot. Next, the constructor:
private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}
It creates a MessageQueue and records the current thread.
That completes preparation. To summarize: two entry points for different scenarios, and a TLS object guarantees each thread has exactly one unique Looper.
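The per-thread uniqueness guarantee can be sketched with a plain ThreadLocal. This is a minimal illustrative class (the name and details are mine, not from the Android sources), mirroring the TLS check in Looper.prepare:

```java
// Hypothetical sketch: a ThreadLocal enforces "at most one looper per thread",
// like Looper.sThreadLocal does in the real sources.
public class TlsLooperSketch {
    private static final ThreadLocal<TlsLooperSketch> sThreadLocal = new ThreadLocal<>();

    private TlsLooperSketch() {}

    public static void prepare() {
        if (sThreadLocal.get() != null) {
            // same thread prepared twice: rejected
            throw new IllegalStateException("Only one looper may be created per thread");
        }
        sThreadLocal.set(new TlsLooperSketch());
    }

    public static TlsLooperSketch myLooper() {
        return sThreadLocal.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // The demo runs on a fresh thread so each thread gets its own slot.
        Thread t = new Thread(() -> {
            prepare();
            System.out.println("worker has a looper: " + (myLooper() != null));
            try {
                prepare(); // second prepare on the same thread fails
            } catch (IllegalStateException e) {
                System.out.println("duplicate prepare rejected");
            }
        });
        t.start();
        t.join();
        // The main thread never called prepare, so it has no looper.
        System.out.println("main thread has no looper: " + (myLooper() == null));
    }
}
```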
loop --- the message loop
This is the core: the loop that fetches and dispatches messages. Straight to the code:
public static void loop() {
    // Get this thread's unique Looper from TLS
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    // Get the Looper's message queue
    final MessageQueue queue = me.mQueue;

    // Make sure the identity of this thread is that of the local process,
    // and keep track of what that identity token actually is.
    Binder.clearCallingIdentity();
    final long ident = Binder.clearCallingIdentity();

    // Enter the message pump loop
    for (;;) {
        // Fetch a pending message; may block (explained later in the MessageQueue analysis)
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        // This must be in a local variable, in case a UI event sets the logger
        final Printer logging = me.mLogging;
        if (logging != null) {
            logging.println(">>>>> Dispatching to " + msg.target + " "
                    + msg.callback + ": " + msg.what);
        }

        // Trace the message
        final long traceTag = me.mTraceTag;
        if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
            Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
        }
        try {
            // Dispatch the message
            msg.target.dispatchMessage(msg);
        } finally {
            // End tracing
            if (traceTag != 0) {
                Trace.traceEnd(traceTag);
            }
        }

        if (logging != null) {
            logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
        }

        // Make sure that during the course of dispatching the
        // identity of the thread wasn't corrupted.
        final long newIdent = Binder.clearCallingIdentity();
        if (ident != newIdent) {
            Log.wtf(TAG, "Thread identity changed from 0x" + Long.toHexString(ident)
                    + " to 0x" + Long.toHexString(newIdent)
                    + " while dispatching to " + msg.target.getClass().getName()
                    + " " + msg.callback + " what=" + msg.what);
        }

        // Recycle the processed message back into the message pool for reuse
        msg.recycleUnchecked();
    }
}
The process:
1. Get the thread's TLS object, the unique Looper, then its MessageQueue.
2. Enter the pump loop and fetch a pending message, blocking if necessary.
3. Dispatch the message.
4. Recycle the processed message into the message pool for reuse.
5. Go back to step 2 for the next one.
As you can see, most of the real work is delegated to MessageQueue, Message, or Handler; loop itself is just control flow, and quite simple. In fact the message mechanism on virtually any platform follows this general shape. The part to note is the dispatch step: msg.target.dispatchMessage(msg). This call goes to the dispatchMessage of the Handler stored in the message; we'll cover it in detail in the Handler section.
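The pump's control flow above can be sketched without any Android classes. This is a minimal illustrative sketch (all names are mine): a BlockingQueue stands in for MessageQueue.next, a sentinel QUIT value stands in for the null that signals quitting, and running the Runnable stands in for dispatchMessage:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of Looper.loop()'s control flow: block for the next
// message, dispatch it, repeat until the queue signals quitting.
public class LoopSketch {
    static final Runnable QUIT = () -> {};
    static final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    static int dispatched = 0;

    public static void loop() {
        for (;;) {
            Runnable msg;
            try {
                msg = queue.take();        // blocks, like queue.next()
            } catch (InterruptedException e) {
                return;
            }
            if (msg == QUIT) return;       // like msg == null: the queue is quitting
            msg.run();                     // like msg.target.dispatchMessage(msg)
            dispatched++;                  // (message recycling omitted in this sketch)
        }
    }

    public static void main(String[] args) {
        queue.add(() -> System.out.println("message 1"));
        queue.add(() -> System.out.println("message 2"));
        queue.add(QUIT);
        loop();
        System.out.println("dispatched=" + dispatched);
    }
}
```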
quit --- quitting the message pump
Quitting is fairly simple:
public void quit() {
    mQueue.quit(false);
}

public void quitSafely() {
    mQueue.quit(true);
}
There are two calls: a plain quit and a safe quit. The difference is that quit removes all pending messages immediately, while quitSafely removes only the messages that are not yet due, letting those already due be delivered first. Details in the MessageQueue section.
The most common way to use a Handler is to new one, override handleMessage to wait for messages and put the handling there, and then simply call sendMessage at the appropriate moment.
Handler --- construction
There are quite a few constructors, but they boil down to just two entry points:
public Handler(Callback callback, boolean async) {
    if (FIND_POTENTIAL_LEAKS) {
        final Class<? extends Handler> klass = getClass();
        if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass())
                && (klass.getModifiers() & Modifier.STATIC) == 0) {
            Log.w(TAG, "The following Handler class should be static or leaks might occur: "
                    + klass.getCanonicalName());
        }
    }

    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

public Handler(Looper looper, Callback callback, boolean async) {
    mLooper = looper;
    mQueue = looper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}
Look closely: both just set up state from three parameters: looper, callback, async. looper is the Looper to bind to, i.e. which thread's looper this handler will run on; normally we don't specify one, so it defaults to the current thread, but binding to any other thread is allowed. callback is a message-handling callback, discussed below. async marks whether messages go out asynchronously, which affects how the callback or other message handling gets scheduled. In the two-argument constructor, Looper.myLooper() binds the handler to the current thread's looper.
dispatchMessage --- message dispatch
Recall that in Looper.loop, the dispatch of each message is msg.target.dispatchMessage(msg), and target is a Handler. So let's look directly at dispatchMessage. The relevant code:
public interface Callback {
    public boolean handleMessage(Message msg);
}

public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}

private static void handleCallback(Message message) {
    message.callback.run();
}
1. First check whether the message's callback exists; if so, run it. Note that, per the Message source, callback is a Runnable.
2. Otherwise, if the callback saved at construction time exists, call its handleMessage; if it returns true, dispatch ends there.
3. If neither condition holds, call the handler's own handleMessage, which subclasses can override.
What does this tell us?
There are three callbacks: the message's Runnable, the Handler's constructor callback, and the Handler's own handleMessage. They are mutually exclusive: as soon as one handles the message, dispatch returns without trying the others.
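That three-way precedence can be isolated into a tiny sketch. This is illustrative only (class and method names are mine, not the Android API), but the branch order matches dispatchMessage above:

```java
// Hypothetical sketch of dispatchMessage's precedence: the message's own
// Runnable wins; else the constructor Callback, but only if it returns true;
// else the handler's own handleMessage.
public class DispatchSketch {
    interface Callback { boolean handleMessage(String msg); }

    static String dispatch(Runnable msgCallback, Callback ctorCallback, String msg) {
        if (msgCallback != null) {         // 1. message's Runnable
            msgCallback.run();
            return "message-runnable";
        }
        if (ctorCallback != null && ctorCallback.handleMessage(msg)) {
            return "constructor-callback"; // 2. only if it consumed the message
        }
        return "handleMessage";            // 3. the handler's own method
    }

    public static void main(String[] args) {
        System.out.println(dispatch(() -> {}, m -> true, "hi")); // message-runnable
        System.out.println(dispatch(null, m -> true, "hi"));     // constructor-callback
        System.out.println(dispatch(null, m -> false, "hi"));    // handleMessage
        System.out.println(dispatch(null, null, "hi"));          // handleMessage
    }
}
```

Note how a constructor callback that returns false does not stop dispatch: the handler's own handleMessage still runs, exactly as in the real code.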
sendMessage --- sending messages
Sending has two families of calls: sendMessageXXX and post. The former all funnel into sendMessageAtTime:
public final boolean sendMessageDelayed(Message msg, long delayMillis) {
    if (delayMillis < 0) {
        delayMillis = 0;
    }
    return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}
In the end everything goes through MessageQueue.enqueueMessage, which we'll cover when we get to MessageQueue. Note mAsynchronous: the handler's async flag is copied into the message. Also note delayed sends: the delivery time is just the current system uptime plus the delay.
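That delay arithmetic is worth seeing in isolation. A minimal sketch (the function name is mine) of how sendMessageDelayed turns a relative delay into an absolute delivery timestamp, clamping negative delays to zero as the real code does:

```java
// Hypothetical sketch: relative delay -> absolute delivery time,
// mirroring sendMessageDelayed's clamp-and-add.
public class DelaySketch {
    static long deliveryTime(long nowUptimeMillis, long delayMillis) {
        if (delayMillis < 0) delayMillis = 0;   // negative delays mean "now"
        return nowUptimeMillis + delayMillis;   // absolute time on the uptime clock
    }

    public static void main(String[] args) {
        System.out.println(deliveryTime(1000, 250));  // 1250
        System.out.println(deliveryTime(1000, -5));   // 1000: negative delay clamped
    }
}
```

Because the timestamp is absolute, MessageQueue.next can later compare it against the current clock and compute exactly how long to block.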
Now let's look at post:
public final boolean post(Runnable r) {
    return sendMessageDelayed(getPostMessage(r), 0);
}

private static Message getPostMessage(Runnable r) {
    Message m = Message.obtain();
    m.callback = r;
    return m;
}
It also ends up in the sendMessage path; the difference is that post wraps the given Runnable into a Message by storing it in the message's callback field.
Since Message is the carrier, let's first look at its data fields:
// The unique key of the message
public int what;
// Two int parameters the message supports
public int arg1;
public int arg2;
// The message payload
public Object obj;
// A reply Messenger, related to the messenger service; not explained here
public Messenger replyTo;
// When the message should be delivered
/*package*/ long when;
// The Handler that handles this message
/*package*/ Handler target;
// The message callback
/*package*/ Runnable callback;
// The next message in the list
/*package*/ Message next;
// The message pool; actually just its first message
private static Message sPool;
// Current size of the message pool
private static int sPoolSize = 0;
obtain --- obtaining a message
public static Message obtain() {
    synchronized (sPoolSync) {
        if (sPool != null) {
            Message m = sPool;
            sPool = m.next;
            m.next = null;
            m.flags = 0; // clear in-use flag
            sPoolSize--;
            return m;
        }
    }
    return new Message();
}
sPool is a private static field holding the head of a linked list of messages. obtain takes the head element and advances the head by one. So the list rooted at sPool is the set of all recycled, reusable messages, and since it is private and static it is effectively process-global.
Looking at recycling makes it clearer how discarded messages get stored.
recycle --- recycling a message
public void recycle() {
    if (isInUse()) {
        if (gCheckRecycle) {
            throw new IllegalStateException("This message cannot be recycled because it "
                    + "is still in use.");
        }
        return;
    }
    recycleUnchecked();
}

void recycleUnchecked() {
    // Mark the message as in use while it remains in the recycled object pool.
    // Clear out all other details.
    flags = FLAG_IN_USE;
    what = 0;
    arg1 = 0;
    arg2 = 0;
    obj = null;
    replyTo = null;
    sendingUid = -1;
    when = 0;
    target = null;
    callback = null;
    data = null;

    synchronized (sPoolSync) {
        if (sPoolSize < MAX_POOL_SIZE) {
            next = sPool;
            sPool = this;
            sPoolSize++;
        }
    }
}
recycleUnchecked just clears the message's own fields, then, if the list has not hit its maximum size, stores this message as sPool, i.e. as the new list head, and increments the pool size.
From this we can see that Message's reuse mechanism is self-contained: it has no direct ties to the message queue, so the coupling is low.
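The obtain/recycle pair above is a classic intrusive free list. A minimal self-contained sketch (the class name is mine; fields trimmed to one payload) showing the same push/pop-at-head pattern and the clear-before-pooling step:

```java
// Hypothetical sketch of Message's object pool: obtain() pops the list head,
// recycle() clears the payload and pushes the node back, capped at a max size.
public class PoolMessage {
    private static final Object sPoolSync = new Object();
    private static PoolMessage sPool;       // head of the free list
    private static int sPoolSize = 0;
    private static final int MAX_POOL_SIZE = 50;

    public int what;                        // stand-in for the message payload
    PoolMessage next;                       // intrusive list link

    public static PoolMessage obtain() {
        synchronized (sPoolSync) {
            if (sPool != null) {
                PoolMessage m = sPool;      // pop the list head
                sPool = m.next;
                m.next = null;
                sPoolSize--;
                return m;
            }
        }
        return new PoolMessage();           // pool empty: allocate fresh
    }

    public void recycle() {
        what = 0;                           // clear payload before pooling
        synchronized (sPoolSync) {
            if (sPoolSize < MAX_POOL_SIZE) {
                next = sPool;               // push as the new list head
                sPool = this;
                sPoolSize++;
            }
        }
    }

    public static void main(String[] args) {
        PoolMessage a = PoolMessage.obtain();
        a.what = 42;
        a.recycle();
        PoolMessage b = PoolMessage.obtain();
        System.out.println(a == b);         // true: the recycled instance is reused
        System.out.println(b.what);         // 0: payload was cleared
    }
}
```

This is why the convention is Message.obtain() rather than new Message(): on a hot message loop the pool keeps allocation and GC pressure low.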
MessageQueue involves the C++ side, i.e. the native layer; most of its core work is actually done there, with the Java layer acting as glue.
Construction
MessageQueue is constructed inside Looper's constructor, so a thread has one Looper and one MessageQueue.
MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}
The constructor goes straight to nativeInit and saves the return value in mPtr. Let's follow it down into the native layer, in frameworks/base/core/jni/android_os_MessageQueue.cpp:
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
This clearly creates a new NativeMessageQueue and returns its address. NativeMessageQueue is the native-side queue object.
Fetching a queued message --- next
Back in the Java layer, let's see what the all-important next does. In Looper.loop, the first statement of each iteration calls it to get a message:
Message next() {
    // mPtr holds the address saved at init time, i.e. the native NativeMessageQueue
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }

    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    // Loop until we get a message
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }

        // Block, with a timeout
        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            // When the head message has a null target handler,
            // look for the next asynchronous message
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    // The delivery time is still ahead of the clock, so the next
                    // blocking wait times out after exactly the difference
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message to return; this is a linked-list unlink
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }

            // Check whether we are quitting
            if (mQuitting) {
                dispose();
                return null;
            }

            // IdleHandler handling for when the queue is idle
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                pendingIdleHandlerCount = mIdleHandlers.size();
            }
            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                mBlocked = true;
                continue;
            }

            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }

        // Run the idle handlers.
        // We only ever reach this code block during the first iteration.
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler

            boolean keep = false;
            try {
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf(TAG, "IdleHandler threw exception", t);
            }

            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }

        // Reset the idle handler count to 0 so we do not run them again.
        pendingIdleHandlerCount = 0;

        // While calling an idle handler, a new message could have been delivered
        // so go back and look again for a pending message without waiting.
        nextPollTimeoutMillis = 0;
    }
}
1. Block for a message via the NativeMessageQueue saved at initialization time;
2. If the head message is not yet due, use the difference between its delivery time and the current clock as the timeout for the next blocking wait;
3. If a message is due, unlink it from the list and return it;
4. If there is nothing to process, run all previously registered IdleHandlers.
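Steps 1-3 above can be sketched in plain Java. This is an illustrative sketch only (names are mine): Object.wait with a computed timeout stands in for the native epoll wait, and a sorted linked list stands in for mMessages (sync barriers, async messages, and idle handlers are omitted):

```java
// Hypothetical sketch of next()'s timing logic: if the head message is not
// yet due, block for exactly (when - now); once it is due, pop and return it.
public class TimedQueueSketch {
    static class Msg { long when; String what; Msg next; }

    private Msg head;

    public synchronized void enqueue(String what, long when) {
        Msg m = new Msg();
        m.what = what;
        m.when = when;
        // sorted insert by delivery time, like enqueueMessage
        if (head == null || when < head.when) {
            m.next = head;
            head = m;
        } else {
            Msg p = head;
            while (p.next != null && p.next.when <= when) p = p.next;
            m.next = p.next;
            p.next = m;
        }
        notifyAll();                        // like nativeWake waking the poll
    }

    public synchronized String next() {
        for (;;) {
            long now = System.currentTimeMillis();
            if (head != null && head.when <= now) {
                Msg m = head;               // due: unlink and return
                head = m.next;
                return m.what;
            }
            // head==null -> wait(0) blocks indefinitely, like timeout -1;
            // otherwise block only until the head message becomes due
            long timeout = (head == null) ? 0 : head.when - now;
            try {
                wait(timeout);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        TimedQueueSketch q = new TimedQueueSketch();
        long now = System.currentTimeMillis();
        q.enqueue("later", now + 40);
        q.enqueue("soon", now + 5);
        System.out.println(q.next());       // soon
        System.out.println(q.next());       // later
    }
}
```

The real MessageQueue blocks inside native epoll instead of Object.wait, which is what lets it also watch file descriptors, but the due-time arithmetic is the same idea.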
Next down the stack is nativePollOnce, the call that blocks for a message. Onward.
This function takes us into the native layer:
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
This goes into NativeMessageQueue. To understand what the native side is doing, we first need to look at its initialization.
Initialization
First, the NativeMessageQueue constructor:
NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
Here is another Looper, but note it is no longer the Java one: it is the native Looper, in /system/core/libutils/Looper.cpp. As usual, start with its construction:
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
            strerror(errno));

    AutoMutex _l(mLock);
    rebuildEpollLocked();
}

void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
        ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
        close(mEpollFd);
    }

    // Allocate the new epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));

    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
            strerror(errno));

    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
                    request.fd, strerror(errno));
        }
    }
}
See it? epoll is used to monitor multiple fds: first the wake event fd, and then one fd per entry in the request list. What is a request? Let's set that aside for now; it is explained below.
To summarize initialization: constructing the native queue creates (or reuses) a per-thread native Looper, whose epoll instance watches the wake event fd plus any registered request fds.
Reading messages --- nativePollOnce
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
So the real work is done by calling pollOnce on the native Looper.
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // Process each pending response's request; if it has no callback,
        // return its identifier directly
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}
The key call is pollInner:
......
    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    // Handle at most 16 fds at a time
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // Wait for an event or a timeout
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Rebuild the epoll set if a rebuild was requested
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // < 0 is an error: jump straight to Done
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }

    // 0 means timeout: jump to Done
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = POLL_TIMEOUT;
        goto Done;
    }
......
    // Process every event we received
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            // The wake fd: handle the wakeup
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            // Otherwise, handle the request registered for this fd
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                // Translate the epoll events and push a new response via pushResponse
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    // Process the backlog of pending native messages
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Process each response
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
To summarize:
1. Wait for events with epoll_wait;
2. Match the delivered events against the request table and generate and push a response for each;
3. Loop over the backlog of due mMessageEnvelopes messages and handle them;
4. Process all responses.
This introduces three things: request, response, and mMessageEnvelopes. Let's explain them in turn.
First, requests are added from addFd, so a request is what binds an fd to the events it cares about. One fd can be bound to multiple events via the | operator; later, when an event arrives, & is used to test whether it is one of the events of interest, and if so pushResponse is executed.
A response is even simpler: it just records the pairing of the delivered events with their request:
struct Request {
    int fd;
    int ident;
    int events;
    int seq;
    sp<LooperCallback> callback;
    void* data;

    void initEventItem(struct epoll_event* eventItem) const;
};

struct Response {
    int events;
    Request request;
};
mRequests is a vector keyed by fd; mResponses is simply a plain vector.
mMessageEnvelopes is a vector of MessageEnvelope objects:
struct MessageEnvelope {
    MessageEnvelope() : uptime(0) { }

    MessageEnvelope(nsecs_t u, const sp<MessageHandler> h, const Message& m)
            : uptime(u), handler(h), message(m) { }

    nsecs_t uptime;
    sp<MessageHandler> handler;
    Message message;
};
sendMessageAtTime creates a MessageEnvelope and inserts it via insertAt. So a MessageEnvelope is simply a cached pending native message, recording its delivery time uptime, its handler, and the message body.
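The envelope-draining logic in pollInner's Done block can also be sketched in isolation. This is an illustrative Java sketch (names are mine) of the loop over mMessageEnvelopes: envelopes already due are handled in order, and the first future one sets the next wakeup time:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of pollInner's envelope processing: pop and handle
// every envelope whose uptime has passed; the head of what remains decides
// when to wake up next (Long.MAX_VALUE, like LLONG_MAX, if nothing remains).
public class EnvelopeSketch {
    static class Envelope {
        long uptime;
        String what;
        Envelope(long uptime, String what) { this.uptime = uptime; this.what = what; }
    }

    static long processDue(List<Envelope> envelopes, long now, List<String> handled) {
        long nextUptime = Long.MAX_VALUE;
        while (!envelopes.isEmpty()) {
            Envelope e = envelopes.get(0);
            if (e.uptime <= now) {
                envelopes.remove(0);
                handled.add(e.what);          // like handler->handleMessage(message)
            } else {
                nextUptime = e.uptime;        // head of queue sets the next wakeup time
                break;
            }
        }
        return nextUptime;
    }

    public static void main(String[] args) {
        List<Envelope> q = new ArrayList<>();
        q.add(new Envelope(10, "a"));
        q.add(new Envelope(20, "b"));
        q.add(new Envelope(90, "c"));
        List<String> handled = new ArrayList<>();
        long next = processDue(q, 50, handled);
        System.out.println(handled);          // [a, b]
        System.out.println(next);             // 90
    }
}
```

This relies on the list being kept sorted by uptime, which is exactly what the insertAt in sendMessageAtTime maintains.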
Handling messages --- invoking the handler
The handling step already appeared above: it is the call response.request.callback->handleEvent(fd, events, data). So what is this callback?
class LooperCallback : public virtual RefBase {
protected:
    virtual ~LooperCallback();

public:
    /**
     * Handles a poll event for the given file descriptor.
     * It is given the file descriptor it is associated with,
     * a bitmask of the poll events that were triggered (typically EVENT_INPUT),
     * and the data pointer that was originally supplied.
     *
     * Implementations should return 1 to continue receiving callbacks, or 0
     * to have this file descriptor and callback unregistered from the looper.
     */
    virtual int handleEvent(int fd, int events, void* data) = 0;
};
It is just a callback object, passed in at addFd time. setFileDescriptorEvents calls addFd with this, which means NativeMessageQueue intercepts the callback itself. Incidentally, setFileDescriptorEvents is ultimately exposed to the Java layer, corresponding to the nativeSetFileDescriptorEvents function.
Back to the point: since handleEvent is what gets invoked, let's look at it:
int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {
    int events = 0;
    if (looperEvents & Looper::EVENT_INPUT) {
        events |= CALLBACK_EVENT_INPUT;
    }
    if (looperEvents & Looper::EVENT_OUTPUT) {
        events |= CALLBACK_EVENT_OUTPUT;
    }
    if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {
        events |= CALLBACK_EVENT_ERROR;
    }

    int oldWatchedEvents = reinterpret_cast<intptr_t>(data);
    int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,
            gMessageQueueClassInfo.dispatchEvents, fd, events);
    if (!newWatchedEvents) {
        return 0; // unregister the fd
    }
    if (newWatchedEvents != oldWatchedEvents) {
        setFileDescriptorEvents(fd, newWatchedEvents);
    }
    return 1;
}
After composing the event mask, it calls mPollEnv->CallIntMethod(mPollObj, gMessageQueueClassInfo.dispatchEvents, fd, events). You can see that mPollEnv is a JNIEnv, so this is clearly a call back up into the Java layer. The target? MessageQueue.dispatchEvents:
private int dispatchEvents(int fd, int events) {
    // Get the file descriptor record and any state that might change.
    final FileDescriptorRecord record;
    final int oldWatchedEvents;
    final OnFileDescriptorEventListener listener;
    final int seq;
    synchronized (this) {
        record = mFileDescriptorRecords.get(fd);
        if (record == null) {
            return 0; // spurious, no listener registered
        }
        oldWatchedEvents = record.mEvents;
        events &= oldWatchedEvents; // filter events based on current watched set
        if (events == 0) {
            return oldWatchedEvents; // spurious, watched events changed
        }
        listener = record.mListener;
        seq = record.mSeq;
    }

    // Invoke the listener outside of the lock.
    int newWatchedEvents = listener.onFileDescriptorEvents(
            record.mDescriptor, events);
    if (newWatchedEvents != 0) {
        newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR;
    }

    // Update the file descriptor record if the listener changed the set of
    // events to watch and the listener itself hasn't been updated since.
    if (newWatchedEvents != oldWatchedEvents) {
        synchronized (this) {
            int index = mFileDescriptorRecords.indexOfKey(fd);
            if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record
                    && record.mSeq == seq) {
                record.mEvents = newWatchedEvents;
                if (newWatchedEvents == 0) {
                    mFileDescriptorRecords.removeAt(index);
                }
            }
        }
    }

    // Return the new set of events to watch for native code to take care of.
    return newWatchedEvents;
}
The essential step is listener.onFileDescriptorEvents(record.mDescriptor, events): it invokes the listener that was previously registered for this fd; the listener is chosen by fd. I looked around, and on the Java side only ParcelFileDescriptor.fromFd calls addOnFileDescriptorEventListener; digging deeper leads to MountService.mountAppFuse, so this listener appears to be set up when an app FUSE mount happens. In short, when one of the event conditions occurs (EVENT_INPUT/EVENT_OUTPUT/EVENT_ERROR/EVENT_HANGUP/EVENT_INVALID), the corresponding listener is notified to respond; but by the look of it this path has little to do with ordinary Message processing.