首先查看數據源結構:java
CREATE TABLE `article` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`allowed_add_tag` int(2) DEFAULT NULL,
`attitudes` varchar(255) DEFAULT NULL,
`attitudes_id` int(11) DEFAULT NULL,
`banana_count` int(11) DEFAULT NULL,
`big_cover_image` varchar(255) DEFAULT NULL,
`channel_id` int(11) DEFAULT NULL,
`channel_name` varchar(255) DEFAULT NULL,
`channel_path` varchar(255) DEFAULT NULL,
`comment_count` int(11) DEFAULT NULL,
`contribute_time` datetime DEFAULT NULL,
`cover_image` varchar(255) DEFAULT NULL,
`description` varchar(255) DEFAULT NULL,
`essense` int(2) DEFAULT NULL,
`favorite_count` int(11) DEFAULT NULL,
`latest_active_time` datetime DEFAULT NULL,
`latest_comment_time` datetime DEFAULT NULL,
`like_count` int(11) DEFAULT NULL,
`link` varchar(255) DEFAULT NULL,
`parent_channel_id` int(11) DEFAULT NULL,
`parent_channel_name` varchar(255) DEFAULT NULL,
`parent_realm_id` int(11) DEFAULT NULL,
`realm_id` int(11) DEFAULT NULL,
`realm_name` varchar(255) DEFAULT NULL,
`recommended` int(2) DEFAULT NULL,
`status` int(11) DEFAULT NULL,
`tag_list` varchar(255) DEFAULT NULL,
`title` varchar(255) DEFAULT NULL,
`top_level` int(2) DEFAULT NULL,
`tudou_domain` int(2) DEFAULT NULL,
`type_id` int(11) DEFAULT NULL,
`user_avatar` varchar(255) DEFAULT NULL,
`user_id` int(11) DEFAULT NULL,
`username` varchar(255) DEFAULT NULL,
`view_count` int(11) DEFAULT NULL,
`view_only` int(2) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=13103 DEFAULT CHARSET=utf8mb4;
複製代碼
這裏我將其中的數據導出爲csv文件。正則表達式
在這個例子中,我要作的是根據帖子發佈時間,統計全天中每隔30分鐘的發帖個數。sql
InputFormat 接口 - 該接口指定輸入文件的內容格式。shell
其中getSplits函數將全部輸入數據分紅numSplits個split,每一個split交給一個map task處理。bash
getRecordReader函數提供一個用戶解析split的迭代器對象,它將split中的每一個record解析成key/value對。app
首先先編寫工具類Times.java - period(str:String, format:String)方法,該方法的做用爲:負載均衡
根據傳入的字符串和時間格式獲取一天中改時間的時間區間,如:dom
輸入:"2018-10-18 22:05:11", "yyyy-MM-dd HH:mm:ss"分佈式
輸出: "201810182200-201810182230"ide
方法以下:
public static String period(String time, String format) {
Objects.requireNonNull(time);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
LocalDateTime dateTime = LocalDateTime.parse(time, formatter);
int m = dateTime.getMinute();
LocalDateTime start = dateTime.withMinute(m < 30 ? 0 : 30);
LocalDateTime end = null;
if (m < 30) {
end = dateTime.withMinute(30);
} else {
end = dateTime.plusHours(1);
end = end.withMinute(0);
}
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
return start.format(dateTimeFormatter) + "-" + end.format(dateTimeFormatter);
}
複製代碼
測試輸入:
period("2018-11-11 23:34", "yyyy-MM-dd HH:mm");
返回結果:
201811112330-201811120000
TimeMapper.java代碼爲:
public class TimeMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String time = Matchers.stringCutBymatchers(value.toString(), "[0-9]{4}[/][0-9]{1,2}[/][0-9]{1,2}[ ][0-9]{1,2}[:][0-9]{1,2}[:][0-9]{1,2}");
Objects.requireNonNull(time);
String result = Times.period(time, "yyyy/MM/dd HH:mm:ss");
context.write(new Text(result), new LongWritable(1));
}
}
複製代碼
因爲按行讀取.csv文件而且一行中的時間格式爲yyyy/MM/dd HH:mm:ss,所以直接用正則表達式截取時間。而後獲取時間區段,而後將<時間區段, 1>傳遞給reduce().
Matchers.stringCutBymatchers():
public static String stringCutBymatchers(String str, String mstr) {
Pattern patternn = Pattern.compile(mstr);
Matcher matcher = patternn.matcher(str);
String result = null;
if (matcher.find()) {
result = matcher.group(0);
}
return result;
}
複製代碼
reduce()階段的操做就很簡單了,只要統計時間區段出現的次數就行了
TimeReduce.java:
public class TimeReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long counts = 0L;
for (LongWritable val : values) {
counts += val.get();
}
context.write(key, new LongWritable(counts));
}
}
複製代碼
main方法以下:
TimeApp.java:
public class TimeApp {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("請輸入input目錄和output目錄");
System.exit(2);
}
Job job = Job.getInstance(conf, "acfun-time");
job.setJarByClass(CSVApp.class);
job.setMapperClass(TimeMapper.class);
job.setReducerClass(TimeReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
Path path = new Path(otherArgs[otherArgs.length - 1]);// 取第1個表示輸出目錄參數(第0個參數是輸入目錄)
FileSystem fileSystem = path.getFileSystem(conf);// 根據path找到這個文件
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);// true的意思是,就算output有東西,也一帶刪除
}
FileOutputFormat.setOutputPath(job, path);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
複製代碼
最終文件目錄以下:
其餘package是爲了以後繼承其餘類準備,目前沒用。這裏我採用和hadoop-example同樣的啓動方法,設置一個總Main.java
public class Main {
public static void main(String[] args) {
int exitCode = -1;
ProgramDriver pgd = new ProgramDriver();
try {
pgd.addClass("citycount", CSVApp.class, "統計文章中出現的城市個數");
pgd.addClass("timecount", TimeApp.class, "統計文章時段發帖數目");
exitCode = pgd.run(args);
} catch (Throwable e) {
e.printStackTrace();
}
System.exit(exitCode);
}
}
複製代碼
根據命令參數來選擇須要執行的job。
打包並上傳後執行。
執行
yarn jar com.dust-1.0-SNAPSHOT.jar timecount /acfun/input/dust_acfun_article.csv /acfun/output
複製代碼
等待job執行完成:
執行完成以後經過
hdfs dfs -cat /acfun/output/part-r-00000
複製代碼
查看結果
以後只要將該文件的數據提取出來畫成圖表就能直觀地查看發帖時段了。
Mapreduce中用戶能夠進行操做的類:
用戶須要實現該接口以指定輸入文件的內容格式。該接口有兩個方法
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
}
複製代碼
其中getSplits函數將全部輸入數據分紅numSplits個split,每一個split交給一個map task處理。getRecordReader函數提供一個用戶解析split的迭代器對象,它將split中的每一個record解析成key/value對。
Hadoop自己提供了一些InputFormat:
InputFormat | 介紹 |
---|---|
TextInputFormat | 文中的每一行都是記錄,即key - 行的偏移量;value - 行的內容. key:LongWritable - value:Text |
KeyValueTextInputFormat | 文本文件中的每一行都是一條記錄。 第一個分隔符分隔每一行。分隔符以前的全部內容都是鍵,後面的全部內容都是值。分隔符由鍵值設置。 輸入行屬性中的分隔符,默認爲tab[\t]字符.key:Text - value:Text |
SequenceFileInputFormat<K, V> | 用於讀取序列文件的inputformat。 鍵和值是用戶定義的。 sequence文件是一個hadoop特定的壓縮二進制文件格式。它被優化用於在一個mapreduce做業的輸出之間傳遞數據到一些其餘mapreduce做業的輸入. key:K - value:V |
NLineInputFormat | 每一個分割都保證有正好N行,mapred行輸入格式linespermap屬性,默認爲1,設置N.key:LongWritable - value:Text |
2.Mapper接口 用戶需繼承Mapper接口實現本身的Mapper,Mapper中必須實現的函數是
void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter ) throws IOException 複製代碼
其中,是經過Inputformat中的RecordReader對象解析處理 的,OutputCollector獲取map()的輸出結果,Reporter保存了當前task處理進度。
Hadoop自己提供了一些Mapper供用戶使用:
Class | 介紹 |
---|---|
IdentityMapper<K, V> | 實現Mapper <K,V,K,V>並將輸入直接映射到輸出 |
InverseMapper<K, V> | 實現Mapper <K,V,V,K>並將輸入直接映射到輸出 |
RegexMapper | 實現Mapper <K,Text,Text,LongWritable>併爲每一個正則表達式匹配生成(match,1)對 |
TokenCountMapper | 實現Mapper <K,Text,Text,LongWritable>並在輸入值被標記化時生成(token,1)對 |
用戶需繼承該接口實現本身的Partitioner以指定map task產生的key/value對交給哪一個reduce task處理,好的Partitioner能讓每一個reduce task處理的數據相近,從而達到負載均衡。Partitioner中需實現的函數是
getPartition( K2 key, V2 value, int numPartitions)
該函數返回對應的reduce task ID。
用戶若是不提供Partitioner,Hadoop會使用默認的(其實是個hash函數)。
Combiner使得map task與reduce task之間的數據傳輸量大大減少,可明顯提升性能。大多數狀況下,Combiner與Reducer相同。
用戶需繼承Reducer接口實現本身的Reducer,Reducer中必須實現的函數是
void reduce(K2 key, Iterator<V2> values, OutputCollector<K3,V3> output, Reporter reporter ) throws IOException 複製代碼
Hadoop自己提供了一些Reducer供用戶使用:
Class | 介紹 |
---|---|
IdentityReduce<K, V> | 實現Reduce <K,V,K,V>並將輸入直接映射到輸出 |
LongSumReduce | 實現Reduce <K,LongWritable,K,LongWritable>並肯定與給定鍵對應的全部值的總和 |
用戶經過OutputFormat指定輸出文件的內容格式,不過它沒有split。每一個reduce task將其數據寫入本身的文件,文件名爲part-nnnnn,其中nnnnn爲reduce task的ID。
Hadoop自己提供了幾個OutputFormat:
OutputFormat | 介紹 |
---|---|
TextOutputFormat | 將每條記錄寫爲一行文本。 鍵和值寫爲字符串,並由tab(\t)分隔,可在mapred中更改。textoutputformat分隔符屬性 |
SequenceFileOutputFormat | 以hadoop的專有序列文件格式寫入鍵/值對。 與SequenceFileInputFormat一塊兒使用 |
NullOutputFormat | 不作輸出 |