XMPP協議之消息回執解決方案

苦惱中尋找方法

在開始作即時通訊時就知道了消息回執這個概念,目的是解決通信消息由於各類緣由未送達對方而提供的一種保障機制。產生這個問題的緣由主要是網絡不穩定、服務器或者客戶端一些異常致使沒有接收到消息。html

由於產品中使用的是openfire和spark的組合,因此一直就想在這個範圍內找一個現成的方案,只不過經過閱讀一些開發者的總結提到說openfire沒有消息回執的方案。因而也看到了別人的方案:git

  1. 發送者發送消息給服務端
  2. 服務端接收到消息後發送回執給發送者
  3. 發送者確認收到則結束,若是未收到就重發
  4. 服務端將消息記錄一下,並推送給接收者,等待接收者的回執
  5. 接收者接收消息併發回執給服務端
  6. 服務端接收回執刪除掉消息回執記錄,表示已經發送完畢
  7. 若是必定時間內沒收到從新推送消息給客戶端
  8. 接收者若是收到消息進行去重處理,若是不重複的執行第5-6步

這個流程基本就是完成了消息回執的功能,核心點就是在於發送者-服務端-接收者三者之間創建一個消息確認機制。這個方案若是要本身實現的話須要定製一套消息協議了,這個實現方法比較多,對於XMPP來講發message、iq均可以。固然也能夠看到這套方案會帶來問題,就是每條消息都要執行一套確認,因此會增大流量和計算量。github

流量對於移動網絡來講仍是很重要的,並且移動網絡由於移動的緣由很容易出現不穩定,因此天然這部分的流量可能會更大些。可是也正由於移動網絡的不穩定就更須要消息回執來確認消息狀態了,解決丟包的問題。web

因而這就變成了一個雙向的問題,只要能是儘可能減小消息的體積以此來減小流量吧。服務器

只不過對於我來講方法有了,怎麼作是個問題,畢竟要實現一套這樣的功能,還要保證穩定,不然這個消息回執功能自己不穩定還不如不要呢。基本的設計思路也有了:websocket

  1. 客戶端維護兩個列表(發送回執隊列和接收回執隊列),用於保存發送/接收消息回執狀況
  2. 服務端也維護一個列表,用於記錄消息回執的接收與發送狀況,服務端對列表進行超時檢查,若是回執未發送的重發消息,若是收到重複的消息則去重處理
  3. 客戶端按期檢查兩個列表裏的回執狀態,若是未收到回執的要作重發處理,若是收到的是重複的回執則進行去重處理

方案差很少有了,只不過在檢閱網上資料時有了新的發現。網絡

柳暗花明

在看別人的總結時發現XMPP有擴展協議是支持消息回執功能的,就是XEP-0184.瞭解下來這個協議確實是一套消息回執的實現方法,可是呢。。併發

  1. 它必須在openfire3.9以上版本才支持,這個能夠在openfire的版本日誌裏能夠看到
  2. 它只是一種端到端的消息回執,並且只有接收端收到消息後纔會返回回執,這樣對於發送者來講很麻煩,若是接收者不在線沒法得知消息是否發出了,由於服務端不會告知發送者已經拿到消息了。只有等到接收者上線獲取了消息後,由接收者發送一條確認的回執給接收者

這個看起來很美好的東西,發現不大好用啊。因而看了本身的openfire是4以上版本的,因此確實支持。而後檢查了客戶端使用的smack包裏確實有XEP-0184的實現。socket

//這個類是一個統一調用的類
org.jivesoftware.smackx.receipts.DeliveryReceiptManager

//這個是發送者發送一個回執請求,告知客戶端我要消息回執
org.jivesoftware.smackx.receipts.DeliveryReceiptRequest

//這個是接收者收到消息後返回的回執確認
org.jivesoftware.smackx.receipts.DeliveryReceipt

