上兩篇文章咱們介紹了時序數據庫Influxdb在互聯網級監控系統下的應用:html
互聯網級監控系統必備-時序數據庫之Influxdb技術數據庫
互聯網級監控系統必備-時序數據庫之Influxdb集羣及踩過的坑緩存
在咱們監控平臺V1.0和V2.0版本的演進過程當中,設計上,咱們在監控引擎端引入了內存存儲的理念,即監控數據內存槽。安全
爲何須要一個內存存儲來作監控數據的內存槽,它的應用場景是什麼?服務器
一. 從實際應用場景出發數據結構
首先,咱們看一個實際的監控圖表:配置中心服務的TPM多線程
橫軸是時間,縱軸是數值。每分鐘一個點,固然也能夠每10s一個點,一段時間區間內全部的點,鏈接成一條曲線,如圖所示:一段時間內配置中心服務的TPM實時監控圖表。併發
在咱們的服務容器中,每次服務調用咱們都會上報一次監控數據,即服務的耗時和相關的容器數據、緯度數據等。分佈式
由點及線,咱們看每一個點背後的數據:上圖中,21:05:59:996這個時間點配置中心服務的TPM是10K,即這一分鐘內發生了1W次調用,上報了1W個監控數據!函數
咱們統計了每 1 Minute內服務的調用次數,相似的咱們還能夠統計每30s, 10s, 1s的調用次數。不管1 Minute,30s,20s,1s,這都表明了一個Time Window:時間窗口。
有了這些上報的監控數據,咱們就能夠在一個Time Window範圍內進行監控數據的採樣、分析。監控數據採樣主要是在各個緯度下取這個Time Window下全部監控數據的最大值、最小值、個數、合計值、平均
值、最新值、第一個值等,示例中的TPM則是取的一分鐘內的監控數據的個數,一樣的,若是取平均值,即表明了服務的平均響應時間。採樣後的數據寫入Influxdb就能夠展示了。
所以,在監控引擎內部須要一個內存結構來存儲每一個Time Window的監控數據,用於後續的監控數據採樣和分析。
目前,咱們的監控平臺有2000+的監控項,這麼大的監控體量,天天上報2TB的監控數據,摺合每分鐘1.42GB監控數據。這個內存存儲結構應該如何設計?如何保證監控的準確性、實時性、有序性和併發性?
二. 言歸正傳,用演進的角度看監控內存存儲結構的設計
1. 監控數據結構優化
海量的監控數據實時地從各個服務器上報到監控引擎中。首先咱們要保證監控數據儘量的小:這樣才能傳輸的更快、更多。
2. 連續的內存仍是分佈的內存
監控引擎接收到上報的監控數據後,首先要將數據緩存到內存中,這時有兩種選擇:一直連續寫?分佈並行寫? 孰優孰劣?
一直連續寫:內存相對來講是連續的,但這是暫時的,當數據到達必定數量後,不少集合要Double擴容,內存瞬間的壓力會很大!同時,數據寫入是併發的,要保證線程安全,防止寫入失敗!
分佈並行寫:以空間換時間,同時下降了並行寫入產生的鎖爭用問題,同時能夠避免內存Double、頻繁擴容的問題,惟一的劣勢就是,內存佔用多一點,須要實現內存的分佈式管理
監控數據必須快速、及時的獲得處理,實時性要求很高,同時,內存的成本基本可控,所以,咱們選擇了分佈並行寫的策略!
3. 內存存儲的Sharding設計
在內存存儲的設計上,咱們借鑑了數據庫Sharding的思路。爲何要作Sharding?假如將全部的監控數據都存儲在一個內存集合中,每一類監控項在採樣時,都要訪問這個內存集合,線程爭用很大,
必須加鎖控制,此時會產生相似數據庫阻塞的「內存線程阻塞」。所以:
按監控項作垂直拆分:每一個監控項擁有本身單獨的內存存儲空間。內存的Sharding Key就是監控項。這樣的設計,提高了內存操做的並行度,同時減小了鎖爭用。採樣的各個線程操做各自的內存,只有線
程內存的計算處理,採樣線程間沒有交叉爭用。一個監控項一個內存存儲後,數據寫入和採樣數據讀取依然有線程爭用問題!數據在實時不斷的寫入,同時採樣線程在實時指定Time Window的數據進行採樣。
繼續優化改進。
按時間作水平拆分:每個監控項一個內存存儲,同時再按分鐘進行水平拆分,每分鐘又是一個單獨的子存儲。即一個內存槽下60個槽點。每分鐘一個槽點。
此時,監控數據接收時,首先根據監控數據對應的監控項,定位到對應的內存槽,第二步根據監控數據上的時間,定位到具體分鐘槽點。數據寫入的速度又提高了。同時,採樣線程讀取指定Time Window
下的數據時,能夠快速根據時間緯度,找到指定的槽點。讀取的速度也提高了!
4. 內存中讀寫分離設計
將內存中監控數據的讀寫分開,下降數據的讀寫爭用。
數據寫線程:實時並行寫入各個內存槽和槽點中(監控項和時間緯度)
採樣讀線程:讀取指定監控項對應的Time Window下的數據。
監控數據採樣有個前提:例如要對上一分鐘的數據進行採樣,首先要保證上一分鐘的數據儘量的所有到位,這樣數據纔會準確。所以有個延遲時間的問題,例如5s延遲,21:57:05時,咱們認爲21:56的數
據已經所有到位。這樣的話,咱們就能夠採用一個較小的延遲時間,來保證數據的準確性的同時,下降數據讀寫的爭用。
經過設置了一個合理的延遲時間(5s),21:57:05時:
採樣讀線程:讀取21:56對應槽點下的數據進行採樣分析
數據寫線程:將監控數據寫入21:57對應的內存槽點中。
讀寫分開,內存爭用很是少,性能和並行度再一次提高!!
5. 多線程優化
基於線程池,線程內的執行邏輯儘量的快,使用完後當即放回線程池中。防止線程佔用帶來的線程暴漲問題!
三. 代碼分享
1. 監控數據本地存儲
1 public class MonitorLocalStore 2 { 3 private Dictionary<int, Queue<MonitorData>> cache; 4 5 private static object syncObj = new object(); 6 7 //監控元數據ID 8 private string metaDataID; 9 10 public DateTime CreateTime; 11 /// <summary> 12 /// 構造函數 13 /// </summary> 14 /// <param name="metaDataID">監控元數據ID</param> 15 public MonitorLocalStore(string metaDataID) 16 { 17 this.metaDataID = metaDataID; 18 cache = new Dictionary<int, Queue<MonitorData>>(); 19 for (int i = 0; i < 60; i++) 20 { 21 cache.Add(i, new Queue<MonitorData>()); 22 } 23 CreateTime = DateTime.Now; 24 } 25 26 public void Add(MonitorData value) 27 { 28 if (!cache.ContainsKey(value.Time.Minute)) 29 { 30 throw new Exception("Cannot find Time slot: " + value.Time.ToString() + ", Current slots: " + string.Join(",", cache.Keys)); 31 } 32 cache[value.Time.Minute].Enqueue(value); 33 } 34 35 public void Add(IEnumerable<MonitorData> valueSet) 36 { 37 Parallel.ForEach(valueSet, (i) => 38 { 39 cache[i.Time.Minute].Enqueue(i); 40 } 41 ); 42 } 43 44 public List<MonitorData> Get(params int[] scope) 45 { 46 var valueSet = new List<MonitorData>(); 47 foreach (var item in scope) 48 { 49 while (cache[item].Count > 0) 50 { 51 MonitorData data = cache[item].Dequeue(); 52 if (data != null) 53 { 54 valueSet.Add(data); 55 } 56 } 57 //valueSet.AddRange(cache[item]); 58 cache.Remove(item); 59 cache.Add(item, new Queue<MonitorData>()); 60 } 61 62 return valueSet; 63 } 64 65 /// <summary> 66 /// 獲取本地緩存的總容量 67 /// </summary> 68 /// <returns>本地緩存的總容量</returns> 69 public long GetCapcity() 70 { 71 var length = 0; 72 foreach (var item in cache) 73 { 74 length += item.Value.Count; 75 } 76 77 return length; 78 } 79 80 public void Ecvit(params int[] scope) 81 { 82 lock (syncObj) 83 { 84 foreach (var item in scope) 85 { 86 cache[item] = new Queue<MonitorData>(); 87 } 88 } 89 } 90 91 public Dictionary<int, Queue<MonitorData>> GetCache() 92 { 93 return cache; 94 } 95 }
2. 監控存儲分佈式管理
1 class MonitorLocalStoreManager 2 { 3 private static object syncObj = new object(); 4 private ConcurrentDictionary<string, MonitorLocalStore> storeDic; 5 private List<string> metaDataList; 6 7 /// <summary> 8 /// 輸出方法委託 9 /// </summary> 10 public Action<string> Output { get; set; } 11 /// <summary> 12 /// 構造函數 13 /// </summary> 14 private MonitorLocalStoreManager() 15 { 16 storeDic = new ConcurrentDictionary<string, MonitorLocalStore>(); 17 } 18 19 //監控數據本地存儲管理器實例 20 private static MonitorLocalStoreManager instance; 21 22 /// <summary> 23 /// 獲取監控數據本地存儲管理器實例 24 /// </summary> 25 /// <returns>監控數據本地存儲管理器實例</returns> 26 public static MonitorLocalStoreManager GetInstance() 27 { 28 if (instance == null) 29 { 30 lock (syncObj) 31 { 32 if (instance == null) 33 { 34 instance = new MonitorLocalStoreManager(); 35 } 36 } 37 } 38 39 return instance; 40 } 41 42 /// <summary> 43 /// 加載監控元數據存儲 44 /// </summary> 45 /// <param name="metaDataList">監控元數據列表</param> 46 internal void LoadMetaDataStore(List<string> metaDataList) 47 { 48 this.metaDataList = metaDataList; 49 foreach (var metaDataID in metaDataList) 50 { 51 GetOrCreateStore(metaDataID); 52 } 53 } 54 55 /// <summary> 56 /// 根據監控元數據ID獲取本機緩存Store 57 /// </summary> 58 /// <param name="metaDataID">監控元數據ID</param> 59 /// <returns>本機緩存Store</returns> 60 public MonitorLocalStore GetOrCreateStore(string metaDataID) 61 { 62 MonitorLocalStore store = null; 63 if (!storeDic.ContainsKey(metaDataID)) 64 { 65 lock (syncObj) 66 { 67 if (!storeDic.ContainsKey(metaDataID)) 68 { 69 if (!metaDataList.Contains(metaDataID)) 70 { 71 LocalErrorLogService.Write("Find New MetaData:" + metaDataID); 72 } 73 store = new MonitorLocalStore(metaDataID); 74 storeDic.TryAdd(metaDataID, store); 75 } 76 } 77 } 78 else 79 { 80 storeDic.TryGetValue(metaDataID, out store); 81 } 82 83 return store; 84 } 85 86 internal ConcurrentDictionary<string, MonitorLocalStore> GetStore() 87 { 88 return storeDic; 89 } 90 }
總結:
監控引擎使用了內存存儲來接收緩存上報上來的監控數據。
使用場景:監控數據實時寫入,大批量寫入,定時採樣歸集。
技術挑戰:快速寫入、多線程讀寫安全、內存快速分配和釋放,防止內存暴漲帶來的Full GC和高CPU。
設計上的一些好的經驗:
周國慶
2017/8/24