深刻詳解美團點評CAT跨語言服務監控(四)服務端消息分發

這邊首先介紹下大衆點評CAT消息分發大概的架構以下:

 

圖4 消息分發架構圖算法

 

分析管理器的初始化服務器

 

    咱們在第一章講到服務器將接收到的消息交給解碼器(MessageDecoder)去作解碼最後交給具體的消費者(RealtimeConsumer)去消費消息。架構

    RealtimeConsumer 是在何時被建立初始化? 在第一章咱們講到,CatHomeModule經過調用setup安裝完成以後,會調用 execute 進行初始化的工做, 在execute方法中調用ctx.lookup(MessageConsumer.class) 方法來經過容器實例化RealtimeConsumer。dom

    在消費者中,最重要的一個概念就是消息分析器(MessageAnalyzer),全部的消息分析統計,報表建立都是由消息分析器來完成,全部的分析器(MessageAnalyzer)都由消息分析器管理對象(MessageAnalyzerManager)管理,RealtimeConsumer就擁有消息分析器管理對象的指針,在消費者初始化以前,咱們會先實例化 MessageAnalyzerManager,而後調用initialize() 方法初始化分析管理器。異步

public class DefaultMessageAnalyzerManager extends ContainerHolder implements MessageAnalyzerManager, Initializable, LogEnabled {
    private List<String> m_analyzerNames;
    private Map<Long, Map<String, List<MessageAnalyzer>>> m_analyzers = new HashMap<Long, Map<String, List<MessageAnalyzer>>>();
    
    @Override
    public void initialize() throws InitializationException {
        Map<String, MessageAnalyzer> map = lookupMap(MessageAnalyzer.class);
        for (MessageAnalyzer analyzer : map.values()) {
            analyzer.destroy();
        }
 
        m_analyzerNames = new ArrayList<String>(map.keySet());
        ...
    }
}

 

    initialize() 方法經過IOC容器的lookupMap方法,找到全部的消息分析器。一共12個,以下圖,而後取出分析器的名字,放到m_analyzerNames 列表裏,能夠認爲每一個名字對應一種分析器,不一樣的分析器都將從不一樣角度去分析、統計上報的消息,彙總以後生成不一樣的報表,咱們若是有本身的擴展需求,須要對消息作其它處理,也能夠添加本身的分析器,只須要符合CAT準則便可。
async

 

消費者與週期管理器的初始化ide

消息分析器管理對象初始化以後,RealtimeConsumer 會執行 initialize() 來實現自身的初始化,函數

public class RealtimeConsumer extends ContainerHolder implements MessageConsumer, Initializable, LogEnabled {
    @Inject
    private MessageAnalyzerManager m_analyzerManager;
    
    private PeriodManager m_periodManager;
 
    @Override
    public void initialize() throws InitializationException {
        m_periodManager = new PeriodManager(HOUR, m_analyzerManager, m_serverStateManager, m_logger);
        m_periodManager.init();
 
        Threads.forGroup("cat").start(m_periodManager);
    }
}

    RealtimeConsumer的初始化很簡單,僅包含3行代碼,它的任務就是實例化並初始化週期管理器(PeriodManager),並將分析器管理對象(MessageAnalyzerManager)的指針傳給它,PeriodManager保留分析管理器指針僅僅用於在啓動一個Period的時候,將MessageAnalyzerManager的指針傳遞給Period。spa

    PeriodManager的構造函數中,最核心的工做就是建立一個週期策略對象(PeriodStrategy),每一個週期的開始/結束會參考PeriodStrategy的計算結果,變量duration是每一個週期的長度,默認是1個小時,並且週期時間是整點時段,例如:1:00-2:00, 2:00-3:00,週期時間是報表的最小統計單元,即分析器產生的每一個報表對象,都是當前週期時間內的統計信息。線程

    接下來RealtimeConsumer將會調用 m_periodManager.init() 啓動第一個週期,仍是上面代碼,咱們會計算當前時間所處的週期的開始時間,是當前時間的整點時間,好比如今是 13:50, 那麼startTime=13:00,而後entTime=startTime + duration 算得結束時間爲 14:00, 而後根據起始結束時間來建立 Period 對象,傳入分析器的指針。並將週期對象加入到m_periods列表交給PeriodManager管理。最後調用period.start 啓動第一個週期。

 

