MapReduce分析明星微博數據

  互聯網時代的到來,使得名人的形象變得更加鮮活,也拉近了明星和粉絲之間的距離。歌星、影星、體育明星、作家等名人通過互聯網可以輕易實現和粉絲的互動,賺錢也變得前所未有地簡單。同時,互聯網的飛速發展本身也造就了一批互聯網明星,這些人藉助新的手段,最大程度發揮了粉絲經濟的能量和作用,在互聯網時代賺得盆滿鉢滿。

  正是基於這樣一個大背景,今天我們做一個分析明星微博數據的小項目。

一、項目需求

  自定義輸入格式,將明星微博數據排序後,按粉絲數、關注數、微博數分別輸出到不同文件中。

二、數據集

  明星 明星微博名稱 粉絲數 關注數 微博數

  俞灝明 俞灝明 10591367 206 558

  李敏鎬 李敏鎬 22898071 11 268

  林心如 林心如 57488649 214 5940

  黃曉明 黃曉明 22616497 506 2011

  張靚穎 張靚穎 27878708 238 3846

  李娜 李娜 23309493 81 631

  徐小平 徐小平 11659926 1929 13795

  唐嫣 唐嫣 24301532 200 2391

  有斐君 有斐君 8779383 577 4251

三、分析

  自定義InputFormat讀取明星微博數據,通過自定義getSortedHashtableByValue方法分別對明星的fan、followers、microblogs數據進行排序,然後利用MultipleOutputs將不同項輸出到不同的文件中。

四、實現

  一、定義WeiBo實體類,實現WritableComparable接口

