RxJava部分操做符

package qianxingzhe.rxjava_learning;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

/**
 * Created by lunyi.yly on 16/8/6.
 */

public class RxJavaText {

    @Test
    public void hello_world_01() {
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello, World!");
                subscriber.onCompleted();
            }
        });

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted()");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError()");
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext()");
                System.out.println(s);
            }
        };

        observable.subscribe(subscriber);
    }

    @Test
    public void hello_world_02() {
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello, World!");
                subscriber.onCompleted();
            }
        });

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted()");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError()");
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext()");
                System.out.println("s");
            }
        };

        observable.subscribe(observer);
    }

    @Test
    public void just_01() {
        Observable<String> observable = Observable.just("hello world");

        Action1<String> action1 = new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        };

        observable.subscribe(action1);
    }

    /**
     * just 用來建立只發出一個事件就結束的Observable對象
     */
    @Test
    public void just_02() {
        Observable.just("hello world").subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });
    }

    /**
     * map 把一個事件轉換爲另外一個事件
     */
    @Test
    public void map() {
        Observable.just("hello world")
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return s.hashCode();
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println(integer);
                    }
                });
    }

    /**
     * from 接收一個集合做爲輸入,而後每次輸出一個元素給subscriber
     */
    @Test
    public void from() {
        Observable.from(new String[]{"hello", "world"})
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });
    }

    /**
     * flatMap 接收一個Observable的輸出做爲輸入,同時輸出另一個Observable
     */
    @Test
    public void flatmap() {
        List<List<String>> lists = new ArrayList<>();
        Observable.from(lists)
                .flatMap(new Func1<List<String>, Observable<String>>() {
                    @Override
                    public Observable<String> call(List<String> strings) {
                        return Observable.from(strings);
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });
    }

    /**
     * filter 輸出和輸入相同的元素,而且會過濾掉那些不知足檢查條件的
     */
    @Test
    public void filter() {
        Observable.from(new String[]{"hello", "world"})
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String s) {
                        return Observable.just(s);
                    }
                })
                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s.equals("hello");
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });
    }

    /**
     * take 輸出最多指定數量的結果
     */
    @Test
    public void take() {
        Observable.from(new Integer[]{1, 2, 3, 4, 5,})
                .take(3)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println(integer);
                    }
                });
    }

    /**
     * doOnNext 容許咱們在每次輸出一個元素以前作一些額外的事情,好比這裏的保存標題。
     */
    @Test
    public void doOnNext() {
        Observable.just("hello world")
                .doOnNext(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });
    }

    /**
     * subscribeOn 指定被觀察者代碼運行的線程
     * ObserverOn 指定觀察者運行的線程
     */
    @Test
    public void subscribeOn_ObserverOn() {
        Observable.just("http://www.baidu.com")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println("在UI線程執行");
                    }
                });

    }

    /**
     * unsubscribe 在他當前執行的地方終止
     */
    @Test
    public void unsubscribe() {
        Subscription subscribe = Observable.just("hello world")
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });
        subscribe.unsubscribe();
        System.out.println(subscribe.isUnsubscribed());

    }
}
相關文章
相關標籤/搜索