Android消息处理机制

Android消息处理机制

概览

Android应用程序的每一个线程在启动时,都可以在内部创建一个消息队列,当然也有不存在消息队列的纯任务型线程

一般是根据是否有界面操作,把线程分为三种:

  1. 应用程序的主线程,Android把UI界面操作都放在这里 ActivityThread
  2. 与界面无关的纯任务线程 HandlerThread
  3. 与界面相关的子线程,AsyncTask

Android的消息机制主要是通过三个类来实现:MessageQueue、Looper、Handler

MessageQueue是消息队列实体,

Looper用来创建消息队列、进入消息循环

Handler用来发送消息和处理消息

不带消息循环的线程,即纯任务型线程的生命周期就是从任务开始执行到任务执行结束

带消息循环的线程,其生命周期划分为创建消息队列和进入消息循环两个阶段

下面就先分析创建和进入消息循环的两个阶段

创建消息队列

先看一下Looper的几个关键数据

1
2
3
4
5
6
7
8
9
public final class Looper {
//线程局部变量区域
// sThreadLocal.get() will return null unless you've called prepare().
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
//主线程的Looper对象
private static Looper sMainLooper; // guarded by Looper.class

final MessageQueue mQueue;
final Thread mThread;

静态成员sThreadLocal是线程局部存储,每个线程在这个变量中存储的Looper是独立的,即每个线程的sThreadLocal里存的都是自己的Looper

但是静态成员sMainLooper只保存主线程的Looper,设置这个变量就是为了让其他子线程都能够很轻易的获得主线程的Looper

Java层Looper的静态函数prepare函数似乎所有线程都可以访问的接口,就是创建属于当前线程的Looper并存放到threadlocal里面,myLooper是相反的接口,是提供给所有的子线程,从threadLocal里取出属于自己子线程的looper。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Initialize the current thread as a looper, marking it as an
* application's main looper. The main looper for your application
* is created by the Android environment, so you should never need
* to call this function yourself. See also: {@link #prepare()}
*/
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}

public static void prepare() {
prepare(true);
}

private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}

//myLooper就返回的是跟当前线程关联的Looper
/**
* Return the Looper object associated with the current thread. Returns
* null if the calling thread is not associated with a Looper.
*/
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}

prepareMainLooper只能由主线程调用,其内部还是调用的prepare接口,只不过将其保存到了sMainLooper中

再继续分析Looper的创建过程

1
2
3
4
5
6
//构造函数
private Looper(boolean quitAllowed) {
//创建一个MessageQueue
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}

可以看到构造函数创建了MessageQueue对象,同时将当前线程保存到了mThread中

再来跟踪MessageQueue创建的具体过程,内部还是调用nativeInit来完成初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    private long mPtr; // used by native code
//真正的消息队列
Message mMessages;

MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
//调用更深层次的nativeInit来完成初始化
//mPtr指向native层创建的NativeMessageQueue
mPtr = nativeInit();
}


//对应Java层的nativeInit
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}

nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}

nativeInit创建了一个nativeMessageQueue对象,同时返回了指针,所以mPtr里保存的就是nativeMessageQueue对象的地址,

1
2
3
4
5
6
7
8
9
10
11
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
//先检查是否已经为当前线程创建过一个C++层的Looper对象
mLooper = Looper::getForThread();
if (mLooper == NULL) {
//如果还没创建就创建一个Looper对象
mLooper = new Looper(false);
//跟当前线程关联起来
Looper::setForThread(mLooper);
}
}

NativeMessageQueue的构造函数首先获得当前线程的Looper,这个Looper是C++层的Looper,所以正常流程到这里是还没创建的,保存到mLooper中,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
//早期版本用的pipe管道,现在好像改用eventfd了
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
strerror(errno));

AutoMutex _l(mLock);
rebuildEpollLocked();
}

void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
close(mEpollFd);
}

// Allocate the new epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);

这里创建了eventfd,后续感知消息队列的变化就通过这个fd,rebuildEpollLocked就是将fd加到epoll中监控

这几个类的关系就是

image-20220917162544730

进入消息循环

