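Publish-Subscribe (PubSub, XEP-0060) is a powerful XMPP protocol extension. A user subscribes to a topic (called a node in XMPP) and is notified whenever that node is updated. The XMPP server delivers the notification to the user as a message stanza.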
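Node types:
Note: you cannot subscribe to an entire collection node, only to a leaf node.
In Openfire, the whitelist configuration is as follows:
Publisher model:
The publish-subscribe flow: the publisher publishes an item to a leaf node, and each subscriber receives a message notification.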
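To make the flow concrete, here is a toy in-memory model in plain Java. It does not speak XMPP or use Smack; the class `ToyLeafNode` and its methods are hypothetical names invented for this sketch, and a subscriber is reduced to a simple callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a leaf node: it keeps subscribers and pushes each published item to them. */
class ToyLeafNode {
    private final String nodeId;
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    ToyLeafNode(String nodeId) {
        this.nodeId = nodeId;
    }

    String getNodeId() {
        return nodeId;
    }

    /** Registers a subscriber callback (stands in for a subscribed JID). */
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Publishes a payload to every subscriber and returns how many were notified. */
    int publish(String payload) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
        return subscribers.size();
    }

    public static void main(String[] args) {
        ToyLeafNode node = new ToyLeafNode("NodeID_003");
        List<String> inbox = new ArrayList<>();
        node.subscribe(inbox::add);
        node.publish("1001: message published");
        System.out.println(node.getNodeId() + " delivered " + inbox);
    }
}
```

In the real protocol the server sits between publisher and subscriber and also enforces the access and publisher models, which this sketch leaves out.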
1. First, confirm that your server supports the pubsub feature
1.1 Query all features of the XMPP server
<iq type='get' from='wangxin@im/CVTalk' to='im' id='11'>
  <query xmlns='http://jabber.org/protocol/disco#info'/>
</iq>
Response:
<iq id="11" to="wangxin@im/PC" from="im" type="result">
  <query xmlns="http://jabber.org/protocol/disco#info">
    <identity category="server" name="Openfire Server" type="im"/>
    <identity category="pubsub" name="null" type="pep"/>
    <feature var="http://jabber.org/protocol/pubsub#manage-subscriptions"/>
    <feature var="http://jabber.org/protocol/pubsub#modify-affiliations"/>
    <feature var="http://jabber.org/protocol/pubsub#retrieve-default"/>
    <feature var="http://jabber.org/protocol/pubsub#collections"/>
    <feature var="jabber:iq:private"/>
    <feature var="http://jabber.org/protocol/disco#items"/>
    <feature var="vcard-temp"/>
    <feature var="http://jabber.org/protocol/pubsub#publish"/>
    <feature var="urn:xmpp:archive:auto"/>
    <feature var="http://jabber.org/protocol/pubsub#subscribe"/>
    <feature var="http://jabber.org/protocol/pubsub#retract-items"/>
    <feature var="http://jabber.org/protocol/offline"/>
    <feature var="http://jabber.org/protocol/pubsub#meta-data"/>
    <feature var="jabber:iq:register"/>
    <feature var="http://jabber.org/protocol/pubsub#retrieve-subscriptions"/>
    <feature var="http://jabber.org/protocol/pubsub#default_access_model_open"/>
    <feature var="jabber:iq:roster"/>
    <feature var="http://jabber.org/protocol/pubsub#config-node"/>
    <feature var="http://jabber.org/protocol/address"/>
    <feature var="http://jabber.org/protocol/pubsub#publisher-affiliation"/>
    <feature var="http://jabber.org/protocol/pubsub#item-ids"/>
    <feature var="http://jabber.org/protocol/pubsub#instant-nodes"/>
    <feature var="http://jabber.org/protocol/commands"/>
    <feature var="http://jabber.org/protocol/pubsub#multi-subscribe"/>
    <feature var="http://jabber.org/protocol/pubsub#outcast-affiliation"/>
    <feature var="http://jabber.org/protocol/pubsub#get-pending"/>
    <feature var="google:jingleinfo"/>
    <feature var="jabber:iq:privacy"/>
    <feature var="urn:xmpp:archive:manage"/>
    <feature var="http://jabber.org/protocol/pubsub#subscription-options"/>
    <feature var="jabber:iq:last"/>
    <feature var="http://jabber.org/protocol/pubsub#create-and-configure"/>
    <feature var="urn:xmpp:ping"/>
    <feature var="http://jabber.org/protocol/pubsub#retrieve-items"/>
    <feature var="jabber:iq:time"/>
    <feature var="http://jabber.org/protocol/pubsub#create-nodes"/>
    <feature var="http://jabber.org/protocol/pubsub#persistent-items"/>
    <feature var="jabber:iq:version"/>
    <feature var="http://jabber.org/protocol/pubsub#presence-notifications"/>
    <feature var="http://jabber.org/protocol/pubsub"/>
    <feature var="http://jabber.org/protocol/pubsub#retrieve-affiliations"/>
    <feature var="http://jabber.org/protocol/pubsub#delete-nodes"/>
    <feature var="http://jabber.org/protocol/pubsub#purge-nodes"/>
    <feature var="http://jabber.org/protocol/disco#info"/>
    <feature var="http://jabber.org/protocol/rsm"/>
  </query>
</iq>
1.2 Query a specific XMPP subdomain, such as pubsub
<iq type='get' from='wangxin@im/PC' to='pubsub.im' id='11'>
  <query xmlns='http://jabber.org/protocol/disco#info'/>
</iq>
Response:
<iq id="11" to="wangxin@im/PC" from="pubsub.im" type="result">
  <query xmlns="http://jabber.org/protocol/disco#info">
    <identity category="pubsub" name="Publish-Subscribe service" type="service"/>
    <feature var="http://jabber.org/protocol/pubsub"/>
    <feature var="http://jabber.org/protocol/pubsub#collections"/>
    <feature var="http://jabber.org/protocol/pubsub#config-node"/>
    <feature var="http://jabber.org/protocol/pubsub#create-and-configure"/>
    <feature var="http://jabber.org/protocol/pubsub#create-nodes"/>
    <feature var="http://jabber.org/protocol/pubsub#delete-nodes"/>
    <feature var="http://jabber.org/protocol/pubsub#get-pending"/>
    <feature var="http://jabber.org/protocol/pubsub#instant-nodes"/>
    <feature var="http://jabber.org/protocol/pubsub#item-ids"/>
    <feature var="http://jabber.org/protocol/pubsub#meta-data"/>
    <feature var="http://jabber.org/protocol/pubsub#modify-affiliations"/>
    <feature var="http://jabber.org/protocol/pubsub#manage-subscriptions"/>
    <feature var="http://jabber.org/protocol/pubsub#multi-subscribe"/>
    <feature var="http://jabber.org/protocol/pubsub#outcast-affiliation"/>
    <feature var="http://jabber.org/protocol/pubsub#persistent-items"/>
    <feature var="http://jabber.org/protocol/pubsub#presence-notifications"/>
    <feature var="http://jabber.org/protocol/pubsub#publish"/>
    <feature var="http://jabber.org/protocol/pubsub#publisher-affiliation"/>
    <feature var="http://jabber.org/protocol/pubsub#purge-nodes"/>
    <feature var="http://jabber.org/protocol/pubsub#retract-items"/>
    <feature var="http://jabber.org/protocol/pubsub#retrieve-affiliations"/>
    <feature var="http://jabber.org/protocol/pubsub#retrieve-default"/>
    <feature var="http://jabber.org/protocol/pubsub#retrieve-items"/>
    <feature var="http://jabber.org/protocol/pubsub#retrieve-subscriptions"/>
    <feature var="http://jabber.org/protocol/pubsub#subscribe"/>
    <feature var="http://jabber.org/protocol/pubsub#subscription-options"/>
    <feature var="http://jabber.org/protocol/pubsub#default_access_model_open"/>
    <feature var="http://jabber.org/protocol/disco#info"/>
  </query>
</iq>
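Reading such a feature list by eye is error-prone, so the check can be automated. The following is a minimal sketch that uses only the JDK's DOM parser on a disco#info result captured as a string; the class `DiscoFeatureCheck` and its method names are assumptions made for illustration and are not part of Smack or Openfire.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Hypothetical helper: inspects a disco#info result that was captured as raw XML. */
class DiscoFeatureCheck {
    static final String DISCO_INFO_NS = "http://jabber.org/protocol/disco#info";
    static final String PUBSUB_FEATURE = "http://jabber.org/protocol/pubsub";

    /** Collects the var attribute of every feature element in the disco#info namespace. */
    static Set<String> features(String iqXml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(iqXml.getBytes(StandardCharsets.UTF_8)));
            NodeList nodes = doc.getElementsByTagNameNS(DISCO_INFO_NS, "feature");
            Set<String> vars = new HashSet<>();
            for (int i = 0; i < nodes.getLength(); i++) {
                vars.add(((Element) nodes.item(i)).getAttribute("var"));
            }
            return vars;
        } catch (Exception e) {
            throw new IllegalArgumentException("not a well-formed disco#info result", e);
        }
    }

    /** True when the base pubsub feature is advertised. */
    static boolean supportsPubSub(String iqXml) {
        return features(iqXml).contains(PUBSUB_FEATURE);
    }

    public static void main(String[] args) {
        String sample = "<iq id='11' from='pubsub.im' type='result'>"
                + "<query xmlns='http://jabber.org/protocol/disco#info'>"
                + "<feature var='http://jabber.org/protocol/pubsub'/>"
                + "<feature var='http://jabber.org/protocol/pubsub#publish'/>"
                + "</query></iq>";
        System.out.println("pubsub supported: " + supportsPubSub(sample));
    }
}
```

The same set can be queried for optional features such as `http://jabber.org/protocol/pubsub#persistent-items` before relying on them.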
1.3 Query a persistent leaf node in the pubsub service
<iq type='get' from='wangxin@im/PC' to='pubsub.im' id='info1'>
  <query xmlns='http://jabber.org/protocol/disco#info' node='NodeID_003'/>
</iq>
Response:
<iq id="info1" to="wangxin@im/PC" from="pubsub.im" type="result">
  <query xmlns="http://jabber.org/protocol/disco#info" node="NodeID_003">
    <identity category="pubsub" name="null" type="leaf"/>
    <feature var="http://jabber.org/protocol/pubsub"/>
    <feature var="http://jabber.org/protocol/disco#info"/>
    <x xmlns="jabber:x:data" type="result">
      <field var="FORM_TYPE" type="hidden"> <value>http://jabber.org/protocol/pubsub#meta-data</value> </field>
      <field label="Short name for the node" var="pubsub#title" type="text-single"> <value/> </field>
      <field label="Description of the node" var="pubsub#description" type="text-single"> <value/> </field>
      <field label="Whether the node is a leaf (default) or a collection" var="pubsub#node_type" type="text-single"> <value>leaf</value> </field>
      <field label="The collection with which a node is affiliated." var="pubsub#collection" type="text-single"/>
      <field label="Whether to allow subscriptions" var="pubsub#subscribe" type="boolean"> <value>1</value> </field>
      <field label="New subscriptions require configuration" var="pubsub#subscription_required" type="boolean"> <value>0</value> </field>
      <field label="Deliver payloads with event notifications" var="pubsub#deliver_payloads" type="boolean"> <value>1</value> </field>
      <field label="Notify subscribers when the node configuration changes" var="pubsub#notify_config" type="boolean"> <value>1</value> </field>
      <field label="Notify subscribers when the node is deleted" var="pubsub#notify_delete" type="boolean"> <value>1</value> </field>
      <field label="Notify subscribers when items are removed from the node" var="pubsub#notify_retract" type="boolean"> <value>1</value> </field>
      <field label="Only deliver notifications to available users" var="pubsub#presence_based_delivery" type="boolean"> <value>0</value> </field>
      <field label="Specify the type of payload data for this node" var="pubsub#type" type="text-single"> <value/> </field>
      <field label="Message body XSLT" var="pubsub#body_xslt" type="text-single"> <value/> </field>
      <field label="Payload XSLT" var="pubsub#dataform_xslt" type="text-single"> <value/> </field>
      <field label="Specify who may subscribe and retrieve items" var="pubsub#access_model" type="list-single">
        <value>open</value>
        <option> <value>authorize</value> </option>
        <option> <value>open</value> </option>
        <option> <value>presence</value> </option>
        <option> <value>roster</value> </option>
        <option> <value>whitelist</value> </option>
      </field>
      <field label="Specify the publisher model" var="pubsub#publish_model" type="list-single">
        <value>publishers</value>
        <option> <value>publishers</value> </option>
        <option> <value>subscribers</value> </option>
        <option> <value>open</value> </option>
      </field>
      <field label="Roster groups allowed to subscribe" var="pubsub#roster_groups_allowed" type="list-multi"/>
      <field label="People to contact with questions" var="pubsub#contact" type="jid-multi"/>
      <field label="Default language" var="pubsub#language" type="text-single"> <value>English</value> </field>
      <field label="Node owners" var="pubsub#owner" type="jid-multi"> <value>test17@im</value> </field>
      <field label="Node publishers" var="pubsub#publisher" type="jid-multi"/>
      <field label="Select the entity that should receive replies to items" var="pubsub#itemreply" type="list-single"> <value>owner</value> </field>
      <field label="Multi-user chat rooms to which replies should be sent" var="pubsub#replyroom" type="jid-multi"/>
      <field label="Users to which replies should be sent" var="pubsub#replyto" type="jid-multi"/>
      <field label="Send items to new subscribers" var="pubsub#send_item_subscribe" type="boolean"> <value>1</value> </field>
      <field label="Persist items to storage" var="pubsub#persist_items" type="boolean"> <value>0</value> </field>
      <field label="Maximum number of items to persist" var="pubsub#max_items" type="text-single"> <value>-1</value> </field>
      <field label="Maximum payload size in bytes" var="pubsub#max_payload_size" type="text-single"> <value>5120</value> </field>
    </x>
  </query>
</iq>
Publisher:
import java.util.Date;

import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smackx.pubsub.AccessModel;
import org.jivesoftware.smackx.pubsub.ConfigureForm;
import org.jivesoftware.smackx.pubsub.FormType;
import org.jivesoftware.smackx.pubsub.LeafNode;
import org.jivesoftware.smackx.pubsub.PayloadItem;
import org.jivesoftware.smackx.pubsub.PubSubManager;
import org.jivesoftware.smackx.pubsub.PublishModel;
import org.jivesoftware.smackx.pubsub.SimplePayload;

public class Publisher {
    private static XMPPConnection connection = new XMPPConnection("im.cvte.cn");
    private static String USER_NAME = "test17";
    private static String PASSWORD = "password";
    private static String nodeId = "NodeID_003";

    static {
        try {
            connection.connect();
            connection.login(USER_NAME, PASSWORD);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            PubSubManager manager = new PubSubManager(connection, "pubsub.im");
            LeafNode myNode = null;
            try {
                // Look up the leaf node if it already exists
                myNode = manager.getNode(nodeId);
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (myNode == null) {
                // Otherwise create the leaf node with the default configuration
                myNode = manager.createNode(nodeId);
            }

            String id1 = "1001";
            SimplePayload payload1 = new SimplePayload("message", "pubsub:cvtalk",
                    "<message xmlns='pubsub:cvtalk'><body>" + id1 + ": message published: "
                            + new Date().toString() + "</body></message>");

            // Leaf node configuration. This currently has no effect, because the form
            // is never sent to the server (see the commented-out createNode call below).
            ConfigureForm f = new ConfigureForm(FormType.submit);
            f.setPersistentItems(true); // whether items are persisted
            f.setDeliverPayloads(true);
            f.setAccessModel(AccessModel.open);
            f.setPublishModel(PublishModel.publishers);
            // f.setSubscribe(true);

            // Create the leaf node with this configuration:
            // myNode = (LeafNode) manager.createNode(nodeId, f);

            PayloadItem<SimplePayload> item1 = new PayloadItem<SimplePayload>(id1, payload1);
            // A PayloadItem without an item ID works as well:
            // PayloadItem<SimplePayload> item1 = new PayloadItem<SimplePayload>(payload1);

            myNode.publish(item1);
            System.out.println("-----publish item1-----------");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Subscriber: put the following code in the login() method of Spark's LoginDialog:
private boolean login() { ....... connection.login(....... .......
PubSubManager manager = new PubSubManager(connection, "pubsub.im");
Node eventNode = manager.getNode("NodeID_003");
eventNode.addItemEventListener(new ItemEventListener<PayloadItem>() {
    public void handlePublishedItems(ItemPublishEvent evt) {
        System.out.println("Number of payloads received=" + evt.getItems().size());
        for (Object obj : evt.getItems()) {
            PayloadItem<SimplePayload> item = (PayloadItem<SimplePayload>) obj;
            System.out.println("Subscribed item=" + item.getPayload().toString());
        }
    }
});
eventNode.subscribe(connection.getUser());
......
The message that arrives at the subscriber:
<message id="NodeID_003__wangxin@im__0K463" to="wangxin@im/PC" from="pubsub.im">
  <thread>n4Ch63</thread>
  <event xmlns="http://jabber.org/protocol/pubsub#event">
    <items node="NodeID_003">
      <item id="1001">
        <message xmlns="pubsub:cvtalk">
          <body>1001:消息發佈:Tue Dec 08 15:36:59 CST 2015</body>
        </message>
      </item>
    </items>
  </event>
  <headers xmlns="http://jabber.org/protocol/shim">
    <header name="pubsub#subid">GP00jOONb9Lg2PRr0K0T01xunpquPmVC2q7QhjYg</header>
  </headers>
</message>
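When debugging without Smack in the loop, it can help to pull the item IDs and payload text out of a captured notification. The sketch below does that with the JDK's namespace-aware DOM parser; `PubSubEventReader` and `readItems` are hypothetical names, and the code assumes the stanza has been saved as a standalone, well-formed XML string.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Hypothetical helper: extracts published items from a captured pubsub event message. */
class PubSubEventReader {
    static final String EVENT_NS = "http://jabber.org/protocol/pubsub#event";

    /** Maps each item id to the trimmed text content of its payload, in document order. */
    static Map<String, String> readItems(String messageXml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(messageXml.getBytes(StandardCharsets.UTF_8)));
            // Only <item/> elements in the pubsub#event namespace are notifications.
            NodeList items = doc.getElementsByTagNameNS(EVENT_NS, "item");
            Map<String, String> result = new LinkedHashMap<>();
            for (int i = 0; i < items.getLength(); i++) {
                Element item = (Element) items.item(i);
                result.put(item.getAttribute("id"), item.getTextContent().trim());
            }
            return result;
        } catch (Exception e) {
            throw new IllegalArgumentException("not a well-formed pubsub event message", e);
        }
    }

    public static void main(String[] args) {
        String sample = "<message from='pubsub.im'>"
                + "<event xmlns='http://jabber.org/protocol/pubsub#event'>"
                + "<items node='NodeID_003'><item id='1001'>"
                + "<message xmlns='pubsub:cvtalk'><body>hello</body></message>"
                + "</item></items></event></message>";
        System.out.println(readItems(sample));
    }
}
```

Filtering on the event namespace keeps the payload's own `<message xmlns="pubsub:cvtalk">` element from being mistaken for part of the notification envelope.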
Getting the default node configuration:
public ConfigureForm getDefaultConfiguration() throws XMPPException {
    // Errors will cause exceptions in getReply, so it only returns
    // on success.
    PubSub reply = (PubSub) sendPubsubPacket(Type.GET,
            new NodeExtension(PubSubElementType.DEFAULT),
            PubSubElementType.DEFAULT.getNamespace());
    return NodeUtils.getFormFromPacket(reply, PubSubElementType.DEFAULT);
}
Deleting a node:
public void deleteNode(String nodeId) throws XMPPException {
    sendPubsubPacket(Type.SET,
            new NodeExtension(PubSubElementType.DELETE, nodeId),
            PubSubElementType.DELETE.getNamespace());
    nodeMap.remove(nodeId);
}
There are three listeners in total.
Of these, ItemEventListener takes a generic type parameter, which must be org.jivesoftware.smackx.pubsub.Item or a subclass of it:
public interface ItemEventListener<T extends Item> {
    /**
     * Called whenever an item is published to the node the listener
     * is registered with.
     *
     * @param items The publishing details.
     */
    void handlePublishedItems(ItemPublishEvent<T> items);
}
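The practical benefit of the bounded type parameter is that a listener declared for a specific item subtype receives that subtype directly, with no casts. The sketch below shows the idea with toy stand-ins; `ToyItem`, `ToyPayloadItem`, and `ToyItemListener` are hypothetical types that only mirror the shape of the Smack interface and are not Smack classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Demo of a listener whose type parameter is bounded by a base item type. */
class BoundedListenerDemo {
    /** Collects "id=payload" strings without any cast, because T is fixed to ToyPayloadItem. */
    static List<String> collectPayloads(List<ToyPayloadItem> published) {
        List<String> seen = new ArrayList<>();
        ToyItemListener<ToyPayloadItem> listener = items -> {
            for (ToyPayloadItem item : items) {
                seen.add(item.id + "=" + item.payload);
            }
        };
        listener.handlePublishedItems(published);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectPayloads(List.of(new ToyPayloadItem("1001", "hello"))));
    }
}

/** Toy base type, standing in for an item that only carries an id. */
class ToyItem {
    final String id;

    ToyItem(String id) {
        this.id = id;
    }
}

/** Toy subtype, standing in for an item that also carries a payload. */
class ToyPayloadItem extends ToyItem {
    final String payload;

    ToyPayloadItem(String id, String payload) {
        super(id);
        this.payload = payload;
    }
}

/** Same shape as the listener above: T may be the base item type or any subtype. */
interface ToyItemListener<T extends ToyItem> {
    void handlePublishedItems(List<T> items);
}
```

Declaring the listener with a raw type compiles too, but then each element has to be cast by hand inside the handler.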
In addition, the Personal Eventing Protocol (XEP-0163) is also built on publish-subscribe, and its XMPP stanza structure is very similar. The publishing code:
PEPManager pepManager = new PEPManager(smackConnection);
pepManager.addPEPListener(new PEPListener() {
    public void eventReceived(String inFrom, PEPEvent inEvent) {
        LOGGER.debug("Event received: " + inEvent);
    }
});

PEPProvider pepProvider = new PEPProvider();
pepProvider.registerPEPParserExtension("http://jabber.org/protocol/tune", new TuneProvider());
ProviderManager.getInstance().addExtensionProvider("event", "http://jabber.org/protocol/pubsub#event", pepProvider);

Tune tune = new Tune("jeff", "1", "CD", "My Title", "My Track");
pepManager.publish(tune);
The listener for receiving events:
public interface PEPListener {
    /**
     * Called when PEP events are received as part of a presence subscribe or message filter.
     *
     * @param from the user that sent the entries.
     * @param event the event contained in the message.
     */
    public void eventReceived(String from, PEPEvent event);
}
One last question: in Openfire, where are new items on a leaf node persisted?
In the PubSubPersistenceManager class, writePendingItems is responsible for persisting them to the database:
private static void writePendingItems(Connection con, LinkedListNode<RetryWrapper> addItem, boolean batch) throws SQLException
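Yet after each publish, no record shows up in the database. The code below explains why: published items are first held in memory and only written out when the pending items are flushed.
writePendingItems(Connection con, LinkedList<RetryWrapper> addList, LinkedList<PublishedItem> delList) is the overload that deletes records from the database.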
/**
 * Flush the cache(s) of items to be persisted (itemsToAdd) and deleted (itemsToDelete).
 * @param sendToCluster If true, delegate to cluster members, otherwise local only
 */
public static void flushPendingItems(boolean sendToCluster) {
    // forward to other cluster members and wait for response
    if (sendToCluster) {
        CacheFactory.doSynchronousClusterTask(new FlushTask(), false);
    }

    if (itemsToAdd.getFirst() == null && itemsToDelete.getFirst() == null) {
        return; // nothing to do for this cluster member
    }

    Connection con = null;
    boolean rollback = false;
    LinkedList<RetryWrapper> addList = null;
    LinkedList<PublishedItem> delList = null;

    // Swap pending items so we can parse and save the contents from this point in time
    // while not blocking new entries from being cached.
    synchronized(itemsPending) {
        addList = itemsToAdd;
        delList = itemsToDelete;

        itemsToAdd = new LinkedList<RetryWrapper>();
        itemsToDelete = new LinkedList<PublishedItem>();

        // Ensure pending items are available via the item read cache;
        // this allows the item(s) to be fetched by other request threads
        // while being written to the DB from this thread
        int copied = 0;
        for (String key : itemsPending.keySet()) {
            if (!itemCache.containsKey(key)) {
                itemCache.put(key, (((RetryWrapper)itemsPending.get(key).object)).get());
                copied++;
            }
        }
        if (log.isDebugEnabled() && copied > 0) {
            log.debug("Added " + copied + " pending items to published item cache");
        }
        itemsPending.clear();
    }

    // Note that we now make multiple attempts to write cached items to the DB:
    // 1) insert all pending items in a single batch
    // 2) if the batch insert fails, retry by inserting each item separately
    // 3) if a given item cannot be written, return it to the pending write cache
    // By default step 3 will be tried once per item, but this can be configured
    // (or disabled) using the "xmpp.pubsub.item.retry" property. In the event of
    // a transaction rollback, items that could not be written to the database
    // will be returned to the pending item write cache.
    try {
        con = DbConnectionManager.getTransactionConnection();
        writePendingItems(con, addList, delList);
    } catch (SQLException se) {
        log.error("Failed to flush pending items; initiating rollback", se);
        // return new items to the write cache
        LinkedListNode<RetryWrapper> node = addList.getLast();
        while (node != null) {
            savePublishedItem(node.object);
            node.remove();
            node = addList.getLast();
        }
        rollback = true;
    } finally {
        DbConnectionManager.closeTransactionConnection(con, rollback);
    }
}
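The core idea in flushPendingItems, swapping the pending list out under a lock and writing the batch afterwards, can be shown in isolation. This is a simplified sketch and not Openfire's code: `PendingItemBuffer` and `BatchWriter` are hypothetical names, items are plain strings, and the delete list, the read cache, and the per-item retry are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified write-behind buffer: publishes are queued in memory and written in batches on flush. */
class PendingItemBuffer {
    /** Stand-in for the database write; it may throw to simulate a failed batch. */
    interface BatchWriter {
        void write(List<String> batch) throws Exception;
    }

    private final Object lock = new Object();
    private List<String> pending = new ArrayList<>();

    /** Queues an item in memory only; nothing reaches the writer until flush is called. */
    void add(String item) {
        synchronized (lock) {
            pending.add(item);
        }
    }

    int pendingCount() {
        synchronized (lock) {
            return pending.size();
        }
    }

    /** Swaps the pending list out under the lock, writes it, and re-queues it if the write fails. */
    boolean flush(BatchWriter writer) {
        List<String> batch;
        synchronized (lock) {
            if (pending.isEmpty()) {
                return true; // nothing to do
            }
            batch = pending;
            pending = new ArrayList<>(); // new publishes are not blocked during the write
        }
        try {
            writer.write(batch);
            return true;
        } catch (Exception e) {
            synchronized (lock) {
                pending.addAll(0, batch); // return the items to the front of the queue
            }
            return false;
        }
    }

    public static void main(String[] args) {
        PendingItemBuffer buffer = new PendingItemBuffer();
        List<String> database = new ArrayList<>();
        buffer.add("item-1001");
        System.out.println("before flush: " + database);
        buffer.flush(database::addAll);
        System.out.println("after flush: " + database);
    }
}
```

This also mirrors the behaviour observed above: between a publish and the next flush, the item exists only in memory, so a query against the database will not find it.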
References:
http://xmpp.org/extensions/xep-0060.html
http://xmpp.org/extensions/xep-0163.html
https://community.igniterealtime.org/thread/38433
http://www.igniterealtime.org/support/articles/pubsub.jsp
http://blog.csdn.net/u011163195/article/details/17683741