互聯網級監控平臺以內存存儲的設計和優化

上兩篇文章咱們介紹了時序數據庫Influxdb在互聯網級監控系統下的應用:html

互聯網級監控系統必備-時序數據庫之Influxdb技術數據庫

互聯網級監控系統必備-時序數據庫之Influxdb集羣及踩過的坑緩存

在咱們監控平臺V1.0和V2.0版本的演進過程當中,設計上,咱們在監控引擎端引入了內存存儲的理念,即監控數據內存槽。安全

爲何須要一個內存存儲來作監控數據的內存槽,它的應用場景是什麼?服務器

一. 從實際應用場景出發數據結構

   首先,咱們看一個實際的監控圖表:配置中心服務的TPM多線程

      橫軸是時間,縱軸是數值。每分鐘一個點,固然也能夠每10s一個點,一段時間區間內全部的點,鏈接成一條曲線,如圖所示:一段時間內配置中心服務的TPM實時監控圖表。併發

      在咱們的服務容器中,每次服務調用咱們都會上報一次監控數據,即服務的耗時和相關的容器數據、緯度數據等。分佈式

      由點及線,咱們看每一個點背後的數據:上圖中,21:05:59:996這個時間點配置中心服務的TPM是10K,即這一分鐘內發生了1W次調用,上報了1W個監控數據函數

咱們統計了每 1 Minute內服務的調用次數,相似的咱們還能夠統計每30s, 10s, 1s的調用次數。不管1 Minute,30s,20s,1s,這都表明了一個Time Window:時間窗口。

有了這些上報的監控數據,咱們就能夠在一個Time Window範圍內進行監控數據的採樣、分析。監控數據採樣主要是在各個緯度下取這個Time Window下全部監控數據的最大值、最小值、個數、合計值、平均

值、最新值、第一個值等,示例中的TPM則是取的一分鐘內的監控數據的個數,一樣的,若是取平均值,即表明了服務的平均響應時間。採樣後的數據寫入Influxdb就能夠展示了。

  所以,在監控引擎內部須要一個內存結構來存儲每一個Time Window的監控數據,用於後續的監控數據採樣和分析。

  目前,咱們的監控平臺有2000+的監控項,這麼大的監控體量,天天上報2TB的監控數據,摺合每分鐘1.42GB監控數據。這個內存存儲結構應該如何設計?如何保證監控的準確性、實時性、有序性和併發性?

二.  言歸正傳,用演進的角度看監控內存存儲結構的設計

1. 監控數據結構優化

  海量的監控數據實時地從各個服務器上報到監控引擎中。首先咱們要保證監控數據儘量的小:這樣才能傳輸的更快、更多。

  •   不須要的數據不要上報
  •   採用ProtoBuf序列化,儘量的下降數據序列化帶來的性能消耗,保證序列化後的數據的體量要小

2. 連續的內存仍是分佈的內存 

  監控引擎接收到上報的監控數據後,首先要將數據緩存到內存中,這時有兩種選擇:一直連續寫?分佈並行寫? 孰優孰劣?

  一直連續寫:內存相對來講是連續的,但這是暫時的,當數據到達必定數量後,不少集合要Double擴容,內存瞬間的壓力會很大!同時,數據寫入是併發的,要保證線程安全,防止寫入失敗!

       分佈並行寫:以空間換時間,同時下降了並行寫入產生的鎖爭用問題,同時能夠避免內存Double、頻繁擴容的問題,惟一的劣勢就是,內存佔用多一點,須要實現內存的分佈式管理

       監控數據必須快速、及時的獲得處理,實時性要求很高,同時,內存的成本基本可控,所以,咱們選擇了分佈並行寫的策略!

3. 內存存儲的Sharding設計

  在內存存儲的設計上,咱們借鑑了數據庫Sharding的思路。爲何要作Sharding?假如將全部的監控數據都存儲在一個內存集合中,每一類監控項在採樣時,都要訪問這個內存集合,線程爭用很大,

必須加鎖控制,此時會產生相似數據庫阻塞的「內存線程阻塞」。所以:

      按監控項作垂直拆分:每一個監控項擁有本身單獨的內存存儲空間。內存的Sharding Key就是監控項。這樣的設計,提高了內存操做的並行度,同時減小了鎖爭用。採樣的各個線程操做各自的內存,只有線

