Angular與Rxjs學習

簡介初級核心概念建立Observableof()from()fromEvent()fromPromise()interval()ajax()操做符(Operators)mapfiltershare其餘錯誤處理重試失敗的可觀察對象問題記錄fromPromise no longer exported in master (v6)參考文獻javascript

簡介

RxJSReactiveX編程理念的JavaScript版本。ReactiveX來自微軟,它是一種針對異步數據流的編程。簡單來講,它將一切數據,包括HTTP請求,DOM事件或者普通數據等包裝成流的形式,而後用強大豐富的操做符對流進行處理,使你能以同步編程的方式處理異步數據,並組合不一樣的操做符來輕鬆優雅的實現你所須要的功能。 html

RxJS 提供了一種對 Observable 類型的實現,直到 Observable 成爲了 JavaScript 語言的一部分而且瀏覽器支持它以前,它都是必要的。這個庫還提供了一些工具函數,用於建立和使用可觀察對象。這些工具函數可用於:java

  • 把現有的異步代碼轉換成可觀察對象
  • 迭代流中的各個值
  • 把這些值映射成其它類型
  • 對流進行過濾
  • 組合多個流

初級核心概念

  • Observable: 一系列值的生產者
  • Observer: 它是observable值的消費者
  • Operator: 能夠在數據流的途中對值進行轉換的操做符

Observables 做爲被觀察者,是一個值或事件的流集合;而 Observer 則做爲觀察者,根據 Observables 進行處理。web

Observables 與 Observer 之間的訂閱發佈關係(觀察者模式) 以下:ajax

  • 訂閱:Observer 經過 Observable 提供的 subscribe() 方法訂閱 Observable。
  • 發佈:Observable 經過回調 next 方法向 Observer 發佈事件。

建立Observable

RxJS 中提供了不少操做符,用於建立 Observable 對象,經常使用的操做符以下: 編程

  • of(), 將普通JavaScript數據轉爲 Observable
  • from(), 把數組或iterable對象轉換成Observable
  • create(), 返回一個能夠在Observer上調用方法的Observable.
  • fromEvent(), 把event轉換成Observable.
  • fromPromise(), 把Promise轉換成Observable.
  • ajax(), 從ajax建立一個observable

of()

import { Component, OnInit } from '@angular/core';
import { Observable,of } from 'rxjs';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用of來建立可觀察對象</h2>
    <div>
      <button (click)="getData()">Click here</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  getData() {
    // Create simple observable that emits three values
    const myObservable = of(123);

    // Create observer object
    const myObserver = {
      nextx => console.log('Observer got a next value: ' + x),
      errorerr => console.error('Observer got an error: ' + err),
      complete() => console.log('Observer got a complete notification'),
    };

    // Execute with the observer object
    myObservable.subscribe(myObserver);
  }

}
複製代碼

啓動該項目,打開頁面並點擊按鈕,出現這樣的結果:json

from()

import { Component, OnInit } from '@angular/core';
import { Observable,from } from 'rxjs';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用from函數建立可觀察對象</h2>
    <div>
      <button (click)="fromData()">根據數組建立Observable</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  fromData() {
    let persons = [
      { name'Dave'age34salary2000 },
      { name'Nick'age37salary32000 },
      { name'Howie'age40salary26000 },
      { name'Brian'age40salary30000 },
      { name'Kevin'age47salary24000 },
    ];

    const myObservable = from(persons);
    myObservable.subscribe(person => console.log(person));
  }

}
複製代碼

頁面測試結果爲:segmentfault

fromEvent()

import { Component, OnInit } from '@angular/core';
import { Observable,fromEvent } from 'rxjs';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用fromEvent函數建立可觀察對象</h2>
    <div>
      <p id="content">Hello Hresh</p>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
      this.fromEvent();
  }

  fromEvent() {
    const el = document.getElementById('content');

    const mouseMoves = fromEvent(el, 'click');

    const subscription = mouseMoves.subscribe(() => {
      el.style.color = 'red';
    });
  }
}
複製代碼

運行項目,點擊文本,文本將會變爲紅色。api

關於 fromEvent 在實際生產中有個典型的應用:輸入提示(type-ahead)建議數組

可觀察對象能夠簡化輸入提示建議的實現方式。典型的輸入提示要完成一系列獨立的任務:

  • 從輸入中監聽數據。
  • 移除輸入值先後的空白字符,並確認它達到了最小長度。
  • 防抖(這樣才能防止連續按鍵時每次按鍵都發起 API 請求,而應該等到按鍵出現停頓時才發起)
  • 若是輸入值沒有變化,則不要發起請求(好比按某個字符,而後快速按退格)。
  • 若是已發出的 AJAX 請求的結果會由於後續的修改而變得無效,那就取消它。

徹底用 JavaScript 的傳統寫法實現這個功能可能須要大量的工做。使用可觀察對象,你可使用這樣一個 RxJS 操做符的簡單序列:

import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { debounceTime, distinctUntilChanged, filter, map, switchMap } from 'rxjs/operators';


const searchBox = document.getElementById('search-box');