package com.buaa;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Hadoop value type carrying the three counters read for one celebrity's Weibo account.
 *
 * <p>The wire format produced by {@link #write(DataOutput)} and consumed by
 * {@link #readFields(DataInput)} is three consecutive ints in the order
 * fan, followers, microblogs.
 *
 * <p>Instances order by fan, then followers, then microblogs (all ascending);
 * {@code equals} and {@code hashCode} are consistent with that ordering.
 *
 * @ProjectName MicroblogStar
 * @PackageName com.buaa
 * @ClassName WeiBo
 * @Author Liu Jichao (劉吉超)
 * @Date 2016-05-07 14:54:29
 */
public class WeiBo implements WritableComparable<Object> {
    // Fan count.
    private int fan;
    // Following count.
    private int followers;
    // Number of posts.
    private int microblogs;

    /** Creates an instance with all counters at zero; the fields are filled in later via {@link #readFields(DataInput)} or the setters. */
    public WeiBo() {
    }

    /**
     * Creates an instance with the given counters.
     *
     * @param fan fan count
     * @param followers following count
     * @param microblogs number of posts
     */
    public WeiBo(int fan, int followers, int microblogs) {
        set(fan, followers, microblogs);
    }

    /**
     * Overwrites all three counters at once, so a single instance can be reused across records.
     *
     * @param fan fan count
     * @param followers following count
     * @param microblogs number of posts
     */
    public final void set(int fan, int followers, int microblogs) {
        this.fan = fan;
        this.followers = followers;
        this.microblogs = microblogs;
    }

    /**
     * Restores the three counters from {@code in}, in the order written by {@link #write(DataOutput)}.
     *
     * @param in source to read the three ints from
     * @throws IOException if reading from {@code in} fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        fan = in.readInt();
        followers = in.readInt();
        microblogs = in.readInt();
    }

    /**
     * Writes the three counters to {@code out} as ints: fan, followers, microblogs.
     *
     * @param out sink to write the three ints to
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(fan);
        out.writeInt(followers);
        out.writeInt(microblogs);
    }

    /**
     * Orders by fan, then followers, then microblogs, all ascending.
     *
     * @param o the object to compare with; must be a {@code WeiBo}
     * @return a negative, zero or positive value as this instance is less than,
     *         equal to or greater than {@code o}
     * @throws ClassCastException if {@code o} is not a {@code WeiBo}
     * @throws NullPointerException if {@code o} is null
     */
    @Override
    public int compareTo(Object o) {
        WeiBo other = (WeiBo) o;
        int result = compareInts(fan, other.fan);
        if (result == 0) {
            result = compareInts(followers, other.followers);
        }
        if (result == 0) {
            result = compareInts(microblogs, other.microblogs);
        }
        return result;
    }

    // Explicit comparison instead of subtraction, which could overflow.
    private static int compareInts(int left, int right) {
        if (left < right) {
            return -1;
        }
        return (left == right) ? 0 : 1;
    }

    /** Two instances are equal when all three counters match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof WeiBo)) {
            return false;
        }
        WeiBo other = (WeiBo) obj;
        return fan == other.fan
                && followers == other.followers
                && microblogs == other.microblogs;
    }

    /** Hash derived from the same three counters used by {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        int hash = 17;
        hash = 31 * hash + fan;
        hash = 31 * hash + followers;
        hash = 31 * hash + microblogs;
        return hash;
    }

    /** Returns the three counters separated by tabs, in the order fan, followers, microblogs. */
    @Override
    public String toString() {
        return fan + "\t" + followers + "\t" + microblogs;
    }

    /** @return fan count */
    public int getFan() {
        return fan;
    }

    /** @param fan fan count */
    public void setFan(int fan) {
        this.fan = fan;
    }

    /** @return following count */
    public int getFollowers() {
        return followers;
    }

    /** @param followers following count */
    public void setFollowers(int followers) {
        this.followers = followers;
    }

    /** @return number of posts */
    public int getMicroblogs() {
        return microblogs;
    }

    /** @param microblogs number of posts */
    public void setMicroblogs(int microblogs) {
        this.microblogs = microblogs;
    }
}

  二、自定義WeiboInputFormat,繼承FileInputFormat抽象類

package com.buaa;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/** 
* @ProjectName MicroblogStar
* @PackageName com.buaa
* @ClassName WeiboInputFormat
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-07 10:23:28
*/
public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{

     @Override
     public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
          // 自定義WeiboRecordReader類,按行讀取
          return new WeiboRecordReader();
     }

     public class WeiboRecordReader extends RecordReader<Text, WeiBo>{
            public LineReader in; 
            // 聲明key類型
            public Text lineKey = new Text();
            // 聲明 value類型
            public WeiBo lineValue = new WeiBo();
            
            @Override
            public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {
                // 獲取split
                FileSplit split = (FileSplit)input;
                // 獲取配置 
                Configuration job = context.getConfiguration();
                // 分片路徑 
                Path file = split.getPath();
                
                FileSystem fs = file.getFileSystem(job); 
                // 打開文件   
                FSDataInputStream filein = fs.open(file);
                
                in = new LineReader(filein,job); 
            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                // 一行數據
                Text line = new Text();
                
                int linesize = in.readLine(line);
                
                if(linesize == 0) 
                    return false; 
                
                // 經過分隔符'\t',將每行的數據解析成數組
                String[] pieces = line.toString().split("\t");
                
                if(pieces.length != 5){  
                    throw new IOException("Invalid record received");  
                } 
                
                int a,b,c;
                try{  
                    // 粉絲  
                    a = Integer.parseInt(pieces[2].trim());
                    // 關注
                    b = Integer.parseInt(pieces[3].trim());
                    // 微博數
                    c = Integer.parseInt(pieces[4].trim());
                }catch(NumberFormatException nfe){  
                    throw new IOException("Error parsing floating poing value in record");  
                }
                
                //自定義key和value值
                lineKey.set(pieces[0]);  
                lineValue.set(a, b, c);
                
                return true;
            }
            
            @Override
            public void close() throws IOException {
                if(in != null){
                    in.close();
                }
            }

            @Override
            public Text getCurrentKey() throws IOException, InterruptedException {
                return lineKey;
            }

            @Override
            public WeiBo getCurrentValue() throws IOException, InterruptedException {
                return lineValue;
            }

            @Override
            public float getProgress() throws IOException, InterruptedException {
                return 0;
            }
            
        }
}

  三、編寫mr程序

package com.buaa;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce driver that ranks celebrities by fan count, following count and
 * post count, writing each ranking to its own named output
 * ({@code fan}, {@code followers}, {@code microblogs}).
 *
 * <p>The mapper fans every input record out into three {@code (category, name TAB count)}
 * pairs; the reducer collects all pairs of one category in memory, sorts them by
 * count in descending order and writes them through {@code MultipleOutputs}.
 * Because a whole category is held in memory, this is only suitable for small data sets.
 *
 * <p>Usage: {@code WeiboCount <input path> <output path>}. An existing output
 * path is deleted before the job starts.
 *
 * @ProjectName MicroblogStar
 * @PackageName com.buaa
 * @ClassName WeiboCount
 * @Author Liu Jichao (劉吉超)
 * @Date 2016-05-07 09:07:36
 */
public class WeiboCount extends Configured implements Tool {
    // Separator between name and count in the intermediate values.
    private static final String TAB_SEPARATOR = "\t";
    // Category / named-output identifiers.
    private static final String FAN = "fan";
    private static final String FOLLOWERS = "followers";
    private static final String MICROBLOGS = "microblogs";

    // Paths used by main() when no command-line arguments are given.
    private static final String DEFAULT_INPUT = "hdfs://ljc:9000/buaa/microblog/weibo.txt";
    private static final String DEFAULT_OUTPUT = "hdfs://ljc:9000/buaa/microblog/out/";

    /** Emits one {@code (category, name TAB count)} pair per counter of each input record. */
    public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text> {
        // Reused across calls; context.write serializes the contents immediately.
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException {
            String name = key.toString();
            emit(context, FAN, name, value.getFan());
            emit(context, FOLLOWERS, name, value.getFollowers());
            emit(context, MICROBLOGS, name, value.getMicroblogs());
        }

        // Writes a single (category, "name<TAB>count") pair.
        private void emit(Context context, String category, String name, int count)
                throws IOException, InterruptedException {
            outKey.set(category);
            outValue.set(name + TAB_SEPARATOR + count);
            context.write(outKey, outValue);
        }
    }

    /** Sorts all entries of one category by count and writes them to the named output of that category. */
    public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> {
        private MultipleOutputs<Text, IntWritable> mos;
        // Reused output objects matching the Text/IntWritable types the named outputs are declared with.
        private final Text outKey = new Text();
        private final IntWritable outValue = new IntWritable();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, IntWritable>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // name -> count; a name that occurs more than once keeps the last count seen.
            Map<String, Integer> counts = new HashMap<String, Integer>();

            for (Text value : values) {
                // value = name TAB (fan count | following count | post count)
                String[] records = value.toString().split(TAB_SEPARATOR);
                counts.put(records[0], Integer.valueOf(records[1]));
            }

            Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(counts);

            // The reduce key doubles as the named-output identifier.
            String namedOutput = key.toString();
            for (Map.Entry<String, Integer> entry : entries) {
                outKey.set(entry.getKey());
                outValue.set(entry.getValue().intValue());
                mos.write(namedOutput, outKey, outValue);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (mos != null) {
                mos.close();
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are missing
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: WeiboCount <input path> <output path>");
            return 2;
        }

        // Use the configuration prepared by ToolRunner so generic options (-D, -conf, ...) take effect;
        // fall back to a fresh one when run() is invoked directly without setConf().
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        // Remove a previous output path, otherwise the job refuses to start.
        FileSystem hdfs = outputPath.getFileSystem(conf);
        if (hdfs.exists(outputPath)) {
            hdfs.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "weibo");
        job.setJarByClass(WeiboCount.class);

        // Mapper and its output types.
        job.setMapperClass(WeiBoMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Reducer and its output types.
        job.setReducerClass(WeiBoReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Custom input format producing (name, WeiBo) records.
        job.setInputFormatClass(WeiboInputFormat.class);

        // One named output per category.
        MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class);

        // The output format is set through LazyOutputFormat rather than job.setOutputFormatClass.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Returns the entries of {@code h} sorted by value in descending order;
     * entries with equal values are ordered by key ascending so the result is
     * deterministic. Sorting happens in memory, so this only suits small maps.
     *
     * @param h map from name to count; keys and values must not be null
     * @return a new array holding the map's entries in sorted order
     */
    @SuppressWarnings("unchecked")
    public static Entry<String, Integer>[] getSortedHashtableByValue(Map<String, Integer> h) {
        // Generic arrays cannot be created directly, hence the raw array and the unchecked cast.
        Entry<String, Integer>[] entries = (Entry<String, Integer>[]) h.entrySet().toArray(new Entry[0]);
        Arrays.sort(entries, new Comparator<Entry<String, Integer>>() {
            @Override
            public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {
                int byValueDescending = entry2.getValue().compareTo(entry1.getValue());
                if (byValueDescending != 0) {
                    return byValueDescending;
                }
                return entry1.getKey().compareTo(entry2.getKey());
            }
        });
        return entries;
    }

    /**
     * Command-line entry point. With no arguments the built-in default input and
     * output paths are used; otherwise the arguments are passed to the job as given.
     *
     * @param args optional generic Hadoop options followed by input and output path
     * @throws Exception if the job cannot be run
     */
    public static void main(String[] args) throws Exception {
        String[] jobArgs = args;
        if (args == null || args.length == 0) {
            jobArgs = new String[] { DEFAULT_INPUT, DEFAULT_OUTPUT };
        }
        int ec = ToolRunner.run(new Configuration(), new WeiboCount(), jobArgs);
        System.exit(ec);
    }
}

五、運行結果

  image

如果,您認爲閱讀這篇博客讓您有些收穫,不妨點擊一下右下角的【推薦】。
如果,您希望更容易地發現我的新博客,不妨點擊一下左下角的【關注我】。
如果,您對我的博客所講述的內容有興趣,請繼續關注我的後續博客,我是【劉超★ljc】。

本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。

實現代碼及數據:下載

相關文章
相關標籤/搜索