《深入理解Android:卷III A》一一2.3心系两界的MessageQueue
本節書摘來華章計算機出版社《深入理解Android:卷III A》一書中的第2章,第2.3節,作者:張大偉 更多章節內容可以訪問云棲社區“華章計算機”公眾號查看。1
2.3心系兩界的MessageQueue
卷I第5章介紹過,MessageQueue類封裝了與消息隊列有關的操作。在一個以消息驅動的系統中,最重要的兩部分就是消息隊列和消息處理循環。在Andrid 2.3以前,只有Java世界的居民有資格向MessageQueue中添加消息以驅動Java世界的正常運轉,但從Android 2.3開始,MessageQueue的核心部分下移至Native層,讓Native世界的居民也能利用消息循環來處理他們所在世界的事情。因此現在的MessageQueue心系Native和Java兩界。
2.3.1MessageQueue的創建
現在來分析MessageQueue是如何跨界工作的,其代碼如下:
[MessageQueue.java-->MessageQueue.MessageQueue()]
MessageQueue() {
}
nativeInit()方法的真正實現為android_os_MessageQueue_nativeInit()函數,其代碼如下:
[android_os_MessageQueue.cpp-->android_os_MessageQueue_nativeInit()]
static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {
}
nativeInit函數在Native層創建了一個與MessageQueue對應的NativeMessageQueue對象,其構造函數如下:
[android_os_MessageQueue.cpp-->NativeMessageQueue::NativeMessageQueue()]
NativeMessageQueue::NativeMessageQueue() {
}
Native的Looper是Native世界中參與消息循環的一位重要角色。雖然它的類名和Java層的Looper類一樣,但此二者其實并無任何關系。這一點以后還將詳細分析。
2.3.2提取消息
當一切準備就緒后,Java層的消息循環處理,也就是Looper會在一個循環中提取并處理消息。消息的提取就是調用MessageQueue的next()方法。當消息隊列為空時,next就會阻塞。MessageQueue同時支持Java層和Native層的事件,那么其next()方法該怎么實現呢?具體代碼如下:
[MessagQueue.java-->MessageQueue.next()]
final Message next() {
}
看到這里,可能會有人覺得這個MessageQueue很簡單,不就是從以前在Java層的wait變成現在Native層的wait了嗎?但是事情本質比表象要復雜得多。請思考下面的情況:
在nativePollOnce()返回后,next()方法將從mMessages中提取一個消息。也就是說,要讓nativePollOnce()返回,至少要添加一個消息到消息隊列,否則nativePollOnce()不過是做了一次無用功罷了。
如果nativePollOnce()將在Native層等待,就表明Native層也可以投遞Message,但是從Message類的實現代碼上看,該類和Native層沒有建立任何關系。那么nativePollOnce()在等待什么呢?
對于上面的問題,相信有些讀者心中已有了答案:nativePollOnce()不僅在等待Java層來的Message,實際上在Native層還做了大量工作。
下面我們來分析Java層投遞Message并觸發nativePollOnce工作的正常流程。
MessageQueue的enqueueMessage函數完成將一個Message投遞到MessageQueue中的工作,其代碼如下:
[MesssageQueue.java-->MessageQueue.enqueueMessage()]
final boolean enqueueMessage(Message msg, long when) {
}
上面的代碼比較簡單,主要功能是:
將message按執行時間排序,并加入消息隊列。
根據情況調用nativeWake函數,以觸發nativePollOnce函數,結束等待。
雖然代碼簡單,但是對于那些不熟悉多線程的讀者,還是要細細品味一下mBlocked值的作用。我們常說細節體現美,代碼也一樣,這個小小的mBlocked正是如此。
nativeWake函數的代碼如下所示:
[android_os_MessageQueue.cpp-->android_os_MessageQueue_nativeWake()]
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj,
{
NativeMessageQueue* nativeMessageQueue = // 取出NativeMessageQueue對象reinterpret_cast<NativeMessageQueue*>(ptr); return nativeMessageQueue->wake(); // 調用它的wake函數}
[android_os_MessageQueue.cpp-->NativeMessageQueue::wake()]
void NativeMessageQueue::wake() {
}
Native Looper的wake函數代碼如下:
[Looper.cpp-->Looper::wake()]
void Looper::wake() {
}
Wake()函數則更為簡單,僅僅向管道的寫端寫入一個字符“W”,這樣管道的讀端就會因為有數據可讀而從等待狀態中醒來。
2.3.3nativePollOnce函數分析
nativePollOnce()的實現函數是android_os_MessageQueue_nativePollOnce,代碼如下:
[android_os_MessageQueue.cpp-->android_os_MessageQueue_nativePollOnce()]
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
}
分析pollOnce函數:
[android_os_MessageQueue.cpp-->NativeMessageQueue::pollOnece()]
void NativeMessageQueue::pollOnce(int timeoutMillis) {
}
Looper的pollOnce函數如下:
[Looper.cpp-->Looper::pollOnce()]
inline int pollOnce(int timeoutMillis) {
}
上面的函數將調用另外一個有4個參數的pollOnce函數,這個函數的原型如下:
int pollOnce(int timeoutMillis, int outFd, int outEvents, void** outData)
其中:
timeOutMillis參數為超時等待時間。如果為-1,則表示無限等待,直到有事件發生為止。如果值為0,則無須等待立即返回。
outFd用來存儲發生事件的那個文件描述符。
outEvents用來存儲在該文件描述符1上發生了哪些事件,目前支持可讀、可寫、錯誤和中斷4個事件。這4個事件其實是從epoll事件轉化而來。后面我們會介紹大名鼎鼎的epoll。
outData用于存儲上下文數據,這個上下文數據是由用戶在添加監聽句柄時傳遞的,它的作用和pthread_create函數最后一個參數param一樣,用來傳遞用戶自定義的數據。
另外,pollOnce函數的返回值也具有特殊的意義,具體如下:
當返回值為ALOOPER_POLL_WAKE時,表示這次返回是由wake函數觸發的,也就是管道寫端的那次寫事件觸發的。
返回值為ALOOPER_POLL_TIMEOUT表示等待超時。
返回值為ALOOPER_POLL_ERROR,表示等待過程中發生錯誤。
返回值為ALOOPER_POLL_CALLBACK,表示某個被監聽的句柄因某種原因被觸發。
這時,outFd參數用于存儲發生事件的文件句柄,outEvents用于存儲所發生的事件。
上面這些知識是和epoll息息相關的。
查看Looper的代碼會發現,Looper采用了編譯選項(即#if和#else)來控制是否使用epoll作為I/O復用的控制中樞。鑒于現在大多數系統都支持epoll,這里僅討論使用epoll的情況。
epoll機制提供了Linux平臺上最高效的I/O復用機制,因此有必要介紹一下它的基礎知識。
從調用方法上看,epoll的用法和select/poll非常類似,其主要作用就是I/O復用,即在一個地方等待多個文件句柄的I/O事件。
下面通過一個簡單例子來分析epoll的工作流程。
/* ① 使用epoll前,需要先通過epoll_create函數創建一個epoll句柄。
下面一行代碼中的10表示該epoll句柄初次創建時候分配能容納10個fd相關信息的緩存。
對于2.6.8版本以后的內核,該值沒有實際作用,這里可以忽略。其實這個值的主要目的是確定分配一塊多
大的緩存。現在的內核都支持動態拓展這塊緩存,所以該值就沒有意義了 */
int epollHandle = epoll_create(10);
/* ② 得到epoll句柄后,下一步就是通過epoll_ctl把需要監聽的文件句柄加入epoll句柄中。除了
指定文件句柄本身的fd值外,同時還需要指定在該fd上等待什么事件。epoll支持4類事件,分別是
EPOLLIN(句柄可讀)、EPOLLOUT(句柄可寫)、EPOLLERR(句柄錯誤)和EPOLLHUP(句柄斷)。
epoll定義了一個結構體struct epoll_event來表達監聽句柄的訴求。假設現在有一個監聽端的socket
句柄listener,要把它加入epoll句柄中 */
struct epoll_event listenEvent; //先定義一個event
/* EPOLLIN表示可讀事件,EPOLLOUT表示可寫事件,另外還有EPOLLERR,EPOLLHUP表示系統默認會將
EPOLLERR加入事件集合中 */
listenEvent.events = EPOLLIN;// 指定該句柄的可讀事件
// epoll_event中有一個聯合體叫data,用來存儲上下文數據,本例的上下文數據就是句柄自己listenEvent.
data.fd = listenEvent;
/* ③ EPOLL_CTL_ADD將監聽fd和監聽事件加入epoll句柄的等待隊列中;
EPOLL_CTL_DEL將監聽fd從epoll句柄中移除;EPOLL_CTL_MOD修改監聽fd的監聽事件,例如本來只等待可讀事件,現在需要同時等待可寫事件,那么修改listenEvent.events 為EPOLLIN|EPOLLOUT后,再傳給epoll句柄 */epoll_ctl(epollHandle,EPOLL_CTL_ADD, listener,&listenEvent);
/* 當把所有感興趣的fd都加入epoll句柄后,就可以開始坐等感興趣的事情發生了。為了接收所發生的事
情,先定義一個epoll_event數組 */
struct epoll_event resultEvents[10];
int timeout = -1;
while(1) {
}
epoll整體使用流程如上面代碼所示,基本和select/poll類似,不過作為Linux平臺最高效的I/O復用機制,這里有些內容供讀者參考。
epoll的效率為什么會比select高?其中一個原因是調用方法。每次調用select時,都需要把感興趣的事件復制到內核中,而epoll只在epll_ctl進行加入的時候復制一次。另外,epoll內部用于保存事件的數據結構使用的是紅黑樹,查找速度很快。而select采用數組保存信息,不但一次能等待的句柄個數有限,并且查找起來速度很慢。當然,在只等待少量文件句柄時,select和epoll效率相差不是很多,但還是推薦使用epoll。
epoll等待的事件有兩種觸發條件,一個是水平觸發(EPOLLLEVEL),另外一個是邊緣觸發(EPOLLET,ET為Edge Trigger之意),這兩種觸發條件的區別非常重要。讀者可通過man epoll查閱系統提供的更為詳細的epoll機制。
最后,關于pipe,還想提出一個小問題供讀者思考討論:
為什么Android中使用pipe作為線程間通信的方式?對于pipe的寫端寫入的數據,讀端都不感興趣,只是為了簡單喚醒。POSIX不是也有線程間同步函數嗎?為什么要用pipe呢?
關于這個問題的答案,可參見鄧凡平的一篇博文《隨筆之如何實現一個線程池》。網址為http://www.cnblogs.com/innost/archive/2011/11/24/2261454.html。
下面分析帶4個參數的pollOnce()函數,代碼如下:
[Looper.cpp-->Looper::pollOnce()]
int Looper::pollOnce(int timeoutMillis, int outFd, int outEvents,
void** outData) {
}
初看上面的代碼,可能會讓人有些丈二和尚摸不著頭腦。但是把pollInner()函數分析完畢,大家就會明白很多。pollInner()函數非常長,把用于調試和統計的代碼去掉,結果如下:
[Looper.cpp-->Looper::pollInner()]
int Looper::pollInner(int timeoutMillis) {
ifdef LOOPER_USES_EPOLL // 只討論使用epoll進行I/O復用的方式
struct epoll_event eventItems[EPOLL_MAX_EVENTS]; // 調用epoll_wait,等待感興趣的事件或超時發生 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS,timeoutMillis);else
......//使用別的方式進行I/O復用endif
//從epoll_wait返回,這時候一定發生了什么事情 mLock.lock(); if (eventCount < 0) { //返回值小于零,表示發生錯誤if (errno == EINTR) {goto Done;}//設置result為ALLOPER_POLL_ERROR,并跳轉到Doneresult = ALOOPER_POLL_ERROR;goto Done; }//eventCount為零,表示發生超時,因此直接跳轉到Done if (eventCount == 0) {result = ALOOPER_POLL_TIMEOUT;goto Done; }ifdef LOOPER_USES_EPOLL
// 根據epoll的用法,此時的eventCount表示發生事件的個數 for (int i = 0; i < eventCount; i++) {int fd = eventItems[i].data.fd;uint32_t epollEvents = eventItems[i].events;/* 之前通過pipe函數創建過兩個fd,這里根據fd知道是管道讀端有可讀事件。還記得對nativeWake函數的分析嗎?在那里我們向管道寫端寫了一個“W”字符,這樣就能觸發管道讀端從epoll_wait函數返回了 */if (fd == mWakeReadPipeFd) {if (epollEvents & EPOLLIN) {// awoken函數直接讀取并清空管道數據,讀者可自行研究該函數awoken();} ......} else {/* mRequests和前面的mResponse相對應,它也是一個KeyedVector,其中存儲了fd和對應的Request結構體,該結構體封裝了和監控文件句柄相關的一些上下文信息,例如回調函數等。我們在后面的小節會再次介紹該結構體 */ssize_t requestIndex = mRequests.indexOfKey(fd);if (requestIndex >= 0) {int events = 0;// 將epoll返回的事件轉換成上層LOOPER使用的事件if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;// 每處理一個Request,就相應構造一個ResponsepushResponse(events, mRequests.valueAt(requestIndex));} ......} }Done: ;
else
......endif
// 除了處理Request外,還處理Native的Message mNextMessageUptime = LLONG_MAX; while (mMessageEnvelopes.size() != 0) {nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);if (messageEnvelope.uptime <= now) {{ sp<MessageHandler> handler = messageEnvelope.handler;Message message = messageEnvelope.message;mMessageEnvelopes.removeAt(0);mSendingMessage = true;mLock.unlock(); /* 調用Native的handler處理Native的Message從這里也可看出Native Message和Java層的Message沒有什么關系 */handler->handleMessage(message);} mLock.lock();mSendingMessage = false;result = ALOOPER_POLL_CALLBACK;} else {mNextMessageUptime = messageEnvelope.uptime;break;} }mLock.unlock(); // 處理那些帶回調函數的Response for (size_t i = 0; i < mResponses.size(); i++) {const Response& response = mResponses.itemAt(i);ALooper_callbackFunc callback = response.request.callback;if (callback) {// 有了回調函數,就能知道如何處理所發生的事情了int fd = response.request.fd;int events = response.events;void* data = response.request.data;// 調用回調函數處理所發生的事件int callbackResult = callback(fd, events, data);if (callbackResult == 0) {// callback函數的返回值很重要,如果為0,表明不需要再次監視該文件句柄removeFd(fd);}result = ALOOPER_POLL_CALLBACK;} } return result;}
看完代碼了,是否還有點模糊?那么,回顧一下pollInner函數的幾個關鍵點:
首先需要計算一下真正需要等待的時間。
調用epoll_wait函數等待。
epoll_wait函數返回,這時候可能有三種情況:
發生錯誤,則跳轉到Done處。
超時,這時候也跳轉到Done處。
epoll_wait監測到某些文件句柄上有事件發生。
假設epoll_wait因為文件句柄有事件而返回,此時需要根據文件句柄來分別處理:
如果是管道讀端有事件,則認為是控制命令,可以直接讀取管道中的數據。
如果是其他fd發生事件,則根據Request構造Response,并push到Response數組中。
真正開始處理事件是在有Done標志的位置。
首先處理Native的Message。調用Native Handler的handleMessage處理該Message。
處理Response數組中那些帶有callback的事件。
上面的處理流程還是比較清晰的,但還是有一個攔路虎,那就是mRequests,下面就來清剿這個攔路虎。
添加監控請求其實就是調用epoll_ctl增加文件句柄。下面通過從Native的Activity找到的一個例子來分析mRequests。
[android_app_NativeActivity.cpp-->loadNativeCode_native()]
static jint
loadNativeCode_native(JNIEnv* env, jobject clazz, jstring path,
{
...... /* 調用Looper的addFd函數。第一個參數表示監聽的fd;第二個參數0表示ident;第三個參數表示需要監聽的事件,這里為只監聽可讀事件;第四個參數為回調函數,當該fd發生指定事件時,looper將回調該函數;第五個參數code為回調函數的參數 */ code->looper->addFd(code->mainWorkRead, 0, ALOOPER_EVENT_INPUT, mainWorkCallback, code);......
}
Looper的addFd()代碼如下所示:
[Looper.cpp-->Looper::addFd()]
int Looper::addFd(int fd, int ident, int events,
ifdef LOOPER_USES_EPOLL
int epollEvents = 0; // 將用戶的事件轉換成epoll使用的值 if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN; if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT; { AutoMutex _l(mLock);Request request; // 創建一個Request對象request.fd = fd; // 保存fdrequest.ident = ident; // 保存idrequest.callback = callback; //保存 callbackrequest.data = data; // 保存用戶自定義數據struct epoll_event eventItem;memset(& eventItem, 0, sizeof(epoll_event)); eventItem.events = epollEvents;eventItem.data.fd = fd;// 判斷該Request是否已經存在,mRequests以fd作為key值ssize_t requestIndex = mRequests.indexOfKey(fd);if (requestIndex < 0) {// 如果是新的文件句柄,則需要為epoll增加該fdint epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);......// 保存Request到mRequests鍵值數組mRequests.add(fd, request);} else {// 如果之前加過,那么修改該監聽句柄的一些信息int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);......mRequests.replaceValueAt(requestIndex, request);} }else
......endif
return 1;}
我們發現在pollInner()函數中,當某個監控fd上發生事件后,就會把對應的Request取出來調用。
pushResponse(events, mRequests.itemAt(i));
此函數如下:
[Looper.cpp-->Looper::pushResponse()]
void Looper::pushResponse(int events, const Request& request) {
}
根據前面的知識可知,并不是單獨處理Request,而是需要先收集Request,等到Native Message消息處理完之后再做處理。這表明,在處理邏輯上,Native Message的優先級高于監控fd的優先級。
下面來了解如何添加Native的Message。
Android 2.2中只有Java層才可以通過sendMessage()往MessageQueue中添加消息,從4.0開始,Native層也支持sendMessage()。sendMessage()的代碼如下:
[Looper.cpp-->Looper::sendMessage()]
void Looper::sendMessage(const sp& handler,
}
[Looper.java-->Looper::sendMessageAtTime()]
void Looper::sendMessageAtTime(nsecs_t uptime,
size_t i = 0;
{AutoMutex _l(mLock);size_t messageCount = mMessageEnvelopes.size();// 按時間排序,將消息插入正確的位置while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {i += 1;}MessageEnvelope messageEnvelope(uptime, handler, message);mMessageEnvelopes.insertAt(messageEnvelope, i, 1);// mSendingMessage和Java層中的那個mBlocked一樣,是一個小小的優化措施if (mSendingMessage) {return;} } // 喚醒epoll_wait,讓它處理消息 if (i == 0) {wake(); }}
2.3.4MessageQueue總結
想不到,一個小小的MessageQueue竟然有如此多的內容。在后面分析Android輸入系統時,會再次在Native層和MessageQueue碰面,這里僅是為后面的相會打下一定的基礎。
現在將站在一個比具體代碼更高的層次來認識一下MessageQueue及其伙伴。
MessageQueue只是消息處理大家族的一員,該家族的成員合照如圖2-5所示。
結合前述內容可從圖2-5中得到:
Java層提供了Looper類和MessageQueue類,其中Looper類提供循環處理消息的機制,MessageQueue類提供一個消息隊列,以及插入、刪除和提取消息的函數接口。另外,Handler也是在Java層常用的與消息處理相關的類。
圖 2-5消息處理的家族合照
MessageQueue內部通過mPtr變量保存一個Native層的NativeMessageQueue對象,mMessages保存來自Java層的Message消息。
NativeMessageQueue保存一個Native層的Looper對象,該Looper從ALooper派生,提供pollOnce和addFd等函數。
Java層有Message類和Handler類,而Native層對應也有Message類和MessageHandler抽象類。在編碼時,一般使用的是MessageHandler的派生類WeakMessageHandler。
在include/media/stagfright/foundation目錄下也定義了一個ALooper類,它是供stagefright使用的類似Java消息循環的一套基礎類。這種同名類的產生,估計是兩個事先未做交流的團隊的人編寫的。
MessageQueue核心邏輯下移到Native層后,極大地拓展了消息處理的范圍,總結后有以下幾點:
MessageQueue繼續支持來自Java層的Message消息,也就是早期的Message加Handler的處理方式。
MessageQueue在Native層的代表NativeMessageQueue支持來自Native層的Message,是通過Native層的Message和MessageHandler來處理的。
NativeMessageQueue還處理通過addFd添加的Request。在后面分析輸入系統時,還會大量碰到這種方式。
從處理邏輯上看,先是Native的Message,然后是Native的Request,最后才是Java的Message。
總結
以上是生活随笔為你收集整理的《深入理解Android:卷III A》一一2.3心系两界的MessageQueue的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 《ArcGIS Engine 地理信息系
- 下一篇: 《Linux防火墙(第4版)》——1.3