開發建立XMPP「發佈訂閱」擴展(xmpp pubsub extend)

發佈訂閱(PubSub)是一個功能強大的XMPP協議擴展。用戶訂閱一個項目(在xmpp中叫作node),獲得通知時,也即當事項節點更新時。xmpp服務器通知用戶(經過message格式)。html

節點類型:java

  • Leaf node: 葉子節點,包含了發佈項.
  • Collection node: 能夠看作集合節點,它下面包含葉子.

注意:不能訂閱整個Collection node,只能訂閱Leaf nodenode

訪問和發佈模式 Access and Publisher Models

  • Open: 任何人都能訂閱
  • Authorize: 訂閱請求必須由全部者批准,只有認證的用戶能夠訂閱項目。
  • Whitelist: 白名單裏的用戶能夠訂閱.
  • Presence: 只有能收到發佈者也即Owner的即席狀態的用戶才能收到訂閱.
  • Roster: 只有在用戶花名冊或花名冊組內的用戶能夠收到訂閱事項提醒

在openfire裏,Whitelist的配置以下:數據庫

 

發佈者模式:服務器

  • Open: anyone may publish items to the node.(權限最大)
  • Publishers: owners and publishers are allowed to publish items to the node.
  • Subscribers: owners, publishers and subscribers are allowed to publish items to the node.

 

發佈訂閱的過程,發佈者發佈到葉子節點,訂閱者收到消息提醒app

XMPP中的訂閱流程

一、首先,須要確認你的服務器支持pubsub特性jsp

1.1  查詢XMPP服務的全部服務ide

<iq type='get'
from='wangxin@im/CVTalk'
to='im'
id='11'>
<query xmlns='http://jabber.org/protocol/disco#info'/>
</iq>

返回:測試

<iq id="11" to="wangxin@im/PC" from="im" type="result">
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="server" name="Openfire Server" type="im"/>
<identity category="pubsub" name="null" type="pep"/>
<feature var="http://jabber.org/protocol/pubsub#manage-subscriptions"/>
<feature var="http://jabber.org/protocol/pubsub#modify-affiliations"/>
<feature var="http://jabber.org/protocol/pubsub#retrieve-default"/>
<feature var="http://jabber.org/protocol/pubsub#collections"/>
<feature var="jabber:iq:private"/>
<feature var="http://jabber.org/protocol/disco#items"/>
<feature var="vcard-temp"/>
<feature var="http://jabber.org/protocol/pubsub#publish"/>
<feature var="urn:xmpp:archive:auto"/>
<feature var="http://jabber.org/protocol/pubsub#subscribe"/>
<feature var="http://jabber.org/protocol/pubsub#retract-items"/>
<feature var="http://jabber.org/protocol/offline"/>
<feature var="http://jabber.org/protocol/pubsub#meta-data"/>
<feature var="jabber:iq:register"/>
<feature var="http://jabber.org/protocol/pubsub#retrieve-subscriptions"/>
<feature var="http://jabber.org/protocol/pubsub#default_access_model_open"/>
<feature var="jabber:iq:roster"/>
<feature var="http://jabber.org/protocol/pubsub#config-node"/>
<feature var="http://jabber.org/protocol/address"/>
<feature var="http://jabber.org/protocol/pubsub#publisher-affiliation"/>
<feature var="http://jabber.org/protocol/pubsub#item-ids"/>
<feature var="http://jabber.org/protocol/pubsub#instant-nodes"/>
<feature var="http://jabber.org/protocol/commands"/>
<feature var="http://jabber.org/protocol/pubsub#multi-subscribe"/>
<feature var="http://jabber.org/protocol/pubsub#outcast-affiliation"/>
<feature var="http://jabber.org/protocol/pubsub#get-pending"/>
<feature var="google:jingleinfo"/>
<feature var="jabber:iq:privacy"/>
<feature var="urn:xmpp:archive:manage"/>
<feature var="http://jabber.org/protocol/pubsub#subscription-options"/>
<feature var="jabber:iq:last"/>
<feature var="http://jabber.org/protocol/pubsub#create-and-configure"/>
<feature var="urn:xmpp:ping"/>
<feature var="http://jabber.org/protocol/pubsub#retrieve-items"/>
<feature var="jabber:iq:time"/>
<feature var="http://jabber.org/protocol/pubsub#create-nodes"/>
<feature var="http://jabber.org/protocol/pubsub#persistent-items"/>
<feature var="jabber:iq:version"/>
<feature var="http://jabber.org/protocol/pubsub#presence-notifications"/>
<feature var="http://jabber.org/protocol/pubsub"/>
<feature var="http://jabber.org/protocol/pubsub#retrieve-affiliations"/>
<feature var="http://jabber.org/protocol/pubsub#delete-nodes"/>
<feature var="http://jabber.org/protocol/pubsub#purge-nodes"/>
<feature var="http://jabber.org/protocol/disco#info"/>
<feature var="http://jabber.org/protocol/rsm"/>
</query>
</iq>
View Code

 

