Angular-Observable和RxJS

參考文章:介紹RxJS在Angular中的應用css

1、可觀察對象(Observable)

  • 可觀察對象支持在應用中的發佈者訂閱者之間傳遞消息。
  • 可觀察對象是聲明式的——也就是說,發佈者中用於發佈值的函數,只有在有消費者訂閱它以後纔會執行。
  • 可觀察對象能夠發送多個任意類型的值 —— 字面量、消息、事件。你的應用代碼只管訂閱並消費這些值就能夠了,作完以後,取消訂閱。不管這個流是擊鍵流HTTP響應流仍是定時器,對這些值進行監聽和中止監聽的接口都是同樣的。

1.1基本用法和詞彙

  • 做爲發佈者,你建立一個可觀察對象(Observable)的實例,其中定義了一個訂閱者(subscriber)函數。訂閱者函數用於定義「如何獲取或生成那些要發佈的值或消息」。訂閱者函數會接收一個 觀察者observer),並把值發佈觀察者next() 方法
  • 當有消費者調用 subscribe() 方法時,這個訂閱者函數就會執行。做爲消費者,要執行所建立的可觀察對象,並開始從中接收通知,你就要調用可觀察對象subscribe() 方法,並傳入一個觀察者observer)。
  • 觀察者是一個 JavaScript 對象,它定義了你收到的這些消息的處理器(handler)。
  • subscribe() 調用會返回一個 Subscription 對象,該對象具備一個 unsubscribe() 方法。 當調用該方法時,你就會中止接收通知。
// 在有消費者訂閱它以前,這個訂閱者函數並不會實際執行
const locations = new Observable((observer) => {
  const {next, error} = observer;
  let watchId;

  if ('geolocation' in navigator) {
    watchId = navigator.geolocation.watchPosition(next, error);
  } else {
    error('Geolocation not available');
  }

  return {unsubscribe() { navigator.geolocation.clearWatch(watchId); }};
});

// subscribe() 調用會返回一個 Subscription 對象,該對象具備一個 unsubscribe() 方法。
// subscribe()傳入一個觀察者對象,定義了你收到的這些消息的處理器
const locationsSubscription = locations.subscribe({
  next(position) { console.log('Current Position: ', position); },
  error(msg) { console.log('Error Getting Location: ', msg); }
});

// 10 seconds後調用該方法時,你就會中止接收通知。
setTimeout(() => { locationsSubscription.unsubscribe(); }, 10000);
複製代碼

1.2定義觀察者observer

通知類型 說明
next 必要。用來處理每一個送達值。在開始執行後可能執行零次或屢次。
error 可選。用來處理錯誤通知。錯誤會中斷這個可觀察對象實例的執行過程。
complete 可選。用來處理執行完畢(complete)通知。當執行完畢後,這些值就會繼續傳給下一個處理器。

1.3訂閱

  • 只有當有人訂閱 Observable 的實例時,訂閱者函數纔會開始發佈值。html

  • 訂閱時要先調用該實例subscribe() 方法,並把一個觀察者對象傳給subscribe(),用來接收通知。node

  • 使用 Observable 上定義的一些靜態方法來建立一些經常使用的簡單可觀察對象react

    • of(...items) —— 返回一個 Observable 實例,它用同步的方式把參數 中提供的這些值發送出來。ajax

    • from(iterable) —— 把它的參數轉換成一個 Observable 實例。 該方法一般用於把一個數組轉換成一個(發送多個值的)可觀察對象。編程

  • 下面的例子會建立並訂閱一個簡單的可觀察對象,它的觀察者會把接收到的消息記錄到控制檯中:api

// 建立簡單的可觀察對象,來發送3個值
const myObservable = of(1, 2, 3);

// 建立觀察者對象
const myObserver = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

// 訂閱
myObservable.subscribe(myObserver);
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification

=>前面指定預約義觀察者並訂閱它,等同以下寫法,省略了next,error,complete
myObservable.subscribe(
  // subscribe() 方法能夠接收預約義在觀察者中同一行的回調函數
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);
複製代碼

不管哪一種狀況,next 處理器都是必要的,而 errorcomplete 處理器是可選的。數組

  • 注意,next() 函數能夠接受消息字符串、事件對象、數字值或各類結構。咱們把由可觀察對象發佈出來的數據統稱爲任何類型的值均可以表示爲可觀察對象,而這些值會被髮布爲一個流。

