Handler源码解析

当前线程启动消息队列

Thread默认是没有消息循环机制的,而Looper就是在Thread基础上运行一个消息循环,通过Looper.prepare()可以把当前线程初始化为一个Looper thread。一个典型的Looper thread创建示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
class LooperThread extends Thread {
public Handler mHandler;
public void run() {
Looper.prepare();
mHandler = new Handler(Looper.myLooper()) {
public void handleMessage(Message msg) {
//在此处理消息
}
};
Looper.loop();
}
}

上述代码分为三部:1)调用Looper.prepare()初始化Looper;2)创建一个Handler用于与Looper进行交互;3)调用Looper.loop()启动消息循环。下面分别看下这三步是如何实现的:

初始化Looper.prepare()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//android-12.1.0_r27\frameworks\base\core\java\android\os\Looper.java
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

public static void prepare() { //prepare是静态方法
prepare(true);
}

private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
//一个线程中Looper.prepare()只能执行一次,否则会抛出异常
throw new RuntimeException("Only one Looper may be created per thread");
}
//构造了一个Looper放到ThreadLocal中
sThreadLocal.set(new Looper(quitAllowed));
}

//Looper的构造方法也很简单:1)创建消息队列;2)获取当先线程句柄;
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}

创建Handler

用户无法直接使用Looper来进行消息发送和处理,而是需要通过一个Handler来实现。使用Handler可以发送和处理Message,或者运行一个Runnable。每个Handler都会绑定一个Looper,Handler将会把message和runnable投递到Looper的消息队列中,然后Looper thread将会执行这些message和runnable。

下图是Handler与Looper、Thread、MessageQueue之间的对应关系,Thread和Looper是一对一的关系,否则会抛出异常,一个Looper只有一个消息队列,但是可以多个Handler对应同一个Looper。

LooperThreadMessageQueueHandler1Handler2Handler3messagemessagemessage

下面看下Handler的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    public Handler(@Nullable Callback callback, boolean async) {
...... //代码省略
mLooper = Looper.myLooper(); //默认使用当前线程关联的looper,也可以通过其他构造函数来指定使用哪个Looper
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
+ " that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue; //获取Looper的消息队列
mCallback = callback;
mAsynchronous = async;
}
//android-12.1.0_r27\frameworks\base\core\java\android\os\Looper.java
public static @Nullable Looper myLooper() {
return sThreadLocal.get(); //从ThreadLocal中获取当前线程关联的Looper
}

启动消息循环

下面代码是一个精简版代码,精简了debug、异常处理等相关逻辑,但基本逻辑不变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
    public static void loop() {
final Looper me = myLooper();
me.mInLoop = true;
for (;;) { //forever loop
if (!loopOnce(me, ident, thresholdOverride)) {
return;
}
}
}

/**
* Poll and deliver single message, return true if the outer loop should continue.
*/
private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) {
Message msg = me.mQueue.next(); // 从消息队列中读取消息,如果消息队列为空将会堵塞等待
if (msg == null) {
// No message indicates that the message queue is quitting.
return false;
}

Object token = null;
try {
//msg.target就是Handler,调用Handler.dispatchMessage进行消息处理
msg.target.dispatchMessage(msg);
} catch (Exception exception) {
} finally {
}

msg.recycleUnchecked();

return true;
}
//android-12.1.0_r27\frameworks\base\core\java\android\os\Handler.java
public void dispatchMessage(@NonNull Message msg) {
if (msg.callback != null) { //callback就是Runnable
handleCallback(msg); //执行Runnable.run()
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg); //处理message消息
}
}

消息发送 Handler.post()

1
2
3
4
5
6
7
8
9
10
//android-12.1.0_r27\frameworks\base\core\java\android\os\Handler.java
public final boolean post(@NonNull Runnable r) {
//1. 通过getPostMessage将Runnable转换为Message,然后调用sendMessageDelayed
return sendMessageDelayed(getPostMessage(r), 0);
}
private static Message getPostMessage(Runnable r) {
Message m = Message.obtain(); //从消息池中获取一个消息,是为了消息的复用,避免重新new一个类
m.callback = r;
return m;
}

上述代码通过post执行一个Runnable,通过getPostMessage把Runnable转换为Message,然后调用sendMessageDelayed:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
//获取当前系统时间,加上延时时间就是消息被执行的时间,然后调用sendMessageAtTime
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
......
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
msg.target = this; //Message的target字段指向Handler自身,target就是消息的最终投递目的地
msg.workSourceUid = ThreadLocalWorkSource.getUid();

if (mAsynchronous) {
msg.setAsynchronous(true);
}
//消息插入队列,并指定消息投递时间
return queue.enqueueMessage(msg, uptimeMillis);
}

通过Handler的post来执行任务最终会被加入到Looper的消息队列中,即MessageQueue:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//android-12.1.0_r27\frameworks\base\core\java\android\os\MessageQueue.java
// MessageQueue通过一个单项链表维护Message
boolean enqueueMessage(Message msg, long when) {
...... //代码省略
synchronized (this) {
......
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
//队列为空,或者队列中的消息投递时间比新消息晚,就会把新消息放在队首。
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) { //按照投递时间先后进行排序插入
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p;
prev.next = msg;
}

if (needWake) { //判断是否需要唤醒线程
nativeWake(mPtr);
}
}
return true;
}

nativeWake的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//android-12.1.0_r27\frameworks\base\core\jni\android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
//android-12.1.0_r27\system\core\libutils\Looper.cpp
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif

uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}

从消息队列获取消息