const typeahead = fromEvent(searchBox, 'input').pipe(
  map((e: KeyboardEvent) => (e.target as HTMLInputElement).value),
  filter(text => text.length > 2),        //判斷輸入內容長度是否大於2
  debounceTime(500),    //等待,直到用戶中止輸入(這個例子中是中止 1/2 秒)
  distinctUntilChanged(),    // 等待搜索文本發生變化。
  switchMap(() => ajax('/api/endpoint'))        //將搜索請求發送到服務。
);

typeahead.subscribe(data => {
 // Handle the data from the API
});
複製代碼

fromPromise()

import { Component, OnInit } from '@angular/core';
import { from, Observable } from 'rxjs';
import { fromPromise } from 'rxjs/internal/observable/fromPromise';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用from函數建立可觀察對象</h2>
    <div>
      <button (click)="fromPromiseData()">根據Promise建立Observable</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  fromPromiseData() {
    const myObservable = fromPromise(new Promise((resolve, reject) => {
      setTimeout(() => {
        // tslint:disable-next-line:prefer-const
        let username = 'hresh----Promise';
        resolve(username);
      }, 2000);
    }));
    myObservable.subscribe({
      next(data) { console.log(data); },
      error(err) { console.error('Error' + err); },
      complete() { console.log('completed'); }
    });
  }

}
複製代碼

在 Rxjs6 以後,from 能夠用來代替 fromPromise,因此上述代碼改成 from()也是能夠的。頁面測試結果爲:

這裏擴展一下,官方文檔有個案例是經過 fetch 方法返回 Promise 對象,這裏我作了一些修改:

import { Component, OnInit } from '@angular/core';
import { from, Observable } from 'rxjs';
import { fromPromise } from 'rxjs/internal/observable/fromPromise';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用from函數建立可觀察對象</h2>
    <div>
      <button (click)="fromPromiseData()">根據Promise建立Observable</button>
      <br>
      <button (click)="fromData2()">根據Promise獲得的數組建立Observable</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  fromPromiseData() {
    const myObservable = from(fetch('http://a.itying.com/api/productlist'));
    myObservable.subscribe({
      next(data) { console.log(data); },
      error(err) { console.error('Error' + err); },
      complete() { console.log('completed'); }
    });
  }

  fromData2() {
    //fetch()返回的Promise,獲取Promise對象中的內容,即數組
    let arrayData = [];
    fetch('http://a.itying.com/api/productlist').then(response => response.json()).then((data => {
      arrayData = data.result;
    }));

    let myObservable = null;
    setTimeout(() => {
      myObservable = from(arrayData);
      myObservable.subscribe(data => console.log(data));
    }, 2000);
  }

}
複製代碼

頁面測試結果爲:

interval()

import { Component, OnInit } from '@angular/core';
import { interval, Observable } from 'rxjs';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用interval函數建立可觀察對象</h2>
    <div>
      <button (click)="interval2()">根據interval建立Observable</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  interval2() {
    const secondsCounter = interval(1000);
    // Subscribe to begin publishing values
    const oData = secondsCounter.subscribe(n =>
      console.log(`It's been ${n} seconds since subscribing!`));
    setTimeout(() => {
      console.log('取消計數操做');
      oData.unsubscribe();  /*5s後取消數據顯示*/
    }, 5000);
  }

}
複製代碼

interval 操做符支持一個數值類型的參數,用於表示定時的間隔。上面代碼表示每隔 1s,會輸出一個遞增的值,初始值從 0 開始。頁面測試結果爲:

ajax()

import { Component, OnInit } from '@angular/core';
import {ajax} from 'rxjs/ajax';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用ajax函數建立可觀察對象</h2>
    <div>
      <button (click)="ajax2()">根據ajax建立Observable</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  ajax2(){
    const apiData = ajax('http://a.itying.com/api/productlist');
    // Subscribe to create the request
    apiData.subscribe(res => console.log(res.status, res.response));
  }

}
複製代碼

頁面測試結果:

操做符(Operators)

上述咱們講解的建立 Observable 的方法其實就是 Rxjs 的操做符,操做符是基於可觀察對象構建的一些對集合進行復雜操做的函數。RxJS 還定義了一些操做符,好比 map()filter()concat()flatMap()

操做符接受一些配置項,而後返回一個以來源可觀察對象爲參數的函數。當執行這個返回的函數時,這個操做符會觀察來源可觀察對象中發出的值,轉換它們,並返回由轉換後的值組成的新的可觀察對象。

map

import { map } from 'rxjs/operators';

const nums = of(123);

const squareValues = map((val: number) => val * val);
const squaredNums = squareValues(nums);

squaredNums.subscribe(x => console.log(x));

// Logs
// 1
// 4
// 9
複製代碼

能夠看到 map 接受一個 function 做爲參數, 經過該 function 能夠把每一個元素作平方運算進行轉換.。

filter

你可使用管道來把這些操做符連接起來。管道讓你能夠把多個由操做符返回的函數組合成一個。pipe() 函數以你要組合的這些函數做爲參數,而且返回一個新的函數,當執行這個新函數時,就會順序執行那些被組合進去的函數。