//這個是用於發送者監聽接收者發來回執確認的事件
public interface ReceiptReceivedListener {
    /**
     * Callback invoked when a new receipt got received.
     * <p>
     * {@code receiptId} correspondents to the message ID, which can be obtained with
     * {@link org.jivesoftware.smack.packet.Stanza#getStanzaId()}.
     * </p>
     * 
     * @param fromJid the jid that send this receipt
     * @param toJid the jid which received this receipt
     * @param receiptId the message ID of the stanza(/packet) which has been received and this receipt is for
     * @param receipt the receipt
     */
    void onReceiptReceived(String fromJid, String toJid, String receiptId, Stanza receipt);
}

有了這三個傢伙確實是能夠作一套消息確認的機制,可是要在客戶端發送消息時發送一個DeliveryReceiptRequest,而後等待接收者發送回來的消息確認DeliveryReceipt。ide

public class ChatDemo {

    public static void main(String[] args) {
        AbstractXMPPConnection connection = SesseionHelper.newConn("192.168.11.111", 5222, "abc", "user1", "pwd1");
        
        //在發消息以前經過DeliveryReceiptManager訂閱回執
        DeliveryReceiptManager drm = DeliveryReceiptManager.getInstanceFor(connection);
        drm.addReceiptReceivedListener(new ReceiptReceivedListener() {
            
            @Override
            public void onReceiptReceived(String fromJid, String toJid,
                    String receiptId, Stanza receipt) {
                System.err.println((new Date()).toString()+ " - drm:" + receipt.toXML());
                
            }
        });
        
        Message msg = new Message("100069@bkos");
        msg.setBody("回覆個人消息1.");
        msg.setType(Type.chat);
        //將消息放到DeliveryReceiptRequest中,這樣就能夠在發送Message後發送回執請求
        DeliveryReceiptRequest.addTo(msg);
        
        try {
            connection.sendStanza(msg);
        } catch (NotConnectedException e) {
            e.printStackTrace();
        }
        
        connection.addAsyncStanzaListener(new StanzaListener() {
            
            @Override
            public void processPacket(Stanza packet) throws NotConnectedException {
                System.out.println((new Date()).toString()+ "- processPacket:" + packet.toXML());
            }
        }, new StanzaFilter() {
            @Override
            public boolean accept(Stanza stanza) {
                return stanza instanceof Message;
            }
        });
        
        while (true) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
        }
    }

}

上面代碼是發送者要完成的代碼,這裏並無看到接收者返回回執的過程,這個實如今DeliveryReceiptManager裏完成的。

private DeliveryReceiptManager(XMPPConnection connection) {
    super(connection);
    ServiceDiscoveryManager sdm = ServiceDiscoveryManager.getInstanceFor(connection);
    sdm.addFeature(DeliveryReceipt.NAMESPACE);

    // Add the packet listener to handling incoming delivery receipts
    connection.addAsyncStanzaListener(new StanzaListener() {
        @Override
        public void processPacket(Stanza packet) throws NotConnectedException {
            DeliveryReceipt dr = DeliveryReceipt.from((Message) packet);
            // notify listeners of incoming receipt
            for (ReceiptReceivedListener l : receiptReceivedListeners) {
                l.onReceiptReceived(packet.getFrom(), packet.getTo(), dr.getId(), packet);
            }
        }
    }, MESSAGES_WITH_DELIVERY_RECEIPT);

    // Add the packet listener to handle incoming delivery receipt requests
    connection.addAsyncStanzaListener(new StanzaListener() {
        @Override
        public void processPacket(Stanza packet) throws NotConnectedException {
            final String from = packet.getFrom();
            final XMPPConnection connection = connection();
            switch (autoReceiptMode) {
            case disabled:
                return;
            case ifIsSubscribed:
                if (!Roster.getInstanceFor(connection).isSubscribedToMyPresence(from)) {
                    return;
                }
                break;
            case always:
                break;
            }

            final Message messageWithReceiptRequest = (Message) packet;
            Message ack = receiptMessageFor(messageWithReceiptRequest);
            if (ack == null) {
                LOGGER.warning("Received message stanza with receipt request from '" + from
                                + "' without a stanza ID set. Message: " + messageWithReceiptRequest);
                return;
            }
            connection.sendStanza(ack);
        }
    }, MESSAGES_WITH_DEVLIERY_RECEIPT_REQUEST);
}

