應該是去年的時候開始接觸openfire,當時在分析後發現基於xmpp協議的openfire已經具有了羣聊的功能。也就沒太當回事,以爲加點功能就能夠作成相似於QQ羣的那種模式。後來仔細瞭解後才發現並非這麼簡單:web
實際上對於openfire的muc改造來講,持久化成員是第一個重要的工做。咱們指望的是這個房間裏的人都是固定的成員,這些成員能夠離開聊天室,但下次能夠進來繼續聊天。其實實現起來也挺簡單的:redis
這裏ofMucAffiliation+ofMucMember保存的數據實際上是用於記錄的是用戶權限的,固然會發現其實這已經對應上咱們需求上的羣成員咯?確實是這樣的。數據庫
只不過有一個問題,就是ofMucAffiliation+ofMucMember裏只能知道用戶的jid,可是羣的話可能每一個用戶會修改本身的暱稱,對不對?因此還要加一個表用於保存這種用戶的個性化數據。固然這個表也挺簡單的就不細寫了。api
這塊涉及到寫插件的技術,網上有不少,我就很少說了。緩存
其實這些功能無非就是增刪改查,並且咱們添加的功能完成能夠獨立於openfire以外,因此本身寫一套也是能夠的。好比用web的方式實現也是能夠的。服務器
特別是能夠設計成rest api,這樣對於端來講是比較友好通用的,兼顧PC、移動端就簡單多了,特別是移動端走http協議總比走長連接方便吧。網絡
簡單的介紹了羣的實現,另一個比較頭痛的問題就是muc離線消息。在openfire裏是有相似的支持的,這裏就作一些簡單的分析吧。ide
由於在openfire裏歷史消息推送策略是這樣的,咱們看一下它的策略類HistoryStrategy,裏面設定了一個枚舉:this
/** * Strategy type. */ public enum Type { defaulType, none, all, number; }
能夠看出,其實就是三種:none(不顯示歷史記錄)、all(顯示整個歷史記錄)、number(指定條數記錄)。默認的是number。插件
策略類會維護一個內存列表,用於給新加入的用戶發送歷史記錄用:
private ConcurrentLinkedQueue<Message> history = new ConcurrentLinkedQueue<>();
實際上本身也能夠實現一套策略來代替它,好比將消息存在redis之類。只不過Openfire並無提供擴展,只能是修改openfire代碼來實現咯。
歷史消息的保存是在openfire裏的MultiUserChatServiceImpl裏實現的,它會啓動一個TimerTask,定時的將消息保存到歷史消息表裏。下面是定時任務的實現
/** * Logs the conversation of the rooms that have this feature enabled. */ private class LogConversationTask extends TimerTask { @Override public void run() { try { logConversation(); } catch (Throwable e) { Log.error(LocaleUtils.getLocalizedString("admin.error"), e); } } } private void logConversation() { ConversationLogEntry entry; boolean success; for (int index = 0; index <= log_batch_size && !logQueue.isEmpty(); index++) { entry = logQueue.poll(); if (entry != null) { success = MUCPersistenceManager.saveConversationLogEntry(entry); if (!success) { logQueue.add(entry); } } } }
這是具體的保存聊天曆史的代碼,能夠看到消息是放在logQueue裏的,而後定時任務從裏面取必定的條數保存到數據庫存中。MUCPersistenceManager就是數據庫的訪問類。
在start方法裏啓動
@Override public void start() { XMPPServer.getInstance().addServerListener( this ); // Run through the users every 5 minutes after a 5 minutes server startup delay (default // values) userTimeoutTask = new UserTimeoutTask(); TaskEngine.getInstance().schedule(userTimeoutTask, user_timeout, user_timeout); // Log the room conversations every 5 minutes after a 5 minutes server startup delay // (default values) logConversationTask = new LogConversationTask(); TaskEngine.getInstance().schedule(logConversationTask, log_timeout, log_timeout); // Remove unused rooms from memory cleanupTask = new CleanupTask(); TaskEngine.getInstance().schedule(cleanupTask, CLEANUP_FREQUENCY, CLEANUP_FREQUENCY); // Set us up to answer disco item requests XMPPServer.getInstance().getIQDiscoItemsHandler().addServerItemsProvider(this); XMPPServer.getInstance().getIQDiscoInfoHandler().setServerNodeInfoProvider(this.getServiceDomain(), this); XMPPServer.getInstance().getServerItemsProviders().add(this); ArrayList<String> params = new ArrayList<>(); params.clear(); params.add(getServiceDomain()); Log.info(LocaleUtils.getLocalizedString("startup.starting.muc", params)); // Load all the persistent rooms to memory for (LocalMUCRoom room : MUCPersistenceManager.loadRoomsFromDB(this, this.getCleanupDate(), router)) { rooms.put(room.getName().toLowerCase(), room); } }
這裏是聊天室服務啓動的過程,它會啓動LogConversationTask用於按期將聊天記錄保存到庫裏。並且這裏最後幾句會發現啓動時會從庫裏讀取數據(MUCPersistenceManager.loadRoomsFromDB),loadRoomsFromDB實現了讀取Hitory數據到historyStrategy裏。
具體的數據保存在ofMucConversationLog表中。
有了歷史消息推送策略和數據,那麼怎麼樣推送給客戶端呢?這裏有一個history協議,在LocalMUCUser處理packet的時候,若是這個packet是Presence,而且帶有history節說明是客戶端發來要歷史記錄的。
在LocalMUCUser.process(Presence packet)裏有history消息節的處理代碼,由於代碼太多,就截取關鍵的部分:
// Get or create the room MUCRoom room = server.getChatRoom(group, packet.getFrom()); // User must support MUC in order to create a room HistoryRequest historyRequest = null; String password = null; // Check for password & requested history if client supports MUC if (mucInfo != null) { password = mucInfo.elementTextTrim("password"); if (mucInfo.element("history") != null) { historyRequest = new HistoryRequest(mucInfo); } } // The user joins the room role = room.joinRoom(recipient.getResource().trim(), password, historyRequest, this, packet.createCopy());
這裏能夠看到,先獲取到historyRequest節的信息,而後調用room.joinRoom方法。這裏的room.joinRoom就是用戶加入聊天室的關鍵部分。在joinRoom裏會發送歷史消息給這個用戶:
if (historyRequest == null) { Iterator<Message> history = roomHistory.getMessageHistory(); while (history.hasNext()) { joinRole.send(history.next()); } } else { historyRequest.sendHistory(joinRole, roomHistory); }
這裏會發現有兩種狀況,1種是historyRequest爲空的狀況時,服務端默認按照策略的設置向用戶發送歷史消息。若是不爲空,則根據客戶端的請求參數發送。那麼這裏咱們看看historyRequest的實現:
public class HistoryRequest { private static final Logger Log = LoggerFactory.getLogger(HistoryRequest.class); private static final XMPPDateTimeFormat xmppDateTime = new XMPPDateTimeFormat(); private int maxChars = -1; private int maxStanzas = -1; private int seconds = -1; private Date since; public HistoryRequest(Element userFragment) { Element history = userFragment.element("history"); if (history != null) { if (history.attribute("maxchars") != null) { this.maxChars = Integer.parseInt(history.attributeValue("maxchars")); } if (history.attribute("maxstanzas") != null) { this.maxStanzas = Integer.parseInt(history.attributeValue("maxstanzas")); } if (history.attribute("seconds") != null) { this.seconds = Integer.parseInt(history.attributeValue("seconds")); } if (history.attribute("since") != null) { try { // parse since String into Date this.since = xmppDateTime.parseString(history.attributeValue("since")); } catch(ParseException pe) { Log.error("Error parsing date from history management", pe); this.since = null; } } } } /** * Returns the total number of characters to receive in the history. * * @return total number of characters to receive in the history. */ public int getMaxChars() { return maxChars; } /** * Returns the total number of messages to receive in the history. * * @return the total number of messages to receive in the history. */ public int getMaxStanzas() { return maxStanzas; } /** * Returns the number of seconds to use to filter the messages received during that time. * In other words, only the messages received in the last "X" seconds will be included in * the history. * * @return the number of seconds to use to filter the messages received during that time. */ public int getSeconds() { return seconds; } /** * Returns the since date to use to filter the messages received during that time. * In other words, only the messages received since the datetime specified will be * included in the history. * * @return the since date to use to filter the messages received during that time. */ public Date getSince() { return since; } /** * Returns true if the history has been configured with some values. * * @return true if the history has been configured with some values. */ private boolean isConfigured() { return maxChars > -1 || maxStanzas > -1 || seconds > -1 || since != null; } /** * Sends the smallest amount of traffic that meets any combination of the requested criteria. * * @param joinRole the user that will receive the history. * @param roomHistory the history of the room. */ public void sendHistory(LocalMUCRole joinRole, MUCRoomHistory roomHistory) { if (!isConfigured()) { Iterator<Message> history = roomHistory.getMessageHistory(); while (history.hasNext()) { joinRole.send(history.next()); } } else { Message changedSubject = roomHistory.getChangedSubject(); boolean addChangedSubject = (changedSubject != null) ? true : false; if (getMaxChars() == 0) { // The user requested to receive no history if (addChangedSubject) { joinRole.send(changedSubject); } return; } int accumulatedChars = 0; int accumulatedStanzas = 0; Element delayInformation; LinkedList<Message> historyToSend = new LinkedList<>(); ListIterator<Message> iterator = roomHistory.getReverseMessageHistory(); while (iterator.hasPrevious()) { Message message = iterator.previous(); // Update number of characters to send String text = message.getBody() == null ? message.getSubject() : message.getBody(); if (text == null) { // Skip this message since it has no body and no subject continue; } accumulatedChars += text.length(); if (getMaxChars() > -1 && accumulatedChars > getMaxChars()) { // Stop collecting history since we have exceded a limit break; } // Update number of messages to send accumulatedStanzas ++; if (getMaxStanzas() > -1 && accumulatedStanzas > getMaxStanzas()) { // Stop collecting history since we have exceded a limit break; } if (getSeconds() > -1 || getSince() != null) { delayInformation = message.getChildElement("delay", "urn:xmpp:delay"); try { // Get the date when the historic message was sent Date delayedDate = xmppDateTime.parseString(delayInformation.attributeValue("stamp")); if (getSince() != null && delayedDate != null && delayedDate.before(getSince())) { // Stop collecting history since we have exceded a limit break; } if (getSeconds() > -1) { Date current = new Date(); long diff = (current.getTime() - delayedDate.getTime()) / 1000; if (getSeconds() <= diff) { // Stop collecting history since we have exceded a limit break; } } } catch (Exception e) { Log.error("Error parsing date from historic message", e); } } // Don't add the latest subject change if it's already in the history. if (addChangedSubject) { if (changedSubject != null && changedSubject.equals(message)) { addChangedSubject = false; } } historyToSend.addFirst(message); } // Check if we should add the latest subject change. if (addChangedSubject) { historyToSend.addFirst(changedSubject); } // Send the smallest amount of traffic to the user for (Object aHistoryToSend : historyToSend) { joinRole.send((Message) aHistoryToSend); } } } }
這裏面主要是用於約定發送歷史消息的一些參數:
private int maxChars = -1; private int maxStanzas = -1; private int seconds = -1; private Date since;
這是能夠設定的幾個參數,具體的對應關係以下面的表格所示
屬性 | 數據類型 | 含義 |
---|---|---|
maxchars | int | 限制歷史中的字符總數爲"X" (這裏的字符數量是所有 XML 節的字符數, 不僅是它們的 XML 字符數據). |
maxstanzas | int | 制歷史中的消息總數爲"X". |
seconds | int | 僅發送最後 "X" 秒收到的消息. |
since | datetime | 僅發送從指定日期時間 datetime 以後收到的消息 (這個datatime必須 MUST 符合XMPP Date and Time Profiles 13 定義的DateTime 規則,). |
固然這裏還實現了一個sendHistory方法,也就是針對客戶端提交了查詢要求時的歷史消息發送方法。具體的實現上面的代碼吧。也就是根據歷史管理屬性裏設定的幾個參數進行鍼對性的發送。
可是這裏有個關鍵點就是since屬性,它表示客戶端能夠設定一個時間戳,服務端根據發送這個時間戳以後的增量數據給客戶端。這個對於客戶端而已仍是頗有做用的。
那麼看完了openfire的歷史消息的實現,再來實現離線消息是否是就簡單的多了。羣聊天曆史消息有幾個問題:
因此不用舉太多問題,就這兩個就夠了,那麼我以爲openfire的這種歷史消息策略中使用number(條數)是很重要的。好比服務器只緩存最近1000條聊天曆史,這樣總體的服務器緩存量就低了。這就解決了第一個問題。
若是羣用戶須要查詢歷史上的數據,應該是另開一個服務接口專門用於查詢歷史數據,這樣就不用在剛上線進入羣時接收一堆的離線消息。
前面分析HistoryRequest時提到了它能夠設置一個時間戳參數,這個是告訴服務端從這個參數以後的歷史消息推送過來。
好比,用戶A昨天晚20:00下的線(最後消息時間戳是2017-06-07 20:00:00),今天早上8:00上線。在用戶A離線的時間裏有100條離心線消息記錄。
那麼用戶A上線,客戶端發送HistoryRequest(since=2017-06-07 20:00:00),服務器則只發送2017-06-07 20:00:00以後的聊天記錄100條。這樣就實現了增量的消息,對於服務端和客戶端都是友好的。
固然,這裏能發多少消息最終仍是要看服務端緩存了多少消息用於發送給客戶端,畢竟就像問題2中提出的那樣,用戶可能一個月都不上線,這期間的歷史消息要是都推送那確定崩了。因此上線時的歷史消息推送這個功能僅適合推送少許的數據。這個在具體的系統設計時應該根據實際狀況來設計。