隨便寫寫,可能有錯的地方java
能夠看出deliver對session的依賴很強,最後調用Connection的deliver方法進行信息的發送,實際上最下面用的是minaapi
信息的處理大約分爲兩種方式session
[1]:接收客戶信息並做出響應(通常是在創建鏈接的時候),XMPPIoHandler的信息接收的回調函數中使用StanzaHandler中按照語法(Iq,Message,Presence)的類別進行分發到不一樣類別的路由(Router)-->(Handler)-->Session-->Connection.deliverdom
[2]:對已經創建鏈接的用戶進行消息的推送:NotificationManager.(sendBroadcast()|sendNotificationToUser())—>Session—>Connection.deliver函數
廣播函數:鏈接創建以後,發送推送消息的廣播接口spa
單個用戶發送: 鏈接創建以後,根據用戶名稱進行推送的接口.net
分析:debug
1. 該方法爲私有方法,那麼確定是爲了sendBroadcast和sendNotificationToUser兩個方法所設立3d
2. 返回的是smack包中的IQ,使用IQ語法來發送信息code
下圖顯示了
IQ的set語法
自定義的命名空間NOTIFICATION,裏面包含了自定的封裝信息
數據包的截圖:
private IQ createNotificationIQ(String apiKey, String title, String message, String uri) { Random random = new Random(); String id = Integer.toHexString(random.nextInt());//使用隨機+hex的方法對id初始 // String id = String.valueOf(System.currentTimeMillis()); Element notification = DocumentHelper.createElement(QName.get( "notification", NOTIFICATION_NAMESPACE));//讀取相應的namespace notification.addElement("id").setText(id); notification.addElement("apiKey").setText(apiKey); notification.addElement("title").setText(title); notification.addElement("message").setText(message); notification.addElement("uri").setText(uri); IQ iq = new IQ(); iq.setType(IQ.Type.set); iq.setChildElement(notification);//包裝 return iq;
爲XmppIoHandler信息接收器的直接引用
Xml信息的直接接收入口
對象屬性 構造函數[可見實例化的StanzaHandler只爲一個Connection進行服務,此實例在session創建的時候進行初始化] |
核心入口方法
public void process(String stanza, XMPPPacketReader reader) throws Exception { boolean initialStream = stanza.startsWith("<stream:stream");//判斷是否爲初始化狀態 if (!sessionCreated || initialStream) { if (!initialStream) { return; // Ignore <?xml version="1.0"?>//若是session還未創建,而且initialStream並非初始化信息,直接棄掉 } if (!sessionCreated) {//若是session還未創建,不論stream是否爲初始 sessionCreated = true;//是指session的標識符爲創建 MXParser parser = reader.getXPPParser(); parser.setInput(new StringReader(stanza)); createSession(parser);//創建session } else if (startedTLS) {//若是session已經實例化,並且是xmpp的初始,判斷是否須要進行tls握手 startedTLS = false; tlsNegotiated();//進行tls握手 } return; } // If end of stream was requested if (stanza.equals("</stream:stream>")) {//若是爲結尾stanza session.close();//進行session的關閉操做 return; } // Ignore <?xml version="1.0"?> if (stanza.startsWith("<?xml")) { return; } // Create DOM object//這裏排除了上述的可能性 Element doc = reader.read(new StringReader(stanza)).getRootElement();//得到根doc if (doc == null) { return; } String tag = doc.getName();//得到元素的name if ("starttls".equals(tag)) {//starttls關鍵字(重要)請求tls的通知節點 最後是利用Connection的startTLS(原理是在過濾器鏈的頭就加入過濾器(這裏用的是mina的基本方法)) if (negotiateTLS()) { // Negotiate TLS startedTLS = true; } else {//若是不成功,關閉鏈接 connection.close(); session = null; } } else if ("message".equals(tag)) {//message語法(按照圖示的樹形結構) processMessage(doc); } else if ("presence".equals(tag)) {//出席語法 log.debug("presence..."); processPresence(doc); } else if ("iq".equals(tag)) {//iq語法 log.debug("iq..."); processIQ(doc); } else { log.warn("Unexpected packet tag (not message, iq, presence)" + doc.asXML()); session.close(); } }
這個類把IQ packets分發到其相應的Handler之中
此類的核心方法
public void route(IQ packet) { if (packet == null) { throw new NullPointerException(); } JID sender = packet.getFrom();//獲取JID ClientSession session = sessionManager.getSession(sender); if (session == null || session.getStatus() == Session.STATUS_AUTHENTICATED || ("jabber:iq:auth".equals(packet.getChildElement() .getNamespaceURI()) || "jabber:iq:register".equals(packet.getChildElement() .getNamespaceURI()) || "urn:ietf:params:xml:ns:xmpp-bind" .equals(packet.getChildElement().getNamespaceURI()))) { handle(packet);//在這裏掉用私有方法進行回覆 } else {//出現not_authorized 異常 IQ reply = IQ.createResultIQ(packet);//使用IQ自帶的create方法對result進行建造 reply.setChildElement(packet.getChildElement().createCopy()); reply.setError(PacketError.Condition.not_authorized); session.process(reply);//直接包裝成error的回覆 } }
由route調用的私有方法
private void handle(IQ packet) { try { Element childElement = packet.getChildElement(); String namespace = null; if (childElement != null) { namespace = childElement.getNamespaceURI(); } if (namespace == null) { if (packet.getType() != IQ.Type.result && packet.getType() != IQ.Type.error) { log.warn("Unknown packet " + packet); } } else { IQHandler handler = getHandler(namespace);//根據namespace獲得handler if (handler == null) { sendErrorPacket(packet, PacketError.Condition.service_unavailable); } else { handler.process(packet); //進行操做 } } } catch (Exception e) {//處理中出現異常 log.error("Could not route packet", e); Session session = sessionManager.getSession(packet.getFrom()); if (session != null) { IQ reply = IQ.createResultIQ(packet); reply.setError(PacketError.Condition.internal_server_error); session.process(reply); } } }
獲得handler的私有方法
private IQHandler getHandler(String namespace) { IQHandler handler = namespace2Handlers.get(namespace);//在這裏調查namespace2Handler if (handler == null) { for (IQHandler handlerCandidate : iqHandlers) { if (namespace.equalsIgnoreCase(handlerCandidate.getNamespace())) { handler = handlerCandidate; namespace2Handlers.put(namespace, handler); break; } } } return handler; }
public IQRouter() { sessionManager = SessionManager.getInstance(); iqHandlers.add(new IQAuthHandler()); iqHandlers.add(new IQRegisterHandler()); iqHandlers.add(new IQRosterHandler()); }
從上面的代碼能夠看出所分配的Handler就是IQAuthHandler, IQRegisterHandler, IQRosterHandler利用handlerCandidate作的分類路由 |
出席路由:專門爲了Presence packet作分發的路由
public void route(Presence packet) { if (packet == null) { throw new NullPointerException(); } ClientSession session = sessionManager.getSession(packet.getFrom()); if (session == null || session.getStatus() != Session.STATUS_CONNECTED) { handle(packet); } else { packet.setTo(session.getAddress()); packet.setFrom((JID) null); packet.setError(PacketError.Condition.not_authorized); session.process(packet); } }
private void handle(Presence packet) { try { Presence.Type type = packet.getType(); // Presence updates (null == 'available') if (type == null || Presence.Type.unavailable == type) {//不管是否可用交給handler進行操做 presenceUpdateHandler.process(packet);//在出席發送樹之中,只有presenceUpdateHandler,須要對presenceUpdateHandler進行分析 } else { log.warn("Unknown presence type"); } } catch (Exception e) { log.error("Could not route packet", e); Session session = sessionManager.getSession(packet.getFrom()); if (session != null) { session.close(); } } }
PresenceUpdateHandler.process public void process(Packet packet) { ClientSession session = sessionManager.getSession(packet.getFrom()); try { Presence presence = (Presence) packet; Presence.Type type = presence.getType(); if (type == null) { // null == available if (session != null && session.getStatus() == Session.STATUS_CLOSED) { log.warn("Rejected available presence: " + presence + " - " + session); return; } if (session != null) { session.setPresence(presence); if (!session.isInitialized()) { // initSession(session); session.setInitialized(true); } //添加離線發送 String username = null; try { username = session.getUsername(); } catch (UserNotFoundException e) { log.error("User Not Found!"); } if(username != null && session.getPresence().isAvailable()) { (new NotificationManager()).pushCacheMessage(username); } } } else if (Presence.Type.unavailable == type) { if (session != null) { session.setPresence(presence); } } else { presence = presence.createCopy(); if (session != null) { presence.setFrom(new JID(null, session.getServerName(), null, true)); presence.setTo(session.getAddress()); } else { JID sender = presence.getFrom(); presence.setFrom(presence.getTo()); presence.setTo(sender); } presence.setError(PacketError.Condition.bad_request); PacketDeliverer.deliver(presence); } } catch (Exception e) { log.error("Internal server error. Triggered by packet: " + packet, e); } }