RxBus-實現EventBus之Sticky

背景

前期因爲工做中須要將EventBus替換成RxBus,因此按照EventBus的用法封裝了一套本身的RxBus,基本知足了使用,項目發出後有很多兄弟告訴我沒有EventBusSticky功能,因此得閒完善這個功能,一樣是按照EventBus3.0註解的方式去實現和調用java

RxBus-實現EventBus之postgit


效果

這裏寫圖片描述

sticky是什麼

在Android開發中,Sticky事件只指事件消費者在事件發佈以後才註冊的也能接收到該事件的特殊類型。Android中就有這樣的實例,也就是Sticky Broadcast,即粘性廣播。正常狀況下若是發送者發送了某個廣播,而接收者在這個廣播發送後才註冊本身的Receiver,這時接收者便沒法接收到剛纔的廣播,爲此Android引入了StickyBroadcast,在廣播發送結束後會保存剛剛發送的廣播(Intent),這樣當接收者註冊完Receiver後就能夠接收到剛纔已經發布的廣播。這就使得咱們能夠預先處理一些事件,讓有消費者時再把這些事件投遞給消費者。github

使用

徹底按照EventBus3.0版本的註解的方式去使用安全

  • 發送消息
    RxBus.getDefault().post(new EventStickText("我是sticky消息"));複製代碼
  • 接收消息
@Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
    public void event(EventStickText eventStickText) {
        Observable.timer(1, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(aLong -> {
            textView.setText(eventStickText.getMsg());
        });
    }


    @Override
    protected void onStart() {
        super.onStart();
        RxBus.getDefault().register(this);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        RxBus.getDefault().unRegister(this);
    }複製代碼

實現

本篇的實現原理是基於以前的RxBus封裝的基礎上的完善,因此須要大體瞭解RxBus以前基本功能的封裝原理方能更加全面的理解一下的內容ide

原RxBus基本功能實現原理:EventBus徹底同樣的RxBuspost

1.添加sticky註解

不懂註解的同窗能夠先看下以前我寫的兩瓶關於註解的博客this

Java-註解詳解spa

Android-註解詳解.net

這裏添加boolean sticky()的方法,而且默認指定參數false線程

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe {
    int code() default -1;
    ThreadMode threadMode() default ThreadMode.CURRENT_THREAD;
    boolean sticky() default false;
}複製代碼

2.將sticky加入數據封裝類中

將數據新加入的註解sticky加入到數據封裝類中,在最後分發事件時用於區分sticky事件

public class SubscriberMethod {
    public Method method;
    public ThreadMode threadMode;
    public Class<?> eventType;
    public Object subscriber;
    public int code;
    public boolean sticky;

    public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code,ThreadMode threadMode,boolean sticky ) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.subscriber = subscriber;
        this.code = code;
        this.sticky=sticky;
    }
 ******
 }複製代碼

3.記錄post事件分析

由於sticky的特殊性,而自帶RxJava提供給咱們四種方式處理Subject分發

  • ReplaySubject在訂閱者訂閱時,會發送全部的數據給訂閱者,不管它們是什麼時候訂閱的。

  • PublishSubject只會給在訂閱者訂閱的時間點以後的數據發送給觀察者。

  • AsyncSubject只在原Observable事件序列完成後,發送最後一個數據,後續若是還有訂閱者繼續訂閱該Subject, 則能夠直接接收到最後一個值。

  • BehaviorSubject在訂閱者訂閱時,會發送其最近發送的數據(若是此時尚未收到任何數據,它會發送一個默認值)。

因此只有ReplaySubjectBehaviorSubject具有Sticky的特性。

可是:這兩種方式都不適合

  • BehaviorSubject由於只是保留最近一次的事件,這樣會致使事件的覆蓋問題

  • ReplaySubject能解決BehaviorSubject的事件丟失的問題,能保存全部的事件,可是分發起來確實一個難點,暫時沒有找到合適的處理方法

  • 還有咱們以前的封裝採用的PublishSubject的實現方式去分發RxBus的事件,若是換成任何其餘的分發機制都會致使sticky事件和正常事件數據須要獨立來作,成本過高

public RxBus() {
        bus = new SerializedSubject<>(PublishSubject.create());
    }複製代碼

解決辦法:經過Map<事件類型,事件>手動記錄消息事件,和PublishSubject數據統一塊兒來處理,避免速度的獨立,這裏選擇線程安全的ConcurrentHashMap