1.4建立可觀察對象

  • 要建立一個與前面的 of(1, 2, 3) 等價的可觀察對象,你能夠這樣作:
// 訂閱者函數會接收一個 Observer 對象,並把值發佈給觀察者的 next() 方法。
function sequenceSubscriber(observer) {
  // 同步地 發佈 1, 2, and 3, 而後 complete
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
 
  // 同步發佈數據,因此取消訂閱 不須要作任何事情
  return {unsubscribe() {}};
}
 
// 使用 Observable 構造函數,建立一個新的可觀察對象,
// 當執行可觀察對象的 subscribe() 方法時,這個構造函數就會把它接收到的參數sequenceSubscriber做爲訂閱者函數來運行。 
const sequence = new Observable(sequenceSubscriber);
 
sequence.subscribe({
  next(num) { console.log(num); },
  complete() { console.log('Finished sequence'); }
});
 
// Logs:
// 1
// 2
// 3
// Finished sequence
複製代碼
  • 下面的例子用來發佈事件的可觀察對象:
function fromEvent(target, eventName) {
    return new Observable(
        // new Observable中傳入的訂閱者函數是用內聯方式定義的
        // 訂閱者函數會接收一個 觀察者對象observer,並把值e發佈給觀察者的 next() 方法
        (observer) => {
            const handler = (e) => observer.next(e);

            // Add the event handler to the target
            target.addEventListener(eventName, handler);

            return () => {
                // Detach the event handler from the target
                target.removeEventListener(eventName, handler);
            };
        }

    );
}


const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;

const subscription = fromEvent(nameInput, 'keydown')//使用fromEvent函數來建立可發佈 keydown 事件的可觀察對象
    .subscribe(
        // subscribe() 方法接收預約義在觀察者中同一行的next回調函數
        (e: KeyboardEvent) => {
            if (e.keyCode === ESC_KEY) {
                nameInput.value = '';
            }
        }
    );
複製代碼

1.5多播?

1.6錯誤處理

  • 因爲可觀察對象能夠setTimeout異步生成值,因此用 try/catch沒法捕獲錯誤的。你應該在觀察者中指定一個 error 回調來處理錯誤。
  • 發生錯誤時還會致使可觀察對象清理現有的訂閱,而且中止生成值。
  • 可觀察對象能夠生成值subscribe()調用 next 回調),也能夠調用 completeerror 回調來主動結束
myObservable.subscribe({
  next: (num) => console.log('Next num: ' + num),
  error: (err) => console.log('Received an errror: ' + err)
});
複製代碼

2、RxJS 庫

RxJS是一個使用可觀察對象進行響應式編程的promise

2.1建立可觀察對象的函數

RxJS 提供了一些用來建立可觀察對象函數。這些函數能夠簡化根據某些東西建立可觀察對象的過程,好比承諾、定時器、事件、ajax等等。緩存

  • 承諾
import { fromPromise } from 'rxjs';

// Create an Observable out of a promise
const data = fromPromise(fetch('/api/endpoint'));
// Subscribe to begin listening for async result
data.subscribe({
 next(response) { console.log(response); },
 error(err) { console.error('Error: ' + err); },
 complete() { console.log('Completed'); }
});
複製代碼
  • 定時器
import { interval } from 'rxjs';

// Create an Observable that will publish a value on an interval
const secondsCounter = interval(1000);
// Subscribe to begin publishing values
secondsCounter.subscribe(n =>
  console.log(`It's been ${n} seconds since subscribing!`));
複製代碼
  • 事件
import { fromEvent } from 'rxjs';

const el = document.getElementById('my-element');

// Create an Observable that will publish mouse movements
const mouseMoves = fromEvent(el, 'mousemove');

// Subscribe to start listening for mouse-move events
const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
  // Log coords of mouse movements
  console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);

  // When the mouse is over the upper-left of the screen,
  // unsubscribe to stop listening for mouse movements
  if (evt.clientX < 40 && evt.clientY < 40) {
    subscription.unsubscribe();
  }
});
複製代碼
  • ajax
import { ajax } from 'rxjs/ajax';

// Create an Observable that will create an AJAX request
const apiData = ajax('/api/data');
// Subscribe to create the request
apiData.subscribe(res => console.log(res.status, res.response));
複製代碼

2.2經常使用操做符

