package qianxingzhe.rxjava_learning; import org.junit.Test; import java.util.ArrayList; import java.util.List; import rx.Observable; import rx.Observer; import rx.Scheduler; import rx.Subscriber; import rx.Subscription; import rx.android.schedulers.AndroidSchedulers; import rx.functions.Action1; import rx.functions.Func1; import rx.schedulers.Schedulers; /** * Created by lunyi.yly on 16/8/6. */ public class RxJavaText { @Test public void hello_world_01() { Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello, World!"); subscriber.onCompleted(); } }); Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { System.out.println("onCompleted()"); } @Override public void onError(Throwable e) { System.out.println("onError()"); } @Override public void onNext(String s) { System.out.println("onNext()"); System.out.println(s); } }; observable.subscribe(subscriber); } @Test public void hello_world_02() { Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello, World!"); subscriber.onCompleted(); } }); Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { System.out.println("onCompleted()"); } @Override public void onError(Throwable e) { System.out.println("onError()"); } @Override public void onNext(String s) { System.out.println("onNext()"); System.out.println("s"); } }; observable.subscribe(observer); } @Test public void just_01() { Observable<String> observable = Observable.just("hello world"); Action1<String> action1 = new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }; observable.subscribe(action1); } /** * just 用來建立只發出一個事件就結束的Observable對象 */ @Test public void just_02() { Observable.just("hello world").subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); } /** * map 把一個事件轉換爲另外一個事件 */ @Test public void map() { Observable.just("hello world") .map(new Func1<String, Integer>() { @Override public Integer call(String s) { return s.hashCode(); } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } }); } /** * from 接收一個集合做爲輸入,而後每次輸出一個元素給subscriber */ @Test public void from() { Observable.from(new String[]{"hello", "world"}) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); } /** * flatMap 接收一個Observable的輸出做爲輸入,同時輸出另一個Observable */ @Test public void flatmap() { List<List<String>> lists = new ArrayList<>(); Observable.from(lists) .flatMap(new Func1<List<String>, Observable<String>>() { @Override public Observable<String> call(List<String> strings) { return Observable.from(strings); } }) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); } /** * filter 輸出和輸入相同的元素,而且會過濾掉那些不知足檢查條件的 */ @Test public void filter() { Observable.from(new String[]{"hello", "world"}) .flatMap(new Func1<String, Observable<String>>() { @Override public Observable<String> call(String s) { return Observable.just(s); } }) .filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s.equals("hello"); } }) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); } /** * take 輸出最多指定數量的結果 */ @Test public void take() { Observable.from(new Integer[]{1, 2, 3, 4, 5,}) .take(3) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } }); } /** * doOnNext 容許咱們在每次輸出一個元素以前作一些額外的事情,好比這裏的保存標題。 */ @Test public void doOnNext() { Observable.just("hello world") .doOnNext(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); } /** * subscribeOn 指定被觀察者代碼運行的線程 * ObserverOn 指定觀察者運行的線程 */ @Test public void subscribeOn_ObserverOn() { Observable.just("http://www.baidu.com") .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println("在UI線程執行"); } }); } /** * unsubscribe 在他當前執行的地方終止 */ @Test public void unsubscribe() { Subscription subscribe = Observable.just("hello world") .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); subscribe.unsubscribe(); System.out.println(subscribe.isUnsubscribed()); } }