RXJS Observable的冷,熱和Subject

1、Observable的冷和熱

 

Observable 熱:直播。全部的觀察者,不管進來的早仍是晚,看到的是一樣內容的一樣進度,訂閱的時候獲得的都是最新時刻發送的值。html

Observable 冷:點播。 新的訂閱者每次從頭開始。數組

冷的Observable例子:

一開始有個訂閱者,ide

兩秒後又有個訂閱者,這兩個序列按照本身的節奏走的,不一樣步。每一個流進行都會從interval的0開始。函數

console.log('RxJS included?', !!Rx);

const count$ = Rx.Observable.interval(1000).take(5);
const sub1 = count$.subscribe((val)=>{
  console.log(val);
});

setTimeout(function(){
  const sub2 = count$.subscribe((val)=>{
  console.log(val);
});
},2000);

熱的Observable例子

第二個訂閱者直接從2開始起,跟第一個訂閱者看到的內容是同樣的。學習

const count$ = Rx.Observable.interval(1000).take(5).share();
const sub1 = count$.subscribe((val)=>{
  console.log(val);
});

setTimeout(function(){
  const sub2 = count$.subscribe((val)=>{
  console.log(val);
});
},2000);

 

2、Subject

Subject便是觀察者Observer,也是被觀察對象Observable,同時實現了這兩個接口。spa

意味着3d

  • 一方面它能夠做爲流的組成的一方,輸出的一方。
  • 另外一方面,它能夠做爲流的觀察一方,接收一方。

Subject分爲ReplaySubject和BehaviorSubject。code

ReplaySubject:這種Subject會保留最新的n個值orm

BehaviorSubject:是ReplaySubject的特殊形式。 保留最新的一個值server

一、subscribe的等價寫法

subscribe 後面寫的一個函數,至關於語法糖,快捷方式,臨時建立冷一個observer對象。

默認狀況應該是傳入一個observer對象

console.log('RxJS included?', !!Rx);


const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.Subject();

const observer1 = {
  next: (val)=>{console.log('1: ' +val);},
  error: (err)=>{console.log('ERROR>> 1:'+ err);},
  complete: ()=>{console.log('1 is complete');}
}


const observer2 = {
  next: (val)=>{console.log('2: ' +val);},
  error: (err)=>{console.log('ERROR>> 2:'+ err);},
  complete: ()=>{console.log('2 is complete');}
}

//等價寫法
counter$.subscribe(val =>{console.log(val);});
counter$.subscribe(observer2); 

二、兩個observer ,兩次subscribe

console.log('RxJS included?', !!Rx);


const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.Subject();

const observer1 = {
  next: (val)=>{console.log('1: ' +val);},
  error: (err)=>{console.log('ERROR>> 1:'+ err);},
  complete: ()=>{console.log('1 is complete');}
}


const observer2 = {
  next: (val)=>{console.log('2: ' +val);},
  error: (err)=>{console.log('ERROR>> 2:'+ err);},
  complete: ()=>{console.log('2 is complete');}
}

counter$.subscribe(observer1);

setTimeout(function(){
  counter$.subscribe(observer2);
},2000);
View Code

 

問題:須要在兩處執行subscribe,不少狀況下是這樣的,定義好這些序列應該在何時被觸發,我執行執行一句subscribe(),兩個序列都會這麼執行。這種狀況下就須要用subject()。

三、subject

subject即便observable,由於它能夠subscribe observer。

也是observer,由於它能夠被observable subscribe。

 

console.log('RxJS included?', !!Rx);


const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.Subject();


const observer1 = {
  next: (val)=>{console.log('1: ' +val);},
  error: (err)=>{console.log('ERROR>> 1:'+ err);},
  complete: ()=>{console.log('1 is complete');}
}


const observer2 = {
  next: (val)=>{console.log('2: ' +val);},
  error: (err)=>{console.log('ERROR>> 2:'+ err);},
  complete: ()=>{console.log('2 is complete');}
}

//再也不用counter$去subscribe,用subject去subscribe, 
subject.subscribe(observer1);

setTimeout(function(){
  subject.subscribe(observer2);
},2000);

//定義好兩邊後,用counter$去subscribe
counter$.subscribe(subject);
View Code

 

一句執行counter$.subscribe(subject),把定義好的序列,包括等待2秒的序列所有完成了。

4,subject是一個hot observable

往流裏推送新值

 第二個拿不到新值,由於第二個流訂閱的時候,兩個新值已通過去了。

5,ReplaySubject

replay把過去發生的事件進行重播。

ReplaySubject(2)把過去的2個事件進行重播。這樣observer1 subscribe的時候就能夠看到10和11。

六、BehaviorSubject只記住最新的值

總有一個最新值,總記住上一次的最新值

console.log('RxJS included?', !!Rx);


const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.BehaviorSubject();


subject.next(10);
subject.next(11);
const observer1 = {
  next: (val)=>{console.log('1: ' +val);},
  error: (err)=>{console.log('ERROR>> 1:'+ err);},
  complete: ()=>{console.log('1 is complete');}
}


const observer2 = {
  next: (val)=>{console.log('2: ' +val);},
  error: (err)=>{console.log('ERROR>> 2:'+ err);},
  complete: ()=>{console.log('2 is complete');}
}


//再也不用counter$去subscribe,用subject去subscribe, 
subject.subscribe(observer1);

setTimeout(function(){
  subject.subscribe(observer2);
},2000);

//定義好兩邊後,用counter$去subscribe
counter$.subscribe(subject);
View Code

 

取值的時候,會取獲得最新的data,儘管在取值的時候也就是subscribre的時候值已經發射完了,儘管時機已經錯失了仍是可以獲得它上一次發射以後的最新的一個值。

 

 

3、Angular中對Rx的支持

大量內置Observable支持:如Http,ReactiveForms,Route等。

Async Pipe是什麼?有什麼用?

 

Observable須要subscribe 一下,成員數組變量等於Observable獲得的值。

使用Async Pipe能夠直接使用Observable,還不用去取消訂閱。

memberResults$: Observable<User[]>; 

 

本文做者starof,因知識自己在變化,做者也在不斷學習成長,文章內容也不定時更新,爲避免誤導讀者,方便追根溯源,請諸位轉載註明出處:http://www.javashuo.com/article/p-sktjlzje-dn.html 有問題歡迎與我討論,共同進步。

相關文章
相關標籤/搜索