DeliveryReceiptManager裏會訂閱消息事件,當收到消息是須要回執時發送ack包,這裏的ack就是帶了DeliveryReceipt的一個消息包。

好了,這個XEP-0184差很少看明白了,但並非想要的那種消息回執。它更像是手機消息或者郵件的那種接收確認回執。是端到端的一種確認機制。可是若是在服務端對這個消息作一些截取處理,作一箇中間狀態也是能夠達到咱們要的消息回執的狀態的。

作法就是在服務端截取XEP-0184的消息,若是是請求消息DeliveryReceiptRequest則在服務端保存記錄,同時服務端發送DeliveryReceipt(ack)給發送方。而後客戶端照樣接收消息返回ack後服務端截獲更新服務端記錄便可。

這種作法就是借用xep-0184協議來完成消息回執的功能。

真正的又一村

也不知道是否意外,在看一篇博文時發現了一個更有意思東西,就是XEP-0198.

它是幹啥的呢?

流管理背後的基本概念是,初始化的實體(一個服務端或者客戶端)和接收的實體(一個服務端)能夠爲更靈活的管理stream交換命令.下面兩條流管理的特性被普遍的關注,由於它們能夠提升網絡的可靠性和終端用戶的體驗:

  • Stanza確認(Stanza Acknowledgements) – 可以確認一段或者一系列Stanza是否已被某一方接收.
  • 流恢復(Stream Resumption) – 可以迅速的恢復(resume)一個已經被終止的流.

這就忽然發現又一村原來在這啊,XMPP畢竟最開始是基於TCP協議的,能夠在流的基礎上完成消息到達回執。它的特徵也代表了這點,一是能夠作消息確認,保證消息是否被另外一方接收。另一點就是在消息未確認接收時能夠作恢復(也就是重試)。這不就徹底知足咱們消息回執的要求了嗎?

它的工做過程是:一端發起 請求,另外一端必須以應答。

只不過在smack要4.1.x以上版本,並且默認是不開啓流管理功能的,因此要手動的開啓一下,剩下的事情由smack和openfire來完成。在創建TCPConnection前執行正面這句:

XMPPTCPConnection.setUseStreamManagementResumptionDefault(true);

這個代碼就是說開啓流恢復,固然流恢復開啓了Stanza確認也是要開啓的,能夠看setUseStreamManagementResumptionDefault的實現,裏面調用setUseStreamManagementDefault:

public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
    if (useSmResumptionDefault) {
        // Also enable SM is resumption is enabled
        setUseStreamManagementDefault(useSmResumptionDefault);
    }
    XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
}

openfire服務端默認是開啓這個功能的,在openfire.xml裏有設置:

<!-- XEP-0198 properties -->  
  <stream> 
    <management> 
      <!-- Whether stream management is offered to clients by server. -->  
      <active>true</active>  
      <!-- Number of stanzas sent to client before a stream management
                 acknowledgement request is made. -->  
      <requestFrequency>5</requestFrequency> 
    </management> 
  </stream>

好了,這樣就完成了消息回執的功能了。沒想到XMPP協議已經支持了整個流程,省去了不少事情,同時openfire中websocket也是支持xep-198,因此手機端應該也是能夠支持。

參考與引用

http://developerworks.github.io/2014/10/03/xmpp-xep-0198-stream-management/
http://blog.csdn.net/chszs/article/details/48576553

本文轉至我本身的博客: https://mini188.cn/c/XMPP%E5%8D%8F%E8%AE%AE%E4%B9%8B%E6%B6%88%E6%81%AF%E5%9B%9E%E6%89%A7%E8%A7%A3%E5%86%B3%E6%96%B9%E6%A1%88

相關文章
相關標籤/搜索