Android 消息机制源码笔记

Table of Contents

Update: 2017.02.24, reread the source code of android-24

本笔记是Android中对消息系统代码阅读笔记, 包括三个类: Message, MessageQueue, Messenger

Message

该类是描述了用于作为信息载体的"消息"的实现, 几个重要的成员变量. Message通常都是与Handler结合在一起使用.

成员变量

public final class Message implements Parcelable {
    public int what;
    public int arg1; 
    public int arg2;
    public Object obj;
    public Messenger replyTo;
    public int sendingUid = -1;
    /*package*/ static final int FLAG_IN_USE = 1 << 0;
    /*package*/ static final int FLAG_ASYNCHRONOUS = 1 << 1;
    /*package*/ static final int FLAGS_TO_CLEAR_ON_COPY_FROM = FLAG_IN_USE;
    /*package*/ int flags;
    /*package*/ long when;
    /*package*/ Bundle data;
    /*package*/ Handler target;
    /*package*/ Runnable callback;
    /*package*/ Message next;
    private static final Object sPoolSync = new Object();
    private static Message sPool;
    private static int sPoolSize = 0;
    private static final int MAX_POOL_SIZE = 50;
    private static boolean gCheckRecycle = true;
    ...
}
  1. what. 该变量是消息的标志符,用来标识一条消息. 对于同一个Handler来说, 该条消息的唯一的. 不同Handler来说, what相同不会互相影响, 因为每个handler都有自己的命名空间, 所以发给不同handler的message, 即使what相同, 也不会冲突.
  2. arg1/arg2. Message支持多重方式存储数据. 如果传送的数据很简单,可以用int来表示的话, 可以使用这两个字段. |
  3. obj. 传送的消息是一个类的实例, 可以用这个表示. 需要说明的是, 如果该 Message需要跨进程传输的话(通过Binder), 该类必须支持Parcelable.
  4. replyTo. 如果需要"回复"这条消息, 可以使用该变量. Message一般和Handler配合使用.
  5. sendingUid. 发送这条消息的uid. 跨进程使用消息一般会用到.
  6. flags. 目前有三个标记: FLAG_IN_USE, FLAG_ASYNCHRONOUS, FLAGS_TO_CLEAR_ON_COPY_FROM.
  7. when. 消息的激活时间 |
  8. data. Bundle类型, 发送的数据, 通过setData()设置. 如果arg1/arg2/obj都不能满足传输数据的需求,可以使用该字段.
  9. target. 目标Handler |
  10. callback. 目前来说, 当该字段被设置时,标志这条消息是一个Runnable消息, 消息被处理时就会执行这个callback所代笔的Runnable.
  11. next. 可以使用该字段构造Message链表.
  12. sPool. 全局消息池. 用于存放/回收Message.
  13. sPoolSync. 消息池的锁.
  14. sPoolSize. 消息池大小.

构造消息

两种方法:

  1. 通过obtain(…)函数族用于获取消息, 这些obtain()函数通过不同的接受不同的 参数来初始化获取到的消息. 主要流程就是先通过obtain()获取一个消息, 然后在把参数赋值给消息字段.

    public static Message obtain(Message orig) 
    public static Message obtain(Handler h) 
    public static Message obtain(Handler h, Runnable callback) 
    public static Message obtain(Handler h, int what) 
    public static Message obtain(Handler h, int what, Object obj) 
    public static Message obtain(Handler h, int what, int arg1, int arg2) 
    public static Message obtain(Handler h, int what, int arg1, int arg2, Object obj) 
    public static Message obtain() {
        synchronized (sPoolSync) {
            if (sPool != null) {
                Message m = sPool;
                sPool = m.next;
                m.next = null;
                m.flags = 0; // clear in-use flag
                sPoolSize--;
                return m;
            }
        }
        return new Message();
    }
    
    public static Message obtain(Handler h) {
        Message m = obtain();
        m.target = h;
    
        return m;
    }
    
  2. 通过构造函数(不推荐).

发送消息

调用sendToTarget()函数调用Message的target变量(一个Handler)的sendMessage()函数. 如果没有设置target则会报错. sendMessage()函数的实现在Handler一节讲述.

public void sendToTarget() {
    target.sendMessage(this);
}

异步消息

如果flags的FLAG_ASYNCHRONOUS被设置, 表明该消息是异步消息. 异步消息的处理不会收到一些同步barrier的限制.参见Looper.

