Android開發之《RXJava的簡單實現》

import android.util.Log;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;

public class RXJavaDemo {
    private static final String TAG = RXJavaDemo.class.getSimpleName();

    private int count = 0;

    public RXJavaDemo() {
    }

    public void call() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    mObservable.subscribe(mSubscriber);
                    mObservable.subscribe(action1);
                    Observable.just("just Object").subscribe(action1);
                }
            }
        }).start();
    }

    private Observable<String> mObservable = Observable.create(
            new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("1");
                    subscriber.onNext("2");
                    subscriber.onNext("3");
                    subscriber.onCompleted();
                }
            });

    private Subscriber<String> mSubscriber = new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            Log.v(TAG, "onNext, string : " + s);
            Log.v(TAG, "onNext, count : " + count);
            count++;

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onCompleted() {
            Log.v(TAG, "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            Log.v(TAG, "onError, e : " + e.toString());
        }
    };

    private Action1<String> action1 = new Action1() {
        @Override
        public void call(Object o) {
            if (o == null) {
                Log.v(TAG, "Action1, object is null");
                return;
            }

            Log.v(TAG, "Acition1, o : " + ((String) o));
        }
    };
}

 

使用RXJava以前:java

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

  

使用RXJava以後:react

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

  

 build.gradleandroid

    compile 'io.reactivex:rxjava:1.0.9'
    compile 'io.reactivex:rxandroid:0.24.0'
    compile 'com.squareup.retrofit:retrofit:1.9.0'

 

RxJava提供四種不一樣的Subject:PublishSubject、BehaviorSubject、、ReplaySubject.、AsyncSubject緩存

BehaviorSubject, 會首先向他的訂閱者發送截至訂閱前最新的一個數據對象(或初始值),而後正常發送訂閱後的數據流。ide

ReplaySubject, 會緩存它所訂閱的全部數據,向任意一個訂閱它的觀察者重發。gradle

AsyncSubject, 當Observable完成時只會發佈最後一個數據給已經訂閱的每個觀察者。ui

PublishSubject, 沒有發送數據,觀察者只能等待,沒有線程阻塞,沒有資源消耗。在調用publishSubject.onNext時,才發送消息。 發送消息結束之後,publishSubject並無結束,觀察者等待消息再一次的發送。若是想關閉publishSubject,publishSubject需調用publishSubject.onCompleted方法關閉。此時,publishSubject再發送消息,觀察者不能收到發送的消息。線程

相關文章
相關標籤/搜索