[MapReduce編程]用MapReduce大刀砍掉海量數據離線處理問題

這篇文章我以前是拜讀過的,今天閒來沒事,就想拿來當作MapReduce的練習。 java

MapReduce這把刀太大,刀大了問題就抵不住這刀鋒了,事實上一開始我想着,這麼多些題目,當是要花很多功夫的,但當我作完一題繼續看下面的題目的時候,才發現這些題目在MapReduce模型下顯得大同小異了,看來拿大刀的人是無論砍的是木頭仍是人頭的,而是直接抽象成柱形物而後掄起刀一刀就下去了。 程序員

  直入主題: 面試

  一、海量日誌數據,提取出某日訪問百度次數最多的前K個IP。[稍微改變] 算法

                說明:每一次訪問網頁就在日誌中記錄1次訪問者的IP,獨佔一行,一個小數據能夠在這裏下載apache

    實在是想不出如何能在一個Job中解決這個問題,因此仍是把它拉扯成了兩個Job來解決。
      Job1:將相同IP的記錄合併,造成<ip,count>形式,其中count是對這個ip的計數。
      Job2:按count排序<ip,count>並選擇前K個進行輸出。
    這裏我寫了一個可序列化的類IPAndCount,若是稍微熟悉MapReduce或者看明白我以前寫的 關係型MapReduce模式:選擇、分組和組內排序 你就知道這是爲了排序而準備的。MapReduce有一個「Shuffle and sort」,這個階段是利用key來對tuple進行排序的,而排序時調用的即是key的compareTo()方法。事實上若是job1輸出的數據兩足夠小,咱們徹底能夠在內存中進行排序而利用MapReduce框架,這樣就能夠省下一個Reduce階段,可是對於這個問題顯然不行。
    IPAndCount很直白,就是包裝了上述的<ip,count>。
[java]  view plain copy print ?
  1. import java.io.DataInput;  
  2. import java.io.DataOutput;  
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.IntWritable;  
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.io.WritableComparable;  
  8.   
  9.   
  10. public class IPAndCount implements WritableComparable{  
  11.     Text ip;  
  12.     IntWritable count;  
  13.       
  14.     public IPAndCount(){  
  15.         this.ip = new Text("");  
  16.         this.count = new IntWritable(1);  
  17.     }  
  18.       
  19.     public IPAndCount(Text ip, IntWritable count){  
  20.         this.ip = ip;  
  21.         this.count = count;  
  22.     }  
  23.       
  24.     public IPAndCount(String ip, int count){  
  25.         this.ip = new Text(ip);  
  26.         this.count = new IntWritable(count);  
  27.     }  
  28.       
  29.     public void readFields(DataInput in) throws IOException {  
  30.         ip.readFields(in);  
  31.         count.readFields(in);  
  32.     }  
  33.   
  34.     public void write(DataOutput out) throws IOException {  
  35.         ip.write(out);  
  36.         count.write(out);  
  37.     }  
  38.   
  39.     public int compareTo(Object o) {  
  40.         return ((IPAndCount)o).count.compareTo(count) == 0?   
  41.                 ip.compareTo(((IPAndCount)o).ip):((IPAndCount)o).count.compareTo(count);//若是隻比較count會丟失數據,應該是suffle階段的問題  
  42.     }  
  43.       
  44.     public int hashCode(){  
  45.         return ip.hashCode();  
  46.     }  
  47.       
  48.     public boolean equals(Object o){  
  49.         if(!(o instanceof IPAndCount))  
  50.             return false;  
  51.         IPAndCount other = (IPAndCount)o;  
  52.         return ip.equals(other.ip) && count.equals(other.count);  
  53.     }  
  54.       
  55.     public String toString(){  
  56.         StringBuffer buf = new StringBuffer("[ip=");  
  57.         buf.append(ip.toString());  
  58.         buf.append(",count=");  
  59.         buf.append(count.toString());  
  60.         buf.append("]");  
  61.         return buf.toString();  
  62.     }  
  63.       
  64.     public Text getIp() {  
  65.         return ip;  
  66.     }  
  67.     public void setIp(Text ip) {  
  68.         this.ip = ip;  
  69.     }  
  70.     public IntWritable getCount() {  
  71.         return count;  
  72.     }  
  73.     public void setCount(IntWritable count) {  
  74.         this.count = count;  
  75.     }  
  76. }  

