RxJava是一個開源的Rx框架ReactiveX的java版本。
ReactiveX的主要目的是經過一系列Observable組合異步或事件代碼。其中使用的是觀察者模式。
能夠吧Reactive想象成不斷向訂閱者推送對象的機制,這個推送過程能夠是同步的也能夠是異步的。java
下面看一個簡單的例子
首先引入rxjava的依賴react
//gradle compile group: 'io.reactivex', name: 'rxjava', version: '1.2.1'
@Test public void helloworld() { Observable obs=Observable.from(new String[]{"WORLD","WORLD2"}); obs.subscribe(value-> System.out.println("Hello " + value + "!")); } -------輸出------ Hello WORLD! Hello WORLD2!
首先建立了Observable對象,而後在上面訂閱了一個subscriber輸出observable的內容。框架
上面helloworld使用固定的值做爲observable,可使用Observable的create方法進行建立內容異步
@Test public void create() { Observable.create(subscriber -> { IntStream.range(1, 5).forEach(i -> { if(!subscriber.isUnsubscribed()){ subscriber.onNext("hello" + i); } }); if(!subscriber.isUnsubscribed()) { subscriber.onCompleted(); } }).subscribe(System.out::println); } -------輸出------ hello1 hello2 hello3 hello4
上面的例子建立了4個(1-4)對象,Observable的create方法接受一個方法,在方法內部向subscriber推送信息。
subsribler輸出獲得的值。
上面的代碼都是在主線程中運行的,若是想要不阻塞主線程能夠在create中新建線程或使用線程池gradle
@Test public void asyn() throws InterruptedException { Observable.create(subscriber -> { new Thread(() -> { IntStream.range(1, 5).forEach(i -> { if (!subscriber.isUnsubscribed()) { subscriber.onNext("hello" + i); } }); if (!subscriber.isUnsubscribed()) { subscriber.onCompleted(); } }).start(); }).subscribe(obj -> { System.out.println(obj); System.out.println(Thread.currentThread()); }); Thread.sleep(1000); } -------輸出------ hello1 Thread[Thread-0,5,main] hello2 Thread[Thread-0,5,main] hello3 Thread[Thread-0,5,main] hello4 Thread[Thread-0,5,main]
Observable同時也提供了諸多如skip/take/map/zip等流式的轉換操做。線程