Spring Boot 集成調用 Azkaban
1、說明
1.Azkaban是由Linkedin公司推出的一個批量工作流任務調度器,主要用於在一個工作流內以特定的順序運行一組工作和流程。它的配置採用簡單的key:value對的方式,通過配置中的dependencies來設置依賴關係;這個依賴關係必須是無環的,否則會被視爲無效的工作流。Azkaban使用job配置文件建立任務之間的依賴關係,並提供一個易於使用的web用戶界面來維護和跟蹤你的工作流。
2.springboot版本:2.0.5,azkaban版本:3.59.0
2、maven依賴
<dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.5</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpcore</artifactId> <version>4.4.9</version> </dependency> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>2.9.7</version> </dependency>
3、代碼
1.azkaban配置文件(注意需要在啓動類的@PropertySource註解中引入該配置文件)
monitor.azkaban.username=azkaban
monitor.azkaban.password=azkaban
monitor.azkaban.url=http://192.168.11.12:8081
monitor.azkaban.connectTimeout=60000
monitor.azkaban.readTimeout=120000
2.azkaban配置實體類
import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.client.SimpleClientHttpRequestFactory; import org.springframework.web.client.RestTemplate; @Configuration public class AzkabanConfig { @Value("${monitor.azkaban.username}") private String azkUsername; @Value("${monitor.azkaban.password}") private String azkPassword; @Value("${monitor.azkaban.url}") private String azkUrl; @Value("${monitor.azkaban.connectTimeout}") private int connectTimeout; @Value("${monitor.azkaban.readTimeout}") private int readTimeout; @Bean public RestTemplate getRestTemplate() { SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory(); requestFactory.setConnectTimeout(connectTimeout); requestFactory.setReadTimeout(readTimeout); RestTemplate restTemplate = new RestTemplate(requestFactory); return restTemplate; } public String getAzkUsername() { return azkUsername; } public void setAzkUsername(String azkUsername) { this.azkUsername = azkUsername; } public String getAzkPassword() { return azkPassword; } public void setAzkPassword(String azkPassword) { this.azkPassword = azkPassword; } public String getAzkUrl() { return azkUrl; } public void setAzkUrl(String azkUrl) { this.azkUrl = azkUrl; } }
3.HttpClient配置SSL繞過https證書
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;

import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

/**
 * Switches certificate validation for {@link HttpsURLConnection} off and back on.
 *
 * <p>WARNING: both methods change the JVM-wide default socket factory, so every
 * {@code HttpsURLConnection} in the process is affected. Disabling validation removes
 * protection against man-in-the-middle attacks and should only be used against hosts
 * with self-signed certificates in trusted networks.
 */
public class SSLUtil {

    /** Standard protocol name; replaces the legacy "SSL" name. */
    private static final String PROTOCOL = "TLS";

    /** Trust manager that accepts every client and server certificate chain. */
    private static final TrustManager[] UNQUESTIONING_TRUST_MANAGER = new TrustManager[] {
        new X509TrustManager() {
            @Override
            public X509Certificate[] getAcceptedIssuers() {
                // The X509TrustManager contract requires a non-null (possibly empty) array.
                return new X509Certificate[0];
            }

            @Override
            public void checkClientTrusted(X509Certificate[] certs, String authType) {
                // Intentionally empty: every client certificate is accepted.
            }

            @Override
            public void checkServerTrusted(X509Certificate[] certs, String authType) {
                // Intentionally empty: every server certificate is accepted.
            }
        }
    };

    private SSLUtil() {
    }

    /**
     * Installs a default socket factory that trusts every certificate.
     *
     * @throws NoSuchAlgorithmException if no provider supports the protocol
     * @throws KeyManagementException if the SSL context cannot be initialised
     */
    public static void turnOffSslChecking() throws NoSuchAlgorithmException, KeyManagementException {
        final SSLContext sc = SSLContext.getInstance(PROTOCOL);
        sc.init(null, UNQUESTIONING_TRUST_MANAGER, null);
        HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
    }

    /**
     * Restores certificate validation by installing a socket factory built from the
     * platform's default trust managers.
     *
     * <p>The previous implementation initialised a context but never installed it, so the
     * trust-all factory stayed active after this call.
     *
     * @throws KeyManagementException if the SSL context cannot be initialised
     * @throws NoSuchAlgorithmException if no provider supports the protocol
     */
    public static void turnOnSslChecking() throws KeyManagementException, NoSuchAlgorithmException {
        final SSLContext sc = SSLContext.getInstance(PROTOCOL);
        // null key/trust managers make the context fall back to the installed defaults.
        sc.init(null, null, null);
        HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
    }
}
4.常量類
/**
 * Shared constants for the Azkaban integration.
 */
public interface SysContants {

    /** Value of the {@code status} member that Azkaban responses are compared against for success. */
    public static final String AZK_SUCCESS = "success";
}
5.根據azkaban返回數據定製實體類(註釋少抱歉)
import java.util.List;

/**
 * One node of a flow execution, with timestamps kept as {@code Long} values.
 *
 * <p>Plain data holder; field names are kept as-is because they are presumably matched
 * against JSON member names during deserialisation (TODO confirm against callers).
 */
public class ExecNode {

    private String nestedId;
    /** Presumably the ids of upstream nodes (ExecNodeBean calls the counterpart "dependencies"). */
    private List<String> in;
    private String status;
    private String id;
    private String type;
    private Long updateTime;
    private Long startTime;
    private Long endTime;
    private Long attempt;

    public String getNestedId() { return nestedId; }

    public void setNestedId(String nestedId) { this.nestedId = nestedId; }

    public List<String> getIn() { return in; }

    public void setIn(List<String> in) { this.in = in; }

    public String getStatus() { return status; }

    public void setStatus(String status) { this.status = status; }

    public String getId() { return id; }

    public void setId(String id) { this.id = id; }

    public String getType() { return type; }

    public void setType(String type) { this.type = type; }

    public Long getUpdateTime() { return updateTime; }

    public void setUpdateTime(Long updateTime) { this.updateTime = updateTime; }

    public Long getStartTime() { return startTime; }

    public void setStartTime(Long startTime) { this.startTime = startTime; }

    public Long getEndTime() { return endTime; }

    public void setEndTime(Long endTime) { this.endTime = endTime; }

    public Long getAttempt() { return attempt; }

    public void setAttempt(Long attempt) { this.attempt = attempt; }
}
import java.util.List;

