Storm的Metric接口簡介

本文由做者林洋港受權網易雲社區發佈。html


做爲服務端程序,咱們老是須要向外界報告一些統計數據,以助於瞭解系統的運行狀況,好比某個接口的調用時間、系統處理的請求數等等。當咱們的程序以Storm Topology的形式運行時一樣須要輸出這些統計數據。Storm爲咱們提供了Metric接口,能夠方便的把一些統計指標輸出到指定的地方。Storm Metric的統計方式爲每隔指定的時間間隔輸出統計內容。本文首先介紹Storm Metric相關的接口以及它們之間的關係,而後以實際應用中的一個例子來講明如何使用Metric接口。本文使用的Storm版本爲0.9.1-incubating。數據庫

IMetric是Storm用於保存統計數據的接口安全

public interface IMetric {服務器

public Object getValueAndReset();架構

}併發

接口只有一個getValueAndReset方法,當須要輸出統計內容時,Storm就會調用這個方法。值得注意的是getValueAndReset方法返回的是Object類型,這爲統計內容的形式提供了靈活性,咱們能夠返回任意的類型做爲統計信息,這一點在後面的例子中咱們會再提到。另外一個引發咱們注意的地方是IMetric接口並無聲明更新統計數據的方法,這樣當咱們實現IMetric接口的時候就更加靈活了——參數類型、參數個數都沒有限制。Storm自身提供了6個IMetric實現:AssignableMetric、CombinedMetric、CountMetric、MultiCountMetric、ReducedMetric、StateMetric。這裏只介紹CountMetric和MultiCountMetric的使用方式,以印證前面說的IMetric接口統計數據更新方式的靈活性以及getValueAndReset返回Object類型的靈活性。CountMetric就是一個簡單的計數器,有兩個方法incr()和incrBy(long incrementBy),其getValueAndReset方法返回一個long類型的值:app

public Object getValueAndReset() {分佈式

long ret = _value;ide

_value = 0;性能

return ret;

}

MultiCountMetric,顧名思義,就是多個指標的計數器,維護着一個Map,只有一個方法CountMetric scope(String key)。所以MultiCountMetric的更新方式爲MultiCountMetric.scope(key).incr()或MultiCountMetric.scope(key).incrBy(long incrementBy)。它的getValueAndReset返回的是一個Map:

public Object getValueAndReset() {

Map ret = new HashMap();

for(Map.Entry e : _value.entrySet()) {

ret.put(e.getKey(), e.getValue().getValueAndReset());

}

return ret;

}

除了IMetric接口,還有另一個接口IMetricsConsumer,它負責向外輸出統計信息,即把IMetric getValueAndReset方法返回的數據輸出到外面。IMetricsConsumer有三個方法

void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter);

void handleDataPoints(TaskInfo taskInfo, Collection dataPoints);

void cleanup();

其中prepare是初始化,cleanup是生命週期結束時的清理工做,handleDataPoints纔是真正的統計信息輸出方法,taskInfo參數存儲當前task的信息(host、port、component id、task id等等),dataPoints存儲的是IMetric返回的統計信息,多是出於性能考慮,dataPoints是一個集合,包含了多個IMetric返回的數據。讓咱們來具體看看DataPoint這個類:

public static class DataPoint {

@Override

public String toString() {

return "[" + name + " = " + value + "]";

}

public String name;

public Object value;

}

name是IMetric註冊時的名字,value就是IMetric getValueAndReset返回的那個Object。

Storm只提供了一個IMetricsConsumer實現——LoggingMetricsConsumer。LoggingMetricsConsumer作的事情很簡單,就是把dataPoints輸出到日誌文件metrics.log,下面是其handleDataPoints方法的部分代碼:

for (DataPoint p : dataPoints) {

sb.delete(header.length(), sb.length());

sb.append(p.name)

.append(padding).delete(header.length()+23,sb.length()).append("\t")

.append(p.value);

LOG.info(sb.toString());

}

能夠看到它經過調用DataPoint的value的toString方法把統計信息輸出到日誌裏面的,因此若是你的IMetric實現返回的是本身定義的類型,記得重載toString()方法,讓統計信息以可讀的格式輸出。

到這裏Storm的Metric接口和自帶的實現基本介紹完了,接下來咱們來看看怎麼使用Storm自帶的這些實現。首先,Storm默認的配置是關掉Metric功能的,能夠有兩種方式開啓Metric功能:

1)在storm.yaml裏面配置,這種是集羣級別的設置,我的不建議這麼作,因此就很少介紹了

2)conf.registerMetricsConsumer(Class klass, long parallelismHint);這是topology級別的,klass是IMetricsConsumer的實現類,parallelismHint這個參數Storm代碼裏面沒註釋我也沒深刻看底層的實現,這裏結合本身的實驗談談它的意義:topology是在1個或多個worker上面以多個task的方式跑的嘛,parallelismHint就是指定多少個併發來輸出統計信息。這裏我也不知道parallelismHint指的是多個task、worker仍是supervisor,反正parallelismHint=1的時候只在特定的一個supervisor下面的metrics.log有統計信息,parallelismHint>1時可能取決於worker的數量,我測試的時候因爲是在多個supervisor上跑的,所以觀察到多個supervisor都有metrics.log的輸出。我的經驗是parallelismHint設爲1,這樣能夠在一個supervisor下面的metrics.log就能看到全部task的統計信息。

因爲我建議採用第二種方法,因此示例代碼爲:

//客戶端註冊IMetricsConsumer

conf.registerMetricsConsumer(LoggingMetricsConsumer.class);

StormSubmitter.submitTopology(name, conf, builder.createTopology());

//咱們假設要統計spout某段代碼的調用次數

//註冊IMetric

@Override

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

...

metric=new CountMetric();

context.registerMetric("spout time cost", metric, 60); //所以DataPoint的name爲spout time cost,60表示1分鐘統計一次

...

}

//更新統計數據

@Override

public void nextTuple() {

if(...)...

else{

...

metric.incr();

}

}

這樣就能夠了,而後你就能在metrics.log看到統計數據了。

如今,假設咱們的需求跟上面不太同樣:1)metrics.log只打印咱們本身維護的統計信息,屏蔽__system、__fail-count這種系統本身的統計信息;2)不僅統計代碼的調用次數,還要統計調用時間——最小時間、最大時間、平均時間。

第一點能夠經過重載LoggingMetricsConsumer的方法來實現:

public class AppLoggingMetricsConsumer extends LoggingMetricsConsumer {

@Override

public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {

if (taskInfo.srcComponentId != null && taskInfo.srcComponentId.startsWith("__")) return;

if (dataPoints == null || dataPoints.isEmpty()) return;

List<DataPoint> list = new ArrayList<DataPoint>();

for (DataPoint p : dataPoints) {

if (p.name == null || p.name.startsWith("__")) continue;

list.add(p);

}

if (list.isEmpty()) return;

super.handleDataPoints(taskInfo, list);

}

}

第二點須要開發咱們本身的IMetric接口實現類TimeCostMetric,如下是其主要代碼:

@Override

public Object getValueAndReset() {

TimeCost timeCost=new TimeCost();

timeCost.count=count;

if(timeCost.count>0){

timeCost.min=min;

timeCost.max=max;

timeCost.mean=all*1.0/timeCost.count;

}

init();

return timeCost;

}

public void update(long time){

count++;

all+=time;

if(min>time)min=time;

if(max<time)max=time;

}

public static class TimeCost implements Serializable{

private static final long serialVersionUID = 8355726599226036228L;

int count;

long min;

long max;

double mean;

public String toString(){

return "count: "+count+", min: "+min+", max:"+max+", mean: "+mean;

}

}

TimeCostMetric的getValueAndReset方法返回的是一個TimeCost 對象,日誌中最終打印的就是其toString方法的內容。而後把前面紅色部分的代碼改爲下面的內容:

① conf.registerMetricsConsumer(AppLoggingMetricsConsumer .class);

② metric=new TimeCostMetric();

context.registerMetric("MQ spout time cost", metric, 60);

③ metric.incr();

再來看看metrics.log

本文中是直接把統計信息打到日誌中,你也能夠本身實現IMetricsConsumer接口,把統計信息保存到指定的地方,如數據庫、監控平臺等等。



免費領取驗證碼、內容安全、短信發送、直播點播體驗包及雲服務器等套餐

更多網易技術、產品、運營經驗分享請訪問網易雲社區

相關文章:
【推薦】 網頁設計簡史看設計&代碼「隔膜」
【推薦】 巧用Scrum與Kanban
【推薦】 網易美學-系統架構系列1-分佈式與服務化

相關文章
相關標籤/搜索