圖4 消息分發架構圖算法
分析管理器的初始化服務器
咱們在第一章講到服務器將接收到的消息交給解碼器(MessageDecoder)去作解碼最後交給具體的消費者(RealtimeConsumer)去消費消息。架構
RealtimeConsumer 是在何時被建立初始化? 在第一章咱們講到,CatHomeModule經過調用setup安裝完成以後,會調用 execute 進行初始化的工做, 在execute方法中調用ctx.lookup(MessageConsumer.class) 方法來經過容器實例化RealtimeConsumer。dom
在消費者中,最重要的一個概念就是消息分析器(MessageAnalyzer),全部的消息分析統計,報表建立都是由消息分析器來完成,全部的分析器(MessageAnalyzer)都由消息分析器管理對象(MessageAnalyzerManager)管理,RealtimeConsumer就擁有消息分析器管理對象的指針,在消費者初始化以前,咱們會先實例化 MessageAnalyzerManager,而後調用initialize() 方法初始化分析管理器。異步
public class DefaultMessageAnalyzerManager extends ContainerHolder implements MessageAnalyzerManager, Initializable, LogEnabled { private List<String> m_analyzerNames; private Map<Long, Map<String, List<MessageAnalyzer>>> m_analyzers = new HashMap<Long, Map<String, List<MessageAnalyzer>>>(); @Override public void initialize() throws InitializationException { Map<String, MessageAnalyzer> map = lookupMap(MessageAnalyzer.class); for (MessageAnalyzer analyzer : map.values()) { analyzer.destroy(); } m_analyzerNames = new ArrayList<String>(map.keySet()); ... } }
initialize() 方法經過IOC容器的lookupMap方法,找到全部的消息分析器。一共12個,以下圖,而後取出分析器的名字,放到m_analyzerNames 列表裏,能夠認爲每一個名字對應一種分析器,不一樣的分析器都將從不一樣角度去分析、統計上報的消息,彙總以後生成不一樣的報表,咱們若是有本身的擴展需求,須要對消息作其它處理,也能夠添加本身的分析器,只須要符合CAT準則便可。async
消費者與週期管理器的初始化ide
消息分析器管理對象初始化以後,RealtimeConsumer 會執行 initialize() 來實現自身的初始化,函數
public class RealtimeConsumer extends ContainerHolder implements MessageConsumer, Initializable, LogEnabled { @Inject private MessageAnalyzerManager m_analyzerManager; private PeriodManager m_periodManager; @Override public void initialize() throws InitializationException { m_periodManager = new PeriodManager(HOUR, m_analyzerManager, m_serverStateManager, m_logger); m_periodManager.init(); Threads.forGroup("cat").start(m_periodManager); } }
RealtimeConsumer的初始化很簡單,僅包含3行代碼,它的任務就是實例化並初始化週期管理器(PeriodManager),並將分析器管理對象(MessageAnalyzerManager)的指針傳給它,PeriodManager保留分析管理器指針僅僅用於在啓動一個Period的時候,將MessageAnalyzerManager的指針傳遞給Period。spa
PeriodManager的構造函數中,最核心的工做就是建立一個週期策略對象(PeriodStrategy),每一個週期的開始/結束會參考PeriodStrategy的計算結果,變量duration是每一個週期的長度,默認是1個小時,並且週期時間是整點時段,例如:1:00-2:00, 2:00-3:00,週期時間是報表的最小統計單元,即分析器產生的每一個報表對象,都是當前週期時間內的統計信息。線程
接下來RealtimeConsumer將會調用 m_periodManager.init() 啓動第一個週期,仍是上面代碼,咱們會計算當前時間所處的週期的開始時間,是當前時間的整點時間,好比如今是 13:50, 那麼startTime=13:00,而後entTime=startTime + duration 算得結束時間爲 14:00, 而後根據起始結束時間來建立 Period 對象,傳入分析器的指針。並將週期對象加入到m_periods列表交給PeriodManager管理。最後調用period.start 啓動第一個週期。
public class PeriodManager implements Task { private PeriodStrategy m_strategy; private List<Period> m_periods = new ArrayList<Period>(); public PeriodManager(long duration, MessageAnalyzerManager analyzerManager, ServerStatisticManager serverStateManager, Logger logger) { m_strategy = new PeriodStrategy(duration, EXTRATIME, EXTRATIME); m_active = true; m_analyzerManager = analyzerManager; m_serverStateManager = serverStateManager; m_logger = logger; } public void init() { long startTime = m_strategy.next(System.currentTimeMillis()); startPeriod(startTime); } private void startPeriod(long startTime) { long endTime = startTime + m_strategy.getDuration(); Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger); m_periods.add(period); period.start(); } }
咱們再回到ReatimeConsumer的initialize()初始化方法,第三行代碼,Threads.forGroup("cat").start(m_periodManager) 將開啓一個週期管理線程,線程執行代碼以下run()函數,每隔1秒鐘會計算是否須要開啓一個新的週期,value>0就開啓新的週期, value=0啥也不幹,value<0的異步開啓一個新線程結束上一個週期。結束線程調用PeriodManager的endPeriod(long startTime)方法完成周期的清理工做,而後將period從m_periods列表移除出去。
public class PeriodManager implements Task { private List<Period> m_periods = new ArrayList<Period>(); @Override public void run() { while (m_active) { try { long now = System.currentTimeMillis(); long value = m_strategy.next(now); if (value > 0) { startPeriod(value); } else if (value < 0) { // last period is over,make it asynchronous Threads.forGroup("cat").start(new EndTaskThread(-value)); } } catch (Throwable e) { Cat.logError(e); } Thread.sleep(1000L); } } private void endPeriod(long startTime) { int len = m_periods.size(); for (int i = 0; i < len; i++) { Period period = m_periods.get(i); if (period.isIn(startTime)) { period.finish(); m_periods.remove(i); break; } } } }
好了,咱們在上兩節講了分析器的初始化,週期管理器的初始化,那麼,什麼是週期?爲何會有周期?他是如何工做的?
能夠認爲週期Period就是一個消息分發的控制器,至關於MVC的Controller,受PeriodManager的管理,全部客戶端過來的消息,都會根據消息時間戳從PeriodManager中找到消息所屬的週期對象(Period),由該週期對象來派發消息給每一個註冊的分析器(MessageAnalyzer)來對消息作具體的處理。
然而Period並非直接對接分析器(MessageAnalyzer), 而是經過PeriodTask來與MessageAnalyzer交互,Period類有個成員變量m_tasks, 類型爲Map<String, List<PeriodTask>>, Map的key是String類型,表示分析器的名字,好比top、cross、transaction、event等等,咱們一共有12種類別的分析器,不過實際處理過程當中,CAT做者移除了他認爲比較雞肋的Matrix、Dependency兩個分析器,只剩下10個分析器了,如圖10。
m_analyzerNames.remove("matrix");
m_analyzerNames.remove("dependency");
圖10:參與任務處理的分析器名稱
Map的value爲List<PeriodTask> 是一個週期任務的列表, 也就是說,每一種類別的分析器,都會有至少一個MessageAnalyzer的實例,每一個MessageAnalyzer都由一個對應的PeriodTask來分配任務,MessageAnalyzer與PeriodTask是1對1的關係,每種類別分析器具體有多少個實例由 getAnalyzerCount() 函數決定,默認是 1 個, 可是有些分析任務很是耗時,須要多個線程來處理,保證處理效率,好比 TransactionAnalyzer就是2個。
public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionReport> implements LogEnabled { @Override public int getAnalyzerCount() { return 2; } }
消息分發的時候,每一筆消息默認都會發送到全部種類分析器處理,可是同一種類別的分析器下若是有多個MessageAnalyzer實例,採用domain hash 選出其中一個實例安排處理消息,分發算法參考下面源碼:
public class Period { private Map<String, List<PeriodTask>> m_tasks; public void distribute(MessageTree tree) { ... String domain = tree.getDomain(); for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) { List<PeriodTask> tasks = entry.getValue(); int length = tasks.size(); int index = 0; boolean manyTasks = length > 1; if (manyTasks) { index = Math.abs(domain.hashCode()) % length; } PeriodTask task = tasks.get(index); boolean enqueue = task.enqueue(tree); ... } ... } }
上一節咱們講到與MessageAnalyzer交互是由PeriodTask來完成的,那麼週期任務PeriodTask在哪裏被建立?他怎麼與分析器進行交互, 在Period實例化的同時,PeriodTask就被建立了,咱們看看Period類的構造函數:
public class Period { private Map<String, List<PeriodTask>> m_tasks; public Period(long startTime, long endTime, MessageAnalyzerManager analyzerManager, ServerStatisticManager serverStateManager, Logger logger) { ... List<String> names = m_analyzerManager.getAnalyzerNames(); m_tasks = new HashMap<String, List<PeriodTask>>(); for (String name : names) { List<MessageAnalyzer> messageAnalyzers = m_analyzerManager.getAnalyzer(name, startTime); for (MessageAnalyzer analyzer : messageAnalyzers) { MessageQueue queue = new DefaultMessageQueue(QUEUE_SIZE); PeriodTask task = new PeriodTask(analyzer, queue, startTime); //加入 m_tasks ... } } } }
構造函數首先獲取全部分析器名字,咱們說過每一個名字對應一種分析器,而後根據分析器名字和週期時間,獲取當前週期、該種類分析器全部實例,以前說過,有些類別分析任務邏輯複雜,耗時長,會須要更多的分析線程處理,爲每一個分析器都建立一個 PeriodTask,併爲每個PeriodTask建立任務隊列。客戶端消息過來,會由Period分發給全部種類的PeriodTask,同一類分析器下有多個分析器(MessageAnalyzer)的時候,只有一個MessageAnalyzer會被分發,採用domain hash選出這個實例,在這裏,分發實際上就是插入PeriodTask的任務隊列。
構造函數最後將建立PeriodTask加入m_tasks中。
在Period被實例化以後, 週期管理器(PeriodManager)就調用 period.start() 開啓一個週期了,start邏輯很簡單, 就是啓動period下全部週期任務(PeriodTask)線程。任務線程也很是簡單,就是調用本身的分析器的分析函數analyze(m_queue)來處理消息。
public class PeriodTask implements Task, LogEnabled { private MessageAnalyzer m_analyzer; private MessageQueue m_queue; @Override public void run() { try { m_analyzer.analyze(m_queue); } catch (Exception e) { Cat.logError(e); } } }
接下來咱們看下分析函數作了什麼,下面是源碼,只展現了核心邏輯部分,分析程序輪訓從PeriodTask傳入的任務隊列中取出消息,而後調用process處理,具體的處理邏輯就是由process完成的,process是一個抽象函數,具體實現由各類類分析器子類來實現,咱們將在下一章分別講解。
固然這裏的前提是分析器處在激活狀態,而且本週期未結束,結束的定義是當前時間比周期時間+延遲結束時間更晚,延遲結束時間會在後面週期策略章節詳細講解,一旦週期結束,分析器將會把剩餘的消息分析完而後關閉。
public abstract class AbstractMessageAnalyzer<R> extends ContainerHolder implements MessageAnalyzer { protected abstract void process(MessageTree tree); @Override public void analyze(MessageQueue queue) { while (!isTimeout() && isActive()) { MessageTree tree = queue.poll(); if (tree != null) { ... process(tree); ... } } ... } protected boolean isTimeout() { long currentTime = System.currentTimeMillis(); long endTime = m_startTime + m_duration + m_extraTime; return currentTime > endTime; } }
消息從客戶端發上來,是如何到達PeriodTask的,又是如何分配分析器的?
客戶端消息發送到服務端,通過解碼以後,就調用 MessageConsumer的 consume() 函數對消息進行消費。源碼以下:
public class RealtimeConsumer extends ContainerHolder implements MessageConsumer, Initializable, LogEnabled { @Override public void consume(MessageTree tree) { String domain = tree.getDomain(); String ip = tree.getIpAddress(); if (!m_blackListManager.isBlack(domain, ip)) { long timestamp = tree.getMessage().getTimestamp(); Period period = m_periodManager.findPeriod(timestamp); if (period != null) { period.distribute(tree); } else { m_serverStateManager.addNetworkTimeError(1); } } else { m_black++; if (m_black % CatConstants.SUCCESS_COUNT == 0) { Cat.logEvent("Discard", domain); } } } }
consume函數會首先判斷domain和ip是否黑名單,若是是黑名單,丟棄消息,不然,根據消息時間戳,找到對應的週期(Period),交給Period對消息進行分發,分發邏輯前面講過,Period將消息插入PeriodTask隊列,由分析器(MessageAnalyzer)輪訓從隊列裏面取消息進行具體處理,每筆消息默認會被全部類別分析器處理,當同一類別分析器有多個MessageAnalyzer實例的時候,選擇其中一個處理,選擇算法:
Math.abs(domain.hashCode()) % length
詳細的源碼可參考章節什麼是週期?
在建立週期策略對象的時候,會傳入3個參數,一個是duration,也就是每一個週期的時間長度,默認爲1個小時,另外兩個extraTime和aheadTime分別表示我提早啓動一個週期的時間和延遲結束一個週期的時間,默認都是3分鐘,咱們並不會卡在整點時間,例如10:00去開啓或結束一個週期,由於週期建立是須要消耗必定時間,這樣能夠避免消息過來週期對象還未建立好,或者消息尚未處理完,就要去結束週期。
固然,即便提早建立了週期對象(Period),並不意味着就會當即被分發消息,只有到了該週期時間纔會被分發消息。
下面看看具體的策略方法,咱們首先計算當前時間的週期啓動時間(startTime),是當前時間的整點時間,好比當前時間是 22:47.123,那麼startTime就是 22:00.000,注意這裏的時間都是時間戳,單位爲毫秒。
接下來判斷是否開啓當前週期,若是startTime大於上次週期啓動時間(m_lastStartTime),說明應該開啓新的週期,因爲m_lastStartTime初始化爲 -1, 因此CAT服務端初始化以後第一個週期會執行到這裏,並記錄m_lastStartTime。
上面if若是未執行,咱們會判斷當前時間比起上次週期啓動時間是否是已通過了 57 分鐘(duration - aheadTime ),即提早3分鐘啓動下一個週期。
若是上面if還未執行,咱們則認爲當前週期已經被啓動,那麼會判斷是否須要結束當前週期,即當前時間比起上次週期啓動時間是否是已通過了 63 分鐘(duration + extraTime),即延遲3分鐘關閉上一個週期。
public class PeriodStrategy { public long next(long now) { long startTime = now - now % m_duration; // for current period if (startTime > m_lastStartTime) { m_lastStartTime = startTime; return startTime; } // prepare next period ahead if (now - m_lastStartTime >= m_duration - m_aheadTime) { m_lastStartTime = startTime + m_duration; return startTime + m_duration; } // last period is over if (now - m_lastEndTime >= m_duration + m_extraTime) { long lastEndTime = m_lastEndTime; m_lastEndTime = startTime; return -lastEndTime; } return 0; } }