Android 使用RxJava實現一個發佈/訂閱事件總線

1.簡單介紹

1.1.發佈/訂閱事件主要用於網絡請求的回調。html

  事件總線可使Android各組件之間的通訊變得簡單,並且能夠解耦。react

  其實RxJava實現事件總線和EventBus比較相似,他們都依據與觀察者模式。git

  我的比較習慣用RxJava來實現,由於很是簡單而清晰。github

  

 

1.2.固然EventBus實現總線的方式也有不少人用。web

  這裏給個傳送門==>EventBus的github地址:https://github.com/greenrobot/EventBus網絡

  而後Otto實現總線也不錯==>Otto的github地址:https://github.com/square/ottoapp

 

 

1.3.使用RxJava的好處以及注意點ide

  最明顯的好處就是:項目體積縮小了。函數

  注意:使用RxLifecycle來解決RxJava內存泄漏的問題。工具

  ==>參考個人另外一篇博客:RxLifecycle第三方庫的使用。

  

 

1.4.理解一下觀察者模式。

  這是一種行爲模式。

  當你的類或者主對象(稱爲被觀察者)的狀態發生改變就會通知全部對此感興趣的類或對象(稱爲觀察者)。

  詳情瞭解請參考這篇文章:觀察者模式--千軍萬馬穿雲箭。

 

 

1.5.理解一下發布/訂閱

  發佈/訂閱 模式的功能和觀察者模式是同樣的,都是完成特定事件發生後的消息通知。

  觀察者模式和發佈/訂閱模式之間仍是存在了一些差異,在發佈/訂閱模式中重點是發佈消息,而後由調度中心

  統一調度,不須要知道具體有哪些訂閱者。(這樣就能夠匿名)

爲何要匿名?
在計算機程序設計中有一個很是棒的思想叫「解耦」。你一般但願在你的設計中保持儘量低的耦合度。
一般狀況下,你但願消息發佈商可以直接瞭解全部須要接收消息的訂閱者,
這樣,一旦「事件」或消息準備好就能夠及時通知每個訂閱者。
可是使用事件總線,發佈者能夠免除這種職責並實現獨立性,
由於消息發佈者和消息訂閱者能夠相互不知道對方,只關心對應的消息,從而接觸二者之間的依賴關係
怎麼實現匿名?
提到匿名,天然而然你就會問:你是如何真正實現發佈者和訂閱者之間的匿名? 
很簡單,只要找到一箇中間人,讓這個中間人負責兩方的消息調度。事件總線就是一個這樣的中間人。 綜上所述,事件總線就是這麼簡單。

 

 

1.6.使用RxJava實現事件總線的簡單案例

  案例來源:用RxJava實現事件總線-RxBus。

  github參考案例地址:https://github.com/kaushikgopal/RxJava-Android-Samples

  以下面的例子:

  咱們從頂部片斷(綠色部分)發佈事件,並從底部片斷(藍色部分)監聽點擊事件(經過事件總線)。

  

  怎麼實現這個功能呢?

  第一步自定義一個事件總線 

public class RxBus {
 
  private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
 
  public void send(Object o) {
    _bus.onNext(o);
  }
 
  public Observable<Object> toObserverable() {
    return _bus;
  }
}

  第二步將事件發佈到總線中。

@OnClick(R.id.btn_demo_rxbus_tap)
public void onTapButtonClicked() {
 
    _rxBus.send(new TapEvent());
}

  第三步監聽來自其餘組件或服務事件

_rxBus.toObserverable()
    .subscribe(new Action1<Object>() {
      @Override
      public void call(Object event) {
 
        if(event instanceof TapEvent) {
          _showTapText();
 
        }else if(event instanceof SomeOtherEvent) {
          _doSomethingElse();
        }
      }
    });

 

 

1.7.本篇文章的參考文獻

  Android RxJava實現RxBus。

  Android基於RxJava、RxAndroid的EventBus實現。

  用RxJava實現事件總線-RxBus。


2.封裝好的總線類

2.1.RxJava1.x的總線實現方式

/**
 * desc   : 利用 PublishSubject的特性:與普通的Subject不一樣,在訂閱時並不當即觸發訂閱事件,
 * 而是容許咱們在任意時刻手動調用onNext(),onError(),onCompleted來觸發事件。
 */
public class RxBus {

    private ConcurrentHashMap<Object, List<Subject>> subjectMapper = new ConcurrentHashMap<>();

    private RxBus() {

    }

    private static class Holder {
        private static RxBus instance = new RxBus();
    }

    public static RxBus getInstance() {
        return Holder.instance;
    }

    public <T> Observable<T> register(@NonNull Class<T> clz) {
        return register(clz.getName());
    }

    public <T> Observable<T> register(@NonNull Object tag) {
        List<Subject> subjectList = subjectMapper.get(tag);
        if (null == subjectList) {
            subjectList = new ArrayList<>();
            subjectMapper.put(tag, subjectList);
        }

        Subject<T, T> subject = PublishSubject.create();
        subjectList.add(subject);

        //System.out.println("註冊到rxbus");
        return subject;
    }

    public <T> void unregister(@NonNull Class<T> clz, @NonNull Observable observable) {
        unregister(clz.getName(), observable);
    }

