Azkaban的Web Server源碼探究系列22: 一次性執行execute的提交準備

首先,觸發按鈕
web

會彈出一個頁面ajax

能夠看到有個exec id 1app

爲何會發生這一切?  經過抓包,咱們得知webapp

因此須要研究 ide

stop in azkaban.webapp.servlet.ExecutorServlet.doGetui

stop in azkaban.webapp.servlet.ExecutorServlet.ajaxAttemptExecuteFlowspa

stop in azkaban.webapp.servlet.ExecutorServlet.ajaxExecuteFlowserver

stop in azkaban.server.HttpRequestUtils.parseFlowOptionsip

runrem

======================================================================

首先,系統確定會本身構造一些默認的參數,可是看到上面這個截圖,咱們知道,咱們是能夠覆蓋的

ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);

看到了吧,就是解析http請求中的一些參數,而後覆蓋過去

下面看看哪些參數能夠解析出!

------------------------------------------------------

1)failureAction

if (hasParam(req, "failureAction")) {

      String option = getParam(req, "failureAction");

      if (option.equals("finishCurrent")) {

        execOptions.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);

      } else if (option.equals("cancelImmediately")) {

        execOptions.setFailureAction(FailureAction.CANCEL_ALL);

      } else if (option.equals("finishPossible")) {

        execOptions.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);

      }

    }

  public enum FailureAction {

    FINISH_CURRENTLY_RUNNING, CANCEL_ALL, FINISH_ALL_POSSIBLE

  }

能夠看到,這個是獲取失敗時的行爲,有三種:1)結束當前運行的節點 2)取消全部  3)結束全部可能的

固然具體到底啥意思,要等執行者的代碼看懂了才知道!

---

2)failureEmailsOverride

if (hasParam(req, "failureEmailsOverride")) {

      boolean override = getBooleanParam(req, "failureEmailsOverride", false);

      execOptions.setFailureEmailsOverridden(override);

    }

---

3)successEmailsOverride

if (hasParam(req, "successEmailsOverride")) {

      boolean override = getBooleanParam(req, "successEmailsOverride", false);

      execOptions.setSuccessEmailsOverridden(override);

    }

---

4)failureEmails

 if (hasParam(req, "failureEmails")) {

      String emails = getParam(req, "failureEmails");

      if (!emails.isEmpty()) {

        String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");

        execOptions.setFailureEmails(Arrays.asList(emailSplit));

      }

    }

---

5)successEmails

if (hasParam(req, "successEmails")) {

      String emails = getParam(req, "successEmails");

      if (!emails.isEmpty()) {

        String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");

        execOptions.setSuccessEmails(Arrays.asList(emailSplit));

      }

    }

---

6)notifyFailureFirst

if (hasParam(req, "notifyFailureFirst")) {

      execOptions.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req,

          "notifyFailureFirst")));

    }

7)notifyFailureLast

    if (hasParam(req, "notifyFailureLast")) {

      execOptions.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req,

          "notifyFailureLast")));


    }

===

8)

String concurrentOption = "skip";

    if (hasParam(req, "concurrentOption")) {

      concurrentOption = getParam(req, "concurrentOption");

      execOptions.setConcurrentOption(concurrentOption);

      if (concurrentOption.equals("pipeline")) {

        int pipelineLevel = getIntParam(req, "pipelineLevel");

        execOptions.setPipelineLevel(pipelineLevel);

      } else if (concurrentOption.equals("queue")) {

        // Not yet implemented

        int queueLevel = getIntParam(req, "queueLevel", 1);

        execOptions.setPipelineLevel(queueLevel);

      }

    }

---

 String mailCreator = DefaultMailCreator.DEFAULT_MAIL_CREATOR;

    if (hasParam(req, "mailCreator")) {

      mailCreator = getParam(req, "mailCreator");

      execOptions.setMailCreator(mailCreator);//郵件發送者

    }

===

Map<String, String> flowParamGroup = getParamGroup(req, "flowOverride");

    execOptions.addAllFlowParameters(flowParamGroup);

這個應該比較重要!

===最後一個就是能夠取消job

if (hasParam(req, "disabled")) {

      String disabled = getParam(req, "disabled");

      if (!disabled.isEmpty()) {

        @SuppressWarnings("unchecked")

        List<Object> disabledList =

            (List<Object>) JSONUtils.parseJSONFromStringQuiet(disabled);

        execOptions.setDisabledJobs(disabledList);

      }

    }

===

ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);

  //優先採用從HTTP中解析出來的options

    exflow.setExecutionOptions(options);

  //若是能夠覆蓋,則從job中覆蓋

    if (!options.isFailureEmailsOverridden()) {

      options.setFailureEmails(flow.getFailureEmails());

    }

  //若是能夠覆蓋,則從job中覆蓋

    if (!options.isSuccessEmailsOverridden()) {

      options.setSuccessEmails(flow.getSuccessEmails());

    }

  //強制直接從flow中覆蓋

    options.setMailCreator(flow.getMailCreator());

==============================================================

/**

   * <pre>

   * Remove following flow param if submitting user is not an Azkaban admin

   * FLOW_PRIORITY

   * USE_EXECUTOR

   * @param userManager

   * @param options

   * @param user

   * </pre>

   */

  public static void filterAdminOnlyFlowParams(UserManager userManager,

    ExecutionOptions options, User user)  throws ExecutorManagerException {

//校驗合法性

    if (options == null || options.getFlowParameters() == null)

      return;

  //獲取參數

    Map<String, String> params = options.getFlowParameters();

    // is azkaban Admin

    //

    if (!hasPermission(userManager, user, Type.ADMIN)) {//不是ADMIN還敢指派ADMIN命令???想逆天啊?

      params.remove(ExecutionOptions.FLOW_PRIORITY);

      params.remove(ExecutionOptions.USE_EXECUTOR);

    } else {//不然的話,若是參數有必須爲數字類型

      validateIntegerParam(params, ExecutionOptions.FLOW_PRIORITY);

      validateIntegerParam(params, ExecutionOptions.USE_EXECUTOR);

    }

  }


萬事俱備,開始提交

String message =  executorManager.submitExecutableFlow(exflow, user.getUserId());

相關文章
相關標籤/搜索