教你如何把openfire的muc聊天室改造爲羣

openfire羣聊與QQ羣對比

應該是去年的時候開始接觸openfire,當時在分析後發現基於xmpp協議的openfire已經具有了羣聊的功能。也就沒太當回事,以爲加點功能就能夠作成相似於QQ羣的那種模式。後來仔細瞭解後才發現並非這麼簡單:web

  • muc其實聊天室的形式,房間建立後能夠加入聊天,用戶離開就退出聊天室了,並無一個用戶固化的功能,因此要單獨爲這部分開發
  • muc由於沒有固化的成員關係,因此並無1對1聊天的那種離線消息。並且考慮到消息量是羣發的緣由,因此服務器對於加入聊天室的成員只會推送必定量的消息,固然這個能夠經過策略來配置爲所有推送。事實上考慮到羣聊天的特性,推送指定條數多是更靠譜的。
  • 還有一些QQ特有的功能,好比邀請進羣須要管理員審覈之類的管理功能就更少了,這塊都須要擴展實現

改造Openfire羣聊天室爲羣

實際上對於openfire的muc改造來講,持久化成員是第一個重要的工做。咱們指望的是這個房間裏的人都是固定的成員,這些成員能夠離開聊天室,但下次能夠進來繼續聊天。其實實現起來也挺簡單的:redis

基於openfire的實現

  1. 創建數據表,用於保存成員列表
    在openfire裏已經有一系列的表用於保存muc相關的數據:
  • ofMucRoom-這個是房間表,保存了聊天室的信息
  • ofMucAffiliation-這個是保存房間裏管理員角色人員的表(owner(10)、admin(20)、outcast(40))
  • ofMucMember-這個是房間裏成員的列表(對應的是member(30))

這裏ofMucAffiliation+ofMucMember保存的數據實際上是用於記錄的是用戶權限的,固然會發現其實這已經對應上咱們需求上的羣成員咯?確實是這樣的。數據庫

只不過有一個問題,就是ofMucAffiliation+ofMucMember裏只能知道用戶的jid,可是羣的話可能每一個用戶會修改本身的暱稱,對不對?因此還要加一個表用於保存這種用戶的個性化數據。固然這個表也挺簡單的就不細寫了。api

  1. 經過openfire的插件體系增長一個插件,在服務端實現加羣、退羣等功能
    畢竟xmpp協議裏是沒有得到羣列表和房間成員的功能的,以及一些加羣、退羣的管理功能都沒有,因此要本身開發。這裏能夠經過openfire的插件體系來作,這樣比較獨立,不影響openfire內核功能。

這塊涉及到寫插件的技術,網上有不少,我就很少說了。緩存

  1. 本身定義一套協議來完成客戶端與服務端的通信
    由於要走openfire,因此仍是要定義xmpp協議,我用的是IQ。考慮到我使用的是smack作的,因此這部分就再也不寫了。有興趣或者須要的網上找找IQ協議的寫法就好了。

其餘方式

其實這些功能無非就是增刪改查,並且咱們添加的功能完成能夠獨立於openfire以外,因此本身寫一套也是能夠的。好比用web的方式實現也是能夠的。服務器

特別是能夠設計成rest api,這樣對於端來講是比較友好通用的,兼顧PC、移動端就簡單多了,特別是移動端走http協議總比走長連接方便吧。網絡

分析openfire muc羣聊歷史消息的實現

簡單的介紹了羣的實現,另一個比較頭痛的問題就是muc離線消息。在openfire裏是有相似的支持的,這裏就作一些簡單的分析吧。ide

歷史消息策略HistoryStrategy

由於在openfire裏歷史消息推送策略是這樣的,咱們看一下它的策略類HistoryStrategy,裏面設定了一個枚舉:this

/**
 * Strategy type.
 */
public enum Type {
    defaulType, none, all, number;
}

能夠看出,其實就是三種:none(不顯示歷史記錄)、all(顯示整個歷史記錄)、number(指定條數記錄)。默認的是number。插件

策略類會維護一個內存列表,用於給新加入的用戶發送歷史記錄用:

private ConcurrentLinkedQueue<Message> history = new ConcurrentLinkedQueue<>();

實際上本身也能夠實現一套策略來代替它,好比將消息存在redis之類。只不過Openfire並無提供擴展,只能是修改openfire代碼來實現咯。

歷史消息的保存與維護

