WebIM系列文章html
在一步一步打造WebIM(1)一文中,已經介紹瞭如何實現一個簡單的WebIM,可是,這個WebIM有一個問題,就是每一次添加消息監聽器時,都必須訪問一次數據庫去查詢是否有消息,顯然,若是用戶比較多時,必然對數據庫的壓力比較大。解決這個問題的一個方法就是先將消息緩存在內存中,不當即寫入數據庫,等到緩存滿了才寫入數據庫。本文將介紹如何實現消息緩存。數據庫
實現一個消息緩存管理類,以用戶爲單位緩存全部消息,每個用戶對應着一個List<Message>,保存着該用戶新收到的消息,消息緩存管理用一個Hashtable保存着全部用戶對應的List<Message>。緩存
具體實現代碼以下:spa
public class MessageCacheManagement { static MessageCacheManagement m_Instance = new MessageCacheManagement(); static public MessageCacheManagement Instance { get { return m_Instance; } } private MessageCacheManagement() { } Int32 m_Count = 0; Hashtable m_Cache = new Hashtable(); List<Message> GetUserMessageCache(String user) { if (!m_Cache.ContainsKey(user)) { m_Cache.Add(user, new List<Message>()); } return m_Cache[user] as List<Message>; } /// <summary> /// 清除緩存 /// </summary> public void Clear() { lock (m_Cache) { List<Message> msgs = new List<Message>(); foreach (DictionaryEntry ent in m_Cache) { (ent.Value as List<Message>).Clear(); } m_Count = 0; } } /// <summary> /// 獲取全部緩存的消息 /// </summary> /// <returns></returns> public List<Message> GetAll() { lock (m_Cache) { List<Message> msgs = new List<Message>(); foreach (DictionaryEntry ent in m_Cache) { foreach (Message msg in ent.Value as List<Message>) { msgs.Add(msg); } } return msgs; } } /// <summary> /// 獲取某一用戶緩存的消息的最小時間 /// </summary> public Nullable<DateTime> GetMinCreatedTime(string user) { lock (m_Cache) { List<Message> userMsgs = GetUserMessageCache(user); return userMsgs.Count == 0 ? null : new Nullable<DateTime>(userMsgs[0].CreatedTime); } } /// <summary> /// 在緩存中插入一條消息 /// </summary> /// <param name="user"></param> /// <param name="msg"></param> public void Insert(String user, Message msg) { List<Message> userMsgs = null; lock (m_Cache) { userMsgs = GetUserMessageCache(user); } lock (userMsgs) { userMsgs.Add(msg); m_Count++; } } /// <summary> /// 查找緩存中接受者爲user,發送時間大於from的消息 /// </summary> public List<Message> Find(String user, DateTime from) { List<Message> userMsgs = null; lock (m_Cache) { userMsgs = GetUserMessageCache(user); } lock (userMsgs) { List<Message> msgs = new List<Message>(); int i = 0; while (i < userMsgs.Count && userMsgs[i].CreatedTime <= from) i++; while (i < userMsgs.Count) { msgs.Add(userMsgs[i]); i++; } return msgs; } } /// <summary> /// 獲取消息總量 /// </summary> public Int32 Count { get { return m_Count; } } }
增長消息緩存後,添加消息監聽器的流程也要修改,具體思路是先獲取消息接收者在緩存中發送時間最先的消息的發送時間,顯然,若是監聽器的From大於或等於這個最小發送時間時,無需訪問數據庫,能夠直接訪問緩存。具體代碼修改成:code
/// <summary> /// 添加消息監聽器,若是查找到符合監聽器條件的消息,返回false,此時不會添加監聽器 /// 若是沒有查找到符合監聽器條件的消息,返回true,此時監聽器將被添加到m_Listeners中 /// </summary> public bool AddListener(String receiver, String sender, Nullable<DateTime> from, WebIM_AsyncResult asynResult) { MessageListener listener = new MessageListener(receiver, sender, from, asynResult); lock (m_Lock) { if (!m_Listeners.ContainsKey(receiver)) { m_Listeners.Add(receiver, new List<MessageListener>()); } List<MessageListener> listeners = m_Listeners[receiver] as List<MessageListener>; //獲取用戶receiver緩存的消息的最小發送時間 Nullable<DateTime> min = MessageCacheManagement.Instance.GetMinCreatedTime(receiver); List<Message> messages = new List<Message>(); //當from >= 緩存在內存中的消息的最小時間時,沒必要查詢數據庫 if (min == null || from == null || from.Value < min.Value) { //查詢數據庫 messages.AddRange(Find(receiver, sender, from)); } //在緩存中查詢 messages.AddRange(MessageCacheManagement.Instance.Find(receiver, from.Value)); if (messages.Count == 0) { //插入監聽器 listeners.Add(listener); } else { //發送消息 listener.Send(messages); } return messages.Count == 0; } }
增長消息緩存後,發送消息的流程也要修改,具體思路是:先將消息保存到緩存中,以後判斷緩存的消息的總數,若是超過設定的上限,就將消息寫入數據庫。具體代碼修改成(您能夠經過修改MAX_CACHE_COUNT修改緩存消息數的上限):htm
/// <summary> /// 插入新的消息,插入消息後將查詢m_Listeners中是否有符合條件的監聽器,如存在,同時將消息發送出去 /// </summary> public Message NewMessage(String receiver, String sender, DateTime createdTime, String content) { lock (m_Lock) { Message message = new Message(sender, receiver, content, createdTime, ++m_MaxKey); List<Message> messages = new List<Message>(); messages.Add(message); if (m_Listeners.ContainsKey(receiver)) { List<MessageListener> listeners = m_Listeners[receiver] as List<MessageListener>; List<MessageListener> removeListeners = new List<MessageListener>(); foreach (MessageListener listener in listeners) { if ((listener.Sender == "*" || String.Compare(listener.Sender, sender, true) == 0) && (listener.From == null || message.CreatedTime > listener.From)) { listener.Send(messages); removeListeners.Add(listener); System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(listener.Complete)); } } foreach (MessageListener listener in removeListeners) { //移除監聽器 listeners.Remove(listener); } } MessageCacheManagement.Instance.Insert(receiver, message); if (MessageCacheManagement.Instance.Count >= MAX_CACHE_COUNT) {//超過緩存的最大值,將緩存中的消息所有寫入數據庫 //啓動事務 SQLiteTransaction trans = m_Conn.BeginTransaction(); try { List<Message> cacheMsgs = MessageCacheManagement.Instance.GetAll(); foreach (Message msg in cacheMsgs) {SQLiteCommand cmd = new SQLiteCommand( "insert into Message (Receiver,Sender,Content,CreatedTime,Key) values (?,?,?,?,?)", m_Conn ); cmd.Parameters.Add("Receiver", DbType.String).Value = msg.Receiver; cmd.Parameters.Add("Sender", DbType.String).Value = msg.Sender; cmd.Parameters.Add("Content", DbType.String).Value = msg.Content; cmd.Parameters.Add("CreatedTime", DbType.DateTime).Value = msg.CreatedTime; cmd.Parameters.Add("Key", DbType.Int64).Value = msg.Key; cmd.ExecuteNonQuery(); } trans.Commit(); } catch { trans.Rollback(); } MessageCacheManagement.Instance.Clear(); } return message; } }
本文來自:http://www.cnblogs.com/lucc/archive/2010/04/27/1722470.htmlblog