/**
 * Presentation-oriented variant of an execution node: timestamps are carried as
 * {@code String} values and the bean additionally holds the node's logs and elapsed time.
 */
public class ExecNodeBean {

    private String nestedId;
    private List<String> dependencies;
    private String status;
    private String jobId;
    private String type;
    private String updateTime;
    private String startTime;
    private String endTime;
    private Long attempt;
    private String logs;
    private Long elapsed;

    public String getNestedId() { return nestedId; }

    public void setNestedId(String nestedId) { this.nestedId = nestedId; }

    public List<String> getDependencies() { return dependencies; }

    public void setDependencies(List<String> dependencies) { this.dependencies = dependencies; }

    public String getStatus() { return status; }

    public void setStatus(String status) { this.status = status; }

    public String getJobId() { return jobId; }

    public void setJobId(String jobId) { this.jobId = jobId; }

    public String getType() { return type; }

    public void setType(String type) { this.type = type; }

    public String getUpdateTime() { return updateTime; }

    public void setUpdateTime(String updateTime) { this.updateTime = updateTime; }

    public String getStartTime() { return startTime; }

    public void setStartTime(String startTime) { this.startTime = startTime; }

    public String getEndTime() { return endTime; }

    public void setEndTime(String endTime) { this.endTime = endTime; }

    public Long getAttempt() { return attempt; }

    public void setAttempt(Long attempt) { this.attempt = attempt; }

    public String getLogs() { return logs; }

    public void setLogs(String logs) { this.logs = logs; }

    public Long getElapsed() { return elapsed; }

    public void setElapsed(Long elapsed) { this.elapsed = elapsed; }
}
/**
 * One entry of a flow's execution history: who submitted it, its status and its
 * submit/start/end timestamps.
 *
 * <p>Fix: the original listing was missing the class's closing brace and did not compile.
 */
public class Execution {

    private String submitUser;
    private String flowId;
    private String status;
    private Long submitTime;
    private Long startTime;
    private Long endTime;
    private Long projectId;
    private Long execId;

    public String getSubmitUser() { return submitUser; }

    public void setSubmitUser(String submitUser) { this.submitUser = submitUser; }

    public String getFlowId() { return flowId; }

    public void setFlowId(String flowId) { this.flowId = flowId; }

    public String getStatus() { return status; }

    public void setStatus(String status) { this.status = status; }

    public Long getSubmitTime() { return submitTime; }

    public void setSubmitTime(Long submitTime) { this.submitTime = submitTime; }

    public Long getStartTime() { return startTime; }

    public void setStartTime(Long startTime) { this.startTime = startTime; }

    public Long getEndTime() { return endTime; }

    public void setEndTime(Long endTime) { this.endTime = endTime; }

    public Long getProjectId() { return projectId; }

    public void setProjectId(Long projectId) { this.projectId = projectId; }

    public Long getExecId() { return execId; }

    public void setExecId(Long execId) { this.execId = execId; }
}
import java.util.List;

/**
 * Details of a single flow execution, including its {@link ExecNode} children.
 * Timestamps are kept as {@code Long} values.
 */
public class ExecutionInfo {

    private String project;
    private String type;
    private Long updateTime;
    private Long attempt;
    private Long execid;
    private Long submitTime;
    private Long startTime;
    private Long endTime;
    private Long projectId;
    private String nestedId;
    private String submitUser;
    private String id;
    private String flowId;
    private String flow;
    private String status;
    private List<ExecNode> nodes;

    public String getProject() { return project; }

    public void setProject(String project) { this.project = project; }

    public String getType() { return type; }

    public void setType(String type) { this.type = type; }

    public Long getUpdateTime() { return updateTime; }

    public void setUpdateTime(Long updateTime) { this.updateTime = updateTime; }

    public Long getAttempt() { return attempt; }

    public void setAttempt(Long attempt) { this.attempt = attempt; }

    public Long getExecid() { return execid; }

    public void setExecid(Long execid) { this.execid = execid; }

    public Long getSubmitTime() { return submitTime; }

    public void setSubmitTime(Long submitTime) { this.submitTime = submitTime; }

    public Long getStartTime() { return startTime; }

    public void setStartTime(Long startTime) { this.startTime = startTime; }

    public Long getEndTime() { return endTime; }

    public void setEndTime(Long endTime) { this.endTime = endTime; }

    public Long getProjectId() { return projectId; }

    public void setProjectId(Long projectId) { this.projectId = projectId; }

    public String getNestedId() { return nestedId; }

    public void setNestedId(String nestedId) { this.nestedId = nestedId; }

    public String getSubmitUser() { return submitUser; }

    public void setSubmitUser(String submitUser) { this.submitUser = submitUser; }

    public String getId() { return id; }

    public void setId(String id) { this.id = id; }

    public String getFlowId() { return flowId; }

    public void setFlowId(String flowId) { this.flowId = flowId; }

    public String getFlow() { return flow; }

    public void setFlow(String flow) { this.flow = flow; }

    public String getStatus() { return status; }

    public void setStatus(String status) { this.status = status; }

    public List<ExecNode> getNodes() { return nodes; }

    public void setNodes(List<ExecNode> nodes) { this.nodes = nodes; }
}
import java.util.List;

/**
 * Presentation-oriented variant of a flow execution: timestamps are carried as
 * {@code String} values, and the bean additionally holds the flow log, the elapsed
 * time and {@link ExecNodeBean} children.
 */
public class ExecutionInfoBean {

    private String project;
    private String type;
    private String updateTime;
    private Long attempt;
    private Long execid;
    private String submitTime;
    private String startTime;
    private String endTime;
    private Long projectId;
    private String nestedId;
    private String submitUser;
    private String jobId;
    private String flowId;
    private String flow;
    private String status;
    private String flowLog;
    private Long elapsed;
    private List<ExecNodeBean> nodes;

    public String getProject() { return project; }

    public void setProject(String project) { this.project = project; }

    public String getType() { return type; }

    public void setType(String type) { this.type = type; }

    public String getUpdateTime() { return updateTime; }

    public void setUpdateTime(String updateTime) { this.updateTime = updateTime; }

    public Long getAttempt() { return attempt; }

    public void setAttempt(Long attempt) { this.attempt = attempt; }

    public Long getExecid() { return execid; }

    public void setExecid(Long execid) { this.execid = execid; }

    public String getSubmitTime() { return submitTime; }

    public void setSubmitTime(String submitTime) { this.submitTime = submitTime; }

    public String getStartTime() { return startTime; }

    public void setStartTime(String startTime) { this.startTime = startTime; }

