返回
EventBus源码解析(二)EventBus的post
Android
2023-11-14 17:49:24
EventBus的post()方法,为事件的发送,兼容了多线程多发送多事件(多事件后面介绍)的情况。它使用ThreadLocal来维护当前线程的PostingThread,通过ThreadLocal的特性,每个线程都有一个自己的独立的PostingThread。如果当前线程没有PostingThread,则创建PostingThread,并设置isPosting为true。
public void post(Object event) {
PostingThreadState postingState = getPostingThreadState();
boolean async = false;
if (postingState.isPosting) {
// Abort events posted on the posting thread to avoid re-entrancy.
// We don't want recursive event dispatching.
throw new EventBusException(
"Cannot post events during event dispatch.");
}
// 事件的订阅者队列,按优先级排序
Queue<Subscription> eventQueue = postingState.eventQueue;
eventQueue.offer(Subscription.create(event, eventQueue));
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException(
"Internal error. Abort state was not reset");
}
try {
while (eventQueue.peek() != null) {
Subscription next = eventQueue.poll();
// 如果当前线程为PostingThread,则直接invoke,否则新建一个线程
async = postingState.executor != null;
postSingleEvent(next.event, next.subscriptionQueue, async);
}
} finally {
postingState.isPosting = false;
postingState.canceled = false;
}
}
postSingleEvent()方法执行实际事件分发,它根据是否异步(async)来决定使用同步还是异步方式。
private void postSingleEvent(Object event, Subscription subscriptionQueue, boolean async) {
SubscriberExceptionEvent subscriberExceptionEvent;
try {
if (async) {
// AsyncPoster是真正的执行异步事件发送任务的类
postingState.executor.execute(postingState.asyncPoster, event, subscriptionQueue);
} else {
invokeSubscriber(subscriptionQueue, event);
}
} catch (Throwable th) {
// 如果事件分发失败,则会发出一个SubscriberExceptionEvent事件
logger.log(Level.SEVERE,
Thread.currentThread().getName() + " Subscriber error occurred!", th);
subscriberExceptionEvent =
SubscriberExceptionEvent.makeSubscriberExceptionEvent(th, event);
post(subscriberExceptionEvent);
}
}
invokeLater()方法将任务提交到消息队列,以便稍后执行。
final class PostingThreadState {
final AsyncPoster asyncPoster = new AsyncPoster(this);
final Executor executor;
Queue<Subscription> eventQueue;
boolean isPosting;
boolean canceled;
PostingThreadState(Executor executor) {
this.executor = executor;
}
void clear() {
eventQueue = null;
}
void executePostSingleEvent(Object event, Subscription subscriptionQueue) {
invokeSubscriber(subscriptionQueue, event);
}
}
invokeSubscriber()方法执行订阅者的回调函数。
private static void invokeSubscriber(Subscription subscriptionQueue, Object event) {
try {
subscriptionQueue.subscriberMethod.invoke(subscriptionQueue.subscriber, event);
} catch (InvocationTargetException e) {
throwRuntimeException(e);
} catch (IllegalAccessException e) {
throwRuntimeException(e);
} catch (Throwable th) {
logger.log(Level.SEVERE,
"Could not dispatch event: " + event + " to subscribing class " + subscriptionQueue.subscriber.getClass(), th);
throwRuntimeException(th);
}
}
EventBus支持在事件发布时添加一个tags。
/**
* Posts an event to all registered subscribers. This method will return successfully after the event has been posted to all subscribers. This means that an asynchronous
* post will be returned successfully although the subscribers might not have finished yet.
*
* @param event The event to post.
* @param tag Used to cancel the event delivery via {@link EventBus#cancelEventDelivery(Object)}.
* @throws DeadEventException if the event is a sticky event and the event has already been posted, or if it is a regular non-sticky event and there is no subscription for
* the event's type.
*/
public void post(Object event, Object tag) {
getPostingThreadState().eventQueue.offer(Subscription.create(event, getPostingThreadState().eventQueue, tag));
}
通过cancelEventDelivery()方法可以取消事件的传递。
/**
* Cancels the event delivery. Events posted with a tag using {@link #post(Object, Object)} can be canceled by using the same tag as a parameter to this method.
*
* @param eventTag the tag associated with the event
* @return true if the delivery was successfully canceled, false otherwise.
* @throws IllegalArgumentException if the tag was not associated with an event
* @since 2.2
*/
public boolean cancelEventDelivery(Object eventTag) {
PostingThreadState postingState = getPostingThreadState();
for (Subscription subscription : postingState.eventQueue) {
if (subscription.eventTag != null && subscription.eventTag.equals(eventTag)) {
subscription.cancelled = true;
postingState.canceled = true;
return true;
}
}
return false;
}
EventBus使用SubscriberExceptionEvent来处理事件分发过程中发生的异常。
/**
* Generates the Throwable error event and posts it to the event bus.
*
* @param th the Throwable error
* @param causingEvent the event that caused the error
*/
private static void post(Throwable th, Object causingEvent) {
// Since this method is public and can be called from outside the EventBus it had to
// be thread safe
logger.log(Level.SEVERE, "Could not dispatch event: " + causingEvent.getClass() + " to subscribing class "
+ causingEvent.getClass(), th);
final SubscriberExceptionEvent subscriberExceptionEvent = SubscriberExceptionEvent.makeSubscriberExceptionEvent(th, causingEvent);
EventBus eventBus = EventBus.getDefault();
if (eventBus == null || eventBus == DEAD_EVENT_BUS) {
throw new EventBusException(
"Could not create an EventBus to handle an error. Either call EventBus.init() or EventBus.getDefault() before," +
" or register a default instance with @Subscribe(priority = Integer.MAX_VALUE) or register a custom default instance with EventBus.setEventBus()");
} else {
eventBus.post(subscriberExceptionEvent);
}
}
通过将事件分发委托给一个后台线程(PostingThread),EventBus可以确保事件分发过程不会阻塞调用线程。此外,通过使用ThreadLocal来维护PostingThread,EventBus可以支持多线程环境中的并发事件分发。