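The previous article described how to deploy Drill on YARN. With the Drill-on-YARN client you can then start, stop, resize, and clean up Drill. But what actually happens behind these commands? Below we analyze the Drill-on-YARN source code in detail, focusing on the startup process and briefly covering the other commands.
Note: the code below is based on Drill 1.14.0 and has been trimmed for brevity.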
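Looking at the drill-on-yarn.sh script, you can see that the Java class it ultimately runs is set in:

CLIENT_CMD="$JAVA $VM_OPTS -cp $CP org.apache.drill.yarn.client.DrillOnYarn ${args[@]}"

So org.apache.drill.yarn.client.DrillOnYarn is the entry point of the Drill-on-YARN client. Here is an overview of the class: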
public class DrillOnYarn {
  public static void main(String argv[]) {
    BasicConfigurator.configure();
    ClientContext.init();
    run(argv);
  }

  public static void run(String argv[]) {
    ClientContext context = ClientContext.instance();
    CommandLineOptions opts = new CommandLineOptions();
    if (!opts.parse(argv)) {
      opts.usage();
      context.exit(-1);
    }
    if (opts.getCommand() == null) {
      opts.usage();
      context.exit(-1);
    }
    try {
      DrillOnYarnConfig.load().setClientPaths();
    } catch (DoyConfigException e) {
      ClientContext.err.println(e.getMessage());
      context.exit(-1);
    }
    ClientCommand cmd;
    switch (opts.getCommand()) {
      case UPLOAD:
        cmd = new StartCommand(true, false);
        break;
      case START:
        cmd = new StartCommand(true, true);
        break;
      case DESCRIBE:
        cmd = new PrintConfigCommand();
        break;
      case STATUS:
        cmd = new StatusCommand();
        break;
      case STOP:
        cmd = new StopCommand();
        break;
      case CLEAN:
        cmd = new CleanCommand();
        break;
      case RESIZE:
        cmd = new ResizeCommand();
        break;
      default:
        cmd = new HelpCommand();
    }
    cmd.setOpts(opts);
    try {
      cmd.run();
    } catch (ClientException e) {
      displayError(opts, e);
      context.exit(1);
    }
  }
}
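The switch in run() is a plain command pattern: each CLI verb becomes a ClientCommand object, and UPLOAD and START reuse StartCommand with different flags. The following is a minimal sketch of just that mapping, assuming simplified stand-in classes; the class names echo the source, but `NamedCommand`, `CommandDispatch` and the string results are hypothetical and exist only to make the mapping observable.

```java
import java.util.Locale;

// Stand-ins for the CLI verbs handled in DrillOnYarn.run().
enum Command { UPLOAD, START, DESCRIBE, STATUS, STOP, CLEAN, RESIZE, HELP }

// Hypothetical simplified command interface; the real ClientCommand also receives the parsed options.
interface ClientCommand {
  String run();
}

// UPLOAD and START share one class; the flags decide whether to upload and whether to launch.
class StartCommand implements ClientCommand {
  private final boolean upload;
  private final boolean launch;

  StartCommand(boolean upload, boolean launch) {
    this.upload = upload;
    this.launch = launch;
  }

  @Override
  public String run() {
    return "start(upload=" + upload + ", launch=" + launch + ")";
  }
}

// Hypothetical placeholder for the other commands; it just reports its name.
class NamedCommand implements ClientCommand {
  private final String name;

  NamedCommand(String name) { this.name = name; }

  @Override
  public String run() { return name; }
}

class CommandDispatch {
  // Mirrors the switch in DrillOnYarn.run(): one command object per verb, help as the fallback.
  static ClientCommand forCommand(Command c) {
    switch (c) {
      case UPLOAD: return new StartCommand(true, false);
      case START: return new StartCommand(true, true);
      case DESCRIBE: return new NamedCommand("describe");
      case STATUS: return new NamedCommand("status");
      case STOP: return new NamedCommand("stop");
      case CLEAN: return new NamedCommand("clean");
      case RESIZE: return new NamedCommand("resize");
      default: return new NamedCommand("help");
    }
  }

  public static void main(String[] args) {
    String verb = args.length > 0 ? args[0].toUpperCase(Locale.ROOT) : "HELP";
    System.out.println(forCommand(Command.valueOf(verb)).run());
  }
}
```

Running `java CommandDispatch upload` prints `start(upload=true, launch=false)`, which is the same object the real client builds for `drill-on-yarn.sh upload`.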
The entry point is main, and the key part is run, which dispatches many commands. We focus on the start command; its run method is shown below:
public void run() throws ClientException {
  checkExistingApp();
  dryRun = opts.dryRun;
  config = DrillOnYarnConfig.config();
  FileUploader uploader = upload();
  if (launch) {
    launch(uploader);
  }
}
Broadly speaking, it consists of the following steps:
public void run() throws ClientException {
  setup();
  uploadDrillArchive();
  if (hasSiteDir()) {
    uploadSite();
  }
}
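Taken together, StartCommand.run() and the upload step above form a short pipeline: refuse to start if an application already exists, upload the Drill archive, upload the site directory only if there is one, and launch the AM only when the launch flag is set. The sketch below only records the order of those steps; the boolean inputs `appAlreadyRunning` and `hasSiteDir` are hypothetical stand-ins for checkExistingApp() and hasSiteDir(), and the assumption that checkExistingApp() aborts by throwing is ours.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of StartCommand.run() plus the upload step; it records which steps would run.
class StartFlow {
  static List<String> run(boolean appAlreadyRunning, boolean hasSiteDir, boolean launch) {
    if (appAlreadyRunning) {
      // Assumed behaviour of checkExistingApp(): abort instead of starting a second cluster.
      throw new IllegalStateException("Drill-on-YARN application is already running");
    }
    List<String> steps = new ArrayList<>();
    steps.add("setup");
    steps.add("uploadDrillArchive");
    if (hasSiteDir) {
      steps.add("uploadSite"); // the site directory is optional
    }
    if (launch) {
      steps.add("launch"); // UPLOAD stops before this step; START continues and launches the AM
    }
    return steps;
  }

  public static void main(String[] args) {
    System.out.println(run(false, true, true));
  }
}
```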
// AMRunner#connectToYarn
private void connectToYarn() {
  System.out.print("Loading YARN Config...");
  client = new YarnRMClient();
  System.out.println(" Loaded.");
}
// AMRunner#createApp
private void createApp() throws ClientException {
  try {
    appResponse = client.createAppMaster();
  } catch (YarnClientException e) {
    throw new ClientException("Failed to allocate Drill application master", e);
  }
  appId = appResponse.getApplicationId();
  System.out.println("Application ID: " + appId.toString());
}
private void launchApp(AppSpec master) throws ClientException {
  try {
    client.submitAppMaster(master);
  } catch (YarnClientException e) {
    throw new ClientException("Failed to start Drill application master", e);
  }
}
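On the client side the launch boils down to three calls on YarnRMClient: connect, ask the RM for a new application ID, and submit the AM spec, with every YARN failure rewrapped as a ClientException carrying a step-specific message. The sketch below reproduces that wrapping pattern against a hypothetical `ResourceManagerStub` interface rather than the real YARN client, so it runs without a cluster; the interface, its method signatures and `AmLauncher` are assumptions for illustration.

```java
// Checked exceptions mirroring the two layers in the source: YARN-level and client-level.
class YarnClientException extends Exception {
  YarnClientException(String msg) { super(msg); }
}

class ClientException extends Exception {
  ClientException(String msg, Throwable cause) { super(msg, cause); }
}

// Hypothetical stand-in for YarnRMClient.
interface ResourceManagerStub {
  String createAppMaster() throws YarnClientException;          // returns an application ID
  void submitAppMaster(String appId) throws YarnClientException;
}

class AmLauncher {
  private final ResourceManagerStub client;

  AmLauncher(ResourceManagerStub client) { this.client = client; }

  // createApp() followed by launchApp(), each wrapping YARN errors with its own message.
  String launch() throws ClientException {
    String appId;
    try {
      appId = client.createAppMaster();
    } catch (YarnClientException e) {
      throw new ClientException("Failed to allocate Drill application master", e);
    }
    System.out.println("Application ID: " + appId);
    try {
      client.submitAppMaster(appId);
    } catch (YarnClientException e) {
      throw new ClientException("Failed to start Drill application master", e);
    }
    return appId;
  }

  public static void main(String[] args) throws ClientException {
    ResourceManagerStub ok = new ResourceManagerStub() {
      public String createAppMaster() { return "application_0001"; }
      public void submitAppMaster(String appId) { }
    };
    new AmLauncher(ok).launch();
  }
}
```

Keeping the original YarnClientException as the cause means the client can print a short, step-specific message while the full YARN error stays available for debugging.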
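Once started, the ApplicationMaster requests resources from the RM and launches the Drillbits. The following sections describe in detail what the ApplicationMaster does after it starts.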
Looking at the drill-am.sh script, you can see that the Java class it ultimately runs is set in:

AMCMD="$JAVA $AM_JAVA_OPTS ${args[@]} -cp $CP org.apache.drill.yarn.appMaster.DrillApplicationMaster"

So org.apache.drill.yarn.appMaster.DrillApplicationMaster is the entry point of the ApplicationMaster. Here is an overview of the class:
public class DrillApplicationMaster {
  public static void main(String[] args) {
    LOG.trace("Drill Application Master starting.");
    try {
      DrillOnYarnConfig.load().setAmDrillHome();
    } catch (DoyConfigException e) {
      System.err.println(e.getMessage());
      System.exit(-1);
    }
    Dispatcher dispatcher;
    try {
      dispatcher = (new DrillControllerFactory()).build();
    } catch (ControllerFactoryException e) {
      LOG.error("Setup failed, exiting: " + e.getMessage(), e);
      System.exit(-1);
      return;
    }
    try {
      if (!dispatcher.start()) {
        return;
      }
    } catch (Throwable e) {
      LOG.error("Fatal error, exiting: " + e.getMessage(), e);
      System.exit(-1);
    }
    WebServer webServer = new WebServer(dispatcher);
    try {
      webServer.start();
    } catch (Exception e) {
      LOG.error("Web server setup failed, exiting: " + e.getMessage(), e);
      System.exit(-1);
    }
    try {
      dispatcher.run();
    } catch (Throwable e) {
      LOG.error("Fatal error, exiting: " + e.getMessage(), e);
      System.exit(-1);
    } finally {
      try {
        webServer.close();
      } catch (Exception e) {
      }
    }
  }
}
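The ordering in main() is the important part: the dispatcher is built and started first, a false return from start() ends the AM quietly, the web server comes up only after that, and dispatcher.run() then blocks until the cluster shuts down. The sketch below replays that order with a hypothetical `Dispatcher` interface and returns an exit code instead of calling System.exit(), so the sequence can be inspected; `AmLifecycle` and its log strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Dispatcher used in DrillApplicationMaster.main().
interface Dispatcher {
  boolean start() throws Exception; // false means "nothing to run, exit quietly"
  void run() throws Exception;      // blocks until the cluster shuts down
}

class AmLifecycle {
  final List<String> log = new ArrayList<>();

  // Returns an exit code instead of calling System.exit(), so the order of steps is observable.
  int runMaster(Dispatcher dispatcher) {
    try {
      if (!dispatcher.start()) {
        log.add("dispatcher declined to start");
        return 0;
      }
    } catch (Exception e) {
      log.add("fatal: " + e.getMessage());
      return -1;
    }
    log.add("web server started"); // started only after the dispatcher is up
    try {
      dispatcher.run();
      return 0;
    } catch (Exception e) {
      log.add("fatal: " + e.getMessage());
      return -1;
    } finally {
      // Runs here because we return normally; in the real main(), a System.exit()
      // inside the catch block halts the JVM before this finally block can run.
      log.add("web server closed");
    }
  }

  public static void main(String[] args) {
    AmLifecycle am = new AmLifecycle();
    int code = am.runMaster(new Dispatcher() {
      public boolean start() { return true; }
      public void run() throws Exception { throw new Exception("RM connection lost"); }
    });
    System.out.println(code + " " + am.log);
  }
}
```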
Broadly speaking, it consists of the following steps:
/home/admin/tmp2/hadoop/nm-local-dir/usercache/admin/appcache/application_1534698866098_0022/container_1534698866098_0022_01_000001/drill/apache-drill-1.14.0
private Map<String, LocalResource> prepareResources() {
  ...
  drillArchivePath = drillConfig.getDrillArchiveDfsPath();
  siteArchivePath = drillConfig.getSiteArchiveDfsPath();
  ...
}
private TaskSpec buildDrillTaskSpec(Map<String, LocalResource> resources)
    throws DoyConfigException {
  ...
  ContainerRequestSpec containerSpec = new ContainerRequestSpec();
  containerSpec.memoryMb = config.getInt(DrillOnYarnConfig.DRILLBIT_MEMORY);
  ...
  LaunchSpec drillbitSpec = new LaunchSpec();
  ...
  TaskSpec taskSpec = new TaskSpec();
  taskSpec.name = "Drillbit";
  taskSpec.containerSpec = containerSpec;
  taskSpec.launchSpec = drillbitSpec;
  return taskSpec;
}
public void setYarn(AMYarnFacade yarn) throws YarnFacadeException {
  this.yarn = yarn;
  controller = new ClusterControllerImpl(yarn);
}
cluster: [
  {
    name: "drill-group1"
    type: "basic"
    count: 1
  }
]
...
ClusterDef.ClusterGroup pool = ClusterDef.getCluster(config, 0);
Scheduler testGroup = new DrillbitScheduler(pool.getName(), taskSpec,
    pool.getCount(), requestTimeoutSecs, maxExtraNodes);
dispatcher.getController().registerScheduler(testGroup);
...
String zkConnect = config.getString(DrillOnYarnConfig.ZK_CONNECT);
String zkRoot = config.getString(DrillOnYarnConfig.ZK_ROOT);
String clusterId = config.getString(DrillOnYarnConfig.CLUSTER_ID);
...
yarn.start(new ResourceCallback(), new NodeCallback());
String url = trackingUrl.replace("<port>", Integer.toString(httpPort));
if (DrillOnYarnConfig.config().getBoolean(DrillOnYarnConfig.HTTP_ENABLE_SSL)) {
  url = url.replace("http:", "https:");
}
yarn.register(url);
controller.started();
...
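The one non-obvious step before registering with the RM is building the tracking URL that the YARN UI links to: the `<port>` placeholder is replaced with the AM's HTTP port and, when SSL is enabled, the scheme is switched to https. The logic is isolated below; the class name `TrackingUrl` is hypothetical, and the host name and port in the example are made up.

```java
// Mirrors the URL handling before yarn.register(url): fill in the port, then switch scheme if SSL is on.
class TrackingUrl {
  static String build(String trackingUrl, int httpPort, boolean sslEnabled) {
    String url = trackingUrl.replace("<port>", Integer.toString(httpPort));
    if (sslEnabled) {
      url = url.replace("http:", "https:");
    }
    return url;
  }

  public static void main(String[] args) {
    // Example template only; the real value comes from the AM configuration.
    System.out.println(build("http://node1:<port>/", 8048, true)); // https://node1:8048/
  }
}
```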
...
resourceMgr = AMRMClientAsync.createAMRMClientAsync(pollPeriodMs, resourceCallback);
resourceMgr.init(conf);
resourceMgr.start();
...
nodeMgr = NMClientAsync.createNMClientAsync(nodeCallback);
nodeMgr.init(conf);
nodeMgr.start();
...
client = YarnClient.createYarnClient();
client.init(conf);
client.start();
...
WebServer webServer = new WebServer(dispatcher);
webServer.start();
if (state == State.LIVE) {
  adjustTasks(curTime);
  requestContainers();
}
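ContainerRequest request = containerSpec.makeRequest();
// Register the same object that is returned, so that a later
// removeContainerRequest(task.containerRequest) refers to this request.
resourceMgr.addContainerRequest(request);
return request;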
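Each tick of the polling loop only acts while the controller is LIVE: adjustTasks() reconciles the number of tasks with the target, and requestContainers() asks the RM for containers for tasks that are still pending. The sketch reduces this to counting. It assumes, as a simplification rather than the actual DrillbitScheduler logic, that one container is requested per missing drillbit and that tasks already waiting for a container are never requested twice; `PollTick` and its fields are hypothetical.

```java
// Hypothetical, counting-only model of one polling tick in the cluster controller.
class PollTick {
  enum State { START, LIVE, ENDING, ENDED }

  int target;     // desired number of drillbits (the group's quantity)
  int running;    // tasks that already have a container
  int requested;  // container requests already sent to the RM

  PollTick(int target) { this.target = target; }

  // Returns how many new container requests this tick issues.
  int tick(State state) {
    if (state != State.LIVE) {
      return 0; // mirrors: if (state == State.LIVE) { adjustTasks(...); requestContainers(); }
    }
    int missing = target - running - requested; // adjustTasks(): distance from the target
    int toRequest = Math.max(0, missing);
    requested += toRequest;                     // requestContainers(): one request per new task
    return toRequest;
  }

  // Called when the RM hands over a container for an outstanding request.
  void onAllocated() {
    requested--;
    running++;
  }

  public static void main(String[] args) {
    PollTick t = new PollTick(3);
    System.out.println(t.tick(State.LIVE)); // first tick requests all three
    t.onAllocated();
    System.out.println(t.tick(State.LIVE)); // nothing new: one running, two still requested
  }
}
```

Counting outstanding requests is what keeps a fast polling loop from flooding the RM with duplicate asks while allocations are still in flight.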
private class ResourceCallback implements AMRMClientAsync.CallbackHandler {
  @Override
  public void onContainersAllocated(List<Container> containers) {
    controller.containersAllocated(containers);
  }
}
public void containerAllocated(EventContext context, Container container) {
  Task task = context.task;
  LOG.info(task.getLabel() + " - Received container: "
      + DoYUtil.describeContainer(container));
  context.group.dequeueAllocatingTask(task);

  // No matter what happens below, we don't want to ask for this
  // container again. The RM async API is a bit bizarre in this
  // regard: it will keep asking for container over and over until
  // we tell it to stop.
  context.yarn.removeContainerRequest(task.containerRequest);

  // The container is needed both in the normal and in the cancellation
  // path, so set it here.
  task.container = container;
  if (task.cancelled) {
    context.yarn.releaseContainer(container);
    taskStartFailed(context, Disposition.CANCELLED);
    return;
  }
  task.error = null;
  task.completionStatus = null;
  transition(context, LAUNCHING);

  // The pool that manages this task wants to know that we have
  // a container. The task manager may want to do some task-
  // specific setup.
  context.group.containerAllocated(context.task);
  context.getTaskManager().allocated(context);

  // Go ahead and launch a task in the container using the launch
  // specification provided by the task group (pool).
  try {
    context.yarn.launchContainer(container, task.getLaunchSpec());
    task.launchTime = System.currentTimeMillis();
  } catch (YarnFacadeException e) {
    LOG.error("Container launch failed: " + task.getContainerId(), e);

    // This may not be the right response. RM may still think
    // we have the container if the above is a local failure.
    task.error = e;
    context.group.containerReleased(task);
    task.container = null;
    taskStartFailed(context, Disposition.LAUNCH_FAILED);
  }
}
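containerAllocated() is essentially a small state transition: withdraw the container request whatever happens, give the container back if the task was cancelled in the meantime, and otherwise move to LAUNCHING and hand the container to the NodeManager, falling back to LAUNCH_FAILED if that call throws. The sketch keeps only those branches; the state names follow the source, while `AllocationHandler`, the `Launcher` interface and the `rmCalls` log are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of the allocation branch of the task state machine.
class AllocationHandler {
  enum TaskState { LAUNCHING, CANCELLED, LAUNCH_FAILED }

  // Stands in for context.yarn.launchContainer(container, launchSpec).
  interface Launcher {
    void launch(String containerId) throws Exception;
  }

  final List<String> rmCalls = new ArrayList<>();

  TaskState onAllocated(String containerId, boolean cancelled, Launcher launcher) {
    // Always withdraw the request first; otherwise the async RM client keeps asking for it.
    rmCalls.add("removeContainerRequest");
    if (cancelled) {
      rmCalls.add("releaseContainer " + containerId); // hand the unwanted container back
      return TaskState.CANCELLED;
    }
    try {
      // The source enters LAUNCHING before this call; here it is the result of a successful launch.
      launcher.launch(containerId);
      return TaskState.LAUNCHING;
    } catch (Exception e) {
      return TaskState.LAUNCH_FAILED;
    }
  }

  public static void main(String[] args) {
    AllocationHandler h = new AllocationHandler();
    System.out.println(h.onAllocated("container_01", false, id -> System.out.println("launching " + id)));
  }
}
```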
public class NodeCallback implements NMClientAsync.CallbackHandler {
  @Override
  public void onStartContainerError(ContainerId containerId, Throwable t) {
    controller.taskStartFailed(containerId, t);
  }

  @Override
  public void onContainerStarted(ContainerId containerId,
      Map<String, ByteBuffer> allServiceResponse) {
    controller.containerStarted(containerId);
  }

  @Override
  public void onContainerStatusReceived(ContainerId containerId,
      ContainerStatus containerStatus) {
  }

  @Override
  public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
  }

  @Override
  public void onStopContainerError(ContainerId containerId, Throwable t) {
    controller.stopTaskFailed(containerId, t);
  }

  @Override
  public void onContainerStopped(ContainerId containerId) {
    controller.containerStopped(containerId);
  }
}
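Besides start, stop, and resize, Drill-on-YARN also provides failover: when a drillbit dies, Drill-on-YARN tries to start it again, and the current number of retries is 2. In addition, if the node hosting a drillbit fails frequently, that node is put on a blacklist.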
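The text only states that a node whose drillbits keep failing ends up on a blacklist. One common way to implement that idea is a per-host failure counter with a threshold, sketched below; the class `NodeBlacklist` and the threshold are assumptions for illustration, not Drill's actual blacklisting code or configuration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical per-host failure tracker; the threshold is an illustrative assumption.
class NodeBlacklist {
  private final int maxFailures;
  private final Map<String, Integer> failures = new HashMap<>();
  private final Set<String> blacklisted = new HashSet<>();

  NodeBlacklist(int maxFailures) { this.maxFailures = maxFailures; }

  // Record a drillbit failure on a host; returns true if the host is now blacklisted.
  boolean recordFailure(String host) {
    int count = failures.merge(host, 1, Integer::sum);
    if (count >= maxFailures) {
      blacklisted.add(host);
    }
    return blacklisted.contains(host);
  }

  // A scheduler would skip blacklisted hosts when placing new drillbits.
  boolean isBlacklisted(String host) { return blacklisted.contains(host); }

  public static void main(String[] args) {
    NodeBlacklist bl = new NodeBlacklist(3);
    for (int i = 0; i < 3; i++) {
      System.out.println("node1 blacklisted: " + bl.recordFailure("node1"));
    }
  }
}
```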
You can simulate a drillbit failure by killing a drillbit process manually. After a short wait, you will see the drillbit process restarted. Let's look at the code path behind this:
private class ResourceCallback implements AMRMClientAsync.CallbackHandler {
  @Override
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    controller.containersCompleted(statuses);
  }
}
protected void taskTerminated(EventContext context) {
  Task task = context.task;
  context.getTaskManager().completed(context);
  context.group.containerReleased(task);
  assert task.completionStatus != null;
  // A non-zero container exit status means the task did not end normally
  if (task.completionStatus.getExitStatus() == 0) {
    taskEnded(context, Disposition.COMPLETED);
    context.group.taskEnded(context.task);
  } else {
    taskEnded(context, Disposition.RUN_FAILED);
    retryTask(context);
  }
}
private void retryTask(EventContext context) {
  Task task = context.task;
  assert task.state == END;
  if (!context.controller.isLive() || !task.retryable()) {
    context.group.taskEnded(task);
    return;
  }
  if (task.tryCount > task.taskGroup.getMaxRetries()) {
    LOG.error(task.getLabel() + " - Too many retries: " + task.tryCount);
    task.disposition = Disposition.TOO_MANY_RETRIES;
    context.group.taskEnded(task);
    return;
  }
  LOG.info(task.getLabel() + " - Retrying task, try " + task.tryCount);
  context.group.taskRetried(task);
  task.reset();
  transition(context, START);
  context.group.enqueuePendingRequest(task);
}
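taskTerminated() and retryTask() together form a small decision table: exit status 0 ends the task normally; otherwise the task is retried unless the controller is shutting down, the task is not retryable, or tryCount has exceeded the group's maximum. The sketch below encodes that table as a pure function. It assumes, since the trimmed code does not show it, that tryCount is incremented once per launch attempt; with maxRetries = 2 that gives one original attempt plus two retries. `RetryPolicy` and the `RETRIED`/`NOT_RETRYABLE` values are hypothetical.

```java
// Hypothetical model of taskTerminated() + retryTask() as a pure decision function.
class RetryPolicy {
  enum Disposition { COMPLETED, RETRIED, TOO_MANY_RETRIES, NOT_RETRYABLE }

  static Disposition onTerminated(int exitStatus, int tryCount, int maxRetries,
                                  boolean controllerLive, boolean retryable) {
    if (exitStatus == 0) {
      return Disposition.COMPLETED; // normal exit, no retry
    }
    if (!controllerLive || !retryable) {
      return Disposition.NOT_RETRYABLE; // the group simply records the task as ended
    }
    if (tryCount > maxRetries) {
      return Disposition.TOO_MANY_RETRIES;
    }
    return Disposition.RETRIED; // task.reset(), back to START, re-queued as a pending request
  }

  public static void main(String[] args) {
    int maxRetries = 2;
    // Assumption: tryCount counts launch attempts, starting at 1; the drillbit always crashes.
    for (int tryCount = 1; ; tryCount++) {
      Disposition d = onTerminated(1, tryCount, maxRetries, true, true);
      System.out.println("attempt " + tryCount + ": " + d);
      if (d != Disposition.RETRIED) {
        break;
      }
    }
  }
}
```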
Besides the start command covered in detail above, Drill-on-YARN also provides a stop command, which comes in two forms:
yarnClient.killApplication(appId);
...
for (Task task : getStartingTasks()) {
  context.setTask(task);
  context.getState().cancel(context);
}
for (Task task : getActiveTasks()) {
  context.setTask(task);
  context.getState().cancel(context);
}
...
... context.yarn.killContainer(task.container); ...
public void run() throws YarnFacadeException {
  ...
  boolean success = controller.waitForCompletion();
  ...
  finish(success, null);
  ...
}
public boolean waitForCompletion() {
  start();
  synchronized (completionMutex) {
    try {
      completionMutex.wait();
    } catch (InterruptedException e) {
    }
  }
  return succeeded();
}
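waitForCompletion() parks the AM's main thread on completionMutex until the controller signals that the cluster has shut down. A bare wait() with no condition can return on a spurious wakeup, and it blocks forever if the notify arrives before the wait starts; the standard idiom guards the wait with a flag checked in a loop. The sketch below shows that idiom; the `completed` flag, `signalCompletion()` and the class name `CompletionLatch` are assumptions for illustration, not names from the Drill source.

```java
// Guarded-wait version of the completion handshake between the polling thread and the AM main thread.
class CompletionLatch {
  private final Object completionMutex = new Object();
  private boolean completed = false; // hypothetical flag, guarded by completionMutex
  private boolean succeeded = false;

  // Called by the thread that detects the end of the cluster (e.g. the polling thread).
  void signalCompletion(boolean success) {
    synchronized (completionMutex) {
      completed = true;
      succeeded = success;
      completionMutex.notifyAll();
    }
  }

  // Called by the main thread; returns even if signalCompletion() ran before it.
  boolean waitForCompletion() throws InterruptedException {
    synchronized (completionMutex) {
      while (!completed) { // the loop guards against spurious wakeups
        completionMutex.wait();
      }
      return succeeded;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    CompletionLatch latch = new CompletionLatch();
    Thread poller = new Thread(() -> latch.signalCompletion(true));
    poller.start();
    System.out.println("succeeded = " + latch.waitForCompletion());
    poller.join();
  }
}
```

Propagating InterruptedException instead of swallowing it lets the caller decide how to shut down. java.util.concurrent.CountDownLatch with a count of 1 gives the same one-shot guarantee with less code.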
public void finish(boolean succeeded, String msg) throws YarnFacadeException {
  nodeMgr.stop();
  String appMsg = "Drill Cluster Shut-Down";
  FinalApplicationStatus status = FinalApplicationStatus.SUCCEEDED;
  if (!succeeded) {
    appMsg = "Drill Cluster Fatal Error - check logs";
    status = FinalApplicationStatus.FAILED;
  }
  if (msg != null) {
    appMsg = msg;
  }
  try {
    resourceMgr.unregisterApplicationMaster(status, appMsg, "");
  } catch (YarnException | IOException e) {
    throw new YarnFacadeException("Deregister AM failed", e);
  }
  resourceMgr.stop();
}
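In finish(), the NM client is stopped first, then the AM unregisters from the RM with a final status and message, and only then is the RM client stopped. The choice of status and message is isolated below, with a local enum standing in for YARN's FinalApplicationStatus (the real enum also has values such as UNDEFINED and KILLED that finish() does not use); `FinishReport` is a hypothetical name.

```java
// Mirrors how finish() picks the final status and message reported when unregistering the AM.
class FinishReport {
  enum FinalStatus { SUCCEEDED, FAILED } // local stand-in for FinalApplicationStatus

  final FinalStatus status;
  final String message;

  FinishReport(boolean succeeded, String msg) {
    FinalStatus s = FinalStatus.SUCCEEDED;
    String m = "Drill Cluster Shut-Down";
    if (!succeeded) {
      s = FinalStatus.FAILED;
      m = "Drill Cluster Fatal Error - check logs";
    }
    if (msg != null) {
      m = msg; // an explicit message overrides the default text, but never the status
    }
    this.status = s;
    this.message = m;
  }

  public static void main(String[] args) {
    FinishReport r = new FinishReport(false, null);
    System.out.println(r.status + ": " + r.message);
  }
}
```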
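Resize works by changing quantity (the number of containers to keep); the polling thread then adjusts the tasks according to the new quantity, which carries out the resize.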
public int resize(int level) {
  int limit = quantity + state.getController().getFreeNodeCount() + maxExtraNodes;
  return super.resize(Math.min(limit, level));
}
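DrillbitScheduler.resize() caps the requested level before delegating to the base class: the new quantity may not exceed the current quantity plus the number of free nodes plus maxExtraNodes. The cap is reproduced below with made-up numbers; `ResizeLimit.clamp` is a hypothetical helper, and any further checks in the base class's resize() are not modeled.

```java
// Reproduces the cap applied in DrillbitScheduler.resize() before the base class stores the new quantity.
class ResizeLimit {
  static int clamp(int quantity, int freeNodeCount, int maxExtraNodes, int level) {
    int limit = quantity + freeNodeCount + maxExtraNodes;
    return Math.min(limit, level);
  }

  public static void main(String[] args) {
    // 2 drillbits running, 1 free node, no extra nodes allowed: asking for 10 yields 3.
    System.out.println(clamp(2, 1, 0, 10));
    // The cap only limits growth: asking for 1 yields 1.
    System.out.println(clamp(2, 1, 0, 1));
  }
}
```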
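Overall, Drill-on-YARN consists of two main modules, drill-on-yarn.sh and drill-am.sh. drill-on-yarn.sh starts the ApplicationMaster, and drill-am.sh requests resources from the ResourceManager and starts the Drill cluster. Starting, stopping, scaling in, and scaling out Drill are all modeled as tasks: when one of these commands runs, a task is built and put into a task queue, and a thread continuously polls this queue and acts on each task, which is how the cluster is started, stopped, scaled in, and scaled out. In addition, compared with a standalone deployment, the failover provided by Drill-on-YARN makes Drill more robust.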