    public String getEndTime() { return endTime; }

    public void setEndTime(String endTime) { this.endTime = endTime; }

    public Long getProjectId() { return projectId; }

    public void setProjectId(Long projectId) { this.projectId = projectId; }

    public String getNestedId() { return nestedId; }

    public void setNestedId(String nestedId) { this.nestedId = nestedId; }

    public String getSubmitUser() { return submitUser; }

    public void setSubmitUser(String submitUser) { this.submitUser = submitUser; }

    public String getJobId() { return jobId; }

    public void setJobId(String jobId) { this.jobId = jobId; }

    public String getFlowId() { return flowId; }

    public void setFlowId(String flowId) { this.flowId = flowId; }

    public String getFlow() { return flow; }

    public void setFlow(String flow) { this.flow = flow; }

    public String getStatus() { return status; }

    public void setStatus(String status) { this.status = status; }

    public String getFlowLog() { return flowLog; }

    public void setFlowLog(String flowLog) { this.flowLog = flowLog; }

    public Long getElapsed() { return elapsed; }

    public void setElapsed(Long elapsed) { this.elapsed = elapsed; }

    public List<ExecNodeBean> getNodes() { return nodes; }

    public void setNodes(List<ExecNodeBean> nodes) { this.nodes = nodes; }
}
import java.util.List;

/**
 * One page of a flow's execution history: the owning project and flow, paging
 * counters ({@code total}, {@code from}, {@code length}) and the {@link Execution} entries.
 */
public class FlowExecution {

    private String project;
    private String flow;
    private Long total;
    private Long length;
    private Long from;
    private Long projectId;
    private List<Execution> executions;

    public String getProject() { return project; }

    public void setProject(String project) { this.project = project; }

    public String getFlow() { return flow; }

    public void setFlow(String flow) { this.flow = flow; }

    public Long getTotal() { return total; }

    public void setTotal(Long total) { this.total = total; }

    public Long getLength() { return length; }

    public void setLength(Long length) { this.length = length; }

    public Long getFrom() { return from; }

    public void setFrom(Long from) { this.from = from; }

    public Long getProjectId() { return projectId; }

    public void setProjectId(Long projectId) { this.projectId = projectId; }

    public List<Execution> getExecutions() { return executions; }

    public void setExecutions(List<Execution> executions) { this.executions = executions; }
}
/**
 * Pageable record of one task run: owner, status, timing, and the project/flow/execution
 * identifiers it belongs to. Paging behaviour comes from {@code PageEntity}.
 */
public class GovernTaskRecordBean extends PageEntity {

    private static final long serialVersionUID = 1L;

    private String createTime;
    private String status;
    private String owner;
    private String startTime;
    private String endTime;
    private String flowId;
    private Long projectId;
    private Long execId;
    private String projectPath;
    private Long elapsed;

    public String getCreateTime() { return createTime; }

    public void setCreateTime(String createTime) { this.createTime = createTime; }

    public String getStatus() { return status; }

    public void setStatus(String status) { this.status = status; }

    public String getOwner() { return owner; }

    public void setOwner(String owner) { this.owner = owner; }

    public String getStartTime() { return startTime; }

    public void setStartTime(String startTime) { this.startTime = startTime; }

    public String getEndTime() { return endTime; }

    public void setEndTime(String endTime) { this.endTime = endTime; }

    public String getFlowId() { return flowId; }

    public void setFlowId(String flowId) { this.flowId = flowId; }

    public Long getProjectId() { return projectId; }

    public void setProjectId(Long projectId) { this.projectId = projectId; }

    public Long getExecId() { return execId; }

    public void setExecId(Long execId) { this.execId = execId; }

    public String getProjectPath() { return projectPath; }

    public void setProjectPath(String projectPath) { this.projectPath = projectPath; }

    public Long getElapsed() { return elapsed; }

    public void setElapsed(Long elapsed) { this.elapsed = elapsed; }
}
6.調度執行狀態枚舉類
/**
 * Execution states of a scheduled flow, each pairing a status code with a
 * human-readable description.
 */
public enum ScheduleStatus {
    READY("READY", "就緒"),
    SUCCEEDED("SUCCEEDED", "成功"),
    KILLING("KILLING", "中止中"),
    KILLED("KILLED", "已中斷"),
    FAILED("FAILED", "失敗"),
    SKIPPED("SKIPPED", "跳過"),
    DISABLED("DISABLED", "停用"),
    QUEUED("QUEUED", "等待中"),
    CANCELLED("CANCELLED", "取消執行"),
    RUNNING("RUNNING", "運行中"),
    PAUSED("PAUSED", "暫停");

    /** Status code. */
    private String code;

    /** Status description. */
    private String desc;

