上周又手動撸了一遍EventBus實現,同時上傳EventBus的中文注釋源碼到Github上,歡迎大家fork&star.
EventBus是一個Android事件發布/訂閱框架,通過解耦發布者和訂閱者簡化Android事件傳遞.事件傳遞既可以用於Android四大組件間的通訊,也可以用於用戶異步線程和主線程間通訊等.
傳統的事件傳遞方法包括:Handler,BroadCastReceiver,interface回調,相比於EventBus,EventBus的代碼更加簡潔,代碼簡單,而且事件發布和訂閱充分解耦.
基本概念如下:
EventBus的默認構造方法如下:
EventBus.getDefault();
源碼如下:
/** 通過volatile保證每個線程獲取的都是最新的EventBus. */
static volatile EventBus defaultInstance;
/** 懶漢的單例模式構造EventBus. */
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public EventBus() {
this(DEFAULT_BUILDER);
}
再去了解EventBus具體構造函數之前,需要先看一下EventBusBuilder的具體內容,中文注釋源碼如下:
/**
* 構建器模式
* Effective Java : 遇到多個構造器參數時要考慮用構造器.
*/
@SuppressWarnings("unused")
public class EventBusBuilder {
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
/** 是否監聽異常日志. */
boolean logSubscriberExceptions = true;
/** 如果沒有訂閱者,顯示log信息. */
boolean logNoSubscriberMessages = true;
/** 是否發送監聽到的異常. */
boolean sendSubscriberExceptionEvent = true;
/** 如果沒有訂閱者,就發布一條默認事件. */
boolean sendNoSubscriberEvent = true;
/** 如果失敗,則拋出異常. */
boolean throwSubscriberException;
/** 是否響應訂閱事件的父類事件. */
boolean eventInheritance = true;
boolean ignoreGeneratedIndex;
/** 是否為嚴格模式.值為true時,當Subscribe注解描述的響應函數不符合要求時,會拋出相應的異常. */
boolean strictMethodVerification;
/** 線程池. */
ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
/** 從命名來看,含義是不遍歷的Method響應函數集合,但是沒啥軟用,EventBus3.0版本也沒有遍歷這個集合. */
List subscriberInfoIndexes;
EventBusBuilder() {
}
/** Default: true. */
public EventBusBuilder logSubscriberExceptions(boolean logSubscriberExceptions) {
this.logSubscriberExceptions = logSubscriberExceptions;
return this;
}
/** Default: true. */
public EventBusBuilder logNoSubscriberMessages(boolean logNoSubscriberMessages) {
this.logNoSubscriberMessages = logNoSubscriberMessages;
return this;
}
/** Default: true. */
public EventBusBuilder sendSubscriberExceptionEvent(boolean sendSubscriberExceptionEvent) {
this.sendSubscriberExceptionEvent = sendSubscriberExceptionEvent;
return this;
}
/** Default: true. */
public EventBusBuilder sendNoSubsciberEvent(boolean sendNoSubscriberEvent) {
this.sendNoSubscriberEvent = sendNoSubscriberEvent;
return this;
}
/**
* Fails if an subscriber throws an exception (default: false).
*/
public EventBusBuilder throwSubscriberException(boolean throwSubscriberException) {
this.throwSubscriberException = throwSubscriberException;
return this;
}
/**
* By default, EventBus considers the event class hierarchy
* (subscribers to super classes will be notified).
*/
public EventBusBuilder eventInheritance(boolean eventInheritance) {
this.eventInheritance = eventInheritance;
return this;
}
/**
* Provide a custom thread pool to EventBus used for async and background event delivery.
*/
public EventBusBuilder executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}
public EventBusBuilder skipMethodVerificationFor(Class clazz) {
if (skipMethodVerificationForClasses == null) {
skipMethodVerificationForClasses = new ArrayList<>();
}
skipMethodVerificationForClasses.add(clazz);
return this;
}
/**
* Forces the use of reflection even if there's a generated index.(default: false)
*/
public EventBusBuilder ignoreGeneratedIndex(boolean ignoreGeneratedIndex) {
this.ignoreGeneratedIndex = ignoreGeneratedIndex;
return this;
}
/** Default: false. */
public EventBusBuilder strictMethodVerification(boolean strictMethodVerification) {
this.strictMethodVerification = strictMethodVerification;
return this;
}
/**
* Adds an index generated by EventBus' annotation preprocessor.
*/
public EventBusBuilder addIndex(SubscriberInfoIndex index) {
if (subscriberInfoIndexes == null) {
subscriberInfoIndexes = new ArrayList<>();
}
subscriberInfoIndexes.add(index);
return this;
}
/**
* Installs the default EventBus returned by {@link EventBus#getDefault()}
* using this builder's values.
*/
public EventBus installDefaultEventBus() {
synchronized (EventBus.class) {
if (EventBus.defaultInstance != null) {
throw new EventBusException("Default instance already exists." +
"It may be only set once before it's used the first time to " +
"ensure consistent behavior.");
}
EventBus.defaultInstance = build();
return EventBus.defaultInstance;
}
}
/**
* Builds an EventBus based on the current configuration.
*/
public EventBus build() {
return new EventBus(this);
}
}
了解了EventBusBuilder的構造器模式之後,我們就可以去看一下EventBus的默認構造函數了.
/** Map<訂閱事件, 訂閱該事件的訂閱者集合> */
private final Map> subscriptionsByEventType;
/** Map<訂閱者, 訂閱事件集合> */
private final Map
post()方法首先從currentPostingThreadState對象中獲取當前線程的PostingThreadState對象.為什麼說是當前線程的PostingThreadState對象呢,這就需要看一下currentPostingThreadState對象的構造函數了.
/** 存儲當前線程的PostingThreadState對象. */
private final ThreadLocal currentPostingThreadState =
new ThreadLocal() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
可以看到,currentPostingThreadState是通過ThreadLocal來實現對PostingThreadState對象的存儲.ThreadLocal是一個線程內部的數據存儲類,通過它可以在指定的線程中存儲數據,而這段數據是不會與其他線程共享的.
ThreadLocal的內部原理是:通過生成一個包裹的泛型對象的數組,在不同的線程會有不同的數組索引值.通過這樣就可以做到每個線程通過get()方法獲取的時候,取到的是自己線程對應的數據.
PostingThreadState類的定義如下:
/** 當前線程的事件分發類. */
final static class PostingThreadState {
/** 當前線程的發布事件隊列. */
final List
回到Post方法,Post方法取出當前線程的PostingThreadState對象之後,將需要入隊的Event事件入隊,然後調用了postSingleEvent方法.接下來,我們去看一下這個方法的具體實現.
postSingleEvent()方法
postSingleEvent的中文注釋源碼如下:
private void postSingleEvent(Object event, PostingThreadState postingState) {
Class eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List
從源碼中可以看出,postSingleEvent方法主要是調用了postSingleEventForEventType來對訂閱事件進行分發.區別是,當EventBus的eventInheritance成員屬性為true時,訂閱了當前事件父類事件或者實現接口的事件的訂閱函數也會響應這個訂閱事件.
postSingleEventForEventType()方法
postSingleEventForEventType()中文注釋源碼如下:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState,
Class eventClass) {
CopyOnWriteArrayList subscriptions;
synchronized (this) {
// 獲取訂閱事件類類型對應的訂閱者信息集合.(register函數時構造的集合)
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 發布訂閱事件給訂閱函數
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
從源碼中可以看出,postSingleEventForEventType的作用是:找出所有訂閱event事件的訂閱函數集合,然後調用postToSubscription方法進行事件分發.
postToSubscription()方法
postToSubscription()方法注釋源碼如下:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " +
subscription.subscriberMethod.threadMode);
}
}
從源碼中可以看出,postToSubscription方法主要是根據訂閱方法指定的ThreadMode進行相應的處理.
雖然ThreadMode的具體含義已經在上面的博文中介紹過了,但是這裡還是要結合代碼講一下實現原理.
POSTING
POSTING的含義是訂閱函數可以直接運行在發送當前Event事件的線程中.而post方法又是發布訂閱事件線程調用的,所以直接執行訂閱方法即可.EventBus中訂閱方法的執行是通過反射機制.
/** 通過反射來執行訂閱函數. */
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
MAIN
MAIN的含義表示訂閱函數需要運行在主線程中,例如一些UI的操作.
如何判斷當前發布訂閱事件的線程是否為UI線程,可以通過如下方法:
Looper.getMainLooper() == Looper.myLooper();
所以,如果當前發布訂閱事件的線程是UI線程,則直接反射調用訂閱函數即可.如果不是,則通過mainThreadPoster來執行.
public class HandlerPoster extends Handler {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
/** 用於表示當前隊列中是否有正在發送的任務. */
private boolean handlerActive;
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
/**
* 將訂閱者和訂閱者事件組成PendingPost並入隊列.
* @param subscription 訂閱者
* @param event 訂閱者事件
*/
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
// 如果現在隊列中沒有正在執行的消息,則發送一條空消息,讓當前handler開始輪詢執行消息.
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
// 如果在規定的時間內沒有發送完隊列中的所有請求,則先退出當前循環,讓出cpu,
// 同時發送消息再次調度handleMessage方法.
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
mainThreadPoster是HandlerPoster的實例,HandlerPoster關聯了主線程的Looper,因此通過handleMessage方法通過反射調用訂閱函數將訂閱函數在主線程中執行.
BACKGROUND
BACKGROUND的意思是訂閱函數必須運行在子線程中,而且是順序執行.這個實現很簡單,通過隊列機制+線程池就可以實現該功能.
/**
* 後台通過線程池去執行事件響應回調.
*/
final class BackgroundPoster implements Runnable{
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.e("EventBus", Thread.currentThread().getName() + " was interruppted", e);
}
}finally {
executorRunning = false;
}
}
}
ASYNC
ASYNC意思是訂閱函數運行在子線程中,而且可以並發執行.這個實現就更簡單了,直接線程池+Runnable即可.EventBus具體實現如下:
/**
* 將訂閱事件在後台響應執行,並且執行順序是並發執行.
*/
class AsyncPoster implements Runnable{
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
post流程圖
接下來,我們總結一下post的流程圖.
總結
經過上述EventBus的源碼分析,我們應該了解到EventBus通過反射機制實現了訂閱者和發布者的解耦和訂閱發布功能.與傳統的觀察者模型相比,不需要寫冗余的interface接口,而且支持自定義要執行的線程,感覺還是很不錯的.