下面對FindActiveIp進行說明:
SumUpIpMapper和SumUpIPReducer事實上就是一個MapReduce中最基礎的詞頻統計程序WordCount。你能夠加一個Combiner來優化一下,我遺漏了。
從配置中能夠看見兩個Job的配置:job 和job2。
依賴關係是job -> job2,代碼中使用了JobControl來解決做業間的依賴關係,JobControl.run()方法會在做業都運行完後才返回。
Job2的輸入路徑是Job1的輸出路徑,從參數中能夠看出這一點。
Job1的輸出在輸出文件中的表現是:ip,count
Job2再從文件中讀入,使用的是KeyValueTextInputFormat,它對應的是TextOutputFormat,咱們能夠從job1的配置中看出來。
BeforeSortIPMapper從job1的輸出中讀取數據幷包裝成IPAndCount類型,以便MapReduce框架在「shuffle and sort」階段利用它來排序。
最後SelectTopKIPReducer選出前K個進行輸出便可,在這裏咱們設置最後的reduce只有一個reduce task,以使全部數據匯聚到一臺機子上進行處理。
[java]  view plain copy print ?
  1. import java.io.IOException;  
  2.   
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.conf.Configured;  
  5. import org.apache.hadoop.fs.Path;  
  6. import org.apache.hadoop.io.IntWritable;  
  7. import org.apache.hadoop.io.LongWritable;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapred.lib.ChainMapper;  
  10. import org.apache.hadoop.mapreduce.Job;  
  11. import org.apache.hadoop.mapreduce.Mapper;  
  12. import org.apache.hadoop.mapreduce.Reducer;  
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  14. import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;  
  15. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  16. import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;  
  17. import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;  
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  19. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  20. import org.apache.hadoop.util.Tool;  
  21. import org.apache.hadoop.util.ToolRunner;  
  22.   
  23.   
  24. public class FindActiveIP extends Configured implements Tool{  
  25.       
  26.     public static class SumUpIPMapper extends Mapper<LongWritable,Text,Text,IntWritable>{  
  27.         IntWritable  one = new IntWritable(1);  
  28.         public void map(LongWritable key, Text value, Context context)   
  29.             throws IOException,InterruptedException{  
  30.             context.write(value, one);  
  31.         }  
  32.     }  
  33.   
  34.     public static class SumUpIPReducer extends Reducer<Text,IntWritable,Text,IntWritable>{  
  35.         //這裏能夠選擇前k個進行輸出以優化  
  36.         public void reduce(Text key, Iterable<IntWritable> values, Context context)  
  37.             throws IOException, InterruptedException{  
  38.             int sum = 0;  
  39.             for(IntWritable v : values){  
  40.                 sum += v.get();  
  41.             }  
  42.             context.write(key, new IntWritable(sum));  
  43.         }  
  44.     }  
  45.       
  46.       
  47.     public static class BeforeSortIPMapper extends Mapper<Text,Text,IPAndCount,Text>{  
  48.         public void map(Text key, Text value, Context context)  
  49.             throws IOException,InterruptedException{  
  50.             IPAndCount tmp = new IPAndCount(key,new IntWritable(Integer.valueOf(value.toString())));  
  51.             System.out.println(tmp);  
  52.             context.write(tmp,new Text());  
  53.         }  
  54.     }  
  55.       
  56.       
  57.     //set num of this reducer to one  
  58.     public static class SelectTopKIPReducer extends Reducer<IPAndCount,Text,IPAndCount,Text>{  
  59.         int counter = 0;  
  60.         int K = 10;  
  61.         public void reduce(IPAndCount key, Iterable<Text> values, Context context)  
  62.             throws IOException, InterruptedException{  
  63.             System.out.println(key);  
  64.             if(counter < K){  
  65.                 context.write(key, null);  
  66.                   
  67.                 counter++;  
  68.             }  
  69.               
  70.         }  
  71.     }  
  72.     public int run(String[] args) throws Exception {  
  73.         Configuration conf = new Configuration();  
  74.         Job job = new Job(conf,"SumUpIP");  
  75.         job.setJarByClass(FindActiveIP.class);  
  76.         job.setInputFormatClass(TextInputFormat.class);  
  77.         job.setOutputFormatClass(TextOutputFormat.class);  
  78.         job.getConfiguration().set("mapred.textoutputformat.separator"",");  
  79.         Path in = new Path(args[0]);  
  80.         Path out = new Path(args[1]);  
  81.         FileInputFormat.setInputPaths(job, in);  
  82.         FileOutputFormat.setOutputPath(job, out);  
  83.         job.setMapperClass(SumUpIPMapper.class);  
  84.         job.setReducerClass(SumUpIPReducer.class);  
  85.         job.setMapOutputKeyClass(Text.class);  
  86.         job.setMapOutputValueClass(IntWritable.class);  
  87.         job.setOutputKeyClass(Text.class);  
  88.         job.setOutputValueClass(IntWritable.class);  
  89.         job.setNumReduceTasks(7);  
  90.           
  91.         Configuration conf2 = new Configuration();  
  92.         Job job2 = new Job(conf2,"SortAndFindTopK");  
  93.         job2.setJarByClass(FindActiveIP.class);  
  94.         job2.setInputFormatClass(KeyValueTextInputFormat.class);  
  95.         job2.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator"",");  
  96.         job2.setOutputFormatClass(TextOutputFormat.class);  
  97.         Path in2 = new Path(args[1]);  
  98.         Path out2 = new Path(args[2]);  
  99.         FileInputFormat.setInputPaths(job2,in2);  
  100.         FileOutputFormat.setOutputPath(job2, out2);  
  101.         job2.setMapperClass(BeforeSortIPMapper.class);  
  102.         job2.setReducerClass(SelectTopKIPReducer.class);  
  103.         job2.setMapOutputKeyClass(IPAndCount.class);  
  104.         job2.setMapOutputValueClass(Text.class);  
  105.         job2.setOutputKeyClass(IPAndCount.class);  
  106.         job2.setOutputValueClass(Text.class);  
  107.         job2.setNumReduceTasks(1);  
  108.           
  109.         JobControl jobControl = new JobControl("FindTopKIP");  
  110.         ControlledJob cJob1 = new ControlledJob(conf);  
  111.         cJob1.setJob(job);  
  112.         ControlledJob cJob2 = new ControlledJob(conf2);  
  113.         cJob2.setJob(job2);  
  114.         jobControl.addJob(cJob1);  
  115.         jobControl.addJob(cJob2);  
  116.         cJob2.addDependingJob(cJob1);  
  117.         jobControl.run();  
  118.         return 0;  
  119.     }  
  120.       
  121.     public static void main(String args[]) throws Exception{  
  122.         int res = ToolRunner.run(new FindActiveIP(), args);  
  123.         System.exit(res);  
  124.     }  
  125.   
  126. }  

  大刀拿習慣了,從前的大刀就成了如今的繡花針,不是繡花針很差,只是用着不順手。當你聽着歌用java寫着MapReduce,忽然有人在你耳邊喊了一句:Pig~Pig~Pig~
  你很難不心動!程序員愛偷懶堪比女人愛逛街,都是爲了快樂啊~
  下面是用Pig來處理上述的問題:
  