1.2 查詢某一項XMPP子域,如pubsubfetch

<iq type='get'
from='wangxin@im/PC'
to='pubsub.im'
id='11'>
<query xmlns='http://jabber.org/protocol/disco#info'/>
</iq>

返回:

<iq id="11" to="wangxin@im/PC" from="pubsub.im" type="result">
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="pubsub" name="Publish-Subscribe service" type="service"/>
<feature var="http://jabber.org/protocol/pubsub"/>
<feature var="http://jabber.org/protocol/pubsub#collections"/>
<feature var="http://jabber.org/protocol/pubsub#config-node"/>
<feature var="http://jabber.org/protocol/pubsub#create-and-configure"/>
<feature var="http://jabber.org/protocol/pubsub#create-nodes"/>
<feature var="http://jabber.org/protocol/pubsub#delete-nodes"/>
<feature var="http://jabber.org/protocol/pubsub#get-pending"/>
<feature var="http://jabber.org/protocol/pubsub#instant-nodes"/>
<feature var="http://jabber.org/protocol/pubsub#item-ids"/>
<feature var="http://jabber.org/protocol/pubsub#meta-data"/>
<feature var="http://jabber.org/protocol/pubsub#modify-affiliations"/>
<feature var="http://jabber.org/protocol/pubsub#manage-subscriptions"/>
<feature var="http://jabber.org/protocol/pubsub#multi-subscribe"/>
<feature var="http://jabber.org/protocol/pubsub#outcast-affiliation"/>
<feature var="http://jabber.org/protocol/pubsub#persistent-items"/>
<feature var="http://jabber.org/protocol/pubsub#presence-notifications"/>
<feature var="http://jabber.org/protocol/pubsub#publish"/>
<feature var="http://jabber.org/protocol/pubsub#publisher-affiliation"/>
<feature var="http://jabber.org/protocol/pubsub#purge-nodes"/>
<feature var="http://jabber.org/protocol/pubsub#retract-items"/>
<feature var="http://jabber.org/protocol/pubsub#retrieve-affiliations"/>
<feature var="http://jabber.org/protocol/pubsub#retrieve-default"/>
<feature var="http://jabber.org/protocol/pubsub#retrieve-items"/>
<feature var="http://jabber.org/protocol/pubsub#retrieve-subscriptions"/>
<feature var="http://jabber.org/protocol/pubsub#subscribe"/>
<feature var="http://jabber.org/protocol/pubsub#subscription-options"/>
<feature var="http://jabber.org/protocol/pubsub#default_access_model_open"/>
<feature var="http://jabber.org/protocol/disco#info"/>
</query>
</iq>
View Code

 

1.3  查詢發佈訂閱中的某一個持久化的葉子節點

<iq type='get'
from='wangxin@im/PC'
to='pubsub.im'
id='info1'>
<query xmlns='http://jabber.org/protocol/disco#info'
node='NodeID_003'/>
</iq>

返回

