自己是一個常見簡單的需求,目的是獲得一個權值流量的分佈狀況。數據原型是幾T的日誌數據,每條記錄都有不少字段,其中有一個字段爲該記錄的權重。每一條記錄是一個流量。java
A B Capache
Tanx FourthView 0
Tencent Na 20
Allyes FirstView 200
Adx OtherView 5
Amx FirstView 133
Miaozhen Na 12
Baidu SecondView 0
Adx OtherView 5
Tanx Na 0
Adx OtherView 5
Allyes FirstView 31
Adx OtherView 5
Tanx Na 0
Adx OtherView 192數組
從原始日誌中抽取三個字段來處理,目的是統計AB字段下字段C的分佈,作法是將具備相同AB字段的記錄聚合到一塊兒,在每一個聚合中,按C字段從小到大排序,而後對排序後的記錄平均分紅100份,每份的流量爲總流量的百分之一。計算每份流量的C字段均值,就獲得了基於AB字段的C字段在100個區間內的分佈狀況。app
將鍵字段AB做爲聚合鍵聚合數據,並以字段C作排序鍵排序全部記錄是輕鬆的,pig和hive都有高效的實現。但若是把聚合排序好的記錄平均分到100個區間,並在每一個區間求均值的話,若是想用pig實現邏輯上會略顯複雜。ide
具體的解決思路是:函數
1.先用pig整合數據,從原始日誌中提取相應的字段,存入臨時文件。oop
2.使用MapReduce程序,讀取整合好的數據,在map函數中,拆分記錄,以AB字段(聚合鍵)爲鍵,C字段爲值,輸出。reduce函數中,對相同聚合鍵AB字段的記錄排序,並對排序後的每條記錄添加一個索引標識其排序後的位置。如排序後C字段爲 55,77,88,添加索引後爲 55 1,77 2,88 3。性能
3.通過MapReduce程序,每條記錄有四個字段,最後一個字段是索引字段。索引字段按排序順序給每條記錄編號,這樣一來,實現自動切分100個區間將極爲方便。作法是:使用pig,先COUNT出每一個聚合組的記錄總數N,每一個區間的記錄條數爲n=N/100,經過自定義一個公式就能夠獲得每條記錄應屬的區間。例如記每條記錄的索引爲 x,它所屬的區間爲 x/n+1。這裏的除法都是取整,例若有999條記錄,分爲100個區間,每一個區間n=999/100=99條。索引x爲1-98的記錄根據公式獲得的區間號爲1,99-197的記錄區間爲2,依次類推。這之間會有一些小BUG,能夠經過調節分區公式來調節。最後獲得每條記錄所屬的分區號後,使用pig的group操做,以AB字段和區間號做爲聚合鍵,在聚合組內對C字段使用AVG函數便可。測試
先給出第三步的pig程序:大數據
1 --flow_distrib_v2.pig 2 3 data = load '$input' as (platform:chararray,location:chararray,price:long,tag:long); 4 5 g_data = group data by (platform,location); 6 7 format_data = foreach g_data{ 8 generate (COUNT(data)/100) as sectionnum,flatten(data) ; 9 } 10 11 format_line = foreach format_data generate platform,location,price,sectionnum,(tag/sectionnum+1) as sectionid; 12 filter_format = filter format_line by sectionid <= 100; 13 14 g_section = group filter_format by (platform,location,sectionid,sectionnum); 15 16 cal_avg = foreach g_section { 17 cnt = filter_format.price; 18 generate 'null','0','$Date' as date,group.platform,group.location,'$agent','$log_type' as type,group.sectionid,group.sectionnum,AVG(cnt) as avg; 19 } 20 21 store cal_avg into '$output';
接下來就剩下第二步的MapReduce程序,如何給每條記錄按順序編號索引。這個部分也是在實踐中關鍵,也是本文主要想記錄的地方。
第二步MapReduce程序主要要達到功能是基於AB鍵聚合排序後的記錄順序編號,主要就是三個操做,分組聚合,排序,編號。下面是簡單實現:
import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ProduceTag extends Configured implements Tool{ public static class TagMapper extends Mapper<Object,Text,Text,Text>{ private Text priceKey = new Text(); private Text priceValue = new Text(); public void map(Object key,Text value,Context con) throws IOException, InterruptedException{ String[] str = (value.toString()).split("\t"); String keystr = str[0]+"\t"+str[1]; priceKey.set(keystr); priceValue.set(str[2]); con.write(priceKey, priceValue); } } public static class TagReducer extends Reducer<Text,Text,Text,Text>{ private Text outKey = new Text(); private Text outTag = new Text(); private long counter = 0; private String keystr = ""; private String valuestr = ""; public void reduce(Text key,Iterable<Text> pricesgroup,Context con) throws IOException, InterruptedException{ List<Integer> fprices = new ArrayList<Integer>(); for(Text price:pricesgroup){ int format_price = Integer.parseInt(price.toString()); if(format_price < 200000) fprices.add(format_price); } java.util.Collections.sort(fprices); int index = 1; for(int i : fprices){ valuestr = i+"\t"+index; index++; outTag.set(valuestr); con.write(key, outTag); } } } public static class PlatformPatition extends Partitioner<Text,Text>{ private String[] platform = {"Adx","Amx","Allyes","Baidu","Inmobi","Iqiyi","Mogo","Miaozhen","Tanx","Tencent","Sina","Youku"}; private String[] location = {"Na","FirstView","SecondView","ThirdView","FourthView","FifthView","SixthView","SeventhView","EighthView","NinthView","TenthView","OtherView"}; @Override public int getPartition(Text key, Text value, int numreduce) { // TODO Auto-generated method stub String[] str = key.toString().split("\t"); int pla = 0; for(int i = 0;i<platform.length;i++){ if(str[0].equals(platform[i])){ pla = i; break; } } int loc = 0; for(int j = 0;j<location.length;j++){ if(str[1].equals(location[j])){ loc = j+1; break; } } int par = (pla*12+loc)%numreduce; return par; } } @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); Job job = new Job(conf,"Produce_Tag"); job.setJarByClass(ProduceTag.class); job.setMapperClass(TagMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(TagReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setPartitionerClass(PlatformPatition.class); job.setNumReduceTasks(28); FileInputFormat.addInputPath(job,new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args){ try{ if(args.length != 2){ System.err.println("usage error"); System.exit(0); }else{ int ret = ToolRunner.run(new ProduceTag(), args); System.exit(ret); } } catch (Exception e){ e.printStackTrace(); } } }
以上程序,map過程當中將須要聚合的鍵做爲KEY,其他字段做爲Value 做爲Map的輸出。並自定義Partition類,將Map輸出定製洗牌方式,最後具備相同AB字段的鍵被髮送到一個Reduce下,而且每一個Reduce只處理一種AB組合,Reduce的個數能夠選擇AB組合的全部狀況,但實際數據處理中有些組合並無值,故而取一個較小的值足以知足需求。
設計使每一個Reduce只處理一種AB組合的初衷是但願提升並行度,分散內存消耗。同時也但願獲得每一個Reduce的輸出文件是對一種組合的處理的結果。每種組合獨立生成一個文件。事實上,後序的實踐代表這徹底是沒有必要的。
Reduce過程當中,對相同AB組合聚合的結果按C字段排序,採用的是JAVA語言定製的排序函數。先將C字段都插入到Arraylist中,插入中去掉了離羣點,即權重大於200000的記錄。這個離羣點的去除在此處對程序的影響不大,畢竟離羣點的個數和記錄總體相比是微不足道的。但在後序的優化中,離羣點的去除能夠帶來很大的性能提高。
使用JAVA排序函數排序後,按順序遍歷列表,並打算編號。這個過程是時間複雜度是O(n),排序的時間複雜度是O(nlgn),因此貌似整個Reduce過程在時間性能上是可接受的。
上述程序在邏輯上應該沒有問題,對小數據集的測試也是經過的,但事實上這樣的程序沒法用在大數據處理上。不幸的是,本人一開始並未認識到錯誤的存在,在幾十個結點上運行此程序處理幾T的數據時,幾乎全部Reduce結點都發生了內存堆滿溢出的狀況。
罪魁禍首在於排序,JAVA定製的排序函數是把全部待排序元素一次性加入內存排序,對於幾T的數據,即便切分多個Reduce後,數據量仍然是很是大的,這一定致使運行該程序的結點內存爆滿。
解決問題的方向是如何在儘可能小的內存內對大數據排序。這個問題的解決取決於待排序數據的結構性質。對於總體數據的統計發現,大於2000000的點就能夠認爲是離羣點,即每一個數據大小集中在0~2000000。雖然數據的老是可能後又幾十億,但會有不少的重複值。
對這種類型數據排序有一個很好的方法,該方法不只能夠將排序內存限制在很小的內存之內,並且時間複雜度是線性的O(n),突破了比較排序的極限。這就是統計排序-鍵索引計數法。
下面是新的Reduce過程:
1 public static class TagReducer extends Reducer<Text,Text,Text,Text>{ 2 private Text outKey = new Text(); 3 private Text outTag = new Text(); 4 private String keystr = ""; 5 private String valuestr = ""; 6 public void reduce(Text key,Iterable<Text> prices,Context con) throws IOException, InterruptedException{ 7 long counter = 0; 8 int[] statis = new int[200001]; 9 int sub = 0; 10 for(Text price:prices){ 11 try{ 12 sub = Integer.parseInt(price.toString()); 13 if(sub < 200000){ 14 statis[sub]++; 15 }else{ 16 throw new NumberFormatException(); 17 } 18 }catch(NumberFormatException e){ 19 statis[200000]++; 20 } 21 } 22 23 for(int i = 0;i<statis.length-1;i++){ 24 int j = statis[i]; 25 while(j>0){ 26 counter++; 27 keystr = key.toString()+"\t"+i; 28 valuestr = counter+""; 29 outKey.set(keystr); 30 outTag.set(valuestr); 31 con.write(outKey, outTag); 32 j--; 33 } 34 } 35 } 36 }
新的reduce過程當中,使用一個整型的statis數組保存全部數據,數組元素下標爲數據的值,數組元素的值爲 那些數據的值爲該元素下標的 數據的個數。由於通過先驗的統計分析代表非離羣點的值都在0~2000000,因此大小爲200 0000的數組足以表示全部數據的值。並且大小爲200 0000的整型數組只需8M內存。接下來的順序編號操做與原來的Reduce過程相似,也是線性操做。
通過修改的Reduce過程在處理大數據時也不會在有內存爆滿的現象。而且即便數據量增長,也不會有問題。程序的性能取決於離羣點的閾值。而在本實踐中,離羣點的閾值變更的幅度不會很大。
新的Reduce過程雖然能在大數據處理時也能保證正確了,但一開始基於自定義Partition以但願分散Reduce數據量,提升並行度的初衷並無起到做用。事實上有些AB組合的數據量遠遠大於其餘組合,這種現象稱之爲數據傾斜。Pig語言在處理大規模數據JOIN時有完善的處理數據傾斜方案。但在這裏只能另謀他路。對程序在集羣上運行的觀察發現。某些Reduce由於非散列到的數據量過於龐大,使得Map到Reduce的COPY過程的IO延遲很是大。每每其餘Reduce完成全部任務了,幾個Reduce任務還緩慢的停留在COPY階段。而且大量數據的COPY佔用IO,也將致使其餘人的任務變的緩慢。
須要加速COPY的IO效率,很容易想到的是使用Combiner。Combiner的使用侷限在線性計算的任務。從新審視Reduce過程是Map數據的散列統計。從數據的值獲得應該散列到的數組元素的下標,並對數組元素進行加一操做,表示數據值爲x的元素又多了一個。事實上能夠把加一操做變爲加n操做。由於若是Map端輸出數據是:
A1 B1 10
A1 B1 10
... ...省略1000條
A1 B1 10
這種形式,首先Map端沒有必要把這樣的數據存1000條到中間文件讓Reduce去copy,其次,Reduce端也不須要對這樣的數據作1000次累加,只須要對下標爲10的數組元素加1000就能夠了。
那麼,Map端對這種形式的輸出應該是 A1 B1 10 1000。最後一個字段表徵這樣的數據有多少條。
鑑於A1 B1是聚合後的結果,要知道這樣的數據有多少條也須要一個統計過程這些過程徹底能夠交給Combiner來完成。Combiner對Map輸出結果到Reduce以前作一個組合壓縮處理。在這裏正好能夠對Map輸出的多條相同數據進行一個合併格式化處理。
修改後的MapReduce程序加入了Combiner,並稍微修改Reduce處理數據的格式,完整代碼以下:
1 public class ProduceTag extends Configured implements Tool{ 2 3 public static class TagMapper extends Mapper<Object,Text,Text,Text>{ 4 private Text priceKey = new Text(); 5 private Text priceValue = new Text(); 6 public void map(Object key,Text value,Context con) throws IOException, InterruptedException{ 7 String[] str = (value.toString()).split("\t"); 8 String keystr = str[0]+"\t"+str[1]; 9 priceKey.set(keystr); 10 priceValue.set(str[2]); 11 con.write(priceKey, priceValue); 12 } 13 } 14 15 public static class TagCombiner extends Reducer<Text,Text,Text,Text>{ 16 private Text outTag = new Text(); 17 private String valueStr = ""; 18 public void reduce(Text key,Iterable<Text> prices,Context con) throws IOException,InterruptedException{ 19 int sub = 0; 20 int[] statis = new int[200001]; 21 try{ 22 for(Text price:prices){ 23 sub = Integer.parseInt(price.toString()); 24 if(sub < 200000){ 25 statis[sub]++; 26 } 27 } 28 }catch(NumberFormatException e){ 29 statis[200000]++; 30 } 31 32 for(int i = 0;i<statis.length-1;i++){ 33 int j = statis[i]; 34 valueStr = i + "#"+j; 35 outTag.set(valueStr); 36 if( j > 0){ 37 con.write(key, outTag); 38 } 39 } 40 } 41 } 42 43 44 public static class TagReducer extends Reducer<Text,Text,Text,Text>{ 45 private Text outKey = new Text(); 46 private Text outTag = new Text(); 47 private String keystr = ""; 48 private String valuestr = ""; 49 public void reduce(Text key,Iterable<Text> pricesgroup,Context con) throws IOException, InterruptedException{ 50 long counter = 0; 51 int[] statis = new int[200001]; 52 int sub = 0; 53 String[] combiner = null; 54 for(Text price:pricesgroup){ 55 try{ 56 String combiners= price.toString(); 57 combiner = combiners.split("#"); 58 sub = Integer.parseInt(combiner[0]); 59 int nums = Integer.parseInt(combiner[1]); 60 if(sub < 20000 && nums>0){ 61 statis[sub]+= nums; 62 }else{ 63 throw new NumberFormatException(); 64 } 65 }catch(NumberFormatException e){ 66 statis[200000]++; 67 } 68 } 69 70 for(int i = 0;i<statis.length-1;i++){ 71 int j = statis[i]; 72 while(j>0){ 73 counter++; 74 keystr = key.toString()+"\t"+i; 75 valuestr = counter+""; 76 outKey.set(keystr); 77 outTag.set(valuestr); 78 con.write(outKey, outTag); 79 j--; 80 } 81 } 82 } 83 } 84 85 @Override 86 public int run(String[] args) throws Exception { 87 // TODO Auto-generated method stub 88 Configuration conf = new Configuration(); 89 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 90 Job job = new Job(conf,"Produce_Tag"); 91 job.setJarByClass(ProduceTag.class); 92 job.setMapperClass(TagMapper.class); 93 job.setMapOutputKeyClass(Text.class); 94 job.setMapOutputValueClass(Text.class); 95 job.setReducerClass(TagReducer.class); 96 job.setOutputKeyClass(Text.class); 97 job.setOutputValueClass(Text.class); 98 job.setCombinerClass(TagCombiner.class); 99 FileInputFormat.addInputPath(job,new Path(otherArgs[0])); 100 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 101 return (job.waitForCompletion(true) ? 0 : 1); 102 } 103 104 105 public static void main(String[] args){ 106 try{ 107 if(args.length != 2){ 108 System.err.println("usage error"); 109 System.exit(0); 110 }else{ 111 int ret = ToolRunner.run(new ProduceTag(), args); 112 System.exit(ret); 113 } 114 } 115 catch (Exception e){ 116 e.printStackTrace(); 117 } 118 } 119 }
修改後的MapReduce程序,去掉了自定以Partition過程,並且沒有定製Reduce個數,所有以自動方式運行。在集羣上的運行代表,整個過程只須要起一個Reduce,並且COPY過程飛快,比PIG程序都要快好多。整個過程輸入上萬個Map,在很短的時間內就完成了全部的處理,反然後序的pig程序影響脫了後腿。
總結:
本篇博客記錄了實習以來遇到的一個大數據處理問題,和本身找到的解決對策。總結經驗教訓,對於大型數據並在集羣上並行處理的任務,永遠不能掉以輕心,常規的方式每每會帶來不少意想不到的問題。