RxJava是android的異步框架,官方介紹是可觀測的序列,組成異步基於事件程序的庫。特色是觀察者模式,基於事件流的鏈式調用,隨着異步操做調度過程複雜的狀況下,程序邏輯也變得愈來愈複雜,但RxJava依然可以保持簡潔。java
簡單的說觀察者A與被觀察者B創建訂閱關係,當被觀察者B發生某種改變時,當即通知觀察者Areact
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
注意各地方添加泛型避免大片警告,onNext()是事件的回調,onComplete()是事件的結尾。onComplete()與onError互斥須要保持惟一性,並只能調用一次。android
Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("消息1"); e.onNext("消息2"); e.onNext("消息3"); e.onComplete(); } });
建立觀察者時回調的onSubscribe能夠獲取Disposable對象,在合適的時候判斷條件,調用dispose()便可接觸訂閱關係緩存
Observer<String> observer=new Observer<String>() { @Override public void onSubscribe(Disposable d) { //經過判斷解除訂閱關係 d.dispose(); } @Override public void onNext(String o) { //對應observable的onNext方法 } @Override public void onError(Throwable e) { //對應observable的onError方法 } @Override public void onComplete() { //對應observable的onComplete方法 } };
observable.subscribeOn(Schedulers.io()) //指定事件生產在子線程 .observeOn(AndroidSchedulers.mainThread()) //指定事件消費在UI線程 .subscribe(observer);
//just模式,將自動發送onNext()事件 Observable<String> observable = Observable.just("發送消息"); //fromIterable模式,遍歷集合,並自動發送onNext()事件 Observable<String> observable = Observable.fromIterable((Iterable<String>) mList); //interval模式,定時自動發送整數序列,從0開始每隔2秒計數, Observable<Long> observable = Observable.interval(0,2, TimeUnit.SECONDS) //range模式,自動發送特定的整數序列,0表示不發送,負數會拋異常,從1開始發送到20 Observable<Integer> observable = Observable.range(1,20); //timer模式,定時執行觀察者的onNext()方法 Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);
如建立操做,數據過濾操做,條件操做,轉載如下博客,很詳細:框架
Schedulers.immediate() 默認模式,在當前線程運行異步
Schedulers.newThread() 建立新的子線程運行ide
Schedulers.io() 建立新的子線程運行,內部使用的是無上限的線程池,可重用空閒的線程,效率高spa
AndroidSchedulers.mainThread() 在UI主線程運行線程
subscribeOn() 指定Observable(被觀察者)所在的線程,或者叫作事件產生的線程code
observeOn() 指定 Observer(觀察者)所運行在的線程,或者叫作事件消費的線程
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> e) throws Exception { e.onNext("hello RxJava!"); e.onComplete(); } },BackpressureStrategy.BUFFER);//增長背壓模式
onSubscribe()會返回Subscription對象,調用cancel()便可取消訂閱關係,request()便可指定消費事件的數量
Subscriber<String> subscriber=new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(String s) { Log.i("RxJava", "onNext: "+s); } @Override public void onError(Throwable t) { Log.i("RxJava", "onError"); } @Override public void onComplete() { Log.i("RxJava", "onComplete"); } }; flowable.subscribe(subscriber);//創建訂閱關係
若是生產者和消費者不在同一線程的狀況下,若是生產者的速度大於消費者的速度,就會產生Backpressure問題。即異步狀況下,Backpressure問題纔會存在。
當消費者處理不了事件,就丟棄。
消費者經過request()傳入其需求n,而後生產者把n個事件傳遞給消費者供其消費。其餘消費不掉的事件就丟掉
LATEST與DROP功能基本一致,惟一的區別就是LATEST總能使消費者可以接收到生產者產生的最後一個事件
這種方式會在產生Backpressure問題的時候直接拋出一個異常,這個異常就是著名的MissingBackpressureException