RxJava接觸過蠻長時間了,可是讓我說個因此然來仍是說不出來,歸根結底仍是仍是理解不夠深入,趁着年末這個時候爭取寫個系列出來給本身的學習作個記錄ide
注意區分RxJava1.0和2.0的區別,如下默認是在2.0的基礎上作的測試學習
先來理解幾個概念:測試
一、Observable : 字面意思可觀察的,被觀察者,也就是事件的發生者spa
二、Observer:觀察者,也就是事件的接受者設計
三、subscribe():二者產生訂閱關係,須要注意一點的是 observable.subscribe(observer),感受像是被觀察者訂閱了觀察者,與常理不符,爲何這麼設計呢?我估計是爲了鏈式調用吧。。code
1、最簡單的使用方式:server
1 Observable.create(new ObservableOnSubscribe<Integer>() { 2 @Override 3 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 4 e.onNext(1); 5 e.onNext(2); 6 e.onNext(3); 7 e.onComplete(); 8 } 9 }).subscribe(new Observer<Integer>() { 10 11 @Override 12 public void onSubscribe(Disposable d) { 13 Log.i(TAG, "onSubscribe: "); 14 } 15 16 @Override 17 public void onNext(Integer integer) { 18 Log.i(TAG, "onNext: "+integer); 19 } 20 21 @Override 22 public void onError(Throwable e) { 23 Log.i(TAG, "onError: "+e.getMessage()); 24 } 25 26 @Override 27 public void onComplete() { 28 Log.i(TAG, "onComplete: complete"); 29 } 30 });
一、onNext()能夠屢次發送事件,onComplete()發送一次,屢次調用不會報錯,onError()發送一次,屢次調用會報錯,不可和onComplete()共存blog
二、調用onComplete()或者onError()後,觀察者也沒法接受到onNext()事件
三、Disposable(2.0新增),當調用了dispose()後,觀察者就沒法接受到事件了ip
2、Cold Observable和Hot Observable
Cold Observable:只有當訂閱者訂閱的時候,數據流纔開始發送,而且每一個訂閱者訂閱的時候都會獨立執行一遍數據流的發送(create(),just()....)
Hot Observable :無論有沒有訂閱者訂閱,一旦建立,就開始發送數據流
publish轉化:
1 ConnectableObservable<Long> ob= Observable.interval(200, TimeUnit.MILLISECONDS).publish();//轉化成Cold Observable 2 ob.connect();//開始發送數據流
若是不調用connnect(),不會發送數據流,一旦調用,就會建立一個subscription並訂閱到原Observable,將接受的數據轉發給訂閱者。
connect()與disConnect()
1.0 connect() 返回Subscription
2.0 connect() 返回Disposable
//注意區分要釋放哪一個 //釋放s,則表明中斷數據傳輸,再次鏈接則從新發送數據 //釋放d1或者d2,則表明取消註冊,數據已然在傳輸 public void doSubscribe(View v){ s= ob.connect(); //public final void subscribe(Observer<? super T> observer) {} 無返回值,沒法取消註冊 d1= ob.subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.i(TAG, "onNext: first============"+aLong); } }); d2=ob.subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.i(TAG, "accept: second=========="+aLong); } }); }
RefCount
Observable<Long> ob= Observable.interval(200, TimeUnit.MILLISECONDS).publish().refCount();
若是有訂閱者就會發送數據流,無訂閱數據流即中止,再次訂閱從新開始發送(可能會和Cold Observable混淆,注意此處每一個訂閱者接受到的數據是相同的)
Reply
ConnectableObservable<Long> ob= Observable.interval(200, TimeUnit.MILLISECONDS).replay();
ob.connect();
當和源 Observable 連接後,開始收集數據。當有 Observer 訂閱的時候,就把收集到的數據線發給 Observer。而後和其餘 Observer 同時接受數據
能夠同時設置收集數據的個數及時間
Cache
Observable<Long> ob= Observable.interval(200, TimeUnit.MILLISECONDS).take(5).cache();//只有當訂閱者訂閱後纔開始發送數據
與Reply相似,訂閱者所有取消後也不會中止發送。