Rxjava入門

簡介

RxJava是一個開源的Rx框架ReactiveX的java版本。
ReactiveX的主要目的是經過一系列Observable組合異步或事件代碼。其中使用的是觀察者模式。
能夠吧Reactive想象成不斷向訂閱者推送對象的機制,這個推送過程能夠是同步的也能夠是異步的。java

helloworld

下面看一個簡單的例子
首先引入rxjava的依賴react

//gradle
compile group: 'io.reactivex', name: 'rxjava', version: '1.2.1'
@Test
public void helloworld() {
    Observable obs=Observable.from(new String[]{"WORLD","WORLD2"});
    obs.subscribe(value-> System.out.println("Hello " + value + "!"));
}
-------輸出------
Hello WORLD!
Hello WORLD2!

首先建立了Observable對象,而後在上面訂閱了一個subscriber輸出observable的內容。框架

建立Observale

上面helloworld使用固定的值做爲observable,可使用Observable的create方法進行建立內容異步

@Test
public void create() {
    Observable.create(subscriber -> {
        IntStream.range(1, 5).forEach(i -> {
            if(!subscriber.isUnsubscribed()){
                subscriber.onNext("hello" + i);
            }
        });
        if(!subscriber.isUnsubscribed()) {
            subscriber.onCompleted();
        }
    }).subscribe(System.out::println);
}
-------輸出------
hello1
hello2
hello3
hello4

上面的例子建立了4個(1-4)對象,Observable的create方法接受一個方法,在方法內部向subscriber推送信息。
subsribler輸出獲得的值。
上面的代碼都是在主線程中運行的,若是想要不阻塞主線程能夠在create中新建線程或使用線程池gradle

@Test
public void asyn() throws InterruptedException {
    Observable.create(subscriber -> {
        new Thread(() -> {
            IntStream.range(1, 5).forEach(i -> {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext("hello" + i);
                }
            });
            if (!subscriber.isUnsubscribed()) {
                subscriber.onCompleted();
            }
        }).start();
    }).subscribe(obj -> {
        System.out.println(obj);
        System.out.println(Thread.currentThread());
    });
    Thread.sleep(1000);
}
-------輸出------
hello1
Thread[Thread-0,5,main]
hello2
Thread[Thread-0,5,main]
hello3
Thread[Thread-0,5,main]
hello4
Thread[Thread-0,5,main]

Observable同時也提供了諸多如skip/take/map/zip等流式的轉換操做。線程

相關文章
相關標籤/搜索