Writing a custom Zeppelin interpreter

Zeppelin's interpreter architecture

This post walks through a basic example of implementing your own interpreter, along with the steps to configure and use it.

The goal is to use a visualization platform to display the results of an MPP data platform, presenting the data to users from multiple angles.

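Basic approach: Zeppelin supports custom interpreters, so a custom interpreter can handle the interaction between the backend and the front-end application.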

Data visualization with Zeppelin

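Zeppelin is a web-based interactive data platform that supports languages such as SQL and Scala. An interpreter converts the user's input into commands for a backend service and returns the backend's output for display on the web page. The backends currently supported by interpreters include Apache Spark, Python, JDBC, Markdown, and Shell.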
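To make the "user input → interpreter → backend" flow concrete, here is a minimal, self-contained Java sketch of how a paragraph whose first line is `%name` could be routed to the interpreter registered under that name. `ParagraphRouter`, its map of handlers, and the fallback name used when no `%name` is given (which loosely mirrors a note's default interpreter binding) are illustrative assumptions, not Zeppelin's actual classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: route a notebook paragraph to an interpreter by its "%name" prefix.
// These classes are illustrative only; they are not part of the Zeppelin API.
class ParagraphRouter {
    private final Map<String, Function<String, String>> interpreters = new HashMap<>();
    private final String defaultName;

    ParagraphRouter(String defaultName) {
        this.defaultName = defaultName;
    }

    void bind(String name, Function<String, String> interpreter) {
        interpreters.put(name, interpreter);
    }

    /** Splits "%name body" into {name, body}; without a prefix, uses defaultName. */
    static String[] split(String paragraph, String defaultName) {
        String text = paragraph.trim();
        if (!text.startsWith("%")) {
            return new String[] {defaultName, text};
        }
        // The interpreter name ends at the first whitespace character (space or newline).
        int end = 1;
        while (end < text.length() && !Character.isWhitespace(text.charAt(end))) {
            end++;
        }
        return new String[] {text.substring(1, end), text.substring(end).trim()};
    }

    String run(String paragraph) {
        String[] parts = split(paragraph, defaultName);
        Function<String, String> interpreter = interpreters.get(parts[0]);
        if (interpreter == null) {
            return "ERROR: interpreter '" + parts[0] + "' not found";
        }
        return interpreter.apply(parts[1]);
    }

    public static void main(String[] args) {
        ParagraphRouter router = new ParagraphRouter("md");
        router.bind("md", body -> "rendered markdown: " + body);
        router.bind("xdataparser", body -> "would send to backend: " + body);
        System.out.println(router.run("%xdataparser\nselect count(*) from test;"));
        // prints "would send to backend: select count(*) from test;"
    }
}
```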

A custom interpreter can be implemented with the following steps:

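  • For your backend application, extend the abstract class org.apache.zeppelin.interpreter.Interpreter and implement your own interpreter class
  • Create a folder named after your interpreter under Zeppelin's interpreter directory, and put your interpreter's jar in it
  • Edit the interpreter-setting.json file, usually located at {ZEPPELIN_INTERPRETER_DIR}/{YOUR_OWN_INTERPRETER_DIR}/interpreter-setting.json. For example: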
[
  {
    "group": "your-group",
    "name": "your-name",
    "className": "your.own.interpreter.class",
    "properties": {
      "properties1": {
        "envName": null,
        "propertyName": "property.1.name",
        "defaultValue": "propertyDefaultValue",
        "description": "Property description"
      },
      "properties2": {
        "envName": "PROPERTIES_2",
        "propertyName": null,
        "defaultValue": "property2DefaultValue",
        "description": "Property 2 description"
      }, ...
    }
  },
  {
    ...
  } 
]
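  • Optionally, customize syntax highlighting through zeppelin-web/bower.json
  • Put your interpreter and its dependencies into [ZEPPELIN_HOME]/interpreter/[INTERPRETER_NAME]/
  • Configure the interpreter
    • Add your interpreter's class name to the zeppelin.interpreters property in conf/zeppelin-site.xml. The value is a comma-separated list of class names, for example: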

      <property>
      <name>zeppelin.interpreters</name>
      <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,com.me.MyNewInterpreter</value>
      </property>
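    • If you use a different configuration, add your interpreter to that configuration as well
    • On the Interpreter page, click +Create to add your custom interpreter
  • Use the interpreter in a paragraph. For example, if the interpreter is named xdataparser: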

    %xdataparser
    select count(*) from test;
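To illustrate the first step, here is a rough sketch of what a minimal xdataparser class could look like. To keep it compilable without the Zeppelin jars, it declares a hypothetical stand-in base class `MiniInterpreter` with only the `open()` / `interpret()` / `close()` lifecycle. A real implementation would extend `org.apache.zeppelin.interpreter.Interpreter`, take an `InterpreterContext`, and return an `InterpreterResult`. `QueryBackend` is also a hypothetical placeholder for the MPP client. The output uses Zeppelin's `%table` display format, which the notebook renders as a table: columns are separated by tabs, rows by newlines, and the first row is the header.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.zeppelin.interpreter.Interpreter,
// reduced to the lifecycle methods so this sketch compiles on its own.
abstract class MiniInterpreter {
    enum Code { SUCCESS, ERROR }

    static final class Result {
        final Code code;
        final String message;
        Result(Code code, String message) {
            this.code = code;
            this.message = message;
        }
    }

    abstract void open();
    abstract void close();
    abstract Result interpret(String statements);
}

// Hypothetical MPP client: returns a header row followed by data rows.
interface QueryBackend {
    List<String[]> query(String sql) throws Exception;
}

class XDataParserInterpreter extends MiniInterpreter {
    private final QueryBackend backend;
    private boolean opened;

    XDataParserInterpreter(QueryBackend backend) {
        this.backend = backend;
    }

    @Override
    void open() {
        opened = true;   // a real interpreter would create its connections here (called once)
    }

    @Override
    void close() {
        opened = false;  // ...and release them here
    }

    @Override
    Result interpret(String statements) {
        if (!opened) {
            return new Result(Code.ERROR, "interpreter is not open");
        }
        String sql = statements.trim();
        if (sql.endsWith(";")) {
            sql = sql.substring(0, sql.length() - 1);  // some JDBC drivers reject a trailing ';'
        }
        try {
            // "%table": columns separated by '\t', rows by '\n', first row is the header.
            StringBuilder out = new StringBuilder("%table ");
            for (String[] row : backend.query(sql)) {
                out.append(String.join("\t", row)).append('\n');
            }
            return new Result(Code.SUCCESS, out.toString());
        } catch (Exception e) {
            return new Result(Code.ERROR, e.getMessage());
        }
    }

    public static void main(String[] args) {
        // Fake backend standing in for the MPP database.
        QueryBackend fake = sql -> {
            List<String[]> rows = new ArrayList<>();
            rows.add(new String[] {"count(*)"});
            rows.add(new String[] {"42"});
            return rows;
        };
        XDataParserInterpreter intp = new XDataParserInterpreter(fake);
        intp.open();
        System.out.print(intp.interpret("select count(*) from test;").message);
        intp.close();
    }
}
```

Returning `%table` text, rather than plain text, is what lets Zeppelin offer its built-in table and chart views for the result. That is how this example connects to the visualization goal stated at the start.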

Main methods of the Interpreter class

  /**
   * Opens interpreter. You may want to place your initialize routine here.
   * open() is called only once
   */
  @ZeppelinApi
  public abstract void open();

  /**
   * Closes interpreter. You may want to free your resources up here.
   * close() is called only once
   */
  @ZeppelinApi
  public abstract void close();

  /**
   * Run code and return result, in synchronous way.
   *
   * @param st statements to run
   * @param context
   * @return
   */
  @ZeppelinApi
  public abstract InterpreterResult interpret(String st, InterpreterContext context);

  /**
   * Optionally implement the canceling routine to abort interpret() method
   *
   * @param context
   */
  @ZeppelinApi
  public abstract void cancel(InterpreterContext context);

  /**
   * Dynamic form handling
   * see http://zeppelin.apache.org/docs/dynamicform.html
   *
   * @return FormType.SIMPLE enables simple pattern replacement (eg. Hello ${name=world}),
   *         FormType.NATIVE handles form in API
   */
  @ZeppelinApi
  public abstract FormType getFormType();

  /**
   * Get the progress of the running interpret() call, as a percentage.
   *
   * @param context
   * @return number between 0-100
   */
  @ZeppelinApi
  public abstract int getProgress(InterpreterContext context);

  /**
   * Get completion list based on cursor position.
   * By implementing this method, it enables auto-completion.
   *
   * @param buf statements
   * @param cursor cursor position in statements
   * @return list of possible completion. Return empty list if there're nothing to return.
   */
  @ZeppelinApi
  public List<InterpreterCompletion> completion(String buf, int cursor) {
    return null;
  }

  /**
   * Interpreter can implements it's own scheduler by overriding this method.
   * There're two default scheduler provided, FIFO, Parallel.
   * If your interpret() can handle concurrent request, use Parallel or use FIFO.
   *
   * You can get default scheduler by using
   * SchedulerFactory.singleton().createOrGetFIFOScheduler()
   * SchedulerFactory.singleton().createOrGetParallelScheduler()
   *
   *
   * @return return scheduler instance.
   *         This method can be called multiple times and have to return the same instance.
   *         Can not return null.
   */
  @ZeppelinApi
  public Scheduler getScheduler() {
    return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
  }

  /**
   * Called when interpreter is no longer used.
   */
  @ZeppelinApi
  public void destroy() {
  }
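`interpret()`, `cancel()` and `getProgress()` are called on the same interpreter instance. While `interpret()` is still running, `cancel()` and `getProgress()` typically arrive from other threads. The self-contained sketch below shows one way to wire the three together with atomics. The class name `ProgressTracker`, the batch-based work loop, and the `IntConsumer` work callback are illustrative assumptions, not Zeppelin code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

// Illustrative sketch, not Zeppelin code: one way interpret(), cancel() and
// getProgress() can share state when they are called from different threads.
class ProgressTracker {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private final AtomicInteger progress = new AtomicInteger(0);

    /** Runs `work` once per batch and returns the number of batches completed. */
    int interpret(int batches, IntConsumer work) {
        cancelled.set(false);
        progress.set(0);
        int done = 0;
        for (int i = 0; i < batches; i++) {
            if (cancelled.get()) {
                break;                          // cancel() was called: stop at a batch boundary
            }
            work.accept(i);                     // e.g. fetch one page of query results
            done++;
            progress.set(done * 100 / batches); // always within 0-100
        }
        return done;
    }

    void cancel() {
        cancelled.set(true);
    }

    int getProgress() {
        return progress.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ProgressTracker tracker = new ProgressTracker();
        // interpret() runs on a worker thread, as a paragraph job would.
        Thread worker = new Thread(() -> tracker.interpret(100, i -> sleepQuietly(10)));
        worker.start();
        Thread.sleep(200);
        System.out.println("progress before cancel: " + tracker.getProgress() + "%");
        tracker.cancel();                       // roughly what cancelling the paragraph triggers
        worker.join();
        System.out.println("progress after cancel: " + tracker.getProgress() + "%");
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Resetting the flag at the start of `interpret()` makes cancellation per run. As a consequence, a `cancel()` that arrives before a run has started is ignored. That is a design choice for this sketch, not a requirement of the interface.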

 

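The `getScheduler()` Javadoc above distinguishes FIFO scheduling from Parallel scheduling. Zeppelin's own `Scheduler` classes are not reproduced here. As a rough analogy in plain `java.util.concurrent`:

- A FIFO scheduler behaves like a single-threaded executor: paragraphs run one at a time, in submission order.
- A parallel scheduler behaves like a thread pool that may run several `interpret()` calls at once. This is only safe if the interpreter is thread-safe.

The `SchedulerAnalogy` class and its artificial job durations are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Analogy only: Zeppelin's FIFO and Parallel schedulers modelled with plain executors.
class SchedulerAnalogy {
    /** Submits `jobs` tasks, waits for them, and returns the order in which they finished. */
    static List<Integer> runJobs(ExecutorService executor, int jobs) throws InterruptedException {
        List<Integer> finished = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < jobs; i++) {
            final int id = i;
            executor.submit(() -> {
                sleepQuietly((jobs - id) * 20L);  // earlier jobs are deliberately slower
                finished.add(id);
            });
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return finished;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // FIFO: one worker thread, so jobs finish strictly in submission order.
        System.out.println("FIFO:     " + runJobs(Executors.newSingleThreadExecutor(), 4));
        // prints "FIFO:     [0, 1, 2, 3]"
        // Parallel: four workers, so completion order follows each job's duration instead.
        System.out.println("Parallel: " + runJobs(Executors.newFixedThreadPool(4), 4));
    }
}
```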
Other parts of the Interpreter class:

  public static Logger logger = LoggerFactory.getLogger(Interpreter.class);
  private InterpreterGroup interpreterGroup;
  private URL [] classloaderUrls;
  protected Properties property;

  @ZeppelinApi
  public Interpreter(Properties property) {
    logger.debug("Properties: {}", property);
    this.property = property;
  }

  public void setProperty(Properties property) {
    this.property = property;
  }

  @ZeppelinApi
  public Properties getProperty() {
    Properties p = new Properties();
    p.putAll(property);

    RegisteredInterpreter registeredInterpreter = Interpreter.findRegisteredInterpreterByClassName(
        getClassName());
    if (null != registeredInterpreter) {
      Map<String, InterpreterProperty> defaultProperties = registeredInterpreter.getProperties();
      for (String k : defaultProperties.keySet()) {
        if (!p.containsKey(k)) {
          String value = defaultProperties.get(k).getValue();
          if (value != null) {
            p.put(k, defaultProperties.get(k).getValue());
          }
        }
      }
    }

    return p;
  }

  @ZeppelinApi
  public String getProperty(String key) {
    logger.debug("key: {}, value: {}", key, getProperty().getProperty(key));

    return getProperty().getProperty(key);
  }


  public String getClassName() {
    return this.getClass().getName();
  }

  public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
    this.interpreterGroup = interpreterGroup;
  }

  @ZeppelinApi
  public InterpreterGroup getInterpreterGroup() {
    return this.interpreterGroup;
  }

  public URL[] getClassloaderUrls() {
    return classloaderUrls;
  }

  public void setClassloaderUrls(URL[] classloaderUrls) {
    this.classloaderUrls = classloaderUrls;
  }

  @ZeppelinApi
  public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
    synchronized (interpreterGroup) {
      for (List<Interpreter> interpreters : interpreterGroup.values()) {
        boolean belongsToSameNoteGroup = false;
        Interpreter interpreterFound = null;
        for (Interpreter intp : interpreters) {
          if (intp.getClassName().equals(className)) {
            interpreterFound = intp;
          }

          Interpreter p = intp;
          while (p instanceof WrappedInterpreter) {
            p = ((WrappedInterpreter) p).getInnerInterpreter();
          }
          if (this == p) {
            belongsToSameNoteGroup = true;
          }
        }

        if (belongsToSameNoteGroup) {
          return interpreterFound;
        }
      }
    }
    return null;
  }


  /**
   * Type of interpreter.
   */
  public static enum FormType {
    NATIVE, SIMPLE, NONE
  }

  /**
   * Represent registered interpreter class
   */
  public static class RegisteredInterpreter {
    //@SerializedName("interpreterGroup")
    private String group;
    //@SerializedName("interpreterName")
    private String name;
    //@SerializedName("interpreterClassName")
    private String className;
    private boolean defaultInterpreter;
    private Map<String, InterpreterProperty> properties;
    private String path;

    public RegisteredInterpreter(String name, String group, String className,
        Map<String, InterpreterProperty> properties) {
      this(name, group, className, false, properties);
    }

    public RegisteredInterpreter(String name, String group, String className,
        boolean defaultInterpreter, Map<String, InterpreterProperty> properties) {
      super();
      this.name = name;
      this.group = group;
      this.className = className;
      this.defaultInterpreter = defaultInterpreter;
      this.properties = properties;
    }

    public String getName() {
      return name;
    }

    public String getGroup() {
      return group;
    }

    public String getClassName() {
      return className;
    }

    public boolean isDefaultInterpreter() {
      return defaultInterpreter;
    }

    public void setDefaultInterpreter(boolean defaultInterpreter) {
      this.defaultInterpreter = defaultInterpreter;
    }

    public Map<String, InterpreterProperty> getProperties() {
      return properties;
    }

    public void setPath(String path) {
      this.path = path;
    }

    public String getPath() {
      return path;
    }

    public String getInterpreterKey() {
      return getGroup() + "." + getName();
    }

  }

  /**
   * Type of Scheduling.
   */
  public static enum SchedulingMode {
    FIFO, PARALLEL
  }

  public static Map<String, RegisteredInterpreter> registeredInterpreters = Collections
      .synchronizedMap(new HashMap<String, RegisteredInterpreter>());

  public static void register(String name, String className) {
    register(name, name, className);
  }

  public static void register(String name, String group, String className) {
    register(name, group, className, false, new HashMap<String, InterpreterProperty>());
  }

  public static void register(String name, String group, String className,
      Map<String, InterpreterProperty> properties) {
    register(name, group, className, false, properties);
  }

  public static void register(String name, String group, String className,
      boolean defaultInterpreter) {
    register(name, group, className, defaultInterpreter,
        new HashMap<String, InterpreterProperty>());
  }

  @Deprecated
  public static void register(String name, String group, String className,
      boolean defaultInterpreter, Map<String, InterpreterProperty> properties) {
    logger.error("Static initialization is deprecated. You should change it to use " +
                     "interpreter-setting.json in your jar or " +
                     "interpreter/{interpreter}/interpreter-setting.json");
    register(new RegisteredInterpreter(name, group, className, defaultInterpreter, properties));
  }

  public static void register(RegisteredInterpreter registeredInterpreter) {
    // TODO(jongyoul): Error should occur when two same interpreter key with different settings
    String interpreterKey = registeredInterpreter.getInterpreterKey();
    if (!registeredInterpreters.containsKey(interpreterKey)) {
      registeredInterpreters.put(interpreterKey, registeredInterpreter);
    }
  }

  public static RegisteredInterpreter findRegisteredInterpreterByClassName(String className) {
    for (RegisteredInterpreter ri : registeredInterpreters.values()) {
      if (ri.getClassName().equals(className)) {
        return ri;
      }
    }
    return null;
  }
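The `getProperty()` method above merges two sources. The properties the interpreter was constructed with take precedence. Any key missing from them falls back to the default declared for the registered interpreter (as loaded from interpreter-setting.json), and null defaults are skipped. The sketch below reproduces just that merge rule with plain `java.util` types. `PropertyMerge` and its `Map<String, String>` of defaults are simplifications: the real `RegisteredInterpreter.getProperties()` holds `InterpreterProperty` objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Simplified re-statement of the merge rule in Interpreter.getProperty():
// configured values win; missing keys fall back to non-null defaults.
class PropertyMerge {
    static Properties merge(Properties configured, Map<String, String> defaults) {
        Properties p = new Properties();
        p.putAll(configured);                       // copy, so the caller's object is untouched
        for (Map.Entry<String, String> e : defaults.entrySet()) {
            if (!p.containsKey(e.getKey()) && e.getValue() != null) {
                p.put(e.getKey(), e.getValue());    // Properties (a Hashtable) rejects null values
            }
        }
        return p;
    }

    public static void main(String[] args) {
        Properties configured = new Properties();
        configured.setProperty("property.1.name", "fromInterpreterSetting");

        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("property.1.name", "propertyDefaultValue");
        defaults.put("property.2.name", "property2DefaultValue");
        defaults.put("property.3.name", null);

        Properties merged = merge(configured, defaults);
        System.out.println(merged.getProperty("property.1.name")); // fromInterpreterSetting
        System.out.println(merged.getProperty("property.2.name")); // property2DefaultValue
        System.out.println(merged.getProperty("property.3.name")); // null
    }
}
```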

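The static registry in the code above works as follows:

- It keys each `RegisteredInterpreter` by `group + "." + name`.
- It keeps the first registration for a key. Later registrations are silently ignored, which is what the TODO in `register()` points out.
- `findRegisteredInterpreterByClassName()` does a linear scan over the values.

The sketch below mirrors that behaviour with a hypothetical `MiniRegistry` so it runs without Zeppelin. It differs from the original in two places. First, it uses `putIfAbsent` instead of the `containsKey`/`put` pair, which makes the check-then-insert atomic on the synchronized map. Second, it holds the map's lock while iterating, as the `Collections.synchronizedMap` documentation requires.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of Interpreter's static registry (not the Zeppelin class itself).
class MiniRegistry {
    static final class Entry {
        final String group;
        final String name;
        final String className;

        Entry(String group, String name, String className) {
            this.group = group;
            this.name = name;
            this.className = className;
        }

        String key() {
            return group + "." + name;              // same shape as getInterpreterKey()
        }
    }

    private final Map<String, Entry> entries = Collections.synchronizedMap(new HashMap<>());

    /** Returns true if registered, false if the key was already taken (first one wins). */
    boolean register(Entry e) {
        return entries.putIfAbsent(e.key(), e) == null;
    }

    Entry findByClassName(String className) {
        synchronized (entries) {                    // iterating a synchronized map needs its lock
            for (Entry e : entries.values()) {
                if (e.className.equals(className)) {
                    return e;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        MiniRegistry registry = new MiniRegistry();
        registry.register(new Entry("your-group", "your-name", "your.own.interpreter.class"));
        boolean again = registry.register(new Entry("your-group", "your-name", "some.other.Class"));
        System.out.println(again);                  // false: the first registration is kept
        System.out.println(registry.findByClassName("your.own.interpreter.class").key());
        // prints "your-group.your-name"
    }
}
```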
Official reference:

https://zeppelin.apache.org/docs/0.6.0/development/writingzeppelininterpreter.html#what-is-apache-zeppelin-interpreterjson
