參考資料;html
https://zh.wikipedia.org/wiki/%E7%AE%A1%E9%81%93_(Unix)java
https://blog.csdn.net/qq_33951180/article/details/68959819linux
https://blog.csdn.net/cywosp/article/details/27316803android
很久沒有寫文章了,最近想要學習的東西不少,opengl es也很久沒更新了,主要是事情太多了,恰好在公司研究了一下Native層的Handler源碼實現,這裏記錄一下學習的內容。c++
因爲Native的Handler設計到c++以及對飲Linux系統接口的調用,文章講述的內容有以下三個方面:segmentfault
首先弄懂對應的API可以幫助咱們更好的去理解對應的Handler源碼。數組
tips緩存
進程間的通訊方式:bash
- 管道(pipe)和命名管道(FIFO)
- 信號(signal)
- 消息隊列
- 共享內存
- 信號量
- 套接字(socket)
想要了解多點的能夠查看個人這篇文章異步
管道是Linux中進行進程通訊或者線程通訊的一種手段之一,管道分爲匿名管道(pipe)以及命名管道(named pipe),管道是內核維護的一個緩存, 它提供兩個 fd, 從一個fd寫入數據, 從另外一個fd讀出數據. 因此它是半雙工的。
關於爲何是半雙工而不是雙工的請看這篇文章:
這裏因爲Android的Native源碼中運用的是匿名管道,只針對匿名管道進行說明,關於命名管道(我也不太瞭解)有興趣的請自行查閱資料。
匿名管道經過調用pipe(int[2])函數來進行獲取兩個描述符,分別表明着管道讀端以及管道的寫端,方式以下:
int fds[2]; int result=pipe(fds); if(result>=0){ ...作本身的事情 }
以以上例子爲例,即fds[0]爲管道的讀端,fds[1]爲管道的寫端。管道的兩端是兩個普通的,匿名的文件描述符,這就讓其餘進程沒法鏈接該管道,因此稱之爲匿名管道。對於進程而言,經過管道通訊須要在進程A關閉讀/寫端,在進程B關閉寫/讀端,數據流向爲單向。對於線程而言,不須要關閉管道任何端,子線程是和建立它的進程共享fd的,任何一方關閉管道的讀或寫都會影響到另外一方。
使用匿名管道須要注意以下幾個點:
首先試下線程間經過匿名管道進行數據交換的過程:
void* run(void* fd){ std::cout<<"run start"<<std::endl; char str[] = "hello everyone!"; write( *(int*)fd, str,strlen(str) ); } int main (void) { int fd[2]; if(pipe(fd)){ throw out_of_range("error"); } pthread_t tid=0; pthread_create(&tid,NULL,run,&fd[1]); pthread_join(tid, NULL); char readbuf[1024]; sleep(3); // read buf from child thread read( fd[0], readbuf, sizeof(readbuf) );//阻塞操做 printf("%s\n",readbuf); return (EXIT_SUCCESS); } //執行命令g++ main.cpp -o test -lpthread // ./test //輸出結果 run start //等待三秒後 hello everyone!
經過匿名管道,咱們在子線程中調用write(...)函數將數據寫入,在主線程中調用read(...)函數獲取對應的數據,從而實現了對應的子線程到主線程的數據的單向流通的操做,那若是要子線程讀取主線程經過匿名管道寫入的數據,改下實現便可:
printMsg (char ch) { std::cout << ch << std::endl; } void* run(void* fd){ std::cout<<"run start"<<std::endl; char readbuf[1024]; read(*(int*)fd, readbuf, sizeof(readbuf) ); printf("%s\n",readbuf); } int main (void) { int fd[2]; if(pipe(fd)){ throw out_of_range("error"); } pthread_t slef=pthread_self(); std::cout<<"pthread_id="<<slef<<std::endl; pthread_t tid=0; pthread_create(&tid,NULL,run,&fd[0]); // read buf from child thread char str[] = "hello everyone!"; write(fd[1], str,strlen(str) ); sleep(3); return (EXIT_SUCCESS); } //輸出結果與上面的相同
接下來看下進程間經過匿名管道進行數據交流的過程,主要運行fork()函數進行子進程的初始化過程,首先測試從子進程寫數據,父進程讀數據的狀況:
int main (void) { int fd[2]; int pid=0; char str[]="hello everyone"; char readBuffer[1024]; if(pipe(fd)>=0){ if((pid=fork())<0){ printf("%s","fork error"); }else if(pid==0){ //子進程 printf("%s\n","子進程建立成功"); //關閉子進程的讀端 close(fd[0]); //寫數據 write(fd[1],str,strlen(str)); printf("%s\n","子進程寫入數據完畢"); }else{ //父進程,即當前進程 printf("%s\n","父進程開始做業"); //關閉父進程寫端 close(fd[1]); sleep(3); read(fd[0],readBuffer,sizeof(readBuffer)); printf("父進程讀到數據=%s\n",readBuffer); } } return (EXIT_SUCCESS); } //運行結果 父進程開始做業 子進程建立成功 子進程寫入數據完畢 父進程讀到數據=hello everyone
測試從父進程寫數據,子進程讀數據的狀況:
int main (void) { int fd[2]; int pid=0; char str[]="hello everyone"; char readBuffer[1024]; if(pipe(fd)>=0){ if((pid=fork())<0){ printf("%s","fork error"); }else if(pid==0){ printf("%s\n","子進程開始做業"); //關閉子進程寫端 close(fd[1]); sleep(3); read(fd[0],readBuffer,sizeof(readBuffer)); //子進程,即當前進程 printf("子進程讀到數據=%s\n",readBuffer); }else{ //父進程 printf("%s\n","父進程建立成功"); //關閉父進程的讀端 close(fd[0]); //寫數據 write(fd[1],str,strlen(str)); printf("%s\n","父進程寫入數據完畢"); } } return (EXIT_SUCCESS); } //輸出結果 父進程建立成功 父進程寫入數據完畢 子進程開始做業 子進程讀到數據=hello everyone
epoll是Linux對於select以及poll的加強版,在Linux的2.6內核提出。對於epoll能夠直接在bash中用man進行文檔查看,或者查閱官網對應的內容。
對於epoll而言,網上有不少文章講了其實現的功能以及對應與select以及poll的比較,這裏對於我認爲比較好的文章進行總結以及梳理,資料大多來自於網上。
附:學習來源
https://zh.wikipedia.org/wiki/Epoll
http://blog.51cto.com/yaocoder/888374
https://www.zhihu.com/question/28594409
對於select,poll以及epoll的而言,三個都是IO多路複用的機制,能夠監視多個描述符的讀/寫等事件,一旦某個描述符就緒(通常是讀或者寫事件發生了),就可以將發生的事件通知給關心的應用程序去處理該事件。但select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。
關於IO複用機制的說明,能夠看下知乎的講解做爲最直觀的理解思路,slect,poll以及epoll的優缺點整理以下:
select優缺點以下:
缺點:
每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大;
同時每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也很大;
select支持的文件描述符數量過小了,默認是1024。
優勢:
select的可移植性更好,在某些Unix系統上不支持poll()。
select對於超時值提供了更好的精度:微秒,而poll是毫秒。
poll優缺點以下:
缺點:
大量的fd的數組被總體複製於用戶態和內核地址空間之間,而無論這樣的複製是否是有意義;
與select同樣,poll返回後,須要輪詢pollfd來獲取就緒的描述符。
優勢:
poll() 不要求開發者計算最大文件描述符加一的大小。
poll() 在應付大數目的文件描述符的時候速度更快,相比於select。
它沒有最大鏈接數的限制,緣由是它是基於鏈表來存儲的。
epoll的優勢就是改進了前面所說缺點:
支持一個進程打開大數目的socket描述符:相比select,epoll則沒有對FD的限制,它所支持的FD上限是最大能夠打開文件的數目,這個數字通常遠大於2048,舉個例子,在1GB內存的機器上大約是10萬左右,具體數目能夠cat /proc/sys/fs/file-max察看,通常來講這個數目和系統內存關係很大。
IO效率不隨FD數目增長而線性降低:epoll不存在這個問題,它只會對"活躍"的socket進行操做--- 這是由於在內核實現中epoll是根據每一個fd上面的callback函數實現的。那麼,只有"活躍"的socket纔會主動的去調用 callback函數,其餘idle狀態socket則不會,在這點上,epoll實現了一個"僞"AIO,由於這時候推進力在os內核。在一些 benchmark中,若是全部的socket基本上都是活躍的---好比一個高速LAN環境,epoll並不比select/poll有什麼效率,相 反,若是過多使用epoll_ctl,效率相比還有稍微的降低。可是一旦使用idle connections模擬WAN環境,epoll的效率就遠在select/poll之上了。
使用mmap加速內核與用戶空間的消息傳遞:這點實際上涉及到epoll的具體實現了。不管是select,poll仍是epoll都須要內核把FD消息通知給用戶空間,如何避免沒必要要的內存拷貝就 很重要,在這點上,epoll是經過內核於用戶空間mmap同一塊內存實現的。
epoll主要提供三個API給開發者進行調用實現自主功能:
邊緣觸發(edge-triggered 簡稱ET)和水平觸發(level-triggered 簡稱LT):
epoll的事件派發接口能夠運行在兩種模式下:邊緣觸發(edge-triggered)和水平觸發(level-triggered),兩種模式的區別請看下面,咱們先假設下面的狀況:
若是rfd被設置了ET,在調用完第五步的epool_wait 後會被掛起,儘管在緩衝區還有能夠讀取的數據,同時另一段的管道還在等待發送完畢的反饋。這是由於ET模式下只有文件描述符發生改變的時候,纔會派發事件。因此第五步操做,可能會去等待已經存在緩衝區的數據。在上面的例子中,一個事件在第二步被建立,再第三步中被消耗,因爲第四步中沒有讀取完緩衝區,第五步中的epoll_wait可能會一直被阻塞下去。
下面狀況下推薦使用ET模式:
相比之下,當咱們使用LT的時候(默認),epoll會比poll更簡單更快速,並且咱們可使用在任何一個地方。
上述講述水平觸發和邊緣觸發翻譯來自epoll的doc中,想要徹底理解能夠查看這篇文章,講的十分清楚。
int epoll_create(int size);
epoll_create() 能夠建立一個epoll實例。在linux 內核版本大於2.6.8 後,這個size 參數就被棄用了,可是傳入的值必須大於0。若是執行成功,返回一個非負數(實際爲文件描述符), 若是執行失敗,會返回-1。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
這個系統調用可以控制給定的文件描述符epfd指向的epoll實例,op是添加事件的類型,fd是目標文件描述符。
有效的op值有如下幾種:
第三個參數是須要監聽的fd。第四個參數是告訴內核須要監聽什麼事,代碼結構以下:
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t; //感興趣的事件和被觸發的事件 struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };
events這個參數是一個字節的掩碼構成的。下面是能夠用的事件:
返回值:若是成功,返回0。若是失敗,會返回-1, errno將會被設置。有如下幾種錯誤:
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epoll_wait 這個系統調用是用來等待epfd中的事件。events指向調用者可使用的事件的內存區域。maxevents告知內核有多少個events,必需要大於0.
timeout這個參數是用來制定epoll_wait 會阻塞多少毫秒,會一直阻塞到下面幾種狀況:
當timeout等於-1的時候這個函數會無限期的阻塞下去,當timeout等於0的時候,就算沒有任何事件,也會馬上返回。
下面寫個例子演示一下epoll和pipe一塊兒使用的過程:
static int MAX=256; struct Data{ int* fd; int epfd; struct epoll_event events[]; }; void *runEp(void* data){ printf("線程運行開始\n"); Data r_data=*(Data*)data; struct epoll_event allEvs[MAX]; int pipeFd=*(r_data.fd); //struct epoll_event events[MAX]=r_data.events; int count=epoll_wait(r_data.epfd,allEvs,MAX,5000); for(int i=0;i<count;i++){ if(allEvs[i].data.fd==pipeFd&&(allEvs[i].events&EPOLLIN)){ printf("接收到管道能夠進行讀的信號,開始讀取\n"); char buffer[MAX]; read(pipeFd,buffer,100); printf("讀取的內容是:%s\n",buffer); } } } void testEpoll(){ int epollId=epoll_create(MAX); if(epollId<=0){ throw out_of_range("epoll error"); } int pipFd[2]; int pirRes; if((pirRes=pipe(pipFd))<0){ throw out_of_range("pipe error"); } struct epoll_event event; event.data.fd=pipFd[0];//監聽管道讀端 event.events=EPOLLIN|EPOLLET;//設置參數,接收能夠read()的通知,設置邊緣觸發模式 int epfd=epoll_create(MAX); struct Data data; data.epfd=epfd; data.fd=&pipFd[0]; int res=epoll_ctl(epfd,EPOLL_CTL_ADD,pipFd[0],&event); if(res!=0){ throw out_of_range("pipe error"); } pthread_t tid=12; pthread_create(&tid,NULL,runEp,&data); sleep(2); char str[] = "hello everyone!"; write(pipFd[1], str,strlen(str) ); printf("寫入管道數據完畢\n"); sleep(3); } //運行testEpoll()輸出結果: 線程運行開始 寫入管道數據完畢 接收到管道能夠進行讀的信號,開始讀取 讀取的內容是:hello everyone!
上面瞭解了一下關於管道以及epoll,接下來跟蹤一下Handler的具體源碼來理一下邏輯。首先Looper在初始化的時候會同時初始化一個MessageQueue,在MessageQueue的構造函數以下:
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); }
對應的native層實如今android_os_MessageQueue.cpp
文件中:
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { //初始化一個本地的MessageQueue NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return 0; } nativeMessageQueue->incStrong(env);//增長引用 return reinterpret_cast<jlong>(nativeMessageQueue);//返回指針地址 }
上述代碼主要相關的爲兩件事情:
在NativeMessageQueue初始化過程以下:
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } }
這裏在Native層也創建了一個Looper,實際上能夠理解爲Looper.java在Native層的映射,看下構造函數:
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { int wakeFds[2]; int result = pipe(wakeFds); LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); mWakeReadPipeFd = wakeFds[0]; mWakeWritePipeFd = wakeFds[1]; ... mIdling = false; // Allocate the epoll instance and register the wake pipe. mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN;//監聽管道的read()操做 eventItem.data.fd = mWakeReadPipeFd;//記錄管道讀端的fd result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); ... }
這裏的一套在學習epoll的時候已經見識過了,在native層的Looper的構造函數中會去監聽管道讀端的read()操做。
總結一下messagequeue.nativeInit()作的事情:
調用Natvie層代碼在Native初始化一個NativeMessageQueue和Looper,在Looper中會開啓一個匿名管道,由epoll來監聽I/O事件的變化,當管道中有數據的時候,經過epoll通知系統讀取數據。最後返回一個NativeMessageQueue的指針交由Java層的MessageQueue方便下次尋址訪問。
ok,這裏初始化完Java層的Looper,以後會調用Looper.loop()方法,在該方法中會一直取MessageQueue裏面的數據:
public static void loop() { ... for (;;) { Message msg = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } ... }
MessageQueue.next()方法以下:
Message next() { //獲取指針地址 final long ptr = mPtr; if (ptr == 0) { return null; } int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } nativePollOnce(ptr, nextPollTimeoutMillis); synchronized (this) { // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } // Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); return null; } // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } // Run the idle handlers. // We only ever reach this code block during the first iteration. for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf(TAG, "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0; } }
這裏能夠看到調用了nativePollOnce(...)
方法進入了native層,對應實現爲:
//`android_os_MessageQueue.cpp` static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); }
該方法最終調用native層的Looper.pollOnce(...)
:
//Looper.cpp int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { ... result = pollInner(timeoutMillis); } } int Looper::pollInner(int timeoutMillis) { ... // Poll. int result = POLL_WAKE; mResponses.clear(); mResponseIndex = 0; // We are about to idle. mIdling = true; struct epoll_event eventItems[EPOLL_MAX_EVENTS]; //阻塞等待能夠讀取管道的通知 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // No longer idling. mIdling = false; // Acquire lock. mLock.lock(); ... for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { awoken();// } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); } } else { ... } } Done: ; ... return result; }
關鍵代碼在於awaken()
方法:
void Looper::awoken() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ awoken", this); #endif char buffer[16]; ssize_t nRead; do { nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));//能夠看到讀取了管道中的內容 } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); }
那麼read(..)方法執行了,哪裏進行write(..)方法的操做呢?答案在於咱們將消息push到MessageQueue中時候,即MessageQueue.enqueueMessages(...)方法中,裏面會執行:
nativeWake(mPtr);
這個最終會調用到native層的Looper中的wake()方法:
void Looper::wake() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ wake", this); #endif ssize_t nWrite; do { nWrite = write(mWakeWritePipeFd, "W", 1);//進行了寫操做 } while (nWrite == -1 && errno == EINTR); if (nWrite != 1) { if (errno != EAGAIN) { ALOGW("Could not write wake signal, errno=%d", errno); } } }
Handler在native層主要的邏輯代碼已經瞭解了,那麼總結一下:
引用Gityuan大神的解釋:
在主線程的MessageQueue沒有消息時,便阻塞在loop的queue.next()中的nativePollOnce()方法裏,詳情見Android消息機制1-Handler(Java層),此時主線程會釋放CPU資源進入休眠狀態,直到下個消息到達或者有事務發生,經過往pipe管道寫端寫入數據來喚醒主線程工做。這裏採用的epoll機制,是一種IO多路複用機制,能夠同時監控多個描述符,當某個描述符就緒(讀或寫就緒),則馬上通知相應程序進行讀或寫操做,本質同步I/O,即讀寫是阻塞的。 因此說,主線程大多數時候都是處於休眠狀態,並不會消耗大量CPU資源。