程內存的計算處理,採樣線程間沒有交叉爭用。一個監控項一個內存存儲後,數據寫入和採樣數據讀取依然有線程爭用問題!數據在實時不斷的寫入,同時採樣線程在實時指定Time Window的數據進行採樣。

繼續優化改進。

  按時間作水平拆分:每個監控項一個內存存儲,同時再按分鐘進行水平拆分,每分鐘又是一個單獨的子存儲。即一個內存槽下60個槽點。每分鐘一個槽點

  此時,監控數據接收時,首先根據監控數據對應的監控項,定位到對應的內存槽,第二步根據監控數據上的時間,定位到具體分鐘槽點。數據寫入的速度又提高了。同時,採樣線程讀取指定Time Window

下的數據時,能夠快速根據時間緯度,找到指定的槽點。讀取的速度也提高了!

4. 內存中讀寫分離設計

  將內存中監控數據的讀寫分開,下降數據的讀寫爭用。

  數據寫線程:實時並行寫入各個內存槽和槽點中(監控項和時間緯度)

       採樣讀線程:讀取指定監控項對應的Time Window下的數據。

  監控數據採樣有個前提:例如要對上一分鐘的數據進行採樣,首先要保證上一分鐘的數據儘量的所有到位,這樣數據纔會準確。所以有個延遲時間的問題,例如5s延遲,21:57:05時,咱們認爲21:56的數

據已經所有到位。這樣的話,咱們就能夠採用一個較小的延遲時間,來保證數據的準確性的同時,下降數據讀寫的爭用。

  經過設置了一個合理的延遲時間(5s),21:57:05時:

  採樣讀線程:讀取21:56對應槽點下的數據進行採樣分析

       數據寫線程:將監控數據寫入21:57對應的內存槽點中。

       讀寫分開,內存爭用很是少,性能和並行度再一次提高!!

5. 多線程優化

  基於線程池,線程內的執行邏輯儘量的快,使用完後當即放回線程池中。防止線程佔用帶來的線程暴漲問題!

三. 代碼分享

1. 監控數據本地存儲

 1  public class MonitorLocalStore
 2     {
 3         private Dictionary<int, Queue<MonitorData>> cache;
 4 
 5         private static object syncObj = new object();
 6 
 7         //監控元數據ID
 8         private string metaDataID;
 9 
10         public DateTime CreateTime;
11         /// <summary>
12         /// 構造函數
13         /// </summary>
14         /// <param name="metaDataID">監控元數據ID</param>
15         public MonitorLocalStore(string metaDataID)
16         {
17             this.metaDataID = metaDataID;
18             cache = new Dictionary<int, Queue<MonitorData>>();
19             for (int i = 0; i < 60; i++)
20             {
21                 cache.Add(i, new Queue<MonitorData>());
22             }
23             CreateTime = DateTime.Now;
24         }
25 
26         public void Add(MonitorData value)
27         {
28             if (!cache.ContainsKey(value.Time.Minute))
29             {
30                 throw new Exception("Cannot find Time slot: " + value.Time.ToString() + ", Current slots: " + string.Join(",", cache.Keys));
31             }
32             cache[value.Time.Minute].Enqueue(value);
33         }
34 
35         public void Add(IEnumerable<MonitorData> valueSet)
36         {
37             Parallel.ForEach(valueSet, (i) =>
38                     {
39                         cache[i.Time.Minute].Enqueue(i);
40                     }
41                 );
42         }
43 
44         public List<MonitorData> Get(params int[] scope)
45         {
46             var valueSet = new List<MonitorData>();
47             foreach (var item in scope)
48             {
49                 while (cache[item].Count > 0)
50                 {
51                     MonitorData data = cache[item].Dequeue();
52                     if (data != null)
53                     {
54                         valueSet.Add(data);
55                     }
56                 }
57                 //valueSet.AddRange(cache[item]);    
58                 cache.Remove(item);
59                 cache.Add(item, new Queue<MonitorData>());
60             }
61 
62             return valueSet;
63         }
64 
65         /// <summary>
66         /// 獲取本地緩存的總容量
67         /// </summary>
68         /// <returns>本地緩存的總容量</returns>
69         public long GetCapcity()
70         {
71             var length = 0;
72             foreach (var item in cache)
73             {
74                 length += item.Value.Count;
75             }
76 
77             return length;
78         }
79 
80         public void Ecvit(params int[] scope)
81         {
82             lock (syncObj)
83             {
84                 foreach (var item in scope)
85                 {
86                     cache[item] = new Queue<MonitorData>();
87                 }
88             }
89         }
90 
91         public Dictionary<int, Queue<MonitorData>> GetCache()
92         {
93             return cache;
94         }
95     }