<iq id="info1" to="wangxin@im/PC" from="pubsub.im" type="result">
<query xmlns="http://jabber.org/protocol/disco#info" node="NodeID_003">
<identity category="pubsub" name="null" type="leaf"/>
<feature var="http://jabber.org/protocol/pubsub"/>
<feature var="http://jabber.org/protocol/disco#info"/>
<x xmlns="jabber:x:data" type="result">
<field var="FORM_TYPE" type="hidden">
<value>http://jabber.org/protocol/pubsub#meta-data</value>
</field>
<field label="節點的簡化名" var="pubsub#title" type="text-single">
<value/>
</field>
<field label="節點的描述" var="pubsub#description" type="text-single">
<value/>
</field>
<field label="Whether the node is a leaf (default) or a collection" var="pubsub#node_type" type="text-single">
<value>leaf</value>
</field>
<field label="The collection with which a node is affiliated." var="pubsub#collection" type="text-single"/>
<field label="是否容許訂閱" var="pubsub#subscribe" type="boolean">
<value>1</value>
</field>
<field label="強制設置新的訂閱" var="pubsub#subscription_required" type="boolean">
<value>0</value>
</field>
<field label="用事件通知投送有效載荷" var="pubsub#deliver_payloads" type="boolean">
<value>1</value>
</field>
<field label="當節點配置改變時通知訂閱者" var="pubsub#notify_config" type="boolean">
<value>1</value>
</field>
<field label="當節點被刪除時通知訂閱者" var="pubsub#notify_delete" type="boolean">
<value>1</value>
</field>
<field label="當節點的項目被刪除時通知訂閱者" var="pubsub#notify_retract" type="boolean">
<value>1</value>
</field>
<field label="僅投送通知給有效的用戶" var="pubsub#presence_based_delivery" type="boolean">
<value>0</value>
</field>
<field label="指定有效的數據類型給此節點" var="pubsub#type" type="text-single">
<value/>
</field>
<field label="XSLT信息體" var="pubsub#body_xslt" type="text-single">
<value/>
</field>
<field label="XSLT有效載荷" var="pubsub#dataform_xslt" type="text-single">
<value/>
</field>
<field label="指定誰能夠訂閱和查看項目" var="pubsub#access_model" type="list-single">
<value>open</value>
<option>
<value>authorize</value>
</option>
<option>
<value>open</value>
</option>
<option>
<value>presence</value>
</option>
<option>
<value>roster</value>
</option>
<option>
<value>whitelist</value>
</option>
</field>
<field label="指定發佈者模型" var="pubsub#publish_model" type="list-single">
<value>publishers</value>
<option>
<value>publishers</value>
</option>
<option>
<value>subscribers</value>
</option>
<option>
<value>open</value>
</option>
</field>
<field label="好友列表容許訂閱" var="pubsub#roster_groups_allowed" type="list-multi"/>
<field label="有問題時聯繫相關人員" var="pubsub#contact" type="jid-multi"/>
<field label="默認的語言" var="pubsub#language" type="text-single">
<value>English</value>
</field>
<field label="節點的主人" var="pubsub#owner" type="jid-multi">
<value>test17@im</value>
</field>
<field label="節點的發佈者" var="pubsub#publisher" type="jid-multi"/>
<field label="選擇實體將收到的信息回覆給項目" var="pubsub#itemreply" type="list-single">
<value>owner</value>
</field>
<field label="多用戶對話房間的回覆將被傳遞" var="pubsub#replyroom" type="jid-multi"/>
<field label="給用戶的回覆將被傳遞" var="pubsub#replyto" type="jid-multi"/>
<field label="發送項目給新的訂閱者" var="pubsub#send_item_subscribe" type="boolean">
<value>1</value>
</field>
<field label="持續的項目被存儲" var="pubsub#persist_items" type="boolean">
<value>0</value>
</field>
<field label="項目的最大數字被持續化" var="pubsub#max_items" type="text-single">
<value>-1</value>
</field>
<field label="最大的有效載荷字節大小" var="pubsub#max_payload_size" type="text-single">
<value>5120</value>
</field>
</x>
</query>
</iq>

 

測試:經過smackx 和spark im客戶端實現發佈訂閱

發佈者:

import java.util.Date;

import org.jivesoftware.smack.XMPPConnection;  
import org.jivesoftware.smackx.pubsub.AccessModel;
import org.jivesoftware.smackx.pubsub.ConfigureForm;
import org.jivesoftware.smackx.pubsub.FormType;
import org.jivesoftware.smackx.pubsub.LeafNode;  
import org.jivesoftware.smackx.pubsub.PayloadItem;  
import org.jivesoftware.smackx.pubsub.PubSubManager;  
import org.jivesoftware.smackx.pubsub.PublishModel;
import org.jivesoftware.smackx.pubsub.SimplePayload;  

public class Publisher {
    private static XMPPConnection connection = new XMPPConnection("im.cvte.cn");  
    private static String USRE_NAME = "test17";  
    private static String PASSWORD = "password";  
    private static String nodeId = "NodeID_003"; 
      