[plain]  view plain copy print ?
  1. grunt> records = LOAD 'input/ipdata' AS (ip:chararray);                           
  2. grunt> grouped_records = GROUP records BY ip;                                     
  3. grunt> counted_records = FOREACH grouped_records GENERATE group, COUNT(records);  
  4. grunt> sorted_records = ORDER counted_records BY $1 DESC;                         
  5. grunt> topK = LIMIT sorted_records 10;                                                    
  6. grunt> DUMP topK;    

你看,數數,沒暈數一數,6行!僅僅6行就解決了。
行1:將文件裝入
行2:按ip分組
行3:組內計數
行4:組間按ip訪問計數排序
行5:選擇前10個數據
行6:運行並輸出。

雖然咱們的Pig方法實際上跑了3個job才完成任務,相比於java寫的MapReduce多了一個job,但Pig顯然更愉快些。

這是最後結果:
(192.168.0.1,1559)
(192.168.0.21,7)
(192.168.0.14,4)
(192.168.0.10,4)
(192.168.0.12,4)
(192.168.0.32,4)
(192.168.0.13,3)
(192.168.0.3,3)
(192.168.0.2,2)
(192.168.0.11,1)

這個算法對帶寬的壓力仍是比較大的,除了加一個Combiner以外,代碼中還提到了另外一個小小的優化在進入第一個Reduce階段的一個reduce task中的數據足以裝入內存時,這是很容易解決的。這不是多好的優化,應當有更加好的優化方式能過濾更多的數據,不然。。。這不科學~

  二、尋找熱門查詢:搜索引擎會經過日誌文件把用戶每次檢索使用的全部檢索串都記錄下來,每一個查詢串的長度爲1-255字節。
    假設目前有100億個記錄(這些查詢串的重複度比較高,雖然總數是100億,但若是除去重複後,不超過10億個。一個查詢串的重複度越高,說明查詢它的用戶越多,也就是越熱門),請你統計最熱門的10個查詢串。[我仍是稍微修改了題目]
    看見這個題目,我以爲我寫下去會對不起July費了萬千腦細胞辛苦的寫做成果,儘管我心裏十分但願它跟上面那題同樣,但這多麼讓人不甘心又不盡興~
    如今暫且不考慮優化的問題:這個題目無非就是統計查詢串的計數,而後排序,而後取出前10個。事實上,這個問題在不考慮細節上徹底能夠用上面的pig腳原本處理。

