這裏介紹一個基本的例子,說明實現自己的interpreter的方法,以及配置、使用步驟。
通過可視化平臺,對MPP數據平臺結果進行可視化展現,從多個角度爲用戶展現數據結果。
基本實現思路:zeppelin支持自定義解析器,通過自定義解析器完成後臺與前端應用的交互。
zeppelin是一種基於web的數據交互平臺,支持SQL、Scala等交互語言。通過解析器(interpreter)將用戶輸入轉換爲後臺服務命令,並將後臺結果輸出,在web頁面進行展現。目前interpreter支持的後端應用包括Apache Spark、Python、JDBC、Markdown以及Shell。
自定義解析器可以按照以下步驟實現:
{ZEPPELIN_INTERPRETER_DIR}/{YOUR_OWN_INTERPRETER_DIR}/interpreter-setting.json
,示例如下:[ { "group": "your-group", "name": "your-name", "className": "your.own.interpreter.class", "properties": { "properties1": { "envName": null, "propertyName": "property.1.name", "defaultValue": "propertyDefaultValue", "description": "Property description" }, "properties2": { "envName": PROPERTIES_2, "propertyName": null, "defaultValue": "property2DefaultValue", "description": "Property 2 description" }, ... } }, { ... } ]
[
{"group": "your-group",
"name": "your-name",
"className": "your.own.interpreter.class",
"properties":
{
"properties1":
{
"envName": null,
"propertyName": "property.1.name",
"defaultValue": "propertyDefaultValue",
"description": "Property description"
},
"properties2":
{
"envName": PROPERTIES_2,
"propertyName": null,
"defaultValue": "property2DefaultValue",
"description": "Property 2 description"
}, ...
}
},
{
...
}
]
[ZEPPELIN_HOME]/interpreter/[INTERPRETER_NAME]/
conf/zeppelin-site.xml
增加自定義解析器的屬性:
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,com.me.MyNewInterpreter</value>
</property>
+Create
添加自定義解析器,使用示例:
%xdataparser
select count(*) from test;
/**
 * Opens interpreter. You may want to place your initialize routine here.
 * open() is called only once
 */
@ZeppelinApi
public abstract void open();

/**
 * Closes interpreter. You may want to free your resources up here.
 * close() is called only once
 */
@ZeppelinApi
public abstract void close();

/**
 * Run code and return result, in synchronous way.
 *
 * @param st statements to run
 * @param context context passed in by the caller for this run
 * @return result of running the statements
 */
@ZeppelinApi
public abstract InterpreterResult interpret(String st, InterpreterContext context);

/**
 * Optionally implement the canceling routine to abort interpret() method
 *
 * @param context context passed in by the caller for the run to abort
 */
@ZeppelinApi
public abstract void cancel(InterpreterContext context);

/**
 * Dynamic form handling
 * see http://zeppelin.apache.org/docs/dynamicform.html
 *
 * @return FormType.SIMPLE enables simple pattern replacement (eg. Hello ${name=world}),
 *         FormType.NATIVE handles form in API
 */
@ZeppelinApi
public abstract FormType getFormType();

/**
 * get interpret() method running process in percentage.
 *
 * @param context context passed in by the caller for the run being queried
 * @return number between 0-100
 */
@ZeppelinApi
public abstract int getProgress(InterpreterContext context);

/**
 * Get completion list based on cursor position.
 * By implementing this method, it enables auto-completion.
 *
 * <p>The default implementation offers no completions. It returns an immutable empty list
 * (not {@code null}), as the contract below promises, so callers can iterate the result
 * without a null check.
 *
 * @param buf statements
 * @param cursor cursor position in statements
 * @return list of possible completion. Return empty list if there is nothing to return.
 */
@ZeppelinApi
public List<InterpreterCompletion> completion(String buf, int cursor) {
  return Collections.<InterpreterCompletion>emptyList();
}

/**
 * Interpreter can implement its own scheduler by overriding this method.
 * There are two default schedulers provided, FIFO and Parallel.
 * If your interpret() can handle concurrent request, use Parallel or use FIFO.
 *
 * You can get default scheduler by using
 * SchedulerFactory.singleton().createOrGetFIFOScheduler()
 * SchedulerFactory.singleton().createOrGetParallelScheduler()
 *
 * <p>The default implementation asks the factory for a FIFO scheduler named
 * {@code "interpreter_" + this.hashCode()}.
 *
 * @return return scheduler instance.
 *         This method can be called multiple times and have to return the same instance.
 *         Can not return null.
 */