歷史消息的保存是在openfire裏的MultiUserChatServiceImpl裏實現的,它會啓動一個TimerTask,定時的將消息保存到歷史消息表裏。下面是定時任務的實現

/**
 * Logs the conversation of the rooms that have this feature enabled.
 */
private class LogConversationTask extends TimerTask {
    @Override
    public void run() {
        try {
            logConversation();
        }
        catch (Throwable e) {
            Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
        }
    }
}

private void logConversation() {
    ConversationLogEntry entry;
    boolean success;
    for (int index = 0; index <= log_batch_size && !logQueue.isEmpty(); index++) {
        entry = logQueue.poll();
        if (entry != null) {
            success = MUCPersistenceManager.saveConversationLogEntry(entry);
            if (!success) {
                logQueue.add(entry);
            }
        }
    }
}

這是具體的保存聊天曆史的代碼,能夠看到消息是放在logQueue裏的,而後定時任務從裏面取必定的條數保存到數據庫存中。MUCPersistenceManager就是數據庫的訪問類。

在start方法裏啓動

@Override
public void start() {
    XMPPServer.getInstance().addServerListener( this );

    // Run through the users every 5 minutes after a 5 minutes server startup delay (default
    // values)
    userTimeoutTask = new UserTimeoutTask();
    TaskEngine.getInstance().schedule(userTimeoutTask, user_timeout, user_timeout);
    // Log the room conversations every 5 minutes after a 5 minutes server startup delay
    // (default values)
    logConversationTask = new LogConversationTask();
    TaskEngine.getInstance().schedule(logConversationTask, log_timeout, log_timeout);
    // Remove unused rooms from memory
    cleanupTask = new CleanupTask();
    TaskEngine.getInstance().schedule(cleanupTask, CLEANUP_FREQUENCY, CLEANUP_FREQUENCY);

    // Set us up to answer disco item requests
    XMPPServer.getInstance().getIQDiscoItemsHandler().addServerItemsProvider(this);
    XMPPServer.getInstance().getIQDiscoInfoHandler().setServerNodeInfoProvider(this.getServiceDomain(), this);
    XMPPServer.getInstance().getServerItemsProviders().add(this);

    ArrayList<String> params = new ArrayList<>();
    params.clear();
    params.add(getServiceDomain());
    Log.info(LocaleUtils.getLocalizedString("startup.starting.muc", params));
    // Load all the persistent rooms to memory
    for (LocalMUCRoom room : MUCPersistenceManager.loadRoomsFromDB(this, this.getCleanupDate(), router)) {
        rooms.put(room.getName().toLowerCase(), room);
    }
}

這裏是聊天室服務啓動的過程,它會啓動LogConversationTask用於按期將聊天記錄保存到庫裏。並且這裏最後幾句會發現啓動時會從庫裏讀取數據(MUCPersistenceManager.loadRoomsFromDB),loadRoomsFromDB實現了讀取Hitory數據到historyStrategy裏。

具體的數據保存在ofMucConversationLog表中。

如何推送歷史消息給客戶端

有了歷史消息推送策略和數據,那麼怎麼樣推送給客戶端呢?這裏有一個history協議,在LocalMUCUser處理packet的時候,若是這個packet是Presence,而且帶有history節說明是客戶端發來要歷史記錄的。

在LocalMUCUser.process(Presence packet)裏有history消息節的處理代碼,由於代碼太多,就截取關鍵的部分:

// Get or create the room
MUCRoom room = server.getChatRoom(group, packet.getFrom());
// User must support MUC in order to create a room
HistoryRequest historyRequest = null;
String password = null;
// Check for password & requested history if client supports MUC
if (mucInfo != null) {
    password = mucInfo.elementTextTrim("password");
    if (mucInfo.element("history") != null) {
        historyRequest = new HistoryRequest(mucInfo);
    }
}
// The user joins the room
role = room.joinRoom(recipient.getResource().trim(),
        password,
        historyRequest,
        this,
        packet.createCopy());

這裏能夠看到,先獲取到historyRequest節的信息,而後調用room.joinRoom方法。這裏的room.joinRoom就是用戶加入聊天室的關鍵部分。在joinRoom裏會發送歷史消息給這個用戶:

if (historyRequest == null) {
    Iterator<Message> history = roomHistory.getMessageHistory();
    while (history.hasNext()) {
        joinRole.send(history.next());
    }
}
else {
    historyRequest.sendHistory(joinRole, roomHistory);
}

