Android MessageQueue Internals
Almost every GUI system is built on a message loop — from AWT to Swing, from Android to iOS, UI development is inseparable from the concept.
This model makes event dispatch and UI drawing far easier to handle. Below we analyze, from first principles, how Android's message loop is implemented.
Android's message loop involves several key roles: Looper, Handler, MessageQueue, and Message. Their relationship is shown in the diagram.
The most critical of these is MessageQueue — it is the heart of the mechanism and the focus of this article. Let's look at how the message loop works from MessageQueue's point of view.
First, a few questions to keep in mind:
- When there are no Messages, how does the system behave?
- When a Message arrives, what does the system do?
- How is a Message executed with a delay (Handler.postDelayed)?
Let's take these one at a time.

When there are no Messages, how does the system behave?

The short answer first: when there is no Message, or the head Message is not yet due (message.when > now), the thread that owns the MessageQueue blocks. MessageQueue has two key methods: next() and enqueueMessage().
Start with next(), the core method for pulling messages out of the queue:
```java
Message next() {
    // Return here if the message loop has already quit and been disposed.
    // This can happen if the application tries to restart a looper after quit
    // which is not supported.
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }

    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }

        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }

            // Process the quit message now that all pending messages have been handled.
            if (mQuitting) {
                dispose();
                return null;
            }

            // If first time idle, then get the number of idlers to run.
            // Idle handles only run if the queue is empty or if the first message
            // in the queue (possibly a barrier) is due to be handled in the future.
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                pendingIdleHandlerCount = mIdleHandlers.size();
            }
            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                mBlocked = true;
                continue;
            }

            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }

        // Run the idle handlers.
        // We only ever reach this code block during the first iteration.
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler

            boolean keep = false;
            try {
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf(TAG, "IdleHandler threw exception", t);
            }

            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }

        // Reset the idle handler count to 0 so we do not run them again.
        pendingIdleHandlerCount = 0;

        // While calling an idle handler, a new message could have been delivered
        // so go back and look again for a pending message without waiting.
        nextPollTimeoutMillis = 0;
    }
}
```
The key is the call to nativePollOnce and the timeout argument passed to it. Let's see what that timeout value is.
On the first iteration, nativePollOnce is called with 0. A timeout of 0 makes nativePollOnce return immediately, because epoll_wait returns right away without blocking when given 0.
Next comes asynchronous-message handling. MessageQueue has a notion of a sync barrier: a Message whose target is null is a barrier, which lets only asynchronous messages through and holds back all ordinary ones. This is used during requestLayout; a topic for another time, but worth knowing about.
Then the timeout is computed: it is the next message's due time minus the current time, in this line:
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
That value becomes the argument for the next nativePollOnce call.
If the MessageQueue has been drained, nextPollTimeoutMillis is -1. For epoll_wait, -1 means block indefinitely until a monitored fd fires, so the current thread blocks until someone calls MessageQueue.enqueueMessage() — analyzed shortly.
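The three timeout modes that nativePollOnce forwards to epoll_wait (0 = return at once, -1 = block until an fd fires, positive = wait at most that long) can be checked with a small standalone sketch. The helper names here are invented for illustration; only the epoll/eventfd calls are real:

```cpp
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>

// Observes epoll_wait's timeout semantics on an eventfd, the same fd type
// the Looper uses for mWakeEventFd.
struct PollDemoResult { int before_wake; int after_wake; };

inline PollDemoResult poll_timeout_demo() {
    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    epoll_event item{};
    item.events = EPOLLIN;
    item.data.fd = efd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &item);

    epoll_event out[16];
    // Nothing is readable yet: a timeout of 0 returns at once with 0 ready fds.
    int before = epoll_wait(epfd, out, 16, 0);

    // Make the eventfd readable, which is exactly what Looper::wake() does.
    uint64_t inc = 1;
    write(efd, &inc, sizeof(inc));

    // A timeout of -1 would block forever, but the fd is already readable,
    // so the call returns immediately reporting one ready fd.
    int after = epoll_wait(epfd, out, 16, -1);

    close(efd);
    close(epfd);
    return {before, after};
}
```

A blocked Looper thread sits in the `-1` (or positive-timeout) case; enqueueMessage flips it to the "already readable" case.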
Now let's look at the implementation of nativePollOnce.
```cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
```
Here is a diagram of the call chain to follow along with the code.
So what is this NativeMessageQueue? It is constructed together with the Java-layer MessageQueue.
```cpp
class NativeMessageQueue : public MessageQueue, public LooperCallback {
public:
    NativeMessageQueue();
    virtual ~NativeMessageQueue();

    virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);

    void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);
    void wake();
    void setFileDescriptorEvents(int fd, int events);

    virtual int handleEvent(int fd, int events, void* data);

private:
    JNIEnv* mPollEnv;
    jobject mPollObj;
    jthrowable mExceptionObj;
};
```
```java
// The MessageQueue constructor creates a native MessageQueue and keeps its pointer.
MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}
```
```cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
```
When the NativeMessageQueue is constructed, it obtains a Looper — the native-layer Looper.
```cpp
NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    // The native Looper is also one per thread, kept in a TLS slot: reuse the
    // current thread's Looper if it exists, otherwise create one and store it in TLS.
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
```
Here is the native-layer Looper's definition:
```cpp
class Looper : public RefBase {
protected:
    virtual ~Looper();

public:
    enum {
        POLL_WAKE = -1,
        POLL_CALLBACK = -2,
        POLL_TIMEOUT = -3,
        POLL_ERROR = -4,
    };

    enum {
        EVENT_INPUT = 1 << 0,
        EVENT_OUTPUT = 1 << 1,
        EVENT_ERROR = 1 << 2,
        EVENT_HANGUP = 1 << 3,
        EVENT_INVALID = 1 << 4,
    };

    enum {
        PREPARE_ALLOW_NON_CALLBACKS = 1 << 0
    };

    Looper(bool allowNonCallbacks);

    bool getAllowNonCallbacks() const;

    int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
    inline int pollOnce(int timeoutMillis) {
        return pollOnce(timeoutMillis, NULL, NULL, NULL);
    }

    int pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData);
    inline int pollAll(int timeoutMillis) {
        return pollAll(timeoutMillis, NULL, NULL, NULL);
    }

    void wake();

    int addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data);
    int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);

    int removeFd(int fd);

    void sendMessage(const sp<MessageHandler>& handler, const Message& message);
    void sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
            const Message& message);
    void sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
            const Message& message);
    void removeMessages(const sp<MessageHandler>& handler);
    void removeMessages(const sp<MessageHandler>& handler, int what);

    bool isPolling() const;

    static sp<Looper> prepare(int opts);
    static void setForThread(const sp<Looper>& looper);
    static sp<Looper> getForThread();

private:
    struct Request {
        int fd;
        int ident;
        int events;
        int seq;
        sp<LooperCallback> callback;
        void* data;

        void initEventItem(struct epoll_event* eventItem) const;
    };

    struct Response {
        int events;
        Request request;
    };

    struct MessageEnvelope {
        MessageEnvelope() : uptime(0) { }
        MessageEnvelope(nsecs_t u, const sp<MessageHandler> h, const Message& m)
                : uptime(u), handler(h), message(m) { }

        nsecs_t uptime;
        sp<MessageHandler> handler;
        Message message;
    };

    const bool mAllowNonCallbacks; // immutable

    int mWakeEventFd; // immutable

    Mutex mLock;

    Vector<MessageEnvelope> mMessageEnvelopes; // guarded by mLock
    bool mSendingMessage; // guarded by mLock

    volatile bool mPolling;

    int mEpollFd; // guarded by mLock but only modified on the looper thread
    bool mEpollRebuildRequired; // guarded by mLock

    KeyedVector<int, Request> mRequests; // guarded by mLock
    int mNextRequestSeq;

    Vector<Response> mResponses;
    size_t mResponseIndex;
    nsecs_t mNextMessageUptime; // set to LLONG_MAX when none

    int pollInner(int timeoutMillis);
    int removeFd(int fd, int seq);
    void awoken();
    void pushResponse(int events, const Request& request);
    void rebuildEpollLocked();
    void scheduleEpollRebuildLocked();

    static void initTLSKey();
    static void threadDestructor(void *st);
    static void initEpollEvent(struct epoll_event* eventItem);
};
```
The definition is fairly long, so let's focus on the essentials. This Looper can send native-layer messages, for example via void sendMessage(const sp<MessageHandler>& handler, const Message& message).
The Looper can also monitor file descriptors. In NDK development, Android exposes this as the ALooper wrapper, which is the same thing underneath; more on that later.
```cpp
int addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data);
int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);
```
Methods such as addFd and removeFd manage which fds the Looper watches.
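To make the addFd idea concrete, here is a toy watcher that keeps one callback per fd and dispatches whichever fds epoll reports as ready. All names are invented for illustration; the real Looper additionally tracks idents, sequence numbers, and epoll rebuild state:

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <functional>
#include <map>
#include <utility>

// A stripped-down model of Looper::addFd() + the dispatch step of pollOnce().
class MiniFdWatcher {
public:
    MiniFdWatcher() : epfd_(epoll_create1(EPOLL_CLOEXEC)) {}
    ~MiniFdWatcher() { close(epfd_); }

    void addFd(int fd, std::function<void(int)> callback) {
        epoll_event item{};
        item.events = EPOLLIN;
        item.data.fd = fd;
        epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &item);
        callbacks_[fd] = std::move(callback);
    }

    // Poll once and invoke the callback of every fd that became readable.
    int pollOnce(int timeoutMillis) {
        epoll_event events[16];
        int n = epoll_wait(epfd_, events, 16, timeoutMillis);
        for (int i = 0; i < n; i++) {
            callbacks_[events[i].data.fd](events[i].data.fd);
        }
        return n;
    }

private:
    int epfd_;
    std::map<int, std::function<void(int)>> callbacks_;
};

// Demo: watch the read end of a pipe; writing to it makes pollOnce dispatch.
inline int mini_watcher_demo() {
    int fds[2];
    pipe(fds);
    int fired = 0;
    MiniFdWatcher watcher;
    watcher.addFd(fds[0], [&](int) { fired++; });

    write(fds[1], "x", 1); // make the read end readable
    watcher.pollOnce(0);   // dispatches the callback once

    close(fds[0]);
    close(fds[1]);
    return fired;
}
```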
At this point the class relationships, spanning both the Java and native layers, can be drawn in a single diagram.
Now let's continue with NativeMessageQueue's pollOnce method.
```cpp
// This is the core entry point: env is the JNI environment and pollObj is
// the Java-layer MessageQueue object.
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
```
NativeMessageQueue simply delegates to Looper's pollOnce — the native MessageQueue ultimately depends on the native Looper.
Following Looper's pollOnce:
```cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // If some fd has already fired, return its ident directly; the ident
        // is the identifier supplied when the fd was registered.
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        // The real work happens in pollInner.
        result = pollInner(timeoutMillis);
    }
}
```
And on into pollInner:
```cpp
// Remember timeoutMillis: it comes down from the Java layer — 0 on the first
// call, afterwards derived from the next Message's due time.
int Looper::pollInner(int timeoutMillis) {
    // Adjust the timeout based on when the next message is due.
    // mNextMessageUptime starts out as LLONG_MAX.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
    }

    // Poll.
    int result = POLL_WAKE;
    // Clear the mResponses queue.
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    // Allocate epoll_event slots for incoming events; at most 16 are received per wait.
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // epoll_wait is where the thread actually blocks: a timeout of 0 returns
    // immediately, -1 blocks indefinitely until some fd becomes ready.
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Rebuild epoll set if needed.
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    // The timeout elapsed without any fd becoming ready; report that upward.
    if (eventCount == 0) {
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
    // Walk every returned epoll item.
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        // If the fd is mWakeEventFd, the upper layer is telling us to wake up.
        // This fd is an eventfd; older ROMs used a pipe. An eventfd needs one
        // fd rather than the pipe's two, and its kernel implementation is
        // more efficient.
        if (fd == mWakeEventFd) {
            // For EPOLLIN, draining the eventfd is just a read().
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            // Some other fd fired: wrap it up and push it onto mResponses.
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
// The gotos above land here.
Done: ;

    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    // mMessageEnvelopes is filled by sendMessage() from other threads,
    // so it must be accessed under the lock.
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        // A message is queued and its due time has arrived: take it out and run it.
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                // Once unlocked, others may operate on the message queue again.
                mLock.unlock();
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            // Record when we next need to wake up.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Invoke all response callbacks.
    // Handle every Response collected above.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}
```
So pollInner is where epoll_wait actually monitors the fds. When a watched fd changes state or the timeout expires, epoll_wait returns — and that is precisely the moment the thread wakes from its block.
After epoll_wait returns, the fds are handled in a definite order. A diagram makes this clearer.
As the figure shows, when an fd fires, it is wrapped up and placed in mResponses to await processing.
```cpp
// Walk every returned epoll item.
for (int i = 0; i < eventCount; i++) {
    int fd = eventItems[i].data.fd;
    uint32_t epollEvents = eventItems[i].events;
    // If the fd is mWakeEventFd, the upper layer is telling us to wake up.
    // This fd is an eventfd; older ROMs used a pipe. An eventfd needs one
    // fd rather than the pipe's two, and its kernel implementation is
    // more efficient.
    if (fd == mWakeEventFd) {
        // For EPOLLIN, draining the eventfd is just a read().
        if (epollEvents & EPOLLIN) {
            awoken();
        } else {
            ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
        }
    } else {
        // Some other fd fired: wrap it up and push it onto mResponses.
        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex >= 0) {
            int events = 0;
            if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
            if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
            if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
            if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
            pushResponse(events, mRequests.valueAt(requestIndex));
        } else {
            ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                    "no longer registered.", epollEvents, fd);
        }
    }
}
```
Native messages are handled first; only when no native message is left runnable does processing move on to the fds.
```cpp
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
// Once unlocked, others may operate on the message queue again.
mLock.unlock();

handler->handleMessage(message);
```
The native Message definition closely mirrors the Java one:
```cpp
struct Message {
    Message() : what(0) { }
    Message(int w) : what(w) { }

    /* The message type. (interpretation is left up to the handler) */
    int what;
};
```
Finally the fds that fired are processed, which is also straightforward.
```cpp
for (size_t i = 0; i < mResponses.size(); i++) {
    Response& response = mResponses.editItemAt(i);
    if (response.request.ident == POLL_CALLBACK) {
        int fd = response.request.fd;
        int events = response.events;
        void* data = response.request.data;
        // Invoke the callback.  Note that the file descriptor may be closed by
        // the callback (and potentially even reused) before the function returns so
        // we need to be a little careful when removing the file descriptor afterwards.
        int callbackResult = response.request.callback->handleEvent(fd, events, data);
        if (callbackResult == 0) {
            removeFd(fd, response.request.seq);
        }

        // Clear the callback reference in the response structure promptly because we
        // will not clear the response vector itself until the next poll.
        response.request.callback.clear();
        result = POLL_CALLBACK;
    }
}
```
It boils down to calling callback->handleEvent.
pollInner then returns up through the layers to nativePollOnce, which completes the analysis of nativePollOnce.
To summarize: in next(), if the next message is not yet due, or there are no messages at all, the thread blocks. How long it blocks is governed by the timeout passed down — either a concrete interval, or -1, meaning block forever.
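That timeout selection can be captured in a tiny helper. This is my own summary function, not AOSP code (the real logic lives spread across next() and pollInner):

```cpp
#include <algorithm>
#include <climits>
#include <cstdint>

// Picks the nativePollOnce timeout the way next() does:
//  -1 -> empty queue, block indefinitely
//   0 -> the head message is already due, don't block
//  >0 -> wait until the head message becomes due (clamped to int range)
inline int nextPollTimeout(bool hasMessage, int64_t headWhen, int64_t now) {
    if (!hasMessage) return -1;
    if (headWhen <= now) return 0;
    return (int) std::min<int64_t>(headWhen - now, INT_MAX);
}
```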
When a Message arrives, what does the system do?

Suppose the current thread is blocked. When another thread posts a message into the MessageQueue, MessageQueue's enqueueMessage() is invoked, which may wake the MessageQueue up.
```java
boolean enqueueMessage(Message msg, long when) {
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    if (msg.isInUse()) {
        throw new IllegalStateException(msg + " This message is already in use.");
    }

    synchronized (this) {
        if (mQuitting) {
            IllegalStateException e = new IllegalStateException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG, e.getMessage(), e);
            msg.recycle();
            return false;
        }

        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        // The queue is empty, or the new message is due before the current head.
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            // If the consumer is already blocked, it needs to be woken.
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            // Wake up the native side.
            nativeWake(mPtr);
        }
    }
    return true;
}
```
So if the queue is empty at enqueue time, or the new message is due earlier than the current head, a wakeup may be needed. Whether it actually happens depends on mBlocked, which is assigned in next(): when there is no runnable Message and no IdleHandler, mBlocked is true, and enqueueMessage() must wake the thread.
Let's look at the implementation of nativeWake():
```cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}
```
The JNI layer is just a wrapper; the real work is NativeMessageQueue's wake(), which calls straight through to Looper's wake().
```cpp
void Looper::wake() {
    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
                    mWakeEventFd, strerror(errno));
        }
    }
}
```
So waking up amounts to writing a 1 into mWakeEventFd.
This leans on the epoll machinery: epoll is watching mWakeEventFd, and the moment mWakeEventFd becomes readable, epoll_wait returns and the current thread wakes up to process messages.
mWakeEventFd is an eventfd. While pollOnce is waiting, the eventfd has nothing to read; only when another thread or process writes to it does it become readable. That property, combined with epoll_wait, yields a complete block-and-wake mechanism.
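The eventfd behavior the wake path relies on can be observed directly. The helper names below are mine; the calls are the real eventfd API. A fresh eventfd is not readable; each write adds to an internal 64-bit counter; a read returns the accumulated counter and resets it — which is all awoken() does:

```cpp
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>

struct EventfdDemo { bool readableBefore; uint64_t counter; bool readableAfter; };

inline EventfdDemo eventfd_demo() {
    // Same flags the Looper constructor uses.
    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    uint64_t value = 0;

    // Counter is 0, so a non-blocking read fails (EAGAIN): nothing to wake for.
    bool before = read(efd, &value, sizeof(value)) == sizeof(value);

    // Two wakes coalesce into one counter; Looper::wake() writes 1 each time.
    uint64_t inc = 1;
    write(efd, &inc, sizeof(inc));
    write(efd, &inc, sizeof(inc));

    // One read drains the whole counter, like awoken().
    uint64_t counter = 0;
    read(efd, &counter, sizeof(counter));

    // Drained again: not readable until someone writes once more.
    bool after = read(efd, &value, sizeof(value)) == sizeof(value);

    close(efd);
    return {before, counter, after};
}
```

The coalescing is why any number of back-to-back enqueueMessage calls cost at most one wakeup of the looper thread.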
The epoll registration of mWakeEventFd happens in Looper's constructor.
```cpp
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
            strerror(errno));

    // Take the lock.
    AutoMutex _l(mLock);
    // Set up the epoll instance.
    rebuildEpollLocked();
}

// Registers everything with epoll.
void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    if (mEpollFd >= 0) {
        close(mEpollFd);
    }

    // Allocate the new epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s",
            strerror(errno));

    struct epoll_event eventItem;
    memset(&eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    // Put mWakeEventFd under epoll's watch.
    eventItem.data.fd = mWakeEventFd;
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, &eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
            strerror(errno));

    // Register every pending Request with epoll as well.
    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, &eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
                    request.fd, strerror(errno));
        }
    }
}
```
How is a Message executed with a delay (Handler.postDelayed)?

Given everything above, this one is now easy: Handler.postDelayed() sets the Message's when field to the current time plus the delay.
```java
public final boolean sendMessageDelayed(Message msg, long delayMillis) {
    if (delayMillis < 0) {
        delayMillis = 0;
    }
    return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}

// Handler.postDelayed() eventually lands here.
// uptimeMillis is the current time plus the delay.
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}
```
enqueueMessage() then inserts the Message at the appropriate position, waking the thread if necessary. It is just a walk over a linked list to find the insertion point.
```java
// Inside MessageQueue.enqueueMessage()
for (;;) {
    prev = p;
    p = p.next;
    // Find the right slot by time; MessageQueue keeps Messages sorted by execution time.
    if (p == null || when < p.when) {
        break;
    }
    if (needWake && p.isAsynchronous()) {
        needWake = false;
    }
}
// Splice the new message in.
msg.next = p; // invariant: p == prev.next
prev.next = msg;
```
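The list walk above can be modeled in a few lines. This sketch uses invented types and keeps only the time-ordering and the "new head of a blocked queue" wakeup; the barrier/async wake cases of the real enqueueMessage are deliberately omitted:

```cpp
#include <cstdint>

// Messages form a singly linked list sorted by 'when'; a new message is
// spliced in before the first node that is due later.
struct Msg { int64_t when; Msg* next; };

// Returns the (possibly new) head. 'blocked' plays the role of mBlocked;
// *needWake is set when the insert lands at the head of a blocked queue.
inline Msg* enqueueSorted(Msg* head, Msg* msg, bool blocked, bool* needWake) {
    *needWake = false;
    if (head == nullptr || msg->when < head->when) {
        // New head: if the consumer is parked in epoll_wait, it must be woken,
        // because the earliest due time just changed.
        msg->next = head;
        *needWake = blocked;
        return msg;
    }
    // Mid-queue insert: walk past every node due no later than the new message.
    Msg* prev = head;
    while (prev->next != nullptr && prev->next->when <= msg->when) {
        prev = prev->next;
    }
    msg->next = prev->next;
    prev->next = msg;
    return head;
}
```

A mid-queue insert never disturbs the head's due time, which is exactly why the real code usually skips nativeWake in that branch.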
With the Message now sitting in the time-ordered queue, the Looper keeps pulling Messages and runs each one when its time arrives.