不寫了,以如今的水平繼續寫實在是不優美:
3題:有一個1G大小的一個文件,裏面每一行是一個詞,詞的大小不超過16字節,內存限制大小是1M。返回頻數最高的100個詞。
4題:海量數據分佈在100臺電腦中,想個辦法高效統計出這批數據的TOP10。
5題:有10個文件,每一個文件1G,每一個文件的每一行存放的都是用戶的query,每一個文件的query均可能重複。要求你按照query的頻度排序。
7題:怎麼在海量數據中找出重複次數最多的一個?
8題:上千萬或上億數據(有重複),統計其中出現次數最多的前N個數據。
9題:一個文本文件,大約有一萬行,每行一個詞,要求統計出其中最頻繁出現的前10個詞。

1,2,3,4,5,7,8,9題思路基本一致,值得注意的是,有時候咱們徹底能夠肯定咱們須要的數據的一些特徵,好比上面的熱門查詢中熱門串必定被查詢超過1000次,那麼咱們就可使用FILTER來進行過濾以減小處理的數據(從而減小對帶寬的壓力)[filted_records = FILTER grouped_records BY SIZE(records) > 1000;]。

6題: 給定a、b兩個文件,各存放50億個url,每一個url各佔64字節,內存限制是4G,讓你找出a、b文件共同的url?
[Hadoop]使用DistributedCache進行復制聯結  以及  使用hadoop的datajoin包進行關係型join操做,你也能夠參考Data-Intensive Text Processing with MapReduce看看原生態的join操做是怎麼進行的。
[plain]  view plain copy print ?
  1. grunt> A = LOAD 'input/url1' AS (url:chararray);  
  2. grunt> B = LOAD 'input/url2' AS (url:chararray);  
  3. grunt> grouped_A = GROUP A BY url;  
  4. grunt> non_duplicated_A = FOREACH grouped_A GENERATE group;  --去重  
  5. grunt> grouped_B = GROUP B BY url;  
  6. grunt> non_duplicated_B = FOREACH grouped_B GENERATE group;  --B去重  
  7. grunt> C = JOIN non_duplicated_B BY group, non_duplicated_A BY group;  --A 、B 內聯結  
  8. grunt> D = FOREACH C GENERATE $0;   //生成重複url  
  9. grunt> DUMP D;  

