原文地址:itweknow.cn/detail?id=7… ,歡迎你們訪問。java
上篇文章咱們簡要介紹了一下Avro是啥,以及其幾種數據類型。那麼經過這篇文章咱們一塊兒來實踐一下Avro在MapReduce中的使用。git
一個maven項目 Hadoop集羣,若是你尚未安裝的話,請戳這裏,查看以前的文章。github
本篇文章是一個簡單的用例,使用的例子是一個txt文件中存儲了大量的學生信息,這些學生有姓名、年齡、愛好和班級信息,咱們要作的事情就是經過MapReduce程序找到各個班級年齡最大的學生。apache
咱們須要hadoop以及avro相關的包。json
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.8.2</version>
</dependency>
複製代碼
前面也說到了每一個學生有姓名、年齡、愛好、班級四個字段的信息,因此咱們定義了以下的Avro模式來描述一個學生。命名爲Student.avsc,存放在resources目錄下。bash
{
"type": "record",
"name": "StudentRecord",
"doc": "A student",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "hobby", "type": "string"},
{"name": "class", "type": "string"}
]
}
複製代碼
public class StudentAgeMaxMapper extends Mapper<LongWritable, Text, AvroKey<String>, AvroValue<GenericRecord>> {
private GenericRecord record = new GenericData.Record(SchemaUtil.STUDENT_SCHEMA.getSchema());
private StudentRecordParser parser = new StudentRecordParser();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
parser.parse(value);
if (parser.isValid()) {
// 數據合法。
record.put("name", parser.getName());
record.put("age", parser.getAge());
record.put("hobby", parser.getHobby());
record.put("class", parser.getClazz());
context.write(new AvroKey<>(parser.getClazz()), new AvroValue<>(record));
}
}
}
複製代碼
上面的代碼中你能夠看到咱們自定義了一個StudentRecordParser的類來解析一行記錄,因爲篇幅的緣由這裏就不展現了,你能夠在後面提供的源碼中找到。其實不難看出,Map程序主要作的事情就是將咱們存放在txt中的記錄解析成一個個的GenericRecord對戲,而後以班級名稱爲鍵,record爲值傳遞給Reducer作進一步處理。app
public class StudentAgeMaxReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
@Override
protected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {
GenericRecord max = null;
for (AvroValue<GenericRecord> value : values) {
GenericRecord record = value.datum();
if (max == null || ((Integer)max.get("age") <
(Integer) record.get("age"))) {
max = new GenericData.Record(SchemaUtil.STUDENT_SCHEMA.getSchema());
max.put("name", record.get("name"));
max.put("age", record.get("age"));
max.put("hobby", record.get("hobby"));
max.put("class", record.get("class"));
}
}
context.write(new AvroKey<>(max), NullWritable.get());
}
}
複製代碼
Reducer的邏輯其實也比較簡單,就是經過循環比較的方式找到年齡最大的學生。maven
public class StudentAgeMaxDriver {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 註釋1:爲了解決在Hadoop集羣中運行時咱們使用的Avro版本和集羣中Avro版本不一致的問題。
configuration.setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
Job job = Job.getInstance(configuration);
job.setJarByClass(StudentAgeMaxDriver.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setMapOutputValueSchema(job, SchemaUtil.STUDENT_SCHEMA.getSchema());
AvroJob.setOutputKeySchema(job, SchemaUtil.STUDENT_SCHEMA.getSchema());
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setMapperClass(StudentAgeMaxMapper.class);
job.setReducerClass(StudentAgeMaxReducer.class);
System.exit(job.waitForCompletion(true)?0:1);
}
}
複製代碼
和以前的MapReduce實戰中實例比較,咱們這裏使用AvroJob來配置做業,AvroJob類主要用來給輸入、map輸出以及最後輸出數據指定Avro模式。ide
在打包的時候咱們須要將依賴也打到jar包中,否則後面在集羣中運行的時候會報找不到AvroJob類的錯誤。可經過在pom.xml中添加以下插件來解決打包的問題。oop
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
複製代碼
準備輸入文件,input.txt。
zhangsan 23 music class1
lisi 24 pingpong class2
wangwu 24 dance class1
liuyi 25 music class1
chener 25 dance class2
zhaoliu 22 dance class2
sunqi 22 pingpong class1
zhouba 23 music class2
wujiu 26 dance class1
zhengshi 21 dance class2
複製代碼
將輸入文件上傳到HDFS上
hadoop fs -mkdir /input
hadoop fs -put input.txt /input
複製代碼
將jar拷貝到集羣中任意一臺Hadoop機器上。
運行下面的命令執行jar包
export HADOOP_CLASSPATH=${你的jar包名}
export HADOOP_USER_CLASSPATH_FIRST=true
hadoop jar {你的jar包名} {主類路徑} /input /output
複製代碼
將運行結果拷貝到本地
hadoop fs -copyToLocal /output/part-r-00000.avro part-r-00000.avro
複製代碼
運行結果查看
root@test:~# java -jar /root/extra-jar/avro-tools-1.8.2.jar tojson part-r-00000.avro
{"name":"wujiu","age":26,"hobby":"dance","class":"class1"}
{"name":"chener","age":25,"hobby":"dance","class":"class2"}
複製代碼
想要項目源碼嗎?戳這裏就有哦。