调用Looper的loop函数进入消息循环,依次获取当前线程的looper对象和消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//消息循环
/**
* Run the message queue in this thread. Be sure to call
* {@link #quit()} to end the loop.
*/
public static void loop() {
final Looper me = myLooper();
//获取当前线程的消息队列
final MessageQueue queue = me.mQueue;
for (;;) {
//不停检查这个消息队列中是否有新的消息需要处理
//如果有新消息就会返回,如果没有新消息就会进入睡眠等待状态
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}

queue的next方法就是取出新消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
//保存注册到消息队列中的空闲消息处理器
//如果没有消息需要处理,线程不会马上进入睡眠等待状态,而是先调用空闲消息处理器
int pendingIdleHandlerCount = -1; // -1 only during first iteration
//没有消息需要处理时,当前线程需要进入睡眠的等待时间
//等于-1时会无限地处于睡眠等待状态
int nextPollTimeoutMillis = 0;
for (;;) {
//如果nextPollTimeoutMillis等待时间不为0,说明当前线程等会会进入休眠状态,
//就处理那些正在等待的Binder进程间通信,避免长时间得不到处理
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
//检查当前消息队列中是否有新消息需要处理
//如果有消息需要处理mMessage就不会为Null
//如果没有新消息需要处理,就需要等待nextPollTimeoutMillis的时间
nativePollOnce(ptr, nextPollTimeoutMillis);


synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
//还没到处理的时候,设置一个等待时间
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
//到了mMessage该被处理的时候了,返回msg
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}

这里nativePollOnce是来判断是否有新消息需要处理,如果有消息就会继续往下执行,否则就会卡在这里睡眠等待。

从nativePollOnce出来之后,mMessages存放的是当前需要处理的消息

Message是以链表的形式,按处理实现从小到大的顺序排列在消息队列中的,msg.next -> msg.next

首先判断当前消息是否到了要被处理的时间,如果到了,就返回message,(now>msg.when),如果还没到就计算一下还需要等待的时间,下一次循环nativePollOnce就会睡眠等待。

前面是有消息的情况,如果没有消息要处理,就给nextPollTimeoutMillis赋值-1,表示等待时间为无限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
        //处理销毁的消息
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}

// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
//空闲时间,获取当前线程的消息队列中空闲消息处理器的个数
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
//没有空闲消息处理器注册到当前线程的消息队列中
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
//把当前现成的空闲消息处理器拷贝到mPendingIdleHandlers中
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}

// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler

//依次调用每一个空闲消息处理器的queueIdle来接受一个线程空闲消息
//每一次next只会调用一次
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
//如果queueIdle返回false,就从mIdleHandlers中移除,就再也不会接收到线程空闲消息了
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}

// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;

// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}

我们可以注册空闲消息处理器到消息队列中,像这后半段,就是依次调用每个空闲消息处理器的queueIdle,如果queueIdle返回false的话,空闲消息队列就会被移除

判断新消息

回头看一下nativePollOnce是怎么判断是否有新消息存在的,

就是从Messagequeue -> nativeMessageQueue -> Looper

然后通过epoll判断是否有eventfd是否被写了,没有的话就会在这里等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/


//检查当前线程是否有新消息需要处理
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

//找到NativeMessageQueue里的C++层的looper,looper里面包的是epoll
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
//mLooper是C++层的一个Looper对象,
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;

if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}


int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}


int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;

……………………
//不停地循环调用pollInner来检查是否有新的消息需要处理,如果有,result就不会等于0
result = pollInner(timeoutMillis);




int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE

………………
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//监听注册在前面创建的epoll实例,如果其中的文件描述符都没有发生IO读写,就会睡眠
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

// Handle all events.
//从epoll_wait返回回来之后,循环检查哪一个文件描述符发生了读写
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
//EPOLLIN发生了写入新数据的事件
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}

消息发送过程

Android系统通过Handler类来向线程的消息队列发送消息

handler的构造函数如下,需要给一个Callback参数保存到mcallback,这里可以看到Handler是关联到looper的,也即handler发送到的目标messageQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public Handler(Callback callback, boolean async) {
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
//获取当前线程的Looper对象
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
//获取当前线程的mQueue对象
mQueue = mLooper.mQueue;
//handler的Callback
mCallback = callback;
mAsynchronous = async;
}

