1.簡單介紹
1.1.發佈/訂閱事件主要用於網絡請求的回調。html
事件總線可使Android各組件之間的通訊變得簡單,並且能夠解耦。react
其實RxJava實現事件總線和EventBus比較相似,他們都依據與觀察者模式。git
我的比較習慣用RxJava來實現,由於很是簡單而清晰。github
1.2.固然EventBus實現總線的方式也有不少人用。web
這裏給個傳送門==>EventBus的github地址:https://github.com/greenrobot/EventBus網絡
而後Otto實現總線也不錯==>Otto的github地址:https://github.com/square/ottoapp
1.3.使用RxJava的好處以及注意點ide
最明顯的好處就是:項目體積縮小了。函數
注意:使用RxLifecycle來解決RxJava內存泄漏的問題。工具
==>參考個人另外一篇博客:RxLifecycle第三方庫的使用。
1.4.理解一下觀察者模式。
這是一種行爲模式。
當你的類或者主對象(稱爲被觀察者)的狀態發生改變就會通知全部對此感興趣的類或對象(稱爲觀察者)。
1.5.理解一下發布/訂閱
發佈/訂閱 模式的功能和觀察者模式是同樣的,都是完成特定事件發生後的消息通知。
觀察者模式和發佈/訂閱模式之間仍是存在了一些差異,在發佈/訂閱模式中重點是發佈消息,而後由調度中心
統一調度,不須要知道具體有哪些訂閱者。(這樣就能夠匿名)
爲何要匿名?
在計算機程序設計中有一個很是棒的思想叫「解耦」。你一般但願在你的設計中保持儘量低的耦合度。
一般狀況下,你但願消息發佈商可以直接瞭解全部須要接收消息的訂閱者,
這樣,一旦「事件」或消息準備好就能夠及時通知每個訂閱者。
可是使用事件總線,發佈者能夠免除這種職責並實現獨立性,
由於消息發佈者和消息訂閱者能夠相互不知道對方,只關心對應的消息,從而接觸二者之間的依賴關係
怎麼實現匿名?
提到匿名,天然而然你就會問:你是如何真正實現發佈者和訂閱者之間的匿名?
很簡單,只要找到一箇中間人,讓這個中間人負責兩方的消息調度。事件總線就是一個這樣的中間人。
綜上所述,事件總線就是這麼簡單。
1.6.使用RxJava實現事件總線的簡單案例
github參考案例地址:https://github.com/kaushikgopal/RxJava-Android-Samples
以下面的例子:
咱們從頂部片斷(綠色部分)發佈事件,並從底部片斷(藍色部分)監聽點擊事件(經過事件總線)。
怎麼實現這個功能呢?
第一步自定義一個事件總線
public class RxBus { private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create()); public void send(Object o) { _bus.onNext(o); } public Observable<Object> toObserverable() { return _bus; } }
第二步將事件發佈到總線中。
@OnClick(R.id.btn_demo_rxbus_tap) public void onTapButtonClicked() { _rxBus.send(new TapEvent()); }
第三步監聽來自其餘組件或服務事件
_rxBus.toObserverable() .subscribe(new Action1<Object>() { @Override public void call(Object event) { if(event instanceof TapEvent) { _showTapText(); }else if(event instanceof SomeOtherEvent) { _doSomethingElse(); } } });
1.7.本篇文章的參考文獻
2.封裝好的總線類
2.1.RxJava1.x的總線實現方式
/** * desc : 利用 PublishSubject的特性:與普通的Subject不一樣,在訂閱時並不當即觸發訂閱事件, * 而是容許咱們在任意時刻手動調用onNext(),onError(),onCompleted來觸發事件。 */ public class RxBus { private ConcurrentHashMap<Object, List<Subject>> subjectMapper = new ConcurrentHashMap<>(); private RxBus() { } private static class Holder { private static RxBus instance = new RxBus(); } public static RxBus getInstance() { return Holder.instance; } public <T> Observable<T> register(@NonNull Class<T> clz) { return register(clz.getName()); } public <T> Observable<T> register(@NonNull Object tag) { List<Subject> subjectList = subjectMapper.get(tag); if (null == subjectList) { subjectList = new ArrayList<>(); subjectMapper.put(tag, subjectList); } Subject<T, T> subject = PublishSubject.create(); subjectList.add(subject); //System.out.println("註冊到rxbus"); return subject; } public <T> void unregister(@NonNull Class<T> clz, @NonNull Observable observable) { unregister(clz.getName(), observable); } public void unregister(@NonNull Object tag, @NonNull Observable observable) { List<Subject> subjects = subjectMapper.get(tag); if (null != subjects) { subjects.remove(observable); if (subjects.isEmpty()) { subjectMapper.remove(tag); //System.out.println("從rxbus取消註冊"); } } } public void post(@NonNull Object content) { post(content.getClass().getName(), content); } public void post(@NonNull Object tag, @NonNull Object content) { List<Subject> subjects = subjectMapper.get(tag); if (!subjects.isEmpty()) { for (Subject subject: subjects) { subject.onNext(content); } } } }
幾個關鍵方法: register —— 由tag,生成一個subject List,同時利用PublishSubject建立一個Subject並返回,
它同時也是Observable的子類。
unregister —— 移除tag對應subject List 中的Observable。若subject List爲空,也將被移除。
post —— 遍歷tag對應subject List 中的Subject,執行onNext()。
這裏實際執行的是觀察者Observer的onNext(),
Subject的定義:public abstract class Subject<T, R> extends Observable<R> implements Observer<T>。
測試代碼:
/* rxbus */ Observable<String> observable = RxBus.getInstance().register(String.class); observable.map(s -> { try { int v = Integer.valueOf(s); System.out.println("map變換成功, source = " + s); return v; } catch (Exception e) { System.out.println("map變換失敗, source = " + s); return s; } }).subscribe(value -> { System.out.println("訂閱 " + value); }); RxBus.getInstance().post("888"); RxBus.getInstance().post("發發發"); RxBus.getInstance().unregister(String.class, observable);
//這裏比較有意思的是,使用了lambda表達式。
//在map變換時,若是將字符串轉成Integer,沒有問題就返回整型;
//若報異常,就返回String型。
//一樣的,在最終訂閱時,value參數的類型也是由map變換來決定的。
2.2.RxJava2.0總線實現類
由於在RxJava2.0以後,io.reactivex.Observable中沒有進行背壓處理了。
若是有大量消息堆積在總線中來不及處理會產生OutOfMemoryError。
有新類io.reactivex.Flowable專門針對背壓問題。
無背壓處理的Observable實現,跟RxJava1.0x中同樣,使用PublishSubject來實現。
要實現有背壓的2.0x版,使用FlowableProcessor的子類PublishProcessor來產生Flowable。
源代碼以下:
public class RxBus { private final FlowableProcessor<Object> mBus; private RxBus() { mBus = PublishProcessor.create().toSerialized(); } private static class Holder { private static RxBus instance = new RxBus(); } public static RxBus getInstance() { return Holder.instance; } public void post(@NonNull Object obj) { mBus.onNext(obj); } public <T> Flowable<T> register(Class<T> clz) { return mBus.ofType(clz); } public void unregisterAll() { //會將全部由mBus 生成的 Flowable 都置 completed 狀態 後續的 全部消息 都收不到了 mBus.onComplete(); } public boolean hasSubscribers() { return mBus.hasSubscribers(); } }
測試代碼:
Flowable<Integer> f1 = RxBus.getInstance().register(Integer.class); f1.subscribe(value -> System.out.println("訂閱f1消息 .. " + value)); RxBus.getInstance().post(999);
3.實際項目調用方式
3.1.首先自定義一個RxBus。
這個類感受有點像工具類。和其餘函數沒有任何耦合關係。
這個類見在上面2中封裝好的RxBus類。
3.2.在BaseListFragment實現了LazyLoadFragment中的抽象函數。
這裏解釋一下:
BaseListFragment是一個能夠刷新能夠加載更多的一個碎片。
LazyLoadFragment是一個懶加載的被BaseListFragmetn繼承的一個基類。
LazyLoadFragment經過判斷是否可見的函數setUserVisibleHint執行了一個抽象函數fetchData()。
adapter是頁面內容的一個適配器。
而後在BaseListFragment中重寫這個抽象函數。
@Override public void fetchData() { observable = RxBus.getInstance().register(BaseListFragment.TAG); observable.subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { adapter.notifyDataSetChanged(); } }); }
observable.subscribe(new Consumer<Integer>)返回的是一個Disposable類型。
以下面Disposable的簡單使用方式。
Disposable disposable = observable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //這裏接收數據項 } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //這裏接收onError } }, new Action() { @Override public void run() throws Exception { //這裏接收onComplete。 } });
3.3.小貼士
RxBus的註冊與反註冊必定要對應出現。
通常在活動或者Fragment中的onStart中register這個活動或者片斷的TAG(也就是一個惟一標識字符串)。
通常在活動或者Fragment中的onDestroy中ungister這個活動或者片斷的TAG。
post用於傳遞消息,看狀況調用唄。