由Handler、MessageQueue、Looper構成的線程消息通訊機制在Android開發中很是經常使用,不過大部分人都只粗淺地看了Java層的實現,對其中的細節不甚了了,這篇博文將研究Android消息機制從Java層到Native層的實現。java
消息機制因爲更貼近抽象設計,因此整個結構更簡單,只包含了消息的產生、分發,不像Input子系統那樣還有歸類、過濾等環節。總體的結構以下圖:linux
在Java層中消息的產生都來源於用戶建立的Message對象,通過封裝的Runnable對象,或調用obtainMessage從Message Pool中得到,Message Pool指的是Message類內的Message循環隊列,隊頭是靜態的Message對象sPool,該隊列最大容納MAX_POOL_SIZE(50)個Message:android
MessagePool對Message的複用節省了不斷建立Message帶來的開銷,若是當前50個Message都已經被用過,因爲MessagePool是循環隊列,則會回到隊頭並請空該Message,向下複用。數組
看Java層Handler的源碼的時候發現了一個奇怪的東西:BlockingRunnable,基本上沒有用過的東西,也沒看別人講過,因而我就來鑽研一下吧:app
private static final class BlockingRunnable implements Runnable {
private final Runnable mTask;
private boolean mDone;
public BlockingRunnable(Runnable task) {
mTask = task;
}
@Override
public void run() {
try {
mTask.run();
} finally {
synchronized (this) {
mDone = true;
notifyAll();
}
}
}
public boolean postAndWait(Handler handler, long timeout) {
if (!handler.post(this)) {
return false;
}
synchronized (this) {
if (timeout > 0) {
final long expirationTime = SystemClock.uptimeMillis() + timeout;
while (!mDone) {
long delay = expirationTime - SystemClock.uptimeMillis();
if (delay <= 0) {
return false; // timeout
}
try {
wait(delay);
} catch (InterruptedException ex) {
}
}
} else {
while (!mDone) {
try {
wait();
} catch (InterruptedException ex) {
}
}
}
}
return true;
}
}複製代碼
咱們能夠看到,BlockingRunnable是一個「包裹」構造方法中傳入的Runnable的Runnable,調用BlockingRunnable的postAndWait會作如下事情:異步
這個東西的說明書和使用風險能夠在runWithScissors方法的註釋裏看到,我在這裏就不當翻譯工了。async
獲得Message後,就會經過Handler的sendMessageAtTime調用MessageQueue的enqueueMessage將Message投遞到MessageQueue中,在往下學習以前必須先了解Handler的建立,由於後面的知識和它有關聯。ide
其實Handler的初始化沒什麼好看的,就是保存Callback、mLooper的MessageQueue的引用,以及聲明Handler是否異步投遞全部Message。可是裏面有一個內存泄露的檢查,能夠學習一下,就是若是打開了FIND_POTENTIAL_LEAKS,就會進行內存泄露的檢查,它會作如下事情:函數
public Handler(Callback callback, boolean async) {
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}複製代碼
既然Handler的建立這麼簡單,爲何說後面要學習的內容和它相關呢?緣由就出在Looper中,咱們能夠看到Looper是經過sThreadLocal返回的,這個ThreadLocal是什麼呢?oop
ThreadLocal是一個關於建立線程局部變量的類。
一般狀況下,咱們建立的變量是能夠被任何一個線程訪問並修改的。而使用ThreadLocal建立的變量只能被當前線程訪問,其餘線程則沒法訪問和修改。它的實現原理以下:
如圖所示,ThreadLocalRef實際上是同一個ThreadLocal對象的引用,爲了避免讓線看起來很亂我分別用了兩個方塊表示ThreadLocal對象,但實際上是同一個對象。ThreadLocal同時是ThreadA、ThreadB甚至ThreadN內ThreadLocalMap的Key,但取出來的對象時不同的,由於Map不同對應的鍵值對也不同嘛。
ThreadLocalMap是僅用於維護ThreadLocal值的自定義HashMap,只在Thread類內使用。爲了不ThreadLocalMap的Key->ThreadLocal在GC時沒法被回收,裏邊的元素都是用WeakReference封裝的。ThreadLocalMap除了這點之外,沒有什麼特別的,就不細講了。
須要注意的一點是:ThreadLocalMap是可能帶來內存泄露的,但root cause不是ThreadLocalMap自己,而是代碼質量不夠高。首先,因爲做爲Map的Key的ThreadLocal是弱引用,那麼GC時ThreadLocal會被回收,此時Map內存在一對Key爲null的鍵值對,而Value仍然被線程強引用着,那麼若是用完ThreadLocal後不主動移除,就會內存泄露了。但事實上,ThreadLocal用完後主動調remove就能規避這個問題,原本也該這樣作。
Entry做爲ThreadLocalMap的元素,表示的是一對鍵值對:ThreadLocal的弱引用爲鍵,將要用ThreadLocal存儲的對象爲值。
static class Entry extends WeakReference<ThreadLocal> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal k, Object v) {
super(k);
value = v;
}
}複製代碼
換句話說,所謂的不可被其餘線程修改的局部變量,表示的是:每一個線程中都會維護一個ThreadLocalMap,裏邊以ThreadLocal爲鍵,對應的局部變量爲值,經過鍵值對來控制訪問和數據的一致性,而不是經過鎖來控制。
既然一個線程只有一個Looper,那麼Looper裏面有什麼呢?從源碼能夠看到,Looper的構造方法是私有的,也就意味着得到Looper對象基本都是單例,這一點和線程<->Looper的一對一映射關係切合。
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}複製代碼
從Looper的成員變量咱們能夠知道Looper包含了如下東西:
從這能夠知道,一個線程對應一個Looper,一個Looper對應一個MessageQueue
----------------分割線,接下來回到消息的投遞結束的地方----------------
獲得Message後,就會經過Handler的sendMessageAtTime調用MessageQueue的enqueueMessage將Message投遞到MessageQueue中,在往下學習以前必須先了解Handler的建立,由於後面的知識和它有關聯。
如今咱們知道Message將要投遞到哪裏的MessageQueue裏了,那麼投遞過去以後,消息是怎麼被處理的呢?這代碼很長,並且就是個進入隊列的過程,我就不貼了,作了如下事情:
在這裏有個有意思的概念必須提一下,就是Barrier Message,它表示的是一種柵欄的概念,將它加入MessageQueue能夠攔住全部執行時間在它以後的同步Message,異步Message則不受影響,遍歷到就會處理,這種情況會持續到把Barrier Message移除。
提示:圖裏綠色表明Message能夠被取出執行,紅色表示沒法被取出執行
它和Message的根本差異是,他沒有target,即:沒有處理該Message的Handler,但咱們本身將Message的Handler設爲null是無法加入MessageQueue的,必須調用postSyncBarrier方法:
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
……
}
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}複製代碼
前面已經知道Message投遞後就會到達MessageQueue,接下來就看消息是怎麼被遍歷處理的。首先要知道的一點是,Looper在調用prepare建立後,是必須調loop()方法的,不少人會問,我日常用的時候沒用loop()方法也沒問題啊。那是由於你是在主線程用的,主線程在建立Looper的時候已經調用過loop()方法了。
咱們建立了其餘線程的Looper後,調loop()方法會作如下事情:
public static void loop() {
……
for (;;) {
Message msg = queue.next(); // might block
……
try {
msg.target.dispatchMessage(msg);
} finally {
……
}
……
msg.recycleUnchecked();
}
}複製代碼
在Handler的dispatchMessage中,對Message的處理實際上是有優先順序這個說法的:
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}複製代碼
對於MessageQueue,它實際表示了Java層和Native層的MessageQueue,Java層的MessageQueue就是mMessages表示的循環隊列,Native層的MessageQueue就是mPtr。它的next()方法裏作的事情以下:
Java層Android消息機制的整個過程能夠用下圖歸納:
有鑽研過Java層代碼的朋友確定知道,Handler裏面還有個用於跨進程Message通訊的MessengerImpl,這個東西我就不在這裏說了,由於它就是個簡單的跨進程通訊,和整個Handler、Looper、MessageQueue其實關係不大。
Android消息機制在Native層其實和Java層很類似,保留了Handler、Looper、MessageQueue的結構。可是Native層Message、Handler、MessageQueue的概念被弱化得很厲害,基本上只是個「空殼」,核心邏輯都在Looper裏邊了。
其餘區別都不大了,只是在實現上有一點不同,具體的差異就在源碼中找答案吧。總體結構圖以下:
在Native層中,消息由MessageEnvelope和封裝fd(Java層Handler能夠添加fd的監聽、Native固然也能夠)相關信息後獲得的epoll_event組成。
對於要被監聽的fd的消息,Looper作了如下事情:
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
……
{ // acquire lock
AutoMutex _l(mLock);
……
struct epoll_event eventItem;
request.initEventItem(&eventItem);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
……
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
……
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}複製代碼
MessageEnvelope相對於fd就簡單多了,在調用Native層Looper的sendMessage相關函數時會將uptime、MessageHandler、Native層Message封裝到MessageEnvelope中,而後插入mMessageEnvelopes中。
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, const Message& message) {
……
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
……
}
……
}複製代碼
前面已經提到了,Java層的MessageQueue處理消息時,會先調用Native層MessageQueue的nativePollOnce(),它實際調用的是native層MessageQueue的pollOnce(),而native的pollOnce調用的是Native層的Looper的pollOnce:
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
……
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
……
mLooper->pollOnce(timeoutMillis);
……
}複製代碼
在看Native層Looper的pollOnce方法以前,先看看Native層的Looper和Java層的Looper會不會有一些不同吧。
和Java層Looper的使用同樣,Native層Looper也須要prepare,也是一個經過線程局部變量存儲的對象,一個線程只有一個。那麼在Native層是怎麼實現線程局部變量的呢?
Native層線程局部變量的思想和Java層很相似,Native層會維護一個全局的pthread_keys數組,用於存放線程局部變量的鍵。其中seq用於標記是否"in_use",destr則是一個函數指針,可用做析構函數,在線程退出時釋放該鍵對應於線程中的線程局部變量。
static struct pthread_key_struct pthread_keys[PTHREAD_KEYS_MAX] ={{0,NULL}};
int pthread_key_create(pthread_key_t *key, void (*destr_function) (void*));
struct pthread_key_struct
{
/* Sequence numbers. Even numbers indicated vacant entries. Note that zero is even. We use uintptr_t to not require padding on 32- and 64-bit machines. On 64-bit machines it helps to avoid wrapping, too. */
uintptr_t seq;
/* Destructor for the data. */
void (*destr) (void *);
};複製代碼
pthread在建立線程時會維護一個指針數組,數組元素指向線程局部變量的數據塊。總體解構以下圖:
建立Looper時,會作如下事情:
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
……
rebuildEpollLocked();
}
void Looper::rebuildEpollLocked() {
……
// Allocate the new epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
……
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
……
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
……
}
}複製代碼
對於Native層Looper的pollOnce,找它函數定義稍微有點隱祕,它在Looper.h中聲明,inline到pollOnce(int timeoutMillis, int outFd, int outEvents, void** outData)函數裏了,它作了如下事情:
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
……
return ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
……
return result;
}
result = pollInner(timeoutMillis);
}
}複製代碼
pollInner這個函數比較長,它作了如下事情:
至此對Android消息機制的學習就結束啦。
若是你以爲個人分享有幫助到你的話,請我吃個零食/喝杯咖啡唄~