4.ConcurrentHashMap記錄事件

初始化

/*stick數據*/
    private final Map<Class<?>, Object> stickyEvent =new ConcurrentHashMap<>();複製代碼

post方法中添加事件

/** * 提供了一個新的事件,單一類型 * * @param o 事件數據 */
    public void post(Object o) {
        synchronized (stickyEvent) {
            stickyEvent.put(o.getClass(), o);
        }
        bus.onNext(o);
    }複製代碼

5.經過sticky註解獲得Observable對象

register(Object subscriber)方法中經過反射獲得自定義註解的數據,而後放入到自定義的數據類型SubscriberMethod

/** * 註冊 * * @param subscriber 訂閱者 */
    public void register(Object subscriber) {
          /*避免重複建立*/
        if(eventTypesBySubscriber.containsKey(subscriber)){
            return;
        }
        Class<?> subClass = subscriber.getClass();
        Method[] methods = subClass.getDeclaredMethods();
        for (Method method : methods) {
            if (method.isAnnotationPresent(Subscribe.class)) {
                //得到參數類型
                Class[] parameterType = method.getParameterTypes();
                //參數不爲空 且參數個數爲1
                if (parameterType != null && parameterType.length == 1) {

                    Class eventType = parameterType[0];

                    addEventTypeToMap(subscriber, eventType);
                    Subscribe sub = method.getAnnotation(Subscribe.class);
                    int code = sub.code();
                    ThreadMode threadMode = sub.threadMode();
                    boolean sticky = sub.sticky();

                    SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber, method, eventType, code, threadMode,
                            sticky);
                    addSubscriberToMap(eventType, subscriberMethod);

                    addSubscriber(subscriberMethod);
                }
            }
        }
    }複製代碼

當事件觸發之後,經過SubscriberMethod記錄的數據生成不一樣的Observable對象,如今對sticky消息增長了響應的對象處理

/** * 根據傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者 */
    public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
        synchronized (stickyEvent) {
            Observable<T> observable = bus.ofType(eventType);
            final Object event = stickyEvent.get(eventType);

            if (event != null) {
                return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {
                    @Override
                    public void call(Subscriber<? super T> subscriber) {
                        subscriber.onNext(eventType.cast(event));
                    }
                }));
            } else {
                return observable;
            }
        }
    }複製代碼

這裏使用merge操做符:能夠將多個Observables合併生成一個Observable。

6.Observable對象分發事件

獲得sticky的壓縮Observable對象後,還須要按照註解中被指定的線程去觸發事件任務

/** * 用於處理訂閱事件在那個線程中執行 * * @param observable * @param subscriberMethod * @return */
    private Observable postToObservable(Observable observable, SubscriberMethod subscriberMethod) {

        switch (subscriberMethod.threadMode) {
            case MAIN:
                observable.observeOn(AndroidSchedulers.mainThread());
                break;
            case NEW_THREAD:
                observable.observeOn(Schedulers.newThread());
                break;
            case CURRENT_THREAD:
                observable.observeOn(Schedulers.immediate());
                break;
            case IO:
                observable.observeOn(Schedulers.io());
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
        }
        return observable;
    }複製代碼

這裏很簡單,直接運用RxJava和RxAndroid的線程去管理便可

7.sticky消息的銷燬

由於這裏的sticky消息採用的是map隊列記錄的方式去實現,因此當sticky消息不在被需求記錄的時候,或者程序退出,須要手動清空map隊列的數據,避免內存溢出和浪費

/** * 移除指定eventType的Sticky事件 */
    public <T> T removeStickyEvent(Class<T> eventType) {
        synchronized (stickyEvent) {
            return eventType.cast(stickyEvent.remove(eventType));
        }
    }

    /** * 移除全部的Sticky事件 */
    public void removeAllStickyEvents() {
        synchronized (stickyEvent) {
            stickyEvent.clear();
        }
    }複製代碼

結果

經過sticky消息的完善,RxBus已經徹底實現了EventBus3.0的所有功能,而且所有安裝EventBus3.0的使用方法來封裝,方便項目的遷移和使用。

  • 註解方式定義

  • post方式分發事件

  • sticky消息功能

  • 註冊銷燬簡單化


源碼

傳送門-GitHub項目地址-戳我


建議

若是你對這套封裝有任何的問題和建議歡迎加入QQ羣告訴我!

相關文章
相關標籤/搜索