Rxjava Subjects

上次提到調用observable的publish和connect方法後能夠將一個Observable發出的對象實時傳遞到訂閱在上的subscriber。
這個和Rxjava中Subject的概念十分相像。Subject是能夠理解爲橋接或者代理,能夠訂閱Observable也能夠本身做爲Observable提供Subscriber訂閱。
Rxjava提供PublishSubject其功能與上次提到的publish+connect同樣html

@Test
public void testPublishSubject() throws InterruptedException {
    //建立一個publish subject
    PublishSubject<Object> subject = PublishSubject.create();
    Observable.create(sub->{
        new Thread(()->{
            int i=0;
            while(i<5){
                sub.onNext(i++);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                }
            }
        }).start();
    }).subscribe(subject);//將subject訂閱在原始Observable上
    //在subject上訂閱子subscriber
    subject.subscribe(x->{
        System.out.println("1st sub "+x);
    });
    Thread.sleep(1000);
    System.out.println("2nd subscriber start");
    //一秒後在subject上訂閱子subscriber
    subject.subscribe(x->{
        System.out.println("2nd sub "+x);
    });
    Thread.sleep(10000);
}
--------輸出---------
1st sub 1
1st sub 2
2nd subscriber start
1st sub 3
2nd sub 3
1st sub 4
2nd sub 4

主要subject種類

subject的用法大體都如上所示,能夠subscribe也能夠被subscribe,subject的不一樣是能夠同時訂閱多個observable並同時廣播到子subscriber上。java

AsyncSubject


該subject會等到原observable結束併發出原observable最後一個發出的對象或錯誤。併發

BehaviorSubject


該subject會發出最近的一個對象並持續發出接下來的所由對象。async

PublishSubject


上面已經介紹過,該subject會發出訂閱後原Observable所發出的對象。代理

ReplaySubject


該subject會返回從原始observable開始的全部對象。要注意的是這個可能存在潛在的OOM風險。code

相關文章
相關標籤/搜索