This document comprehensively describes all user-facing facets of the Hadoop MapReduce framework and serves as a tutorial.html
該文檔做爲一份我的指導全面性得描述了全部用戶使用Hadoop Mapreduce框架時遇到的方方面面。java
Ensure that Hadoop is installed, configured and is running. More details:node
確保Hadoop安裝、配置和運行。更多細節:c++
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.算法
A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.shell
Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System (see HDFS Architecture Guide) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.express
The MapReduce framework consists of a single master ResourceManager, one slave NodeManager per cluster-node, and MRAppMaster per application (see YARN Architecture Guide).apache
Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters, comprise the job configuration.編程
The Hadoop job client then submits the job (jar/executable etc.) and configuration to the ResourceManager which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks and monitoring them, providing status and diagnostic information to the job-client.api
Although the Hadoop framework is implemented in Java™, MapReduce applications need not be written in Java.
Hadoop Mapreduce是一個易於編程而且能在大型集羣(上千節點)快速地並行得處理大量數據的軟件框架,以可靠,容錯的方式部署在商用機器上。
MapReduce Job一般將獨立大塊數據切片以徹底並行的方式在map任務中處理。該框架對maps輸出的作爲reduce輸入的數據進行排序,Job的輸入輸出都是存儲在文件系統中。該框架調度任務、監控任務和重啓失效的任務。
通常來講計算節點和存儲節點都是一樣的設置,MapReduce框架和HDFS運行在同組節點。這樣的設定使得MapReduce框架可以以更高的帶寬來執行任務,當數據已經在節點上時。
MapReduce 框架包含一個主ResourceManager,每一個集羣節點都有一個從NodeManager和每一個應用都有一個MRAppMaster。
應用最少必須指定輸入和輸出的路徑而且經過實現合適的接口或者抽象類來提供map和reduce功能。前面這部份內容和其餘Job參數構成了Job的配置。
Hadoop 客戶端提交Job和配置信息給ResourceManger,它將負責把配置信息分配給從屬節點,調度任務而且監控它們,把狀態信息和診斷信息傳輸給客戶端。
儘管 MapReduce 框架是用Java實現的,可是 MapReduce 應用卻不必定要用Java編寫。
The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value>pairs as the output of the job, conceivably of different types.
The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement theWritableComparable interface to facilitate sorting by the framework.
Input and Output types of a MapReduce job:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
MapReduce 框架只操做鍵值對,MapReduce 將job的不一樣類型輸入當作鍵值對來處理而且生成一組鍵值對做爲輸出。
Key和Value類必須經過實現Writable接口來實現序列化。此外,Key類必須實現WritableComparable 來使得排序更簡單。
MapRedeuce job 的輸入輸出類型:
(input) ->map-> ->combine-> ->reduce-> (output)
This section provides a reasonable amount of detail on every user-facing aspect of the MapReduce framework. This should help users implement, configure and tune their jobs in a fine-grained manner. However, please note that the javadoc for each class/interface remains the most comprehensive documentation available; this is only meant to be a tutorial.
Let us first take the Mapper and Reducer interfaces. Applications typically implement them to provide the map and reduce methods.
We will then discuss other core interfaces including Job, Partitioner, InputFormat, OutputFormat, and others.
Finally, we will wrap up by discussing some useful features of the framework such as the DistributedCache, IsolationRunner etc.
這部分將展現 MapReduce 中面向用戶方面的儘量多的細節。這將會幫助用戶更小粒度地實現、配置和調試它們的Job。然而,請在 Javadoc 中查看每一個類和接口的綜合用法,這裏僅僅是做爲一份指導。
讓咱們首先來看看Mapper和Reducer接口。應用一般只實現它們提供的map和reduce方法。
咱們將會討論其餘接口包括Job、Partitioner、InputFormat和其餘的。
最後,咱們會討論一些有用的特性像分佈式緩存、隔離運行等。
Applications typically implement the Mapper and Reducer interfaces to provide the map and reduce methods. These form the core of the job.
應用一般實現Mapper和Reducer接口提供map和reduce方法。這是Job的核心代碼。
Mapper maps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records do not need to be of the same type as the input records. A given input pair may map to zero or many output pairs.
The Hadoop MapReduce framework spawns(產卵) one map task for each InputSplit generated by the InputFormat for the job.
Overall, Mapper implementations are passed the Job for the job via the Job.setMapperClass(Class) method. The framework then calls map(WritableComparable, Writable, Context) for each key/value pair in the InputSplit for that task. Applications can then override the cleanup(Context) method to perform any required cleanup.
Output pairs do not need to be of the same types as input pairs. A given input pair may map to zero or many output pairs. Output pairs are collected with calls to context.write(WritableComparable, Writable).
Applications can use the Counter to report its statistics.
All intermediate(中間的) values associated(聯繫) with a given output key are subsequently(隨後) grouped by the framework, and passed to the Reducer(s) to determine the final output. Users can control the grouping by specifying a Comparator via Job.setGroupingComparatorClass(Class).
The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
Users can optionally(隨意) specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
The intermediate, sorted outputs are always stored in a simple (key-len, key, value-len, value) format. Applications can control if, and how, the intermediate outputs are to be compressed and the CompressionCodec to be used via the Configuration.
Mappers將輸入的鍵值對轉換成中間鍵值對。
Maps是多個單獨執行的任務將輸入轉換成中間記錄。那些被轉換的中間記錄不必定要和輸入的記錄爲相同類型。輸入鍵值對能夠在map後輸出0或者更多的鍵值對。
MapReduce 會根據 InputFormat 切分紅的各個 InputSplit 都建立一個map任務
總的來講,經過 job.setMapperClass(Class)來給Job設置Mapper實現類,而且將InputSplit輸入到map方法進行處理。應用可複寫cleanup方法來執行任何須要回收清除的操做。
輸出鍵值對不必定要和輸入鍵值對爲相同的類型。一個鍵值對輸入能夠輸出0至多個不等的鍵值對。輸出鍵值對將經過context.write(WritableComparable,Writable)方法進行緩存。
應用能夠經過Counter進行統計。
全部的中間值都會按照Key進行排序,而後傳輸給一個特定的Reducer作最後肯定的輸出。用戶能夠經過Job.setGroupingComparatorClass(Class)來控制分組規則。
Mapper輸出會被排序而且分區到每個Reducer。分區數和Reduce的數目是一致的。用戶能夠經過實現一個自定義的Partitioner來控制哪一個key對應哪一個Reducer。
用戶能夠隨意指定一個combiner,Job.setCombinerClass(Class),來執行局部輸出數據的整合,將有效地下降Mapper和Reducer之間的數據傳輸量。
那些通過排序的中間記錄一般會以(key-len, key, value-len, value)的簡單格式儲存。應用能夠經過配置來決定是否須要和怎樣壓縮數據和選擇壓縮方式。
The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.
The right level of parallelism(平行) for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.
Thus, if you expect 10TB of input data and have a blocksize of 128MB, you'll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
maps的數據一般依賴於輸入數據的總長度,也就是,輸入文檔的總block數。
每一個節點map的正常並行度應該在10-100之間,儘管每一個cpu已經設置的上限值爲300。任務的配置會花費一些時間,最少須要花費一分鐘來啓動運行。
所以,若是你有10TB的數據輸入和定義blocksize爲128M,那麼你將須要82000 maps,除非經過Configuration.set(MRJobConfig.NUM_MAPS, int)(設置一個默認值通知框架)來設置更高的值。
Reducer reduces a set of intermediate values which share a key to a smaller set of values.
The number of reduces for the job is set by the user via Job.setNumReduceTasks(int).
Overall(總的來講), Reducer implementations are passed the Job for the job via the Job.setReducerClass(Class) method and can override it to initialize themselves. The framework then callsreduce(WritableComparable, Iterable<Writable>, Context) method for each <key, (list of values)> pair in the grouped inputs. Applications can then override the cleanup(Context)method to perform any required cleanup.
Reducer has 3 primary(主要) phases(階段): shuffle, sort and reduce.
Reduce處理一系列相同key的中間記錄。
用戶能夠經過 Job.setNumReduceTasks(int) 來設置reduce的數量。
總的來講,經過 Job.setReducerClass(Class) 能夠給 job 設置 recuder 的實現類而且進行初始化。框架將會調用 reduce 方法來處理每一組按照必定規則分好的輸入數據,應用能夠經過複寫cleanup 方法執行任何清理工做。
Reducer有3個主要階段:混洗、排序和reduce。
Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches(取得) the relevant(有關的,恰當的) partition of the output of all the mappers, via HTTP.
輸出到Reducer的數據都在Mapper階段通過排序的。在這個階段框架將經過HTTP從恰當的Mapper的分區中取得數據。
The framework groups Reducer inputs by keys (since different mappers may have output the same key) in this stage(階段).
The shuffle and sort phases occur simultaneously(同時); while map-outputs are being fetched they are merged.
這個階段框架將對輸入到的 Reducer 的數據經過key(不一樣的 Mapper 可能輸出相同的key)進行分組。
混洗和排序階段是同時進行;map的輸出數據被獲取時會進行合併。
If equivalence(平等的) rules for grouping the intermediate keys are required to be different from those for grouping keys before reduction, then one may specify a Comparator via Job.setSortComparatorClass(Class). Since Job.setGroupingComparatorClass(Class) can be used to control how intermediate keys are grouped, these can be used in conjunction(協調) to simulate(模擬) secondary sort on values.
若是想要對中間記錄實現與 map 階段不一樣的排序方式,能夠經過Job.setSortComparatorClass(Class) 來設置一個比較器 。Job.setGroupingComparatorClass(Class) 被用於控制中間記錄的排序方式,這些能用來進行值的二次排序。
In this phase the reduce(WritableComparable, Iterable<Writable>, Context) method is called for each <key, (list of values)> pair in the grouped inputs.
The output of the reduce task is typically written to the FileSystem via Context.write(WritableComparable, Writable).
Applications can use the Counter to report its statistics.
The output of the Reducer is not sorted.
在這個階段reduce方法將會被調用來處理每一個已經分好的組鍵值對。
reduce 任務通常經過 Context.write(WritableComparable, Writable) 將數據寫入到FileSystem。
應用可使用 Counter 進行統計。
Recuder 輸出的數據是不通過排序的。
The right number of reduces seems to be 0.95 or 1.75 multiplied(乘上) by (<no. of nodes> * <no. of maximum containers per node>).
With 0.95 all of the reduces can launch immediately(馬上) and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave(波浪) of reduces doing a much better job of load balancing(均衡).
Increasing the number of reduces increases the framework overhead(負擔,天花板), but increases load balancing and lowers the cost of failures.
The scaling(規模) factors above are slightly(輕微的) less than whole numbers to reserve a few reduce slots in the framework for speculative(推測的)-tasks and failed tasks.
合適的 reduce 總數應該在 節點數*每一個節點的容器數*0.95 至 節點數*每一個節點的容器數*1.75 之間。
當設定值爲0.95時,map任務結束後全部的 reduce 將會馬上啓動而且開始轉移數據,當設定值爲1.75時,處理更多任務的時候將會快速地一輪又一輪地運行 reduce 達到負載均衡。
reduce 的數目的增長將會增長框架的負擔(天花板),可是會提升負載均衡和下降失敗率。
總體的規模將會略小於總數,由於有一些 reduce slot 用來存儲推測任務和失敗任務。
It is legal to set the number of reduce-tasks to zero if no reduction is desired.
In this case the outputs of the map-tasks go directly to the FileSystem, into the output path set by FileOutputFormat.setOutputPath(Job, Path). The framework does not sort the map-outputs before writing them out to the FileSystem.
當沒有 reduction 需求的時候能夠將 reduce-task 的數目設置爲0,是容許的。
在這種狀況當中,map任務將直接輸出到 FileSystem,可經過 FileOutputFormat.setOutputPath(Job, Path) 來設置。該框架不會對輸出的FileSystem 的數據進行排序。
Partitioner partitions the key space.
Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset (子集)of the key) is used to derive(取得;源自) the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.
HashPartitioner is the default Partitioner.
Partitioner對key進行分區。
Partitioner 對 map 輸出的中間值的 key(Recuder以前)進行分區。分區採用的默認方法是對 key 取 hashcode。分區數等於 job 的 reduce 任務數。所以這會根據中間值的key 將數據傳輸到對應的 reduce。
HashPartitioner 是默認的的分區器。
Counter is a facility for MapReduce applications to report its statistics.
Mapper and Reducer implementations can use the Counter to report statistics.
Hadoop MapReduce comes bundled(捆綁) with a library of generally(廣泛的) useful mappers, reducers, and partitioners.
計數器是一個工具用於報告 Mapreduce 應用的統計。
Mapper 和 Reducer 實現類可以使用計數器來報告統計值。
Hadoop Mapreduce 是廣泛的可用的 Mappers、Reducers 和 Partitioners 組成的一個庫。
Job represents(表明,表示) a MapReduce job configuration.
Job is the primary interface for a user to describe a MapReduce job to the Hadoop framework for execution. The framework tries to faithfully(如實的) execute the job as described by Job, however:
Job is typically used to specify the Mapper, combiner (if any), Partitioner, Reducer, InputFormat, OutputFormat implementations. FileInputFormat indicates(指定,代表) the set of input files (FileInputFormat.setInputPaths(Job, Path...)/ FileInputFormat.addInputPath(Job, Path)) and ( FileInputFormat.setInputPaths(Job, String...)/ FileInputFormat.addInputPaths(Job, String))and where the output files should be written ( FileOutputFormat.setOutputPath(Path)).
Optionally, Job is used to specify other advanced facets of the job such as the Comparator to be used, files to be put in the DistributedCache, whether intermediate and/or job outputs are to be compressed (and how), whether job tasks can be executed in a speculative manner ( setMapSpeculativeExecution(boolean))/ setReduceSpeculativeExecution(boolean)), maximum number of attempts per task (setMaxMapAttempts(int)/ setMaxReduceAttempts(int)) etc.
Of course, users can use Configuration.set(String, String)/ Configuration.get(String) to set/get arbitrary parameters needed by applications. However, use the DistributedCache for large amounts of (read-only) data.
Job類用來表示MapReduce做業的配置。Job是用戶用來描述MapReduce job在Hadoop框架運行的主要接口。Hadoop將盡可能如實地按照job所描述的來執行。然而:
Job 典型地用於指定Mapper、Combiner、Partitioner、Reducer、InputFormat、OutputFormat實現類。 FileInputFormat指定輸入文檔的設定(FileInputFormat.setInputPaths(Job, Path...)/FileInputFormat.addInputPath(Job, Path))和(FileInputFormat.setInputPaths(Job, String...)/FileInputFormat.addInputPaths(Job, String))和輸出文件應該寫入經過(FileOutputFormat.setOutputPath(Path)).
隨意地,Job也經常使用來指定job的其餘高級配置,例如比較器、文檔置於分佈式緩存、中間記錄是否壓縮和怎樣壓縮, job任務是否已預測的方式去執行,每一個任務的最大處理量等等。
固然,用戶可使用來設置或者得到應用所須要的任何參數。然而,使用分佈式緩存來存儲大量的可讀數據。
The MRAppMaster executes the Mapper/Reducer task as a child process in a separate jvm.
The child-task inherits the environment of the parent MRAppMaster. The user can specify additional options to the child-jvm via the mapreduce.{map|reduce}.java.opts and configuration parameter in the Job such as non-standard paths for the run-time linker to search shared libraries via -Djava.library.path=<> etc. If the mapreduce.{map|reduce}.java.opts parameters contains the symbol @taskid@ it is interpolated with value of taskid of the MapReduce task.
Here is an example with multiple arguments and substitutions, showing jvm GC logging, and start of a passwordless(無密碼) JVM JMX agent so that it can connect with jconsole and the likes to watch child memory, threads and get thread dumps. It also sets the maximum heap-size of the map and reduce child jvm to 512MB & 1024MB respectively. It also adds an additional path to the java.library.path of the child-jvm.
1 <property> 2 3 <name>mapreduce.map.java.opts</name> 4 5 <value> 6 7 -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc 8 9 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false 10 11 </value> 12 13 </property> 14 15 16 17 <property> 18 19 <name>mapreduce.reduce.java.opts</name> 20 21 <value> 22 23 -Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc 24 25 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false 26 27 </value> 28 29 </property>
MRAppMaster 在一個單獨的jvm中運行Mapper/Reducer任務作爲一個子進程。
子任務繼承父MRAppMaster的運行環境。用戶能夠經過(mapreduce.{map|reduce}.java.opts和配置參數例如經過 Djava.library.path=<>能夠設置非標準的路徑用於運行時搜索庫)指定額外的設置。若是mapreduce.{map|reduce}.java.opts參數包含@taskid@ 符號那麼Mapreduce任務將會被修改成taskid的值。
下面有個例子;配置多個參數和代替,展現jvm gc 日誌,和 JVM JMX 代理用於無密碼登陸以至能夠鏈接JConsole來監控子程序的內存、線程和線程垃圾回收。也分別設置了map和reduce的最大堆內存爲512M和1024M。它也給子jvm添加了額外的路徑經過java.library.path參數。
Users/admins can also specify the maximum virtual memory of the launched child-task, and any sub-process it launches recursively, using mapreduce.{map|reduce}.memory.mb. Note that the value set here is a per process limit. The value for mapreduce.{map|reduce}.memory.mb should be specified in mega bytes (MB). And also the value must be greater than or equal to the -Xmx passed to JavaVM, else the VM might not start.
Note: mapreduce.{map|reduce}.java.opts are used only for configuring the launched child tasks from MRAppMaster. Configuring the memory options for daemons is documented inConfiguring the Environment of the Hadoop Daemons.
The memory available to some parts of the framework is also configurable. In map and reduce tasks, performance may be influenced by adjusting parameters influencing the concurrency of operations and the frequency with which data will hit disk. Monitoring the filesystem counters for a job- particularly relative to byte counts from the map and into the reduce- is invaluable to the tuning of these parameters.
說明:mapreduce.{map|reduce}.java.opts只用來設置MRAppMaster發出的子任務。守護線程的內存選項配置在Configuring the Environment of the Hadoop Daemons.
框架的一些組成部分的內存也是可配置的。在map和reduce任務中,性能可能會受到併發數的調整和寫入到磁盤的頻率的影響。文件系統計數器監控做業的map輸出和輸入到reduce的字節數對於調整這 些參數是寶貴的。
A record emitted(發射) from a map will be serialized into a buffer and metadata will be stored into accounting buffers. As described in the following options, when either the serialization buffer or the metadata exceed(超過) a threshold(入口), the contents of the buffers will be sorted and written to disk in the background while the map continues to output records. If either buffer fills completely while the spill is in progress, the map thread will block. When the map is finished, any remaining records are written to disk and all on-disk segments are merged into a single file. Minimizing the number of spills to disk can decrease map time, but a larger buffer also decreases the memory available to the mapper.
Name |
Type |
Description |
mapreduce.task.io.sort.mb |
int |
The cumulative(累積) size of the serialization and accounting buffers storing records emitted from the map, in megabytes. |
mapreduce.map.sort.spill.percent |
float |
The soft limit in the serialization buffer. Once reached, a thread will begin to spill the contents to disk in the background. |
Other notes
Map發出的數據將會被序列化在緩存中和源數據將會儲存在統計緩存。正如接下來的配置所描述的,當序列化緩存和元數據超過設定的臨界值,緩存中的內容將會後臺中寫入到磁盤中而map將會繼續輸出記錄。當緩存徹底滿了溢出以後,map線程將會阻塞。當map任務結束,全部剩下的記錄都會被寫到磁盤中而且磁盤中全部文件塊會被合併到一個單獨的文件。減少溢出值將減小map的時間,但更大的緩存會減小mapper的內存消耗。
其餘說明:
As described previously, each reduce fetches the output assigned to it by the Partitioner via HTTP into memory and periodically merges these outputs to disk. If intermediate compression of map outputs is turned on, each output is decompressed into memory. The following options affect the frequency of these merges to disk prior to the reduce and the memory allocated to map output during the reduce.
Name |
Type |
Description |
mapreduce.task.io.soft.factor |
int |
Specifies the number of segments on disk to be merged at the same time. It limits the number of open files and compression codecs during merge. If the number of files exceeds this limit, the merge will proceed in several passes. Though this limit also applies to the map, most jobs should be configured so that hitting this limit is unlikely there. |
mapreduce.reduce.merge.inmem.thresholds |
int |
The number of sorted map outputs fetched into memory before being merged to disk. Like the spill thresholds in the preceding note, this is not defining a unit of partition, but a trigger. In practice, this is usually set very high (1000) or disabled (0), since merging in-memory segments is often less expensive than merging from disk (see notes following this table). This threshold influences only the frequency of in-memory merges during the shuffle. |
mapreduce.reduce.shuffle.merge.percent |
float |
The memory threshold for fetched map outputs before an in-memory merge is started, expressed as a percentage of memory allocated to storing map outputs in memory. Since map outputs that can't fit in memory can be stalled, setting this high may decrease parallelism between the fetch and merge. Conversely, values as high as 1.0 have been effective for reduces whose input can fit entirely in memory. This parameter influences only the frequency of in-memory merges during the shuffle. |
mapreduce.reduce.shuffle.input.buffer.percent |
float |
The percentage of memory- relative to the maximum heapsize as typically specified in mapreduce.reduce.java.opts- that can be allocated to storing map outputs during the shuffle. Though some memory should be set aside for the framework, in general it is advantageous to set this high enough to store large and numerous map outputs. |
mapreduce.reduce.input.buffer.percent |
float |
The percentage of memory relative to the maximum heapsize in which map outputs may be retained during the reduce. When the reduce begins, map outputs will be merged to disk until those that remain are under the resource limit this defines. By default, all map outputs are merged to disk before the reduce begins to maximize the memory available to the reduce. For less memory-intensive reduces, this should be increased to avoid trips to disk. |
Other notes
正如前面提到的,每一個reduce都會經過HTTP在內存中拿到Partitioner分配好的數據而且按期地合併數據寫到磁盤中。若是map輸出的中間值都進行壓縮,那麼每一個輸出都會減小內存的壓力。下面這些設置將會影響reduce以前的數據合併到磁盤的頻率和reduce過程當中分配給map輸出的內存。
其餘說明:
The following properties are localized in the job configuration for each task's execution:
Name |
Type |
Description |
mapreduce.job.id |
String |
The job id |
mapreduce.job.jar |
String |
job.jar location in job directory |
mapreduce.job.local.dir |
String |
The job specific shared scratch space |
mapreduce.task.id |
String |
The task id |
mapreduce.task.attempt.id |
String |
The task attempt id |
mapreduce.task.is.map |
boolean |
Is this a map task |
mapreduce.task.partition |
int |
The id of the task within the job |
mapreduce.map.input.file |
String |
The filename that the map is reading from |
mapreduce.map.input.start |
long |
The offset of the start of the map input split |
mapreduce.map.input.length |
long |
The number of bytes in the map input split |
mapreduce.task.output.dir |
String |
The task's temporary output directory |
Note: During the execution of a streaming job, the names of the "mapreduce" parameters are transformed. The dots ( . ) become underscores ( _ ). For example, mapreduce.job.id becomes mapreduce_job_id and mapreduce.job.jar becomes mapreduce_job_jar. To get the values in a streaming job's mapper/reducer use the parameter names with the underscores.
說明:流式任務的執行過程當中,名字以mapreduce開頭的參數會被改變。符號(.)會變成(_)。例如,mapreduce.job.id會變成mapreduce_job_id和mapreduce.job.jar會變成mapreduce_job_jar。在Mapper/Reducer中使用帶下劃線的參數名來得到對應的值。
The standard output (stdout) and error (stderr) streams and the syslog of the task are read by the NodeManager and logged to ${HADOOP_LOG_DIR}/userlogs.
NodeManager 會讀取stdout、sterr和任務的syslog並寫到${HADOOP_LOG_DIR}/userlogs。
The DistributedCache can also be used to distribute both jars and native libraries for use in the map and/or reduce tasks. The child-jvm always has its current working directory added to the java.library.path and LD_LIBRARY_PATH. And hence the cached libraries can be loaded via System.loadLibrary or System.load. More details on how to load shared libraries through distributed cache are documented at Native Libraries.
分佈是緩存也能夠在map/reduce任務中用來分不是存儲jars和本地庫。子JVM常常將它的工做路徑添加到java.librarypath和LD_LIBRARY_PATH.所以緩存的庫能經過System.loadLibrary 或者 System.load 來加載。更多關於如何經過分佈式緩存來加載第三方庫參考Native Libraries.
Job is the primary interface by which user-job interacts with the ResourceManager.
Job provides facilities to submit jobs, track their progress, access component-tasks' reports and logs, get the MapReduce cluster's status information and so on.
The job submission process involves:
Job history files are also logged to user specified directory mapreduce.jobhistory.intermediate-done-dir and mapreduce.jobhistory.done-dir, which defaults to job output directory.
User can view the history logs summary in specified directory using the following command
$ mapred job -history output.jhist
This command will print job details, failed and killed tip details.
More details about the job such as successful tasks and task attempts made for each task can be viewed using the following command
$ mapred job -history all output.jhist
Normally the user uses Job to create the application, describe various facets of the job, submit the job, and monitor its progress.
Job 是用戶Job與ResourceManager交互的主要接口。
Job 提供工具去提交jobs、跟蹤他們的進程、使用組成任務的報告和日誌,得到MapReduce集羣的狀態信息和其餘。
Job的提交包含如下內容:
Job的歷史文件也被記錄到用戶經過mapreduce.jobhistory.intermediate-done-dir and mapreduce.jobhistory.done-dir指定的路徑下,默認是Job的輸出路徑。
用戶能夠經過下面的指令來查看指定路徑下的全部的歷史記錄。
$ mapred job -history output.jhist
這個命令能夠打印job的細節,失敗和殺死Job的技巧。用如下的命令能夠考到更多關於Job例如成功任務和每一個任務的目的細節。
$ mapred job -history all output.jhist
Normally the user uses Job to create the application, describe various facets of the job, submit the job, and monitor its progress.
通常來講用戶使用Job來建立應用,描述Job的各個方面,提交Job和監控它的進程。
Users may need to chain MapReduce jobs to accomplish(實現) complex tasks which cannot be done via a single MapReduce job. This is fairly easy since the output of the job typically goes to distributed file-system, and the output, in turn(依次), can be used as the input for the next job.
However, this also means that the onus on ensuring jobs are complete (success/failure) lies squarely on the clients. In such cases, the various job-control options are:
用戶可能須要將多個任務串行實現複雜任務而沒辦法經過一個MapReduce任務實現。這是至關容易,job的output一般是輸出到分佈式緩存,而輸出,依次做爲下一個任務的輸入。
然而,這也意味確保任務的完成(成功/失敗)的義務是徹底創建在客戶端上。在這種狀況下,各類做業的控制選項有:
InputFormat describes the input-specification for a MapReduce job.
The MapReduce framework relies on the InputFormat of the job to:
The default behavior of file-based InputFormat implementations, typically sub-classes of FileInputFormat, is to split the input into logical InputSplit instances based on the total size, in bytes, of the input files. However, the FileSystem blocksize of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set viamapreduce.input.fileinputformat.split.minsize.
Clearly, logical splits based on input-size is insufficient for many applications since record boundaries must be respected. In such cases, the application should implement a RecordReader, who is responsible for respecting record-boundaries and presents a record-oriented view of the logical InputSplit to the individual task.
TextInputFormat is the default InputFormat.
If TextInputFormat is the InputFormat for a given job, the framework detects input-files with the .gz extensions and automatically decompresses them using the appropriate CompressionCodec. However, it must be noted that compressed files with the above extensions cannot be split and each compressed file is processed in its entirety by a single mapper.
InputFormat描述MapReduce Job的輸入規定。
MapReduce框架依賴Job的InputFormat:
那些默認的基於InputFormat的實現,一般來講FileInputForamt的子類,基於總字節數將輸入基於字節數分紅邏輯輸入塊實例,然而,FileSystem的塊大小將是inputSplits的上限值,下限值能夠經過mapreduce.input.fileinputformat.split.minsize來設置。
很明顯,不少應用必須重視記錄的邊界,因存在着輸入大小不足以邏輯分割。在這種狀況,應用應當實現一個RecordReader,負責在單獨任務中處理記錄邊界和顯示,面向記錄的邏輯視圖。
TextInputForamt是默認的InputForamt。
若是job的InputForamt是TextInputFormat,框架會對輸入文件進行檢測,若是擴展名爲.gz那麼會自動用合適的壓縮編碼器進行解壓。然而,必須說明的是通過壓縮的文件將不能被切割而且每個壓縮文件都必須徹底在一個Mapper單獨處理。
InputSplit represents the data to be processed by an individual Mapper.
Typically InputSplit presents a byte-oriented view of the input, and it is the responsibility of RecordReader to process and present a record-oriented view.
FileSplit is the default InputSplit. It sets mapreduce.map.input.file to the path of the input file for the logical split.
輸入塊表示每一個單獨Mapper處理的數據。
一般來講,輸入塊表明輸入的面向字節視圖,而RecordReader表明的是面向記錄視圖。
FileSplit是默認的InputSplit。mapreduce.map.input.file設置用於邏輯分割的輸入路徑。
RecordReader reads <key, value> pairs from an InputSplit.
Typically the RecordReader converts the byte-oriented view of the input, provided by the InputSplit, and presents a record-oriented to the Mapper implementations for processing.RecordReader thus assumes the responsibility of processing record boundaries and presents the tasks with keys and values.
RecordReader從InputSplit讀取鍵值對。
一般來講,RecordReader將輸入的面向字節視圖轉換成面向記錄視圖並輸入到Mapper的實現類進行處理。RecordReader所以承擔起處理記錄邊界和顯示任務的Keys和Values的責任。
OutputFormat describes the output-specification for a MapReduce job.
The MapReduce framework relies on the OutputFormat of the job to:
TextOutputFormat is the default OutputFormat.
OutputFormat 描述MapReduce Job的輸出規定。
MapReduce 框架依賴於Job的OutputFormat:
OutputCommitter describes the commit of task output for a MapReduce job.
The MapReduce framework relies on the OutputCommitter of the job to:
FileOutputCommitter is the default OutputCommitter. Job setup/cleanup tasks occupy map or reduce containers, whichever is available on the NodeManager. And JobCleanup task, TaskCleanup tasks and JobSetup task have the highest priority, and in that order.
OutputCommitter 描述着MapReduce的任務輸出的提交。
MapReduce依賴於Job的輸出提交器:
FileOutputCommitter是默認的OutputCommitter。Job 建立/清除任務佔有map或者reduce容器,不管NodeManager是否可用。Job的清除任務,任務的清除任務和Job的建立任務擁有最高的優先級。
In some applications, component tasks need to create and/or write to side-files, which differ from the actual job-output files.
In such cases there could be issues with two instances of the same Mapper or Reducer running simultaneously (for example, speculative tasks) trying to open and/or write to the same file (path) on the FileSystem. Hence the application-writer will have to pick unique names per task-attempt (using the attemptid, say attempt_200709221812_0001_m_000000_0), not just per task.
To avoid these issues the MapReduce framework, when the OutputCommitter is FileOutputCommitter, maintains a special${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} sub-directory accessible via ${mapreduce.task.output.dir} for each task-attempt on the FileSystem where the output of the task-attempt is stored. On successful completion of the task-attempt, the files in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) are promoted to${mapreduce.output.fileoutputformat.outputdir}. Of course, the framework discards the sub-directory of unsuccessful task-attempts. This process is completely transparent to the application.
The application-writer can take advantage of this feature by creating any side-files required in ${mapreduce.task.output.dir} during execution of a task viaFileOutputFormat.getWorkOutputPath(Conext), and the framework will promote them similarly for succesful task-attempts, thus eliminating the need to pick unique paths per task-attempt.
Note: The value of ${mapreduce.task.output.dir} during execution of a particular task-attempt is actually ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid}, and this value is set by the MapReduce framework. So, just create any side-files in the path returned by FileOutputFormat.getWorkOutputPath(Conext) from MapReduce task to take advantage of this feature.
The entire discussion holds true for maps of jobs with reducer=NONE (i.e. 0 reduces) since output of the map, in that case, goes directly to HDFS.
在一些應用當中,組成的任務必須建立一些其餘文檔,跟實際輸出不一樣的文檔。
在這些狀況當中將會同時存在兩個Mapper或者Reducer實例去打開或者寫到FileSystem中相同的文檔。所以應用開發者將會獲取獨一無二的任務目的(使用目的ID,假如say attempt_200709221812_0001_m_000000_0),不只是每一個任務。
說明:${mapreduce.task.output.dir}的值在一個特定任務執行過程當中其實是${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid}的值,而這個值是由MapReduce框架設定的。因此,MapReduce任務利用這個特性從FileOutputForamt.getWorkOutPath(Context)返回的路徑建立副文檔。
整個討論適用於做業有map但沒有reduce的狀況,所以map的output直接寫到HDFS.
RecordWriter writes the output <key, value> pairs to an output file.
RecordWriter implementations write the job outputs to the FileSystem.
RecordWriter將鍵值對的輸出寫到輸出文件中。
RecordWriter實現類將job的輸出寫到FileSytem。
Users submit jobs to Queues. Queues, as collection of jobs, allow the system to provide specific functionality. For example, queues use ACLs to control which users who can submit jobs to them. Queues are expected to be primarily used by Hadoop Schedulers.
Hadoop comes configured with a single mandatory queue, called 'default'. Queue names are defined in the mapreduce.job.queuename> property of the Hadoop site configuration. Some job schedulers, such as the Capacity Scheduler, support multiple queues.
A job defines the queue it needs to be submitted to through the mapreduce.job.queuename property, or through the Configuration.set(MRJobConfig.QUEUE_NAME, String) API. Setting the queue name is optional. If a job is submitted without an associated queue name, it is submitted to the 'default' queue.
用戶提交job到隊列中。隊列,也就是job的集合,容許系統提供特定的功能。例如,隊列使用ACLS來控制哪些用戶能夠提交隊列。Hadoop Schedulers是隊列的主要使用者。
Hadoop設置一個單獨的強制的隊列,稱之爲「默認」。隊列的名稱是在Hadoop-site配置文件中的mapreduce.job.queuename>屬性決定的。一些做業調度器支持多個的隊列,例如容量調度器。
一個做業經過mapreduce.job.queuename屬性或者Configuration.set(MRJobConfig.QUEUE_NAME, String)API來定義一個隊列。設置隊列的名字是可選的。若是一個做業被提交時並無設置隊列名稱,那麼隊列名稱爲「默認」。
Counters represent global counters, defined either by the MapReduce framework or applications. Each Counter can be of any Enum type. Counters of a particular Enum are bunched into groups of type Counters.Group.
Applications can define arbitrary Counters (of type Enum) and update them via Counters.incrCounter(Enum, long) or Counters.incrCounter(String, String, long) in the map and/or reducemethods. These counters are then globally aggregated by the framework.
計數器是全局計數器,由MapReduce框架或者應用定義。每個計數器均可以是任何枚舉類型。Counters of a particular Enum are bunched into groups of type Counters.Group。
應用能夠定義任意計數器和經過 Counters.incrCounter(Enum, long) 或者Counters.incrCounter(String, String, long)來更新在map/reduce方法中。這些計數器是經過框架進行全局計算的。
DistributedCache distributes application-specific, large, read-only files efficiently.
DistributedCache is a facility provided by the MapReduce framework to cache files (text, archives, jars and so on) needed by applications.
Applications specify the files to be cached via urls (hdfs://) in the Job. The DistributedCache assumes that the files specified via hdfs:// urls are already present on the FileSystem.
The framework will copy the necessary files to the slave node before any tasks for the job are executed on that node. Its efficiency stems from the fact that the files are only copied once per job and the ability to cache archives which are un-archived on the slaves.
DistributedCache tracks the modification timestamps of the cached files. Clearly the cache files should not be modified by the application or externally while the job is executing.
DistributedCache can be used to distribute simple, read-only data/text files and more complex types such as archives and jars. Archives (zip, tar, tgz and tar.gz files) are un-archived at the slave nodes. Files have execution permissions set.
The files/archives can be distributed by setting the property mapreduce.job.cache.{files|archives}. If more than one file/archive has to be distributed, they can be added as comma separated paths. The properties can also be set by APIs Job.addCacheFile(URI)/ Job.addCacheArchive(URI) and Job.setCacheFiles(URI[])/ Job.setCacheArchives(URI[]) where URI is of the form hdfs://host:port/absolute-path#link-name. In Streaming, the files can be distributed through command line option -cacheFile/-cacheArchive.
The DistributedCache can also be used as a rudimentary software distribution mechanism for use in the map and/or reduce tasks. It can be used to distribute both jars and native libraries. The Job.addArchiveToClassPath(Path) or Job.addFileToClassPath(Path) api can be used to cache files/jars and also add them to the classpath of child-jvm. The same can be done by setting the configuration properties mapreduce.job.classpath.{files|archives}. Similarly the cached files that are symlinked into the working directory of the task can be used to distribute native libraries and load them.
分佈式緩存有效地分佈存儲應用專用的、大的、只讀的文件。
分佈是緩存是MapReduce框架提供給應用用於緩存文件(文本,檔案、jar包和其餘)。
應用能夠經過urls (hdfs://)在Job中指定文件的緩存路徑。分佈式緩存假設經過hdfs:// urls指定的文件已經存在如今的FileSystem。
這個框架將在某個從屬節點執行任何任務以前複製必要的文件到該節點上。它的高效源於這樣的事實:每一個做業只複製一次到那些可以存檔可是還沒存檔的節點上。
分佈式緩存跟蹤緩存文件的修改時間戳。顯然看成業在執行時緩存文件不該該被應用或者外部修改。
分佈式緩存能夠用來分佈緩存簡單的、只讀的的數據或者文本文檔和更復雜類型例如檔案和Jar包。檔案(zip, tar, tgz and tar.gz files)指的是未存檔到從屬節點的。文檔是有執行權限的。
文件能夠經過設置mapreduce.job.cache.{files|archives}屬性來分配存儲。若是有更多的文件須要存儲,那麼在用逗號隔開路徑便可。該屬性還能夠經過Job.addCacheFile(URI)/ Job.addCacheArchive(URI) and Job.setCacheFiles(URI[])/ Job.setCacheArchives(URI[]) 來設置,URL的格式爲hdfs://host:port/absolute-path#link-name。文件能夠經過命令-cacheFile/-cacheArchive來實現分配存儲。
分佈式緩存也能夠用做一個基本的軟件分發機制用於map/reduce任務。它也能夠用來分佈存儲jar包和本地庫。Job.addArchiveToClassPath(Path) or Job.addFileToClassPath(Path) api能夠用來緩存文件/jars而且子Jvm也會將它們添加到類路徑下。經過設置mapreduce.job.classpath.{files|archives}屬性也能夠達到一樣效果。一樣地緩存文件經過符號連接到任務的工做路徑來分佈緩存本地庫和加載它們。
DistributedCache files can be private or public, that determines how they can be shared on the slave nodes.
分佈式緩存文件能夠是私有的或者公有的,以肯定它們是否能夠被分享到從屬節點。
Profiling is a utility to get a representative (2 or 3) sample of built-in java profiler for a sample of maps and reduces.
User can specify whether the system should collect profiler information for some of the tasks in the job by setting the configuration property mapreduce.task.profile. The value can be set using the api Configuration.set(MRJobConfig.TASK_PROFILE, boolean). If the value is set true, the task profiling is enabled. The profiler information is stored in the user log directory. By default, profiling is not enabled for the job.
Once user configures that profiling is needed, she/he can use the configuration property mapreduce.task.profile.{maps|reduces} to set the ranges of MapReduce tasks to profile. The value can be set using the api Configuration.set(MRJobConfig.NUM_{MAP|REDUCE}_PROFILES, String). By default, the specified range is 0-2.
User can also specify the profiler configuration arguments by setting the configuration property mapreduce.task.profile.params. The value can be specified using the api Configuration.set(MRJobConfig.TASK_PROFILE_PARAMS, String). If the string contains a %s, it will be replaced with the name of the profiling output file when the task runs. These parameters are passed to the task child JVM on the command line. The default value for the profiling parameters is -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s.
分析器是一個工具能夠用來獲取2到3個Java內置分析器關於map和reduce的分析樣本。
用戶能夠經過 mapreduce.task.profile來指定系統是否要收集某個做業的一些任務分析信息。這個值也能夠經過Configuration.set(MRJobConfig.TASK_PROFILE, boolean) api來設置。若是這個值爲真,那麼任務分析將會生效。分析器的信息將儲存在用戶的log路徑下。該屬性默認是不生效的。
一旦用戶配置了該屬性,那麼他/她就能夠經過 mapreduce.task.profile.{maps|reduces} 來設置MapReduce任務的範圍。這個值也能夠經過Configuration.set(MRJobConfig.NUM_{MAP|REDUCE}_PROFILES, String) api來設置。默認的值爲0-2。
用戶也能夠經過配置mapreduce.task.profile.params屬性來指定分析器的的參數。這個值也能夠經過api Configuration.set(MRJobConfig.TASK_PROFILE_PARAMS, String)來設置。假如字符串裏面包含%s,那麼將會在任務執行時被替換成分析輸出文件的名字。這些參數將會在命令行中傳輸給任務所在的子JVM。默認的參數的值爲-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s。
The MapReduce framework provides a facility to run user-provided scripts for debugging. When a MapReduce task fails, a user can run a debug script, to process task logs for example. The script is given access to the task's stdout and stderr outputs, syslog and jobconf. The output from the debug script's stdout and stderr is displayed on the console diagnostics and also as part of the job UI.
In the following sections we discuss how to submit a debug script with a job. The script file needs to be distributed and submitted to the framework.
MapReduce框架提供一個工具用來運行用戶提供的腳本用於調試。當一個MapReduce任務失敗,用戶能夠運行調試腳本,去處理任務log。腳本能夠讀取任務的stdout、stderr輸出、syslog和jobconf。調試腳本的stdout和sterr輸出將會做爲Job UI的一部分顯示出來。
在接下來的部分咱們將討論如何提交一個調試腳本到做業中。腳本文件須要提交和存儲在框架中。
The user needs to use DistributedCache to distribute and symlink the script file.
用戶須要使用分佈式緩存來分發和符號連接腳本文件。
A quick way to submit the debug script is to set values for the properties mapreduce.map.debug.script and mapreduce.reduce.debug.script, for debugging map and reduce tasks respectively. These properties can also be set by using APIs Configuration.set(MRJobConfig.MAP_DEBUG_SCRIPT, String) and Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT, String). In streaming mode, a debug script can be submitted with the command-line options -mapdebug and -reducedebug, for debugging map and reduce tasks respectively.
The arguments to the script are the task's stdout, stderr, syslog and jobconf files. The debug command, run on the node where the MapReduce task failed, is:
$script $stdout $stderr $syslog $jobconf
Pipes programs have the c++ program name as a fifth argument for the command. Thus for the pipes programs the command is
$script $stdout $stderr $syslog $jobconf $program
經過mapreduce.map.debug.script 和nd mapreduce.reduce.debug.script屬性來分別設置map和reduce任務的調試腳本是一個快速的提交調試腳本的方法。這些屬性能夠經過APIs Configuration.set(MRJobConfig.MAP_DEBUG_SCRIPT, String) 和 Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT, String)來設置。在流式編程模式,能夠經過命令行選項 -mapdebug 和 –reducedebug來分別設置map和reduce的調試腳本用於調試。
腳本的參數是任務的標準輸出、標準錯誤、系統日誌和做業配置文檔。調試命令,運行在某個Mapreduce任務失敗的節點上,是$script $stdout $stderr $syslog $jobconf $program。
擁有C++程度的Pipes項目在命令中增長第五個參數。所以命令以下:$script $stdout $stderr $syslog $jobconf $program
For pipes, a default script is run to process core dumps under gdb, prints stack trace and gives info about running threads.
在Pipes中,默認的腳本是運行在GDP的核心轉儲,打印堆跟蹤和運行線程的信息。
Hadoop MapReduce provides facilities for the application-writer to specify compression for both intermediate map-outputs and the job-outputs i.e. output of the reduces. It also comes bundled with CompressionCodec implementation for the zlib compression algorithm. The gzip, bzip2, snappy, and lz4 file format are also supported.
Hadoop also provides native implementations of the above compression codecs for reasons of both performance (zlib) and non-availability of Java libraries. More details on their usage and availability are available here.
Hadoop MapReduce提供一個功能讓應用開發指定壓縮方式用於map輸出的中間數據和job-outputs也就是reduce的輸出。它也捆綁着實現zlib壓縮算法的壓縮編碼器。支持gzip、bzip二、snappy和lz4文件格式的文檔。
Hadoop也提供上述編碼器的本地實現,由於性能和Java庫不支持的緣由。更多關於它們的使用細節和可用性可參考官方文檔。
Applications can control compression of intermediate map-outputs via the Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS, boolean) api and the CompressionCodec to be used via the Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Class) api.
應用能夠經過Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS, boolean) api來設置是否對map的輸出進行壓縮和Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Class) api指定壓縮編碼器。
Applications can control compression of job-outputs via the FileOutputFormat.setCompressOutput(Job, boolean) api and the CompressionCodec to be used can be specified via the FileOutputFormat.setOutputCompressorClass(Job, Class) api.
If the job outputs are to be stored in the SequenceFileOutputFormat, the required SequenceFile.CompressionType (i.e. RECORD / BLOCK - defaults to RECORD) can be specified via the SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) api.
應用能夠經過FileOutputFormat.setCompressOutput(Job, boolean) api來控制是否對做業輸出進行壓縮和經過FileOutputFormat.setOutputCompressorClass(Job, Class)api來設置壓縮編碼器。
若是做業的輸出是以SequenceFileOutputFormat格式存儲的,那麼須要序列化。壓縮類型經過SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) api來指定。
Hadoop provides an option where a certain set of bad input records can be skipped when processing map inputs. Applications can control this feature through the SkipBadRecords class.
This feature can be used when map tasks crash deterministically on certain input. This usually happens due to bugs in the map function. Usually, the user would have to fix these bugs. This is, however, not possible sometimes. The bug may be in third party libraries, for example, for which the source code is not available. In such cases, the task never completes successfully even after multiple attempts, and the job fails. With this feature, only a small portion of data surrounding the bad records is lost, which may be acceptable for some applications (those performing statistical analysis on very large data, for example).
By default this feature is disabled. For enabling it, refer to SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) and SkipBadRecords.setReducerMaxSkipGroups(Configuration, long).
With this feature enabled, the framework gets into 'skipping mode' after a certain number of map failures. For more details, seeSkipBadRecords.setAttemptsToStartSkipping(Configuration, int). In 'skipping mode', map tasks maintain the range of records being processed. To do this, the framework relies on the processed record counter. See SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS and SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS. This counter enables the framework to know how many records have been processed successfully, and hence, what record range caused a task to crash. On further attempts, this range of records is skipped.
The number of records skipped depends on how frequently the processed record counter is incremented by the application. It is recommended that this counter be incremented after every record is processed. This may not be possible in some applications that typically batch their processing. In such cases, the framework may skip additional records surrounding the bad record. Users can control the number of skipped records through SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) andSkipBadRecords.setReducerMaxSkipGroups(Configuration, long). The framework tries to narrow the range of skipped records using a binary search-like approach. The skipped range is divided into two halves and only one half gets executed. On subsequent failures, the framework figures out which half contains bad records. A task will be re-executed till the acceptable skipped value is met or all task attempts are exhausted. To increase the number of task attempts, use Job.setMaxMapAttempts(int) and Job.setMaxReduceAttempts(int)
Skipped records are written to HDFS in the sequence file format, for later analysis. The location can be changed through SkipBadRecords.setSkipOutputPath(JobConf, Path).
Hadoop提供一個選項當執行map輸入時能夠跳過某一組肯定的壞數據。應用能夠經過SkipBadRecords 類來控制特性。
當map任務中某些輸入必定會致使崩潰時可使用這個屬性。這一般發生在map函數中的bug。一般地,用戶會修復這些bug。然而,某些時候不必定有用。這個bug多是第三方庫致使的,例如那些源代碼看不了的。在這些狀況當中,儘管通過屢次嘗試都沒有辦法完成任務,做業也會失敗。經過這個屬性,只有一小部分的壞數據周邊數據會丟失,這對於某些應用是能夠接受的(那麼數據量很是的統計分析)
這個屬性默認是失效的。能夠經過SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) 和SkipBadRecords.setReducerMaxSkipGroups(Configuration, long)。來使它生效。
當這個屬性生效,框架在必定數量的map失敗後會進入「跳過模式」。在跳過模式中,map任務維持被處理數據的範圍,看看SkipBadRecords.setAttemptsToStartSkipping(Configuration, int)。爲了達到這個目標,框架依賴於記錄計數器。看看SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS and SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS的說明。這個計數器是的框架能夠知道有多少條記錄被成功處理了,所以來找出哪些記錄範圍會引發任務崩潰。在進一步的嘗試中,這些範圍的記錄會被跳過。
跳過記錄的數目取決於運行的記錄計數器的增加頻率。建議這個計數器在天天記錄處理增長。這在批量處理中可已不太可能實現。在這些狀況當中,框架會跳過不良記錄附近的額外數據。用戶能夠經過SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) andSkipBadRecords.setReducerMaxSkipGroups(Configuration, long)來控制跳過記錄的數量。框架會試圖使用二進制搜索方式來縮窄跳過記錄的範圍。跳過範圍被分紅兩部分而且只有其中一半會被拿來執行。在接下來的錯誤當中,框架將會指出哪一半範圍包含不良數據。一個任務將會從新執行直到跳過記錄或者嘗試次數用完。能夠經過Job.setMaxMapAttempts(int) and Job.setMaxReduceAttempts(int).來增長嘗試次數。
跳過的記錄將會以序列化的形式寫到HDFS中。能夠經過 SkipBadRecords.setSkipOutputPath(JobConf, Path)來修改路徑。
*因爲譯者自己能力有限,因此譯文中確定會出現表述不正確的地方,請你們多多包涵,也但願你們可以指出文中翻譯得不對或者不許確的地方,共同探討進步,謝謝。