操做符會觀察來源可觀察對象中發出的轉換它們,並返回由轉換後的值組成的新的可觀察對象

  • Observable能夠鏈式寫法,這意味着咱們能夠這樣:
Observable.fromEvent(node, 'input')
  .map((event: any) => event.target.value)
  .filter(value => value.length >= 2)
  .subscribe(value => { console.log(value); });
複製代碼

下面是整個順序步驟:

  1. 假設用戶輸入:a

  2. Observable對觸發 oninput 事件做出反應,將值以參數的形式傳遞給observernext()。(內部實現)

  3. map() 根據 event.target.value 的內容返回一個新的 Observable,並調用 next() 傳遞給下一個observer

  4. filter() 若是值長度 >=2 的話,則返回一個新的 Observable,並調用 next() 傳遞給下一個observer

  5. 最後,將結果傳遞給 subscribe 訂閱塊。

只要記住每一次 operator 都會返回一個Observable,無論 operator 有多少個,最終只有最後一個 Observable 會被訂閱

  • 提倡使用管道來組合操做符,而不是使用鏈式寫法
import { filter, map } from 'rxjs/operators';

const squareOdd = of(1, 2, 3, 4, 5) // 可觀察對象
  .pipe(
    filter(n => n % 2 !== 0),
    map(n => n * n)
  );

// Subscribe to get values
squareOdd.subscribe(x => console.log(x));
複製代碼
  • takeWhile

若是組件有多個訂閱者的話,咱們須要將這些訂閱者存儲在數組中,當組件被銷燬時再逐個取消訂閱。但,咱們有更好的辦法: 使用 takeWhile() operator,它會在你傳遞一個布爾值是調用 next() 仍是 complete()

private alive: boolean = true;
ngOnInit() {
  const node = document.querySelector('input[type=text]');

  this.s = Observable.fromEvent(node, 'input')
    .takeWhile(() => this.alive)
    .map((event: any) => event.target.value)
    .filter(value => value.length >= 2)
    .subscribe(value => { console.log(value) });
}

ngOnDestroy() {
  this.alive = false;
}
複製代碼

RxJS很火很大緣由我認仍是提供了豐富的API,如下是摘抄:

建立數據流:

  • 單值:of, empty, never
  • 多值:from
    • .from([1, 2, 3, 4])
  • 定時:interval, timer
  • 從事件建立:fromEvent
  • Promise建立:fromPromise
  • 自定義建立:create

轉換操做:

  • 改變數據形態:map, mapTo, pluck
    • mapTo: event$.mapTo(1) // 使event流的值爲1
    • pluck: event$.pluck('target', 'value') // 從event流中取得其target屬性的value屬性
  • 過濾一些值:filter, skip, first, last, take,distinctUntilChanged
    • distinctUntilChanged:保留跟前一個元素不同的元素
  • 時間軸上的操做:delay, timeout, throttletime, throttle, debouncetime, debounce, audit, bufferTime
    • throttletime 兩個輸出的流之間間隔設置的參數時間
    • debouncetime 數據一個接一個流過來,只要每一個數據之間的間隔時間小於等於設置的參數時間,這些數據都會被攔下來。一個數據若是想要經過的話,它和它後面的數據間隔的時間,要大於設置的參數時間
    • debounce:若是在900毫秒內沒有新事件產生,那麼以前的事件將經過;若是在900毫秒內有新事件產生,那麼以前的事件將被捨棄。
    • throttle:在必定時間範圍內無論產生了多少事件,它只放第一個過去,剩下的都將捨棄
    • butterTime:緩存參數毫秒內的全部的源Observable的值,而後一次性以數組的形式發出
  • 累加:reduce, scan
  • 異常處理:throw, catch, retry, finally
  • 條件執行:takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn
  • 轉接:switch

組合數據流:

  • concat,保持原來的序列順序鏈接兩個數據流。只有運行完前面的流,纔會運行後面的流
  • merge,將兩個流按各自的順序疊加成一個流
  • race,預設條件爲其中一個數據流完成
  • forkJoin,預設條件爲全部數據流都完成
  • zip,取各來源數據流最後一個值合併爲對象
  • combineLatest,取各來源數據流最後一個值合併爲數組
  • startWith,先發出做爲startWith參數指定的項,而後再發出由源 Observable 所發出的項

竊聽:

  • do、tap 是兩個徹底相同的操做符,用於竊聽Observable的生命週期事件,而不會產生打擾。

