返回

EventBus源码解析(二)EventBus的post

Android

EventBus的post()方法,为事件的发送,兼容了多线程多发送多事件(多事件后面介绍)的情况。它使用ThreadLocal来维护当前线程的PostingThread,通过ThreadLocal的特性,每个线程都有一个自己的独立的PostingThread。如果当前线程没有PostingThread,则创建PostingThread,并设置isPosting为true。

    public void post(Object event) {
        PostingThreadState postingState = getPostingThreadState();
        boolean async = false;
        if (postingState.isPosting) {
            // Abort events posted on the posting thread to avoid re-entrancy.
            // We don't want recursive event dispatching.
            throw new EventBusException(
                    "Cannot post events during event dispatch.");
        }
        // 事件的订阅者队列,按优先级排序
        Queue<Subscription> eventQueue = postingState.eventQueue;
        eventQueue.offer(Subscription.create(event, eventQueue));
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException(
                    "Internal error. Abort state was not reset");
        }
        try {
            while (eventQueue.peek() != null) {
                Subscription next = eventQueue.poll();
                // 如果当前线程为PostingThread,则直接invoke,否则新建一个线程
                async = postingState.executor != null;
                postSingleEvent(next.event, next.subscriptionQueue, async);
            }
        } finally {
            postingState.isPosting = false;
            postingState.canceled = false;
        }
    }

postSingleEvent()方法执行实际事件分发,它根据是否异步(async)来决定使用同步还是异步方式。

    private void postSingleEvent(Object event, Subscription subscriptionQueue, boolean async) {
        SubscriberExceptionEvent subscriberExceptionEvent;
        try {
            if (async) {
                // AsyncPoster是真正的执行异步事件发送任务的类
                postingState.executor.execute(postingState.asyncPoster, event, subscriptionQueue);
            } else {
                invokeSubscriber(subscriptionQueue, event);
            }
        } catch (Throwable th) {
            // 如果事件分发失败,则会发出一个SubscriberExceptionEvent事件
            logger.log(Level.SEVERE,
                    Thread.currentThread().getName() + " Subscriber error occurred!", th);
            subscriberExceptionEvent =
                    SubscriberExceptionEvent.makeSubscriberExceptionEvent(th, event);
            post(subscriberExceptionEvent);
        }
    }

invokeLater()方法将任务提交到消息队列,以便稍后执行。

    final class PostingThreadState {
        final AsyncPoster asyncPoster = new AsyncPoster(this);
        final Executor executor;
        Queue<Subscription> eventQueue;
        boolean isPosting;
        boolean canceled;

        PostingThreadState(Executor executor) {
            this.executor = executor;
        }

        void clear() {
            eventQueue = null;
        }

        void executePostSingleEvent(Object event, Subscription subscriptionQueue) {
            invokeSubscriber(subscriptionQueue, event);
        }
    }

invokeSubscriber()方法执行订阅者的回调函数。

    private static void invokeSubscriber(Subscription subscriptionQueue, Object event) {
        try {
            subscriptionQueue.subscriberMethod.invoke(subscriptionQueue.subscriber, event);
        } catch (InvocationTargetException e) {
            throwRuntimeException(e);
        } catch (IllegalAccessException e) {
            throwRuntimeException(e);
        } catch (Throwable th) {
            logger.log(Level.SEVERE,
                    "Could not dispatch event: " + event + " to subscribing class " + subscriptionQueue.subscriber.getClass(), th);
            throwRuntimeException(th);
        }
    }

EventBus支持在事件发布时添加一个tags。

    /**
     * Posts an event to all registered subscribers. This method will return successfully after the event has been posted to all subscribers. This means that an asynchronous
     * post will be returned successfully although the subscribers might not have finished yet.
     *
     * @param event The event to post.
     * @param tag Used to cancel the event delivery via {@link EventBus#cancelEventDelivery(Object)}.
     * @throws DeadEventException if the event is a sticky event and the event has already been posted, or if it is a regular non-sticky event and there is no subscription for
     *                            the event's type.
     */
    public void post(Object event, Object tag) {
        getPostingThreadState().eventQueue.offer(Subscription.create(event, getPostingThreadState().eventQueue, tag));
    }

通过cancelEventDelivery()方法可以取消事件的传递。

    /**
     * Cancels the event delivery. Events posted with a tag using {@link #post(Object, Object)} can be canceled by using the same tag as a parameter to this method.
     *
     * @param eventTag the tag associated with the event
     * @return true if the delivery was successfully canceled, false otherwise.
     * @throws IllegalArgumentException if the tag was not associated with an event
     * @since 2.2
     */
    public boolean cancelEventDelivery(Object eventTag) {
        PostingThreadState postingState = getPostingThreadState();
        for (Subscription subscription : postingState.eventQueue) {
            if (subscription.eventTag != null && subscription.eventTag.equals(eventTag)) {
                subscription.cancelled = true;
                postingState.canceled = true;
                return true;
            }
        }
        return false;
    }

EventBus使用SubscriberExceptionEvent来处理事件分发过程中发生的异常。

    /**
     * Generates the Throwable error event and posts it to the event bus.
     *
     * @param th the Throwable error
     * @param causingEvent the event that caused the error
     */
    private static void post(Throwable th, Object causingEvent) {
        // Since this method is public and can be called from outside the EventBus it had to
        // be thread safe
        logger.log(Level.SEVERE, "Could not dispatch event: " + causingEvent.getClass() + " to subscribing class "
                + causingEvent.getClass(), th);
        final SubscriberExceptionEvent subscriberExceptionEvent = SubscriberExceptionEvent.makeSubscriberExceptionEvent(th, causingEvent);
        EventBus eventBus = EventBus.getDefault();
        if (eventBus == null || eventBus == DEAD_EVENT_BUS) {
            throw new EventBusException(
                    "Could not create an EventBus to handle an error. Either call EventBus.init() or EventBus.getDefault() before," +
                            " or register a default instance with @Subscribe(priority = Integer.MAX_VALUE) or register a custom default instance with EventBus.setEventBus()");
        } else {
            eventBus.post(subscriberExceptionEvent);
        }
    }

通过将事件分发委托给一个后台线程(PostingThread),EventBus可以确保事件分发过程不会阻塞调用线程。此外,通过使用ThreadLocal来维护PostingThread,EventBus可以支持多线程环境中的并发事件分发。