上圖是zeppelin的先後臺交互模型,zeppelin採用單獨的jvm來啓動interpreter進程,該Interpreter進程與zeppelinServer進程之間採用Thrift協議通訊,其中RemoteInterpreterProcess是Thrift-Client端,而相應的RemoteInterpreterServer是Thrift-Server端。前端
Paragraph的執行分紅「從前端UI提交ParagraphJob到其相關的Interpreter的Scheduler」和「Sheduler執行」2個部分,這2個部分是異步執行的。java
以上是從前臺請求執行指定的Note的指定的Paragraph開始,到該Paragraph提交到Scheduler之間的時序圖。這個執行邏輯是與語言無關的。任何語言寫的腳本(存儲在Paragraph之中)都是上述提交執行的過程。apache
下面是Scheduler執行該ParagraphJob的時序圖:框架
這裏有以下的幾點須要注意:dom
1) InterpreterFactory目前將全部的Interpreter都被實例化成了RemoteInterpreter,參見其createInterpretersForNote方法:異步
for (String intName : keys) { RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName); if (info.getClassName().equals(className) && info.getGroup().equals(groupName)) { Interpreter intp; if (option.isRemote()) {//根據option配置來建立 intp = createRemoteRepl(info.getPath(), key, info.getClassName(), properties, interpreterSetting.id()); } else { intp = createRepl(info.getPath(), info.getClassName(), properties); }
雖然InterpreterFactory在建立的時候作了判斷,可是其實全部的Option.remote屬性都爲true,參見InterpreterFactory初始化的方法loadFromFile():jvm
private void loadFromFile() throws IOException {
//省略了部分代碼
for (String k : info.interpreterSettings.keySet()) { InterpreterSetting setting = info.interpreterSettings.get(k); // Always use separate interpreter process // While we decided to turn this feature on always (without providing // enable/disable option on GUI). // previously created setting should turn this feature on here. setting.getOption().setRemote(true);//所有置爲true了 //省略了部分代碼 }
2) RemoteInterpreterProcess在reference相關的InterpreterGroup的時候,會使用apache common-exec框架建立新的進程。ide
public int reference(InterpreterGroup interpreterGroup) {
synchronized (referenceCount) { if (executor == null) { // start server process try { port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();//隨機可用端口 } catch (IOException e1) { throw new InterpreterException(e1); }
CommandLine cmdLine = CommandLine.parse(interpreterRunner); cmdLine.addArgument("-d", false); cmdLine.addArgument(interpreterDir, false); cmdLine.addArgument("-p", false); cmdLine.addArgument(Integer.toString(port), false); cmdLine.addArgument("-l", false); cmdLine.addArgument(localRepoDir, false); executor = new DefaultExecutor(); watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); executor.setWatchdog(watchdog); running = true; try { Map procEnv = EnvironmentUtils.getProcEnvironment(); procEnv.putAll(env); logger.info("Run interpreter process {}", cmdLine); executor.execute(cmdLine, procEnv, this);//啓動新的進程 } catch (IOException e) { running = false; throw new InterpreterException(e); }
其中interpreterRunner會指向bin/interpreter.sh腳本,該腳本的主要功能根據是否認義了SPARK_HOME環境變量(定位到Spark-submit腳本),構建classpath,而後以指定的port運行ZEPPELIN_SERVER指定的主類,該變量被定義爲: this
ZEPPELIN_SERVER=org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer
啓動進程的代碼以下:spa
if [[ -n "${SPARK_SUBMIT}" ]]; then ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path "${ZEPPELIN_CLASSPATH_OVERRIDES}:${CLASSPATH}" --driver-java-options "${JAVA_INTP_OPTS}" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT} & else ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_CLASSPATH_OVERRIDES}:${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} & fi
而RemoteInterpreterServer實現了RemoteInterpreterService.Iface,是Thrift的Server端,RemoteInterpreterProcess是Thrift的client端。
3) Remote的含義是(至少目前是)「另一個進程「,與zeppelinServer所在的進程不是同一個,該進程並不是啓動在另一個獨立的的機器上,zeppelin目前還不支持集羣,全部的Interpreter jvm都啓動在localhost上。所以若是想調試Interpreter的方法是如何工做的,須要爲該Interpreter啓動獨立的調試進程,在zeppelinServer所在的調試進程中設置Interpreter.interpret(Stringst, InterpreterContext context)斷點想要命中是行不通的。