发送消息用的是sendMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final boolean sendMessage(Message msg)
{
return sendMessageDelayed(msg, 0);
}
public final boolean sendMessageDelayed(Message msg, long delayMillis)
{
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}

Delay 和AtTime都是封装的时间,最终是调用目标MessageQueue的enqueueMessage,注意,这里设置了target为this

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}

synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}

msg.markInUse();
msg.when = when;
//取出队列的头消息
Message p = mMessages;
//需不需要唤醒线程
boolean needWake;
//插入消息队列的头
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
//插入头之后,如果mBLocked即当前线程正在阻塞就需要唤醒,没有的话就不用唤醒
needWake = mBlocked;
//插在消息队列的中间
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
//插入中间就不用唤醒
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}

// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
//mPtr指向C++层的NativeMessageQueue
nativeWake(mPtr);
}
}
return true;
}

因为消息队列中的消息是按照处理时间从小到大排序的,所以需要判断插入的位置,当消息被插入到头部时,说明需要处理的消息的时间被提前了,那就得需要唤醒目标线程

这里可以看到message的插入就没经过C++层,是直接Java层面上就已经插入了消息队列中,C++的epoll只负责线程的睡眠和唤醒

看一下nativeWake

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private native static void nativeWake(long ptr);
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
//mLooper指向一个C++层的Looper对象,
mLooper->wake();
}


//唤醒looper
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif

uint64_t inc = 1;
//唤醒就是往mwakeEventfd里面写一个uint64
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
mWakeEventFd, strerror(errno));
}
}
}

唤醒looper实际就是往eventfd随便写入一个uint64,然后那边epoll就知道有写入事件发生了

消息处理过程

回到Looper.loop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}

// This must be in a local variable, in case a UI event sets the logger
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}

final long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;

final long traceTag = me.mTraceTag;
if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}
final long start = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
final long end;
try {
//msg.target指向的是一个Handler对象,这里是把消息派发给Handler进行处理
msg.target.dispatchMessage(msg);
end = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}

这里派发消息是通过handler调用dispatchMessage,相当于通过handler发送消息,折返回来又通过handler处理消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Handle system messages here.
*/
//派发消息
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
//如果要处理的消息在发送时指定了回调接口,就用指定的回调函数
handleCallback(msg);
} else {
//如果handler有回调函数,就用handler的回调函数
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}

派发消息,如果消息自身在发送时已经指定了一个回调接口,那么就调用handleCallback来处理这个消息

如果Handler自身在创建时指定了一个Callback,就调用这个回调函数

都不满足的话,就调用handlerMessage来处理

handleCallback

1
2
3
private static void handleCallback(Message message) {
message.callback.run();
}

直接调用msg的callback的run函数,因为callback是一个Runnable对象

这种消息是通过post发送的,post首先会用getPostMessage把传进来的Runnable封装成Message对象,然后正常的调用sendMessage发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
//调用getPostMessage将Runnable对象封装成Message对象
public final boolean post(Runnable r)
{
return sendMessageDelayed(getPostMessage(r), 0);
}


private static Message getPostMessage(Runnable r, Object token) {
Message m = Message.obtain();
m.obj = token;
m.callback = r;
return m;
}

mCallback.handleMessage

mCallback是实现了Callback接口的对象

1
2
3
4
5
6
7
8
9
10
11
/**
* Callback interface you can use when instantiating a Handler to avoid
* having to implement your own subclass of Handler.
*/
public interface Callback {
/**
* @param msg A {@link android.os.Message Message} object
* @return True if no further handling is desired
*/
public boolean handleMessage(Message msg);
}

其就是一个接口,实现了handleMessage

调用mCallback.handleMessage实际就是派发给实现了callback接口的一个对象处理

handleMessage

1
2
3
//空实现,一般我们都是用Handler的一个子类来发送消息,然后使用其handleMessage来处理消息
public void handleMessage(Message msg) {
}

handlerMessage是空实现,一般情况下我们都不直接用Handler来发送消息,而是使用它的一个子类来发送消息,这个子类重写了Handler的成员函数handlerMessage



空闲消息处理器

