本文主要研究一下OtterController。
otter/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java
public class OtterController implements NodeTaskListener, OtterControllerMBean { private static final Logger logger = LoggerFactory.getLogger(OtterController.class); // 第一層爲pipelineId,第二層爲S.E.T.L模塊 private Map<Long, Map<StageType, GlobalTask>> controllers = OtterMigrateMap.makeComputingMap(new Function<Long, Map<StageType, GlobalTask>>() { public Map<StageType, GlobalTask> apply(Long pipelineId) { return new MapMaker().makeMap(); } }); private ConfigClientService configClientService; private ArbitrateManageService arbitrateManageService; private NodeTaskService nodeTaskService; // 各類資源管理 private DataSourceService dataSourceService; // 鏈接池資源 private DbDialectFactory dbDialectFactory; // 數據庫信息資源 private ArbitrateEventService arbitrateEventService; // 仲裁器資源 private ExecutorService executorService; private StageAggregationCollector stageAggregationCollector; public void start() throws Throwable { // 初始化節點 initNid(); nodeTaskService.addListener(this); // 將本身添加爲NodeTask響應者 } public void stop() throws Throwable { for (Map<StageType, GlobalTask> tasks : controllers.values()) { for (GlobalTask task : tasks.values()) { try { task.shutdown(); } catch (Exception e) { logger.error("##shutdown task error!", e); } } } try { Long nid = configClientService.currentNode().getId(); arbitrateManageService.nodeEvent().destory(Long.valueOf(nid)); } catch (Exception e) { logger.error("##destory node error!", e); } try { arbitrateEventService.toolEvent().release(); } catch (Exception e) { logger.error("##destory arbitrate error!", e); } try { nodeTaskService.stopNode(); // 通知manager中止當前node } catch (Exception e) { logger.error("##stop node error!", e); } try { OtterContextLocator.close(); } catch (Exception e) { logger.error("##cloes spring error!", e); } ZooKeeperClient.destory();// 關閉zookeeper } //...... }
otter/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java
private void initNid() { // 獲取一下nid變量 String nid = System.getProperty(OtterConstants.NID_NAME); if (StringUtils.isEmpty(nid)) { throw new ConfigException("nid is not set!"); } logger.info("INFO ## the nodeId = {}", nid); checkNidVaild(nid); arbitrateManageService.nodeEvent().init(Long.valueOf(nid)); // 添加session expired處理 NodeSessionExpired sessionExpired = new NodeSessionExpired(); sessionExpired.setNodeEvent(arbitrateManageService.nodeEvent()); ZooKeeperClient.registerNotification(sessionExpired); }
otter/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java
public void startPipeline(NodeTask nodeTask) { Long pipelineId = nodeTask.getPipeline().getId(); releasePipeline(pipelineId); Map<StageType, GlobalTask> tasks = controllers.get(pipelineId); // 處理具體的任務命令 List<StageType> stage = nodeTask.getStage(); List<TaskEvent> event = nodeTask.getEvent(); for (int i = 0; i < stage.size(); i++) { StageType stageType = stage.get(i); TaskEvent taskEvent = event.get(i); if (taskEvent.isCreate()) { startTask(nodeTask.getPipeline(), tasks, stageType); } else { stopTask(tasks, stageType); } } } private void startTask(Pipeline pipeline, Map<StageType, GlobalTask> tasks, StageType taskType) { if (tasks.get(taskType) != null && tasks.get(taskType).isAlive()) { logger.warn("WARN ## this task = {} has started", taskType); } GlobalTask task = null; if (taskType.isSelect()) { task = new SelectTask(pipeline.getId()); } else if (taskType.isExtract()) { task = new ExtractTask(pipeline.getId()); } else if (taskType.isTransform()) { task = new TransformTask(pipeline.getId()); } else if (taskType.isLoad()) { task = new LoadTask(pipeline.getId()); } if (task != null) { OtterContextLocator.autowire(task); // 注入一下spring資源 task.start(); tasks.put(taskType, task); logger.info("INFO ## start this task = {} success", taskType.toString()); } } private void stopTask(Map<StageType, GlobalTask> tasks, StageType taskType) { GlobalTask task = tasks.remove(taskType); if (task != null) { task.shutdown(); logger.info("INFO ## taskName = {} has shutdown", taskType); } else { logger.info("INFo ## taskName = {} is not started", taskType); } }
otter/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java
private void stopPipeline(Long pipelineId, Map<StageType, GlobalTask> tasks) { for (GlobalTask task : tasks.values()) { try { task.shutdown(); } catch (Exception e) { logger.error("## stop s/e/t/l task error!", e); } finally { tasks.remove(task); } } // close other resources. try { Thread.sleep(1 * 1000); // sleep 5s,等待S.E.T.L釋放線程 } catch (InterruptedException e) { logger.error("ERROR ## ", e); } // 釋放資源 releasePipeline(pipelineId); arbitrateEventService.toolEvent().release(pipelineId); } private void releasePipeline(Long pipelineId) { dataSourceService.destroy(pipelineId); dbDialectFactory.destory(pipelineId); }
OtterController實現了NodeTaskListener接口,提供了start、stop、process方法;其start方法主要是執行initNid及nodeTaskService.addListener(this);其stop方法則遍歷controllers的GlobalTask,挨個執行其shutdown方法,而後執行arbitrateManageService.nodeEvent().destory()、arbitrateEventService.toolEvent().release()、nodeTaskService.stopNode()、OtterContextLocator.close()及ZooKeeperClient.destory();process方法主要是遍歷nodeTasks,挨個執行stopPipeline或者startPipeline。