    static{  
        try {  
            connection.connect();  
            connection.login(USRE_NAME,PASSWORD);  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
    public static void main(String[] args)throws Exception{  
  
        try{  
            PubSubManager manager = new PubSubManager(connection,"pubsub.im");  
               
            LeafNode myNode = null;  
            try {  
                myNode = manager.getNode(nodeId);  //建立葉子節點
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
            if(myNode == null){  
                myNode = manager.createNode(nodeId);  
            }  
            String id1 = "1001";
            
            SimplePayload payload1 = new SimplePayload("message","pubsub:cvtalk","<message xmlns='pubsub:cvtalk'><body>"+ id1+":消息發佈:"+ new Date().toString()+"</body></message>" );  

            //設置葉子節點參數,目前失靈
            ConfigureForm f = new ConfigureForm(FormType.submit);
            //配置參數
            f.setPersistentItems(true);  //是否持久化
            f.setDeliverPayloads(true);
            f.setAccessModel(AccessModel.open);
            f.setPublishModel(PublishModel.publishers);
            //f.setSubscribe(true);            
            //經過設置建立葉子
            //myNode =(LeafNode)manager.createNode(nodeId, f); 
            
            PayloadItem<SimplePayload> item1 = new PayloadItem<SimplePayload>(id1, payload1);
            //不帶itemID的SimplePayload,一樣是OK的
            //PayloadItem<SimplePayload> item1 = new PayloadItem<SimplePayload>(payload1);   
            myNode.publish(item1);  
            System.out.println("-----publish item1-----------"); 

        }  
        catch(Exception E)  
        {E.printStackTrace();}  
          
    }  
}

 

訂閱者,這裏的代碼請寫到spark的LoginDialog的login()方法 :

 

private boolean login() {
.......
            connection.login(.......
.......
PubSubManager manager = new PubSubManager(connection,"pubsub.im");  
                    Node eventNode = manager.getNode("NodeID_003");  
                    eventNode.addItemEventListener(new ItemEventListener<PayloadItem>() {  
                        public void handlePublishedItems(ItemPublishEvent evt) {  
                            System.out.println("收到訂閱的載荷數量=" + evt.getItems().size());  
                            for (Object obj : evt.getItems()) {  
                                PayloadItem<SimplePayload> item = (PayloadItem<SimplePayload>) obj;  
                                System.out.println("訂閱項目=" + item.getPayload().toString());  
                            }  
                        }  
                    });  
                    eventNode.subscribe(connection.getUser());  
......

 

 

訂閱到達的消息

<message id="NodeID_003__wangxin@im__0K463" to="wangxin@im/PC" from="pubsub.im">
  <thread>n4Ch63</thread>
  <event xmlns="http://jabber.org/protocol/pubsub#event">
    <items node="NodeID_003">
      <item id="1001">
        <message xmlns="pubsub:cvtalk">
          <body>1001:消息發佈:Tue Dec 08 15:36:59 CST 2015</body>
        </message>
      </item>
    </items>
  </event>
  <headers xmlns="http://jabber.org/protocol/shim">
    <header name="pubsub#subid">GP00jOONb9Lg2PRr0K0T01xunpquPmVC2q7QhjYg</header>
  </headers>
</message>

 

smack中的pubsub的其餘操做

獲取節點配置

public ConfigureForm getDefaultConfiguration()
        throws XMPPException
    {
        // Errors will cause exceptions in getReply, so it only returns
        // on success.
        PubSub reply = (PubSub)sendPubsubPacket(Type.GET, new NodeExtension(PubSubElementType.DEFAULT), PubSubElementType.DEFAULT.getNamespace());
        return NodeUtils.getFormFromPacket(reply, PubSubElementType.DEFAULT);
    }

刪除節點

    public void deleteNode(String nodeId)
        throws XMPPException
    {
        sendPubsubPacket(Type.SET, new NodeExtension(PubSubElementType.DELETE, nodeId), PubSubElementType.DELETE.getNamespace());
        nodeMap.remove(nodeId);
    }

 

監聽器

一共有3個監聽:

  • ItemDeleteListener
  • ItemEventListener
  • NodeConfigListener

其中 ItemEventListener使用的是泛型參數,類型是 org.jivesoftware.smackx.pubsub.Item

public interface ItemEventListener <T extends Item> 
{
    /**
     * Called whenever an item is published to the node the listener
     * is registered with.
     * 
     * @param items The publishing details.
     */
    void handlePublishedItems(ItemPublishEvent<T> items);
}

 

另外,Personal Event Publishing (XEP-163) 也是基於發佈訂閱,xmpp包體結構很相似,發佈的代碼:

   PEPManager pepManager = new PEPManager(smackConnection);
   pepManager.addPEPListener(new PEPListener() {
       public void eventReceived(String inFrom, PEPEvent inEvent) {
           LOGGER.debug("Event received: " + inEvent);
       }
   });

   PEPProvider pepProvider = new PEPProvider();
   pepProvider.registerPEPParserExtension("http://jabber.org/protocol/tune", new TuneProvider());
   ProviderManager.getInstance().addExtensionProvider("event", "http://jabber.org/protocol/pubsub#event", pepProvider);
   
   Tune tune = new Tune("jeff", "1", "CD", "My Title", "My Track");
   pepManager.publish(tune);

 

接收的監聽:

public interface PEPListener {

    /**
     * Called when PEP events are received as part of a presence subscribe or message filter.
     *  
     * @param from the user that sent the entries.
     * @param event the event contained in the message.
     */
    public void eventReceived(String from, PEPEvent event);

}

 

最後一個問題,在openfire中葉子節點上的新項目持久化到哪裏了?

PubSubPersistenceManager類中writePendingItems負責持久化到數據庫

private static void writePendingItems(Connection con, LinkedListNode<RetryWrapper> addItem, boolean batch) throws SQLException

但每次發佈卻看不到數據庫中的記錄,能夠在下面代碼找到答案,原來都提交內存了

writePendingItems(Connection con, LinkedList<RetryWrapper> addList, LinkedList<PublishedItem> delList) 將數據庫中的記錄刪除了

/**
     * Flush the cache(s) of items to be persisted (itemsToAdd) and deleted (itemsToDelete).
     * @param sendToCluster If true, delegate to cluster members, otherwise local only
     */
    public static void flushPendingItems(boolean sendToCluster)
    {
        // forward to other cluster members and wait for response
        if (sendToCluster) {
            CacheFactory.doSynchronousClusterTask(new FlushTask(), false);
        }

        if (itemsToAdd.getFirst() == null && itemsToDelete.getFirst() == null) {
            return;     // nothing to do for this cluster member
        }
        
        Connection con = null;
        boolean rollback = false;
        LinkedList<RetryWrapper> addList = null;
        LinkedList<PublishedItem> delList = null;

        // Swap pending items so we can parse and save the contents from this point in time
        // while not blocking new entries from being cached.
        synchronized(itemsPending) 
        {
            addList = itemsToAdd;
            delList = itemsToDelete;

            itemsToAdd = new LinkedList<RetryWrapper>();
            itemsToDelete = new LinkedList<PublishedItem>();
            
            // Ensure pending items are available via the item read cache;
            // this allows the item(s) to be fetched by other request threads
            // while being written to the DB from this thread
            int copied = 0;
            for (String key : itemsPending.keySet()) {
                if (!itemCache.containsKey(key)) {
                    itemCache.put(key, (((RetryWrapper)itemsPending.get(key).object)).get());
                    copied++;
                }
            }
            if (log.isDebugEnabled() && copied > 0) {
                log.debug("Added " + copied + " pending items to published item cache");
            }
            itemsPending.clear();
        }

        // Note that we now make multiple attempts to write cached items to the DB:
        //   1) insert all pending items in a single batch
        //   2) if the batch insert fails, retry by inserting each item separately
        //   3) if a given item cannot be written, return it to the pending write cache
        // By default step 3 will be tried once per item, but this can be configured
        // (or disabled) using the "xmpp.pubsub.item.retry" property. In the event of
        // a transaction rollback, items that could not be written to the database
        // will be returned to the pending item write cache.
        try {
            con = DbConnectionManager.getTransactionConnection();
            writePendingItems(con, addList, delList);
        } catch (SQLException se) {
            log.error("Failed to flush pending items; initiating rollback", se);
            // return new items to the write cache
            LinkedListNode<RetryWrapper> node = addList.getLast();
            while (node != null) {
                savePublishedItem(node.object);
                node.remove();
                node = addList.getLast();
            }
            rollback = true;
        } finally {
            DbConnectionManager.closeTransactionConnection(con, rollback);
        }
    }

 

 

參考網頁:
http://xmpp.org/extensions/xep-0060.html

http://xmpp.org/extensions/xep-0163.html

https://community.igniterealtime.org/thread/38433http://www.igniterealtime.org/support/articles/pubsub.jsphttp://blog.csdn.net/u011163195/article/details/17683741

相關文章
相關標籤/搜索