項目背景:前端
(1) 已有監控系統採用的OpenTSDB方案apache
(2) 目前一些大數據應用,尤爲是基於spark streaming的流式應用,會實時計算生成一些指標數據,借用監控系統的存儲。後端
(3) 須要前端展現實時分析結果,採用zeppelin展現方式,可是目前zeppelin不支持OpenTSDB後端引擎支持app
So, 本身開發!ide
插播: 剛去訪問官方發現0.6.0版本發佈了! http://zeppelin.apache.org/docs/0.6.0/大數據
因爲Zeppelin運行環境已經有了該依賴包,因此咱們再建立自定義Interpreter插件的時候只須要在代碼中對其依賴,打包過程當中不須要打包該包。因此使用provided依賴方式。ui
注意:該包爲內部開發依賴包spa
public class TsdInterpreter extends Interpreter插件
在實現類中添加如下代碼實現當前插件的註冊code
static {
Interpreter.register("tsd", "tsd", TsdInterpreter.class.getName());
}
以tsd名稱註冊,那麼Zeppelin前端在調用OpenTSDB查詢的時候,只須要指定後端引擎名稱%tsd便可。
public InterpreterResult interpret(String cmd, InterpreterContext context)
cmd: 即在Zeppelin交互式界面編寫的命令,不包含%tsd
context: 當前插件的上下文,主要包含插件的配置信息,例如操做OpenTSDB的時候就須要從上下文中獲取OpenTSDB的IP和端口參數。
該方法實現的核心思想就是: 解析命令=>實例化OpenTSDB操做客戶端=>操做OpenTSDB客戶端進行數據查詢=> 獲取返回結果 封裝成InterpreterResult對象。
貼核心代碼吧:
Properties intpProperty = getProperty();
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String value = (String) intpProperty.get(key);
if (key.equals("tsd.host") ) {
host = value;
} else if (key.equals("tsd.port")) {
port = value;
}
}
propertiesUtil.setOpentsdbIp(host);
propertiesUtil.setPort(Integer.parseInt(port));
Scanner scanner = new Scanner(items[1]);
String start, end, metric, tagsStr;
if (scanner.hasNext())
start = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"1!Please enter the correct format!");
}
if (scanner.hasNext())
end = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"2!Please enter the correct format!");
}
if (scanner.hasNext())
metric = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"3!Please enter the correct format!");
}
if (scanner.hasNext())
tagsStr = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"4!Please enter the correct format!");
}
// cpid=tudou,busiid=*,code=1
String[] tagsStrs = tagsStr.split(",");
Map<String, String> tags = new HashMap<String, String>();
for (String s : tagsStrs) {
int index = s.indexOf('=');
if (index == -1)
continue;
String tagK = s.substring(0, index);
String tagV = s.substring(index + 1);
tags.put(tagK, tagV);
}
QueryService queryService = new QueryService();
try {
List<QueryResponseEntity> responses = queryService
.queryByMetric(start, end, metric, tags, null, "sum");
StringBuffer sb = new StringBuffer();
// Map<String, String> alldps = new HashMap<String, String>();
// build header
Set<String> keys = new HashSet<String>();
sb.append("time\t");
for (QueryResponseEntity st : responses) {
sb.append(st.getTags().toString() + "\t");
keys.addAll(st.getDps().keySet());
}
sb.replace(sb.lastIndexOf("\t"), sb.lastIndexOf("\t") + 1, "\n");
List<String> keys2 = new ArrayList<String>(keys);
Collections.sort(keys2);
// build lines
Iterator<String> it = keys2.iterator();
long t;
while (it.hasNext()) {
String key = it.next(); // 每一行的時間戳
t = Long.parseLong(key);
sb.append(sdf.format(new Date(t*1000)) + "\t");
for (QueryResponseEntity st : responses) {
Map<String, String> dps = st.getDps();
String value = dps.get(key);
if (value != null) {
sb.append(value + "\t");
} else {
sb.append(" \t");
}
}
sb.replace(sb.lastIndexOf("\t"), sb.lastIndexOf("\t") + 1,
"\n");
}
// sb.toString()
return new InterpreterResult(InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TABLE, sb.toString());
在ZEPPELIN_HOME/conf/zeppelin-site.xml
在ZEPPELIN_HOME/interpreter
建立文件夾tsd,將全部依賴包拷貝到該文件夾下