咱們經過url:http://localhost:10000/turbine.stream?cluster=default
能夠獲取到指標的json數據。那麼指標數據又是從何處獲取到的。
答案是:從各個服務的/manage/hystrix.stream端點獲取的html
turbine官方的github地址:https://github.com/Netflix/turbine/wiki
能夠找到turbine的架構設計
git
詳細信息參考https://github.com/Netflix/Turbine/wiki/Design-And-Architecture-(1.x)
github
說明:turbine啓動的時候,會去鏈接須要監控的主機,創建起監聽,每個實例會有一個監聽。當實例監遵從各個服務獲取到數據的時候,會將數據填充到派發器dispatcher中,由派發器將數據輸出到各個客戶端。spring
turbine的實如今turbine核心包下com.netflix.turbine:turbine-core
json
在該包下,能夠找到幾個關鍵的類InstanceMonitor
、HandlerQueueTuple
、TurbineDataDispatcher
、TurbineStreamServlet
瀏覽器
咱們啓動調試的時候,能夠看到
架構
實例的url實際上是指向具體須要監控的實例的端點,即http://sheng:8088/manage/hystrix.stream
查看這個連接咱們能夠看到
app
InstanceMonitor
啓動監聽ide
public void startMonitor() throws Exception { // This is the only state that we allow startMonitor to proceed in if (monitorState.get() != State.NotStarted) { return; } taskFuture = ThreadPool.submit(new Callable<Void>() { @Override public Void call() throws Exception { try { //初始化,鏈接到具體實例上 init(); monitorState.set(State.Running); while(monitorState.get() == State.Running) { //關鍵代碼 doWork(); } } catch(Throwable t) { logger.warn("Stopping InstanceMonitor for: " + getStatsInstance().getHostname() + " " + getStatsInstance().getCluster(), t); } finally { if (monitorState.get() == State.Running) { monitorState.set(State.StopRequested); } cleanup(); monitorState.set(State.CleanedUp); } return null; } }); }
private void init() throws Exception { HttpGet httpget = new HttpGet(url); HttpResponse response = gatewayHttpClient.getHttpClient().execute(httpget); HttpEntity entity = response.getEntity(); InputStream is = entity.getContent(); //初始化一個輸入流 reader = new BufferedReader(new InputStreamReader(is)); int statusCode = response.getStatusLine().getStatusCode(); if (statusCode != 200) { // this is unexpected. We probably have the wrong endpoint. Print the response out for debugging and give up here. List<String> responseMessage = IOUtils.readLines(reader); logger.error("Could not initiate connection to host, giving up: " + responseMessage); throw new MisconfiguredHostException(responseMessage.toString()); } }
dowork()
方法作了什麼呢this
private void doWork() throws Exception { DataFromSingleInstance instanceData = null; //獲取實例數據 instanceData = getNextStatsData(); if(instanceData == null) { return; } else { lastEventUpdateTime.set(System.currentTimeMillis()); } List<DataFromSingleInstance> list = new ArrayList<DataFromSingleInstance>(); list.add(instanceData); /* send to all handlers */ //將獲取到的數據添加到dispatcher中 boolean continueRunning = dispatcher.pushData(getStatsInstance(), list); if(!continueRunning) { logger.info("No more listeners to the host monitor, stopping monitor for: " + host.getHostname() + " " + host.getCluster()); monitorState.set(State.StopRequested); return; } }
getNextStatsData
讀取數據
那麼派發器是什麼呢,它的實現查看TurbineDataDispatcher
查看它的pushData
方法
發現調用的是tuple.pushData(statsData);
而tuple
其實就像一個管道,查看HandlerQueueTuple
的pushData
方法
public void pushData(K data) { if (stopped) { return; } boolean success = queue.writeEvent(data); if (isCritical()) { // track stats if (success) { counter.increment(Type.EVENT_PROCESSED); } else { counter.increment(Type.EVENT_DISCARDED); } } }
看到queue.writeEvent(data)
、往隊列裏寫數據
這個隊列又是什麼呢?
其實就是一個事件隊列EventQueue
,查看它的寫事件方法
public boolean writeEvent(T event) { if (count.get() > maxCapacity) { // approx check for capacity return false; } count.incrementAndGet(); queue.add(event); return true; }
若是隊列中的長度大於maxCapacity,將不會再往隊列裏填充數據。
當客戶端鏈接上的時候,queue就會被消費。若是客戶端沒有鏈接上的時候,queue讀出來,通過一系列的操做會寫回queue中,直到隊列滿了就不在寫了。
一、當沒有客戶端鏈接上的時候
eventHandler通過一些列的處理,數據會被寫回到queue中
二、當有客戶端連上的時候,假設咱們經過瀏覽器地址欄輸入了http://localhost:10000/turbine.stream?cluster=default
此時
咱們看到eventHandler爲TurbineStreamingConnection
,見下圖
handlData()
就變成了TurbineStreamingConnection
中的方法
public void handleData(Collection<T> data) { if (stopMonitoring) { // we have been stopped so don't try handling data return; } //將數據寫到steamHandler中 writeToStream(data); }
writeToStream()
中有個關鍵的操做streamHandler.writeData(jsonStringForDataHash)
writeData()
方法就能夠將數據寫到response中
客戶端訪問http://localhost:10000/turbine.stream?cluster=default
的時候,其實就是經過TurbineStreamServlet
獲取到響應結果的。