前面分析消息循环时已经分析了空闲消息,这里来空闲消息的一些结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Callback interface for discovering when a thread is going to block
* waiting for more messages.
*/
public static interface IdleHandler {
/**
* Called when the message queue has run out of messages and will now
* wait for more. Return true to keep your idle handler active, false
* to have it removed. This may be called if there are still messages
* pending in the queue, but they are all scheduled to be dispatched
* after the current time.
*/
boolean queueIdle();
}

空闲消息处理器是一个接口,消息循环每次到空闲时间时,就会调用每个注册的空闲消息处理器的queueIdle,相当于派发了空闲消息

添加空闲消息处理器是MessageQueue的两个成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void addIdleHandler(@NonNull IdleHandler handler) {
if (handler == null) {
throw new NullPointerException("Can't add a null IdleHandler");
}
synchronized (this) {
mIdleHandlers.add(handler);
}
}

public void removeIdleHandler(@NonNull IdleHandler handler) {
synchronized (this) {
mIdleHandlers.remove(handler);
}
}

看完了Android的消息机制的基础设施之后,看一下Android是怎么在线程中使用的

应用程序主线程

ActivityThread启动之后会进入main函数,去掉其他的代码,就这两句就创建了主线程的Looper并在最后进入的消息循环

1
2
3
4
5
public static void main(String[] args) {
//创建一个消息循环,向AMS发送启动完成的通知之后,就会进入这个循环
Looper.prepareMainLooper();

Looper.loop();

纯任务线程

一般的子线程跟Java一样,使用Thread描述,不过我们一般是实现Thread的一个子类,然后用这个Thread子类来创建一个Android应用程序子线程,

1
2
3
4
5
6
7
8
9
10
11
12
public class SubThread extends Thread{
public SubThread(String name){
super(name);
}

public void run(){

}
}

SubThread subThread = new SubThread("Sub Thread");
subThread.start();

带消息循环的与界面无关的子线程

Android中提供的是HandlerThread类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class HandlerThread extends Thread {
//保存自己线程的looper
Looper mLooper;

@Override
public void run() {
mTid = Process.myTid();
Looper.prepare();
synchronized (this) {
mLooper = Looper.myLooper();
notifyAll();
}
Process.setThreadPriority(mPriority);
onLooperPrepared();
Looper.loop();
mTid = -1;
}

HandlerThread是继承Thread的,当我们新建一个HandlerThread线程并start时,run函数就会被调用,这里可以看到HandlerThread新建了一个looper,然后在最后进入了消息循环

我们可以定义一个实现了Runnable接口的类,例如ThreadTask类,用来描述想要子线程执行的任务

1
2
3
4
5
6
7
8
9
public class ThreadTask implements Runnable{
public void run(){
//code
}
}

ThreadTask threadTask = new ThreadTask();
Handler handler = new Handler(handlerThread.getLooper());
handler.post(threadTask);

还记得前面说过post会把callback封装到msg里面

带消息循环的与界面相关的子线程

Android提供了一个异步任务类AsyncTask,来将一个涉及界面操作的任务放在子线程中执行

以一个CounterService来作为案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class CounterService extends Service implements ICounterService {
private ICounterCallback counterCallback = null;

//initVal是计数器的初始值,callback是一个计数器的回调接口
public void startCounter(int initVal, ICounterCallback callback) {
counterCallback = callback;
//创建一个AsyncTask异步任务
AsyncTask<Integer, Integer, Integer> task = new AsyncTask<Integer, Integer, Integer>() {
@Override
//每隔1s就把计数器加1
protected Integer doInBackground(Integer... vals) {
Integer initCounter = vals[0];

stop = false;
while(!stop) {
publishProgress(initCounter);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

initCounter++;
}

return initCounter;
}

@Override
protected void onProgressUpdate(Integer... values) {
super.onProgressUpdate(values);

int val = values[0];
//更新到界面上
counterCallback.count(val);
}

@Override
protected void onPostExecute(Integer val) {
//更新到界面上
counterCallback.count(val);
}

};
//启动异步任务
task.execute(initVal);
}

在AsyncTask中doInBackground是真正干的任务,当异步任务执行完成之后,会将返回值分发给成员函数onPostExecute来处理,

接下来详细看一下AsynTask,AsyncTask有三个类型的参数Params、Progress和Result

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class AsyncTask<Params, Progress, Result> {
//类型为ThreadFactory的工厂,创建出来的线程是用来执行sWorkQueue中的工作任务
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);

public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
//工作任务队列
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);

sPoolworkQueue,里面存的都是实现了Runnable接口的对象,就知道其是公国队列

只不过LinkedBlockingQueue有一些特性,一个线程试图从空的LinkedBlockingQueue取出工作任务就会被阻塞,一个线程如果试图往一个满的LinkedBlockingQueue中写入工作任务也会被阻塞

sThreadFactory是用来创建执行工作任务的线程的

1
2
3
4
5
6
7
8
9
10
11
public static final Executor THREAD_POOL_EXECUTOR;

static {
//保存在这个线程池中的线程就是sThreadFactory创建的
//sPoolWorkQueue会被传进去作为工作队列
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}

threadPoolExecutor是一个线程池,其创建的时候传入到最后两个参数sPoolWorkQUeue和sTHreadFactory就是给线程池指定了工作队列和线程工厂,即用sThreadFactory创建工作线程,执行sPoolWorkQueue中的任务

CORE_POOL_SIZE和MAXIMUM_POOL_SIZE指定的是核心线程数量和线程池中最大的线程数量

由于上面关键的成员变量都是静态成员变量,所以其实在同一个应用程序进程中,所以异步任务类AsyncTask都是在一个线程池中执行的,这有助于减少创建线程池时的消耗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//指向的一个Handler对象是在应用程序的主线程中创建的
private static InternalHandler sHandler;


//继承Handler类
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}

@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
//异步任务执行完成时,通知主线程报告执行情况
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
//异步任务在执行过程中会不断的向应用程序主线程发送,来报告执行情况
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}

