本文成文時間是2019-08-18
,文中提到的最新版本號是以2019-08-18
爲基準的。
在介紹正文以前須要先簡單瞭解幾個概念: STOMP
協議、STOMP over WebSocket
以及 RxJS
。(關於這三點本文不會進行詳細介紹)html
STOMP 即 Simple or Streaming Text Orientated Messageing Protocal ,是一種簡單(流)文本定向傳輸協議。前端
STOMP 是 WebSocket 更高級的子協議,它使用一個基於幀的格式來定義消息,與 HTTP 的 Request
和 Response
相似。git
STOMP 提供可互操做的鏈接格式,容許 STOMP 客戶端與任意代理進行交互。STOMP 是一個很是簡單易用的協議, 服務器端實現起來會相對困難一些,編寫客戶端很是容易。github
STOMP over Websocket 即經過 WebSocket 創建 STOMP 鏈接,也就是說是在 WebSocket 鏈接的基礎上再創建 STOMP 鏈接。web
WebSocket 協議定義了兩種類型的消息,文本和二進制,但它們的內容是未定義的。spring
若是說 Socket 是 C/S 架構 的 TCP 編程,那麼同理 WebSocket 就是 B/S架構的 TCP 編程,因此須要在客戶端與服務端之間定義一個機制去協商一個子協議 - 更高級別的消息協議,將它使用在 WebSocket 之上去定義每次發送消息的類別、格式和內容,等等。數據庫
子協議的使用是可選的,但不管哪一種方式,客戶端和服務器都須要就一些定義消息內容的協議達成一致。因而,一般選擇在 WebSocket 協議上使用 STOMP 協議來定義內容格式。npm
RxJS 是一個用於使用 Observables 進行反應式編程的庫,可簡化編寫異步或基於回調的代碼的過程。該項目是對 Reactive-Extensions / RxJS 的重寫,具備更好的性能,更好的模塊化,更好的可調試調用堆棧,同時主要保持向後兼容,而且進行了一些重大更改,從而減小了 API 操做。
RxJS 是 Reactive Extensions for JavaScript 的縮寫,起源於 Reactive Extensions(Rx),Rx 是對 LINQ 的一種擴展,他的目標是對異步的集合進行操做,也就是說,集合中的元素是異步填充的,好比說從 Web 或者雲端獲取數據而後對集合進行填充。編程
LINQ(Language Integrated Query)語言集成查詢是一組用於 C# 和 Visual Basic 語言的擴展。它容許編寫 C# 或者 Visual Basic 代碼以操做內存數據的方式,查詢數據庫。
RxJS 的主要功能是利用響應式編程的模式來實現 JavaScript 的異步式編程(現前端主流框架 Vue React Angular 都是響應式的開發框架)。bootstrap
RxJS 是基於觀察者模式和迭代器模式以函數式編程思惟來實現的。學習 RxJS 以前咱們須要先了解觀察者模式和迭代器模式,還要對 Stream 流的概念有所認識。
接下來咱們就一塊兒來看下如何在的 Angular 項目中是使用 STOMP over WebSocket
進行數據流傳輸的。
本文案例是實際 Angular 項目中的一個小功能模塊,Angular 是 8.0 版本,本文涉及的組件主要包括右鍵菜單項負責生產消息的context-menu-component
動態組件,進度監控app-progress-bar
組件和日誌輸出組件app-console-area
。
項目中使用的 UI 框架爲 ng-zorro-antd,下面是 tabs 組件中的相關僞代碼(省略了組件間 Input Ouput 接口):
... <nz-tab [nzType]="'card'"> <ng-template #consoleArea>控制檯</ng-template> <app-progress-bar></app-progress-bar> <app-console-area></app-console-area> </nz-tab> ...
代碼與 UI 視圖的對應關係以下:
項目中使用的 STOMP 客戶端是 ng2-stompjs
庫, ng2-stompjs
目前的版本是 7.xx
,其底層的 @stomp/stompjs
已被重寫,自此與 STOMP 標準具備嚴格的兼容性。
ng2-stompjs 是第一個可靠地支持二進制有效負載的 STOMP JS 客戶端庫。
安裝 ng-stompjs
:
$ npm install @stomp/ng-stompjs
使用前須要定義配置文件,在目錄 src/app/config/
建立 stomp.config.js 文件:
import { InjectableRxStompConfig } from '@stomp/ng2-stompjs'; import { STOMP_SERVER_BASE_URL } from 'server.config'; const _window: any = window; export const myRxStompConfig: InjectableRxStompConfig = { // Which server? brokerURL: _window.STOMP_SERVER_BASE_URL ? _window.STOMP_SERVER_BASE_URL : STOMP_SERVER_BASE_URL // Headers // Typical keys: login, passcode, host connectHeaders: { login: 'guest', passcode: 'guest' }, // How often to heartbeat? // Interval in milliseconds, set to 0 to disable heartbeatIncoming: 0, // Typical value 0 - disabled heartbeatOutgoing: 20000, // Typical value 20000 - every 20 seconds // Wait in milliseconds before attempting auto reconnect // Set to 0 to disable // Typical value 500 (500 milli seconds) reconnectDelay: 200, // Will log diagnostics on console // It can be quite verbose, not recommended in production // Skip this key to stop logging to console debug: (msg: string): void => { console.log(new Date(), msg); } };
在建立實例時,此配置將由 Angular Dependency Injection 機制注入 RxStompService 服務,在 src/app/app.module.ts
文件中,添加如下內容。
import { InjectableRxStompConfig, RxStompService, rxStompServiceFactory } from '@stomp/ng2-stompjs'; import { myRxStompConfig } from './config/stomp.config'; ... @NgModule({ declarations: [/* 聲明模塊內部成員的地方 */], imports: [/* 導入的其餘module */], providers: [ { provide: InjectableRxStompConfig, useValue: myRxStompConfig }, { provide: RxStompService, useFactory: rxStompServiceFactory, deps: [InjectableRxStompConfig] } ], entryComponents: [/* 不會在模版中引用到的組件 */], bootstrap: [AppComponent] }) export class AppModule {}
咱們如今將 RxStompService 依賴注入 app-progress-bar
組件中,爲此咱們將它添加到構造函數中,以下所示:
constructor(private rxStompService: RxStompService) { }
爲了能實時接收服務器發送過來的消息,咱們須要在 app-progress-bar
組件的生命週期函數 OnInit
中,使用 watch()
方法進行訂閱:
ngOnInit() { // 訂閱 STOMP 消息 this.topicSubscription = this.rxStompService.watch('/topic/message').subscribe((message: Message) => { console.log(message.body); } this.errorSubscription = this.rxStompService.watch('/topic/error').subscribe((message: Message) => { this.progressInfo = message.body; }); }
注:app-message-bar 組件默認是不顯示的,當有消息傳遞進來時,此組件纔會顯示在頁面中,進度達到 100% 時,會自動隱藏。
文章開頭提到,STOMP 是一種基於幀的協議,其幀在 HTTP 上創建模型。一個框架由一個命令,一組可選的標題和一個可選的主體組成。
STOMP 服務器被建模爲能夠向其發送消息的一組目標,STOMP 協議將目標視爲不透明字符串,其語法是特定於服務器實現的。另外,STOMP 沒有定義目的地 destination 的傳遞語義應該是什麼。目的地的傳遞或「消息交換」語義可能因服務器而異,甚至從目的地到目的地也不一樣,這使得服務器可使用 STOMP 支持的語義進行創做。
STOMP 客戶端是一個用戶代理,能夠在兩種(多是同時的)模式下運行:
咱們的案例中兩種模式同時存在,發送消息的是生產者(咱們上文提到的 context-menu-component
動態組件),接收消息的是消費者(app-progress-bar
組件)。消
費者能夠經過訂閱不一樣的 destination,來得到不一樣的推送消息,不須要開發人員去管理這些訂閱與推送目的地以前的關係。
接下來就介紹下做爲生產者的 context-menu-component
組件,看看它都作了哪些事情吧。
context-menu-component
組件是觸發右鍵時動態產生的組件,它負責經過向不一樣的目的地 destination 下達不一樣的指令,進而來實現不一樣的功能需求。
使用 ng-zorro-antd
的 Dropdown
組件 ,動態生成:
public openProjectManagerContextMenu(context: ProjectManagerContext): void { this.contextMenuComponent = this.nzDropdownService.create(context.mouseEvent, this.contextMenuTemplate); }
當咱們點擊「運行用例」按鈕時,它做爲生產者會向 STOMP 服務端目的地 SEND 消息指令。
// 運行用例 public runProjectCases(): void { const streamTaskParam: StreamTaskParam = new StreamTaskParam(); streamTaskParam.project = this.globalService.projectInfo.projectName; this.openTaskProgressModal('/app/run-project-cases', JSON.stringify(streamTaskParam)); }
從代碼得知,這會將消息發送到名爲的 /app/run-project-cases
的目的地,STOMP 將此目標視爲不透明字符串,而且目標名稱不承擔傳遞語義。
STOMP 定義了本身的消息傳輸體制。首先是經過一個後臺綁定的鏈接點 endpoint 來創建 socket 鏈接,而後生產者經過 SEND 方法,綁定好發送的 destination。而 topic 和 app 則是一種消息處理手段的分支,走 app/url 的消息會被你設置到的 MassageMapping 攔截到,進行你本身定義的具體邏輯處理,而走 topic/url 的消息就不會被攔截,直接到 Simplebroker 節點中將消息推送出去。(其中 simplebroker 是 spring 的一種基於內存的消息隊列,你也可使用 activeMQ,rabbitMQ 代替)。
所以目的地 /app/run-project-cases
生產出來的消息會被攔截,最終會轉發到消費者 app-progress-bar
組件 的 /topic/message
。
app-progress-bar
組件做爲消費者使用 watch()
方法啓動與代理的訂閱,this.rxStompService.watch('/topic/message')
將代理到目的地爲 /topic/message
的訂閱上,並返回 RxJS Observable。
ngOnInit() { // 訂閱 STOMP 消息 this.topicSubscription = this.rxStompService.watch('/topic/message').subscribe((message: Message) => { console.log(message.body); // do something } }
app-progress-bar
組件都作了些什麼事情呢?它負責創建 STOMP 鏈接,從服務器端接收文本流,並將這些流進行數據解析,解析出來的數據一部分用來控制進度條的數值變化,一部分用來控制 app-console-area
組件日誌的輸出節點。
也就是說 app-console-area
組件中打印的內容是由 app-progress-bar
組件解析和傳遞的。
咱們知道 RxJS Observable 實際上就是一個函數,它接收一個 Observer 對象做爲參數,返回一個函數用來取消訂閱。因此咱們能夠在 app-progress-bar
組件銷燬時,調用 unsubscribe()
方法取消訂閱。
ngOnDestroy() { this.topicSubscription.unsubscribe();}
本文主要目的是是結合案例展示 STOMP 協議的使用場景,因此不會着重介紹案例上的功能以及實現細節。
「記一次」系列文章:
相關文獻: