即時通訊系統Openfire分析之六:路由表 RoutingTable

  仍是從會話管理提及

  上一章,Session通過預建立、認證以後,才正常可用。認證時,最重要的操做,就是將Session加入到路由表,使之擁用了通訊功能。html

  添加到至路由表的操做,是在SessionManager中操做的,以下:node

  SessionManager.addSession(LocalClientSession session):算法

public void addSession(LocalClientSession session) {
    // Add session to the routing table (routing table will know session is not available yet)
    routingTable.addClientRoute(session.getAddress(), session);
    // Remove the pre-Authenticated session but remember to use the temporary ID as the key
    localSessionManager.getPreAuthenticatedSessions().remove(session.getStreamID().toString());
    SessionEventDispatcher.EventType event = session.getAuthToken().isAnonymous() ?
            SessionEventDispatcher.EventType.anonymous_session_created :
            SessionEventDispatcher.EventType.session_created;
    // Fire session created event.
    SessionEventDispatcher.dispatchEvent(session, event);
    if (ClusterManager.isClusteringStarted()) {
        // Track information about the session and share it with other cluster nodes
        sessionInfoCache.put(session.getAddress().toString(), new ClientSessionInfo(session));
    }
}

  進入路由表模塊, RoutingTableImpl.addClientRoute(session.getAddress(), session)方法:數據庫

public boolean addClientRoute(JID route, LocalClientSession destination) {
    boolean added;
    boolean available = destination.getPresence().isAvailable();
    localRoutingTable.addRoute(route.toString(), destination);
    ......
    return added;
}

  從這裏能夠看出,路由表的底層,是藉助LocalRoutingTable類來實現。緩存

  路由表的底層數據結構

  LocalRoutingTable類的成員構成,很是的簡單:服務器

Map<String, RoutableChannelHandler> routes = new ConcurrentHashMap<>();

  也就是說,路由表的實質,就是一個Map的數據結構,其Key爲JID地址,Velue爲RoutableChannelHandler類型報文處理器。session

  查看路由表RoutingTableImpl模塊中的路由添加方法,能夠看到表中存儲的是以RoutableChannelHandler衍生出來的幾個Session類型,總共提供了三種:數據結構

  LocalOutgoingServerSession(用於存儲鏈接本機的遠程服務端)、LocalClientSession(用於存儲鏈接到本機的客戶端)、RoutableChannelHandler(用於存儲組件),類結構以下:dom

|-- RoutableChannelHandler
    |-- Session
        |-- LocalSession
            |-- LocalClientSession
            |-- LocalServerSession
                |-- LocalOutgoingServerSession

  而LocalRoutingTable內的全部方法,就是一系列對這個Map結構的操做函數,核心的以下幾個:ide

  添加路由:

boolean addRoute(String address, RoutableChannelHandler route) {
    return routes.put(address, route) != route;
}

  獲取路由:

RoutableChannelHandler getRoute(String address) {
    return routes.get(address);
}

  獲取客戶端的Session列表:

Collection<LocalClientSession> getClientRoutes() {
    List<LocalClientSession> sessions = new ArrayList<>();
    for (RoutableChannelHandler route : routes.values()) {
        if (route instanceof LocalClientSession) {
            sessions.add((LocalClientSession) route);
        }
    }
    return sessions;
}

  移除路由

void removeRoute(String address) {
    routes.remove(address);
}

  還有一個每3分鐘一次的定時任務,查詢並關閉被閒置了的遠程服務器Session,在路由表中啓動該任務

public void start() {
    int period = 3 * 60 * 1000;
    TaskEngine.getInstance().scheduleAtFixedRate(new ServerCleanupTask(), period, period);
}

  路由表模塊 RoutingTable

  路由表是Openfire的核心module之一,RoutingTable接口定義了一系列操做標準,主要圍繞路由表進行,提供添加,刪除,查詢,消息路由等操做,而RoutingTableImpl負責具體實現。

  先來看看RoutingTableImpl的成員列表

/**
 * 緩存外部遠程服務器session
 * Key: server domain, Value: nodeID
 */
private Cache<String, byte[]> serversCache;
/**
 * 緩存服務器的組件
 * Key: component domain, Value: list of nodeIDs hosting the component
 */
private Cache<String, Set<NodeID>> componentsCache;
/**
 * 緩存已認證的客戶端session
 * Key: full JID, Value: {nodeID, available/unavailable}
 */
private Cache<String, ClientRoute> usersCache;
/**
 * 緩存已認證匿名的客戶端session
 * Key: full JID, Value: {nodeID, available/unavailable}
 */
private Cache<String, ClientRoute> anonymousUsersCache;
/**
 * 緩存已認證(包括匿名)的客戶端Resource,一個用戶,在每一端登陸,都會有一個resource
 * Key: bare JID, Value: list of full JIDs of the user
 */
private Cache<String, Collection<String>> usersSessions;

private String serverName;                              // 服務器的域名
private XMPPServer server;                              // XMPP服務
private LocalRoutingTable localRoutingTable;            // 路由表底層
private RemotePacketRouter remotePacketRouter;          // 遠程包路由器
private IQRouter iqRouter;                              // IQ包路由器
private MessageRouter messageRouter;                    // Message包路由器
private PresenceRouter presenceRouter;                  // Presence包路由器
private PresenceUpdateHandler presenceUpdateHandler;    // 在線狀態更新處理器

  成員列表中,除了LocalRoutingTable以外,還定義了一堆的緩存。這些緩存幹嗎用?

  Openfire支持集羣機制,即在多臺服務器上分別運行一個Openfire實例,並使各個實例的數據同步。算法一致,數據一致,用戶無論鏈接到任意一臺服務器,效果就都同樣。

  集羣中的數據同步,除了數據庫以外,其餘的都是用經過緩存來處理,而上面的這些緩存正是集羣同步的一部分,用於同步用戶路由信息,每一個服務器都會有緩存的副本。

  總的來講,LocalRoutingTable用於存儲本機的路由數據,而Cache中是存儲了整個集羣的路由數據。

  可是,須要注意的一點,LocalRoutingTable與Cache,這二者的數據結構並不相同:

  (1)LocalRoutingTable中記錄了本機中全部的Session實例,能夠用來通訊

  (2)Cache中只存儲了用戶路由節點信息,須要經過集羣管理組件來獲取Session實例

  路由表的操做

  路由表的操做,實際上就是在會話管理中,對會話實例的操做。爲免與上面混淆,這一節的功能說明,以會話代稱。

  添加路由(會話)

  代碼以下:

@Override
public boolean addClientRoute(JID route, LocalClientSession destination) {
    boolean added;
    boolean available = destination.getPresence().isAvailable();
    
    // 加入到路由表
    localRoutingTable.addRoute(route.toString(), destination);
    
    // 若爲匿名客戶端,添加到anonymousUsersCache、usersSessions緩存隊列中
    if (destination.getAuthToken().isAnonymous()) {
        Lock lockAn = CacheFactory.getLock(route.toString(), anonymousUsersCache);
        try {
            lockAn.lock();
            added = anonymousUsersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) ==
                    null;
        }
        finally {
            lockAn.unlock();
        }
        // Add the session to the list of user sessions
        if (route.getResource() != null && (!available || added)) {
            Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
            try {
                lock.lock();
                usersSessions.put(route.toBareJID(), Arrays.asList(route.toString()));
            }
            finally {
                lock.unlock();
            }
        }
    }
    
    // 非匿名客戶端,添加到usersCache、usersSessions緩存隊列中
    else {
        Lock lockU = CacheFactory.getLock(route.toString(), usersCache);
        try {
            lockU.lock();
            added = usersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null;
        }
        finally {
            lockU.unlock();
        }
        // Add the session to the list of user sessions
        if (route.getResource() != null && (!available || added)) {
            Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
            try {
                lock.lock();
                Collection<String> jids = usersSessions.get(route.toBareJID());
                if (jids == null) {
                    // Optimization - use different class depending on current setup
                    if (ClusterManager.isClusteringStarted()) {
                        jids = new HashSet<>();
                    }
                    else {
                        jids = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
                    }
                }
                jids.add(route.toString());
                usersSessions.put(route.toBareJID(), jids);
            }
            finally {
                lock.unlock();
            }
        }
    }
    return added;
}

  主要兩步:

  (1)添加到路由表
  (2)添加到對應的緩存中

  移除路由(會話)

  代碼以下:

@Override
public boolean removeClientRoute(JID route) {
    boolean anonymous = false;
    String address = route.toString();
    ClientRoute clientRoute = null;
    
    // 從緩存中移除客戶端的Session信息
    Lock lockU = CacheFactory.getLock(address, usersCache);
    try {
        lockU.lock();
        clientRoute = usersCache.remove(address);
    }
    finally {
        lockU.unlock();
    }
    if (clientRoute == null) {
        Lock lockA = CacheFactory.getLock(address, anonymousUsersCache);
        try {
            lockA.lock();
            clientRoute = anonymousUsersCache.remove(address);
            anonymous = true;
        }
        finally {
            lockA.unlock();
        }
    }
    if (clientRoute != null && route.getResource() != null) {
        Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions);
        try {
            lock.lock();
            if (anonymous) {
                usersSessions.remove(route.toBareJID());
            }
            else {
                Collection<String> jids = usersSessions.get(route.toBareJID());
                if (jids != null) {
                    jids.remove(route.toString());
                    if (!jids.isEmpty()) {
                        usersSessions.put(route.toBareJID(), jids);
                    }
                    else {
                        usersSessions.remove(route.toBareJID());
                    }
                }
            }
        }
        finally {
            lock.unlock();
        }
    }
    
    // 將對應客戶端的Session信息,移出路由表
    localRoutingTable.removeRoute(address);
    return clientRoute != null;
}

  操做與添加相似:
  (1)移除緩存裏的路由信息
  (2)移除路由表中的信息

  獲取路由(會話)