    public void unregister(@NonNull Object tag, @NonNull Observable observable) {
        List<Subject> subjects = subjectMapper.get(tag);
        if (null != subjects) {
            subjects.remove(observable);
            if (subjects.isEmpty()) {
                subjectMapper.remove(tag);
                //System.out.println("從rxbus取消註冊");
            }
        }
    }

    public void post(@NonNull Object content) {
        post(content.getClass().getName(), content);
    }

    public void post(@NonNull Object tag, @NonNull Object content) {
        List<Subject> subjects = subjectMapper.get(tag);
        if (!subjects.isEmpty()) {
            for (Subject subject: subjects) {
                subject.onNext(content);
            }
        }
    }
}
幾個關鍵方法: 
register —— 由tag,生成一個subject List,同時利用PublishSubject建立一個Subject並返回,
它同時也是Observable的子類。
unregister —— 移除tag對應subject List 中的Observable。若subject List爲空,也將被移除。
post —— 遍歷tag對應subject List 中的Subject,執行onNext()。
這裏實際執行的是觀察者Observer的onNext(),
Subject的定義:
public abstract class Subject<T, R> extends Observable<R> implements Observer<T>。

  測試代碼:

/*
rxbus
 */
Observable<String> observable = RxBus.getInstance().register(String.class);
observable.map(s -> {
    try {
        int v = Integer.valueOf(s);
        System.out.println("map變換成功, source = " + s);
        return v;
    } catch (Exception e) {
        System.out.println("map變換失敗, source = " + s);
        return s;
    }
}).subscribe(value -> {
    System.out.println("訂閱 " + value);
});

RxBus.getInstance().post("888");
RxBus.getInstance().post("發發發");
RxBus.getInstance().unregister(String.class, observable);
//這裏比較有意思的是,使用了lambda表達式。
//在map變換時,若是將字符串轉成Integer,沒有問題就返回整型;
//若報異常,就返回String型。
//一樣的,在最終訂閱時,value參數的類型也是由map變換來決定的。

 

 

2.2.RxJava2.0總線實現類

  由於在RxJava2.0以後,io.reactivex.Observable中沒有進行背壓處理了。

  若是有大量消息堆積在總線中來不及處理會產生OutOfMemoryError。

  有新類io.reactivex.Flowable專門針對背壓問題。

 

  無背壓處理的Observable實現,跟RxJava1.0x中同樣,使用PublishSubject來實現。

  要實現有背壓的2.0x版,使用FlowableProcessor的子類PublishProcessor來產生Flowable。

  

  源代碼以下:

public class RxBus {

    private final FlowableProcessor<Object> mBus;

    private RxBus() {
        mBus = PublishProcessor.create().toSerialized();
    }

    private static class Holder {
        private static RxBus instance = new RxBus();
    }

    public static RxBus getInstance() {
        return Holder.instance;
    }

    public void post(@NonNull Object obj) {
        mBus.onNext(obj);
    }

    public <T> Flowable<T> register(Class<T> clz) {
        return mBus.ofType(clz);
    }

    public void unregisterAll() {
        //會將全部由mBus 生成的 Flowable 都置  completed 狀態  後續的 全部消息  都收不到了
        mBus.onComplete();
    }

    public boolean hasSubscribers() {
        return mBus.hasSubscribers();
    }

}

  測試代碼:

Flowable<Integer> f1 = RxBus.getInstance().register(Integer.class);
f1.subscribe(value -> System.out.println("訂閱f1消息 .. " + value));
RxBus.getInstance().post(999);

 


3.實際項目調用方式

3.1.首先自定義一個RxBus。

  這個類感受有點像工具類。和其餘函數沒有任何耦合關係。

  這個類見在上面2中封裝好的RxBus類。

 

 

3.2.在BaseListFragment實現了LazyLoadFragment中的抽象函數。

  這裏解釋一下:

  BaseListFragment是一個能夠刷新能夠加載更多的一個碎片。

  LazyLoadFragment是一個懶加載的被BaseListFragmetn繼承的一個基類。

  LazyLoadFragment經過判斷是否可見的函數setUserVisibleHint執行了一個抽象函數fetchData()。

  adapter是頁面內容的一個適配器。

  而後在BaseListFragment中重寫這個抽象函數。

 @Override
    public void fetchData() {
        observable = RxBus.getInstance().register(BaseListFragment.TAG);
        observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                adapter.notifyDataSetChanged();
            }
        });
    }

  observable.subscribe(new Consumer<Integer>)返回的是一個Disposable類型。

  以下面Disposable的簡單使用方式。

 Disposable disposable = observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                  //這裏接收數據項
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
              //這裏接收onError
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
              //這裏接收onComplete。
            }
        });

 

 

3.3.小貼士

  RxBus的註冊與反註冊必定要對應出現。

  通常在活動或者Fragment中的onStart中register這個活動或者片斷的TAG(也就是一個惟一標識字符串)。

  通常在活動或者Fragment中的onDestroy中ungister這個活動或者片斷的TAG。

  post用於傳遞消息,看狀況調用唄。

相關文章
相關標籤/搜索