@ZeppelinApi
public Scheduler getScheduler() {
  return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
}

/**
 * Called when interpreter is no longer used. The default implementation does nothing.
 */
@ZeppelinApi
public void destroy() {
}
public static Logger logger = LoggerFactory.getLogger(Interpreter.class);
private InterpreterGroup interpreterGroup;
private URL[] classloaderUrls;
protected Properties property;

/**
 * Creates an interpreter that keeps a reference to the given properties (no copy is made).
 *
 * @param property properties explicitly configured for this interpreter
 */
@ZeppelinApi
public Interpreter(Properties property) {
  logger.debug("Properties: {}", property);
  this.property = property;
}

/**
 * Replaces the explicitly configured properties.
 *
 * @param property new properties; the reference is stored as-is
 */
public void setProperty(Properties property) {
  this.property = property;
}

/**
 * Builds a fresh {@link Properties} holding the explicitly configured properties plus, for
 * every key that is not configured, the non-null default value declared by the
 * {@link RegisteredInterpreter} registered for this class (if any).
 *
 * @return a new Properties instance; changes to it do not affect this interpreter
 */
@ZeppelinApi
public Properties getProperty() {
  Properties p = new Properties();
  // Properties.putAll(null) throws NullPointerException; treat "no properties" as empty.
  if (property != null) {
    p.putAll(property);
  }

  RegisteredInterpreter registeredInterpreter =
      Interpreter.findRegisteredInterpreterByClassName(getClassName());
  if (null != registeredInterpreter) {
    Map<String, InterpreterProperty> defaultProperties = registeredInterpreter.getProperties();
    for (Map.Entry<String, InterpreterProperty> entry : defaultProperties.entrySet()) {
      String k = entry.getKey();
      if (!p.containsKey(k)) {
        // Resolve the default once so the value that was null-checked is the one stored.
        String value = entry.getValue().getValue();
        if (value != null) {
          p.put(k, value);
        }
      }
    }
  }

  return p;
}

/**
 * Looks up a single property in the merged view returned by {@link #getProperty()}.
 *
 * @param key property name
 * @return the property value, or null when it is neither configured nor defaulted
 */
@ZeppelinApi
public String getProperty(String key) {
  // Build the merged properties once; the same value is logged and returned.
  String value = getProperty().getProperty(key);
  logger.debug("key: {}, value: {}", key, value);
  return value;
}

/**
 * @return the runtime class name of this interpreter
 */
public String getClassName() {
  return this.getClass().getName();
}

public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
  this.interpreterGroup = interpreterGroup;
}

@ZeppelinApi
public InterpreterGroup getInterpreterGroup() {
  return this.interpreterGroup;
}

public URL[] getClassloaderUrls() {
  return classloaderUrls;
}

public void setClassloaderUrls(URL[] classloaderUrls) {
  this.classloaderUrls = classloaderUrls;
}

/**
 * Finds an interpreter with the given class name among the interpreters that share a list
 * with this interpreter inside the interpreter group.
 *
 * <p>Each list in {@code interpreterGroup.values()} is scanned. While scanning, the last
 * interpreter whose class name equals {@code className} is remembered, and every element is
 * unwrapped through its {@link WrappedInterpreter} chain to see whether it is this instance.
 * The first list that contains this instance decides the result.
 *
 * @param className class name to look for
 * @return the matching interpreter from the list that contains this instance; null when that
 *         list has no match or when no list contains this instance
 */
@ZeppelinApi
public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
  synchronized (interpreterGroup) {
    for (List<Interpreter> interpreters : interpreterGroup.values()) {
      boolean belongsToSameNoteGroup = false;
      Interpreter interpreterFound = null;
      for (Interpreter intp : interpreters) {
        if (intp.getClassName().equals(className)) {
          interpreterFound = intp;
        }

        // Strip wrapper layers before comparing identity with this instance.
        Interpreter p = intp;
        while (p instanceof WrappedInterpreter) {
          p = ((WrappedInterpreter) p).getInnerInterpreter();
        }
        if (this == p) {
          belongsToSameNoteGroup = true;
        }
      }

      if (belongsToSameNoteGroup) {
        return interpreterFound;
      }
    }
  }
  return null;
}

/**
 * Type of interpreter.
 */
public static enum FormType {
  NATIVE, SIMPLE, NONE
}

/**
 * Represent registered interpreter class
 */
