在RxJava 1.x 系列中,講解了RxJava的大體用法,由於如今都用RxJava 2了,因此Rxjava 1就不細講,主要來學習RxJava 2。html
/** * rajava2 的基本使用 */ private void rxJava2BaseUser() { Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onNext("2"); emitter.onNext("3"); //throw new Exception("發生了錯誤"); } }) .subscribe(new Observer<String>() { Disposable disposable; // 新增該方法 @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe"); disposable = d; } @Override public void onNext(@NonNull String s) { Log.d(TAG, "Item: " + s); if (s.equals("4")) disposable.dispose(); // 在RxJava 2.x 中,新增的Disposable能夠作到切斷的操做,讓Observer觀察者再也不接收上游事件 } @Override public void onError(@NonNull Throwable e) { Log.d(TAG, "onError:" + e.getMessage()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception { emitter.onNext("----- 01 -----"); emitter.onNext("----- 02 -----"); emitter.onNext("----- 03 -----"); } }) // Consumer 和 RxJava 1 中的 Action1 相似 .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Item: " + s); } }); }
基本使用和RxJava 1沒有什麼區別。java
1.新增了onSubscribe方法,onSubscribe方法會在事件開始的時候,觸發。react
2.新增的Disposable能夠作到切斷的操做,讓Observer觀察者再也不接收上游事件。git
3.Action1 --> Consumer 只接收onNext方法。github
Disposable
該怎麼辦呢, RxJava中已經內置了一個容器
CompositeDisposable
, 每當咱們獲得一個
Disposable
時就調用
CompositeDisposable.add()
將它添加到容器中, 在退出的時候, 調用
CompositeDisposable.clear()
便可切斷全部的水管.
/** * rajava2 線程 */ private void rxJava2Thread() { Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception { Log.d(TAG, "事件處理線程:" + Thread.currentThread().getName()); emitter.onNext("---- 1 ----"); emitter.onNext("---- 2 ----"); emitter.onNext("---- 3 ----"); } }) .subscribeOn(Schedulers.newThread()) // 指明被觀察者處理的線程 .observeOn(AndroidSchedulers.mainThread()) // 指明觀察者線程 .subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe:" + Thread.currentThread().getName()); } @Override public void onNext(@NonNull String s) { Log.d(TAG, "Item: " + s + " :" + Thread.currentThread().getName()); } @Override public void onError(@NonNull Throwable e) { Log.d(TAG, "onError:" + e.getMessage() + " :" + Thread.currentThread().getName()); } @Override public void onComplete() { Log.d(TAG, "onComplete:" + Thread.currentThread().getName()); } }); }
結果:網絡
02-10 10:02:31.007 25414-25414/pers.bolin.rxjava2demo D/MainActivity: onSubscribe:main 02-10 10:02:31.009 25414-25970/pers.bolin.rxjava2demo D/MainActivity: 事件處理線程:RxNewThreadScheduler-1 02-10 10:02:31.047 25414-25414/pers.bolin.rxjava2demo D/MainActivity: Item: ---- 1 ---- :main 02-10 10:02:31.048 25414-25414/pers.bolin.rxjava2demo D/MainActivity: Item: ---- 2 ---- :main 02-10 10:02:31.048 25414-25414/pers.bolin.rxjava2demo D/MainActivity: Item: ---- 3 ---- :main
能夠看出事件已經被分到不一樣的線程去處理了。ide
.subscribeOn(Schedulers.newThread()) // 指明被觀察者處理的線程 .observeOn(AndroidSchedulers.mainThread()) // 指明觀察者線程
須要注意的是subscribeOn 只在第一次切換有效,observeOn每次切換都是有效的post
看下線程的參數有哪些:學習
just;from ;map ;flatMap 和RxJava使用一致:RxJava 1.x 理解-3url
更多的操做符使用:
官方:http://reactivex.io/documentation/operators.html