MapReduce實戰 - 根據文章記錄獲取時段內發帖頻率

MapReduce簡介

  1. MapReduce是一種分佈式計算模型,是Google提出的,主要用於搜索領域,解決海量數據的計算問題。
  2. MR有兩個階段組成:Map和Reduce,用戶只需實現map()和reduce()兩個函數,便可實現分佈式計算。

例子

數據源結構

首先查看數據源結構:java

CREATE TABLE `article` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `allowed_add_tag` int(2) DEFAULT NULL,
  `attitudes` varchar(255) DEFAULT NULL,
  `attitudes_id` int(11) DEFAULT NULL,
  `banana_count` int(11) DEFAULT NULL,
  `big_cover_image` varchar(255) DEFAULT NULL,
  `channel_id` int(11) DEFAULT NULL,
  `channel_name` varchar(255) DEFAULT NULL,
  `channel_path` varchar(255) DEFAULT NULL,
  `comment_count` int(11) DEFAULT NULL,
  `contribute_time` datetime DEFAULT NULL,
  `cover_image` varchar(255) DEFAULT NULL,
  `description` varchar(255) DEFAULT NULL,
  `essense` int(2) DEFAULT NULL,
  `favorite_count` int(11) DEFAULT NULL,
  `latest_active_time` datetime DEFAULT NULL,
  `latest_comment_time` datetime DEFAULT NULL,
  `like_count` int(11) DEFAULT NULL,
  `link` varchar(255) DEFAULT NULL,
  `parent_channel_id` int(11) DEFAULT NULL,
  `parent_channel_name` varchar(255) DEFAULT NULL,
  `parent_realm_id` int(11) DEFAULT NULL,
  `realm_id` int(11) DEFAULT NULL,
  `realm_name` varchar(255) DEFAULT NULL,
  `recommended` int(2) DEFAULT NULL,
  `status` int(11) DEFAULT NULL,
  `tag_list` varchar(255) DEFAULT NULL,
  `title` varchar(255) DEFAULT NULL,
  `top_level` int(2) DEFAULT NULL,
  `tudou_domain` int(2) DEFAULT NULL,
  `type_id` int(11) DEFAULT NULL,
  `user_avatar` varchar(255) DEFAULT NULL,
  `user_id` int(11) DEFAULT NULL,
  `username` varchar(255) DEFAULT NULL,
  `view_count` int(11) DEFAULT NULL,
  `view_only` int(2) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=13103 DEFAULT CHARSET=utf8mb4;
複製代碼

這裏我將其中的數據導出爲csv文件。正則表達式

思路

在這個例子中,我要作的是根據帖子發佈時間,統計全天中每隔30分鐘的發帖個數。sql

  1. 因爲當前我沒有重寫InputFormat接口,所以採用的是hadoop默認的按行讀取文件方法。因此傳入參數爲<0, [一行數據]>.

InputFormat 接口 - 該接口指定輸入文件的內容格式。shell

其中getSplits函數將全部輸入數據分紅numSplits個split,每一個split交給一個map task處理。bash

getRecordReader函數提供一個用戶解析split的迭代器對象,它將split中的每一個record解析成key/value對。app

  1. 獲取數據中的發帖時間
  2. 計算髮帖時間在全天時間中的時間段並傳遞個reduce() - <時間段, 1>
  3. reduce對時間段出現次數進行統計

util

首先先編寫工具類Times.java - period(str:String, format:String)方法,該方法的做用爲:負載均衡

根據傳入的字符串和時間格式獲取一天中改時間的時間區間,如:dom

輸入:"2018-10-18 22:05:11", "yyyy-MM-dd HH:mm:ss"分佈式

輸出: "201810182200-201810182230"ide

方法以下:

public static String period(String time, String format) {
    Objects.requireNonNull(time);
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
    LocalDateTime dateTime = LocalDateTime.parse(time, formatter);
    int m = dateTime.getMinute();
    LocalDateTime start = dateTime.withMinute(m < 30 ? 0 : 30);
    LocalDateTime end = null;
    if (m < 30) {
        end = dateTime.withMinute(30);
    } else {
       end = dateTime.plusHours(1);
       end = end.withMinute(0);
    }

    DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
    return start.format(dateTimeFormatter) + "-" + end.format(dateTimeFormatter);
}
複製代碼

測試輸入:

period("2018-11-11 23:34", "yyyy-MM-dd HH:mm");

返回結果:

201811112330-201811120000

Map

TimeMapper.java代碼爲:

public class TimeMapper extends Mapper<LongWritable, Text, Text, LongWritable> {


    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String time = Matchers.stringCutBymatchers(value.toString(), "[0-9]{4}[/][0-9]{1,2}[/][0-9]{1,2}[ ][0-9]{1,2}[:][0-9]{1,2}[:][0-9]{1,2}");
        Objects.requireNonNull(time);
        String result = Times.period(time, "yyyy/MM/dd HH:mm:ss");
        context.write(new Text(result), new LongWritable(1));
    }
}
複製代碼

因爲按行讀取.csv文件而且一行中的時間格式爲yyyy/MM/dd HH:mm:ss,所以直接用正則表達式截取時間。而後獲取時間區段,而後將<時間區段, 1>傳遞給reduce().

Matchers.stringCutBymatchers():

public static String stringCutBymatchers(String str, String mstr) {
    Pattern patternn = Pattern.compile(mstr);
    Matcher matcher = patternn.matcher(str);
    String result = null;
    if (matcher.find()) {
        result = matcher.group(0);
    }
    return result;
}
複製代碼

Reduce

reduce()階段的操做就很簡單了,只要統計時間區段出現的次數就行了

TimeReduce.java:

public class TimeReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long counts = 0L;
        for (LongWritable val : values) {
            counts += val.get();
        }
        context.write(key, new LongWritable(counts));
    }
}
複製代碼

main

main方法以下:

TimeApp.java:

public class TimeApp {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.out.println("請輸入input目錄和output目錄");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "acfun-time");
        job.setJarByClass(CSVApp.class);
        job.setMapperClass(TimeMapper.class);
        job.setReducerClass(TimeReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }

        Path path = new Path(otherArgs[otherArgs.length - 1]);// 取第1個表示輸出目錄參數(第0個參數是輸入目錄)
        FileSystem fileSystem = path.getFileSystem(conf);// 根據path找到這個文件
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);// true的意思是,就算output有東西,也一帶刪除
        }

        FileOutputFormat.setOutputPath(job, path);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    
}
複製代碼

運行

最終文件目錄以下:

其餘package是爲了以後繼承其餘類準備,目前沒用。

這裏我採用和hadoop-example同樣的啓動方法,設置一個總Main.java

public class Main {

    public static void main(String[] args) {
        int exitCode = -1;
        ProgramDriver pgd = new ProgramDriver();
        try {
            pgd.addClass("citycount", CSVApp.class, "統計文章中出現的城市個數");
            pgd.addClass("timecount", TimeApp.class, "統計文章時段發帖數目");
            exitCode = pgd.run(args);
        } catch (Throwable e) {
            e.printStackTrace();
        }
        System.exit(exitCode);
    }

}
複製代碼

根據命令參數來選擇須要執行的job。

打包並上傳後執行。

執行

yarn jar com.dust-1.0-SNAPSHOT.jar timecount /acfun/input/dust_acfun_article.csv /acfun/output
複製代碼

等待job執行完成:

執行完成以後經過

hdfs dfs -cat /acfun/output/part-r-00000
複製代碼

查看結果

以後只要將該文件的數據提取出來畫成圖表就能直觀地查看發帖時段了。


Mapreduce中用戶能夠進行操做的類:

  1. InputFormat接口

用戶須要實現該接口以指定輸入文件的內容格式。該接口有兩個方法

public interface InputFormat<K, V> {
 
     InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
 
     RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
 
}
複製代碼