這裏會發現有兩種狀況,1種是historyRequest爲空的狀況時,服務端默認按照策略的設置向用戶發送歷史消息。若是不爲空,則根據客戶端的請求參數發送。那麼這裏咱們看看historyRequest的實現:

public class HistoryRequest {

    private static final Logger Log = LoggerFactory.getLogger(HistoryRequest.class);
    private static final XMPPDateTimeFormat xmppDateTime = new XMPPDateTimeFormat();

    private int maxChars = -1;
    private int maxStanzas = -1;
    private int seconds = -1;
    private Date since;

    public HistoryRequest(Element userFragment) {
        Element history = userFragment.element("history");
        if (history != null) {
            if (history.attribute("maxchars") != null) {
                this.maxChars = Integer.parseInt(history.attributeValue("maxchars"));
            }
            if (history.attribute("maxstanzas") != null) {
                this.maxStanzas = Integer.parseInt(history.attributeValue("maxstanzas"));
            }
            if (history.attribute("seconds") != null) {
                this.seconds = Integer.parseInt(history.attributeValue("seconds"));
            }
            if (history.attribute("since") != null) {
                try {
                    // parse since String into Date
                    this.since = xmppDateTime.parseString(history.attributeValue("since"));
                }
                catch(ParseException pe) {
                    Log.error("Error parsing date from history management", pe);
                    this.since = null;
                }
            }
        }
    }
    
    /**
     * Returns the total number of characters to receive in the history.
     * 
     * @return total number of characters to receive in the history.
     */
    public int getMaxChars() {
        return maxChars;
    }

    /**
     * Returns the total number of messages to receive in the history.
     * 
     * @return the total number of messages to receive in the history.
     */
    public int getMaxStanzas() {
        return maxStanzas;
    }

    /**
     * Returns the number of seconds to use to filter the messages received during that time. 
     * In other words, only the messages received in the last "X" seconds will be included in 
     * the history.
     * 
     * @return the number of seconds to use to filter the messages received during that time.
     */
    public int getSeconds() {
        return seconds;
    }

    /**
     * Returns the since date to use to filter the messages received during that time. 
     * In other words, only the messages received since the datetime specified will be 
     * included in the history.
     * 
     * @return the since date to use to filter the messages received during that time.
     */
    public Date getSince() {
        return since;
    }

    /**
     * Returns true if the history has been configured with some values.
     * 
     * @return true if the history has been configured with some values.
     */
    private boolean isConfigured() {
        return maxChars > -1 || maxStanzas > -1 || seconds > -1 || since != null;
    }

    /**
     * Sends the smallest amount of traffic that meets any combination of the requested criteria.
     * 
     * @param joinRole the user that will receive the history.
     * @param roomHistory the history of the room.
     */
    public void sendHistory(LocalMUCRole joinRole, MUCRoomHistory roomHistory) {
        if (!isConfigured()) {
            Iterator<Message> history = roomHistory.getMessageHistory();
            while (history.hasNext()) {
                joinRole.send(history.next());
            }
        }
        else {
            Message changedSubject = roomHistory.getChangedSubject();
            boolean addChangedSubject = (changedSubject != null) ? true : false;
            if (getMaxChars() == 0) {
                // The user requested to receive no history
                if (addChangedSubject) {
                    joinRole.send(changedSubject);
                }
                return;
            }
            int accumulatedChars = 0;
            int accumulatedStanzas = 0;
            Element delayInformation;
            LinkedList<Message> historyToSend = new LinkedList<>();
            ListIterator<Message> iterator = roomHistory.getReverseMessageHistory();
            while (iterator.hasPrevious()) {
                Message message = iterator.previous();
                // Update number of characters to send
                String text = message.getBody() == null ? message.getSubject() : message.getBody();
                if (text == null) {
                    // Skip this message since it has no body and no subject  
                    continue;
                }
                accumulatedChars += text.length();
                if (getMaxChars() > -1 && accumulatedChars > getMaxChars()) {
                    // Stop collecting history since we have exceded a limit
                    break;
                }
                // Update number of messages to send
                accumulatedStanzas ++;
                if (getMaxStanzas() > -1 && accumulatedStanzas > getMaxStanzas()) {
                    // Stop collecting history since we have exceded a limit
                    break;
                }

                if (getSeconds() > -1 || getSince() != null) {
                    delayInformation = message.getChildElement("delay", "urn:xmpp:delay");
                    try {
                        // Get the date when the historic message was sent
                        Date delayedDate = xmppDateTime.parseString(delayInformation.attributeValue("stamp"));
                        if (getSince() != null && delayedDate != null && delayedDate.before(getSince())) {
                            // Stop collecting history since we have exceded a limit
                            break;
                        }
                        if (getSeconds() > -1) {
                            Date current = new Date();
                            long diff = (current.getTime() - delayedDate.getTime()) / 1000;
                            if (getSeconds() <= diff) {
                                // Stop collecting history since we have exceded a limit
                                break;
                            }
                        }
                    }
                    catch (Exception e) {
                        Log.error("Error parsing date from historic message", e);
                    }

                }

                // Don't add the latest subject change if it's already in the history.
                if (addChangedSubject) {
                    if (changedSubject != null && changedSubject.equals(message)) {
                        addChangedSubject = false;
                    }
                }

                historyToSend.addFirst(message);
            }
            // Check if we should add the latest subject change.
            if (addChangedSubject) {
                historyToSend.addFirst(changedSubject);
            }
            // Send the smallest amount of traffic to the user
            for (Object aHistoryToSend : historyToSend) {
                joinRole.send((Message) aHistoryToSend);
            }
        }
    }
}

