前期因爲工做中須要將EventBus
替換成RxBus
,因此按照EventBus
的用法封裝了一套本身的RxBus
,基本知足了使用,項目發出後有很多兄弟告訴我沒有EventBus
的Sticky
功能,因此得閒完善這個功能,一樣是按照EventBus3.0
註解的方式去實現和調用java
在Android開發中,Sticky事件只指事件消費者在事件發佈以後才註冊的也能接收到該事件的特殊類型。Android中就有這樣的實例,也就是Sticky Broadcast,即粘性廣播。正常狀況下若是發送者發送了某個廣播,而接收者在這個廣播發送後才註冊本身的Receiver,這時接收者便沒法接收到剛纔的廣播,爲此Android引入了StickyBroadcast,在廣播發送結束後會保存剛剛發送的廣播(Intent),這樣當接收者註冊完Receiver後就能夠接收到剛纔已經發布的廣播。這就使得咱們能夠預先處理一些事件,讓有消費者時再把這些事件投遞給消費者。github
徹底按照EventBus3.0版本的註解的方式去使用安全
RxBus.getDefault().post(new EventStickText("我是sticky消息"));複製代碼
@Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
public void event(EventStickText eventStickText) {
Observable.timer(1, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(aLong -> {
textView.setText(eventStickText.getMsg());
});
}
@Override
protected void onStart() {
super.onStart();
RxBus.getDefault().register(this);
}
@Override
protected void onDestroy() {
super.onDestroy();
RxBus.getDefault().unRegister(this);
}複製代碼
本篇的實現原理是基於以前的RxBus
封裝的基礎上的完善,因此須要大體瞭解RxBus
以前基本功能的封裝原理方能更加全面的理解一下的內容ide
sticky
註解不懂註解的同窗能夠先看下以前我寫的兩瓶關於註解的博客this
Java-註解詳解spa
Android-註解詳解.net
這裏添加boolean sticky()
的方法,而且默認指定參數false
線程
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe {
int code() default -1;
ThreadMode threadMode() default ThreadMode.CURRENT_THREAD;
boolean sticky() default false;
}複製代碼
sticky
加入數據封裝類中將數據新加入的註解sticky
加入到數據封裝類中,在最後分發事件時用於區分sticky
事件
public class SubscriberMethod {
public Method method;
public ThreadMode threadMode;
public Class<?> eventType;
public Object subscriber;
public int code;
public boolean sticky;
public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code,ThreadMode threadMode,boolean sticky ) {
this.method = method;
this.threadMode = threadMode;
this.eventType = eventType;
this.subscriber = subscriber;
this.code = code;
this.sticky=sticky;
}
******
}複製代碼
post
事件分析由於sticky
的特殊性,而自帶RxJava
提供給咱們四種方式處理Subject分發
ReplaySubject
在訂閱者訂閱時,會發送全部的數據給訂閱者,不管它們是什麼時候訂閱的。
PublishSubject
只會給在訂閱者訂閱的時間點以後的數據發送給觀察者。
AsyncSubject
只在原Observable
事件序列完成後,發送最後一個數據,後續若是還有訂閱者繼續訂閱該Subject
, 則能夠直接接收到最後一個值。
BehaviorSubject
在訂閱者訂閱時,會發送其最近發送的數據(若是此時尚未收到任何數據,它會發送一個默認值)。
因此只有ReplaySubject
和BehaviorSubject
具有Sticky
的特性。
可是:這兩種方式都不適合
BehaviorSubject
由於只是保留最近一次的事件,這樣會致使事件的覆蓋問題
ReplaySubject能解決BehaviorSubject的事件丟失的問題,能保存全部的事件,可是分發起來確實一個難點,暫時沒有找到合適的處理方法
還有咱們以前的封裝採用的PublishSubject
的實現方式去分發RxBus
的事件,若是換成任何其餘的分發機制都會致使sticky事件和正常事件數據須要獨立來作,成本過高
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}複製代碼
解決辦法:經過Map<事件類型,事件>手動記錄消息事件,和PublishSubject
數據統一塊兒來處理,避免速度的獨立,這裏選擇線程安全的ConcurrentHashMap
ConcurrentHashMap
記錄事件初始化
/*stick數據*/
private final Map<Class<?>, Object> stickyEvent =new ConcurrentHashMap<>();複製代碼
在post
方法中添加事件
/** * 提供了一個新的事件,單一類型 * * @param o 事件數據 */
public void post(Object o) {
synchronized (stickyEvent) {
stickyEvent.put(o.getClass(), o);
}
bus.onNext(o);
}複製代碼
sticky
註解獲得Observable
對象在register(Object subscriber)
方法中經過反射獲得自定義註解的數據,而後放入到自定義的數據類型SubscriberMethod
中
/** * 註冊 * * @param subscriber 訂閱者 */
public void register(Object subscriber) {
/*避免重複建立*/
if(eventTypesBySubscriber.containsKey(subscriber)){
return;
}
Class<?> subClass = subscriber.getClass();
Method[] methods = subClass.getDeclaredMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(Subscribe.class)) {
//得到參數類型
Class[] parameterType = method.getParameterTypes();
//參數不爲空 且參數個數爲1
if (parameterType != null && parameterType.length == 1) {
Class eventType = parameterType[0];
addEventTypeToMap(subscriber, eventType);
Subscribe sub = method.getAnnotation(Subscribe.class);
int code = sub.code();
ThreadMode threadMode = sub.threadMode();
boolean sticky = sub.sticky();
SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber, method, eventType, code, threadMode,
sticky);
addSubscriberToMap(eventType, subscriberMethod);
addSubscriber(subscriberMethod);
}
}
}
}複製代碼
當事件觸發之後,經過SubscriberMethod
記錄的數據生成不一樣的Observable對象,如今對sticky消息增長了響應的對象處理
/** * 根據傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
synchronized (stickyEvent) {
Observable<T> observable = bus.ofType(eventType);
final Object event = stickyEvent.get(eventType);
if (event != null) {
return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
subscriber.onNext(eventType.cast(event));
}
}));
} else {
return observable;
}
}
}複製代碼
這裏使用merge
操做符:能夠將多個Observables
合併生成一個Observable。
Observable
對象分發事件獲得sticky的壓縮Observable
對象後,還須要按照註解中被指定的線程去觸發事件任務
/** * 用於處理訂閱事件在那個線程中執行 * * @param observable * @param subscriberMethod * @return */
private Observable postToObservable(Observable observable, SubscriberMethod subscriberMethod) {
switch (subscriberMethod.threadMode) {
case MAIN:
observable.observeOn(AndroidSchedulers.mainThread());
break;
case NEW_THREAD:
observable.observeOn(Schedulers.newThread());
break;
case CURRENT_THREAD:
observable.observeOn(Schedulers.immediate());
break;
case IO:
observable.observeOn(Schedulers.io());
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
}
return observable;
}複製代碼
這裏很簡單,直接運用RxJava和RxAndroid的線程去管理便可
由於這裏的sticky消息採用的是map隊列記錄的方式去實現,因此當sticky消息不在被需求記錄的時候,或者程序退出,須要手動清空map隊列的數據,避免內存溢出和浪費
/** * 移除指定eventType的Sticky事件 */
public <T> T removeStickyEvent(Class<T> eventType) {
synchronized (stickyEvent) {
return eventType.cast(stickyEvent.remove(eventType));
}
}
/** * 移除全部的Sticky事件 */
public void removeAllStickyEvents() {
synchronized (stickyEvent) {
stickyEvent.clear();
}
}複製代碼
經過sticky消息的完善,RxBus已經徹底實現了EventBus3.0的所有功能,而且所有安裝EventBus3.0的使用方法來封裝,方便項目的遷移和使用。
註解方式定義
post方式分發事件
sticky消息功能
註冊銷燬簡單化