public static class RegisteredInterpreter {
  //@SerializedName("interpreterGroup")
  private String group;
  //@SerializedName("interpreterName")
  private String name;
  //@SerializedName("interpreterClassName")
  private String className;
  private boolean defaultInterpreter;
  private Map<String, InterpreterProperty> properties;
  private String path;

  /**
   * Creates a registration that is not marked as the default interpreter.
   */
  public RegisteredInterpreter(String name, String group, String className,
      Map<String, InterpreterProperty> properties) {
    this(name, group, className, false, properties);
  }

  public RegisteredInterpreter(String name, String group, String className,
      boolean defaultInterpreter, Map<String, InterpreterProperty> properties) {
    super();
    this.name = name;
    this.group = group;
    this.className = className;
    this.defaultInterpreter = defaultInterpreter;
    this.properties = properties;
  }

  public String getName() {
    return name;
  }

  public String getGroup() {
    return group;
  }

  public String getClassName() {
    return className;
  }

  public boolean isDefaultInterpreter() {
    return defaultInterpreter;
  }

  public void setDefaultInterpreter(boolean defaultInterpreter) {
    this.defaultInterpreter = defaultInterpreter;
  }

  public Map<String, InterpreterProperty> getProperties() {
    return properties;
  }

  public void setPath(String path) {
    this.path = path;
  }

  public String getPath() {
    return path;
  }

  /**
   * @return the registry key, built as {@code group + "." + name}
   */
  public String getInterpreterKey() {
    return getGroup() + "." + getName();
  }
}

/**
 * Type of Scheduling.
 */
public static enum SchedulingMode {
  FIFO, PARALLEL
}

/**
 * Registry of interpreters keyed by {@link RegisteredInterpreter#getInterpreterKey()}.
 * The map is a {@code Collections.synchronizedMap}: single calls are thread-safe, but
 * iteration and check-then-act sequences must hold the map's own monitor.
 */
public static Map<String, RegisteredInterpreter> registeredInterpreters = Collections
    .synchronizedMap(new HashMap<String, RegisteredInterpreter>());

public static void register(String name, String className) {
  register(name, name, className);
}

public static void register(String name, String group, String className) {
  register(name, group, className, false, new HashMap<String, InterpreterProperty>());
}

public static void register(String name, String group, String className,
    Map<String, InterpreterProperty> properties) {
  register(name, group, className, false, properties);
}

public static void register(String name, String group, String className,
    boolean defaultInterpreter) {
  register(name, group, className, defaultInterpreter,
      new HashMap<String, InterpreterProperty>());
}

/**
 * Registers an interpreter from static initialization code.
 *
 * @deprecated declare the interpreter in interpreter-setting.json instead, as the logged
 *             message explains
 */
@Deprecated
public static void register(String name, String group, String className,
    boolean defaultInterpreter, Map<String, InterpreterProperty> properties) {
  logger.error("Static initialization is deprecated. You should change it to use "
      + "interpreter-setting.json in your jar or "
      + "interpreter/{interpreter}/interpreter-setting.json");
  register(new RegisteredInterpreter(name, group, className, defaultInterpreter, properties));
}

/**
 * Adds the registration unless one with the same interpreter key already exists; an
 * existing registration is never replaced.
 *
 * @param registeredInterpreter registration to add
 */
public static void register(RegisteredInterpreter registeredInterpreter) {
  // TODO(jongyoul): Error should occur when two same interpreter key with different settings
  String interpreterKey = registeredInterpreter.getInterpreterKey();
  // Hold the map's monitor so the existence check and the insert happen atomically.
  synchronized (registeredInterpreters) {
    if (!registeredInterpreters.containsKey(interpreterKey)) {
      registeredInterpreters.put(interpreterKey, registeredInterpreter);
    }
  }
}

/**
 * Returns the first registration, in the map's iteration order, whose class name equals the
 * given one.
 *
 * @param className interpreter class name to look for
 * @return the matching registration, or null when none matches
 */
public static RegisteredInterpreter findRegisteredInterpreterByClassName(String className) {
  // Iterating a synchronizedMap view requires holding the map's monitor; otherwise a
  // concurrent register() can trigger ConcurrentModificationException.
  synchronized (registeredInterpreters) {
    for (RegisteredInterpreter ri : registeredInterpreters.values()) {
      if (ri.getClassName().equals(className)) {
        return ri;
      }
    }
  }
  return null;
}
官網參考:
https://zeppelin.apache.org/docs/0.6.0/development/writingzeppelininterpreter.html#what-is-apache-zeppelin-interpreter