```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PutMerge {

    public static void main(String[] args) throws IOException {

        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FileSystem local = FileSystem.getLocal(conf);

        Path inputDir = new Path(args[0]);   // (1) specify the input directory and output file
        Path hdfsFile = new Path(args[1]);

        try {
            FileStatus[] inputFiles = local.listStatus(inputDir); // (2) get the list of local files
            FSDataOutputStream out = hdfs.create(hdfsFile);       // (3) create the HDFS output stream

            for (int i = 0; i < inputFiles.length; i++) {
                System.out.println(inputFiles[i].getPath().getName());
                FSDataInputStream in = local.open(inputFiles[i].getPath()); // (4) open a local input stream
                byte[] buffer = new byte[256];
                int bytesRead = 0;
                while ((bytesRead = in.read(buffer)) > 0) {
                    out.write(buffer, 0, bytesRead);
                }
                in.close();
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```
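PutMerge takes two arguments: args[0] names the local directory whose files are to be merged, and args[1] names the target file on HDFS. Once packaged into a jar, it could be launched along the lines of `hadoop jar putmerge.jar PutMerge /local/logs /user/merged.txt` (the jar and path names here are only illustrative).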
| Class | Description |
| --- | --- |
| BooleanWritable | Wrapper for a standard boolean |
| ByteWritable | Wrapper for a single byte |
| DoubleWritable | Wrapper for a double |
| FloatWritable | Wrapper for a float |
| IntWritable | Wrapper for an int |
| LongWritable | Wrapper for a long |
| Text | Wrapper to store text in UTF-8 format |
| NullWritable | Placeholder when the key or value is not needed |
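As a quick illustration of these wrapper classes (a sketch added here, not from the original listing), each one boxes the corresponding Java primitive and, unlike Java's own boxed types, is mutable and reusable:

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        IntWritable count = new IntWritable(42); // boxes a Java int
        Text word = new Text("hadoop");          // stores text in UTF-8

        count.set(43); // Writables are mutable, so a single instance can be reused

        System.out.println(word + " -> " + count.get()); // prints: hadoop -> 43
    }
}
```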
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Edge implements WritableComparable<Edge> {

    private String departureNode;
    private String arrivalNode;

    public String getDepartureNode() { return departureNode; }

    @Override
    public void readFields(DataInput in) throws IOException { // (1) specify how to read the data in
        departureNode = in.readUTF();
        arrivalNode = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {    // (2) specify how to write the data out
        out.writeUTF(departureNode);
        out.writeUTF(arrivalNode);
    }

    @Override
    public int compareTo(Edge o) {                            // (3) define the ordering of the data
        return (departureNode.compareTo(o.departureNode) != 0)
            ? departureNode.compareTo(o.departureNode)
            : arrivalNode.compareTo(o.arrivalNode);
    }
}
```
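One caveat the listing leaves implicit (this note is an addition): if Edge is used as a map output key with the default HashPartitioner, it should also override hashCode(), and for consistency equals(), so that equal edges always hash to the same partition. A minimal sketch of the two methods, to be added to the Edge class above, assuming both fields are non-null:

```java
// Sketch only: consistent equals()/hashCode() for Edge as a hash-partitioned key.
@Override
public boolean equals(Object obj) {
    if (!(obj instanceof Edge)) return false;
    Edge other = (Edge) obj;
    return departureNode.equals(other.departureNode)
        && arrivalNode.equals(other.arrivalNode);
}

@Override
public int hashCode() {
    return departureNode.hashCode() * 31 + arrivalNode.hashCode();
}
```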
| Class | Description |
| --- | --- |
| IdentityMapper<K,V> | Implements Mapper<K,V,K,V> and maps inputs directly to outputs |
| InverseMapper<K,V> | Implements Mapper<K,V,V,K> and reverses the key/value pair |
| RegexMapper<K> | Implements Mapper<K,Text,Text,LongWritable> and generates a (match, 1) pair for every regular expression match |
| TokenCountMapper<K> | Implements Mapper<K,Text,Text,LongWritable> and generates a (token, 1) pair for each token when the input value is tokenized |
| Class | Description |
| --- | --- |
| IdentityReducer<K,V> | Implements Reducer<K,V,K,V> and maps inputs directly to outputs |
| LongSumReducer<K> | Implements Reducer<K,LongWritable,K,LongWritable> and computes the sum of all values corresponding to the given key |
```java
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class EdgePartitioner implements Partitioner<Edge, Writable> {

    @Override
    public int getPartition(Edge key, Writable value, int numPartitions) {
        // Mask off the sign bit so a negative hashCode cannot yield a negative partition number
        return (key.getDepartureNode().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void configure(JobConf conf) { }
}
```
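A custom partitioner only takes effect once it is registered on the job configuration. A minimal driver fragment (a sketch in the same old mapred API as the WordCount2 listing below; MyDriver is a hypothetical driver class):

```java
JobConf conf = new JobConf(MyDriver.class);      // MyDriver is hypothetical
conf.setPartitionerClass(EdgePartitioner.class); // route records by departure node
```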
```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.TokenCountMapper;
import org.apache.hadoop.mapred.lib.LongSumReducer;

public class WordCount2 {
    public static void main(String[] args) {
        JobClient client = new JobClient();
        JobConf conf = new JobConf(WordCount2.class);

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);
        conf.setMapperClass(TokenCountMapper.class);
        conf.setCombinerClass(LongSumReducer.class);
        conf.setReducerClass(LongSumReducer.class);

        client.setConf(conf);
        try {
            JobClient.runJob(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
| InputFormat | Description | Key / Value |
| --- | --- | --- |
| TextInputFormat | Each line in a text file is a record. The key is the byte offset of the line and the value is the content of the line | key: LongWritable, value: Text |
| KeyValueTextInputFormat | Each line in a text file is a record. The first separator character on the line divides it: everything before the separator is the key, everything after is the value. The separator is set in the property key.value.separator.in.input.line and defaults to the tab character (\t) | key: Text, value: Text |
| SequenceFileInputFormat<K,V> | An InputFormat for reading sequence files. Keys and values are user defined. A sequence file is a Hadoop-specific compressed binary file format, designed for passing data between the output of one MapReduce job and the input of another | key: K (user defined), value: V (user defined) |
| NLineInputFormat | Same as TextInputFormat, but each split is guaranteed to have exactly N lines. N is set in the property mapred.line.input.format.linespermap and defaults to 1 | key: LongWritable, value: Text |
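The input format is selected on the JobConf. For example, to read tab-separated key/value lines, a driver like WordCount2 above could add the following (a sketch; the separator override is optional, and the property name comes from the table above):

```java
conf.setInputFormat(KeyValueTextInputFormat.class);
// Optional: use a comma instead of the default tab as the key/value separator
conf.set("key.value.separator.in.input.line", ",");
```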
| OutputFormat | Description |
| --- | --- |
| TextOutputFormat<K,V> | Writes each record as a line of text. Keys and values are written as strings and separated by a tab (\t). The separator can be changed in the property mapred.textoutputformat.separator |
| SequenceFileOutputFormat<K,V> | Writes the key/value pairs in Hadoop's proprietary sequence file format. Works in conjunction with SequenceFileInputFormat |
| NullOutputFormat<K,V> | Outputs nothing |
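The output format is set the same way. For instance, to chain jobs through sequence files (a sketch for a JobConf-based driver like the ones above):

```java
// Binary output that a downstream job can read back with SequenceFileInputFormat
conf.setOutputFormat(SequenceFileOutputFormat.class);
```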