The Map-Reduce process begins with the client submitting a job.
Job submission is carried out mainly by the static method JobClient.runJob(JobConf):
public static RunningJob runJob(JobConf job) throws IOException {
  // First create a JobClient object
  JobClient jc = new JobClient(job);
  ……
  // Call submitJob to submit the job
  running = jc.submitJob(job);
  JobID jobId = running.getID();
  ……
  while (true) {
    // The while loop keeps polling the status of the job and printing it to the client console
  }
  return running;
}
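For context, a typical client-side driver that reaches this code path might look like the sketch below. It only uses the old mapred API (JobConf, FileInputFormat, FileOutputFormat, JobClient.runJob); the class names WordCount, WordCountMapper and WordCountReducer are hypothetical placeholders (see the mapper and reducer sketches later in this article), not part of the source being analyzed.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class WordCount {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    // Output key/value types of the job
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    // Hypothetical mapper/reducer classes, sketched later in this article
    conf.setMapperClass(WordCountMapper.class);
    conf.setReducerClass(WordCountReducer.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // Blocks until the job completes, printing progress to the console
    JobClient.runJob(conf);
  }
}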
JobClient's submitJob method is implemented as follows:
public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                InvalidJobConfException, IOException {
  // Obtain an id for the new job from the JobTracker
  JobID jobId = jobSubmitClient.getNewJobId();
  // Prepare and write to HDFS everything the job needs to run:
  //   the jar containing the job's code is packaged as job.jar
  //   the input split information of the job is written to job.split
  //   the job's configuration is written to job.xml
  Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  // Upload the jars specified on the command line with -libjars to HDFS
  configureCommandLineOptions(job, submitJobDir, submitJarFile);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  ……
  // Compute the input splits according to the input format; the default split type is FileSplit
  InputSplit[] splits =
    job.getInputFormat().getSplits(job, job.getNumMapTasks());

  // Create an output stream and write the input split information to the job.split file
  FSDataOutputStream out = FileSystem.create(fs, submitSplitFile,
      new FsPermission(JOB_FILE_PERMISSION));
  try {
    // job.split contains: a split file header, the split file version, the number of splits,
    // and then the information of each input split in turn.
    // For each input split it writes: the split class name (FileSplit by default), the size of
    // the split, the content of the split (for a FileSplit: the file name and the start offset
    // of this split within the file), and the locations of the split (i.e. which DataNodes hold it).
    writeSplitsFile(splits, out);
  } finally {
    out.close();
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  // The number of map tasks is set to the number of splits
  job.setNumMapTasks(splits.length);
  // Write the job configuration to the job.xml file
  out = FileSystem.create(fs, submitJobFile,
      new FsPermission(JOB_FILE_PERMISSION));
  try {
    job.writeXml(out);
  } finally {
    out.close();
  }
  // Finally call the JobTracker to actually submit the job
  JobStatus status = jobSubmitClient.submitJob(jobId);
  ……
}
The JobTracker runs as a separate JVM. Its main function mainly involves the following two parts:
In the JobTracker constructor, a taskScheduler member is created to schedule jobs. The default is JobQueueTaskScheduler, which schedules jobs in FIFO order.
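The scheduler class is pluggable. As a minimal sketch (assuming a 0.20-era configuration where the property is named mapred.jobtracker.taskScheduler and the contrib fair-scheduler jar is on the JobTracker's classpath), it can be swapped like this:

import org.apache.hadoop.mapred.JobConf;

public class SchedulerConfigExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Default: org.apache.hadoop.mapred.JobQueueTaskScheduler (FIFO).
    // Switching to the contrib fair scheduler; in practice this is set in
    // mapred-site.xml on the JobTracker rather than in client code.
    conf.set("mapred.jobtracker.taskScheduler",
             "org.apache.hadoop.mapred.FairScheduler");
  }
}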
In the offerService function, taskScheduler.start() is called. This registers two listeners with the JobTracker (i.e. the taskScheduler's taskTrackerManager): JobQueueJobInProgressListener, which maintains the FIFO job queue the scheduler works from, and EagerTaskInitializationListener.
The EagerTaskInitializationListener contains a thread, JobInitThread, which keeps taking JobInProgress objects from jobInitQueue and calling their initTasks function to initialize the corresponding job.
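The mechanism is a simple producer/consumer pattern. The following standalone sketch (hypothetical class and method names, not the actual Hadoop classes) illustrates how a listener can enqueue jobs and a background thread can drain the queue and initialize them:

import java.util.LinkedList;
import java.util.List;

public class EagerInitSketch {
  // Queue of jobs waiting to be initialized (stands in for jobInitQueue)
  private final List<String> jobInitQueue = new LinkedList<String>();

  // Called by the listener when a job is added (stands in for jobAdded())
  public void jobAdded(String job) {
    synchronized (jobInitQueue) {
      jobInitQueue.add(job);
      jobInitQueue.notifyAll();   // wake up the init thread
    }
  }

  // Background thread that drains the queue (stands in for JobInitThread)
  private final Thread jobInitThread = new Thread(new Runnable() {
    public void run() {
      while (true) {
        String job;
        synchronized (jobInitQueue) {
          while (jobInitQueue.isEmpty()) {
            try {
              jobInitQueue.wait();
            } catch (InterruptedException e) {
              return;
            }
          }
          job = jobInitQueue.remove(0);
        }
        initTasks(job);             // stands in for JobInProgress.initTasks()
      }
    }
  });

  private void initTasks(String job) {
    System.out.println("initializing " + job);
  }

  public void start() {
    jobInitThread.setDaemon(true);
    jobInitThread.start();
  }

  public static void main(String[] args) throws InterruptedException {
    EagerInitSketch sketch = new EagerInitSketch();
    sketch.start();
    sketch.jobAdded("job_0001");
    Thread.sleep(100);
  }
}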
In the previous section, the client called JobTracker.submitJob. This function first creates a JobInProgress object and then calls addJob, which contains the following logic:
synchronized (jobs) {
  synchronized (taskScheduler) {
    jobs.put(job.getProfile().getJobID(), job);
    // Call jobAdded on every listener registered with the JobTracker
    for (JobInProgressListener listener : jobInProgressListeners) {
      listener.jobAdded(job);
    }
  }
}
EagerTaskInitializationListener's jobAdded function simply adds the JobInProgress object to jobInitQueue, which naturally triggers the initialization of this job, carried out by JobInProgress's initTasks function:
public synchronized void initTasks() throws IOException {
  ……
  // Read the job.split file from HDFS to obtain the input splits
  String jobFile = profile.getJobFile();
  Path sysDir = new Path(this.jobtracker.getSystemDir());
  FileSystem fs = sysDir.getFileSystem(conf);
  DataInputStream splitFile =
    fs.open(new Path(conf.get("mapred.job.split.file")));
  JobClient.RawSplit[] splits;
  try {
    splits = JobClient.readSplitFile(splitFile);
  } finally {
    splitFile.close();
  }
  // The number of map tasks equals the number of input splits
  numMapTasks = splits.length;
  // Create one TaskInProgress per map task, each handling one input split
  maps = new TaskInProgress[numMapTasks];
  for (int i = 0; i < numMapTasks; ++i) {
    inputLength += splits[i].getDataLength();
    maps[i] = new TaskInProgress(jobId, jobFile, splits[i],
                                 jobtracker, conf, this, i);
  }
  // Map tasks are put into nonRunningMapCache, a Map<Node, List<TaskInProgress>>:
  // a map task will preferentially be assigned to the Node holding its input split.
  // nonRunningMapCache is consulted when the JobTracker assigns map tasks to TaskTrackers.
  if (numMapTasks > 0) {
    ……
  }

  // Create the reduce tasks
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i,
                                    jobtracker, conf, this);
    // Reduce tasks are put into nonRunningReduces, which is consulted when the
    // JobTracker assigns reduce tasks to TaskTrackers.
    nonRunningReduces.add(reduces[i]);
  }

  // Create two cleanup tasks: one to clean up the map side, one to clean up the reduce side
  cleanup = new TaskInProgress[2];
  cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0], jobtracker,
                                  conf, this, numMapTasks);
  cleanup[0].setJobCleanupTask();
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                  numReduceTasks, jobtracker, conf, this);
  cleanup[1].setJobCleanupTask();
  // Create two setup tasks: one to set up the map side, one to set up the reduce side
  setup = new TaskInProgress[2];
  setup[0] = new TaskInProgress(jobId, jobFile, splits[0], jobtracker,
                                conf, this, numMapTasks + 1);
  setup[0].setJobSetupTask();
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                numReduceTasks + 1, jobtracker, conf, this);
  setup[1].setJobSetupTask();
  tasksInited.set(true);  // initialization finished
  ……
}
The TaskTracker also runs as a separate JVM. Its main function essentially calls new TaskTracker(conf).run(), and the run function in turn mainly calls:
State offerService() throws Exception {
  long lastHeartbeat = 0;
  // The TaskTracker process keeps running for as long as it is alive
  while (running && !shuttingDown) {
    ……
    long now = System.currentTimeMillis();
    // Send a heartbeat to the JobTracker at regular intervals
    long waitTime = heartbeatInterval - (now - lastHeartbeat);
    if (waitTime > 0) {
      synchronized (finishedCount) {
        if (finishedCount[0] == 0) {
          finishedCount.wait(waitTime);
        }
        finishedCount[0] = 0;
      }
    }
    ……
    // Send the heartbeat to the JobTracker and get the response
    HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
    ……
    // Extract from the response the actions this TaskTracker needs to carry out
    TaskTrackerAction[] actions = heartbeatResponse.getActions();
    ……
    if (actions != null) {
      for (TaskTrackerAction action: actions) {
        if (action instanceof LaunchTaskAction) {
          // If the action is to launch a new task, add it to the task queue
          addToTaskQueue((LaunchTaskAction)action);
        } else if (action instanceof CommitTaskAction) {
          CommitTaskAction commitAction = (CommitTaskAction)action;
          if (!commitResponses.contains(commitAction.getTaskID())) {
            commitResponses.add(commitAction.getTaskID());
          }
        } else {
          tasksToCleanup.put(action);
        }
      }
    }
  }
  return State.NORMAL;
}
The main logic of transmitHeartBeat is as follows:
private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  // Every so often, include counter statistics for the JobTracker in the heartbeat
  boolean sendCounters;
  if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
    sendCounters = true;
    previousUpdate = now;
  } else {
    sendCounters = false;
  }
  ……
  // Report the current status of this TaskTracker to the JobTracker
  if (status == null) {
    synchronized (this) {
      status = new TaskTrackerStatus(taskTrackerName, localHostname,
                                     httpPort,
                                     cloneAndResetRunningTaskStatuses(sendCounters),
                                     failures,
                                     maxCurrentMapTasks,
                                     maxCurrentReduceTasks);
    }
  }
  ……
  // The TaskTracker asks the JobTracker for a new task when it is accepting new tasks and either:
  //   the number of map tasks currently running on it is below its maximum number of map tasks, or
  //   the number of reduce tasks currently running on it is below its maximum number of reduce tasks
  boolean askForNewTask;
  long localMinSpaceStart;
  synchronized (this) {
    askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
                     status.countReduceTasks() < maxCurrentReduceTasks) &&
                    acceptNewTasks;
    localMinSpaceStart = minSpaceStart;
  }
  ……
  // Send the heartbeat to the JobTracker; this is an RPC call
  HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                            justStarted, askForNewTask,
                                                            heartbeatResponseId);
  ……
  return heartbeatResponse;
}
When the heartbeat RPC reaches the JobTracker, its heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) method is invoked:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean initialContact, boolean acceptNewTasks,
                                                short responseId) throws IOException {
  ……
  String trackerName = status.getTrackerName();
  ……
  short newResponseId = (short)(responseId + 1);
  ……
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  // If the TaskTracker has asked the JobTracker for a task to run
  if (acceptNewTasks) {
    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      // Setup and cleanup tasks have the highest priority
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      if (tasks == null) {
        // Otherwise let the task scheduler assign tasks
        tasks = taskScheduler.assignTasks(taskTrackerStatus);
      }
      if (tasks != null) {
        for (Task task : tasks) {
          // Wrap each task in a LaunchTaskAction to be returned to the TaskTracker
          expireLaunchingTasks.addNewTask(task.getTaskID());
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }
  ……
  int nextInterval = getNextHeartbeatInterval();
  response.setHeartbeatInterval(nextInterval);
  response.setActions(actions.toArray(new TaskTrackerAction[actions.size()]));
  ……
  return response;
}
The default task scheduler is JobQueueTaskScheduler, whose assignTasks is as follows:
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
    throws IOException {
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();
  Collection<JobInProgress> jobQueue =
    jobQueueJobInProgressListener.getJobQueue();
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();
  // Compute the remaining map and reduce load across all running jobs
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        int totalMapTasks = job.desiredMaps();
        int totalReduceTasks = job.desiredReduces();
        remainingMapLoad += (totalMapTasks - job.finishedMaps());
        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
      }
    }
  }
  // Compute the average load each TaskTracker should carry: the remaining load divided by
  // the number of TaskTrackers, capped by this tracker's slot limits
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
    maxMapLoad = Math.min(maxCurrentMapTasks,
                          (int) Math.ceil((double) remainingMapLoad / numTaskTrackers));
    maxReduceLoad = Math.min(maxCurrentReduceTasks,
                             (int) Math.ceil((double) remainingReduceLoad / numTaskTrackers));
  }
  ……

  // Map tasks take precedence over reduce tasks: if the number of map tasks running on this
  // TaskTracker is below the average load, assign it a map task
  if (numMaps < maxMapLoad) {
    int totalNeededMaps = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }
        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
                                      taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  // Only after map tasks are handled are reduce tasks assigned
  if (numReduces < maxReduceLoad) {
    int totalNeededReduces = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }
        Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
                                         taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  return null;
}
From the code above we can see that JobInProgress's obtainNewMapTask is what assigns map tasks. It mainly calls findNewMapTask, which looks up a TaskInProgress in nonRunningMapCache based on the Node the requesting TaskTracker is on. JobInProgress's obtainNewReduceTask assigns reduce tasks; it mainly calls findNewReduceTask, which takes a TaskInProgress from nonRunningReduces.
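The locality lookup can be pictured with the following standalone sketch (hypothetical types; the real findNewMapTask also walks up the rack/network topology and maintains several cache levels, which is omitted here):

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class LocalityLookupSketch {
  // Stand-ins for Hadoop's Node and TaskInProgress types
  static class Node { final String name; Node(String name) { this.name = name; } }
  static class TaskInProgress { final String id; TaskInProgress(String id) { this.id = id; } }

  // Stand-in for nonRunningMapCache: map tasks grouped by the node holding their input split
  private final Map<String, List<TaskInProgress>> nonRunningMapCache =
      new HashMap<String, List<TaskInProgress>>();

  void addMapTask(String host, TaskInProgress tip) {
    List<TaskInProgress> list = nonRunningMapCache.get(host);
    if (list == null) {
      list = new LinkedList<TaskInProgress>();
      nonRunningMapCache.put(host, list);
    }
    list.add(tip);
  }

  // Prefer a task whose input split is local to the requesting TaskTracker's node
  TaskInProgress obtainNewMapTask(Node trackerNode) {
    List<TaskInProgress> local = nonRunningMapCache.get(trackerNode.name);
    if (local != null && !local.isEmpty()) {
      return local.remove(0);           // data-local assignment
    }
    // Fall back to any remaining task (the real code tries rack-local candidates first)
    for (List<TaskInProgress> list : nonRunningMapCache.values()) {
      if (!list.isEmpty()) {
        return list.remove(0);
      }
    }
    return null;
  }

  public static void main(String[] args) {
    LocalityLookupSketch s = new LocalityLookupSketch();
    s.addMapTask("host1", new TaskInProgress("task_m_000000"));
    s.addMapTask("host2", new TaskInProgress("task_m_000001"));
    System.out.println(s.obtainNewMapTask(new Node("host2")).id); // task_m_000001
  }
}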
After the heartbeat is sent to the JobTracker, the response contains the assigned LaunchTaskActions. Each is added to a queue via addToTaskQueue: a map task goes to mapLauncher (of type TaskLauncher), a reduce task goes to reduceLauncher (also of type TaskLauncher):
private void addToTaskQueue(LaunchTaskAction action) {
  if (action.getTask().isMapTask()) {
    mapLauncher.addToTaskQueue(action);
  } else {
    reduceLauncher.addToTaskQueue(action);
  }
}
TaskLauncher is a thread. Its run function takes a TaskInProgress from the queue populated above and calls startNewTask(TaskInProgress tip) to start the task, which in turn mainly calls localizeJob(TaskInProgress tip):
private void localizeJob(TaskInProgress tip) throws IOException {
  // The first thing to do is copy the files the task needs from HDFS
  // to the TaskTracker's local file system: job.split, job.xml and job.jar
  Path localJarFile = null;
  Task t = tip.getTask();
  JobID jobId = t.getJobID();
  Path jobFile = new Path(t.getJobFile());
  ……
  Path localJobFile = lDirAlloc.getLocalPathForWrite(
                        getLocalJobDir(jobId.toString())
                        + Path.SEPARATOR + "job.xml",
                        jobFileSize, fConf);
  RunningJob rjob = addTaskToJob(jobId, tip);
  synchronized (rjob) {
    if (!rjob.localized) {
      FileSystem localFs = FileSystem.getLocal(fConf);
      Path jobDir = localJobFile.getParent();
      ……
      // Copy the job file (job.xml) to the local file system
      systemFS.copyToLocalFile(jobFile, localJobFile);
      JobConf localJobConf = new JobConf(localJobFile);
      Path workDir = lDirAlloc.getLocalPathForWrite(
                       (getLocalJobDir(jobId.toString())
                        + Path.SEPARATOR + "work"), fConf);
      if (!localFs.mkdirs(workDir)) {
        throw new IOException("Mkdirs failed to create " + workDir.toString());
      }
      System.setProperty("job.local.dir", workDir.toString());
      localJobConf.set("job.local.dir", workDir.toString());
      // copy Jar file to the local FS and unjar it.
      String jarFile = localJobConf.getJar();
      long jarFileSize = -1;
      if (jarFile != null) {
        Path jarFilePath = new Path(jarFile);
        localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
                                  getLocalJobDir(jobId.toString())
                                  + Path.SEPARATOR + "jars",
                                  5 * jarFileSize, fConf), "job.jar");
        if (!localFs.mkdirs(localJarFile.getParent())) {
          throw new IOException("Mkdirs failed to create jars directory ");
        }
        // Copy job.jar to the local file system
        systemFS.copyToLocalFile(jarFilePath, localJarFile);
        localJobConf.setJar(localJarFile.toString());
        // Write the job's configuration out as job.xml
        OutputStream out = localFs.create(localJobFile);
        try {
          localJobConf.writeXml(out);
        } finally {
          out.close();
        }
        // Unpack job.jar
        RunJar.unJar(new File(localJarFile.toString()),
                     new File(localJarFile.getParent().toString()));
      }
      rjob.localized = true;
      rjob.jobConf = localJobConf;
    }
  }
  // Actually launch the task
  launchTaskForJob(tip, new JobConf(rjob.jobConf));
}
Once all the resources the task needs have been copied to the local machine, launchTaskForJob is called, which in turn calls TaskInProgress's launchTask function:
public synchronized void launchTask() throws IOException {
  ……
  // Create the task's working directory
  localizeTask(task);
  if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
    this.taskStatus.setRunState(TaskStatus.State.RUNNING);
  }
  // Create and start a TaskRunner: a MapTaskRunner for a MapTask,
  // a ReduceTaskRunner for a ReduceTask
  this.runner = task.createRunner(TaskTracker.this, this);
  this.runner.start();
  this.taskStatus.setStartTime(System.currentTimeMillis());
}
TaskRunner is a thread; its run function is as follows:
public final void run() {
  ……
  TaskAttemptID taskid = t.getTaskID();
  LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
  File jobCacheDir = null;
  if (conf.getJar() != null) {
    jobCacheDir = new File(
                    new Path(conf.getJar()).getParent().toString());
  }
  File workDir = new File(lDirAlloc.getLocalPathToRead(
                            TaskTracker.getLocalTaskDir(
                              t.getJobID().toString(),
                              t.getTaskID().toString(),
                              t.isTaskCleanupTask())
                            + Path.SEPARATOR + MRConstants.WORKDIR,
                            conf).toString());
  FileSystem fileSystem;
  Path localPath;
  ……
  // Assemble the classpath
  String baseDir;
  String sep = System.getProperty("path.separator");
  StringBuffer classPath = new StringBuffer();
  // start with same classpath as parent process
  classPath.append(System.getProperty("java.class.path"));
  classPath.append(sep);
  if (!workDir.mkdirs()) {
    if (!workDir.isDirectory()) {
      LOG.fatal("Mkdirs failed to create " + workDir.toString());
    }
  }
  String jar = conf.getJar();
  if (jar != null) {
    // if jar exists, it into workDir
    File[] libs = new File(jobCacheDir, "lib").listFiles();
    if (libs != null) {
      for (int i = 0; i < libs.length; i++) {
        classPath.append(sep);            // add libs from jar to classpath
        classPath.append(libs[i]);
      }
    }
    classPath.append(sep);
    classPath.append(new File(jobCacheDir, "classes"));
    classPath.append(sep);
    classPath.append(jobCacheDir);
  }
  ……
  classPath.append(sep);
  classPath.append(workDir);
  // Assemble the java command line and its arguments
  Vector<String> vargs = new Vector<String>(8);
  File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java");
  vargs.add(jvm.toString());
  String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
  javaOpts = javaOpts.replace("@taskid@", taskid.toString());
  String[] javaOptsSplit = javaOpts.split(" ");
  String libraryPath = System.getProperty("java.library.path");
  if (libraryPath == null) {
    libraryPath = workDir.getAbsolutePath();
  } else {
    libraryPath += sep + workDir;
  }
  boolean hasUserLDPath = false;
  for (int i = 0; i < javaOptsSplit.length; i++) {
    if (javaOptsSplit[i].startsWith("-Djava.library.path=")) {
      javaOptsSplit[i] += sep + libraryPath;
      hasUserLDPath = true;
      break;
    }
  }
  if (!hasUserLDPath) {
    vargs.add("-Djava.library.path=" + libraryPath);
  }
  for (int i = 0; i < javaOptsSplit.length; i++) {
    vargs.add(javaOptsSplit[i]);
  }
  // Set up the temporary directory of the child process
  String tmp = conf.get("mapred.child.tmp", "./tmp");
  Path tmpDir = new Path(tmp);
  if (!tmpDir.isAbsolute()) {
    tmpDir = new Path(workDir.toString(), tmp);
  }
  FileSystem localFs = FileSystem.getLocal(conf);
  if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
    throw new IOException("Mkdirs failed to create " + tmpDir.toString());
  }
  vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
  // Add classpath.
  vargs.add("-classpath");
  vargs.add(classPath.toString());
  // Log directory
  long logSize = TaskLog.getTaskLogLength(conf);
  vargs.add("-Dhadoop.log.dir=" +
            new File(System.getProperty("hadoop.log.dir")).getAbsolutePath());
  vargs.add("-Dhadoop.root.logger=INFO,TLA");
  vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
  vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
  // The main class of the child process that runs the map or reduce task is Child
  vargs.add(Child.class.getName());  // main of Child
  ……
  // Launch the child process
  jvmManager.launchJvm(this,
      jvmManager.constructJvmEnv(setup, vargs, stdout, stderr, logSize,
                                 workDir, env, pidFile, conf));
}
The actual map tasks and reduce tasks run inside the Child process. The main logic of Child's main function is as follows:
while (true) {
  // Obtain a JvmTask object from the TaskTracker over the network
  JvmTask myTask = umbilical.getTask(jvmId);
  ……
  idleLoopCount = 0;
  task = myTask.getTask();
  taskid = task.getTaskID();
  isCleanup = task.isTaskCleanupTask();
  JobConf job = new JobConf(task.getJobFile());
  TaskRunner.setupWorkDir(job);
  numTasksToExecute = job.getNumTasksToExecutePerJvm();
  task.setConf(job);
  defaultConf.addResource(new Path(task.getJobFile()));
  ……
  // Run the task
  task.run(job, umbilical);             // run the task
  if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
    break;
  }
}
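The numTasksToExecute check is what implements JVM reuse: one Child JVM can run several tasks of the same job before exiting. As a small sketch (assuming the old-API JobConf setter, which maps to the mapred.job.reuse.jvm.num.tasks property), a job can opt into this as follows:

import org.apache.hadoop.mapred.JobConf;

public class JvmReuseExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // 1 (default): a new child JVM per task
    // n > 1: up to n tasks of the same job per JVM
    // -1: no limit, the JVM is reused for as many tasks as the job has
    conf.setNumTasksToExecutePerJvm(-1);
    System.out.println(conf.getNumTasksToExecutePerJvm());
  }
}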
If the task is a MapTask, its run function is as follows:
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException {
  // Used to communicate with the TaskTracker and report progress
  final Reporter reporter = getReporter(umbilical);
  startCommunicationThread(umbilical);
  initialize(job, reporter);
  ……
  // The collector for the map task's output
  int numReduceTasks = conf.getNumReduceTasks();
  MapOutputCollector collector = null;
  if (numReduceTasks > 0) {
    collector = new MapOutputBuffer(umbilical, job, reporter);
  } else {
    collector = new DirectMapOutputCollector(umbilical, job, reporter);
  }
  // Read the input split and, from its information, create a RecordReader to read the data
  instantiatedSplit = (InputSplit)
    ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  DataInputBuffer splitBuffer = new DataInputBuffer();
  splitBuffer.reset(split.getBytes(), 0, split.getLength());
  instantiatedSplit.readFields(splitBuffer);
  if (instantiatedSplit instanceof FileSplit) {
    FileSplit fileSplit = (FileSplit) instantiatedSplit;
    job.set("map.input.file", fileSplit.getPath().toString());
    job.setLong("map.input.start", fileSplit.getStart());
    job.setLong("map.input.length", fileSplit.getLength());
  }
  RecordReader rawIn =                  // open input
    job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
  RecordReader in = isSkipping()
    ? new SkippingRecordReader(rawIn, getCounters(), umbilical)
    : new TrackedRecordReader(rawIn, getCounters());
  job.setBoolean("mapred.skip.on", isSkipping());
  // For a map task, create a MapRunnable; the default is MapRunner
  MapRunnable runner =
    ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  try {
    // MapRunner's run function reads records from the RecordReader one by one
    // and calls the Mapper's map function on each of them.
    runner.run(in, collector, reporter);
    collector.flush();
  } finally {
    in.close();                         // close input
    collector.close();
  }
  done(umbilical);
}
MapRunner's run function reads the records from the RecordReader one by one and calls the Mapper's map function on each:
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter) throws IOException {
  try {
    K1 key = input.createKey();
    V1 value = input.createValue();
    while (input.next(key, value)) {
      mapper.map(key, value, output, reporter);
      if (incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    mapper.close();
  }
}
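The Mapper invoked here is user code. As a reminder of what it looks like in the old mapred API, here is a word-count mapper used purely as an illustrative sketch (the class name WordCountMapper is hypothetical):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  // Called once per input record; emits (word, 1) for every token in the line.
  // The emitted pairs go to the OutputCollector, i.e. to MapOutputBuffer.collect() shown below.
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, ONE);
    }
  }
}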
All of the map output is collected by the MapOutputBuffer, whose collect function is as follows:
public synchronized void collect(K key, V value) throws IOException {
  reporter.progress();
  ……
  // As can be seen here, the buffer is a ring (circular) data structure
  final int kvnext = (kvindex + 1) % kvoffsets.length;
  spillLock.lock();
  try {
    boolean kvfull;
    do {
      // In the ring, if the next free slot catches up with the start position, the buffer is full
      kvfull = kvnext == kvstart;
      // Compute whether the soft limit has been reached, i.e. whether the buffer
      // should start being written to disk
      final boolean kvsoftlimit = ((kvnext > kvend)
          ? kvnext - kvend > softRecordLimit
          : kvend - kvnext <= kvoffsets.length - softRecordLimit);
      // If the soft limit is reached, start writing the buffer to disk as a spill file.
      // startSpill mainly notifies the background SpillThread, whose run() calls
      // sortAndSpill() to sort, combine and write to disk.
      if (kvstart == kvend && kvsoftlimit) {
        startSpill();
      }
      // If the buffer is completely full, wait until the spill has finished
      if (kvfull) {
        while (kvstart != kvend) {
          reporter.progress();
          spillDone.await();
        }
      }
    } while (kvfull);
  } finally {
    spillLock.unlock();
  }
  try {
    // If the buffer is not full, serialize the key and value into the buffer
    int keystart = bufindex;
    keySerializer.serialize(key);
    final int valstart = bufindex;
    valSerializer.serialize(value);
    int valend = bb.markRecord();
    // Call the configured partitioner to get the partition id for this key/value pair
    final int partition = partitioner.getPartition(key, value, partitions);
    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(valend >= keystart
        ? valend - keystart
        : (bufvoid - keystart) + valend);
    // Record the partition id and the offsets of the key and value in the index arrays
    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;
    kvindices[ind + PARTITION] = partition;
    kvindices[ind + KEYSTART] = keystart;
    kvindices[ind + VALSTART] = valstart;
    kvindex = kvnext;
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value);
    mapOutputRecordCounter.increment(1);
    return;
  }
}
The in-memory buffer consists of three parts: kvbuffer holds the serialized key/value bytes; kvindices stores, for each record, its partition id and the offsets of its key and value in kvbuffer; and kvoffsets holds, for each record, the index of its entry in kvindices.
(See the analyses by several Hadoop experts: http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx and http://caibinbupt.javaeye.com/.)
kvoffsets is used for sorting the records before they are spilled to disk: the sort permutes these small index entries instead of moving the serialized key/value bytes themselves.
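The trick of sorting an array of offsets rather than the records themselves can be illustrated with this small standalone sketch (plain Java with hypothetical data; the real MapOutputBuffer uses its own QuickSort over an IndexedSortable interface and compares raw serialized bytes):

import java.util.Arrays;
import java.util.Comparator;

public class IndexSortSketch {
  public static void main(String[] args) {
    // Records stay where they were written, in arrival order
    final String[] records = { "pear", "apple", "orange", "banana" };
    final int[] partitions = { 1, 0, 1, 0 };

    // Only the index entries are sorted (by partition, then by key),
    // which is what kvoffsets makes possible in MapOutputBuffer
    Integer[] offsets = { 0, 1, 2, 3 };
    Arrays.sort(offsets, new Comparator<Integer>() {
      public int compare(Integer a, Integer b) {
        if (partitions[a] != partitions[b]) {
          return partitions[a] - partitions[b];
        }
        return records[a].compareTo(records[b]);
      }
    });

    // Reading through the sorted offsets yields the spill order:
    // partition 0: apple, banana; partition 1: orange, pear
    for (int off : offsets) {
      System.out.println(partitions[off] + "\t" + records[off]);
    }
  }
}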
As noted above, the function that writes the in-memory buffer to a spill file on disk is sortAndSpill:
private void sortAndSpill() throws IOException {
  ……
  FSDataOutputStream out = null;
  FSDataOutputStream indexOut = null;
  IFileOutputStream indexChecksumOut = null;
  // Create the spill file on disk
  Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(), numSpills, size);
  out = rfs.create(filename);
  ……
  final int endPosition = (kvend > kvstart)
    ? kvend
    : kvoffsets.length + kvend;
  // Sort the records in the buffer by partition (and by key within each partition)
  sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  int spindex = kvstart;
  InMemValBytes value = new InMemValBytes();
  // Write the records to the file one partition at a time
  for (int i = 0; i < partitions; ++i) {
    IFile.Writer<K, V> writer = null;
    long segmentStart = out.getPos();
    writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
    // If no combiner is configured, write the records straight to the file
    if (null == combinerClass) {
      ……
      writer.append(key, value);
      ++spindex;
    } else {
      ……
      // If a combiner is configured, run it first (calling combiner.reduce(…))
      // and then write the result to the file
      combineAndSpill(kvIter, combineInputCounter);
    }
  }
  ……
}
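Whether combinerClass is null is controlled entirely by the job configuration. As a small illustration (reusing the hypothetical word-count classes from the sketches in this article), a reducer that is associative and commutative can be registered as the combiner:

import org.apache.hadoop.mapred.JobConf;

public class CombinerConfigExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf(CombinerConfigExample.class);
    // The combiner runs on the map side, inside sortAndSpill/combineAndSpill,
    // reducing the amount of data written to disk and fetched by the reducers.
    conf.setCombinerClass(WordCountReducer.class);   // hypothetical class, sketched later
  }
}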
When the map phase finishes, MapOutputBuffer's flush function is called. It also calls sortAndSpill to write whatever remains in the buffer to a file, and then calls mergeParts to merge the multiple spill files on disk:
private void mergeParts() throws IOException {
  ……
  // For every partition
  for (int parts = 0; parts < partitions; parts++) {
    // create the segments to be merged
    List<Segment<K, V>> segmentList =
      new ArrayList<Segment<K, V>>(numSpills);
    TaskAttemptID mapId = getTaskID();
    // Collect from each spill file the segment belonging to the current partition
    for (int i = 0; i < numSpills; i++) {
      final IndexRecord indexRecord = getIndexInformation(mapId, i, parts);
      long segmentOffset = indexRecord.startOffset;
      long segmentLength = indexRecord.partLength;
      Segment<K, V> s =
        new Segment<K, V>(job, rfs, filename[i], segmentOffset,
                          segmentLength, codec, true);
      segmentList.add(i, s);
    }
    // Merge the segments that belong to the same partition
    RawKeyValueIterator kvIter =
      Merger.merge(job, rfs,
                   keyClass, valClass,
                   segmentList, job.getInt("io.sort.factor", 100),
                   new Path(getTaskID().toString()),
                   job.getOutputKeyComparator(), reporter);
    // Write the merged segment to the final output file
    long segmentStart = finalOut.getPos();
    Writer<K, V> writer =
      new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
    if (null == combinerClass || numSpills < minSpillsForCombine) {
      Merger.writeFile(kvIter, writer, reporter, job);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(kvIter, combineInputCounter);
    }
    ……
  }
}
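The buffer size, the soft spill threshold, the merge fan-in and the combiner-on-merge threshold seen above are all plain configuration properties. A hedged sketch of tuning them (property names as used in the 0.20-era code shown here; the values are only examples):

import org.apache.hadoop.mapred.JobConf;

public class SortSpillTuningExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setInt("io.sort.mb", 200);                 // size of the map-side sort buffer in MB
    conf.set("io.sort.spill.percent", "0.80");      // soft limit that triggers startSpill()
    conf.setInt("io.sort.factor", 100);             // number of segments merged at once in mergeParts()
    conf.setInt("min.num.spills.for.combine", 3);   // run the combiner during the merge only if
                                                    // at least this many spill files exist
  }
}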
ReduceTask's run function is as follows:
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException {
  job.setBoolean("mapred.skip.on", isSkipping());
  // A reduce task has three phases: copy, sort and reduce
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  startCommunicationThread(umbilical);
  final Reporter reporter = getReporter(umbilical);
  initialize(job, reporter);
  // Copy phase: ReduceCopier's fetchOutputs fetches the map outputs. It creates several
  // MapOutputCopier threads, whose copyOutput does the actual copying.
  boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
  if (!isLocal) {
    reduceCopier = new ReduceCopier(umbilical, job);
    if (!reduceCopier.fetchOutputs()) {
      ……
    }
  }
  copyPhase.complete();
  // Sort phase: merge the fetched map outputs until the number of files drops below
  // io.sort.factor, and return an Iterator over the key-value pairs
  setPhase(TaskStatus.Phase.SORT);
  statusUpdate(umbilical);
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  RawKeyValueIterator rIter = isLocal
    ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
                   job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
                   !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
                   new Path(getTaskID().toString()), job.getOutputKeyComparator(),
                   reporter)
    : reduceCopier.createKVIterator(job, rfs, reporter);
  mapOutputFilesOnDisk.clear();
  sortPhase.complete();
  // Reduce phase
  setPhase(TaskStatus.Phase.REDUCE);
  ……
  Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
  Class keyClass = job.getMapOutputKeyClass();
  Class valClass = job.getMapOutputValueClass();
  ReduceValuesIterator values = isSkipping()
    ? new SkippingReduceValuesIterator(rIter,
          job.getOutputValueGroupingComparator(), keyClass, valClass,
          job, reporter, umbilical)
    : new ReduceValuesIterator(rIter,
          job.getOutputValueGroupingComparator(), keyClass, valClass,
          job, reporter);
  // Read out one key together with its list of values at a time and call the Reducer's reduce function
  while (values.more()) {
    reduceInputKeyCounter.increment(1);
    reducer.reduce(values.getKey(), values, collector, reporter);
    values.nextKey();
    values.informReduceProgress();
  }
  reducer.close();
  out.close(reporter);
  done(umbilical);
}
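The Reducer invoked in the loop above is again user code. For completeness, here is the word-count reducer matching the earlier mapper sketch (old mapred API; the class name WordCountReducer is hypothetical):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCountReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  // Called once per key with an iterator over all values for that key;
  // here it simply sums the counts emitted by the mapper (and by the combiner, if configured).
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}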
The Map-Reduce flow described above is summarized in the figure below.