RxJava 版本1.0 和 版本2.0的比較

如今RxJava已是2.0以上版本了,看到不少資料上還在講1.0的版本,所以作個簡單的筆記,供你們參考,不足之處請兄弟們留言我添加完善java

一、概述編程

RxJava(Reactive Extensions Java),RxJava基於觀察者模式,是一種響應式編程模型,目的是提供統一的接口幫助開發者便捷的處理異步數據流。app

RxJava本質上是一個異步庫,使用簡潔的邏輯處理繁瑣複雜任務。異步

 

二、觀察者模式ide

觀察者模式涉及到2主角函數

1)Observable :被觀察者spa

2)Observer:觀察者線程

被觀察者中有一個觀察者的列表,當被觀察者發生變化的時候回根據這張表一一通知觀察者。code

 

三、1.0和2.0的區別 orm

3.一、背壓

1.0背壓集中在Obserable中處理,致使有點混亂,。

2.0版本把對背壓的處理提出來了,出現了兩種觀察者模式:

Observable(被觀察者)/Observer(觀察者) 不支持背壓(Backpressure)
Flowable(被觀察者)/Subscriber(觀察者) 支持背壓(Backpressure)

Flowable是新增的

3.二、操做符修改

1.0版本

Action這類接口命名Action0、Action1…(數字表明可接受的參數),如今作出了改動

1.x 2.x
Action0 Action
Action1 Consumer
Action2 BiConsumer
...  
ActionN  

Funx修改

1.x 2.x
Func

Function<T, R>

A functional interface that takes a value and returns another value

Func2 BiFunction<T1, T2, R>
Func3 ~ Func9

Function3 ~ Function9

 

Function3<T1, T2, T3, R>

Function9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R>

FuncN Function<T, R>

First()

1.x 2.x
first firstElement
first(Func1) filter(predicate).first()
firstOrDefault(T) first(T)
firstOrDefault(Func1, T) first(T)
   

 

3.三、線程調度 Schedulers

2.0去掉了Schedulers.immediate()、Schedulers.test()

 

3.4 、建立被觀察者的變化

1.0版本 
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello");
        subscriber.onNext("hello");
        subscriber.onNext("hello");
        subscriber.onCompleted();
    }
});

2.0版本 
變化點1:create參數Observable.OnSubscribe改成ObservableOnSubscribe 
變化點2:回到函數call(Subscriber)改成 subscribe(ObservableEmitter) 
變化點3:可發射三種事件:emitter.onNext(T value)、onComplete、onError(Throwable e)
Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {

           @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
         
                e.onNext("hello!");
                e.onNext("world!");
                e.onNext("are you ok!");
                e.onError(new Throwable("error"));
                e.onComplete();
            }
        });

3.五、建立觀察者的變化

1.0 中建立觀察者Observer有兩種方式

方式1:採用 Observer 接口

    Observer<String> observer = new Observer<String>() {
        @Override
        public void onNext(String s) {
            
        }

        @Override
        public void onCompleted() {
           
        }

        @Override
        public void onError(Throwable e) {
            
        }
    };

方式2:採用Subscriber接口建立, 其對 Observer接口進行了擴展,增長了onStart()、unSubscribe()

    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            
        }

        @Override
        public void onCompleted() {
            
        }

        @Override
        public void onError(Throwable e) {
            
        }
    };

2.0 建立觀察者Observer
變化點1:增長了回調方法onSubscribe(),其最早被調用,可用來作一些初始化的工做
變化點2:onCompleted()改爲 onComplete()
    
Observer<Integer> observer = new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            
        }

        @Override
        public void onNext(Integer value) {
        }

        @Override
        public void onError(Throwable e) {
        }

        // 注意
        @Override
        public void onComplete() {
        }
    }

 

3.六、接口都增長了異常拋出

public interface Action {
    /**
     * Runs the action and optionally throws a checked exception.
     * @throws Exception if the implementation wishes to throw a checked exception
     */
    void run() throws Exception;
}


public interface BiConsumer<T1, T2> {

    /**
     * Performs an operation on the given values.
     * @param t1 the first value
     * @param t2 the second value
     * @throws Exception on error
     */
    void accept(T1 t1, T2 t2) throws Exception;
}

public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(T t) throws Exception;
}


/**
 * A functional interface (callback) that accepts a single value.
 * @param <T> the value type
 */
public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}

 

3.7 from 變化

from -> fromArray

1.0版本

private void doFrom() {
       

        String[] items = {"item1", "item2", "item3"};
        Observable.from(items)
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });

    }

2.0版本
private void doFrom() {
        String[] items = {"item1", "item2", "item3"};
        Observable.fromArray(items).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

    }
相關文章
相關標籤/搜索