@Override
public ClientSession getClientRoute(JID jid) {
    // Check if this session is hosted by this cluster node
    ClientSession session = (ClientSession) localRoutingTable.getRoute(jid.toString());
    if (session == null) {
        // The session is not in this JVM so assume remote
        RemoteSessionLocator locator = server.getRemoteSessionLocator();
        if (locator != null) {
            // Check if the session is hosted by other cluster node
            ClientRoute route = usersCache.get(jid.toString());
            if (route == null) {
                route = anonymousUsersCache.get(jid.toString());
            }
            if (route != null) {
                session = locator.getClientSession(route.getNodeID().toByteArray(), jid);
            }
        }
    }
    return session;
}

  從上面的方法代碼中能夠看到,獲取路由的方法是:先查找本地路由表,若獲取不到對應Session時,則經過集羣獲取。RemoteSessionLocator是用於適配不一樣的集羣組件所抽象的接口,爲不一樣集羣組件提供了透明處理。

  至於如何從集羣中獲取Session,主要就在於sersCache和anonymousUsersCache這兩個cache,它們記錄了每一個客戶端的路由節點信息,經過它能夠取得對應的Session實例。詳見第八章《集羣管理》

  消息路由

  根據發送的形式,分爲兩種:一是廣播、二是單點路由

  一、以廣播的形式,向全部在線的客戶端發送消息

@Override
public void broadcastPacket(Message packet, boolean onlyLocal) {
    // Send the message to client sessions connected to this JVM
    for(ClientSession session : localRoutingTable.getClientRoutes()) {
        session.process(packet);
    }

    // Check if we need to broadcast the message to client sessions connected to remote cluter nodes
    if (!onlyLocal && remotePacketRouter != null) {
        remotePacketRouter.broadcastPacket(packet);
    }
}

  二、單點發送的形式,向某個指定的客戶端發送消息

@Override
public void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException {
    
    boolean routed = false;
    try {
        if (serverName.equals(jid.getDomain())) {
            // Packet sent to our domain.
            routed = routeToLocalDomain(jid, packet, fromServer);
            Log.info("routeToLocalDomain");
        }
        else if (jid.getDomain().endsWith(serverName) && hasComponentRoute(jid)) {
            // Packet sent to component hosted in this server
            routed = routeToComponent(jid, packet, routed);
            Log.info("routeToComponent");
        }
        else {
            // Packet sent to remote server
            routed = routeToRemoteDomain(jid, packet, routed);
            Log.info("routeToRemoteDomain");
        }
    } catch (Exception ex) {
        // Catch here to ensure that all packets get handled, despite various processing
        // exceptions, rather than letting any fall through the cracks. For example,
        // an IAE could be thrown when running in a cluster if a remote member becomes 
        // unavailable before the routing caches are updated to remove the defunct node.
        // We have also occasionally seen various flavors of NPE and other oddities, 
        // typically due to unexpected environment or logic breakdowns. 
        Log.error("Primary packet routing failed", ex); 
    }

    if (!routed) {
        if (Log.isDebugEnabled()) {
            Log.debug("Failed to route packet to JID: {} packet: {}", jid, packet.toXML());
        }
        if (packet instanceof IQ) {
            iqRouter.routingFailed(jid, packet);
        }
        else if (packet instanceof Message) {
            messageRouter.routingFailed(jid, packet);
        }
        else if (packet instanceof Presence) {
            presenceRouter.routingFailed(jid, packet);
        }
    }
}

  路由表中的功能,最後由SessionManager集中處理,詳見上一章的分析,這裏再也不贅述。

  特色提一點,比較有用:路由表作爲一個module已經在Openfire主服務啓動時完成實例化,因此,在自定義的插件、或者其餘任何須要發送消息的地方,只需選擇調用以下兩個方法中之一,便可完成消息發送:

XMPPServer.getInstance().getRoutingTable().routePacket(jid, packet, fromServer);

XMPPServer.getInstance().getRoutingTable().broadcastPacket(packet, onlyLocal);

  而消息發送中,最後消息如何送到網卡實現發送,在第三章《消息路由》中已經詳細分析,一樣再也不贅述。


 

  本章就到此結束,OVER!

相關文章
相關標籤/搜索