MessageQueue

该类基于前面的Message类实现了一个消息队列, 用于"顺序"处理 消息, 消息按照其产生时间先后排序. 一个MessageQueue通常与一个Looper一起使用 (一个线程只能有一个Looper).

插入消息

调用enqueueMessage(msg, when)函数, 该函数会锁住整个对象. 该函数的流程:

  1. 判断当前MessageQueue是否正在退出(mQuitting被设置). 如果是则报错, 回收消息并返回.
  2. 设置消息的FLAG_IN_USE标记位. 然后执行3或4.
  3. 如果当前队列为空, 或者消息的when为0, 或者消息的触发时间 比当前队列的第一个消息还早. 则将消息设置为消息头.
  4. 如果3不成立, 则将新消息与队列进行比较, 将其插入到合适位置. 这里有一个其他设置: 如果当前队列被阻塞, 且第一个元素是barrier (barrier是target为null的消息). 那么如果新插入的消息是第一个async消息, 就会唤醒阻塞队列(如果不是第一个,就不唤醒).
  5. 如果需要唤醒, 则唤醒. 下面是该函数代码:

    boolean enqueueMessage(Message msg, long when) {
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }
    
        synchronized (this) {
            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }
    
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }
    
            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
    

    函数的流程如下:

    1. 首先判断该msg是否有target或正在被使用.
    2. 获取消息队列的锁, 进入同步操作.
      1. 如果队列正在退出, 回收新消息, 并返回.
      2. 否则, 插入到相应位置
      3. 判断是否需要唤醒, 如果是则唤醒.

消息队列是否空闲

空闲条件: 队列为空或第一个消息的触发时间还没到. 需要获取对象锁.

从队列获取消息

通过函数next()从队列中获取一个消息. 下面是该函数流程,

  1. 判断该队列是否已经退出(mPtr=0), 是则直接返回Null;
  2. 进入一个无限for()循环获取消息.
    1. 调用JNI函数nativePollOnce(ptr, timeout), 该函数的第二个参数表示要 阻塞的时长, 如果为0则立即返回, 如果为-1则一直阻塞.
    2. 获取全局锁.
      1. 判断第一个消息是否为barrier. 如果是,获取第一个async消息.
      2. 如果有可用消息(第一个不为barrier或是async消息), 判断消息触发时间, 如果未到, 则设置睡眠时间为剩余时间.
      3. 否则, 说明消息可用, 将消息从队列移除, 设置FLAG_IN_USE, 返回消息.
      4. 如果2,3不成立, 没获取到任何消息, 则设置睡眠时间为-1, 那么在下一次循环中 会进入一直睡眠状态.
      5. MessageQueue还支持添加IdleHandler接口, 当当前线程没有消息 可处理即将进入等待之前, 可以执行这些handler的相关函数.
Message next() {
     // Return here if the message loop has already quit and been disposed.
     // This can happen if the application tries to restart a looper after quit
     // which is not supported.
     final long ptr = mPtr;
     if (ptr == 0) {
         return null;
     }

     int pendingIdleHandlerCount = -1; // -1 only during first iteration
     int nextPollTimeoutMillis = 0;
     for (;;) {
         if (nextPollTimeoutMillis != 0) {
             Binder.flushPendingCommands();
         }

         nativePollOnce(ptr, nextPollTimeoutMillis);

         synchronized (this) {
             // Try to retrieve the next message.  Return if found.
             final long now = SystemClock.uptimeMillis();
             Message prevMsg = null;
             Message msg = mMessages;
             if (msg != null && msg.target == null) {
                 // Stalled by a barrier.  Find the next asynchronous message in the queue.
                 do {
                     prevMsg = msg;
                     msg = msg.next;
                 } while (msg != null && !msg.isAsynchronous());
             }
             if (msg != null) {
                 if (now < msg.when) {
                     // Next message is not ready.  Set a timeout to wake up when it is ready.
                     nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                 } else {
                     // Got a message.
                     mBlocked = false;
                     if (prevMsg != null) {
                         prevMsg.next = msg.next;
                     } else {
                         mMessages = msg.next;
                     }
                     msg.next = null;
                     if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                     msg.markInUse();
                     return msg;
                 }
             } else {
                 // No more messages.
                 nextPollTimeoutMillis = -1;
             }

             // Process the quit message now that all pending messages have been handled.
             if (mQuitting) {
                 dispose();
                 return null;
             }

             ...
     }
 }

Looper

Looper类用于在线程中实现一个"消息循环"行为. Looper有一个MessageQueue类型的变量mQueue用于存储消息.

为线程初始化一个looper

Looper类有一个静态变量sThreadLocal, 该变量是一个ThreadLocal 类型的线程私有变量. 当调用prepare()函数进行初始化时, 会在函数内部生成一个looper实例并赋值给该变量. 调用 myLooper函数会返回这个变量.

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}