其中getSplits函數將全部輸入數據分紅numSplits個split,每一個split交給一個map task處理。getRecordReader函數提供一個用戶解析split的迭代器對象,它將split中的每一個record解析成key/value對。

Hadoop自己提供了一些InputFormat:

InputFormat 介紹
TextInputFormat 文中的每一行都是記錄,即key - 行的偏移量;value - 行的內容. key:LongWritable - value:Text
KeyValueTextInputFormat 文本文件中的每一行都是一條記錄。 第一個分隔符分隔每一行。分隔符以前的全部內容都是鍵,後面的全部內容都是值。分隔符由鍵值設置。 輸入行屬性中的分隔符,默認爲tab[\t]字符.key:Text - value:Text
SequenceFileInputFormat<K, V> 用於讀取序列文件的inputformat。 鍵和值是用戶定義的。 sequence文件是一個hadoop特定的壓縮二進制文件格式。它被優化用於在一個mapreduce做業的輸出之間傳遞數據到一些其餘mapreduce做業的輸入. key:K - value:V
NLineInputFormat 每一個分割都保證有正好N行,mapred行輸入格式linespermap屬性,默認爲1,設置N.key:LongWritable - value:Text

2.Mapper接口 用戶需繼承Mapper接口實現本身的Mapper,Mapper中必須實現的函數是

void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter ) throws IOException 複製代碼

其中,是經過Inputformat中的RecordReader對象解析處理 的,OutputCollector獲取map()的輸出結果,Reporter保存了當前task處理進度。

Hadoop自己提供了一些Mapper供用戶使用:

Class 介紹
IdentityMapper<K, V> 實現Mapper <K,V,K,V>並將輸入直接映射到輸出
InverseMapper<K, V> 實現Mapper <K,V,V,K>並將輸入直接映射到輸出
RegexMapper 實現Mapper <K,Text,Text,LongWritable>併爲每一個正則表達式匹配生成(match,1)對
TokenCountMapper 實現Mapper <K,Text,Text,LongWritable>並在輸入值被標記化時生成(token,1)對
  1. Partitioner接口

用戶需繼承該接口實現本身的Partitioner以指定map task產生的key/value對交給哪一個reduce task處理,好的Partitioner能讓每一個reduce task處理的數據相近,從而達到負載均衡。Partitioner中需實現的函數是

getPartition( K2 key, V2 value, int numPartitions)

該函數返回對應的reduce task ID。

用戶若是不提供Partitioner,Hadoop會使用默認的(其實是個hash函數)。

  1. Combiner

Combiner使得map task與reduce task之間的數據傳輸量大大減少,可明顯提升性能。大多數狀況下,Combiner與Reducer相同。

  1. Reducer接口

用戶需繼承Reducer接口實現本身的Reducer,Reducer中必須實現的函數是

void reduce(K2 key, Iterator<V2> values, OutputCollector<K3,V3> output, Reporter reporter ) throws IOException 複製代碼

Hadoop自己提供了一些Reducer供用戶使用:

Class 介紹
IdentityReduce<K, V> 實現Reduce <K,V,K,V>並將輸入直接映射到輸出
LongSumReduce 實現Reduce <K,LongWritable,K,LongWritable>並肯定與給定鍵對應的全部值的總和
  1. OutputFormat

用戶經過OutputFormat指定輸出文件的內容格式,不過它沒有split。每一個reduce task將其數據寫入本身的文件,文件名爲part-nnnnn,其中nnnnn爲reduce task的ID。

Hadoop自己提供了幾個OutputFormat:

OutputFormat 介紹
TextOutputFormat 將每條記錄寫爲一行文本。 鍵和值寫爲字符串,並由tab(\t)分隔,可在mapred中更改。textoutputformat分隔符屬性
SequenceFileOutputFormat 以hadoop的專有序列文件格式寫入鍵/值對。 與SequenceFileInputFormat一塊兒使用
NullOutputFormat 不作輸出
相關文章
相關標籤/搜索