10題: 1000萬字符串,其中有些是重複的,須要把重複的所有去掉,保留沒有重複的字符串。
使用pig:
[plain]  view plain copy print ?
  1. grunt> records = LOAD 'input/retrived_strings' AS (str:chararray);  
  2. grunt> grouped_records = GROUP records BY str;  
  3. grunt> filted_records = FILTER grouped_records BY SIZE(records) <= 1;  
  4. grunt> DUMP filted_records;  

    今日在CSDN看再次碰見July的這篇博文:教你如何迅速秒殺掉:99%的海量數據處理面試題 app

    這篇文章我以前是拜讀過的,今天閒來沒事,就想拿來當作MapReduce的練習。 框架

    MapReduce這把刀太大,刀大了問題就抵不住這刀鋒了,事實上一開始我想着,這麼多些題目,當是要花很多功夫的,但當我作完一題繼續看下面的題目的時候,才發現這些題目在MapReduce模型下顯得大同小異了,看來拿大刀的人是無論砍的是木頭仍是人頭的,而是直接抽象成柱形物而後掄起刀一刀就下去了。 grunt

      直入主題: oop

      一、海量日誌數據,提取出某日訪問百度次數最多的前K個IP。[稍微改變] 優化

                    說明:每一次訪問網頁就在日誌中記錄1次訪問者的IP,獨佔一行,一個小數據能夠在這裏下載

        實在是想不出如何能在一個Job中解決這個問題,因此仍是把它拉扯成了兩個Job來解決。
          Job1:將相同IP的記錄合併,造成<ip,count>形式,其中count是對這個ip的計數。
          Job2:按count排序<ip,count>並選擇前K個進行輸出。
        這裏我寫了一個可序列化的類IPAndCount,若是稍微熟悉MapReduce或者看明白我以前寫的 關係型MapReduce模式:選擇、分組和組內排序 你就知道這是爲了排序而準備的。MapReduce有一個「Shuffle and sort」,這個階段是利用key來對tuple進行排序的,而排序時調用的即是key的compareTo()方法。事實上若是job1輸出的數據兩足夠小,咱們徹底能夠在內存中進行排序而利用MapReduce框架,這樣就能夠省下一個Reduce階段,可是對於這個問題顯然不行。
        IPAndCount很直白,就是包裝了上述的<ip,count>。
    [java]  view plain copy print ?
    1. import java.io.DataInput;  
    2. import java.io.DataOutput;  
    3. import java.io.IOException;  
    4.   
    5. import org.apache.hadoop.io.IntWritable;  
    6. import org.apache.hadoop.io.Text;  
    7. import org.apache.hadoop.io.WritableComparable;  
    8.   
    9.   
    10. public class IPAndCount implements WritableComparable{  
    11.     Text ip;  
    12.     IntWritable count;  
    13.       
    14.     public IPAndCount(){  
    15.         this.ip = new Text("");  
    16.         this.count = new IntWritable(1);  
    17.     }  
    18.       
    19.     public IPAndCount(Text ip, IntWritable count){  
    20.         this.ip = ip;  
    21.         this.count = count;  
    22.     }  
    23.       
    24.     public IPAndCount(String ip, int count){  
    25.         this.ip = new Text(ip);  
    26.         this.count = new IntWritable(count);  
    27.     }  
    28.       
    29.     public void readFields(DataInput in) throws IOException {  
    30.         ip.readFields(in);  
    31.         count.readFields(in);  
    32.     }  
    33.   
    34.     public void write(DataOutput out) throws IOException {  
    35.         ip.write(out);  
    36.         count.write(out);  
    37.     }  
    38.   
    39.     public int compareTo(Object o) {  
    40.         return ((IPAndCount)o).count.compareTo(count) == 0?   
    41.                 ip.compareTo(((IPAndCount)o).ip):((IPAndCount)o).count.compareTo(count);//若是隻比較count會丟失數據,應該是suffle階段的問題  
    42.     }  
    43.       
    44.     public int hashCode(){  
    45.         return ip.hashCode();  
    46.     }  
    47.       
    48.     public boolean equals(Object o){  
    49.         if(!(o instanceof IPAndCount))  
    50.             return false;  
    51.         IPAndCount other = (IPAndCount)o;  
    52.         return ip.equals(other.ip) && count.equals(other.count);  
    53.     }  
    54.       
    55.     public String toString(){  
    56.         StringBuffer buf = new StringBuffer("[ip=");  
    57.         buf.append(ip.toString());  
    58.         buf.append(",count=");  
    59.         buf.append(count.toString());  
    60.         buf.append("]");  
    61.         return buf.toString();  
    62.     }  
    63.       
    64.     public Text getIp() {  
    65.         return ip;  
    66.     }  
    67.     public void setIp(Text ip) {  
    68.         this.ip = ip;  
    69.     }  
    70.     public IntWritable getCount() {  
    71.         return count;  
    72.     }  
    73.     public void setCount(IntWritable count) {  
    74.         this.count = count;  
    75.     }  
    76. }  

    下面對FindActiveIp進行說明:
    SumUpIpMapper和SumUpIPReducer事實上就是一個MapReduce中最基礎的詞頻統計程序WordCount。你能夠加一個Combiner來優化一下,我遺漏了。
    從配置中能夠看見兩個Job的配置:job 和job2。
    依賴關係是job -> job2,代碼中使用了JobControl來解決做業間的依賴關係,JobControl.run()方法會在做業都運行完後才返回。
    Job2的輸入路徑是Job1的輸出路徑,從參數中能夠看出這一點。
    Job1的輸出在輸出文件中的表現是:ip,count
    Job2再從文件中讀入,使用的是KeyValueTextInputFormat,它對應的是TextOutputFormat,咱們能夠從job1的配置中看出來。
    BeforeSortIPMapper從job1的輸出中讀取數據幷包裝成IPAndCount類型,以便MapReduce框架在「shuffle and sort」階段利用它來排序。
    最後SelectTopKIPReducer選出前K個進行輸出便可,在這裏咱們設置最後的reduce只有一個reduce task,以使全部數據匯聚到一臺機子上進行處理。
    [java]  view plain copy print ?
    1. import java.io.IOException;  
    2.   
    3. import org.apache.hadoop.conf.Configuration;  
    4. import org.apache.hadoop.conf.Configured;  
    5. import org.apache.hadoop.fs.Path;  
    6. import org.apache.hadoop.io.IntWritable;  
    7. import org.apache.hadoop.io.LongWritable;  
    8. import org.apache.hadoop.io.Text;  
    9. import org.apache.hadoop.mapred.lib.ChainMapper;  
    10. import org.apache.hadoop.mapreduce.Job;  
    11. import org.apache.hadoop.mapreduce.Mapper;  
    12. import org.apache.hadoop.mapreduce.Reducer;  
    13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    14. import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;  
    15. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
    16. import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;  
    17. import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;  
    18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    19. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
    20. import org.apache.hadoop.util.Tool;  
    21. import org.apache.hadoop.util.ToolRunner;  
    22.   
    23.   
    24. public class FindActiveIP extends Configured implements Tool{  
    25.       
    26.     public static class SumUpIPMapper extends Mapper<LongWritable,Text,Text,IntWritable>{  
    27.         IntWritable  one = new IntWritable(1);  
    28.         public void map(LongWritable key, Text value, Context context)   
    29.             throws IOException,InterruptedException{  
    30.             context.write(value, one);  
    31.         }  
    32.     }  
    33.   
    34.     public static class SumUpIPReducer extends Reducer<Text,IntWritable,Text,IntWritable>{  
    35.         //這裏能夠選擇前k個進行輸出以優化  
    36.         public void reduce(Text key, Iterable<IntWritable> values, Context context)  
    37.             throws IOException, InterruptedException{  
    38.             int sum = 0;  
    39.             for(IntWritable v : values){  
    40.                 sum += v.get();  
    41.             }  
    42.             context.write(key, new IntWritable(sum));  
    43.         }  
    44.     }  
    45.       
    46.       
    47.     public static class BeforeSortIPMapper extends Mapper<Text,Text,IPAndCount,Text>{  
    48.         public void map(Text key, Text value, Context context)  
    49.             throws IOException,InterruptedException{  
    50.             IPAndCount tmp = new IPAndCount(key,new IntWritable(Integer.valueOf(value.toString())));  
    51.             System.out.println(tmp);  
    52.             context.write(tmp,new Text());  
    53.         }  
    54.     }  
    55.       
    56.       
    57.     //set num of this reducer to one  
    58.     public static class SelectTopKIPReducer extends Reducer<IPAndCount,Text,IPAndCount,Text>{  
    59.         int counter = 0;  
    60.         int K = 10;  
    61.         public void reduce(IPAndCount key, Iterable<Text> values, Context context)  
    62.             throws IOException, InterruptedException{  
    63.             System.out.println(key);  
    64.             if(counter < K){  
    65.                 context.write(key, null);  
    66.                   
    67.                 counter++;  
    68.             }  
    69.               
    70.         }  
    71.     }  
    72.     public int run(String[] args) throws Exception {  
    73.         Configuration conf = new Configuration();  
    74.         Job job = new Job(conf,"SumUpIP");  
    75.         job.setJarByClass(FindActiveIP.class);  
    76.         job.setInputFormatClass(TextInputFormat.class);  
    77.         job.setOutputFormatClass(TextOutputFormat.class);  
    78.         job.getConfiguration().set("mapred.textoutputformat.separator"",");  
    79.         Path in = new Path(args[0]);  
    80.         Path out = new Path(args[1]);  
    81.         FileInputFormat.setInputPaths(job, in);  
    82.         FileOutputFormat.setOutputPath(job, out);  
    83.         job.setMapperClass(SumUpIPMapper.class);  
    84.         job.setReducerClass(SumUpIPReducer.class);  
    85.         job.setMapOutputKeyClass(Text.class);  
    86.         job.setMapOutputValueClass(IntWritable.class);  
    87.         job.setOutputKeyClass(Text.class);  
    88.         job.setOutputValueClass(IntWritable.class);  
    89.         job.setNumReduceTasks(7);  
    90.           
    91.         Configuration conf2 = new Configuration();  
    92.         Job job2 = new Job(conf2,"SortAndFindTopK");  
    93.         job2.setJarByClass(FindActiveIP.class);  
    94.         job2.setInputFormatClass(KeyValueTextInputFormat.class);  
    95.         job2.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator"",");  
    96.         job2.setOutputFormatClass(TextOutputFormat.class);  
    97.         Path in2 = new Path(args[1]);  
    98.         Path out2 = new Path(args[2]);  
    99.         FileInputFormat.setInputPaths(job2,in2);  
    100.         FileOutputFormat.setOutputPath(job2, out2);  
    101.         job2.setMapperClass(BeforeSortIPMapper.class);  
    102.         job2.setReducerClass(SelectTopKIPReducer.class);  
    103.         job2.setMapOutputKeyClass(IPAndCount.class);  
    104.         job2.setMapOutputValueClass(Text.class);  
    105.         job2.setOutputKeyClass(IPAndCount.class);  
    106.         job2.setOutputValueClass(Text.class);  
    107.         job2.setNumReduceTasks(1);  
    108.           
    109.         JobControl jobControl = new JobControl("FindTopKIP");  
    110.         ControlledJob cJob1 = new ControlledJob(conf);  
    111.         cJob1.setJob(job);  
    112.         ControlledJob cJob2 = new ControlledJob(conf2);  
    113.         cJob2.setJob(job2);  
    114.         jobControl.addJob(cJob1);  
    115.         jobControl.addJob(cJob2);  
    116.         cJob2.addDependingJob(cJob1);  
    117.         jobControl.run();  
    118.         return 0;  
    119.     }  
    120.       
    121.     public static void main(String args[]) throws Exception{  
    122.         int res = ToolRunner.run(new FindActiveIP(), args);  
    123.         System.exit(res);  
    124.     }  
    125.   
    126. }  

      大刀拿習慣了,從前的大刀就成了如今的繡花針,不是繡花針很差,只是用着不順手。當你聽着歌用java寫着MapReduce,忽然有人在你耳邊喊了一句:Pig~Pig~Pig~
      你很難不心動!程序員愛偷懶堪比女人愛逛街,都是爲了快樂啊~
      下面是用Pig來處理上述的問題:
      
    [plain]  view plain copy print ?
    1. grunt> records = LOAD 'input/ipdata' AS (ip:chararray);                           
    2. grunt> grouped_records = GROUP records BY ip;                                     
    3. grunt> counted_records = FOREACH grouped_records GENERATE group, COUNT(records);  
    4. grunt> sorted_records = ORDER counted_records BY $1 DESC;                         
    5. grunt> topK = LIMIT sorted_records 10;                                                    
    6. grunt> DUMP topK;    

    你看,數數,沒暈數一數,6行!僅僅6行就解決了。
    行1:將文件裝入
    行2:按ip分組
    行3:組內計數
    行4:組間按ip訪問計數排序
    行5:選擇前10個數據
    行6:運行並輸出。

    雖然咱們的Pig方法實際上跑了3個job才完成任務,相比於java寫的MapReduce多了一個job,但Pig顯然更愉快些。

    這是最後結果:
    (192.168.0.1,1559)
    (192.168.0.21,7)
    (192.168.0.14,4)
    (192.168.0.10,4)
    (192.168.0.12,4)
    (192.168.0.32,4)
    (192.168.0.13,3)
    (192.168.0.3,3)
    (192.168.0.2,2)
    (192.168.0.11,1)

    這個算法對帶寬的壓力仍是比較大的,除了加一個Combiner以外,代碼中還提到了另外一個小小的優化在進入第一個Reduce階段的一個reduce task中的數據足以裝入內存時,這是很容易解決的。這不是多好的優化,應當有更加好的優化方式能過濾更多的數據,不然。。。這不科學~

      二、尋找熱門查詢:搜索引擎會經過日誌文件把用戶每次檢索使用的全部檢索串都記錄下來,每一個查詢串的長度爲1-255字節。
        假設目前有100億個記錄(這些查詢串的重複度比較高,雖然總數是100億,但若是除去重複後,不超過10億個。一個查詢串的重複度越高,說明查詢它的用戶越多,也就是越熱門),請你統計最熱門的10個查詢串。[我仍是稍微修改了題目]
        看見這個題目,我以爲我寫下去會對不起July費了萬千腦細胞辛苦的寫做成果,儘管我心裏十分但願它跟上面那題同樣,但這多麼讓人不甘心又不盡興~
        如今暫且不考慮優化的問題:這個題目無非就是統計查詢串的計數,而後排序,而後取出前10個。事實上,這個問題在不考慮細節上徹底能夠用上面的pig腳原本處理。

    不寫了,以如今的水平繼續寫實在是不優美:
    3題:有一個1G大小的一個文件,裏面每一行是一個詞,詞的大小不超過16字節,內存限制大小是1M。返回頻數最高的100個詞。
    4題:海量數據分佈在100臺電腦中,想個辦法高效統計出這批數據的TOP10。
    5題:有10個文件,每一個文件1G,每一個文件的每一行存放的都是用戶的query,每一個文件的query均可能重複。要求你按照query的頻度排序。
    7題:怎麼在海量數據中找出重複次數最多的一個?
    8題:上千萬或上億數據(有重複),統計其中出現次數最多的前N個數據。
    9題:一個文本文件,大約有一萬行,每行一個詞,要求統計出其中最頻繁出現的前10個詞。

    1,2,3,4,5,7,8,9題思路基本一致,值得注意的是,有時候咱們徹底能夠肯定咱們須要的數據的一些特徵,好比上面的熱門查詢中熱門串必定被查詢超過1000次,那麼咱們就可使用FILTER來進行過濾以減小處理的數據(從而減小對帶寬的壓力)[filted_records = FILTER grouped_records BY SIZE(records) > 1000;]。

    6題: 給定a、b兩個文件,各存放50億個url,每一個url各佔64字節,內存限制是4G,讓你找出a、b文件共同的url?
    [Hadoop]使用DistributedCache進行復制聯結  以及  使用hadoop的datajoin包進行關係型join操做,你也能夠參考Data-Intensive Text Processing with MapReduce看看原生態的join操做是怎麼進行的。
    [plain]  view plain copy print ?
    1. grunt> A = LOAD 'input/url1' AS (url:chararray);  
    2. grunt> B = LOAD 'input/url2' AS (url:chararray);  
    3. grunt> grouped_A = GROUP A BY url;  
    4. grunt> non_duplicated_A = FOREACH grouped_A GENERATE group;  --去重  
    5. grunt> grouped_B = GROUP B BY url;  
    6. grunt> non_duplicated_B = FOREACH grouped_B GENERATE group;  --B去重  
    7. grunt> C = JOIN non_duplicated_B BY group, non_duplicated_A BY group;  --A 、B 內聯結  
    8. grunt> D = FOREACH C GENERATE $0;   //生成重複url  
    9. grunt> DUMP D;  

    10題: 1000萬字符串,其中有些是重複的,須要把重複的所有去掉,保留沒有重複的字符串。
    使用pig:
    [plain]  view plain copy print ?
    1. grunt> records = LOAD 'input/retrived_strings' AS (str:chararray);  
    2. grunt> grouped_records = GROUP records BY str;  
    3. grunt> filted_records = FILTER grouped_records BY SIZE(records) <= 1;  
    4. grunt> DUMP filted_records;  
相關文章
相關標籤/搜索