PS: Looper还有一个静态变量sMainLooper, 这个变量是UI线程 的Looper引用, 在应用启动时被初始化.

loop()函数处理消息

函数的处理在loop()函数中, 该函数建立了一个"无限循环", 每次循环都从消息队列中获取一个消息, 这会调用MessageQueue的next() 函数,

  1. 如果没有可用消息, 该函数会阻塞.
  2. 如果MessageQueue正在退出, 该函数返回null.
public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue;
    ...
    for (;;) {
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        ...

        msg.target.dispatchMessage(msg);

        ...

        msg.recycleUnchecked();
    }
}

代码流程:

  1. 调用queue.next()函数从队列获取消息.
  2. 判断消息是否为空, 如果为空则退出循环(线程也可能退出). 因为next()函数可能会导致线程阻塞. 所以如果"被唤醒"还拿到空消息, 有可能是别的线程调用了quit()函数.
  3. 调用msg的target变量(即Handler)的dispatchMessage()函数.
  4. 调用Message的recycleUnchecked()函数回收消息.

Messenger

Messenger可以用来传递消息, 它里面封装了一个Handler, 其他的程序可以 通过这个Messenger给这个Handler发送消息. 因为它实现了Parcelable,所以 可以依附在Message上在进程间进行传递. 下面是Messenger的主要api.

  1. 构造函数.

    public Messenger(Handler target) {
        mTarget = target.getIMessenger();
    }
    
  2. 发送消息

    public void send(Message message) throws RemoteException {
        mTarget.send(message);
    }
    

Handler

在一般的APP开发中, 都是通过handler进行消息的发送或 处理. 这里是几个主要功能的代码笔记.

创建handler

Handler的构造函数有多个, 基本最后都调到下面两个函数之一:

  1. Handler(callback, async). 第一个参数callback的用于, 如果不想自己写一个Handler的子类 (Handler的通常用法), 可以传入一个callback参数用于处理消息. 第二个参数async标志消息是否要按时间排序. 该函数会去拿去当前线程的Looper, 如果没有则报错.

       public Handler(Callback callback, boolean async) {
        if (FIND_POTENTIAL_LEAKS) {
            final Class<? extends Handler> klass = getClass();
            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                    (klass.getModifiers() & Modifier.STATIC) == 0) {
                Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                    klass.getCanonicalName());
            }
        }
    
        mLooper = Looper.myLooper();
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
        }
        mQueue = mLooper.mQueue;
        mCallback = callback;
        mAsynchronous = async;
    }
    
  2. Handler(looper, callback, async). 第一个参数looper是显示的传入一个looper参数给handler的构造函数. 这样即使当前线程没有looper也可以.

获取一个消息

调用obtainMessage()可以获取一个消息, 函数内部通过 调用Message的obtain()函数实现.

发送消息

Handler的发送消息相关的函数也有多个, 基本都是先计算该message 的执行时间, 然后调用sendMessageAtTime()函数. 该函数内部调用了 enqueueMessage()函数, 最终调用到了MessageQueue的 enqueueMessage()函数.

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}

发送runnable

post系列函数用于发送一个"Runnable"消息, 该runnable会被存入 消息的callback变量. 在Looper做消息分发时, 会回调到Handler的 dispatchMessage()函数来处理callback.代码如下:

public final boolean post(Runnable r)
{
   return  sendMessageDelayed(getPostMessage(r), 0);
}

public final boolean sendMessageDelayed(Message msg, long delayMillis)
{
    if (delayMillis < 0) {
        delayMillis = 0;
    }
    return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}

//called from looper
public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}
private static void handleCallback(Message message) {
    message.callback.run();
}

Created At <2016-06-11 Sat 23:25> by Luis Xu. Email: xuzhengchaojob@gmail.com