public class PeriodManager implements Task {
    private PeriodStrategy m_strategy;
    
    private List<Period> m_periods = new ArrayList<Period>();
    
    public PeriodManager(long duration, MessageAnalyzerManager analyzerManager,
          ServerStatisticManager serverStateManager, Logger logger) {
        m_strategy = new PeriodStrategy(duration, EXTRATIME, EXTRATIME);
        m_active = true;
        m_analyzerManager = analyzerManager;
        m_serverStateManager = serverStateManager;
        m_logger = logger;
    }
    
    public void init() {
        long startTime = m_strategy.next(System.currentTimeMillis());
 
        startPeriod(startTime);
    }
    
    private void startPeriod(long startTime) {
        long endTime = startTime + m_strategy.getDuration();
        Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger);
 
        m_periods.add(period);
        period.start();
    }
}

 

 

    咱們再回到ReatimeConsumer的initialize()初始化方法,第三行代碼,Threads.forGroup("cat").start(m_periodManager) 將開啓一個週期管理線程,線程執行代碼以下run()函數,每隔1秒鐘會計算是否須要開啓一個新的週期,value>0就開啓新的週期, value=0啥也不幹,value<0的異步開啓一個新線程結束上一個週期。結束線程調用PeriodManager的endPeriod(long startTime)方法完成周期的清理工做,而後將period從m_periods列表移除出去。

public class PeriodManager implements Task {
    private List<Period> m_periods = new ArrayList<Period>();
 
    @Override
    public void run() {
        while (m_active) {
            try {
                long now = System.currentTimeMillis();
                long value = m_strategy.next(now);
 
                if (value > 0) {
                    startPeriod(value);
                } else if (value < 0) {
                    // last period is over,make it asynchronous
                    Threads.forGroup("cat").start(new EndTaskThread(-value));
                }
            } catch (Throwable e) {
                Cat.logError(e);
            }
            
            Thread.sleep(1000L);
        }
    }
    
    private void endPeriod(long startTime) {
        int len = m_periods.size();
 
        for (int i = 0; i < len; i++) {
            Period period = m_periods.get(i);
 
            if (period.isIn(startTime)) {
                period.finish();
                m_periods.remove(i);
                break;
            }
        }
    }
}

 

 

什麼是週期?

    好了,咱們在上兩節講了分析器的初始化,週期管理器的初始化,那麼,什麼是週期?爲何會有周期?他是如何工做的?

    能夠認爲週期Period就是一個消息分發的控制器,至關於MVC的Controller,受PeriodManager的管理,全部客戶端過來的消息,都會根據消息時間戳從PeriodManager中找到消息所屬的週期對象(Period),由該週期對象來派發消息給每一個註冊的分析器(MessageAnalyzer)來對消息作具體的處理。

    然而Period並非直接對接分析器(MessageAnalyzer), 而是經過PeriodTask來與MessageAnalyzer交互,Period類有個成員變量m_tasks, 類型爲Map<String, List<PeriodTask>>, Map的key是String類型,表示分析器的名字,好比top、cross、transaction、event等等,咱們一共有12種類別的分析器,不過實際處理過程當中,CAT做者移除了他認爲比較雞肋的Matrix、Dependency兩個分析器,只剩下10個分析器了,如圖10。

m_analyzerNames.remove("matrix");
m_analyzerNames.remove("dependency");

圖10:參與任務處理的分析器名稱

    Map的value爲List<PeriodTask> 是一個週期任務的列表, 也就是說,每一種類別的分析器,都會有至少一個MessageAnalyzer的實例,每一個MessageAnalyzer都由一個對應的PeriodTask來分配任務,MessageAnalyzer與PeriodTask是1對1的關係,每種類別分析器具體有多少個實例由 getAnalyzerCount() 函數決定,默認是 1 個, 可是有些分析任務很是耗時,須要多個線程來處理,保證處理效率,好比 TransactionAnalyzer就是2個。

public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionReport> implements LogEnabled {
    @Override
    public int getAnalyzerCount() {
        return 2;
    }
}

 

    消息分發的時候,每一筆消息默認都會發送到全部種類分析器處理,可是同一種類別的分析器下若是有多個MessageAnalyzer實例,採用domain hash 選出其中一個實例安排處理消息,分發算法參考下面源碼:

