RxJava 2.x 理解-1

在RxJava 1.x 系列中,講解了RxJava的大體用法,由於如今都用RxJava 2了,因此Rxjava 1就不細講,主要來學習RxJava 2。html

基本使用:

    /**
     * rajava2 的基本使用
     */
    private void rxJava2BaseUser() {
        Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                        emitter.onNext("1");
                        emitter.onNext("2");
                        emitter.onNext("3");
                        //throw new Exception("發生了錯誤");
                    }
                })
                .subscribe(new Observer<String>() {

                    Disposable disposable;

                    // 新增該方法
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        Log.d(TAG, "onSubscribe");
                        disposable = d;
                    }

                    @Override
                    public void onNext(@NonNull String s) {
                        Log.d(TAG, "Item: " + s);
                        if (s.equals("4"))
                            disposable.dispose(); // 在RxJava 2.x 中,新增的Disposable能夠作到切斷的操做,讓Observer觀察者再也不接收上游事件
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.d(TAG, "onError:" + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

        Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                        emitter.onNext("----- 01 -----");
                        emitter.onNext("----- 02 -----");
                        emitter.onNext("----- 03 -----");
                    }
                })
                // Consumer 和 RxJava 1 中的 Action1 相似
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "Item: " + s);
                    }
                });
    }

基本使用和RxJava 1沒有什麼區別。java

1.新增了onSubscribe方法,onSubscribe方法會在事件開始的時候,觸發。react

2.新增的Disposable能夠作到切斷的操做,讓Observer觀察者再也不接收上游事件。git

3.Action1  --> Consumer 只接收onNext方法。github

4.那若是有多個 Disposable 該怎麼辦呢, RxJava中已經內置了一個容器 CompositeDisposable, 每當咱們獲得一個 Disposable時就調用 CompositeDisposable.add()將它添加到容器中, 在退出的時候, 調用 CompositeDisposable.clear() 便可切斷全部的水管.

線程切換:

/**
     * rajava2 線程
     */
    private void rxJava2Thread() {
        Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                        Log.d(TAG, "事件處理線程:" + Thread.currentThread().getName());
                        emitter.onNext("---- 1 ----");
                        emitter.onNext("---- 2 ----");
                        emitter.onNext("---- 3 ----");
                    }
                })
                .subscribeOn(Schedulers.newThread())        // 指明被觀察者處理的線程
                .observeOn(AndroidSchedulers.mainThread())  // 指明觀察者線程
                .subscribe(new Observer<String>() {

                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        Log.d(TAG, "onSubscribe:" + Thread.currentThread().getName());
                    }

                    @Override
                    public void onNext(@NonNull String s) {
                        Log.d(TAG, "Item: " + s + " :" + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.d(TAG, "onError:" + e.getMessage() + " :" + Thread.currentThread().getName());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete:" + Thread.currentThread().getName());
                    }
                });
    }

結果:網絡

02-10 10:02:31.007 25414-25414/pers.bolin.rxjava2demo D/MainActivity: onSubscribe:main
02-10 10:02:31.009 25414-25970/pers.bolin.rxjava2demo D/MainActivity: 事件處理線程:RxNewThreadScheduler-1
02-10 10:02:31.047 25414-25414/pers.bolin.rxjava2demo D/MainActivity: Item: ---- 1 ---- :main
02-10 10:02:31.048 25414-25414/pers.bolin.rxjava2demo D/MainActivity: Item: ---- 2 ---- :main
02-10 10:02:31.048 25414-25414/pers.bolin.rxjava2demo D/MainActivity: Item: ---- 3 ---- :main

能夠看出事件已經被分到不一樣的線程去處理了。ide

                .subscribeOn(Schedulers.newThread())        // 指明被觀察者處理的線程
                .observeOn(AndroidSchedulers.mainThread())  // 指明觀察者線程

須要注意的是subscribeOn 只在第一次切換有效,observeOn每次切換都是有效的post

看下線程的參數有哪些:學習

  • Schedulers.io() 表明io操做的線程, 一般用於網絡,讀寫文件等io密集型的操做
  • Schedulers.computation() 表明CPU計算密集型的操做, 例如須要大量計算的操做
  • Schedulers.newThread() 表明一個常規的新線程
  • AndroidSchedulers.mainThread() 表明Android的主線程
  • Schedulers.single() 表明一個默認的、共享的、單線程支持的調度器實例,用於在相同的後臺線程上執行強順序執行。
  • Schedulers.trampoline()表明當其它排隊的任務完成後,在當前線程排隊開始執行

變換/操做符:

 just;from ;map ;flatMap  和RxJava使用一致:RxJava 1.x 理解-3url

更多的操做符使用:

官方:http://reactivex.io/documentation/operators.html

RxJava 知識梳理(2) - RxJava2 操做符實戰

RxJava2-Android-Samples

相關文章
相關標籤/搜索