應用於某個可觀察對象上的一組操做符就像一個處理流程 —— 也就是說,對你感興趣的這些值進行處理的一組操做步驟。這個處理流程自己不會作任何事。你須要調用 subscribe() 來經過處理流程得出並生成一個結果。

import { filter, map } from 'rxjs/operators';

const nums = of(12345);

// Create a function that accepts an Observable.
const squareOddVals = pipe(
  filter((n: number) => n % 2 !== 0),
  map(n => n * n)
);

// Create an Observable that will run the filter and map functions
const squareOdd = squareOddVals(nums);

// Subscribe to run the combined functions
squareOdd.subscribe(x => console.log(x));

// Logs
// 1
// 9
// 25
複製代碼

pipe() 函數也同時是 RxJS 的 Observable 上的一個方法,因此你能夠用下列簡寫形式來達到一樣的效果:

import { filter, map } from 'rxjs/operators';

const squareOdd = of(12345)
  .pipe(
    filter(n => n % 2 !== 0),
    map(n => n * n)
  );

// Subscribe to get values
squareOdd.subscribe(x => console.log(x));
複製代碼

share

在講解 share 操做符前,咱們須要瞭解一下冷熱兩種模式下的 Observables。官方定義以下:

Cold Observables 在被訂閱後運行,也就是說,observables 序列僅在 subscribe 函數被調用後纔會推送數據。與 Hot Observables 不一樣之處在於,Hot Observables 在被訂閱以前就已經開始產生數據,例如mouse move事件。

從原理來講是這樣的: Cold內部會建立一個新的數據生產者, 而Hot則會一直使用外部的數據生產者.

舉個例子:

Cold: 就至關於我在B站看英雄聯盟賽事重播,能夠從頭看起。

Hot: 就至關於看英雄聯盟賽事直播, 若是來晚了, 那麼前面就看不到了。

share() 操做符容許多個訂閱者共享同一個 Observable. 也就是把 Cold 變成 Hot。看以下示例:

import { Component, OnInit } from '@angular/core';
import { interval } from 'rxjs';
import { share, take } from 'rxjs/operators';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>share操做符</h2>
    <div>
      <button (click)="shareData()">Click</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  shareData() {
    const numbers = interval(1000).pipe(
      take(5),
      share()
    );

    function subscribeToNumbers(name{
      numbers.subscribe(
        x => console.log(`${name}${x}`)
      );
    }

    subscribeToNumbers('Dave');

    const anotherSubscription = () => subscribeToNumbers('Nick');

    setTimeout(anotherSubscription, 2500);
  }

}
複製代碼

頁面測試結果爲:

這裏 interval 是每隔1秒產生一個數據, take(5)表示取5個數, 也就是1,2,3,4,5,而後 share()就把這個 observable 從 cold 變成了 hot 的。後邊 Dave 進行了訂閱,2.5秒之後, Nick 進行了訂閱。

其餘

RxJS 提供了不少操做符,不過只有少數是經常使用的。 以下圖所示:

關於這些操做符的使用能夠參看 RxJS 系列之三 - Operators 詳解RxJS API 文檔

錯誤處理

除了能夠在訂閱時提供 error() 處理器外,RxJS 還提供了 catchError 操做符,它容許你在管道中處理已知錯誤。

假設你有一個可觀察對象,它發起 API 請求,而後對服務器返回的響應進行映射。若是服務器返回了錯誤或值不存在,就會生成一個錯誤。若是你捕獲這個錯誤並提供了一個默認值,流就會繼續處理這些值,而不會報錯。

import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('http://a.itying.com/api/productlist').pipe(
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
複製代碼

頁面測試結果:

重試失敗的可觀察對象

catchError 提供了一種簡單的方式進行恢復,而 retry 操做符讓你能夠嘗試失敗的請求。

能夠在 catchError 以前使用 retry 操做符。它會訂閱到原始的來源可觀察對象,它能夠從新運行致使結果出錯的動做序列。若是其中包含 HTTP 請求,它就會從新發起那個 HTTP 請求。

下列代碼把前面的例子改爲了在捕獲錯誤以前重發請求:

import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';

const apiData = ajax('/api/data').pipe(
  retry(3), // Retry up to 3 times before failing
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
複製代碼

注意:不要重試登陸認證請求,這些請求只應該由用戶操做觸發。咱們確定不會但願自動重複發送登陸請求致使用戶的帳號被鎖定。

問題記錄

fromPromise no longer exported in master (v6)

RxJS version:
6.0.0-alpha.3

Code to reproduce:

import { fromPromise } from 'rxjs';
複製代碼

Expected behavior:
should work just as it did with v5.5 (but different location)

解決辦法:統一使用 from 來代替 fromPromise。

參考文獻

關於Angular6版本升級和RXJS6新特性的講解

RxJS: 簡單入門

RxJS 系列之二 - Observable 詳解

RxJS 系列之三 - Operators 詳解

RxJS速成 (上)

相關文章
相關標籤/搜索