import android.util.Log; import rx.Observable; import rx.Subscriber; import rx.functions.Action1; public class RXJavaDemo { private static final String TAG = RXJavaDemo.class.getSimpleName(); private int count = 0; public RXJavaDemo() { } public void call() { new Thread(new Runnable() { @Override public void run() { while (true) { mObservable.subscribe(mSubscriber); mObservable.subscribe(action1); Observable.just("just Object").subscribe(action1); } } }).start(); } private Observable<String> mObservable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("1"); subscriber.onNext("2"); subscriber.onNext("3"); subscriber.onCompleted(); } }); private Subscriber<String> mSubscriber = new Subscriber<String>() { @Override public void onNext(String s) { Log.v(TAG, "onNext, string : " + s); Log.v(TAG, "onNext, count : " + count); count++; try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void onCompleted() { Log.v(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.v(TAG, "onError, e : " + e.toString()); } }; private Action1<String> action1 = new Action1() { @Override public void call(Object o) { if (o == null) { Log.v(TAG, "Action1, object is null"); return; } Log.v(TAG, "Acition1, o : " + ((String) o)); } }; }
使用RXJava以前:java
new Thread() { @Override public void run() { super.run(); for (File folder : folders) { File[] files = folder.listFiles(); for (File file : files) { if (file.getName().endsWith(".png")) { final Bitmap bitmap = getBitmapFromFile(file); getActivity().runOnUiThread(new Runnable() { @Override public void run() { imageCollectorView.addImage(bitmap); } }); } } } } }.start();
使用RXJava以後:react
Observable.from(folders) .flatMap(new Func1<File, Observable<File>>() { @Override public Observable<File> call(File file) { return Observable.from(file.listFiles()); } }) .filter(new Func1<File, Boolean>() { @Override public Boolean call(File file) { return file.getName().endsWith(".png"); } }) .map(new Func1<File, Bitmap>() { @Override public Bitmap call(File file) { return getBitmapFromFile(file); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { imageCollectorView.addImage(bitmap); } });
build.gradleandroid
compile 'io.reactivex:rxjava:1.0.9' compile 'io.reactivex:rxandroid:0.24.0' compile 'com.squareup.retrofit:retrofit:1.9.0'
RxJava提供四種不一樣的Subject:PublishSubject、BehaviorSubject、、ReplaySubject.、AsyncSubject緩存
BehaviorSubject, 會首先向他的訂閱者發送截至訂閱前最新的一個數據對象(或初始值),而後正常發送訂閱後的數據流。ide
ReplaySubject, 會緩存它所訂閱的全部數據,向任意一個訂閱它的觀察者重發。gradle
AsyncSubject, 當Observable完成時只會發佈最後一個數據給已經訂閱的每個觀察者。ui
PublishSubject, 沒有發送數據,觀察者只能等待,沒有線程阻塞,沒有資源消耗。在調用publishSubject.onNext時,才發送消息。 發送消息結束之後,publishSubject並無結束,觀察者等待消息再一次的發送。若是想關閉publishSubject,publishSubject需調用publishSubject.onCompleted方法關閉。此時,publishSubject再發送消息,觀察者不能收到發送的消息。線程