本文由做者林洋港受權網易雲社區發佈。html
做爲服務端程序,咱們老是須要向外界報告一些統計數據,以助於瞭解系統的運行狀況,好比某個接口的調用時間、系統處理的請求數等等。當咱們的程序以Storm Topology的形式運行時一樣須要輸出這些統計數據。Storm爲咱們提供了Metric接口,能夠方便的把一些統計指標輸出到指定的地方。Storm Metric的統計方式爲每隔指定的時間間隔輸出統計內容。本文首先介紹Storm Metric相關的接口以及它們之間的關係,而後以實際應用中的一個例子來講明如何使用Metric接口。本文使用的Storm版本爲0.9.1-incubating。數據庫
IMetric是Storm用於保存統計數據的接口安全
public interface IMetric {服務器
public Object getValueAndReset();架構
}併發
接口只有一個getValueAndReset方法,當須要輸出統計內容時,Storm就會調用這個方法。值得注意的是getValueAndReset方法返回的是Object類型,這爲統計內容的形式提供了靈活性,咱們能夠返回任意的類型做爲統計信息,這一點在後面的例子中咱們會再提到。另外一個引發咱們注意的地方是IMetric接口並無聲明更新統計數據的方法,這樣當咱們實現IMetric接口的時候就更加靈活了——參數類型、參數個數都沒有限制。Storm自身提供了6個IMetric實現:AssignableMetric、CombinedMetric、CountMetric、MultiCountMetric、ReducedMetric、StateMetric。這裏只介紹CountMetric和MultiCountMetric的使用方式,以印證前面說的IMetric接口統計數據更新方式的靈活性以及getValueAndReset返回Object類型的靈活性。CountMetric就是一個簡單的計數器,有兩個方法incr()和incrBy(long incrementBy),其getValueAndReset方法返回一個long類型的值:app
public Object getValueAndReset() {分佈式
long ret = _value;ide
_value = 0;性能
return ret;
}
MultiCountMetric,顧名思義,就是多個指標的計數器,維護着一個Map,只有一個方法CountMetric scope(String key)。所以MultiCountMetric的更新方式爲MultiCountMetric.scope(key).incr()或MultiCountMetric.scope(key).incrBy(long incrementBy)。它的getValueAndReset返回的是一個Map:
public Object getValueAndReset() {
Map ret = new HashMap();
for(Map.Entry e : _value.entrySet()) {
ret.put(e.getKey(), e.getValue().getValueAndReset());
}
return ret;
}
除了IMetric接口,還有另一個接口IMetricsConsumer,它負責向外輸出統計信息,即把IMetric getValueAndReset方法返回的數據輸出到外面。IMetricsConsumer有三個方法
void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter);
void handleDataPoints(TaskInfo taskInfo, Collection dataPoints);
void cleanup();
其中prepare是初始化,cleanup是生命週期結束時的清理工做,handleDataPoints纔是真正的統計信息輸出方法,taskInfo參數存儲當前task的信息(host、port、component id、task id等等),dataPoints存儲的是IMetric返回的統計信息,多是出於性能考慮,dataPoints是一個集合,包含了多個IMetric返回的數據。讓咱們來具體看看DataPoint這個類:
public static class DataPoint {
@Override
public String toString() {
return "[" + name + " = " + value + "]";
}
public String name;
public Object value;
}
name是IMetric註冊時的名字,value就是IMetric getValueAndReset返回的那個Object。
Storm只提供了一個IMetricsConsumer實現——LoggingMetricsConsumer。LoggingMetricsConsumer作的事情很簡單,就是把dataPoints輸出到日誌文件metrics.log,下面是其handleDataPoints方法的部分代碼:
for (DataPoint p : dataPoints) {
sb.delete(header.length(), sb.length());
sb.append(p.name)
.append(padding).delete(header.length()+23,sb.length()).append("\t")
.append(p.value);
LOG.info(sb.toString());
}
能夠看到它經過調用DataPoint的value的toString方法把統計信息輸出到日誌裏面的,因此若是你的IMetric實現返回的是本身定義的類型,記得重載toString()方法,讓統計信息以可讀的格式輸出。
到這裏Storm的Metric接口和自帶的實現基本介紹完了,接下來咱們來看看怎麼使用Storm自帶的這些實現。首先,Storm默認的配置是關掉Metric功能的,能夠有兩種方式開啓Metric功能:
1)在storm.yaml裏面配置,這種是集羣級別的設置,我的不建議這麼作,因此就很少介紹了
2)conf.registerMetricsConsumer(Class klass, long parallelismHint);這是topology級別的,klass是IMetricsConsumer的實現類,parallelismHint這個參數Storm代碼裏面沒註釋我也沒深刻看底層的實現,這裏結合本身的實驗談談它的意義:topology是在1個或多個worker上面以多個task的方式跑的嘛,parallelismHint就是指定多少個併發來輸出統計信息。這裏我也不知道parallelismHint指的是多個task、worker仍是supervisor,反正parallelismHint=1的時候只在特定的一個supervisor下面的metrics.log有統計信息,parallelismHint>1時可能取決於worker的數量,我測試的時候因爲是在多個supervisor上跑的,所以觀察到多個supervisor都有metrics.log的輸出。我的經驗是parallelismHint設爲1,這樣能夠在一個supervisor下面的metrics.log就能看到全部task的統計信息。
因爲我建議採用第二種方法,因此示例代碼爲:
//客戶端註冊IMetricsConsumer
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
StormSubmitter.submitTopology(name, conf, builder.createTopology());
//咱們假設要統計spout某段代碼的調用次數
//註冊IMetric
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
...
metric=new CountMetric();
context.registerMetric("spout time cost", metric, 60); //所以DataPoint的name爲spout time cost,60表示1分鐘統計一次
...
}
//更新統計數據
@Override
public void nextTuple() {
if(...)...
else{
...
metric.incr();
}
}
這樣就能夠了,而後你就能在metrics.log看到統計數據了。
如今,假設咱們的需求跟上面不太同樣:1)metrics.log只打印咱們本身維護的統計信息,屏蔽__system、__fail-count這種系統本身的統計信息;2)不僅統計代碼的調用次數,還要統計調用時間——最小時間、最大時間、平均時間。
第一點能夠經過重載LoggingMetricsConsumer的方法來實現:
public class AppLoggingMetricsConsumer extends LoggingMetricsConsumer {
@Override
public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
if (taskInfo.srcComponentId != null && taskInfo.srcComponentId.startsWith("__")) return;
if (dataPoints == null || dataPoints.isEmpty()) return;
List<DataPoint> list = new ArrayList<DataPoint>();
for (DataPoint p : dataPoints) {
if (p.name == null || p.name.startsWith("__")) continue;
list.add(p);
}
if (list.isEmpty()) return;
super.handleDataPoints(taskInfo, list);
}
}
第二點須要開發咱們本身的IMetric接口實現類TimeCostMetric,如下是其主要代碼:
@Override
public Object getValueAndReset() {
TimeCost timeCost=new TimeCost();
timeCost.count=count;
if(timeCost.count>0){
timeCost.min=min;
timeCost.max=max;
timeCost.mean=all*1.0/timeCost.count;
}
init();
return timeCost;
}
public void update(long time){
count++;
all+=time;
if(min>time)min=time;
if(max<time)max=time;
}
public static class TimeCost implements Serializable{
private static final long serialVersionUID = 8355726599226036228L;
int count;
long min;
long max;
double mean;
public String toString(){
return "count: "+count+", min: "+min+", max:"+max+", mean: "+mean;
}
}
TimeCostMetric的getValueAndReset方法返回的是一個TimeCost 對象,日誌中最終打印的就是其toString方法的內容。而後把前面紅色部分的代碼改爲下面的內容:
① conf.registerMetricsConsumer(AppLoggingMetricsConsumer .class);
② metric=new TimeCostMetric();
context.registerMetric("MQ spout time cost", metric, 60);
③ metric.incr();
再來看看metrics.log
本文中是直接把統計信息打到日誌中,你也能夠本身實現IMetricsConsumer接口,把統計信息保存到指定的地方,如數據庫、監控平臺等等。
免費領取驗證碼、內容安全、短信發送、直播點播體驗包及雲服務器等套餐
更多網易技術、產品、運營經驗分享請訪問網易雲社區。
相關文章:
【推薦】 網頁設計簡史看設計&代碼「隔膜」
【推薦】 巧用Scrum與Kanban
【推薦】 網易美學-系統架構系列1-分佈式與服務化