本文首發於IBM Developeworks:http://www.ibm.com/developerworks/cn/web/1202_zhouxiang_dojocometd/,感謝JayZ的投稿。javascript
簡介: 服務器推送技術已經出來一段時間了,業界上也有很多基於這種技術(應該說是設計模式)的開源實現,可是要移植或者說應用到本身的項目上都比較麻煩。Dojo 這樣一個大型的 Web2.0 開發框架提供了一套封裝好的基於服務端推送技術的具體實現(包括服務端 Java 和客戶端 Web 和 JavaScript),它基於 Bayeux 協議,提供了一些簡單並且強大的接口可讓你快速構建本身的服務端推送功能。客戶端實現即 Dojo 的 Cometd 前端組件,它封裝了創建鏈接、消息訂閱等等接口。服務端基於 Jetty 和 annotation,組建消息推送機制,一樣也封裝了比較簡單但實用的消息推送接口,與前端 Dojox 的 Cometd 接口協同工做。這篇文章將重點介紹 Dojo 的服務端推送機制是如何運做的,以及咱們應該如何基於 Dojo 的 Cometd 工具包構建本身的服務端推送功能。php
服務器推送技術和 Bayeux 協議簡介html
服務器推送技術的基礎思想是將瀏覽器主動查詢信息改成服務器主動發送信息。服務器發送一批數據,瀏覽器顯示這些數據,同時保證與服務器的鏈接。當服務器須要再次發送一批數據時,瀏覽器顯示數據並保持鏈接。之後,服務器仍然能夠發送批量數據,瀏覽器繼續顯示數據,依次類推。基於這種思想,這裏咱們要引出 Bayeux 協議。前端
Bayeux 是一套基於 Publish / Subscribe 模式,以 JSON 格式在瀏覽器與服務器之間傳輸事件的通訊協議。該協議規定了瀏覽器與服務器之問的雙向通訊機制,克服了傳統 Web 通訊模式的缺點。java
Bayeux 協議主要基於 HTTP 來傳輸低延遲的、異步的事件消息。這些消息經過頻道 (Channels) 來投遞,可以實現從服務器端到客戶端、從客戶端到服務器端或者經過服務器從一個客戶端到另外一個客戶端的傳送。Bayeux 協議的主要目的是爲使用了 Ajax 和 Comet 技術的 Web 客戶端實現高響應的用戶交互。Bayeux 協議旨在經過容許執行者更容易的實現互操做性,來下降開發 Comet 應用程序的複雜性。它解決了共同的消息發佈和路由問題,並提供了漸進式的改進和擴展機制。web
通常狀況下,在 HTTP 協議中,Client 要想得到 Server 的消息,必須先本身發送一個 Request,而後 Server 纔會給予 Response。而 Bayeux 協議改變了這個狀況,它容許 Server 端異步 Push 本身的消息到 Client 端。從而實現了 Client 和 Server 之間的雙向操做模式。ajax
回頁首json
服務器推送技術的一個簡單實現後端
基於 Bayeux 協議實現服務器推送技術的方式有不少,能夠經過 Flex 或者 Java 的 Applet。基於這兩種技術,咱們能夠創建在客戶端創建服務套接字接口,「雙向操做模式」天然很容易實現,可是這些方式須要除瀏覽器之外的運行環境的支持。這裏咱們但願能採用一種純腳本的方式,這種方式是不可能創建服務套接字接口的,那如何實現基於 Bayeux 協議的服務器推送呢?實際上是能夠模擬實現的,主要有兩種方式:設計模式
1. 基於 HTTP 的長輪詢來進行消息通訊(基於 Ajax 的長輪詢(long-polling)方式)。
2. 基於 Iframe 及 htmlfile 的流(streaming)方式。
這裏咱們採用第一種方式實現,即:客戶端先向服務器端發送一個 HTTP Request,服務器端接收到後,阻塞在那邊,等服務器有消息的時候,則返回一個 HTTP Response 給客戶端,客戶端收到後,斷開鏈接,緊接着再發第二個 HTTP Request,以此反覆進行,保持這個「長輪詢」。期間,若是鏈接超時,那麼會斷開重連,以保持鏈接。
基於以上的思想,咱們來看一下一個簡單的實現,這個簡單實現是基於 PHP 的。示例很簡單,即使沒用過 PHP 也可以很容易看明白,並且咱們會在後面一一做出解釋。
這個示例主要實現這樣一個功能:
咱們在瀏覽器裏面分別打開三個窗口,並訪問同一張頁面。修改其中一個頁面上的內容,另外兩個頁面上的內容也隨即發生變化(注意:這裏不用刷新頁面)。這就會給咱們一種:數據是服務器推送過來的感受。
圖 1. 簡單服務器推送示例 -- 內容修改前
咱們修改其中第一個窗口(左上)的內容(輸入「222」,點擊「Send」按鈕,發送到後臺)。此時不只第一個窗口的內容變化了,其他兩個窗口的內容也隨即變化。
圖 2. 簡單服務器推送示例 -- 內容修改
接下來咱們來看看示例代碼吧:
清單 1. 簡單服務器推送 -- 前端代碼 HTML
<form action="" method="get"
onsubmit="comet.doRequest($('word').value);$('word').value='';return false;">
<input type="text" name="word" id="word" value="" />
<input type="submit" name="submit" value="Send" />
</form> |
這個是咱們所看到的輸入框和提交按鈕,你們能夠注意一下它的「onsubmit」方法:當咱們輸入內容並點擊提交時,它會執行「comet.doRequest($('word').value)」方法向後端發起請求(其實在這以前咱們就已經創建了與服務端的長輪詢並可隨時開始服務器推送數據)。接下來咱們來看看這個「comet」是什麼樣子的以及他的 Request 的具體實現:
清單 2. 簡單服務器推送 -- 前端代碼 JavaScript
- var Comet = Class.create();
- Comet.prototype = {
- timestamp: 0,
- url: './backend.php',
- noerror: true,
- initialize: function(){
- },
- connect: function(){
- this.ajax = new Ajax.Request(this.url, {
- method: 'get',
- parameters: {
- 'timestamp': this.timestamp
- },
- onSuccess: function(transport){
- var response = transport.responseText.evalJSON();
- this.comet.timestamp = response['timestamp'];
- this.comet.handleResponse(response);
- this.comet.noerror = true;
- },
- onComplete: function(transport){
- if (!this.comet.noerror) setTimeout(function(){
- comet.connect()
- }, 5000);
- else
- this.comet.connect();
- this.comet.noerror = false;
- }
- });
- this.ajax.comet = this;
- },
- handleResponse: function(response){
- $('content').innerHTML += '<div>' + response['msg'] + '</div>';
- },
- doRequest: function(request){
- new Ajax.Request(this.url, {
- method: 'get',
- parameters: {
- 'msg': request
- }
- });
- }
- }
-
- var comet = new Comet();
- comet.connect();
var Comet = Class.create(); Comet.prototype = { timestamp: 0, url: './backend.php', noerror: true, initialize: function(){ }, connect: function(){ this.ajax = new Ajax.Request(this.url, { method: 'get', parameters: { 'timestamp': this.timestamp }, onSuccess: function(transport){ var response = transport.responseText.evalJSON(); this.comet.timestamp = response['timestamp']; this.comet.handleResponse(response); this.comet.noerror = true; }, onComplete: function(transport){ if (!this.comet.noerror) setTimeout(function(){ comet.connect() }, 5000); else this.comet.connect(); this.comet.noerror = false; } }); this.ajax.comet = this; }, handleResponse: function(response){ $('content').innerHTML += '<div>' + response['msg'] + '</div>'; }, doRequest: function(request){ new Ajax.Request(this.url, { method: 'get', parameters: { 'msg': request } }); } } var comet = new Comet(); comet.connect();
咱們先看最後兩段代碼,這裏是頁面初始化時會執行的代碼,其實在這裏,咱們就創建了一服務端的長輪詢,咱們來看看「connect」方法的實現吧:
「connect」方法這裏是發了一個 Ajax 請求,而後分別設定了成功時(onSuccess)的返回處理和請求完成時(onComplete)的處理(注意 onComplete 不論成功失敗都會執行)。咱們要掛住這裏的 onComplete 方法。能夠看到,當請求完成時,若是鏈接有問題,它會過 5 秒從新鏈接,;若是沒有問題,他會當即從新鏈接。
相信你們看到這裏應該會有點眉目了,這裏其實沒有什麼所謂的恆定不斷的鏈接(相似 TCP 方式),它的真正實現是經過不斷的 Ajax 請求實現的。
因此,當咱們開啓 3 個窗口時,其實咱們打開了 3 個模擬的不間斷的客戶端與服務端的鏈接,因此他們會即時解到服務端的信息,不須要刷新頁面。
咱們再來看看服務端的實現,看看他是如何推送的:
清單 3. 簡單服務器推送 -- 後端代碼 PHP
- $filename = dirname(__FILE__).'/data.txt';
-
- // 將新消息存入文件中
- $msg = isset($_GET['msg']) ? $_GET['msg'] : '';
- if ($msg != '')
- {
- file_put_contents($filename,$msg);
- die();
- }
-
- // 這是一個無限循環,一旦發現文件被修改,便會跳出循環並返回文件修改數據。若是文件一直沒有修改,則會一
- // 直處於循環檢測狀態,此時的 Ajax 鏈接也會一直保留,直到文件被修改成止,這就是所謂的「長輪詢」。
- $lastmodif = isset($_GET['timestamp']) ? $_GET['timestamp'] : 0;
- $currentmodif = filemtime($filename);
- while ($currentmodif <= $lastmodif) // 檢測文件是否被修改
- {
- usleep(10000); // sleep 10ms to unload the CPU
- clearstatcache();
- $currentmodif = filemtime($filename);
- }
-
- // 返回 JSON 數組
- $response = array();
- $response['msg'] = file_get_contents($filename);
- $response['timestamp'] = $currentmodif;
- echo json_encode($response);
- flush();
$filename = dirname(__FILE__).'/data.txt'; // 將新消息存入文件中 $msg = isset($_GET['msg']) ? $_GET['msg'] : ''; if ($msg != '') { file_put_contents($filename,$msg); die(); } // 這是一個無限循環,一旦發現文件被修改,便會跳出循環並返回文件修改數據。若是文件一直沒有修改,則會一 // 直處於循環檢測狀態,此時的 Ajax 鏈接也會一直保留,直到文件被修改成止,這就是所謂的「長輪詢」。 $lastmodif = isset($_GET['timestamp']) ? $_GET['timestamp'] : 0; $currentmodif = filemtime($filename); while ($currentmodif <= $lastmodif) // 檢測文件是否被修改 { usleep(10000); // sleep 10ms to unload the CPU clearstatcache(); $currentmodif = filemtime($filename); } // 返回 JSON 數組 $response = array(); $response['msg'] = file_get_contents($filename); $response['timestamp'] = $currentmodif; echo json_encode($response); flush();
咱們能夠參照上面的註釋理解該代碼,其實並不須要多少 PHP 的知識。服務端推送技術不是一個開發用的控件庫,而是一個思想。這裏的 while 循環便說明了服務端推送是如何保留所謂的「長輪詢」的。
如今你們應該明白爲何三個窗口會同步變化了。其主要的核心思想就是服務端「握住」長輪詢,而後在適當的時候「放手」。
回頁首
Dojo 的 Cometd 工具包簡介
以前咱們是基於 JavaScript 本身實現了一個簡單的 Cometd 應用,咱們花了大量的代碼來創建一個 Cometd 框架,真正用於處理咱們本身的業務邏輯的代碼其實就是「handleResponse」裏面的那一行。咱們能不能吧這些通用的代碼省掉呢?答案是確定的。Dojo 已經對 Cometd 作了封裝,基於 Dojo 的 Cometd 包,咱們不用再浪費大量的代碼在搭建 Cometd 框架上。對於前端腳本代碼,咱們只須要加上一個 Cometd 包的簡單接口代碼,即可以開始加入咱們本身的業務邏輯代碼了。
固然,Dojo 的 Cometd 包還包括後端的代碼,能夠在 Dojo 的官網下載中找到,它不與 Dojo 包一塊兒發佈,是一個單獨的服務端開源代碼,基於 Java 和 Jetty 的,有興趣的讀者能夠下載下來研究一下。
經過 Dojo 的這兩部分代碼,咱們即可以迅速地搭建咱們的 Cometd 框架,咱們剩下須要作的就是加入咱們的業務邏輯。
回頁首
Dojo 的 Cometd 工具包以前端
接下來咱們來看看 Dojo 的 Cometd 工具包的前端封裝:
清單 4. Cometd 前端初始化
- dojox.cometd.init("http://www.xxx.com/cometd");
dojox.cometd.init("http://www.xxx.com/cometd");
這個接口用於創建並初始化與服務端的握手鍊接(Bayeux handshake,初始化了「Bayeux communication」 消息通信)。創建這個鏈接是基於 Bayeux 協議的,它主要有兩個任務:
- 客戶端與服務端協商傳輸的消息類型。
- 若是協商成功,服務端會通知客戶端具體的請求參數配置。
- 若是協商失敗,客戶端從新發起協商流程。
咱們深刻 Dojo 的 init 方法內部能夠看到握手鍊接的具體實現過程,它的實現也是不間斷的重複發送客戶端的 Ajax 請求,與咱們以前的自制案相似,有興趣的同窗能夠參考以下代碼(摘取部分):
清單 5. Cometd 內部機制
- this.init = function(...){
- ............
- var bindArgs = {
- url: this.url,
- handleAs: this.handleAs,
- content: { "message": dojo.toJson([props]) },
- load: dojo.hitch(this,function(msg){
- this._backon();
- this._finishInit(msg);
- }),
- error: dojo.hitch(this,function(e){
- this._backoff();
- this._finishInit(e);
- }),
- timeout: this.expectedNetworkDelay
- };
- ..............
- if(this._isXD){
- r = dojo.io.script.get(bindArgs);
- }else{
- r = dojo.xhrPost(bindArgs);
- }
- ..............
- }
-
- this._finishInit = function(data){
- ..................
- if(successful){
- ........
- //ajax request inside
- this.tunnelInit = transport.tunnelInit && dojo.hitch(transport,
- "tunnelInit");
- this.tunnelCollapse = transport.tunnelCollapse && dojo.hitch(transport,
- "tunnelCollapse");
- transport.startup(data);
- }else{
- if(!this._advice || this._advice["reconnect"] != "none"){
- setTimeout(dojo.hitch(this, "init", this.url, this._props),
- this._interval());
- }
- }
- ....................
- }
this.init = function(...){ ............ var bindArgs = { url: this.url, handleAs: this.handleAs, content: { "message": dojo.toJson([props]) }, load: dojo.hitch(this,function(msg){ this._backon(); this._finishInit(msg); }), error: dojo.hitch(this,function(e){ this._backoff(); this._finishInit(e); }), timeout: this.expectedNetworkDelay }; .............. if(this._isXD){ r = dojo.io.script.get(bindArgs); }else{ r = dojo.xhrPost(bindArgs); } .............. } this._finishInit = function(data){ .................. if(successful){ ........ //ajax request inside this.tunnelInit = transport.tunnelInit && dojo.hitch(transport, "tunnelInit"); this.tunnelCollapse = transport.tunnelCollapse && dojo.hitch(transport, "tunnelCollapse"); transport.startup(data); }else{ if(!this._advice || this._advice["reconnect"] != "none"){ setTimeout(dojo.hitch(this, "init", this.url, this._props), this._interval()); } } .................... }
可見,它們的 callback 方法裏面都帶有對本身自己的調用,這裏的」init「方法也不例外。細心的讀者可能還會發現,其實從例子上能夠看出:Dojo 的 Cometd 也支持跨域,它的跨域是經過「script」的方式實現的。這裏有一點須要你們瞭解,咱們默認的服務端推送實現方式是長輪詢(long-polling)模式,遇到跨域時,「long-polling」便再也不適用,轉爲基於「script」的返回調用(callback-polling)模式。
接下來咱們再來看看 Cometd 中關於消息推送的一些接口,這些消息通信主要是基於渠道:
清單 6. Cometd 前端發佈消息
- dojox.cometd.publish("/service/echo", { msg: msg });
dojox.cometd.publish("/service/echo", { msg: msg });
這裏的所謂「發佈消息」就是向後端發送消息,用於前端主動向後端推送。
這裏的第一個參數是發送消息的渠道標識(channel),這種「channel」共有三種類型:
1. 元渠道(meta channels):示例「/meta/connect」(一般以「/meta/」爲開頭)。元渠道主要不是用來消息傳輸,而是用於客戶端監聽,如握手鍊接或者網絡鏈接等等的錯誤。一般咱們會在客戶端調用「addListener()」來開啓監聽元渠道,它能夠在握手鍊接的創建以前就開啓監聽,並且這種消息監聽是同步的。
2. 服務渠道(service channels):示例「/service/connect」(一般以「/service/」爲開頭)。它主要用於私有消息通信,主要是一對一的通信。一般咱們會在客戶端調用「subscribe()」來訂閱服務渠道消息。服務渠道只有等握手鍊接創建好後才能開啓,並且它是異步通信的。
3. 普通渠道(normal channels):示例「/foo/bar」(無限制)。這種渠道沒有什麼限制,主要用於廣播消息,即:多個客戶端訂閱了一個服務,該服務能夠經過普通渠道進行消息廣播。
渠道是通訊的基礎模式,咱們能夠根據須要選擇相應的渠道模式。
第二個參數則是消息對象,這裏的「msg」則是消息內容。
有一點要注意:這裏的「publish」是基於 Bayeux 協議的,採用的異步消息傳輸機制,因此它是在服務端(Bayeux 服務器)收到消息以前就返回的。因此 publish 的返回並不表明服務端收到你 publish 的消息了。
Dojo 的 Cometd 還支持批量發送消息,經過這個接口能夠有效地避免沒必要要的網絡消息傳輸的浪費:
清單 7. Cometd 前端批量發佈消息
- // 方法 1
- cometd.batch(function()
- {
- cometd.publish('/channel1', { product: 'foo' });
- cometd.publish('/channel2', { notificationType: 'all' });
- cometd.publish('/channel3', { update: false });
- });
-
- // 方法 2
- cometd.startBatch()
- cometd.publish('/channel1', { product: 'foo' });
- cometd.publish('/channel2', { notificationType: 'all' });
- cometd.publish('/channel3', { update: false });
- cometd.endBatch()
// 方法 1 cometd.batch(function() { cometd.publish('/channel1', { product: 'foo' }); cometd.publish('/channel2', { notificationType: 'all' }); cometd.publish('/channel3', { update: false }); }); // 方法 2 cometd.startBatch() cometd.publish('/channel1', { product: 'foo' }); cometd.publish('/channel2', { notificationType: 'all' }); cometd.publish('/channel3', { update: false }); cometd.endBatch()
上述兩種方案均可以實現消息的批量發送,推薦使用方法 1。
接下來咱們看看服務端的消息推送:
清單 8. Cometd 前端訂閱消息
- dojox.cometd.subscribe("/service/echo",echoRpcReturn);
-
- function echoRpcReturn(msg){
- dojo.byId("responses").innerHTML += msg;
- }
dojox.cometd.subscribe("/service/echo",echoRpcReturn); function echoRpcReturn(msg){ dojo.byId("responses").innerHTML += msg; }
這裏所謂的「訂閱消息」,其實就是接收服務端推送的消息,是後端主動向前端推送。這也是服務端推送的精華所在,一樣也是很簡單的一行代碼。
這裏咱們看到了一個熟悉的方法 --- 「subscribe」,以前咱們已經介紹過了,它主要用於訂閱服務渠道私有消息,這裏就是它用法的一個示例。對應的服務端 Service 向對應的前端訂閱者推送消息,這裏就是經過「echo」渠道向前端推送消息,他會回調「echoRpcReturn」方法,並傳入推送的消息做爲實參。對於後端的每次推送,都會調用前端的「echoRpcReturn」方法。
回頁首
Dojo 的 Cometd 工具包以後端
Dojo 的 Cometd 工具包的後端實現是基於 Java 和 Jetty 組件的,經過 Dojo 的服務端 Cometd 組件,咱們一樣能極其迅速的構建 Cometd 框架。咱們須要作的僅僅是加入咱們的業務邏輯代碼便可。
先來看看 web.xml 的配置參數:
清單 9. 基本配置參數(web.xml)
- <web-app xmlns="http://java.sun.com/xml/ns/javaee"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
- http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
- version="2.5">
-
- <servlet>
- <servlet-name>cometd</servlet-name>
- <servlet-class>
- org.cometd.server.continuation.ContinuationCometdServlet
- </servlet-class>
- <init-param>
- <param-name>timeout</param-name>
- <param-value>60000</param-value>
- </init-param>
- </servlet>
- <servlet-mapping>
- <servlet-name>cometd</servlet-name>
- <url-pattern>/cometd/*</url-pattern>
- </servlet-mapping>
-
- <filter>
- <filter-name>cross-origin</filter-name>
- <filter-class>org.eclipse.jetty.servlets.CrossOriginFilter</filter-class>
- </filter>
- <filter-mapping>
- <filter-name>cross-origin</filter-name>
- <url-pattern>/cometd/*</url-pattern>
- </filter-mapping>
-
- </web-app>
<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5"> <servlet> <servlet-name>cometd</servlet-name> <servlet-class> org.cometd.server.continuation.ContinuationCometdServlet </servlet-class> <init-param> <param-name>timeout</param-name> <param-value>60000</param-value> </init-param> </servlet> <servlet-mapping> <servlet-name>cometd</servlet-name> <url-pattern>/cometd/*</url-pattern> </servlet-mapping> <filter> <filter-name>cross-origin</filter-name> <filter-class>org.eclipse.jetty.servlets.CrossOriginFilter</filter-class> </filter> <filter-mapping> <filter-name>cross-origin</filter-name> <url-pattern>/cometd/*</url-pattern> </filter-mapping> </web-app>
這裏咱們先來看看「ContinuationCometdServlet」,這個 Servlet 主要用於解釋 Bayeux 協議,因此關於它的配置是必須的。基於「ContinuationCometdServlet」的其餘配置參數還有不少,如:
Timeout:長輪詢的過時時間。若是超過這個時間尚未客戶端消息,服務端會推送一個空消息。
Interval:輪詢間隔時間。客戶端結束前一個請求到發送下一個請求之間的間隔時間。
maxInterval:服務端最長等待時間。即:創建鏈接時,若是超過這個時間仍沒有接到一個新的長輪詢鏈接請求,服務端就會認爲該客戶端無效或者關閉了。
logLevel:日誌級別。「0 = warn, 1 = info, 2 = debug」。
以上是主要的配置參數,其他的配置參數還有不少,這裏不一一介紹,有須要的讀者能夠查閱 Dojo 的幫助文檔。另外,最後幾行咱們還配置了一個「cross-origin」,對應着「CrossOriginFilter」類,他用於支持跨域的 JavaScript 請求,若是您的項目中要支持跨域的服務器推送,請加入該配置。
接下來咱們再來看看一些高級配置參數:
清單 10. 高級配置參數(web.xml)
- <servlet>
- <servlet-name>cometd</servlet-name>
- <servlet-class>org.cometd.java.annotation.AnnotationCometdServlet</servlet-class>
- <init-param>
- <param-name>logLevel</param-name>
- <param-value>1</param-value>
- </init-param>
- <init-param>
- <param-name>services</param-name>
- <param-value>org.cometd.examples.ChatService</param-value>
- </init-param>
- <load-on-startup>1</load-on-startup>
- </servlet>
- <servlet-mapping>
- <servlet-name>cometd</servlet-name>
- <url-pattern>/cometd/*</url-pattern>
- </servlet-mapping>
-
- <servlet>
- <servlet-name>cometdDemo</servlet-name>
- <servlet-class>org.cometd.examples.CometdDemoServlet</servlet-class>
- <load-on-startup>2</load-on-startup>
- </servlet>
<servlet> <servlet-name>cometd</servlet-name> <servlet-class>org.cometd.java.annotation.AnnotationCometdServlet</servlet-class> <init-param> <param-name>logLevel</param-name> <param-value>1</param-value> </init-param> <init-param> <param-name>services</param-name> <param-value>org.cometd.examples.ChatService</param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>cometd</servlet-name> <url-pattern>/cometd/*</url-pattern> </servlet-mapping> <servlet> <servlet-name>cometdDemo</servlet-name> <servlet-class>org.cometd.examples.CometdDemoServlet</servlet-class> <load-on-startup>2</load-on-startup> </servlet>
這裏咱們主要要注意三個地方:
1. 「CometdDemoServlet」:它是用於啓動服務端 Cometd 框架的 Servlet,咱們在後面會介紹。因爲他配置了「load-on-startup」參數,因此在服務容器啓動的時候,咱們的 Cometd 服務端就已經搭建好了,以後咱們會着重介紹他的「init」方法中的行爲。
2. 「AnnotationCometdServlet」:這個 Servlet 配置在這裏表示了咱們在服務端代碼是基於 annotation 的。這是一個很是實用的 Servlet,經過這個 Servlet,你會發現,咱們要作的事情僅僅是定義幾個 Service 類,實現其中的幾個方法便可。連不少調用 Cometd 框架 API 接口的代碼都省去了。
3. 「ChatService」:這裏聲明瞭一個 Service 類,他的用途是處理服務渠道的消息。這裏聲明的做用等同於代碼中的「processor.process(new ChatService())」。
配置完成後,咱們接下來能夠看看代碼了。經過以上的配置以後,你會發現,咱們接下來要寫的代碼很是簡單精煉:
清單 11. 服務類初始化 init
- public void init() throws ServletException
- {
- final BayeuxServerImpl bayeux =
- (BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);
- if (bayeux==null)
- throw new UnavailableException("No BayeuxServer!");
- .................
- // 建立擴展點
- bayeux.addExtension(new TimesyncExtension());
- bayeux.addExtension(new AcknowledgedMessagesExtension());
- // 設定握手鍊接權限
- bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(
- GrantAuthorizer.GRANT_PUBLISH);
- // 啓動服務渠道
- ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);
- processor.process(new EchoRPC());
- processor.process(new Monitor());
- //processor.process(new ChatService());
-
- bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer()
- {
- public void configureChannel(ConfigurableServerChannel channel)
- {
- channel.setPersistent(true);
- }
- });
- if (bayeux.getLogger().isDebugEnabled())
- System.err.println(bayeux.dump());
- .................
- }
public void init() throws ServletException { final BayeuxServerImpl bayeux = (BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE); if (bayeux==null) throw new UnavailableException("No BayeuxServer!"); ................. // 建立擴展點 bayeux.addExtension(new TimesyncExtension()); bayeux.addExtension(new AcknowledgedMessagesExtension()); // 設定握手鍊接權限 bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer( GrantAuthorizer.GRANT_PUBLISH); // 啓動服務渠道 ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux); processor.process(new EchoRPC()); processor.process(new Monitor()); //processor.process(new ChatService()); bayeux.createIfAbsent("/foo/bar/baz",new ConfigurableServerChannel.Initializer() { public void configureChannel(ConfigurableServerChannel channel) { channel.setPersistent(true); } }); if (bayeux.getLogger().isDebugEnabled()) System.err.println(bayeux.dump()); ................. }
這裏咱們介紹三個知識點:
1. Extension:Extension 是一個函數,它會在消息發出以前或者收到以後被調用,專門用來修改消息內容,例如加入一些特殊屬性(這些屬性多在消息的 ext 屬性中)。注意,這些屬性大可能是應用無關的,如記錄長輪詢的次數等等。這裏的「TimesyncExtension」和「AcknowledgedMessagesExtension」是兩個比較經常使用的 Extension:
1) 「Timesync Extension」用於計算客戶端事件和服務端時間的誤差。客戶端須要同時引入「dojox.cometd.timesync」類,該 Extension 使得客戶端和服務端在每次握手或者鏈接的時候可以互相交換各自的時鐘信息,這也是的客戶端能夠很精確的計算出他與服務端時鐘的偏移量。消息格式以下:
{ext:{timesync:{tc:12345567890,ts:1234567900,p:123,a:3},...},...}
TC:客戶端發消息的時間(距離 1970 年 1 月號的時長,單位爲毫秒)
TS:服務端收到消息的時間
2) 「Acknowledge Extension」用於提供可靠的順序消息機制。一旦加入了「Acknowledge Extension」,服務端會阻截非長輪詢的客戶端請求,這樣會使你的服務器更加的高效。注意:客戶端須要同時引入「dojox.cometd.ack」類與其協同工做。
2. Authorizer:設定握手鍊接權限,這裏設定值爲「GrantAuthorizer.GRANT_PUBLISH」,表示容許全部客戶端創建握手鍊接。
3. Process Service:啓動服務渠道「processor.process(new EchoRPC())」。經過這些服務渠道類,咱們能夠啓動服務渠道處理客戶端請求。這是咱們服務端推送技術的關鍵所在,咱們的業務邏輯代碼也是主要放在這些服務渠道類裏面。
接下來咱們來看看這些服務渠道類的具體實現:
清單 12. Echo Service 實現
- @Service ("echo")
- public static class EchoRPC
- {
- @Session
- private ServerSession _session;
-
- @SuppressWarnings("unused")
- @Configure("/service/echo")
- private void configureEcho(ConfigurableServerChannel channel)
- {
- channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH);
- }
-
- @Listener ("/service/echo")
- public void doEcho(ServerSession session, ServerMessage message)
- {
- Map<String,Object> data = message.getDataAsMap();
- Log.info("ECHO from "+session+" "+data);
-
- for(int i = 0; i < 50; i++){
- session.deliver(_session, message.getChannel(), data, null);
- }
- }
- }
@Service("echo") public static class EchoRPC {
@Session private ServerSession _session; @SuppressWarnings("unused") @Configure("/service/echo") private void configureEcho(ConfigurableServerChannel channel) { channel.addAuthorizer(GrantAuthorizer.GRANT_SUBSCRIBE_PUBLISH); } @Listener("/service/echo") public void doEcho(ServerSession session, ServerMessage message) { Map<String,Object> data = message.getDataAsMap(); Log.info("ECHO from "+session+" "+data); for(int i = 0; i < 50; i++){ session.deliver(_session, message.getChannel(), data, null); } } }
咱們能夠在「configureEcho」裏面設定該服務渠道支持的權限。咱們主要來看看「doEcho」方法,它被標識爲「@Listener("/service/echo")」,因此它能夠用於像客戶端推送服務渠道爲「echo」的消息,咱們以前客戶端代碼示例裏面的以下代碼:「dojox.cometd.subscribe("/service/echo",echoRpcReturn)」就是專門用於處理這裏服務渠道推送的消息,消息推送經過「deliver」方法,推送的消息信息放在「data」實參中。
再來看看 Monitor 類:
清單 13. Monitor Service 實現
- @Service ("monitor")
- public static class Monitor
- {
- @Listener ("/meta/subscribe")
- public void monitorSubscribe(ServerSession session, ServerMessage message)
- {
- Log.info("Monitored Subscribe from "+session+" for "
- +message.get(Message.SUBSCRIPTION_FIELD));
- }
-
- @Listener ("/meta/unsubscribe")
- public void monitorUnsubscribe(ServerSession session, ServerMessage message)
- {
- Log.info("Monitored Unsubscribe from "+session+" for "
- +message.get(Message.SUBSCRIPTION_FIELD));
- }
-
- @Listener ("/meta/*")
- public void monitorMeta(ServerSession session, ServerMessage message)
- {
- if (Log.isDebugEnabled())
- Log.debug(message.toString());
- }
- }
@Service("monitor") public static class Monitor { @Listener("/meta/subscribe") public void monitorSubscribe(ServerSession session, ServerMessage message) { Log.info("Monitored Subscribe from "+session+" for " +message.get(Message.SUBSCRIPTION_FIELD)); } @Listener("/meta/unsubscribe") public void monitorUnsubscribe(ServerSession session, ServerMessage message) { Log.info("Monitored Unsubscribe from "+session+" for " +message.get(Message.SUBSCRIPTION_FIELD)); } @Listener("/meta/*") public void monitorMeta(ServerSession session, ServerMessage message) { if (Log.isDebugEnabled()) Log.debug(message.toString()); } }
Monitor 渠道類與以前的 Echo 服務渠道類比較相似,不過它主要用於處理 meta 渠道,與業務邏輯無關。
最後,咱們來看看被註釋掉的「ChatService」類,他也能夠經過「processor.process(new ChatService())」來啓用,可是咱們這裏用了一個更爲簡單的方法:直接配置在 web.xml 文件中:
清單 14. ChatService 的配置
- <servlet>
- ...............
- <init-param>
- <param-name>services</param-name>
- <param-value>org.cometd.examples.ChatService</param-value>
- </init-param>
- <load-on-startup>1</load-on-startup>
- </servlet>
<servlet>
...............
<init-param>
<param-name>services</param-name>
<param-value>org.cometd.examples.ChatService</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
細心的讀者可能在以前的代碼示例中已經看到,這裏就是經過配置的方式加載服務渠道類。參考如下具體實現的代碼:
清單 15. ChatService 實現
- @Service ("chat")
- public class ChatService
- {
- ..........................................
- @Listener ("/service/members")
- public void handleMembership(ServerSession client, ServerMessage message)
- {
- Map<String, Object> data = message.getDataAsMap();
- final String room = ((String)data.get("room")).substring("/chat/".length());
- Map<String, String> roomMembers = _members.get(room);
- if (roomMembers == null)
- {
- Map<String, String> new_room = new ConcurrentHashMap<String, String>();
- roomMembers = _members.putIfAbsent(room, new_room);
- if (roomMembers == null) roomMembers = new_room;
- }
- final Map<String, String> members = roomMembers;
- String userName = (String)data.get("user");
- members.put(userName, client.getId());
- client.addListener(new ServerSession.RemoveListener()
- {
- public void removed(ServerSession session, boolean timeout)
- {
- members.values().remove(session.getId());
- broadcastMembers(room,members.keySet());
- }
- });
- broadcastMembers(room,members.keySet());
- }
- private void broadcastMembers(String room, Set<String> members)
- {
- // Broadcast the new members list
- ClientSessionChannel channel =
- _session.getLocalSession().getChannel("/members/"+room);
- channel.publish(members);
- }
- ..........................................
- @Listener ("/service/privatechat")
- protected void privateChat(ServerSession client, ServerMessage message)
- {
- Map<String,Object> data = message.getDataAsMap();
- String room = ((String)data.get("room")).substring("/chat/".length());
- Map<String, String> membersMap = _members.get(room);
- if (membersMap==null)
- {
- Map<String,String>new_room=new ConcurrentHashMap<String, String>();
- membersMap=_members.putIfAbsent(room,new_room);
- if (membersMap==null)
- membersMap=new_room;
- }
- String[] peerNames = ((String)data.get("peer")).split(",");
- ArrayList<ServerSession> peers = new ArrayList<ServerSession>(peerNames.length);
- .................
- }
- }
@Service("chat") public class ChatService { .......................................... @Listener("/service/members") public void handleMembership(ServerSession client, ServerMessage message) { Map<String, Object> data = message.getDataAsMap(); final String room = ((String)data.get("room")).substring("/chat/".length()); Map<String, String> roomMembers = _members.get(room); if (roomMembers == null) { Map<String, String> new_room = new ConcurrentHashMap<String, String>(); roomMembers = _members.putIfAbsent(room, new_room); if (roomMembers == null) roomMembers = new_room; } final Map<String, String> members = roomMembers; String userName = (String)data.get("user"); members.put(userName, client.getId()); client.addListener(new ServerSession.RemoveListener() { public void removed(ServerSession session, boolean timeout) { members.values().remove(session.getId()); broadcastMembers(room,members.keySet()); } }); broadcastMembers(room,members.keySet()); } private void broadcastMembers(String room, Set<String> members) { // Broadcast the new members list ClientSessionChannel channel = _session.getLocalSession().getChannel("/members/"+room); channel.publish(members); } .......................................... @Listener("/service/privatechat") protected void privateChat(ServerSession client, ServerMessage message) { Map<String,Object> data = message.getDataAsMap(); String room = ((String)data.get("room")).substring("/chat/".length()); Map<String, String> membersMap = _members.get(room); if (membersMap==null) { Map<String,String>new_room=new ConcurrentHashMap<String, String>(); membersMap=_members.putIfAbsent(room,new_room); if (membersMap==null) membersMap=new_room; } String[] peerNames = ((String)data.get("peer")).split(","); ArrayList<ServerSession> peers = new ArrayList<ServerSession>(peerNames.length); ................. } }
以上是摘錄部分 ChatService 實現代碼,它主要是實現一個在線的聊天室,包括公開發言和私有(1 對 1)聊天等等功能,它的實現方式與以前的 Echo 和 Monitor 相似,這裏不作詳述,有興趣的讀者能夠參考一下他的實現,來構造本身的服務器推送應用。
回頁首
服務器推送技術之比較
其實有不少種方式實現服務器推送,它們各有各的優缺點:
- 傳統輪詢:此方法是利用 HTML 裏面 meta 標籤的刷新功能,在必定時間間隔後進行頁面的轉載,以此循環往復。它的最大缺點就是頁面刷性給人帶來的體驗不好,並且服務器的壓力也會比較大。
- Ajax 輪詢:異步響應機制,即經過不間斷的客戶端 Ajax 請求,去發現服務端的變化。這種方式因爲是客戶端主動鏈接的,因此會有必定程度的延時,而且服務器的壓力也不小。
- 長鏈接:這也是咱們以前所介紹的一種方式。因爲它是利用客戶端的現有鏈接實現服務器主動向客戶端推送信息,因此延時的狀況不多,而且因爲服務端的可操控性使得服務器的壓力也迅速減少。其實這種技術還有其餘的實現方式,經過 Iframe,在頁面上嵌入一個隱藏幀(Iframe),將其「src」屬性指向一個長鏈接的請求,這樣一來,服務端就可以源源不斷的向客戶端發送數據。這種方式的不足就在於:它會形成瀏覽器的進度欄一直顯示沒有加載完成,固然咱們能夠經過 Google 的一個稱爲「htmlfile」的 ActiveX 控件解決,可是畢竟他須要安裝 ActiveX 控件,對於終端用戶也是不合適的。
- 套接字:能夠利用 Flash 的 XMLSocket 類或者 Java 的 Applet 來創建 Socket 鏈接,實現全雙工的服務器推送,而後經過 Flash 或者 Applet 與 JavaScript 通訊的接口來實現最終的數據推送。可是這種方式須要 Flash 或者 JVM 的支持,一樣不太合適於終端用戶。
- HTML5 的 WebSocket:這種方式其實與套接字同樣,可是這裏須要單獨強調一下:它是不須要用戶而外安裝任何插件的。HTML5 提供了一個 WebSocket 的 JavaScript 接口,能夠直接與服務端創建 Socket 鏈接,實現全雙工通訊,這種方式的服務器推送就是徹底意義上的服務器推送了,沒有半點模擬的成分,只是現階段支持 HTML5 的瀏覽器並很少,並且通常老版本的各類瀏覽器基本都不支持。不過 HTML5 是一套很是好的標準,在未來,當 HTML5 流行起來之後將是咱們實現服務器推送技術的不二選擇。
回頁首
結束語
這篇文章介紹了 Dojo 中的服務器推送 Cometd 工具包。基於服務器推送的理念,介紹了 Bayeux 協議的核心思想,並結合一個簡單示例介紹了服務器推送的基本實現。隨後,本着快速創建服務器推送框架的想法,介紹了 Dojo 的 Cometd 工具包,並分別從客戶端接口和服務端接口兩個方面分別介紹了 Dojo 的服務器推送框架的搭建和實現原理。最後,經過一些簡單的示例展現了基於服務端推送的業務邏輯的具體實現。服務端推送技術具備很強的實用性,但願廣大讀者在開發本身的項目的過程當中多關注一下,以儘量多的完善本身的 Web 應用。