turbine是怎麼收集指標數據的

turbine是怎麼收集指標數據的

咱們經過spring cloud圖形化dashboard是如何實現指標的收集展現的知道了,圖形化的指標是從turbine獲取到指標數據的。那麼turbine的數據是從哪裏來的呢?

一、數據來源

咱們經過url:http://localhost:10000/turbine.stream?cluster=default能夠獲取到指標的json數據。那麼指標數據又是從何處獲取到的。
答案是:從各個服務的/manage/hystrix.stream端點獲取的html

二、turbine架構設計

turbine官方的github地址:
https://github.com/Netflix/turbine/wiki
能夠找到turbine的架構設計
image_1c8cnao7u1rt5e2p1jhb1bj716b714.png-146.7kBgit

詳細信息參考
https://github.com/Netflix/Turbine/wiki/Design-And-Architecture-(1.x)github

說明:turbine啓動的時候,會去鏈接須要監控的主機,創建起監聽,每個實例會有一個監聽。當實例監遵從各個服務獲取到數據的時候,會將數據填充到派發器dispatcher中,由派發器將數據輸出到各個客戶端。spring

image_1c8cno6j71v56vab17dska015741h.png-121.6kB

三、源碼閱讀

turbine的實如今turbine核心包下
com.netflix.turbine:turbine-corejson

在該包下,能夠找到幾個關鍵的類
InstanceMonitorHandlerQueueTupleTurbineDataDispatcherTurbineStreamServlet瀏覽器

咱們啓動調試的時候,能夠看到
image_1c8co5shg1sqf16le151179ab6d1u.png-190.5kB架構

實例的url實際上是指向具體須要監控的實例的端點,即
http://sheng:8088/manage/hystrix.stream
查看這個連接咱們能夠看到
image_1c8co9jkt1gnco5b108p1bu4gds2b.png-96.4kBapp

InstanceMonitor啓動監聽ide

public void startMonitor() throws Exception {
        // This is the only state that we allow startMonitor to proceed in
        if (monitorState.get() != State.NotStarted) {
            return;
        }

        taskFuture = ThreadPool.submit(new Callable<Void>() {

            @Override
            public Void call() throws Exception {

                try {
                    //初始化,鏈接到具體實例上
                    init();
                    monitorState.set(State.Running);
                    while(monitorState.get() == State.Running) {
                        //關鍵代碼
                        doWork();
                    }
                } catch(Throwable t) {
                    logger.warn("Stopping InstanceMonitor for: " + getStatsInstance().getHostname() + " " + getStatsInstance().getCluster(), t);
                } finally {
                    if (monitorState.get() == State.Running) {
                        monitorState.set(State.StopRequested);
                    }
                    cleanup();
                    monitorState.set(State.CleanedUp);
                }
                return null;
            }
        });
    }

private void init() throws Exception {

        HttpGet httpget = new HttpGet(url);

        HttpResponse response = gatewayHttpClient.getHttpClient().execute(httpget);

        HttpEntity entity = response.getEntity();
        InputStream is = entity.getContent();
        //初始化一個輸入流
        reader = new BufferedReader(new InputStreamReader(is));

        int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode != 200) {
            // this is unexpected. We probably have the wrong endpoint. Print the response out for debugging and give up here.
            List<String> responseMessage = IOUtils.readLines(reader);
            logger.error("Could not initiate connection to host, giving up: " + responseMessage);
            throw new MisconfiguredHostException(responseMessage.toString());
        }
    }

dowork()方法作了什麼呢this

private void doWork() throws Exception {

        DataFromSingleInstance instanceData = null;

        //獲取實例數據
        instanceData = getNextStatsData();
        if(instanceData == null) {
            return;
        } else {
            lastEventUpdateTime.set(System.currentTimeMillis());
        }

        List<DataFromSingleInstance> list = new ArrayList<DataFromSingleInstance>();
        list.add(instanceData);

        /* send to all handlers */
        //將獲取到的數據添加到dispatcher中
        boolean continueRunning = dispatcher.pushData(getStatsInstance(), list);
        if(!continueRunning) {
            logger.info("No more listeners to the host monitor, stopping monitor for: " + host.getHostname() + " " + host.getCluster());
            monitorState.set(State.StopRequested);
            return;
        }
    }

getNextStatsData讀取數據
image_1c8cor2ih1cb714tc1b5m8mk7ks2o.png-58kB


那麼派發器是什麼呢,它的實現查看TurbineDataDispatcher
查看它的pushData方法
發現調用的是tuple.pushData(statsData);tuple其實就像一個管道,查看HandlerQueueTuplepushData方法

public void pushData(K data) {
        if (stopped) {
            return;
        }
        boolean success = queue.writeEvent(data);
        if (isCritical()) {
            // track stats
            if (success) {
                counter.increment(Type.EVENT_PROCESSED);
            } else {
                counter.increment(Type.EVENT_DISCARDED);
            }
        }
    }

看到queue.writeEvent(data)、往隊列裏寫數據
這個隊列又是什麼呢?
其實就是一個事件隊列EventQueue,查看它的寫事件方法

public boolean writeEvent(T event) {
        if (count.get() > maxCapacity) {  // approx check for capacity
            return false;
        }
        count.incrementAndGet();
        queue.add(event);
        return true;
    }

若是隊列中的長度大於maxCapacity,將不會再往隊列裏填充數據。


當客戶端鏈接上的時候,queue就會被消費。若是客戶端沒有鏈接上的時候,queue讀出來,通過一系列的操做會寫回queue中,直到隊列滿了就不在寫了。

一、當沒有客戶端鏈接上的時候
image_1c8evuokc1u8scad10cuef11jm413.png-119.7kB

eventHandler通過一些列的處理,數據會被寫回到queue中

二、當有客戶端連上的時候,假設咱們經過瀏覽器地址欄輸入了
http://localhost:10000/turbine.stream?cluster=default
此時
咱們看到eventHandler爲TurbineStreamingConnection,見下圖
image_1c8evlp0c1n4p491rcebcn8htm.png-159.3kB

handlData()就變成了TurbineStreamingConnection中的方法

public void handleData(Collection<T> data) {
        
        if (stopMonitoring) {
            // we have been stopped so don't try handling data
            return;
        }
        //將數據寫到steamHandler中
        writeToStream(data);
    }

writeToStream()中有個關鍵的操做streamHandler.writeData(jsonStringForDataHash)
image_1c8f022frk5f198mk0k3g31rah1g.png-90.2kB

writeData()方法就能夠將數據寫到response中
image_1c8f049sgvmpaq79b813ko1nsp1t.png-85kB

客戶端訪問http://localhost:10000/turbine.stream?cluster=default的時候,其實就是經過TurbineStreamServlet獲取到響應結果的。

相關文章
相關標籤/搜索