【Android 异步操作】Handler 机制 ( MessageQueue 消息队列的阻塞机制 | Java 层机制 | native 层阻塞机制 | native 层解除阻塞机制 )
文章目錄
- 一、MessageQueue 的 Java 層機制
- 二、MessageQueue 的 native 層阻塞機制
- 三、MessageQueue 的 native 層解除阻塞機制
- 四、MessageQueue 的 native 層 JNI 方法動態注冊
- 五、MessageQueue 的 native 層完整代碼 android_os_MessageQueue.cpp
一、MessageQueue 的 Java 層機制
之前在 【Android 異步操作】手寫 Handler ( 消息隊列 MessageQueue | 消息保存到鏈表 | 從鏈表中獲取消息 ) 中 , 模仿 Android 的 MessageQueue 手寫的 MessageQueue , 使用了如下同步機制 ,
從 消息隊列 MessageQueue 中取出 消息 Message ,
如果當前鏈表為空 , 此時會 調用 wait 方法阻塞 , 直到消息入隊時 , 鏈表中有了元素 , 會調用 notify 解除該阻塞 ;
在實際的 Android 中的 消息隊列 MessageQueue 的同步機制 是在 native 層實現 的 ;
在創建 消息隊列 MessageQueue 時 , 調用了 nativeInit() 方法 , 銷毀 MessageQueue 時調用 nativeDestroy 方法 ;
調用 next 獲取下一個消息時 , 如果當前消息隊列 MessageQueue 中沒有消息 , 此時需要阻塞 , 調用 nativePollOnce 即可實現在 native 層阻塞線程 ;
// 初始化 MessageQueue 時調用的方法 private native static long nativeInit();// 銷毀 MessageQueue 時調用的方法 private native static void nativeDestroy(long ptr);// 線程阻塞方法@UnsupportedAppUsageprivate native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/// 線程喚醒方法 private native static void nativeWake(long ptr);private native static boolean nativeIsPolling(long ptr);private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);// 此處初始化 MessageQueue , 調用了 nativeInit 方法 MessageQueue(boolean quitAllowed) {mQuitAllowed = quitAllowed;mPtr = nativeInit();}// 獲取消息隊列中的下一個消息 @UnsupportedAppUsageMessage next() {// Return here if the message loop has already quit and been disposed.// This can happen if the application tries to restart a looper after quit// which is not supported.final long ptr = mPtr;if (ptr == 0) {return null;}int pendingIdleHandlerCount = -1; // -1 only during first iterationint nextPollTimeoutMillis = 0;for (;;) {if (nextPollTimeoutMillis != 0) {Binder.flushPendingCommands();}// 此處阻塞線程 nativePollOnce(ptr, nextPollTimeoutMillis);}}二、MessageQueue 的 native 層阻塞機制
線程阻塞方法 private native void nativePollOnce(long ptr, int timeoutMillis) , 是 native 方法 , 該方法在 frameworks/base/core/jni/android_os_MessageQueue.cpp 中實現 ,
從 Java 層傳入 long 類型 , 然後轉為 NativeMessageQueue* 類型指針 ,
該 Java 層傳入的 long 類型是初始化消息隊列時 , 由 nativeInit 方法返回 , 是 消息隊列在 Native 層的指針 ,
之後 NativeMessageQueue 指針調用了其本身的 pollOnce 函數 , 該函數中 , 主要調用了 Looper 的 pollOnce 函數 , mLooper->pollOnce(timeoutMillis) ;
frameworks/base/core/jni/android_os_MessageQueue.cpp 中阻塞相關源碼 :
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {mPollEnv = env;mPollObj = pollObj;// 這是主要操作 mLooper->pollOnce(timeoutMillis);mPollObj = NULL;mPollEnv = NULL;if (mExceptionObj) {env->Throw(mExceptionObj);env->DeleteLocalRef(mExceptionObj);mExceptionObj = NULL;} }static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,jlong ptr, jint timeoutMillis) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->pollOnce(env, obj, timeoutMillis); }參考 : frameworks/base/core/jni/android_os_MessageQueue.cpp
繼續查看 Native 層 Looper.cpp 的 pollOnce 方法 ,
Looper.cpp 源碼路徑是 system/core/libutils/Looper.cpp ,
在該方法中 , 最終調用 Looper.cpp 的 pollInner 方法 ,
在 pollInner 方法中 , 調用了 epoll_wait 方法 , 該方法就是等待方法 , 在該方法中會監聽 mEpollFd 文件句柄 ,
#include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);int epoll_pwait(int epfd, struct epoll_event *events,int maxevents, int timeout,const sigset_t *sigmask);參考 : epoll_wait
system/core/libutils/Looper.cpp 中阻塞相關源碼 :
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {int result = 0;for (;;) {while (mResponseIndex < mResponses.size()) {const Response& response = mResponses.itemAt(mResponseIndex++);int ident = response.request.ident;if (ident >= 0) {int fd = response.request.fd;int events = response.events;void* data = response.request.data; #if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning signalled identifier %d: ""fd=%d, events=0x%x, data=%p",this, ident, fd, events, data); #endifif (outFd != NULL) *outFd = fd;if (outEvents != NULL) *outEvents = events;if (outData != NULL) *outData = data;return ident;}}if (result != 0) { #if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning result %d", this, result); #endifif (outFd != NULL) *outFd = 0;if (outEvents != NULL) *outEvents = 0;if (outData != NULL) *outData = NULL;return result;}result = pollInner(timeoutMillis);} }int Looper::pollInner(int timeoutMillis) {// ... int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); }參考 : system/core/libutils/Looper.cpp
三、MessageQueue 的 native 層解除阻塞機制
在 MessageQueue 消息隊列的 Java 層 , 將 Message 消息插入到鏈表 ( 表頭或鏈表中間 ) 後 , 如果需要喚醒 ( needWake 為 true ) , 就會調用 nativeWake 方法 , 喚醒線程 , 即解除阻塞 ;
public final class MessageQueue {boolean enqueueMessage(Message msg, long when) {if (msg.target == null) {throw new IllegalArgumentException("Message must have a target.");}if (msg.isInUse()) {throw new IllegalStateException(msg + " This message is already in use.");}synchronized (this) {if (mQuitting) {IllegalStateException e = new IllegalStateException(msg.target + " sending message to a Handler on a dead thread");Log.w(TAG, e.getMessage(), e);msg.recycle();return false;}msg.markInUse();msg.when = when;Message p = mMessages;boolean needWake;if (p == null || when == 0 || when < p.when) {// New head, wake up the event queue if blocked.msg.next = p;mMessages = msg;needWake = mBlocked;} else {// Inserted within the middle of the queue. Usually we don't have to wake// up the event queue unless there is a barrier at the head of the queue// and the message is the earliest asynchronous message in the queue.needWake = mBlocked && p.target == null && msg.isAsynchronous();Message prev;for (;;) {prev = p;p = p.next;if (p == null || when < p.when) {break;}if (needWake && p.isAsynchronous()) {needWake = false;}}msg.next = p; // invariant: p == prev.nextprev.next = msg;}// We can assume mPtr != 0 because mQuitting is false.if (needWake) {nativeWake(mPtr);}}return true;} }在 native 層的 frameworks/base/core/jni/android_os_MessageQueue.cpp 實現了上述
Java 層定義的 private native static void nativeWake(long ptr) 方法 ,
注冊 JNI 方法方式是動態注冊 , 注冊的參數如下 , Java 層的 nativeWake 對應的 native 層方法是 android_os_MessageQueue_nativeWake 方法 ,
static const JNINativeMethod gMessageQueueMethods[] = {/* name, signature, funcPtr */{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },{ "nativeSetFileDescriptorEvents", "(JII)V",(void*)android_os_MessageQueue_nativeSetFileDescriptorEvents }, };參考 : frameworks/base/core/jni/android_os_MessageQueue.cpp
下面是 frameworks/base/core/jni/android_os_MessageQueue.cpp 中的相關方法實現 ,
在 android_os_MessageQueue_nativeWake 方法中 , 將 ptr 轉為 NativeMessageQueue* 指針 , 并調用了 NativeMessageQueue 的 wake 方法 ,
在 wake 方法中調用了 system/core/libutils/Looper.cpp 中的 wake 方法 ;
void NativeMessageQueue::wake() {// 此處調用了 Looper 的 wake 函數 mLooper->wake(); }static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->wake(); }參考 : frameworks/base/core/jni/android_os_MessageQueue.cpp
查看 system/core/libutils/Looper.cpp 中的 wake 方法 , 該方法中的
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))) 這行代碼 ,
作用是向 mWakeEventFd 文件句柄寫入了數據 ;
void Looper::wake() { #if DEBUG_POLL_AND_WAKEALOGD("%p ~ wake", this); #endifuint64_t inc = 1;ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));if (nWrite != sizeof(uint64_t)) {if (errno != EAGAIN) {LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",mWakeEventFd, strerror(errno));}} }參考 : system/core/libutils/Looper.cpp
阻塞的時候使用的是 mEpollFd 文件句柄 ,
喚醒的時候使用的是 mWakeEventFd 文件句柄 ,
下面分析這兩個文件句柄之間的聯系 ;
Looper 構造函數 , 調用了 rebuildEpollLocked() 方法 ,
在 rebuildEpollLocked 方法 中調用 mEpollFd = epoll_create(EPOLL_SIZE_HINT) , 創建了一個句柄 ,
調用 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem) 注冊事件監聽 ,
即通過 mEpollFd 句柄 , 監聽 mWakeEventFd 句柄上的 eventItem 事件 ,
監聽的事件是 eventItem.events = EPOLLIN 事件 ,
EPOLLIN 事件表示該文件句柄可讀 ; 向 mWakeEventFd 文件句柄寫入數據後 , 該句柄變為可讀 , epoll_wait 隨即返回 , 此時就對應解除 epoll_wait 阻塞 ;
system/core/libutils/Looper.cpp 中 Looper 構造函數 , rebuildEpollLocked 方法 :
Looper::Looper(bool allowNonCallbacks) :mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",strerror(errno));AutoMutex _l(mLock);rebuildEpollLocked(); }void Looper::rebuildEpollLocked() {// Close old epoll instance if we have one.if (mEpollFd >= 0) { #if DEBUG_CALLBACKSALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this); #endifclose(mEpollFd);}// Allocate the new epoll instance and register the wake pipe.// 創建句柄 mEpollFd = epoll_create(EPOLL_SIZE_HINT);LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));struct epoll_event eventItem;memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union// 注冊監聽的事件eventItem.events = EPOLLIN;eventItem.data.fd = mWakeEventFd;// 注冊事件監聽 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",strerror(errno));for (size_t i = 0; i < mRequests.size(); i++) {const Request& request = mRequests.valueAt(i);struct epoll_event eventItem;request.initEventItem(&eventItem);int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);if (epollResult < 0) {ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",request.fd, strerror(errno));}} }MessageQueue 消息隊列是通過 Linux 的 epoll 機制實現的阻塞 ;
四、MessageQueue 的 native 層 JNI 方法動態注冊
JNI 動態注冊 , 消息隊列 MessageQueue 中的注冊方法 , 使用的是動態注冊 ,
static const JNINativeMethod gMessageQueueMethods[] = {/* name, signature, funcPtr */{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },{ "nativeSetFileDescriptorEvents", "(JII)V",(void*)android_os_MessageQueue_nativeSetFileDescriptorEvents }, };// 動態注冊 JNI 函數 int register_android_os_MessageQueue(JNIEnv* env) {int res = RegisterMethodsOrDie(env, "android/os/MessageQueue", gMessageQueueMethods,NELEM(gMessageQueueMethods));jclass clazz = FindClassOrDie(env, "android/os/MessageQueue");gMessageQueueClassInfo.mPtr = GetFieldIDOrDie(env, clazz, "mPtr", "J");gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie(env, clazz,"dispatchEvents", "(II)I");return res; }詳情參考 : 【Android NDK 開發】JNI 動態注冊 ( 動態注冊流程 | JNI_OnLoad 方法 | JNINativeMethod 結構體 | GetEnv | RegisterNatives )
五、MessageQueue 的 native 層完整代碼 android_os_MessageQueue.cpp
完整的 android_os_MessageQueue.cpp 代碼 :
/** Copyright (C) 2010 The Android Open Source Project** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/#define LOG_TAG "MessageQueue-JNI"#include <nativehelper/JNIHelp.h> #include <android_runtime/AndroidRuntime.h>#include <utils/Looper.h> #include <utils/Log.h> #include "android_os_MessageQueue.h"#include "core_jni_helpers.h"namespace android {static struct {jfieldID mPtr; // native object attached to the DVM MessageQueuejmethodID dispatchEvents; } gMessageQueueClassInfo;// Must be kept in sync with the constants in Looper.FileDescriptorCallback static const int CALLBACK_EVENT_INPUT = 1 << 0; static const int CALLBACK_EVENT_OUTPUT = 1 << 1; static const int CALLBACK_EVENT_ERROR = 1 << 2;class NativeMessageQueue : public MessageQueue, public LooperCallback { public:NativeMessageQueue();virtual ~NativeMessageQueue();virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);void wake();void setFileDescriptorEvents(int fd, int events);virtual int handleEvent(int fd, int events, void* data);private:JNIEnv* mPollEnv;jobject mPollObj;jthrowable mExceptionObj; };MessageQueue::MessageQueue() { }MessageQueue::~MessageQueue() { }bool MessageQueue::raiseAndClearException(JNIEnv* env, const char* msg) {if (env->ExceptionCheck()) {jthrowable exceptionObj = env->ExceptionOccurred();env->ExceptionClear();raiseException(env, msg, exceptionObj);env->DeleteLocalRef(exceptionObj);return true;}return false; 
}NativeMessageQueue::NativeMessageQueue() :mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {mLooper = Looper::getForThread();if (mLooper == NULL) {mLooper = new Looper(false);Looper::setForThread(mLooper);} }NativeMessageQueue::~NativeMessageQueue() { }void NativeMessageQueue::raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj) {if (exceptionObj) {if (mPollEnv == env) {if (mExceptionObj) {env->DeleteLocalRef(mExceptionObj);}mExceptionObj = jthrowable(env->NewLocalRef(exceptionObj));ALOGE("Exception in MessageQueue callback: %s", msg);jniLogException(env, ANDROID_LOG_ERROR, LOG_TAG, exceptionObj);} else {ALOGE("Exception: %s", msg);jniLogException(env, ANDROID_LOG_ERROR, LOG_TAG, exceptionObj);LOG_ALWAYS_FATAL("raiseException() was called when not in a callback, exiting.");}} }void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {mPollEnv = env;mPollObj = pollObj;mLooper->pollOnce(timeoutMillis);mPollObj = NULL;mPollEnv = NULL;if (mExceptionObj) {env->Throw(mExceptionObj);env->DeleteLocalRef(mExceptionObj);mExceptionObj = NULL;} }void NativeMessageQueue::wake() {mLooper->wake(); }void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {if (events) {int looperEvents = 0;if (events & CALLBACK_EVENT_INPUT) {looperEvents |= Looper::EVENT_INPUT;}if (events & CALLBACK_EVENT_OUTPUT) {looperEvents |= Looper::EVENT_OUTPUT;}mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,reinterpret_cast<void*>(events));} else {mLooper->removeFd(fd);} }int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {int events = 0;if (looperEvents & Looper::EVENT_INPUT) {events |= CALLBACK_EVENT_INPUT;}if (looperEvents & Looper::EVENT_OUTPUT) {events |= CALLBACK_EVENT_OUTPUT;}if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {events |= CALLBACK_EVENT_ERROR;}int oldWatchedEvents = reinterpret_cast<intptr_t>(data);int newWatchedEvents = 
mPollEnv->CallIntMethod(mPollObj,gMessageQueueClassInfo.dispatchEvents, fd, events);if (!newWatchedEvents) {return 0; // unregister the fd}if (newWatchedEvents != oldWatchedEvents) {setFileDescriptorEvents(fd, newWatchedEvents);}return 1; }// ----------------------------------------------------------------------------sp<MessageQueue> android_os_MessageQueue_getMessageQueue(JNIEnv* env, jobject messageQueueObj) {jlong ptr = env->GetLongField(messageQueueObj, gMessageQueueClassInfo.mPtr);return reinterpret_cast<NativeMessageQueue*>(ptr); }static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();if (!nativeMessageQueue) {jniThrowRuntimeException(env, "Unable to allocate native queue");return 0;}nativeMessageQueue->incStrong(env);return reinterpret_cast<jlong>(nativeMessageQueue); }static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->decStrong(env); }static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,jlong ptr, jint timeoutMillis) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->pollOnce(env, obj, timeoutMillis); }static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->wake(); }static jboolean android_os_MessageQueue_nativeIsPolling(JNIEnv* env, jclass clazz, jlong ptr) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);return nativeMessageQueue->getLooper()->isPolling(); }static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,jlong ptr, jint fd, jint events) {NativeMessageQueue* nativeMessageQueue = 
reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->setFileDescriptorEvents(fd, events); }// ----------------------------------------------------------------------------// 動態注冊 JNI 函數的結構體 // 每個結構體中的元素是 Java 方法名稱 , 方法簽名 , C++ 中的方法指針 static const JNINativeMethod gMessageQueueMethods[] = {/* name, signature, funcPtr */{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },{ "nativeSetFileDescriptorEvents", "(JII)V",(void*)android_os_MessageQueue_nativeSetFileDescriptorEvents }, };// 動態注冊 JNI 函數 int register_android_os_MessageQueue(JNIEnv* env) {int res = RegisterMethodsOrDie(env, "android/os/MessageQueue", gMessageQueueMethods,NELEM(gMessageQueueMethods));jclass clazz = FindClassOrDie(env, "android/os/MessageQueue");gMessageQueueClassInfo.mPtr = GetFieldIDOrDie(env, clazz, "mPtr", "J");gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie(env, clazz,"dispatchEvents", "(II)I");return res; }} // namespace android參考 : frameworks/base/core/jni/android_os_MessageQueue.cpp
總結
以上是生活随笔為你收集整理的【Android 异步操作】Handler 机制 ( MessageQueue 消息队列的阻塞机制 | Java 层机制 | native 层阻塞机制 | native 层解除阻塞机制 )的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Android 异步操作】Handle
- 下一篇: 【Android 异步操作】Handle