public class Period {
    private Map<String, List<PeriodTask>> m_tasks;
    
    public void distribute(MessageTree tree) {
        ...
        String domain = tree.getDomain();
 
        for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
            List<PeriodTask> tasks = entry.getValue();
            int length = tasks.size();
            int index = 0;
            boolean manyTasks = length > 1;
 
            if (manyTasks) {
                index = Math.abs(domain.hashCode()) % length;
            }
            PeriodTask task = tasks.get(index);
            boolean enqueue = task.enqueue(tree);
            ...
        }
        ...
    }
}

 

 

週期任務-任務隊列

    上一節咱們講到與MessageAnalyzer交互是由PeriodTask來完成的,那麼週期任務PeriodTask在哪裏被建立?他怎麼與分析器進行交互, 在Period實例化的同時,PeriodTask就被建立了,咱們看看Period類的構造函數:

public class Period {
    private Map<String, List<PeriodTask>> m_tasks;
    
    public Period(long startTime, long endTime, MessageAnalyzerManager analyzerManager,
          ServerStatisticManager serverStateManager, Logger logger) {
          
        ...
        
        List<String> names = m_analyzerManager.getAnalyzerNames();
 
        m_tasks = new HashMap<String, List<PeriodTask>>();
        for (String name : names) {
            List<MessageAnalyzer> messageAnalyzers = m_analyzerManager.getAnalyzer(name, startTime);
 
            for (MessageAnalyzer analyzer : messageAnalyzers) {
                MessageQueue queue = new DefaultMessageQueue(QUEUE_SIZE);
                PeriodTask task = new PeriodTask(analyzer, queue, startTime);
                
                //加入 m_tasks
                ...
            }
        }
    }
}

 

 

    構造函數首先獲取全部分析器名字,咱們說過每一個名字對應一種分析器,而後根據分析器名字和週期時間,獲取當前週期、該種類分析器全部實例,以前說過,有些類別分析任務邏輯複雜,耗時長,會須要更多的分析線程處理,爲每一個分析器都建立一個 PeriodTask,併爲每個PeriodTask建立任務隊列。客戶端消息過來,會由Period分發給全部種類的PeriodTask,同一類分析器下有多個分析器(MessageAnalyzer)的時候,只有一個MessageAnalyzer會被分發,採用domain hash選出這個實例,在這裏,分發實際上就是插入PeriodTask的任務隊列。

    構造函數最後將建立PeriodTask加入m_tasks中。

    在Period被實例化以後, 週期管理器(PeriodManager)就調用 period.start() 開啓一個週期了,start邏輯很簡單, 就是啓動period下全部週期任務(PeriodTask)線程。任務線程也很是簡單,就是調用本身的分析器的分析函數analyze(m_queue)來處理消息。

public class PeriodTask implements Task, LogEnabled {
 
    private MessageAnalyzer m_analyzer;
 
    private MessageQueue m_queue;
    
    @Override
    public void run() {
        try {
            m_analyzer.analyze(m_queue);
        } catch (Exception e) {
            Cat.logError(e);
        }
    }
}

    接下來咱們看下分析函數作了什麼,下面是源碼,只展現了核心邏輯部分,分析程序輪訓從PeriodTask傳入的任務隊列中取出消息,而後調用process處理,具體的處理邏輯就是由process完成的,process是一個抽象函數,具體實現由各類類分析器子類來實現,咱們將在下一章分別講解。

    固然這裏的前提是分析器處在激活狀態,而且本週期未結束,結束的定義是當前時間比周期時間+延遲結束時間更晚,延遲結束時間會在後面週期策略章節詳細講解,一旦週期結束,分析器將會把剩餘的消息分析完而後關閉。

public abstract class AbstractMessageAnalyzer<R> extends ContainerHolder implements MessageAnalyzer {
    protected abstract void process(MessageTree tree);
 
    @Override
    public void analyze(MessageQueue queue) {
        while (!isTimeout() && isActive()) {
            MessageTree tree = queue.poll();
 
            if (tree != null) {
                ...
                process(tree);
                ...
            }
        }
        ...
    }
    