2. 監控存儲分佈式管理

 1   class MonitorLocalStoreManager
 2     {
 3         private static object syncObj = new object();
 4         private ConcurrentDictionary<string, MonitorLocalStore> storeDic;
 5         private List<string> metaDataList;
 6 
 7         /// <summary>
 8         /// 輸出方法委託
 9         /// </summary>
10         public Action<string> Output { get; set; }
11         /// <summary>
12         /// 構造函數
13         /// </summary>
14         private MonitorLocalStoreManager()
15         {
16             storeDic = new ConcurrentDictionary<string, MonitorLocalStore>();
17         }
18 
19         //監控數據本地存儲管理器實例
20         private static MonitorLocalStoreManager instance;
21 
22         /// <summary>
23         /// 獲取監控數據本地存儲管理器實例
24         /// </summary>
25         /// <returns>監控數據本地存儲管理器實例</returns>
26         public static MonitorLocalStoreManager GetInstance()
27         {
28             if (instance == null)
29             {
30                 lock (syncObj)
31                 {
32                     if (instance == null)
33                     {
34                         instance = new MonitorLocalStoreManager();
35                     }
36                 }
37             }
38 
39             return instance;
40         }
41 
42         /// <summary>
43         /// 加載監控元數據存儲
44         /// </summary>
45         /// <param name="metaDataList">監控元數據列表</param>
46         internal void LoadMetaDataStore(List<string> metaDataList)
47         {
48             this.metaDataList = metaDataList;
49             foreach (var metaDataID in metaDataList)
50             {
51                 GetOrCreateStore(metaDataID);
52             }
53         }
54 
55         /// <summary>
56         /// 根據監控元數據ID獲取本機緩存Store
57         /// </summary>
58         /// <param name="metaDataID">監控元數據ID</param>
59         /// <returns>本機緩存Store</returns>
60         public MonitorLocalStore GetOrCreateStore(string metaDataID)
61         {
62             MonitorLocalStore store = null;
63             if (!storeDic.ContainsKey(metaDataID))
64             {
65                 lock (syncObj)
66                 {
67                     if (!storeDic.ContainsKey(metaDataID))
68                     {
69                         if (!metaDataList.Contains(metaDataID))
70                         {
71                             LocalErrorLogService.Write("Find New MetaData:" + metaDataID);
72                         }
73                         store = new MonitorLocalStore(metaDataID);
74                         storeDic.TryAdd(metaDataID, store);
75                     }
76                 }
77             }
78             else
79             {
80                 storeDic.TryGetValue(metaDataID, out store);
81             }
82 
83             return store;
84         }
85 
86         internal ConcurrentDictionary<string, MonitorLocalStore> GetStore()
87         {
88             return storeDic;
89         }
90

總結:

監控引擎使用了內存存儲來接收緩存上報上來的監控數據。

使用場景:監控數據實時寫入,大批量寫入,定時採樣歸集。

技術挑戰:快速寫入、多線程讀寫安全、內存快速分配和釋放,防止內存暴漲帶來的Full GC和高CPU。

設計上的一些好的經驗:

  • 內存存儲採用Sharding分區,每一個監控項一個內存槽,每一個內存槽分爲60個槽點:以空間換時間,同時避免大內存分配。
  • 多線程寫入時,以監控項做爲分區鍵,路由定位到指定的內存槽,以監控數據上的時間(分鐘)做爲二次分區鍵,存儲到指定內存槽的槽點。實現多線程並行寫入,最大程度上避免了多線程寫入時的鎖爭用。
  • 讀寫線程分離,讀線程設置一個最佳的延遲時間(5~10s),讀取內存槽點的已就緒的數據,此時數據沒有寫入,毫秒級讀取,近乎實時的監控數據採樣。

 

周國慶

2017/8/24

相關文章
相關標籤/搜索