MapReduce中使用Avro

原文地址:itweknow.cn/detail?id=7… ,歡迎你們訪問。java

上篇文章咱們簡要介紹了一下Avro是啥,以及其幾種數據類型。那麼經過這篇文章咱們一塊兒來實踐一下Avro在MapReduce中的使用。git

前提條件

一個maven項目 Hadoop集羣,若是你尚未安裝的話,請戳這裏,查看以前的文章。github

說明

本篇文章是一個簡單的用例,使用的例子是一個txt文件中存儲了大量的學生信息,這些學生有姓名、年齡、愛好和班級信息,咱們要作的事情就是經過MapReduce程序找到各個班級年齡最大的學生。apache

項目依賴

咱們須要hadoop以及avro相關的包。json

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.8.5</version>
</dependency>

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-mapred</artifactId>
    <version>1.8.2</version>
</dependency>

複製代碼

Avro模式

前面也說到了每一個學生有姓名、年齡、愛好、班級四個字段的信息,因此咱們定義了以下的Avro模式來描述一個學生。命名爲Student.avsc,存放在resources目錄下。bash

{
    "type": "record",
    "name": "StudentRecord",
    "doc": "A student",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "hobby", "type": "string"},
        {"name": "class", "type": "string"}
    ]
}
複製代碼

Mapper和Reducer

  • Mapper
public class StudentAgeMaxMapper extends Mapper<LongWritable, Text, AvroKey<String>, AvroValue<GenericRecord>> {

    private GenericRecord record = new GenericData.Record(SchemaUtil.STUDENT_SCHEMA.getSchema());

    private StudentRecordParser parser = new StudentRecordParser();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        parser.parse(value);
        if (parser.isValid()) {
            // 數據合法。
            record.put("name", parser.getName());
            record.put("age", parser.getAge());
            record.put("hobby", parser.getHobby());
            record.put("class", parser.getClazz());
            context.write(new AvroKey<>(parser.getClazz()), new AvroValue<>(record));
        }
    }
}
複製代碼

上面的代碼中你能夠看到咱們自定義了一個StudentRecordParser的類來解析一行記錄,因爲篇幅的緣由這裏就不展現了,你能夠在後面提供的源碼中找到。其實不難看出,Map程序主要作的事情就是將咱們存放在txt中的記錄解析成一個個的GenericRecord對戲,而後以班級名稱爲鍵,record爲值傳遞給Reducer作進一步處理。app

  • Reducer
public class StudentAgeMaxReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {

    @Override
    protected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {

        GenericRecord max = null;
        for (AvroValue<GenericRecord> value : values) {
            GenericRecord record = value.datum();
            if (max == null || ((Integer)max.get("age") <
                    (Integer) record.get("age"))) {
                max = new GenericData.Record(SchemaUtil.STUDENT_SCHEMA.getSchema());
                max.put("name", record.get("name"));
                max.put("age", record.get("age"));
                max.put("hobby", record.get("hobby"));
                max.put("class", record.get("class"));
            }
        }
        context.write(new AvroKey<>(max), NullWritable.get());
    }
}
複製代碼

Reducer的邏輯其實也比較簡單,就是經過循環比較的方式找到年齡最大的學生。maven

驅動程序

public class StudentAgeMaxDriver {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // 註釋1:爲了解決在Hadoop集羣中運行時咱們使用的Avro版本和集羣中Avro版本不一致的問題。
        configuration.setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
        Job job = Job.getInstance(configuration);
        job.setJarByClass(StudentAgeMaxDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputValueSchema(job, SchemaUtil.STUDENT_SCHEMA.getSchema());
        AvroJob.setOutputKeySchema(job, SchemaUtil.STUDENT_SCHEMA.getSchema());
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        job.setMapperClass(StudentAgeMaxMapper.class);
        job.setReducerClass(StudentAgeMaxReducer.class);
        System.exit(job.waitForCompletion(true)?0:1);
    }

}
複製代碼

和以前的MapReduce實戰中實例比較,咱們這裏使用AvroJob來配置做業,AvroJob類主要用來給輸入、map輸出以及最後輸出數據指定Avro模式。ide

項目打包

在打包的時候咱們須要將依賴也打到jar包中,否則後面在集羣中運行的時候會報找不到AvroJob類的錯誤。可經過在pom.xml中添加以下插件來解決打包的問題。oop

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>
複製代碼

運行

  1. 準備輸入文件,input.txt。

    zhangsan	23	music	class1
    lisi	24	pingpong	class2
    wangwu	24	dance	class1
    liuyi	25	music	class1
    chener	25	dance	class2
    zhaoliu	22	dance	class2
    sunqi	22	pingpong	class1
    zhouba	23	music	class2
    wujiu	26	dance	class1
    zhengshi	21	dance	class2
    複製代碼
  2. 將輸入文件上傳到HDFS上

    hadoop fs -mkdir /input
    hadoop fs -put input.txt /input
    複製代碼
  3. 將jar拷貝到集羣中任意一臺Hadoop機器上。

  4. 運行下面的命令執行jar包

    export HADOOP_CLASSPATH=${你的jar包名}
    export HADOOP_USER_CLASSPATH_FIRST=true
    hadoop jar {你的jar包名} {主類路徑} /input /output
    複製代碼
  5. 將運行結果拷貝到本地

    hadoop fs -copyToLocal /output/part-r-00000.avro part-r-00000.avro
    複製代碼
  6. 運行結果查看

    root@test:~# java -jar /root/extra-jar/avro-tools-1.8.2.jar tojson part-r-00000.avro
    {"name":"wujiu","age":26,"hobby":"dance","class":"class1"}
    {"name":"chener","age":25,"hobby":"dance","class":"class2"}
    
    複製代碼

想要項目源碼嗎?戳這裏就有哦

相關文章
相關標籤/搜索