    protected boolean isTimeout() {
        long currentTime = System.currentTimeMillis();
        long endTime = m_startTime + m_duration + m_extraTime;
 
        return currentTime > endTime;
    }
}

 

 

消息分發

 

消息從客戶端發上來,是如何到達PeriodTask的,又是如何分配分析器的?

客戶端消息發送到服務端,通過解碼以後,就調用 MessageConsumer的 consume() 函數對消息進行消費。源碼以下:

public class RealtimeConsumer extends ContainerHolder implements MessageConsumer, Initializable, LogEnabled {
    @Override
    public void consume(MessageTree tree) {
        String domain = tree.getDomain();
        String ip = tree.getIpAddress();
 
        if (!m_blackListManager.isBlack(domain, ip)) {
            long timestamp = tree.getMessage().getTimestamp();
            Period period = m_periodManager.findPeriod(timestamp);
 
            if (period != null) {
                period.distribute(tree);
            } else {
                m_serverStateManager.addNetworkTimeError(1);
            }
        } else {
            m_black++;
 
            if (m_black % CatConstants.SUCCESS_COUNT == 0) {
                Cat.logEvent("Discard", domain);
            }
        }
    }
}

 

 

    consume函數會首先判斷domain和ip是否黑名單,若是是黑名單,丟棄消息,不然,根據消息時間戳,找到對應的週期(Period),交給Period對消息進行分發,分發邏輯前面講過,Period將消息插入PeriodTask隊列,由分析器(MessageAnalyzer)輪訓從隊列裏面取消息進行具體處理,每筆消息默認會被全部類別分析器處理,當同一類別分析器有多個MessageAnalyzer實例的時候,選擇其中一個處理,選擇算法:

Math.abs(domain.hashCode()) % length

 

 

詳細的源碼可參考章節什麼是週期?

 

週期策略

    在建立週期策略對象的時候,會傳入3個參數,一個是duration,也就是每一個週期的時間長度,默認爲1個小時,另外兩個extraTime和aheadTime分別表示我提早啓動一個週期的時間和延遲結束一個週期的時間,默認都是3分鐘,咱們並不會卡在整點時間,例如10:00去開啓或結束一個週期,由於週期建立是須要消耗必定時間,這樣能夠避免消息過來週期對象還未建立好,或者消息尚未處理完,就要去結束週期。

    固然,即便提早建立了週期對象(Period),並不意味着就會當即被分發消息,只有到了該週期時間纔會被分發消息。

    下面看看具體的策略方法,咱們首先計算當前時間的週期啓動時間(startTime),是當前時間的整點時間,好比當前時間是 22:47.123,那麼startTime就是 22:00.000,注意這裏的時間都是時間戳,單位爲毫秒。

    接下來判斷是否開啓當前週期,若是startTime大於上次週期啓動時間(m_lastStartTime),說明應該開啓新的週期,因爲m_lastStartTime初始化爲 -1, 因此CAT服務端初始化以後第一個週期會執行到這裏,並記錄m_lastStartTime。

    上面if若是未執行,咱們會判斷當前時間比起上次週期啓動時間是否是已通過了 57 分鐘(duration - aheadTime ),即提早3分鐘啓動下一個週期。

    若是上面if還未執行,咱們則認爲當前週期已經被啓動,那麼會判斷是否須要結束當前週期,即當前時間比起上次週期啓動時間是否是已通過了 63 分鐘(duration + extraTime),即延遲3分鐘關閉上一個週期。

public class PeriodStrategy {
    public long next(long now) {
        long startTime = now - now % m_duration;
 
        // for current period
        if (startTime > m_lastStartTime) {
            m_lastStartTime = startTime;
            return startTime;
        }
 
        // prepare next period ahead
        if (now - m_lastStartTime >= m_duration - m_aheadTime) {
            m_lastStartTime = startTime + m_duration;
            return startTime + m_duration;
        }
 
        // last period is over
        if (now - m_lastEndTime >= m_duration + m_extraTime) {
            long lastEndTime = m_lastEndTime;
            m_lastEndTime = startTime;
            return -lastEndTime;
        }
 
        return 0;
    }
}
相關文章
相關標籤/搜索