上一章,Session通過預建立、認證以後,才正常可用。認證時,最重要的操做,就是將Session加入到路由表,使之擁用了通訊功能。html
添加到至路由表的操做,是在SessionManager中操做的,以下:node
SessionManager.addSession(LocalClientSession session):算法
public void addSession(LocalClientSession session) { // Add session to the routing table (routing table will know session is not available yet) routingTable.addClientRoute(session.getAddress(), session); // Remove the pre-Authenticated session but remember to use the temporary ID as the key localSessionManager.getPreAuthenticatedSessions().remove(session.getStreamID().toString()); SessionEventDispatcher.EventType event = session.getAuthToken().isAnonymous() ? SessionEventDispatcher.EventType.anonymous_session_created : SessionEventDispatcher.EventType.session_created; // Fire session created event. SessionEventDispatcher.dispatchEvent(session, event); if (ClusterManager.isClusteringStarted()) { // Track information about the session and share it with other cluster nodes sessionInfoCache.put(session.getAddress().toString(), new ClientSessionInfo(session)); } }
進入路由表模塊, RoutingTableImpl.addClientRoute(session.getAddress(), session)方法:數據庫
public boolean addClientRoute(JID route, LocalClientSession destination) { boolean added; boolean available = destination.getPresence().isAvailable(); localRoutingTable.addRoute(route.toString(), destination); ...... return added; }
從這裏能夠看出,路由表的底層,是藉助LocalRoutingTable類來實現。緩存
LocalRoutingTable類的成員構成,很是的簡單:服務器
Map<String, RoutableChannelHandler> routes = new ConcurrentHashMap<>();
也就是說,路由表的實質,就是一個Map的數據結構,其Key爲JID地址,Velue爲RoutableChannelHandler類型報文處理器。session
查看路由表RoutingTableImpl模塊中的路由添加方法,能夠看到表中存儲的是以RoutableChannelHandler衍生出來的幾個Session類型,總共提供了三種:數據結構
LocalOutgoingServerSession(用於存儲鏈接本機的遠程服務端)、LocalClientSession(用於存儲鏈接到本機的客戶端)、RoutableChannelHandler(用於存儲組件),類結構以下:dom
|-- RoutableChannelHandler |-- Session |-- LocalSession |-- LocalClientSession |-- LocalServerSession |-- LocalOutgoingServerSession
而LocalRoutingTable內的全部方法,就是一系列對這個Map結構的操做函數,核心的以下幾個:ide
添加路由:
boolean addRoute(String address, RoutableChannelHandler route) { return routes.put(address, route) != route; }
獲取路由:
RoutableChannelHandler getRoute(String address) { return routes.get(address); }
獲取客戶端的Session列表:
Collection<LocalClientSession> getClientRoutes() { List<LocalClientSession> sessions = new ArrayList<>(); for (RoutableChannelHandler route : routes.values()) { if (route instanceof LocalClientSession) { sessions.add((LocalClientSession) route); } } return sessions; }
移除路由
void removeRoute(String address) { routes.remove(address); }
還有一個每3分鐘一次的定時任務,查詢並關閉被閒置了的遠程服務器Session,在路由表中啓動該任務
public void start() { int period = 3 * 60 * 1000; TaskEngine.getInstance().scheduleAtFixedRate(new ServerCleanupTask(), period, period); }
路由表是Openfire的核心module之一,RoutingTable接口定義了一系列操做標準,主要圍繞路由表進行,提供添加,刪除,查詢,消息路由等操做,而RoutingTableImpl負責具體實現。
/** * 緩存外部遠程服務器session * Key: server domain, Value: nodeID */ private Cache<String, byte[]> serversCache; /** * 緩存服務器的組件 * Key: component domain, Value: list of nodeIDs hosting the component */ private Cache<String, Set<NodeID>> componentsCache; /** * 緩存已認證的客戶端session * Key: full JID, Value: {nodeID, available/unavailable} */ private Cache<String, ClientRoute> usersCache; /** * 緩存已認證匿名的客戶端session * Key: full JID, Value: {nodeID, available/unavailable} */ private Cache<String, ClientRoute> anonymousUsersCache; /** * 緩存已認證(包括匿名)的客戶端Resource,一個用戶,在每一端登陸,都會有一個resource * Key: bare JID, Value: list of full JIDs of the user */ private Cache<String, Collection<String>> usersSessions; private String serverName; // 服務器的域名 private XMPPServer server; // XMPP服務 private LocalRoutingTable localRoutingTable; // 路由表底層 private RemotePacketRouter remotePacketRouter; // 遠程包路由器 private IQRouter iqRouter; // IQ包路由器 private MessageRouter messageRouter; // Message包路由器 private PresenceRouter presenceRouter; // Presence包路由器 private PresenceUpdateHandler presenceUpdateHandler; // 在線狀態更新處理器
成員列表中,除了LocalRoutingTable以外,還定義了一堆的緩存。這些緩存幹嗎用?
Openfire支持集羣機制,即在多臺服務器上分別運行一個Openfire實例,並使各個實例的數據同步。算法一致,數據一致,用戶無論鏈接到任意一臺服務器,效果就都同樣。
集羣中的數據同步,除了數據庫以外,其餘的都是用經過緩存來處理,而上面的這些緩存正是集羣同步的一部分,用於同步用戶路由信息,每一個服務器都會有緩存的副本。
總的來講,LocalRoutingTable用於存儲本機的路由數據,而Cache中是存儲了整個集羣的路由數據。
可是,須要注意的一點,LocalRoutingTable與Cache,這二者的數據結構並不相同:
(1)LocalRoutingTable中記錄了本機中全部的Session實例,能夠用來通訊
(2)Cache中只存儲了用戶路由節點信息,須要經過集羣管理組件來獲取Session實例
路由表的操做,實際上就是在會話管理中,對會話實例的操做。爲免與上面混淆,這一節的功能說明,以會話代稱。
添加路由(會話)
代碼以下:
@Override public boolean addClientRoute(JID route, LocalClientSession destination) { boolean added; boolean available = destination.getPresence().isAvailable(); // 加入到路由表 localRoutingTable.addRoute(route.toString(), destination); // 若爲匿名客戶端,添加到anonymousUsersCache、usersSessions緩存隊列中 if (destination.getAuthToken().isAnonymous()) { Lock lockAn = CacheFactory.getLock(route.toString(), anonymousUsersCache); try { lockAn.lock(); added = anonymousUsersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null; } finally { lockAn.unlock(); } // Add the session to the list of user sessions if (route.getResource() != null && (!available || added)) { Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions); try { lock.lock(); usersSessions.put(route.toBareJID(), Arrays.asList(route.toString())); } finally { lock.unlock(); } } } // 非匿名客戶端,添加到usersCache、usersSessions緩存隊列中 else { Lock lockU = CacheFactory.getLock(route.toString(), usersCache); try { lockU.lock(); added = usersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null; } finally { lockU.unlock(); } // Add the session to the list of user sessions if (route.getResource() != null && (!available || added)) { Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions); try { lock.lock(); Collection<String> jids = usersSessions.get(route.toBareJID()); if (jids == null) { // Optimization - use different class depending on current setup if (ClusterManager.isClusteringStarted()) { jids = new HashSet<>(); } else { jids = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); } } jids.add(route.toString()); usersSessions.put(route.toBareJID(), jids); } finally { lock.unlock(); } } } return added; }
主要兩步:
(1)添加到路由表
(2)添加到對應的緩存中
移除路由(會話)
代碼以下:
@Override public boolean removeClientRoute(JID route) { boolean anonymous = false; String address = route.toString(); ClientRoute clientRoute = null; // 從緩存中移除客戶端的Session信息 Lock lockU = CacheFactory.getLock(address, usersCache); try { lockU.lock(); clientRoute = usersCache.remove(address); } finally { lockU.unlock(); } if (clientRoute == null) { Lock lockA = CacheFactory.getLock(address, anonymousUsersCache); try { lockA.lock(); clientRoute = anonymousUsersCache.remove(address); anonymous = true; } finally { lockA.unlock(); } } if (clientRoute != null && route.getResource() != null) { Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions); try { lock.lock(); if (anonymous) { usersSessions.remove(route.toBareJID()); } else { Collection<String> jids = usersSessions.get(route.toBareJID()); if (jids != null) { jids.remove(route.toString()); if (!jids.isEmpty()) { usersSessions.put(route.toBareJID(), jids); } else { usersSessions.remove(route.toBareJID()); } } } } finally { lock.unlock(); } } // 將對應客戶端的Session信息,移出路由表 localRoutingTable.removeRoute(address); return clientRoute != null; }
操做與添加相似:
(1)移除緩存裏的路由信息
(2)移除路由表中的信息
獲取路由(會話)
@Override public ClientSession getClientRoute(JID jid) { // Check if this session is hosted by this cluster node ClientSession session = (ClientSession) localRoutingTable.getRoute(jid.toString()); if (session == null) { // The session is not in this JVM so assume remote RemoteSessionLocator locator = server.getRemoteSessionLocator(); if (locator != null) { // Check if the session is hosted by other cluster node ClientRoute route = usersCache.get(jid.toString()); if (route == null) { route = anonymousUsersCache.get(jid.toString()); } if (route != null) { session = locator.getClientSession(route.getNodeID().toByteArray(), jid); } } } return session; }
從上面的方法代碼中能夠看到,獲取路由的方法是:先查找本地路由表,若獲取不到對應Session時,則經過集羣獲取。RemoteSessionLocator是用於適配不一樣的集羣組件所抽象的接口,爲不一樣集羣組件提供了透明處理。
至於如何從集羣中獲取Session,主要就在於sersCache和anonymousUsersCache這兩個cache,它們記錄了每一個客戶端的路由節點信息,經過它能夠取得對應的Session實例。詳見第八章《集羣管理》
根據發送的形式,分爲兩種:一是廣播、二是單點路由
一、以廣播的形式,向全部在線的客戶端發送消息
@Override public void broadcastPacket(Message packet, boolean onlyLocal) { // Send the message to client sessions connected to this JVM for(ClientSession session : localRoutingTable.getClientRoutes()) { session.process(packet); } // Check if we need to broadcast the message to client sessions connected to remote cluter nodes if (!onlyLocal && remotePacketRouter != null) { remotePacketRouter.broadcastPacket(packet); } }
二、單點發送的形式,向某個指定的客戶端發送消息
@Override public void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException { boolean routed = false; try { if (serverName.equals(jid.getDomain())) { // Packet sent to our domain. routed = routeToLocalDomain(jid, packet, fromServer); Log.info("routeToLocalDomain"); } else if (jid.getDomain().endsWith(serverName) && hasComponentRoute(jid)) { // Packet sent to component hosted in this server routed = routeToComponent(jid, packet, routed); Log.info("routeToComponent"); } else { // Packet sent to remote server routed = routeToRemoteDomain(jid, packet, routed); Log.info("routeToRemoteDomain"); } } catch (Exception ex) { // Catch here to ensure that all packets get handled, despite various processing // exceptions, rather than letting any fall through the cracks. For example, // an IAE could be thrown when running in a cluster if a remote member becomes // unavailable before the routing caches are updated to remove the defunct node. // We have also occasionally seen various flavors of NPE and other oddities, // typically due to unexpected environment or logic breakdowns. Log.error("Primary packet routing failed", ex); } if (!routed) { if (Log.isDebugEnabled()) { Log.debug("Failed to route packet to JID: {} packet: {}", jid, packet.toXML()); } if (packet instanceof IQ) { iqRouter.routingFailed(jid, packet); } else if (packet instanceof Message) { messageRouter.routingFailed(jid, packet); } else if (packet instanceof Presence) { presenceRouter.routingFailed(jid, packet); } } }
路由表中的功能,最後由SessionManager集中處理,詳見上一章的分析,這裏再也不贅述。
特色提一點,比較有用:路由表作爲一個module已經在Openfire主服務啓動時完成實例化,因此,在自定義的插件、或者其餘任何須要發送消息的地方,只需選擇調用以下兩個方法中之一,便可完成消息發送:
XMPPServer.getInstance().getRoutingTable().routePacket(jid, packet, fromServer);
XMPPServer.getInstance().getRoutingTable().broadcastPacket(packet, onlyLocal);
而消息發送中,最後消息如何送到網卡實現發送,在第三章《消息路由》中已經詳細分析,一樣再也不贅述。
本章就到此結束,OVER!