Zeppelin中Interpreter插件開發

 

項目背景:前端

    (1) 已有監控系統採用的OpenTSDB方案apache

    (2)  目前一些大數據應用,尤爲是基於spark streaming的流式應用,會實時計算生成一些指標數據,借用監控系統的存儲。後端

    (3) 須要前端展現實時分析結果,採用zeppelin展現方式,可是目前zeppelin不支持OpenTSDB後端引擎支持app

   So, 本身開發!ide

 

一 Interpreter插件流程

        插播:    剛去訪問官方發現0.6.0版本發佈了! http://zeppelin.apache.org/docs/0.6.0/大數據

    (1) 下載Zeppelin源碼

    (2)  建立Zeppelin Maven工程的 Module

    

    (3) 添加對zeppelin-interpreter插件包的依賴

        

         因爲Zeppelin運行環境已經有了該依賴包,因此咱們再建立自定義Interpreter插件的時候只須要在代碼中對其依賴,打包過程當中不須要打包該包。因此使用provided依賴方式。ui

     (4) 添加對OpenTSDB客戶端操做API包的依賴

    

       注意:該包爲內部開發依賴包spa

      (5) 建立實現類繼承Zeppelin提供的抽象類org.apache.zeppelin.interpreter.Interpreter;

            public class TsdInterpreter extends Interpreter插件

 

       (6) 代碼中註冊當前插件

          在實現類中添加如下代碼實現當前插件的註冊code

            static {

              Interpreter.register("tsd", "tsd", TsdInterpreter.class.getName());
          }

          以tsd名稱註冊,那麼Zeppelin前端在調用OpenTSDB查詢的時候,只須要指定後端引擎名稱%tsd便可。

       (7) 實現核心抽象方法,即Zeppelin前端提交過來的命令

          public InterpreterResult interpret(String cmd, InterpreterContext context)

          cmd: 即在Zeppelin交互式界面編寫的命令,不包含%tsd

          context: 當前插件的上下文,主要包含插件的配置信息,例如操做OpenTSDB的時候就須要從上下文中獲取OpenTSDB的IP和端口參數。

            

          該方法實現的核心思想就是:   解析命令=>實例化OpenTSDB操做客戶端=>操做OpenTSDB客戶端進行數據查詢=> 獲取返回結果 封裝成InterpreterResult對象

 

            貼核心代碼吧:

            

          Properties intpProperty = getProperty();
          for (Object k : intpProperty.keySet()) {
            String key = (String) k;
            String value = (String) intpProperty.get(key);

            if (key.equals("tsd.host") ) {
                host = value;
            } else if (key.equals("tsd.port")) {
                port = value;
            }
        }
         propertiesUtil.setOpentsdbIp(host);
          propertiesUtil.setPort(Integer.parseInt(port));

            Scanner scanner = new Scanner(items[1]);
            String start, end, metric, tagsStr;
            if (scanner.hasNext())
                start = scanner.next();
            else {
                return processHelp(InterpreterResult.Code.ERROR,
                        "1!Please enter the correct format!");
            }
            
            if (scanner.hasNext())
                end = scanner.next();
            else {
                return processHelp(InterpreterResult.Code.ERROR,
                        "2!Please enter the correct format!");
            }
            
            if (scanner.hasNext())
                metric = scanner.next();
            else {
                return processHelp(InterpreterResult.Code.ERROR,
                        "3!Please enter the correct format!");
            }
            
            if (scanner.hasNext())
                tagsStr = scanner.next();
            else {
                return processHelp(InterpreterResult.Code.ERROR,
                        "4!Please enter the correct format!");
            }
            

            // cpid=tudou,busiid=*,code=1
            String[] tagsStrs = tagsStr.split(",");
            Map<String, String> tags = new HashMap<String, String>();
            for (String s : tagsStrs) {
                int index = s.indexOf('=');
                if (index == -1)
                    continue;
                String tagK = s.substring(0, index);
                String tagV = s.substring(index + 1);
                tags.put(tagK, tagV);
            }

            QueryService queryService = new QueryService();
            try {
                List<QueryResponseEntity> responses = queryService
                        .queryByMetric(start, end, metric, tags, null, "sum");

                StringBuffer sb = new StringBuffer();
//                Map<String, String> alldps = new HashMap<String, String>();

                // build header
                Set<String> keys = new HashSet<String>();
                sb.append("time\t");
                for (QueryResponseEntity st : responses) {
                    sb.append(st.getTags().toString() + "\t");
                    keys.addAll(st.getDps().keySet());
                }
                sb.replace(sb.lastIndexOf("\t"), sb.lastIndexOf("\t") + 1, "\n");

                List<String> keys2 = new ArrayList<String>(keys);
                Collections.sort(keys2);
                // build lines
                Iterator<String> it = keys2.iterator();
                
                long t;
                while (it.hasNext()) {
                    String key = it.next(); // 每一行的時間戳
                    
                    t = Long.parseLong(key);
                    sb.append(sdf.format(new Date(t*1000)) + "\t");
                    for (QueryResponseEntity st : responses) {
                        Map<String, String> dps = st.getDps();
                        String value = dps.get(key);
                        if (value != null) {
                            sb.append(value + "\t");
                        } else {
                            sb.append(" \t");
                        }
                    }
                    sb.replace(sb.lastIndexOf("\t"), sb.lastIndexOf("\t") + 1,
                            "\n");
                }
                // sb.toString()

                return new InterpreterResult(InterpreterResult.Code.SUCCESS,
                        InterpreterResult.Type.TABLE, sb.toString());

    二 插件部署

            (1)  實現類的配置  

                    在ZEPPELIN_HOME/conf/zeppelin-site.xml

                  

          (2) 拷貝OpenTSDB插件包

               在ZEPPELIN_HOME/interpreter

                建立文件夾tsd,將全部依賴包拷貝到該文件夾下

                    

          (3) 重啓Zeppelin,在Zeppelin管理界面的 Interpreter中添加 TSD配置

                    

 

 

三  實現效果

   

相關文章
相關標籤/搜索