Looper.loop()会通过MessageQueue.next()不断从消息队列获取消息,如果队列为空,线程就会阻塞等待。下面是next()对应的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr; //mPtr是一个long类型,是native层c++类NativeMessageQueue对象的指针
if (ptr == 0) {
return null;
}

int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}

nativePollOnce(ptr, nextPollTimeoutMillis); //阻塞等待,第二个参数是超时时间

synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages; //mMessages是个单项链表,这里是获取链表首位的Message
if (msg != null && msg.target == null) {
//msg.target==null表示这个Message是同步屏障,将会暂停执行队列中的同步消息,
//异步消息不受同步屏障的影响,所以找到下一个异步消息
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
//下个消息还没有到执行时间,因此计算出需要阻塞等待的时间,即阻塞等待的超时时间
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}

// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}

// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}

if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}

// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler

boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}

if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}

// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;

// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}

IdleHandler

当消息队列中没有消息,消息循环将要去阻塞等待新消息的时候会执行IdleHandler。IdleHandler使用方法如下:

1
2
3
4
5
6
7
8
9
Looper.myLooper().getQueue().addIdleHandler(new MessageQueue.IdleHandler() {
@Override
public boolean queueIdle() {
//do something
return false;
//返回false表示任务只执行一次,任务完成后将会被移除任务;
//返回true表示下次消息循环将要阻塞时还会执行这个任务;
}
});

消息阻塞等待的底层实现

当消息队列中没有消息,或者消息的执行时间还没有到,消息队列的执行线程会进入阻塞等待状态。在MessageQueue.next()中通过调用nativePollOnce方法进行阻塞等待。nativePollOnce()方法定义如下,是一个native函数:

1
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
  • 第一个参数ptr是native层C++ NativeMessageQueue对象的指针。
  • 第二个参数timeoutMillis是阻塞等待超时时间,当达到超时时间后,会自动唤醒。阻塞的超时时间就是下个消息的执行时间减去当前系统时间得到的时间差。如果消息队列为空,则超时时间会传入-1,会一直阻塞,直到新消息插入后唤醒。

nativePollOnce的native实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//android-12.1.0_r27\frameworks\base\core\jni\android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
//将java层传下来的ptr参数强转成NativeMessageQueue对象指针
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
//调用NativeMessageQueue#pollOnce方法
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis); //调用native层Looper对象的pollOnce方法
mPollObj = NULL;
mPollEnv = NULL;

if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}

native层的Looper函数在system\core\libutils\Looper.cpp文件中实现,native层的Looper#pollOnce方法实现代码很长,这里就不贴代码了,原理简要描述下:

native层的Looper类通过linux eventfdepoll两个机制来完成线程间通信。eventfd是用于事件通知机制的文件描述符,epoll则用于监控eventfd的I/O事件,两者结合可以实现事件的等待和通知。

构造时会通过linux函数epoll_create1创建一个epoll实例,并关联一个读写文件描述符,然后在pollOnce方法中,通过epoll_wait来监听这个文件描述符的I/O事件,收到事件或者超时后就会唤醒,否则线程就处于阻塞等待状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//android-12.1.0_r27\system\core\libutils\include\utils\Looper.h
//unique_fd是android用于管理文件描述符的类,当离开作用域后文件会被自动关闭,在这里把它等同于文件描述符理解即可
android::base::unique_fd mWakeEventFd; //eventfd文件描述符
android::base::unique_fd mEpollFd; //创建epoll实例后返回的文件描述符

//android-12.1.0_r27\system\core\libutils\Looper.cpp
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
//调用eventfd系统函数创建eventfd,函数返回一个文件描述符,然后赋值给mWakeEventFd
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));

AutoMutex _l(mLock);
rebuildEpollLocked();
}
void Looper::rebuildEpollLocked() {
...... //代码省略

//epoll_create1方法创建一个epoll实例,返回的文件描述符赋值给mEpollFd
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));

struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN; //设置eventfd为可读
eventItem.data.fd = mWakeEventFd.get();
//把eventfd文件描述符添加到epoll的监控列表中
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);

......
}

下面代码实现了堵塞等待,并设置超时事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//system\core\libutils\Looper.cpp文件的Looper::pollInner方法中
int Looper::pollInner(int timeoutMillis) {
.......
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
.......
//因为epoll支持同时监控多个文件描述符,因此循环查找
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
......
}
}
void Looper::awoken() {
......
uint64_t counter;
//从eventfd文件中读取一个int内容
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}

唤醒堵塞等待的线程:

1
2
3
4
5
6
void Looper::wake() {
uint64_t inc = 1;
//向eventfd文件描述符中写入一个int值
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
......
}

同步屏障

同步屏障就是target为null的一个Message。当插入一个同步屏障时,都会伴随着再发送一个异步消息。消息循环的处理中,在遇到同步屏障后,就不会再处理同步消息,异步消息则不受影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//frameworks\base\core\java\android\os\MessageQueue.java
Message next() {
......
for (;;) {
......
nativePollOnce(ptr, nextPollTimeoutMillis);

synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
......

在UI渲染流程中,在ViewRootImpl#scheduleTraversals是会首先插入同步屏障,然后调用Choreographer发送一个异步消息。这个异步消息中就会执行UI渲染的三大流程。

1
2
3
4
5
6
7
8
9
10
void scheduleTraversals() {
if (!mTraversalScheduled) {
mTraversalScheduled = true;
mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
mChoreographer.postCallback(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
notifyRendererOfFramePending();
pokeDrawLockIfNeeded();
}
}