这里sHandler是static成员,所以会在AsyncTask第一次被应用程序使用时被创建,如果应用程序第一次使用AsyncTask是运行在主线程,那么这里创建的Handler就可以被用来向主线程的消息队列发送消息

这里虽然InternalHandler没有默认构造函数,但是其继承的Handler,Handelr是有默认构造函数的

1
2
3
public Handler() {
this(null, false);
}

handleMessage可以处理两类消息MESSAGE_POST_PROGRESS和MESSAGE_POST_RESULT

这里msg.obj指向的是一个AsyncTaskResult对象,其是在AsyncTask的运行过程中传递数据的

1
2
3
4
5
6
7
8
9
10
11
12
//mTask描述一个宿主异步任务
//mData描述宿主异步任务在执行过程中产生的中间数据
@SuppressWarnings({"RawUseOfParameterizedType"})
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;

AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}

了解了AsyncTask的基本组件之后,看一下AsyncTask的构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  //创建一个异步任务
//mHandler就指向MainHandler
public AsyncTask(@Nullable Looper callbackLooper) {
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
? getMainHandler()
: new Handler(callbackLooper);
//WorkRunnable继承自Callback类,即将要执行的工作任务
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
//这里来到调用doInBackGround
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
//封装mWorker
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}

这里也就得知AsyncTask创建完成后,会在内部获得一个WorkerRunnable和FutureTask对象

前面知道Service里面startCounter使用execute来启动异步任务的

image-20220918180503702

异步任务的执行,传入一个默认的Executor和一个参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();


@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}

mStatus = Status.RUNNING;

onPreExecute();

mWorker.mParams = params;
exec.execute(mFuture);

return this;
}

最后是调用的executor的execute,这里默认的executor是一个SerialExecutor,主要用来线性处理工作任务

image-20220918181144773

image-20220918181207308

image-20220918181211902

经过SerialExector中转一下还是用的Thread_Poll_Execute去执行任务

mActive是当前活动的任务,poll是取出任务,然后 还是调用Thread_Pool_Execute去执行execute

image-20220918181450508

这个线程池就是前面介绍的,线程池的execute

image-20220918181506197

image-20220918181512415

如果当前线程数量小于核心线程数量,就新建一个线程,并把任务添加到新线程里面

即便成功插入到了工作队列,也得检查一下是否需要新建线程

