本文由做者林洋港受權網易雲社區發佈。數據庫
做爲服務端程序,咱們老是須要向外界報告一些統計數據,以助於瞭解系統的運行狀況,好比某個接口的調用時間、系統處理的請求數等等。當咱們的程序以Storm Topology的形式運行時一樣須要輸出這些統計數據。Storm爲咱們提供了Metric接口,能夠方便的把一些統計指標輸出到指定的地方。Storm Metric的統計方式爲每隔指定的時間間隔輸出統計內容。本文首先介紹Storm Metric相關的接口以及它們之間的關係,而後以實際應用中的一個例子來講明如何使用Metric接口。本文使用的Storm版本爲0.9.1-incubating。安全
IMetric是Storm用於保存統計數據的接口服務器
public interface IMetric {併發
public Object getValueAndReset();
}app
接口只有一個getValueAndReset方法,當須要輸出統計內容時,Storm就會調用這個方法。值得注意的是getValueAndReset方法返回的是Object類型,這爲統計內容的形式提供了靈活性,咱們能夠返回任意的類型做爲統計信息,這一點在後面的例子中咱們會再提到。另外一個引發咱們注意的地方是IMetric接口並無聲明更新統計數據的方法,這樣當咱們實現IMetric接口的時候就更加靈活了——參數類型、參數個數都沒有限制。Storm自身提供了6個IMetric實現:AssignableMetric、CombinedMetric、CountMetric、MultiCountMetric、ReducedMetric、StateMetric。這裏只介紹CountMetric和MultiCountMetric的使用方式,以印證前面說的IMetric接口統計數據更新方式的靈活性以及getValueAndReset返回Object類型的靈活性。CountMetric就是一個簡單的計數器,有兩個方法incr()和incrBy(long incrementBy),其getValueAndReset方法返回一個long類型的值:ide
public Object getValueAndReset() { long ret = _value; _value = 0; return ret; }
MultiCountMetric,顧名思義,就是多個指標的計數器,維護着一個Map,只有一個方法CountMetric scope(String key)。所以MultiCountMetric的更新方式爲MultiCountMetric.scope(key).incr()或MultiCountMetric.scope(key).incrBy(long incrementBy)。它的getValueAndReset返回的是一個Map:性能
public Object getValueAndReset() { Map ret = new HashMap(); for(Map.Entry e : _value.entrySet()) { ret.put(e.getKey(), e.getValue().getValueAndReset()); } return ret; }
除了IMetric接口,還有另一個接口IMetricsConsumer,它負責向外輸出統計信息,即把IMetric getValueAndReset方法返回的數據輸出到外面。IMetricsConsumer有三個方法測試
void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter); void handleDataPoints(TaskInfo taskInfo, Collection dataPoints); void cleanup();
其中prepare是初始化,cleanup是生命週期結束時的清理工做,handleDataPoints纔是真正的統計信息輸出方法,taskInfo參數存儲當前task的信息(host、port、component id、task id等等),dataPoints存儲的是IMetric返回的統計信息,多是出於性能考慮,dataPoints是一個集合,包含了多個IMetric返回的數據。讓咱們來具體看看DataPoint這個類:ui
public static class DataPoint { @Override public String toString() { return "[" + name + " = " + value + "]"; } public String name; public Object value; }
name是IMetric註冊時的名字,value就是IMetric getValueAndReset返回的那個Object。spa
Storm只提供了一個IMetricsConsumer實現——LoggingMetricsConsumer。LoggingMetricsConsumer作的事情很簡單,就是把dataPoints輸出到日誌文件metrics.log,下面是其handleDataPoints方法的部分代碼:
for (DataPoint p : dataPoints) { sb.delete(header.length(), sb.length()); sb.append(p.name) .append(padding).delete(header.length()+23,sb.length()).append("\t") .append(p.value); LOG.info(sb.toString()); }
能夠看到它經過調用DataPoint的value的toString方法把統計信息輸出到日誌裏面的,因此若是你的IMetric實現返回的是本身定義的類型,記得重載toString()方法,讓統計信息以可讀的格式輸出。
到這裏Storm的Metric接口和自帶的實現基本介紹完了,接下來咱們來看看怎麼使用Storm自帶的這些實現。首先,Storm默認的配置是關掉Metric功能的,能夠有兩種方式開啓Metric功能: 1)在storm.yaml裏面配置,這種是集羣級別的設置,我的不建議這麼作,因此就很少介紹了 2)conf.registerMetricsConsumer(Class klass, long parallelismHint);這是topology級別的,klass是IMetricsConsumer的實現類,parallelismHint這個參數Storm代碼裏面沒註釋我也沒深刻看底層的實現,這裏結合本身的實驗談談它的意義:topology是在1個或多個worker上面以多個task的方式跑的嘛,parallelismHint就是指定多少個併發來輸出統計信息。這裏我也不知道parallelismHint指的是多個task、worker仍是supervisor,反正parallelismHint=1的時候只在特定的一個supervisor下面的metrics.log有統計信息,parallelismHint>1時可能取決於worker的數量,我測試的時候因爲是在多個supervisor上跑的,所以觀察到多個supervisor都有metrics.log的輸出。我的經驗是parallelismHint設爲1,這樣能夠在一個supervisor下面的metrics.log就能看到全部task的統計信息。
因爲我建議採用第二種方法,因此示例代碼爲:
//客戶端註冊IMetricsConsumer
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
StormSubmitter.submitTopology(name, conf, builder.createTopology());
//咱們假設要統計spout某段代碼的調用次數
//註冊IMetric
@Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { ... metric=new CountMetric(); context.registerMetric("spout time cost", metric, 60); //所以DataPoint的name爲spout time cost,60表示1分鐘統計一次 ... } //更新統計數據
@Override
public void nextTuple() { if(...)... else{ ... metric.incr(); } } 這樣就能夠了,而後你就能在metrics.log看到統計數據了。
如今,假設咱們的需求跟上面不太同樣:1)metrics.log只打印咱們本身維護的統計信息,屏蔽__system、__fail-count這種系統本身的統計信息;2)不僅統計代碼的調用次數,還要統計調用時間——最小時間、最大時間、平均時間。
第一點能夠經過重載LoggingMetricsConsumer的方法來實現:
public class AppLoggingMetricsConsumer extends LoggingMetricsConsumer {
@Override public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { if (taskInfo.srcComponentId != null && taskInfo.srcComponentId.startsWith("__")) return; if (dataPoints == null || dataPoints.isEmpty()) return; List<DataPoint> list = new ArrayList<DataPoint>(); for (DataPoint p : dataPoints) { if (p.name == null || p.name.startsWith("__")) continue; list.add(p); } if (list.isEmpty()) return; super.handleDataPoints(taskInfo, list); }
}
第二點須要開發咱們本身的IMetric接口實現類TimeCostMetric,如下是其主要代碼:
@Override public Object getValueAndReset() { TimeCost timeCost=new TimeCost(); timeCost.count=count; if(timeCost.count>0){ timeCost.min=min; timeCost.max=max; timeCost.mean=all*1.0/timeCost.count; } init(); return timeCost; }
public void update(long time){
count++; all+=time; if(min>time)min=time; if(max<time)max=time; }
public static class TimeCost implements Serializable{
private static final long serialVersionUID = 8355726599226036228L; int count; long min; long max; double mean; public String toString(){ return "count: "+count+", min: "+min+", max:"+max+", mean: "+mean; } }
TimeCostMetric的getValueAndReset方法返回的是一個TimeCost 對象,日誌中最終打印的就是其toString方法的內容。而後把前面紅色部分的代碼改爲下面的內容:
① conf.registerMetricsConsumer(AppLoggingMetricsConsumer .class);
② metric=new TimeCostMetric();
context.registerMetric("MQ spout time cost", metric, 60);
③ metric.incr();
再來看看metrics.log
本文中是直接把統計信息打到日誌中,你也能夠本身實現IMetricsConsumer接口,把統計信息保存到指定的地方,如數據庫、監控平臺等等。
免費領取驗證碼、內容安全、短信發送、直播點播體驗包及雲服務器等套餐
更多網易技術、產品、運營經驗分享請訪問網易雲社區。
文章來源: 網易雲社區