操做符參考資料
Rxjs 經常使用操做符

2.3錯誤處理

除了能夠在訂閱時提供 error() 處理器外,RxJS 還提供了 catchError 操做符,它容許你在管道中處理已知錯誤。 下面是使用 catchError 操做符實現這種效果的例子:

import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('/api/data').pipe(
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  //若是你捕獲這個錯誤並提供了一個默認值,流就會繼續處理這些值,而不會報錯。
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
複製代碼

2.4重試失敗的可觀察對象

能夠在 catchError 以前使用 retry 操做符。 下列代碼爲前面的例子加上了捕獲錯誤前重發請求的邏輯:

import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';

const apiData = ajax('/api/data').pipe(
  retry(3), // Retry up to 3 times before failing
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
複製代碼

2.5可觀察對象的命名約定

習慣上的可觀察對象的名字以$符號結尾。

stopwatchValue$: Observable<number>;
複製代碼

3、Angular 中的可觀察對象

Angular 使用可觀察對象做爲處理各類經常使用異步操做的接口。好比:

  • EventEmitter 類派生自 Observable
  • HTTP 模塊使用可觀察對象來處理 AJAX 請求和響應。
  • 路由器和表單模塊使用可觀察對象來監聽對用戶輸入事件的響應。

3.1事件發送器 EventEmitter

Angular 提供了一個 EventEmitter 類,它用來從組件的 @Output() 屬性中發佈一些值。EventEmitter 擴展Observable,並添加了一個 emit() 方法,這樣它就能夠發送任意值了。當你調用 emit() 時,就會把所發送的值傳給訂閱上來的觀察者的 next() 方法。

@Output() changed = new EventEmitter<string>();

click() {
    this.changed.emit('hi~');
}
複製代碼
@Component({
  template: `<comp (changed)="subscribe($event)"></comp>`
})
export class HomeComponent {
  subscribe(message: string) {
     // 接收:hi~
  }
}
複製代碼

3.2HTTP

AngularHttpClientHTTP 方法調用中返回可觀察對象。例如,http.get(‘/api’) 就會返回可觀察對象。

相對於基於承諾(Promise)的 HTTP API,它有一系列優勢:

  • 可觀察對象不會修改服務器的響應(和在承諾上串聯起來的 .then() 調用同樣)。反之,你可使用一系列操做符來按需轉換這些值。
  • HTTP 請求是能夠經過 unsubscribe() 方法來取消的。
  • 請求能夠進行配置,以獲取進度事件的變化。
  • 失敗的請求很容易重試

3.3Async 管道

AsyncPipe 會訂閱一個可觀察對象或承諾,並返回其發出的最後一個值。當發出新值時,該管道就會把這個組件標記爲須要進行變動檢查的

3.4路由器 (router)

  • Router.events 以可觀察對象的形式提供了其事件。 你可使用 RxJS 中的 filter() 操做符來找到感興趣的事件,而且訂閱它們,以便根據瀏覽過程當中產生的事件序列做出決定。 例子以下:
import { Router, NavigationStart } from '@angular/router';
import { filter } from 'rxjs/operators';

@Component({
  selector: 'app-routable',
  templateUrl: './routable.component.html',
  styleUrls: ['./routable.component.css']
})
export class Routable1Component implements OnInit {

  navStart: Observable<NavigationStart>;

  constructor(private router: Router) {
    // Create a new Observable the publishes only the NavigationStart event
    this.navStart = router.events.pipe(
      filter(evt => evt instanceof NavigationStart)
    ) as Observable<NavigationStart>;
  }

  ngOnInit() {
    this.navStart.subscribe(evt => console.log('Navigation Started!'));
  }
}
複製代碼
  • ActivatedRoute 是一個可注入的路由器服務,它使用可觀察對象來獲取關於路由路徑路由參數的信息。好比,ActivateRoute.url 包含一個用於彙報路由路徑的可觀察對象。例子以下:
import { ActivatedRoute } from '@angular/router';

@Component({
  selector: 'app-routable',
  templateUrl: './routable.component.html',
  styleUrls: ['./routable.component.css']
})
export class Routable2Component implements OnInit {
  constructor(private activatedRoute: ActivatedRoute) {}

  ngOnInit() {
    this.activatedRoute.url
      .subscribe(url => console.log('The URL changed to: ' + url));
  }
}
複製代碼

3.5響應式表單 (reactive forms)

響應式表單具備一些屬性,它們使用可觀察對象來監聽表單控件的值。 FormControlvalueChanges 屬性和 statusChanges 屬性包含了會發出變動事件可觀察對象。訂閱可觀察的表單控件屬性是在組件類中觸發應用邏輯的途徑之一。好比:

import { FormGroup } from '@angular/forms';

@Component({
  selector: 'my-component',
  template: 'MyComponent Template'
})
export class MyComponent implements OnInit {
  nameChangeLog: string[] = [];
  heroForm: FormGroup;

  ngOnInit() {
    this.logNameChange();
  }
  logNameChange() {
    const nameControl = this.heroForm.get('name');
    nameControl.valueChanges.subscribe(
      (value: string) => this.nameChangeLog.push(value)
    );
  }
}
複製代碼

4、可觀察對象與其它技術的比較

4.1可觀察對象 vs. 承諾

可觀察對象 承諾 Observable優點
可觀察對象是聲明式的,在被訂閱以前,它不會開始執行。 承諾是在建立時就當即執行的。 這讓可觀察對象可用於定義那些應該按需執行的情景。
可觀察對象能提供多個值 承諾只提供一個 這讓可觀察對象可用於隨着時間的推移獲取多個值。
可觀察對象會區分串聯處理和訂閱語句。 承諾只有 .then() 語句。 這讓可觀察對象可用於建立供系統的其它部分使用而不但願當即執行的複雜菜譜。
可觀察對象的 subscribe() 會負責處理錯誤。 承諾會把錯誤推送給它的子承諾 這讓可觀察對象可用於進行集中式、可預測的錯誤處理。

4.2建立與訂閱

  • 在有消費者訂閱以前,可觀察對象不會執行。subscribe() 會執行一次定義好的行爲,而且能夠再次調用它。從新訂閱會致使從新計算這些值。
content_copy
// declare a publishing operation
new Observable((observer) => { subscriber_fn });
// initiate execution
observable.subscribe(() => {
      // observer handles notifications
    });
複製代碼
  • 承諾會當即執行,而且只執行一次。當承諾建立時,會當即計算出結果。沒有辦法從新作一次。全部的 then 語句(訂閱)都會共享同一次計算。
content_copy
// initiate execution
new Promise((resolve, reject) => { executer_fn });
// handle return value
promise.then((value) => {
      // handle result here
    });
複製代碼

4.3串聯

  • 可觀察對象會區分各類轉換函數,好比映射和訂閱。只有訂閱纔會激活訂閱者函數,以開始計算那些值。
content_copy
observable.map((v) => 2*v);
複製代碼
  • 承諾並不區分最後的 .then() 語句(等價於訂閱)和中間的 .then() 語句(等價於映射)。
content_copy
promise.then((v) => 2*v);
複製代碼

4.4可取消

  • 可觀察對象的訂閱是可取消的。取消訂閱會移除監聽器,使其再也不接受未來的值,並通知訂閱者函數取消正在進行的工做。
content_copy
const sub = obs.subscribe(...);
sub.unsubscribe();
複製代碼
  • 承諾是不可取消的。

4.5錯誤處理

  • 可觀察對象的錯誤處理是交給訂閱者錯誤處理器的,而且該訂閱者會自動取消對這個可觀察對象的訂閱。
content_copy
obs.subscribe(() => {
  throw Error('my error');
});
複製代碼
  • 承諾會把錯誤推給其子承諾。
content_copy
promise.then(() => {
      throw Error('my error');
});
複製代碼

4.6速查表

4.7可觀察對象 vs. 事件 API

4.8可觀察對象 vs. 數組

5、Subject

咱們在寫一個Service用於數據傳遞時,老是使用 new Subject

@Injectable()
export class MessageService {
    private subject = new Subject<any>();

    send(message: any) {
        this.subject.next(message);
    }

    get(): Observable<any> {
        return this.subject.asObservable();
    }
}
複製代碼

F組件須要向M組件傳遞數據時,咱們能夠在F組件中使用 send()

constructor(public srv: MessageService) { }

ngOnInit() {
    this.srv.send('w s k f m?')
}
複製代碼

M組件只須要訂閱內容就行:

constructor(private srv: MessageService) {}

message: any;
ngOnInit() {
    this.srv.get().subscribe((result) => {
        this.message = result;
    })
}
複製代碼
相關文章
相關標籤/搜索