如果我们不能插入task,应该尝试添加一个新线程

image-20220918181642726

PS:这里我分析的不是很细,只是想粗略的先找到调用路径,这里start应该就是线程开始执行任务

1
2
3
4
5
6
7
8
private boolean addWorker(Runnable firstTask, boolean core) {
w = new Worker(firstTask);
final Thread t = w.thread;

if (workerAdded) {
t.start();
workerStarted = true;
}

结合前面,调用的就是传入的Runnable的run函数

image-20220918182148169

而传入的Runnable是mFuture,mFuture在构造的时候传入的mWorker,所以这里就调用到mWorker的call函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
133      public FutureTask(Callable<V> callable) {
134 if (callable == null)
135 throw new NullPointerException();
136 this.callable = callable;
137 this.state = NEW; // ensure visibility of callable
138 }


//调用callable的call函数
256 public void run() {
257 if (state != NEW ||
258 !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
259 return;
260 try {
261 Callable<V> c = callable;
262 if (c != null && state == NEW) {
263 V result;
264 boolean ran;
265 try {
266 result = c.call();
267 ran = true;
268 } catch (Throwable ex) {
269 result = null;
270 ran = false;
271 setException(ex);
272 }
273 if (ran)
274 set(result);
275 }
276 } finally {
277 // runner must be non-null until state is settled to
278 // prevent concurrent calls to run()
279 runner = null;
280 // state must be re-read after nulling runner to prevent
281 // leaked interrupts
282 int s = state;
283 if (s >= INTERRUPTING)
284 handlePossibleCancellationInterrupt(s);
285 }
286 }
287

再回头来看mworker,这里就找到了调用doInBackground的路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//WorkRunnable继承自Callback类,即将要执行的工作任务
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
//这里来到调用doInBackGround
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};

publishProgress

上面分析的AsyncTask的任务的执行流程,下面看一下其是怎么执行与界面相关的操作的

执行界面操作需要调用publishProgress函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@WorkerThread
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
//先把values描述的异步任务执行过程数据封装成一个AsyncTaskResult<Progress>对象
//然后再把AsyncTaskResult封装成Message_POST_PROGRESS对象,
//最后再通过sHandler发送到主线程消息队列中
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}

private Handler getHandler() {
return mHandler;
}

//mHandler就指向MainHandler
public AsyncTask(@Nullable Looper callbackLooper) {
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
? getMainHandler()
: new Handler(callbackLooper);

getHandler得到的是mHandler,而mHandler是构造的时候赋值的,指向MainLooper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Returns a new {@link android.os.Message Message} from the global message pool. More efficient than
* creating and allocating new instances. The retrieved message has its handler set to this instance (Message.target == this).
* If you don't want that facility, just call Message.obtain() instead.
*/
public final Message obtainMessage()
{
return Message.obtain(this);
}

/**
* Same as {@link #obtain()}, but sets the value for the <em>target</em> member on the Message returned.
* @param h Handler to assign to the returned Message object's <em>target</em> member.
* @return A Message object from the global pool.
*/
public static Message obtain(Handler h) {
Message m = obtain();
m.target = h;

return m;
}
/**
* Sends this Message to the Handler specified by {@link #getTarget}.
* Throws a null pointer exception if this field has not been set.
*/
public void sendToTarget() {
target.sendMessage(this);
}

obtain创建一个Message,并设置其target为传进来的Handler参数,然后sendToTarget其实还是调用Handler去发送的Message,这里封装的还是一个MESSAGE_POST_PROGRESS

而Handler是指向主线程的,就完成了向主线程发送消息,等到主线程派发消息时,就会来到Handler的handleMessage,就会分发给onProgressUpdata处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
//异步任务执行完成时,通知主线程报告执行情况
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
//异步任务在执行过程中会不断的向应用程序主线程发送,来报告执行情况
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
1
2
3
4
5
6
7
8
			
@Override
protected void onProgressUpdate(Integer... values) {
super.onProgressUpdate(values);

int val = values[0];
counterCallback.count(val);
}

也就是说虽然都是AsyncTask中的代码,子线程调用publishProgress,onProgressUpdate是在主线程中做的处理,counterCallback.count的工作就是更新界面上的计数器