    ScheduleStatus(String code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    public String getCode() { return code; }

    public void setCode(String code) { this.code = code; }

    public String getDesc() { return desc; }

    public void setDesc(String desc) { this.desc = desc; }
}
7.AzkabanService接口類
import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.FileSystemResource; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.client.RestTemplate; import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.apache.http.HttpStatus; /** * azkaban接口 * @author hao * */ @Service public class AzkabanService { private static final Logger logger = LoggerFactory.getLogger(AzkabanService.class); private static final String CONTENT_TYPE = "application/x-www-form-urlencoded; charset=utf-8"; private static final String X_REQUESTED_WITH = "XMLHttpRequest"; private static final DateTimeFormatter formatterTime = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); @Autowired private RestTemplate restTemplate; @Autowired private AzkabanConfig azkabanConfig; /** * Azkaban登陸接口,返回sessionId * @return * @throws Exception */ public String login() throws Exception { 
SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", CONTENT_TYPE); hs.add("X-Requested-With", X_REQUESTED_WITH); LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>(); linkedMultiValueMap.add("action", "login"); linkedMultiValueMap.add("username", azkabanConfig.getAzkUsername()); linkedMultiValueMap.add("password", azkabanConfig.getAzkPassword()); HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs); String result = restTemplate.postForObject(azkabanConfig.getAzkUrl(), httpEntity, String.class); if (!SysContants.AZK_SUCCESS.equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())) { logger.error("Azkaban登陸失敗!返回錯誤信息:"+result); throw new Exception("Azkaban登陸失敗!"); } return new Gson().fromJson(result, JsonObject.class).get("session.id").getAsString(); } /** * Azkaban上傳zip文件 * @param projectName 項目名稱 * @param file 文件 * @return projectId編號 * @throws Exception */ public String uploadZip(String projectName, File file) throws Exception { SSLUtil.turnOffSslChecking(); FileSystemResource resource = new FileSystemResource(file); LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>(); linkedMultiValueMap.add("session.id", login()); linkedMultiValueMap.add("ajax", "upload"); linkedMultiValueMap.add("project", projectName); linkedMultiValueMap.add("file", resource); String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/manager", linkedMultiValueMap, String.class); if (StringUtils.isEmpty(new Gson().fromJson(result, JsonObject.class).get("projectId").getAsString())) { logger.error("上傳文件至Azkaban失敗:",projectName,file.getPath()); logger.error("Azkaban上傳文件失敗!返回錯誤信息:"+result); return null; } return new Gson().fromJson(result, JsonObject.class).get("projectId").getAsString(); } /** * Azkaban建立project * @param projectName,project名稱 * @param description,project描述 * @return 
是否成功 * @throws Exception */ public boolean createProject(String projectName, String description) throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", CONTENT_TYPE); hs.add("X-Requested-With", X_REQUESTED_WITH); LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>(); linkedMultiValueMap.add("session.id", login()); linkedMultiValueMap.add("action", "create"); linkedMultiValueMap.add("name", projectName); linkedMultiValueMap.add("description", description); HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs); logger.info("Azkaban請求信息:" + httpEntity.toString()); String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/manager", httpEntity, String.class); logger.info("Azkaban返回建立Project信息:" + result); // 建立成功和已存在,都表示建立成功 if (!SysContants.AZK_SUCCESS .equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())) { if (!"Project already exists." 
.equals(new Gson().fromJson(result, JsonObject.class).get("message").getAsString())) { logger.error("建立Azkaban Project失敗:",projectName); logger.error("建立Azkaban Project失敗!返回錯誤信息:"+result); return false; } } return true; } /** * Azkaban刪除project * @param projectName 項目名稱 * @throws Exception */ public void deleteProject(String projectName) throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", CONTENT_TYPE); hs.add("X-Requested-With", X_REQUESTED_WITH); hs.add("Accept", "text/plain;charset=utf-8"); Map<String, String> map = new HashMap<>(); map.put("id", login()); map.put("project", projectName); ResponseEntity<String> exchange = restTemplate.exchange( azkabanConfig.getAzkUrl() + "/manager?session.id={id}&delete=true&project={project}", HttpMethod.GET, new HttpEntity<String>(hs), String.class, map); if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) { logger.error("刪除Azkaban Project失敗!返回錯誤信息:"+exchange); throw new Exception("刪除Azkaban Project失敗"); } } /** * 獲取一個項目的全部流flows * @param projectName 項目名稱 * @return List<String> 項目的全部流 * @throws Exception */ public List<String> fetchFlowsProject(String projectName) throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", CONTENT_TYPE); hs.add("X-Requested-With", X_REQUESTED_WITH); hs.add("Accept", "text/plain;charset=utf-8"); Map<String, String> map = new HashMap<>(); map.put("id", login()); map.put("project", projectName); ResponseEntity<String> exchange = restTemplate.exchange( azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&ajax=fetchprojectflows", HttpMethod.GET, new HttpEntity<String>(hs), String.class, map); if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) { logger.error("Azkaban獲取一個項目的全部流信息失敗:" + projectName); logger.error("Azkaban獲取一個項目的全部流信息失敗:!返回錯誤信息:"+exchange); return null; } JsonElement obj = new Gson().fromJson(exchange.getBody(), 
JsonObject.class).get("flows"); if (obj == null) { logger.error("Azkaban獲取一個項目的全部流信息失敗:{}:{}", projectName); return null; } List<String> flows = new ArrayList<String>(); for(JsonElement jobj:obj.getAsJsonArray()) { flows.add(jobj.getAsJsonObject().get("flowId").getAsString()); } return flows; } /** * 獲取一個流的全部做業 * @param projectName 項目名 * @param flowId 流id * @return * @throws Exception */ public String fetchJobsFlow(String projectName, String flowId) throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", CONTENT_TYPE); hs.add("X-Requested-With", X_REQUESTED_WITH); hs.add("Accept", "text/plain;charset=utf-8"); Map<String, String> map = new HashMap<>(); map.put("id", login()); map.put("project", projectName); map.put("flow", flowId); ResponseEntity<String> exchange = restTemplate.exchange( azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&flow={flow}&ajax=fetchflowgraph", HttpMethod.GET, new HttpEntity<String>(hs), String.class, map); if (exchange == null) { logger.error("Azkaban獲取一個項目的全部流信息失敗:" + projectName); return null; } logger.debug("Azkaban獲取一個項目的全部流信息:" + exchange); if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) { throw new Exception("Azkaban獲取一個項目的全部流信息失敗"); } return exchange.toString(); } /** * Flow 獲取執行的project 列表 * azkaban api 獲取流的執行 * @param projectName 項目名 * @param flowId 流id * @param start * @param length * @return * @throws Exception */ public FlowExecution fetchFlowExecutions(String projectName, String flowId, String start,String length) throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8"); hs.add("X-Requested-With", "XMLHttpRequest"); hs.add("Accept", "text/plain;charset=utf-8"); Map<String, String> map = new HashMap<>(); map.put("id", login()); map.put("project", projectName); map.put("flow", flowId); map.put("start", String.valueOf(start)); map.put("length", 
String.valueOf(length)); ResponseEntity<String> exchange = restTemplate.exchange( azkabanConfig.getAzkUrl() + "/manager?session.id={id}&ajax=fetchFlowExecutions&project={project}&flow={flow}&start={start}&length={length}", HttpMethod.GET, new HttpEntity<String>(hs), String.class, map); if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) { logger.error("Azkaban獲取一個項目運行記錄信息失敗:{}:{}", projectName, flowId); return null; } return new Gson().fromJson(exchange.getBody(), FlowExecution.class); } /** * Flow 獲取正在執行的流id * @param projectName * @param flowId * @return * @throws Exception */ public String getRunning(String projectName, String flowId) throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8"); hs.add("X-Requested-With", "XMLHttpRequest"); hs.add("Accept", "text/plain;charset=utf-8"); Map<String, String> map = new HashMap<>(); map.put("id", login()); map.put("project", projectName); map.put("flow", flowId); ResponseEntity<String> exchange = restTemplate.exchange( azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=getRunning&project={project}&flow={flow}", HttpMethod.GET, new HttpEntity<String>(hs), String.class, map); return exchange.getBody(); } /** * Execute a Flow 執行一個流 還有不少其餘參數 具體參考azkabanConfig.getAzkUrl() * * @throws KeyManagementException * @throws NoSuchAlgorithmException */ public String executeFlow(String projectName, String flowId) throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8"); hs.add("X-Requested-With", "XMLHttpRequest"); hs.add("Accept", "text/plain;charset=utf-8"); Map<String, String> map = new HashMap<>(); map.put("id", login()); map.put("project", projectName); map.put("flow", flowId); ResponseEntity<String> exchange = restTemplate.exchange( azkabanConfig.getAzkUrl() + 
"/executor?session.id={id}&ajax=executeFlow&project={project}&flow={flow}", HttpMethod.GET, new HttpEntity<String>(hs), String.class, map); if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) { logger.error("執行一個流請求失敗:{}:{}", projectName, flowId); return null; } JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("execid"); if (obj == null) { logger.error("執行一個流失敗:{}:{}", projectName, flowId); return null; } return obj.getAsString(); } /** * Cancel a Flow Execution 取消流程執行 * azkaban api 取消流程執行 * @throws KeyManagementException * @throws NoSuchAlgorithmException */ public void cancelEXEaFlow(String projectName,String start,String size) throws Exception { int flag=0; List<String> flows = fetchFlowsProject(projectName);//獲取全部流 for (String flow : flows) { FlowExecution fe = fetchFlowExecutions(projectName, flow, start, size); if (fe == null) { continue; } List<Execution> executions = fe.getExecutions();//獲取執行id for (Execution execution : executions) { if(null!=execution&&null!=execution.getExecId()&&"RUNNING".equals(execution.getStatus())){//運行中的 SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8"); hs.add("X-Requested-With", "XMLHttpRequest"); hs.add("Accept", "text/plain;charset=utf-8"); Map<String, String> map = new HashMap<>(); map.put("id", login()); map.put("execid", String.valueOf(execution.getExecId())); ResponseEntity<String> exchange = restTemplate.exchange( azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=cancelFlow&execid={execid}", HttpMethod.GET, new HttpEntity<String>(hs), String.class, map); System.out.println(exchange.getBody()); if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) { logger.error("取消執行調度失敗,請求azkaban接口異常:"+exchange); throw new Exception("取消執行調度失敗,請求azkaban接口異常:"+exchange); } JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("error"); if (obj != 
null) { throw new Exception("取消執行調度失敗,請刷新列表獲取最新調度狀態!"); } flag++; } } } if(0==flag){ throw new Exception("該調度不是運行中狀態,請刷新列表獲取最新狀態!"); } } /** * 根據時間 建立調度任務 * * @param projectId * @param projectName * @param flow * @param flowName * @param recurring,是否循環,on循環 * @param period,循環頻率:M:Months,w:Weeks,d:Days,h:Hours,m:Minutes,s:Seconds;如60s,支持分鐘的倍數 * @param date,開始時間 * @return 返回scheduleId * @throws Exception */ public String scheduleEXEaFlow(String projectId, String projectName, String flow, String flowName, String recurring, String period, Date date) throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", CONTENT_TYPE); hs.add("X-Requested-With", X_REQUESTED_WITH); LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>(); linkedMultiValueMap.add("session.id", login()); linkedMultiValueMap.add("ajax", "scheduleFlow"); linkedMultiValueMap.add("projectName", projectName); linkedMultiValueMap.add("projectId", projectId); linkedMultiValueMap.add("flow", flow); linkedMultiValueMap.add("flowName", flowName); linkedMultiValueMap.add("is_recurring", recurring); linkedMultiValueMap.add("period", period); scheduleTimeInit(linkedMultiValueMap, date); HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs); String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class); logger.info("Azkaban返回根據時間建立定時任務信息:" + result); if (!SysContants.AZK_SUCCESS.equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString()) || new Gson().fromJson(result, JsonObject.class).get("scheduleId").getAsInt() < 0) { logger.error("Azkaban返回根據時間建立定時任務信息失敗:!返回錯誤信息:"+result); throw new Exception("根據時間建立定時任務失敗"); } return new Gson().fromJson(result, JsonObject.class).get("scheduleId").getAsString(); } /** * 根據cron表達式 建立調度任務 * @param projectName 項目名稱 * @param cron cron表達式 * @param flow 流 * @param flowName 流名稱 * 
@return 返回scheduleId * @throws Exception */ public String scheduleByCronEXEaFlow(String projectName, String cron, String flowName) throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", CONTENT_TYPE); hs.add("X-Requested-With", X_REQUESTED_WITH); LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>(); linkedMultiValueMap.add("session.id", login()); linkedMultiValueMap.add("ajax", "scheduleCronFlow"); linkedMultiValueMap.add("projectName", projectName); linkedMultiValueMap.add("cronExpression", cron); linkedMultiValueMap.add("flow", flowName); HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs); String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class); if (!SysContants.AZK_SUCCESS .equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())) { logger.error("Azkaban返回根據時間建立定時任務信息失敗:!返回錯誤信息:"+result); return null; } return new Gson().fromJson(result, JsonObject.class).get("scheduleId").getAsString(); } /** * 根據scheduleId取消一個流的調度 * 暫停一次執行,輸入爲exec id。若是這個執行不是處於running狀態,會返回錯誤信息。 * @param scheduleId * @throws Exception */ public boolean unscheduleFlow(String scheduleId) { try { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", CONTENT_TYPE); hs.add("X-Requested-With", X_REQUESTED_WITH); LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>(); linkedMultiValueMap.add("session.id", login()); linkedMultiValueMap.add("action", "removeSched"); linkedMultiValueMap.add("scheduleId", scheduleId); HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs); String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class); if (StringUtils.isBlank(result)) { return false; } if (!SysContants.AZK_SUCCESS 
.equals(String.valueOf(new Gson().fromJson(result, JsonObject.class).get("status")))) { logger.error("取消流調度信息失敗:{}", scheduleId); logger.error("Azkaban取消流調度信息失敗失敗:!返回錯誤信息:"+result); return false; } } catch (Exception e) { logger.error("取消流調度信息失敗:{}", scheduleId); return false; } return true; } /** * 下載Azkaban壓縮文件 * @param projectName 項目名稱 * @param zipPath 文件路徑 * @throws Exception 文件異常 */ public void downLoadZip(String projectName, String zipPath) throws Exception { OutputStream output = null; BufferedOutputStream bufferedOutput = null; try { URL url = new URL(azkabanConfig.getAzkUrl() + "/manager?session.id=" + login() + "&download=true&project=" + projectName); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setConnectTimeout(3 * 1000); InputStream inputStream = conn.getInputStream(); File file = new File(zipPath); output = new FileOutputStream(file); bufferedOutput = new BufferedOutputStream(output); bufferedOutput.write(IOUtils.toByteArray(inputStream)); } catch (Exception e) { logger.info("下載Azkaban壓縮文件異常:" + e.getMessage(), e); } finally { if (bufferedOutput != null) { try { bufferedOutput.flush(); bufferedOutput.close(); } catch (IOException e) { logger.info("關閉流異常:" + e.getMessage(), e); } } if (output != null) { try { output.close(); } catch (IOException e) { logger.info("關閉流異常:" + e.getMessage(), e); } } } } /** * 獲取一個調度器job的信息 根據project的id 和 flowId * @param projectId 項目名稱 * @param flowId 流編號 * @return job的信息 */ public String fetchSchedule(String projectId, String flowId) { try { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8"); hs.add("X-Requested-With", "XMLHttpRequest"); hs.add("Accept", "text/plain;charset=utf-8"); Map<String, String> map = new HashMap<>(); map.put("id", login()); map.put("projectId", projectId); map.put("flowId", flowId); ResponseEntity<String> exchange = restTemplate.exchange( azkabanConfig.getAzkUrl() + 
"/schedule?session.id={id}&ajax=fetchSchedule&projectId={projectId}&flowId={flowId}", HttpMethod.GET, new HttpEntity<String>(hs), String.class, map); if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) { logger.error("獲取一個調度器job的信息失敗:{}:{}", projectId, flowId); return null; } JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("schedule"); if (obj == null) { logger.error("獲取一個調度器job的信息失敗:{}:{}", projectId, flowId); return null; } return obj.getAsJsonObject().get("scheduleId").getAsString(); } catch (Exception e) { logger.error("獲取一個調度器job的信息失敗:{}:{}", projectId, flowId); } return null; } /** * SLA 設置調度任務 執行的時候 或者執行成功失敗等等的規則匹配 發郵件或者... * @return * @throws Exception */ public String setSla() throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8"); hs.add("X-Requested-With", "XMLHttpRequest"); LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>(); linkedMultiValueMap.add("session.id", "ffad7355-4427-4770-9c14-3d19736fa73a"); linkedMultiValueMap.add("ajax", "setSla"); linkedMultiValueMap.add("scheduleId", "6"); linkedMultiValueMap.add("slaEmails", "771177@qq.com"); linkedMultiValueMap.add("settings[0]", "begin,SUCCESS,5:00,true,false"); linkedMultiValueMap.add("settings[1]", "exe,SUCCESS,5:00,true,false"); linkedMultiValueMap.add("settings[2]", "end,SUCCESS,5:00,true,false"); // linkedMultiValueMap.add("settings[3]", // "xxx,SUCCESS,5:00,true,false"); HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs); String postForObject = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class); return postForObject; } /** * SLA 獲取調度的規則配置 * @throws Exception */ public void slaInfo() throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", "application/x-www-form-urlencoded; 
charset=utf-8"); hs.add("X-Requested-With", "XMLHttpRequest"); hs.add("Accept", "text/plain;charset=utf-8"); Map<String, String> map = new HashMap<>(); map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7"); map.put("scheduleId", "6"); ResponseEntity<String> exchange = restTemplate.exchange( azkabanConfig.getAzkUrl() + "/schedule?session.id={id}&ajax=slaInfo&scheduleId={scheduleId}", HttpMethod.GET, new HttpEntity<String>(hs), String.class, map); System.out.println(exchange.getBody()); } /** * Execution 暫停一個執行流 * @throws Exception */ public void pauseFlow() throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8"); hs.add("X-Requested-With", "XMLHttpRequest"); hs.add("Accept", "text/plain;charset=utf-8"); Map<String, String> map = new HashMap<>(); map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7"); map.put("execid", "12"); ResponseEntity<String> exchange = restTemplate.exchange( azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=pauseFlow&execid={execid}", HttpMethod.GET, new HttpEntity<String>(hs), String.class, map); System.out.println(exchange.getBody()); } /** * Flow Execution 從新執行一個執行流 * @throws Exception */ public void resumeFlow() throws Exception { SSLUtil.turnOffSslChecking(); HttpHeaders hs = new HttpHeaders(); hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8"); hs.add("X-Requested-With", "XMLHttpRequest"); hs.add("Accept", "text/plain;charset=utf-8"); Map<String, String> map = new HashMap<>(); map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7"); map.put("execid", "11"); ResponseEntity<String> exchange = restTemplate.exchange( azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=resumeFlow&execid={execid}", HttpMethod.GET, new HttpEntity<String>(hs), String.class, map); System.out.println(exchange.getBody()); } /** * 獲取一個執行流的詳細信息 這個流的每一個節點的信息 成功或者失敗等等 * @param execid 執行id * @return * @throws Exception */ public 
String fetchexecflow(String execid) throws Exception {
        SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
        hs.add("X-Requested-With", "XMLHttpRequest");
        hs.add("Accept", "text/plain;charset=utf-8");
        Map<String, String> map = new HashMap<>();
        map.put("id", login());
        map.put("execid", execid);
        ResponseEntity<String> exchange = restTemplate.exchange(
                azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchexecflow&execid={execid}",
                HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
        if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
            logger.error("獲取一個執行流的詳細信息失敗:" + execid);
            return null;
        }
        return exchange.getBody();
    }

    /**
     * Fetches the log of one job inside an execution (ajax action {@code fetchExecJobLogs}).
     *
     * @param execid execution id
     * @param jobId  job id
     * @param offset value sent as the {@code offset} query parameter
     * @param length value sent as the {@code length} query parameter
     * @return the {@code data} field of the response, or {@code null} when the call fails
     *         or the field is absent
     * @throws Exception propagated from login or the HTTP call
     */
    public String fetchExecJobLogs(String execid,String jobId,String offset,String length) throws Exception {
        SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
        hs.add("X-Requested-With", "XMLHttpRequest");
        hs.add("Accept", "text/plain;charset=utf-8");
        Map<String, String> map = new HashMap<>();
        map.put("id", login());
        map.put("execid", execid);
        map.put("jobId", jobId);
        map.put("offset", offset);
        map.put("length", length);
        ResponseEntity<String> exchange = restTemplate.exchange(
                azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchExecJobLogs&execid={execid}&jobId={jobId}&offset={offset}&length={length}",
                HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
        if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
            logger.error("獲取一個執行流的詳細信息失敗:{}:{}", execid,jobId);
            return null;
        }
        JsonElement data = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("data");
        if (data == null) {
            logger.error("獲取一個執行流的詳細信息爲空:{}:{}", execid,jobId);
            return null;
        }
        return data.getAsString();
    }

    /**
     * Fetches the flow-level log of an execution (ajax action {@code fetchExecFlowLogs}).
     *
     * @param execid execution id
     * @param offset value sent as the {@code offset} query parameter
     * @param length value sent as the {@code length} query parameter
     * @return the {@code data} field of the response, or {@code null} when the call fails
     *         or the field is absent
     * @throws Exception propagated from login or the HTTP call
     */
    public String fetchExecFlowLogs(String execid,String offset,String length) throws Exception {
        SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
        hs.add("X-Requested-With", "XMLHttpRequest");
        hs.add("Accept", "text/plain;charset=utf-8");
        Map<String, String> map = new HashMap<>();
        map.put("id", login());
        map.put("execid", execid);
        map.put("offset", offset);
        map.put("length", length);
        ResponseEntity<String> exchange = restTemplate.exchange(
                azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchExecFlowLogs&execid={execid}&offset={offset}&length={length}",
                HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
        if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
            logger.error("獲取一個執行流的日誌概要信息失敗:{}", execid);
            return null;
        }
        JsonElement data = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("data");
        if (data == null) {
            // One placeholder for the one argument (the format previously declared two).
            logger.error("獲取一個執行流的日誌概要信息爲空:{}", execid);
            return null;
        }
        return data.getAsString();
    }

    /**
     * Fetches the update status of an execution (ajax action {@code fetchexecflowupdate})
     * and prints the response body to standard output.
     * NOTE(review): session id, execution id and lastUpdateTime are hard-coded sample values.
     *
     * @throws Exception propagated from the HTTP call
     */
    public void fetchexecflowupdate() throws Exception {
        SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
        hs.add("X-Requested-With", "XMLHttpRequest");
        hs.add("Accept", "text/plain;charset=utf-8");
        Map<String, String> map = new HashMap<>();
        map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
        map.put("execid", "11");
        map.put("lastUpdateTime", "-1");
        ResponseEntity<String> exchange = restTemplate.exchange(
                azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchexecflowupdate&execid={execid}&lastUpdateTime={lastUpdateTime}",
                HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
        System.out.println(exchange.getBody());
    }

    /**
     * Adds the {@code scheduleTime} and {@code scheduleDate} form fields derived from a date.
     */
    private void
scheduleTimeInit(LinkedMultiValueMap<String, String> linkedMultiValueMap, Date date) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        int year = calendar.get(Calendar.YEAR);
        // Calendar months are zero-based.
        int month = calendar.get(Calendar.MONTH) + 1;
        int day = calendar.get(Calendar.DATE);
        int hour = calendar.get(Calendar.HOUR_OF_DAY);
        int minute = calendar.get(Calendar.MINUTE);
        // Field format: "hour,minute,am|pm,zone"; the 24-hour value is sent unchanged.
        String meridiemAndZone = hour > 11 ? ",pm,PDT" : ",am,EDT";
        linkedMultiValueMap.add("scheduleTime", hour + "," + minute + meridiemAndZone);
        linkedMultiValueMap.add("scheduleDate", month + "/" + day + "/" + year);
    }

    /**
     * Lists the executions of every flow in a project as record beans (paged per flow).
     *
     * @param projectName project name
     * @param start       paging argument forwarded to {@code fetchFlowExecutions}
     * @param size        paging argument forwarded to {@code fetchFlowExecutions}
     * @return one bean per execution; empty when the project has no flows or executions
     * @throws Exception propagated from the underlying Azkaban calls
     */
    public List<GovernTaskRecordBean> getAzkabanExcutions(String projectName,String start,String size) throws Exception {
        List<GovernTaskRecordBean> excInfoList = Lists.newArrayList();
        List<String> flows = fetchFlowsProject(projectName);
        if (flows == null) {
            return excInfoList;
        }
        for (String flow : flows) {
            FlowExecution fe = fetchFlowExecutions(projectName, flow, start, size);
            if (fe == null) {
                continue;
            }
            List<Execution> executions = fe.getExecutions();
            if (executions == null) {
                continue;
            }
            for (Execution execution : executions) {
                if (execution == null) {
                    continue;
                }
                GovernTaskRecordBean eInfo = new GovernTaskRecordBean();
                eInfo.setCreateTime(new DateTime(execution.getSubmitTime()).toString(formatterTime));
                if (execution.getEndTime() > 0) {
                    eInfo.setEndTime(new DateTime(execution.getEndTime()).toString(formatterTime));
                    eInfo.setElapsed((execution.getEndTime() - execution.getStartTime()) / 1000);
                } else {
                    // No end time yet: measure elapsed seconds up to now.
                    eInfo.setElapsed((DateTime.now().getMillis() - execution.getStartTime()) / 1000);
                }
                eInfo.setExecId(execution.getExecId());
                eInfo.setFlowId(execution.getFlowId());
                eInfo.setOwner(execution.getSubmitUser());
                eInfo.setProjectId(execution.getProjectId());
                eInfo.setProjectPath(projectName);
                eInfo.setStartTime(new DateTime(execution.getStartTime()).toString(formatterTime));
                // Only these four statuses are mapped; anything else leaves the bean status unset.
                String status = execution.getStatus();
                if (status != null) {
                    ScheduleStatus[] mapped = {ScheduleStatus.FAILED, ScheduleStatus.CANCELLED,
                            ScheduleStatus.KILLED, ScheduleStatus.SUCCEEDED};
                    for (ScheduleStatus candidate : mapped) {
                        if (status.equalsIgnoreCase(candidate.getDesc())) {
                            eInfo.setStatus(candidate.getCode());
                            break;
                        }
                    }
                }
                excInfoList.add(eInfo);
            }
        }
        return excInfoList;
    }

    /**
     * Builds the detail view of one execution: flow-level data, the flow log, and one entry
     * per node including that node's job log.
     *
     * @param excuteId execution id
     * @param start    offset forwarded to the log requests
     * @param size     length forwarded to the log requests
     * @return the populated bean, or {@code null} when the execution has no nodes
     * @throws Exception a {@code CommonException} when the execution cannot be fetched or
     *                   parsed; otherwise propagated from the underlying calls
     */
    public ExecutionInfoBean getAzkabanExcutionDetails(String excuteId, String start, String size) throws Exception {
        String result = fetchexecflow(excuteId);
        if (StringUtils.isBlank(result)) {
            throw new CommonException("查詢任務流的執行詳情失敗!");
        }
        ExecutionInfo ei = new Gson().fromJson(result, ExecutionInfo.class);
        if (ei == null) {
            throw new CommonException("查詢任務流的執行詳情失敗!");
        }
        List<ExecNode> nodes = ei.getNodes();
        if (nodes == null || nodes.size() == 0) {
            return null;
        }
        ExecutionInfoBean eib = new ExecutionInfoBean();
        eib.setAttempt(ei.getAttempt());
        eib.setStartTime(new DateTime(ei.getStartTime()).toString(formatterTime));
        if (ei.getEndTime() > 0) {
            eib.setEndTime(new DateTime(ei.getEndTime()).toString(formatterTime));
            eib.setElapsed((ei.getEndTime() - ei.getStartTime()) / 1000);
        } else {
            eib.setElapsed((DateTime.now().getMillis() - ei.getStartTime()) / 1000);
        }
        eib.setExecid(ei.getExecid());
        eib.setFlow(ei.getFlow());
        eib.setFlowId(ei.getFlowId());
        eib.setJobId(ei.getId());
        eib.setNestedId(ei.getNestedId());
        eib.setProject(ei.getProject());
        eib.setProjectId(ei.getProjectId());
        String flowStatus = ScheduleStatus.getDescByCode(ei.getStatus());
        if (StringUtils.isNotBlank(flowStatus)) {
            eib.setStatus(flowStatus);
        }
        eib.setSubmitTime(new DateTime(ei.getSubmitTime()).toString(formatterTime));
        eib.setSubmitUser(ei.getSubmitUser());
        eib.setType(ei.getType());
        eib.setUpdateTime(new DateTime(ei.getUpdateTime()).toString(formatterTime));
        eib.setFlowLog(fetchExecFlowLogs(excuteId, start, size));

        List<ExecNodeBean> nodeBeanList = Lists.newArrayList();
        for (ExecNode node : nodes) {
            ExecNodeBean ebn = new ExecNodeBean();
            ebn.setAttempt(node.getAttempt());
            ebn.setJobId(node.getId());
            ebn.setDependencies(node.getIn());
            ebn.setNestedId(node.getNestedId());
            // Use the node's own start time (the flow start time was used here before),
            // so it agrees with the elapsed time computed below.
            ebn.setStartTime(new DateTime(node.getStartTime()).toString(formatterTime));
            if (node.getEndTime() > 0) {
                ebn.setEndTime(new DateTime(node.getEndTime()).toString(formatterTime));
                ebn.setElapsed((node.getEndTime() - node.getStartTime()) / 1000);
            } else {
                ebn.setElapsed((DateTime.now().getMillis() - node.getStartTime()) / 1000);
            }
            String nodeStatus = ScheduleStatus.getDescByCode(node.getStatus());
            if (StringUtils.isNotBlank(nodeStatus)) {
                ebn.setStatus(nodeStatus);
            }
            ebn.setType(node.getType());
            // NOTE(review): the update time comes from the flow, not the node - confirm intent.
            ebn.setUpdateTime(new DateTime(ei.getUpdateTime()).toString(formatterTime));
            ebn.setLogs(fetchExecJobLogs(excuteId, ebn.getJobId(), start, size));
            nodeBeanList.add(ebn);
        }
        eib.setNodes(nodeBeanList);
        return eib;
    }

    /**
     * Resolves the id of a project from its name (ajax action {@code fetchprojectflows}).
     *
     * @param projectName project name
     * @return the {@code projectId} field of the response, or {@code null} when the call
     *         fails or the field is absent
     * @throws Exception propagated from login or the HTTP call
     */
    public String fetchProjectId(String projectName) throws Exception {
        SSLUtil.turnOffSslChecking();
        HttpHeaders hs = new HttpHeaders();
        hs.add("Content-Type", CONTENT_TYPE);
        hs.add("X-Requested-With", X_REQUESTED_WITH);
        hs.add("Accept", "text/plain;charset=utf-8");
        Map<String, String> map = new HashMap<>();
        map.put("id", login());
        map.put("project", projectName);
        ResponseEntity<String> exchange = restTemplate.exchange(
                azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&ajax=fetchprojectflows",
                HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
        if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
            logger.error("Azkaban獲取一個項目的全部流信息失敗:" + projectName);
            return null;
        }
        JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("projectId");
        if (obj == null) {
            // One placeholder for the one argument (the format previously declared two).
            logger.error("Azkaban獲取一個項目的全部流信息失敗:{}", projectName);
            return null;
        }
        String projectId = obj.getAsString();
        if (StringUtils.isBlank(projectId)) {
            logger.error("獲取Azkaban projectId 異常");
        }
        return projectId;
    }

    /**
     * Returns the status of the first execution found for the project. Flows are scanned
     * in order, and the first entry of the first non-empty execution list (up to 1000
     * fetched per flow) is used.
     *
     * @param projectName project name
     * @return that execution's status, or {@code null} when no flow has any execution
     * @throws Exception propagated from the underlying Azkaban calls
     */
    public String getLastScheduleStatus(String projectName) throws Exception {
        List<String> flows = fetchFlowsProject(projectName);
        if (CollectionUtils.isNotEmpty(flows)) {
            for (String flow : flows) {
                FlowExecution fe = fetchFlowExecutions(projectName, flow, "0", "1000");
                if (fe == null) {
                    continue;
                }
                List<Execution> executions = fe.getExecutions();
                if (executions == null || executions.size() == 0) {
                    continue;
                }
                return executions.get(0).getStatus();
            }
        }
        return null;
    }
}
8. 調用示例(demo)
/**
 * Runs a flow immediately by delegating to {@code azkabanService.executeFlow}.
 *
 * @param projectName project name
 * @param flow        flow id
 * @throws CommonException wrapping any exception raised by the Azkaban call
 */
private void excuteFlowImmediately(String projectName, String flow) throws CommonException {
    try {
        azkabanService.executeFlow(projectName, flow);
    } catch (Exception cause) {
        // Keep the original exception as the cause so the Azkaban failure stays traceable.
        throw new CommonException("調度初始化完畢,當即執行任務異常", cause);
    }
}
4、注意
1.如遇報錯,請查看azkaban相關的log日誌以定位問題。
2.如果pom文件中列出的jar包依賴不全,請在評論區留言。