項目簡介描述:模擬在在線商品競拍,單擊關注某個商品時,能夠在商品詳情頁上看到實時的看到的別人競拍的商品價格,模擬從後端實時的獲取數據,當node
單擊取消按鈕時,客戶端即取消對服務器端的返回的數據的訂閱,並關閉數據流。web
項目流程圖:編程
項目步驟:後端
1. 對點擊關注按鈕進行編碼:數組
1 // 模板代碼 2 <div class="thumbnail"> 3 <button class="btn btn-default btn-lg" [class.active]="isWatched" (click)='watchProduct()'> 4 {{isWatched ? '取消關注':'關注'}} 5 </button> 6 <label>最新出價: {{currentBid | number: '.2-2'}}元</label> 7 </div>
2. 模板中的ts屬性的添加:服務器
// 關注參數 public isWatched:boolean = false; // 是否關注,默認是false,即爲沒有關注某個商品 public currentBid: number; // 當前商品的價格 // 保存這個流 public subscription: Subscription; // 當訂閱一個流時的返回值,使用它能夠取消對某個流的訂閱 // 注入WebsocketService, 支持websocket協議,支持雙向的通訊,即客戶端和服務器能同時回覆和發送數據 // 服務須要注入器,此例中的引入的WebsocketService時,能夠在constructor快速的實現注入,便可以使用 constructor(private productService: ProductService, private routeInfo: ActivatedRoute, private wsService: WebsocketService) { }
3. 做爲中介媒體的服務來成爲WebSocket的服務的建立流,發送消息的函數:websocket
// 常見WebSocket對象 public ws: Websocket; // 經過url來建立websocket流, 返回可觀測的流 createObservableSocket(url: string, id: number): Observable<any> { this.ws = new WebSocket(url); // 建立鏈接 return new Observable( // 返回可觀測的流 observer => { this.ws.onmessage = (event) => observer.next(event.data); // 推送內容 this.ws.onerror = (event) => observer.error(event); // 當發生錯誤時,推送錯誤消息 this.ws.onclose = (event) => observer.complete(); // 當關閉時,可觀察對象的完畢 this.ws.onopen = (event) => this.sendMessage({productId: id}); // 當ws打開時,即經過函數sendMessage發送數據 return () => this.ws.close(); // 這個匿名函數取消訂閱的方法的時候調用,關閉WebSocket, 不然的話,容易形成內存的泄露; } ) } // 經過建立的ws對象,來發送數據,發送的數據的格式是字符串的格式 sendMessage(message: any) { this.ws.send(JSON.stringify(message)); // 穿過來的參數是對象,可是send消息的格式是字符串格式的; }
4. 在服務器中實現對productId的存儲:dom
const Server = require('ws').Server; // 後端建立ws服務器 // Map中存在的是:每個客戶端關注的商品的id的數組,由於每個客戶端能夠關注多個商品; const subscriptions = new Map<any, number[]>(); const wsServer = new Server({port: 8090}); wsServer.on('connection', (websocket) => { websocket.on('message', (message) => { // 當客戶端有信息傳遞時,即服務器端接收,因爲這個客戶端傳遞個是JSON.stringify({productId:id});因此在接到數據時,使用 // JSON.parse(message); let messageObj = JSON.parse(message); // key值是鏈接到服務端的客戶端,當subscripitons.get(websocket)的值爲undefined,則賦值爲空數組; let productIds = subscriptions.get(websocket) || []; // 將新的商品的Id放到值的數組中;將已有的和新建的id組合在一塊兒; subscriptions.set(websocket, [...productIds, messageObj.productId]); // [...productIds]: 擴展運算符 }) })
1 // 實現消息向客戶端的定時的推送 2 // 模擬數據的更新 3 setInterval(function() { 4 // 隨機生成每一個商品的最新的商品的價格 5 products.forEach((product) => { 6 let currentBid = currentBids.get(product.id) || product.price; 7 let newBid = currentBid + Math.random() * 5; 8 currentBids.set(product.id, newBid); 9 }) 10 // 循環每個客戶端, 推送每個客戶端關注的商品的價格 11 subscriptions.forEach((productIds: number[], ws) => { 12 // 返回的數據的格式是:[{productId:xxx,bid: xxx},{},{}],對應是每一個被關注的商品的最新的報價 13 // *** 防止頁面刷新報錯; 14 if (ws.readyState === 1) { 15 // 經過映射將pid組成{productId: pid, bid: currentBid.get(pid)} 16 let newBids = productIds.map(pid => ({ 17 productId: pid, 18 bid: currentBids.get(pid) 19 })); 20 // 發送的數據 21 ws.send(JSON.stringify(newBids)); 22 } else { 23 subscriptions.delete(ws); // 刪除已經關閉的客戶端 24 } 25 // 這以後,而後在客戶端訂閱這個流; 26 }); 27 }, 2000);
5. 客戶端對服務器的流的接收(關注)和取消(取消關注):socket
1 watchProduct() { 2 // 服務器返回的流的訂閱 3 if (this.subscription) { 4 // 取消對流的訂閱, 並對subscription賦值爲null; 5 this.subscription.unsubscribe(); 6 this.isWatched = false; // 按鈕的狀態的變化 7 this.subscription = null; // 將流的對象置空 8 } else { 9 this.isWatched = true; // 按鈕的狀態的變化 10 this.subscription = this.wsService.createObservableSocket('ws://localhost:8090', this.product.id) 11 .subscribe( 12 products => { 13 products = JSON.parse(products); 14 console.log(products); 15 let product = products.find(p => p.productId == this.product.id); //經過id篩選出當前的商品 16 this.currentBid = product.bid; // 展現在頁面上 17 } 18 ); 19 } 20 }
6. 重點是websocket的編程的流的建立和訂閱,node服務器來模擬用戶的訪問和出價,對流的數據的處理。函數
對websocket的編程的流程有大概的瞭解。