RxJS速成 (下)

上一部分: http://www.cnblogs.com/cgzl/p/8641738.htmlcss

Subject

Subject比較特殊, 它便是Observable又是Observer.html

做爲Observable, Subject是比較特殊的, 它能夠對多個Observer進行廣播, 而普通的Observable只能單播, 它有點像EventEmitters(事件發射器), 維護着多個註冊的Listeners.app

做爲Observable, 你能夠去訂閱它, 提供一個Observer就會正常的收到推送的值. 從Observer的角度是沒法分辨出這個Observable是單播的仍是一個Subject.this

從Subject內部來說, subscribe動做並無調用一個新的執行來傳遞值, 它只是把Observer註冊到一個列表裏, 就像其餘庫的AddListener同樣.spa

做爲Observer, 它是一個擁有next(), error(), complete()方法的對象, 調用next(value)就會爲Subject提供一個新的值, 而後就會多播到註冊到這個Subject的Observers.3d

 

例子 subject.ts:code

import { Subject } from "rxjs/Subject";

const subject = new Subject();

const subscriber1 = subject.subscribe({
    next: (v) => console.log(`observer1: ${v}`)
});
const subscriber2 = subject.subscribe({
    next: (v) => console.log(`observer2: ${v}`)
});

subject.next(1);
subscriber2.unsubscribe();
subject.next(2);

const subscriber3 = subject.subscribe({
    next: (v) => console.log(`observer3: ${v}`)
});

subject.next(3);

 

訂閱者1,2從開始就訂閱了subject. 而後subject推送值1的時候, 它們都收到了. component

而後訂閱者2, 取消了訂閱, 隨後subject推送值2, 只有訂閱者1收到了.server

後來訂閱者3也訂閱了subject, 而後subject推送了3, 訂閱者1,3都收到了這個值.htm

 

下面是一個angular 5的例子:

app.component.html:

<h3>從Subject共享Observable到多個Subscribers</h3>
<input type="text" placeholder="start typing" (input)="mySubject.next($event)" (keyup)="mySubject.next($event)">

<br> Subscriber to input events got {{inputValue}}
<br>
<br> Subscriber to keyup events got {{keyValue}}

 

app.component.ts:

import { Component } from '@angular/core';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/map';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent {
  title = 'app';

  keyValue: string;
  inputValue: string;

  mySubject: Subject<Event> = new Subject();

  constructor() {
    // subscriber 1
    this.mySubject.filter(({ type }) => type === 'keyup')
      .map(e => (<KeyboardEvent>e).key)
      .subscribe(value => this.keyValue = value);

    // subscriber 2
    this.mySubject.filter(({ type }) => type === 'input')
      .map(e => (<HTMLInputElement>e.target).value)
      .subscribe(value => this.inputValue = value);
  }
}

input和keyup動做都把event推送到mySubject, 而後mySubject把值推送給訂閱者, 訂閱者1經過過濾和映射它只處理keyup類型的事件, 而訂閱者2只處理input事件.

效果:

 

BehaviorSubject

BehaviorSubject 是Subject的一個變種, 它有一個當前值的概念, 它會把它上一次發送給訂閱者值保存起來, 一旦有新的Observer進行了訂閱, 那這個Observer立刻就會從BehaviorSubject收到這個當前值.

也能夠這樣理解BehaviorSubject的特色:

  • 它表明一個隨時間變化的值, 例如, 生日的流就是Subject, 而一我的的年齡流就是BehaviorSubject.
  • 每一個訂閱者都會從BehaviorSubject那裏獲得它推送出來的初始值和最新的值.
  • 用例: 共享app狀態.

例子 behavior-subject.ts:

import { BehaviorSubject } from "rxjs/BehaviorSubject";

const subject = new BehaviorSubject(0);

subject.subscribe({
    next: v => console.log(`Observer1: ${v}`)
});

subject.next(1);
subject.next(2);

subject.subscribe({
    next: v => console.log(`Observer2: ${v}`)
});

subject.next(3);

 

效果:

 

經常使用Operators:

concat 

concat: 按順序合併observables. 只會在前一個observable結束以後纔會訂閱下一個observable.

它適合用於順序處理, 例如http請求.

例子: 

import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/observable/concat';

let firstReq = Observable.timer(3000).mapTo('First Response');
let secondReq = Observable.timer(1000).mapTo('Second Response');

Observable.concat(firstReq, secondReq)
    .subscribe(res => console.log(res));

 

效果:

 

merge

把多個輸入的observable交錯的混合成一個observable, 不按順序.

merge其實是訂閱了每一個輸入的observable, 它只是把輸入的observable的值不帶任何轉換的發送給輸出的Observable. 只有當全部輸入的observable都結束了, 輸出的observable纔會結束. 任何在輸入observable傳遞來的錯誤都會當即發射到輸出的observable, 也就是把整個流都殺死了 .

例子:

import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/observable/merge';

let firstReq = Observable.timer(3000).mapTo('First Response');
let secondReq = Observable.timer(1000).mapTo('Second Response');

Observable.merge(firstReq, secondReq)
    .subscribe(res => console.log(res));

 

效果:

 

mergeMap (原來叫flatMap)

mergeMap把每一個輸入的Observable的值映射成Observable, 而後把它們混合成一個Observable.

mergeMap能夠把嵌套的observables拼合成非嵌套的observable.

它有這些好處:

  • 沒必要編寫嵌套的subscribe()
  • 把每一個observable發出來的值轉換成另外一個observable
  • 自動訂閱內部的observable而且把它們(可能)交錯的合成一排.

這個仍是經過例子來理解比較好:

import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/from';
import 'rxjs/add/operator/mergeMap';

function getData() {
    const students = Observable.from([
        { name: 'Dave', age: 17 },
        { name: 'Nick', age: 18 },
        { name: 'Lee', age: 15 }
    ]);

    const teachers = Observable.from([
        { name: 'Miss Wan', age: 28 },
        { name: 'Mrs Wang', age: 31 },
    ]);

    return Observable.create(
        observer => {
            observer.next(students);
            observer.next(teachers);
        }
    );
}

getData()
    .mergeMap(persons => persons)
    .subscribe(
        p => console.log(`Subscriber got ${p.name} - ${p.age}`)
    );

 

效果:

 

switchMap

switchMap把每一個值都映射成Observable, 而後使用switch把這些內部的Observables合併成一個.

switchMap有一部分很想mergeMap, 但也僅僅是一部分像而已.

由於它還具備取消的效果, 每次發射的時候, 前一個內部的observable會被取消, 下一個observable會被訂閱. 能夠把這個理解爲切換到一個新的observable上了.

 

這個仍是看marble圖比較好理解:

 

例子: 

// 當即發出值, 而後每5秒發出值
const source = Rx.Observable.timer(0, 5000);
// 當 source 發出值時切換到新的內部 observable,發出新的內部 observable 所發出的值
const example = source.switchMap(() => Rx.Observable.interval(500));
// 輸出: 0,1,2,3,4,5,6,7,8,9...0,1,2,3,4,5,6,7,8
const subscribe = example.subscribe(val => console.log(val));

更好的例子是: 網速比較慢的時候, 客戶端發送了屢次重複的請求, 若是前一次請求在2秒內沒有返回的話, 那麼就取消前一次請求, 再也不須要前一次請求的結果了, 這裏就應該使用debounceTime配合switchMap.

 

mergeMap vs switchMap的例子

mergeMap:

import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/switchMap';

const outer = Observable.interval(1000).take(2);

const combined = outer.mergeMap(x => {
    return Observable.interval(400)
        .take(3)
        .map(y => `outer ${x}: inner ${y}`);
});

combined.subscribe(res => console.log(`result ${res}`));

 

效果:

switchMap:

import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/switchMap';

const outer = Observable.interval(1000).take(2);

const combined = outer.switchMap(x => {
    return Observable.interval(400)
        .take(3)
        .map(y => `outer ${x}: inner ${y}`);
});

combined.subscribe(res => console.log(`result ${res}`));

 

 

zip

zip操做符也會合並多個輸入的observables成爲一個observable. 多個輸入的observable的值, 按順序, 按索引進行合併, 若是某一個observable在該索引上的值尚未發射值, 那麼會等它, 直到全部的輸入observables在該索引位置上的值都發射出來, 輸出的observable纔會發射該索引的值.

例子:

import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/zip';

let age$ = Observable.of<number>(27, 25, 29);
let name$ = Observable.of<string>('Foo', 'Bar', 'Beer');
let isDev$ = Observable.of<boolean>(true, true, false);

Observable
    .zip(age$,
        name$,
        isDev$,
        (age: number, name: string, isDev: boolean) => ({ age, name, isDev }))
    .subscribe(x => console.log(x));

 

效果:

 

就不往下寫了, 其實看文檔就行, 最重要的仍是上一部分.

相關文章
相關標籤/搜索