這裏面主要是用於約定發送歷史消息的一些參數:

private int maxChars = -1;
private int maxStanzas = -1;
private int seconds = -1;
private Date since;

這是能夠設定的幾個參數,具體的對應關係以下面的表格所示

歷史管理屬性

屬性 數據類型 含義
maxchars int 限制歷史中的字符總數爲"X" (這裏的字符數量是所有 XML 節的字符數, 不僅是它們的 XML 字符數據).
maxstanzas int 制歷史中的消息總數爲"X".
seconds int 僅發送最後 "X" 秒收到的消息.
since datetime 僅發送從指定日期時間 datetime 以後收到的消息 (這個datatime必須 MUST 符合XMPP Date and Time Profiles 13 定義的DateTime 規則,).

還有sendHistory

固然這裏還實現了一個sendHistory方法,也就是針對客戶端提交了查詢要求時的歷史消息發送方法。具體的實現上面的代碼吧。也就是根據歷史管理屬性裏設定的幾個參數進行鍼對性的發送。

可是這裏有個關鍵點就是since屬性,它表示客戶端能夠設定一個時間戳,服務端根據發送這個時間戳以後的增量數據給客戶端。這個對於客戶端而已仍是頗有做用的。

實現羣離線消息的方法

那麼看完了openfire的歷史消息的實現,再來實現離線消息是否是就簡單的多了。羣聊天曆史消息有幾個問題:

  • 問題1:羣人員龐大歷史消息巨大服務端如何緩存這些歷史數據?好比一個羣1000人,一人一天發10條,就有10000條/天,一個月就是30萬,這還只是一個聊天羣的,100個羣就是3000萬。
  • 問題2:對於羣成員而言,可能一個月未登陸,那麼可能就要接收這一個月的離線消息,客戶端基本就崩了,網絡流量也很巨大,怎麼處理?

利用HistoryStrategy限制服務端推送條數

因此不用舉太多問題,就這兩個就夠了,那麼我以爲openfire的這種歷史消息策略中使用number(條數)是很重要的。好比服務器只緩存最近1000條聊天曆史,這樣總體的服務器緩存量就低了。這就解決了第一個問題。

若是羣用戶須要查詢歷史上的數據,應該是另開一個服務接口專門用於查詢歷史數據,這樣就不用在剛上線進入羣時接收一堆的離線消息。

利用HistoryRequest來獲取增量數據

前面分析HistoryRequest時提到了它能夠設置一個時間戳參數,這個是告訴服務端從這個參數以後的歷史消息推送過來。

好比,用戶A昨天晚20:00下的線(最後消息時間戳是2017-06-07 20:00:00),今天早上8:00上線。在用戶A離線的時間裏有100條離心線消息記錄。

那麼用戶A上線,客戶端發送HistoryRequest(since=2017-06-07 20:00:00),服務器則只發送2017-06-07 20:00:00以後的聊天記錄100條。這樣就實現了增量的消息,對於服務端和客戶端都是友好的。

固然,這裏能發多少消息最終仍是要看服務端緩存了多少消息用於發送給客戶端,畢竟就像問題2中提出的那樣,用戶可能一個月都不上線,這期間的歷史消息要是都推送那確定崩了。因此上線時的歷史消息推送這個功能僅適合推送少許的數據。這個在具體的系統設計時應該根據實際狀況來設計。

相關文章
相關標籤/搜索