FP-Growth是一種常被用來進行關聯分析,挖掘頻繁項的算法。與Aprior算法相比,FP-Growth算法採用前綴樹的形式來表徵數據,減小了掃描事務數據庫的次數,經過遞歸地生成條件FP-tree來挖掘頻繁項。參考資料[1]詳細分析了這一過程。事實上,面對大數據量時,FP-Growth算法生成的FP-tree很是大,沒法放入內存,挖掘到的頻繁項也可能有指數多個。本文將分析如何並行化FP-Growth算法以及Mahout中並行化FP-Growth算法的源碼。html
1. 並行化FP-Growthjava
並行化FP-Growth的方法有不少,最先提出使用MapReduce並行化FP-Growth算法的應該是來自Google Beijing Research的Haoyuan Li等(見參考資料[2])。他們提出使用三次MapReduce來並行化FP-Growth,整個過程大體能夠分爲五個步驟:node
Step 1:Sharding算法
爲了均衡整個集羣的讀寫性能,將事務數據庫分紅若干個數據片斷(shard),存儲到P個節點中。數據庫
Step 2:Parallel Countingapache
與WordCount相似,經過一次MapReduce來計算每個項(item)的支持度。具體來講,每個mapper將從hdfs中取得事務數據庫的若干個數據片斷(shards),因此mapper的輸入是<key, value=Ti>,Ti表示數據片斷中的一條數據。對於Ti中的每個項aj,mapper輸出<key=aj, value=1>。當集羣中的全部mapper處理完數據以後,全部key=aj的鍵值對將被分配到同一個reducer,因此reducer的輸入是<key=aj, value={1, 1, ... , 1}>。reducer只須要進行一次求和,而後輸出<key=aj, value=sum{1, 1, ... , 1}>。最終將獲得一張按照支持度遞減排序的列表,稱之爲F-List:設計模式
圖1數組
Step 3:Grouping Items緩存
將F-List中的項(item)分爲Q個組(group),每個組都有一個惟一的group-id,咱們將全部項以及其所對應的group-id記爲G-List。數據結構
Step 4:Parallel FP-Growth
這一步驟是並行化FP-Growth的關鍵,也是整個算法中相對難以理解的部分。這一步驟中將用到一次MapReduce。每個mapper的輸入來自第一步生成的數據片斷,因此mapper的輸入是<key, value=Ti>。在處理這些數據片斷以前,mapper將讀取第三步生成的G-List。G-List實際上是一張Hashmap,鍵是項,值是項所對應的group-id,所佔空間通常並不會很大,能夠放入內存中。從Ti的最後一項開始向前掃描,或者說從右向左掃描,若是aL在G-List中對應的group-id是第一次被掃描,則輸出{a0,a1,…,aL},不然不輸出任何數據。以圖1中所示的數據爲例,假如支持度閾值爲1,Q爲3,那麼將獲得G-List:
圖2
其中,第三列是group-id。假如mapper的輸入是{牛奶,雞蛋,麪包,薯片},從最後一項開始掃描,輸出<key=1,value={牛奶,雞蛋,麪包,薯片}>。以後的兩項是麪包和雞蛋,其所對應的group-id和薯片相同,因此不輸出任何數據。第一項是牛奶,其所對應的group-id不曾出現過,因此輸出<key=2,value={牛奶}>。
全部group-id相同的數據將被推送到同一個reducer,因此reducer的輸入是<key=group-id,value={{ValueList1},{ValueList2},…,{ValueListN}}>。reducer在本地構建FP-tree,而後像傳統的FP-Growth算法那樣遞歸地構建條件FP-tree,並挖掘頻繁模式。與傳統的FP-Growth算法不同的是,reducer並不直接輸出所挖掘到的頻繁模式,而是將其放入一個大小爲K,根據支持度排序創建的大根堆,而後輸出K個支持度較高的頻繁模式:<key=item,reduce={包含該item的Top K Frequent Patterns}>。
Step 5:Aggregating
上一步挖掘到的頻繁模式Top K Frequent Patterns已經包含了全部頻繁模式,然而上一步的MapReduce是按照groupID來劃分數據,所以key=item對應的頻繁模式會存在於若干個不一樣groupID的reduce節點上。爲了合併全部key=item的鍵值對,優化結果展示形式,可利用MapReduce默認對key排序的特色,對挖掘到的頻繁模式進行一下處理:依次將Top K Frequent Patterns的每個item做爲key,而後輸出包含該key的這一條Top K Frequent Patterns。因此,每個mapper的輸出是<key=item, value={該節點上的包含該item的頻繁模式}>,reducer彙總全部mapper的輸出結果,並輸出最終的結果<key=item, value={包含該item的全部頻繁模式}>。
2. Parallel FP-Growth源碼分析
Mahout提供了一些機器學習領域經典算法的實現。Mahout0.9以後的版本已經移除了Parallel FP-Growth算法。本文將分析Mahout0.8中Parallel FP-Growth的源碼。
圖3
FPGrowthDriver.java
FPGrowthDriver是FPGrowth算法的驅動類,繼承自AbstractJob類。運行Hadoop任務通常都是經過命令行中執行bin/hadoop腳本,同時傳入一些參數。ToolRunner類中的GenericOptionsParser可獲取這些命令行參數。AbstractJob類封裝了addInputOption,addOutputOption,addOption,parseArguments等方法,爲解析命令行參數提供了幫助。params對象存儲了整個算法所須要的參數。FPGrowthDriver根據命令行參數,若順序執行,則調用該文件內的runFPGrowth方法,若並行化執行,則調用PFPGrowth.java文件中的runPFPGrowth方法。
1 public final class FPGrowthDriver extends AbstractJob { 2 3 private static final Logger log = LoggerFactory.getLogger(FPGrowthDriver.class); 4 5 private FPGrowthDriver() { 6 } 7 8 public static void main(String[] args) throws Exception { 9 //ToolRunner的靜態方法run()內有GenericOptionsParser。經過GenericOptionsParser.getRemainingArgs()可獲取傳入的命令行參數。以後,ToolRunner.run()將調用FPGrowthDriver.run()。 10 ToolRunner.run(new Configuration(), new FPGrowthDriver(), args); 11 } 12 13 /** 14 * Run TopK FPGrowth given the input file, 15 */ 16 @Override 17 public int run(String[] args) throws Exception { 18 addInputOption(); //添加默認的輸入目錄路徑 19 addOutputOption(); //添加默認的輸出目錄路徑 20 21 addOption("minSupport", "s", "(Optional) The minimum number of times a co-occurrence must be present." 22 + " Default Value: 3", "3"); //添加支持度閾值 23 addOption("maxHeapSize", "k", "(Optional) Maximum Heap Size k, to denote the requirement to mine top K items." 24 + " Default value: 50", "50"); //添加大根堆的大小 25 addOption("numGroups", "g", "(Optional) Number of groups the features should be divided in the map-reduce version." 26 + " Doesn't work in sequential version Default Value:" + PFPGrowth.NUM_GROUPS_DEFAULT, 27 Integer.toString(PFPGrowth.NUM_GROUPS_DEFAULT)); //添加組數g 28 addOption("splitterPattern", "regex", "Regular Expression pattern used to split given string transaction into" 29 + " itemsets. Default value splits comma separated itemsets. Default Value:" 30 + " \"[ ,\\t]*[,|\\t][ ,\\t]*\" ", "[ ,\t]*[,|\t][ ,\t]*"); //添加分隔符 31 addOption("numTreeCacheEntries", "tc", "(Optional) Number of entries in the tree cache to prevent duplicate" 32 + " tree building. (Warning) a first level conditional FP-Tree might consume a lot of memory, " 33 + "so keep this value small, but big enough to prevent duplicate tree building. " 34 + "Default Value:5 Recommended Values: [5-10]", "5"); 35 addOption("method", "method", "Method of processing: sequential|mapreduce", "sequential"); //添加訓練方法,順序執行或並行執行 36 addOption("encoding", "e", "(Optional) The file encoding. Default value: UTF-8", "UTF-8"); //添加編碼方式 37 addFlag("useFPG2", "2", "Use an alternate FPG implementation"); 38 39 //若是解析命令行參數失敗,則退出 40 if (parseArguments(args) == null) { 41 return -1; 42 } 43 44 Parameters params = new Parameters(); 45 46 if (hasOption("minSupport")) { 47 String minSupportString = getOption("minSupport"); 48 params.set("minSupport", minSupportString); 49 } 50 if (hasOption("maxHeapSize")) { 51 String maxHeapSizeString = getOption("maxHeapSize"); 52 params.set("maxHeapSize", maxHeapSizeString); 53 } 54 if (hasOption("numGroups")) { 55 String numGroupsString = getOption("numGroups"); 56 params.set("numGroups", numGroupsString); 57 } 58 59 if (hasOption("numTreeCacheEntries")) { 60 String numTreeCacheString = getOption("numTreeCacheEntries"); 61 params.set("treeCacheSize", numTreeCacheString); 62 } 63 64 if (hasOption("splitterPattern")) { 65 String patternString = getOption("splitterPattern"); 66 params.set("splitPattern", patternString); 67 } 68 69 String encoding = "UTF-8"; 70 if (hasOption("encoding")) { 71 encoding = getOption("encoding"); 72 } 73 params.set("encoding", encoding); 74 75 if (hasOption("useFPG2")) { 76 params.set(PFPGrowth.USE_FPG2, "true"); 77 } 78 79 Path inputDir = getInputPath(); 80 Path outputDir = getOutputPath(); 81 82 params.set("input", inputDir.toString()); 83 params.set("output", outputDir.toString()); 84 85 String classificationMethod = getOption("method"); 86 if ("sequential".equalsIgnoreCase(classificationMethod)) { 87 runFPGrowth(params); 88 } else if ("mapreduce".equalsIgnoreCase(classificationMethod)) { 89 Configuration conf = new Configuration(); 90 HadoopUtil.delete(conf, outputDir); 91 PFPGrowth.runPFPGrowth(params); 92 } 93 94 return 0; 95 }
PFPGrowth.java
PFPGrowth是並行化FP-Growth算法的驅動類。runPFPGrowth(params)方法內初始化了一個Configuration對象,以後調用runPFPGrowth(params, conf)方法。runPFPGrowth(params, conf)方法包括了並行化FP-Growth算法的五個關鍵步驟。其中,startParallelCounting(params, conf)對應Step1和Step2,經過相似WordCount的方法統計每一項的支持度,其輸出結果將被readFList()和saveList()用於生成FList。以後,將按照用戶輸入的命令行參數NUM_GROUPS來計算每個group所含項的個數,並將其存儲到params。startParallelFPGrowth(params, conf)對應Step3和Step4。startAggregating(params, conf)對應Step5。
1 public static void runPFPGrowth(Parameters params, Configuration conf) throws IOException, InterruptedException, ClassNotFoundException { 2 conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization," + "org.apache.hadoop.io.serializer.WritableSerialization"); 3 4 startParallelCounting(params, conf); //對應Step1和Step2 5 6 // save feature list to dcache 7 List<Pair<String,Long>> fList = readFList(params); 8 saveFList(fList, params, conf); 9 10 // set param to control group size in MR jobs 11 int numGroups = params.getInt(NUM_GROUPS, NUM_GROUPS_DEFAULT); 12 int maxPerGroup = fList.size() / numGroups; 13 if (fList.size() % numGroups != 0) { 14 maxPerGroup++; 15 } 16 params.set(MAX_PER_GROUP, Integer.toString(maxPerGroup)); 17 18 startParallelFPGrowth(params, conf); //對應Step3和Step4 19 20 startAggregating(params, conf); //對應Step5 21 }
startParallelCounting方法初始化了一個Job對象。該Job對象將調用ParallelCountingMapper和ParallelCountingReducer來完成支持度的統計。
1 /** 2 * Count the frequencies of various features in parallel using Map/Reduce 3 */ 4 public static void startParallelCounting(Parameters params, Configuration conf) 5 throws IOException, InterruptedException, ClassNotFoundException { 6 conf.set(PFP_PARAMETERS, params.toString()); 7 8 conf.set("mapred.compress.map.output", "true"); 9 conf.set("mapred.output.compression.type", "BLOCK"); 10 11 String input = params.get(INPUT); 12 Job job = new Job(conf, "Parallel Counting Driver running over input: " + input); 13 job.setJarByClass(PFPGrowth.class); 14 15 job.setOutputKeyClass(Text.class); 16 job.setOutputValueClass(LongWritable.class); 17 18 FileInputFormat.addInputPath(job, new Path(input)); 19 Path outPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING); 20 FileOutputFormat.setOutputPath(job, outPath); 21 22 HadoopUtil.delete(conf, outPath); 23 24 job.setInputFormatClass(TextInputFormat.class); 25 job.setMapperClass(ParallelCountingMapper.class); 26 job.setCombinerClass(ParallelCountingReducer.class); 27 job.setReducerClass(ParallelCountingReducer.class); 28 job.setOutputFormatClass(SequenceFileOutputFormat.class); 29 30 boolean succeeded = job.waitForCompletion(true); 31 if (!succeeded) { 32 throw new IllegalStateException("Job failed!"); 33 } 34 35 }
ParallelCountingMapper.java
ParallelCountingMapper中map方法的輸入分別是字節偏移量offset和事務數據庫中的某一行數據input。全部input數據中屢次出現的項都被視爲出現一次,因此將input數據split以後存儲到HashSet中。map方法的輸出是<key=item, value=one>。
1 public class ParallelCountingMapper extends Mapper<LongWritable,Text,Text,LongWritable> { 2 3 private static final LongWritable ONE = new LongWritable(1); 4 5 private Pattern splitter; 6 7 @Override 8 protected void map(LongWritable offset, Text input, Context context) throws IOException, InterruptedException { 9 10 String[] items = splitter.split(input.toString()); 11 Set<String> uniqueItems = Sets.newHashSet(Arrays.asList(items)); 12 for (String item : uniqueItems) { 13 if (item.trim().isEmpty()) { 14 continue; 15 } 16 context.setStatus("Parallel Counting Mapper: " + item); 17 context.write(new Text(item), ONE); 18 } 19 } 20 21 @Override 22 protected void setup(Context context) throws IOException, InterruptedException { 23 super.setup(context); 24 Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, "")); 25 splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN, PFPGrowth.SPLITTER.toString())); 26 } 27 }
ParallelCountingReducer.java
ParallelCountingReducer中reduce方法的輸入是<key=item, value={one, one, ... , one}>。全部key=item的鍵值對將被分配到一臺機器上,因此只須要對values進行遍歷求和就能夠求出該item的支持度。
1 public class ParallelCountingReducer extends Reducer<Text,LongWritable,Text,LongWritable> { 2 3 @Override 4 protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, 5 InterruptedException { 6 long sum = 0; 7 for (LongWritable value : values) { 8 context.setStatus("Parallel Counting Reducer :" + key); 9 sum += value.get(); 10 } 11 context.setStatus("Parallel Counting Reducer: " + key + " => " + sum); 12 context.write(key, new LongWritable(sum)); 13 14 } 15 }
PFPGrowth.java
經過params中的OUTPUT參數能夠獲取ParallelCountingReducer的輸出路徑。在readFList這個方法中用到了幾個數據結構。Pair實現了Comparable接口和Serializable接口,其數據成員first和second分別用來表示item和item所對應的支持度。PriorityQueue是一個用平衡二叉樹實現的小頂堆,若是指定了Comparator,將按照Comparator對PriorityQueue中的元素進行排序,若是未指定Comparator,則將按照元素實現的Comparable接口進行排序。在並行化FP-Growth算法中,初始化PriorityQueue時指定了Comparator,其按照Pair的第一個元素進行排序,若是第一個元素相等,則按照第二個元素進行排序。經過初始化SequenceFileDirIterable來遍歷上一次MapReduce輸出的結果,每次將Pair添加到PriorityQueue的同時完成排序。最後,逐一將PriorityQueue中的元素取出放入fList。所以,fList是一個按照支持度遞減的列表。
1 /** 2 * read the feature frequency List which is built at the end of the Parallel counting job 3 * 4 * @return Feature Frequency List 5 */ 6 public static List<Pair<String,Long>> readFList(Parameters params) { 7 int minSupport = Integer.valueOf(params.get(MIN_SUPPORT, "3")); 8 Configuration conf = new Configuration(); 9 10 Path parallelCountingPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING); 11 12 PriorityQueue<Pair<String,Long>> queue = new PriorityQueue<Pair<String,Long>>(11, 13 new Comparator<Pair<String,Long>>() { 14 @Override 15 public int compare(Pair<String,Long> o1, Pair<String,Long> o2) { 16 int ret = o2.getSecond().compareTo(o1.getSecond()); 17 if (ret != 0) { 18 return ret; 19 } 20 return o1.getFirst().compareTo(o2.getFirst()); 21 } 22 }); 23 24 for (Pair<Text,LongWritable> record 25 : new SequenceFileDirIterable<Text,LongWritable>(new Path(parallelCountingPath, FILE_PATTERN), 26 PathType.GLOB, null, null, true, conf)) { 27 long value = record.getSecond().get(); 28 if (value >= minSupport) { 29 queue.add(new Pair<String,Long>(record.getFirst().toString(), value)); 30 } 31 } 32 List<Pair<String,Long>> fList = Lists.newArrayList(); 33 while (!queue.isEmpty()) { 34 fList.add(queue.poll()); 35 } 36 return fList; 37 }
因爲已經生成了fList,上一次MapReduce的輸出結果已經沒有用了,所以,saveFList方法首先刪除了這些文件。以後,saveFList方法將flist寫入到hdfs上。對於存儲在hdfs上的文件,DistributedCache提供了緩存文件的功能,在Slave Node進行計算以前可將hdfs上的文件複製到這些節點上。
1 /** 2 * Serializes the fList and returns the string representation of the List 3 */ 4 public static void saveFList(Iterable<Pair<String,Long>> flist, Parameters params, Configuration conf) 5 throws IOException { 6 Path flistPath = new Path(params.get(OUTPUT), F_LIST); 7 FileSystem fs = FileSystem.get(flistPath.toUri(), conf); 8 flistPath = fs.makeQualified(flistPath); 9 HadoopUtil.delete(conf, flistPath); 10 SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, flistPath, Text.class, LongWritable.class); 11 try { 12 for (Pair<String,Long> pair : flist) { 13 writer.append(new Text(pair.getFirst()), new LongWritable(pair.getSecond())); 14 } 15 } finally { 16 writer.close(); 17 } 18 DistributedCache.addCacheFile(flistPath.toUri(), conf); 19 }
startParallelFPGrowth方法初始化了一個Job對象。該Job對象將調用ParallelFPGrowthMapper和ParallelFPGrowthReducer來實現Step3和Step4。
1 /** 2 * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards 3 */ 4 public static void startParallelFPGrowth(Parameters params, Configuration conf) 5 throws IOException, InterruptedException, ClassNotFoundException { 6 conf.set(PFP_PARAMETERS, params.toString()); 7 conf.set("mapred.compress.map.output", "true"); 8 conf.set("mapred.output.compression.type", "BLOCK"); 9 Path input = new Path(params.get(INPUT)); 10 Job job = new Job(conf, "PFP Growth Driver running over input" + input); 11 job.setJarByClass(PFPGrowth.class); 12 13 job.setMapOutputKeyClass(IntWritable.class); 14 job.setMapOutputValueClass(TransactionTree.class); 15 16 job.setOutputKeyClass(Text.class); 17 job.setOutputValueClass(TopKStringPatterns.class); 18 19 FileInputFormat.addInputPath(job, input); 20 Path outPath = new Path(params.get(OUTPUT), FPGROWTH); 21 FileOutputFormat.setOutputPath(job, outPath); 22 23 HadoopUtil.delete(conf, outPath); 24 25 job.setInputFormatClass(TextInputFormat.class); 26 job.setMapperClass(ParallelFPGrowthMapper.class); 27 job.setCombinerClass(ParallelFPGrowthCombiner.class); 28 job.setReducerClass(ParallelFPGrowthReducer.class); 29 job.setOutputFormatClass(SequenceFileOutputFormat.class); 30 31 boolean succeeded = job.waitForCompletion(true); 32 if (!succeeded) { 33 throw new IllegalStateException("Job failed!"); 34 } 35 }
ParallelFPGrowthMapper.java
ParallelFPGrowthMapper中的setup方法將在map方法以前被運行。setup方法中調用了readFList方法。注意這裏的readFList方法與以前分析的readFList方法參數不同,因此是兩個徹底不一樣的方法。這裏的readFList方法經過HadoopUtil.getCachedFiles(conf)來獲取緩存文件flist,將其存儲到fMap,其中item做爲fMap的鍵,item在flist中的位置序號做爲fMap的值,例如flist中的第一個item,其在fMap中將是<key=item, value=0>。這樣作的緣由是以後將fMap分Q個group時須要用到這個位置序號。在map方法中,輸入是字節偏移量和事務數據庫中的某一行數據。根據用戶指定的分隔符splitter來切分數據。爲了過濾非頻繁項,經過fMap.containsKey(item)方法來查找該項是否存在於fList中。若存在,將其所對應的位置序號加入到itemSet,不然,將其丟棄。itemArr複製itemSet中的數據,並按照位置序號遞增進行排序,即按照支持度遞減進行排序。以後的for循環從itemArr的最後一個元素向前遍歷,若是其所對應的groupID不在groups中,那麼將初始化TransactionTree,將itemArr[0],itemArr[1],…,itemArr[j]存入該TransactionTree中。groupID的計算很是簡單,將位置序號除以maxPerGroup便可。TransactionTree實現了Writable和Iterable<Pair<IntArrayList, Long>>接口,初始化TransactionTree時,構造方法將參數賦值給TransactionTree中的數據成員List<Pair<IntArrayList, Long>> transactionSet。這裏Pair對象存儲的兩個元素分別是位置序號列表和1。
1 /** 2 * maps each transaction to all unique items groups in the transaction. mapper 3 * outputs the group id as key and the transaction as value 4 * 5 */ 6 public class ParallelFPGrowthMapper extends Mapper<LongWritable,Text,IntWritable,TransactionTree> { 7 8 private final OpenObjectIntHashMap<String> fMap = new OpenObjectIntHashMap<String>(); 9 private Pattern splitter; 10 private int maxPerGroup; 11 private final IntWritable wGroupID = new IntWritable(); 12 13 @Override 14 protected void map(LongWritable offset, Text input, Context context) 15 throws IOException, InterruptedException { 16 17 String[] items = splitter.split(input.toString()); 18 19 OpenIntHashSet itemSet = new OpenIntHashSet(); 20 21 for (String item : items) { 22 if (fMap.containsKey(item) && !item.trim().isEmpty()) { 23 itemSet.add(fMap.get(item)); 24 } 25 } 26 27 IntArrayList itemArr = new IntArrayList(itemSet.size()); 28 itemSet.keys(itemArr); 29 itemArr.sort(); 30 31 OpenIntHashSet groups = new OpenIntHashSet(); 32 for (int j = itemArr.size() - 1; j >= 0; j--) { 33 // generate group dependent shards 34 int item = itemArr.get(j); 35 int groupID = PFPGrowth.getGroup(item, maxPerGroup); 36 37 if (!groups.contains(groupID)) { 38 IntArrayList tempItems = new IntArrayList(j + 1); 39 tempItems.addAllOfFromTo(itemArr, 0, j); 40 context.setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: " + item); 41 wGroupID.set(groupID); 42 context.write(wGroupID, new TransactionTree(tempItems, 1L)); 43 } 44 groups.add(groupID); 45 } 46 47 } 48 49 @Override 50 protected void setup(Context context) throws IOException, InterruptedException { 51 super.setup(context); 52 53 int i = 0; 54 for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) { 55 fMap.put(e.getFirst(), i++); 56 } 57 58 Parameters params = 59 new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, "")); 60 61 splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN, 62 PFPGrowth.SPLITTER.toString())); 63 64 maxPerGroup = params.getInt(PFPGrowth.MAX_PER_GROUP, 0); 65 } 66 }
ParallelFPGrowthReducer.java
ParallelFPGrowthReducer的輸入是<key=groupID, value={TransactionTree1, TransactionTree2, … , TransactionTreeN}>。setup方法獲取了參數params,而且經過PFPGrowth.readFList(conf)方法獲取了緩存文件flist,將頻繁項存入featureReverseMap,將頻繁項對應的支持度存入freqList。以前分析到ParallelFPGrowthMapper輸出的TransactionTree實際上是List<Pair<IntArrayList, Long>> transactionSet。在ParallelFPGrowthReducer內初始化了一個TransactionTree,雖然這個TransactionTree與以前的Transaction是同一個類,可是是一棵用二維數組實現的樹。考慮到文章篇幅,建樹的過程這裏不做分析。假設已經建好了這棵樹,cTree.generateFList方法遍歷這棵樹,返回Map<Integer, MutableLong> frequencyList。具體的遍歷方法這裏不做詳細分析,提一下其調用過程:TransactionTree實現Iterator<Pair<IntArrayList, Long>>接口時重寫了iterator方法,在generateFList方法中經過iterator方法生成一個迭代器來遍歷整棵樹。iterator方法返回的是TransactionTreeIterator對象。TransactionTreeIterator對象繼承自AbstractIterator<Pair<IntArrayList, Long>>,實現了對TransactionTree進行遍歷。localFList合併了generateFList的結果並按照支持度遞減進行排序。生成頻繁模式的方法有兩種,用戶能夠本身選擇來調用FPGrowthIds.generateTopKFrequentPatterns方法或者fpGrowth.generateTopKFrequentPatterns方法來生成頻繁模式,本文將對後者進行分析。在ParallelFPGrowthReducer中還有一個IteratorAdapter類。它是設計模式中十分經典的適配器模式的具體應用,能夠將兩個不一樣類型的迭代器解耦。ParallelFPGrowthReducer的輸出是<key=item, value={Top K Frequent Patterns}>。
1 /** 2 * takes each group of transactions and runs Vanilla FPGrowth on it and 3 * outputs the the Top K frequent Patterns for each group. 4 * 5 */ 6 public final class ParallelFPGrowthReducer extends Reducer<IntWritable,TransactionTree,Text,TopKStringPatterns> { 7 8 private final List<String> featureReverseMap = Lists.newArrayList(); 9 private final LongArrayList freqList = new LongArrayList(); 10 private int maxHeapSize = 50; 11 private int minSupport = 3; 12 private int numFeatures; 13 private int maxPerGroup; 14 private boolean useFP2; 15 16 private static final class IteratorAdapter implements Iterator<Pair<List<Integer>,Long>> { 17 private final Iterator<Pair<IntArrayList,Long>> innerIter; 18 19 private IteratorAdapter(Iterator<Pair<IntArrayList,Long>> transactionIter) { 20 innerIter = transactionIter; 21 } 22 23 @Override 24 public boolean hasNext() { 25 return innerIter.hasNext(); 26 } 27 28 @Override 29 public Pair<List<Integer>,Long> next() { 30 Pair<IntArrayList,Long> innerNext = innerIter.next(); 31 return new Pair<List<Integer>,Long>(innerNext.getFirst().toList(), innerNext.getSecond()); 32 } 33 34 @Override 35 public void remove() { 36 throw new UnsupportedOperationException(); 37 } 38 } 39 40 @Override 41 protected void reduce(IntWritable key, Iterable<TransactionTree> values, Context context) throws IOException { 42 TransactionTree cTree = new TransactionTree(); 43 for (TransactionTree tr : values) { 44 for (Pair<IntArrayList,Long> p : tr) { 45 cTree.addPattern(p.getFirst(), p.getSecond()); 46 } 47 } 48 49 List<Pair<Integer,Long>> localFList = Lists.newArrayList(); 50 for (Entry<Integer,MutableLong> fItem : cTree.generateFList().entrySet()) { 51 localFList.add(new Pair<Integer,Long>(fItem.getKey(), fItem.getValue().toLong())); 52 } 53 54 Collections.sort(localFList, new CountDescendingPairComparator<Integer,Long>()); 55 56 if (useFP2) { 57 FPGrowthIds.generateTopKFrequentPatterns( 58 cTree.iterator(), 59 freqList, 60 minSupport, 61 maxHeapSize, 62 PFPGrowth.getGroupMembers(key.get(), maxPerGroup, numFeatures), 63 new IntegerStringOutputConverter( 64 new ContextWriteOutputCollector<IntWritable, TransactionTree, Text, TopKStringPatterns>(context), 65 featureReverseMap), 66 new ContextStatusUpdater<IntWritable, TransactionTree, Text, TopKStringPatterns>(context)); 67 } else { 68 FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>(); 69 fpGrowth.generateTopKFrequentPatterns( 70 new IteratorAdapter(cTree.iterator()), 71 localFList, 72 minSupport, 73 maxHeapSize, 74 Sets.newHashSet(PFPGrowth.getGroupMembers(key.get(), 75 maxPerGroup, 76 numFeatures).toList()), 77 new IntegerStringOutputConverter( 78 new ContextWriteOutputCollector<IntWritable,TransactionTree,Text,TopKStringPatterns>(context), 79 featureReverseMap), 80 new ContextStatusUpdater<IntWritable,TransactionTree,Text,TopKStringPatterns>(context)); 81 } 82 } 83 84 @Override 85 protected void setup(Context context) throws IOException, InterruptedException { 86 87 super.setup(context); 88 Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, "")); 89 90 for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) { 91 featureReverseMap.add(e.getFirst()); 92 freqList.add(e.getSecond()); 93 } 94 95 maxHeapSize = Integer.valueOf(params.get(PFPGrowth.MAX_HEAPSIZE, "50")); 96 minSupport = Integer.valueOf(params.get(PFPGrowth.MIN_SUPPORT, "3")); 97 98 maxPerGroup = params.getInt(PFPGrowth.MAX_PER_GROUP, 0); 99 numFeatures = featureReverseMap.size(); 100 useFP2 = "true".equals(params.get(PFPGrowth.USE_FPG2)); 101 }
TransactionTree.java
在分析fpGrowth.generateTopKFrequentPatterns方法以前,先來分析一下建樹過程當中使用的addPattern方法。下面的代碼列出了TransactionTree的數據成員和addPattern方法。在addPattern方法中,首先從根節點開始與myList中的節點進行比較。childWithAttribute返回temp節點下的孩子節點中是否有和attributeValue名稱相同的節點。若是沒有,addCountMode置爲false,將myList中剩餘的節點添加到這棵樹中;若是有,則經過addCount方法增長child節點的支持度。這一建樹的思路與傳統的FP-Growth中建樹的思路徹底一致。
1 private int[] attribute; //節點的名稱屬性 2 private int[] childCount; //對該節點的有多少個孩子節點進行計數 3 private int[][] nodeChildren; //二維數組,記錄每個節點的孩子節點 4 private long[] nodeCount; //當前節點的支持度計數 5 private int nodes; 6 private boolean representedAsList; //true表示以List形式展示,false表示以樹的形式展示 7 private List<Pair<IntArrayList,Long>> transactionSet; 8 9 public int addPattern(IntArrayList myList, long addCount) { 10 int temp = ROOTNODEID; 11 int ret = 0; 12 boolean addCountMode = true; 13 for (int idx = 0; idx < myList.size(); idx++) { 14 int attributeValue = myList.get(idx); 15 int child; 16 if (addCountMode) { 17 child = childWithAttribute(temp, attributeValue); 18 if (child == -1) { 19 addCountMode = false; 20 } else { 21 addCount(child, addCount); 22 temp = child; 23 } 24 } 25 if (!addCountMode) { 26 child = createNode(temp, attributeValue, addCount); 27 temp = child; 28 ret++; 29 } 30 } 31 return ret; 32 }
FPGrowth.java
generateTopKFrequentPatterns方法的形參有transactionStream,frequencyList,minSupport,k,Collection<A> returnableFeatures,OutputCollector<A, List<Pair<List<A>, Long>>> output,Statusupdater updater。其中,transactionStream是根據當前key=groupID所對應的Pair<List<A>, Long>類型的values創建的cTree,這裏Pair的第一項是位置序號,第二項是1;frequencyList是ParallelFPGrowthReducer中的localFList,其第一項是位置序號,第二項是支持度;Collection<A> returnableFeatures是當前key=group-id所包含的位置序號集合。
attributeIdMapping過濾了transactionStream中的非頻繁項,併爲頻繁項分配新id,將其映射成<key=位置序號, value=id>。reverseMapping倒置了attributeIdMapping的映射關係。attributeFrequentcy記錄了索引爲id的項的支持度。對於returnableFeatures中的位置序號進行遍歷,過濾非頻繁項,returnFeatures記錄了剩餘的頻繁項。以後調用generateTopKFrequentPatterns方法來構建本地的FP-tree和頭表(Header-Table),並遍歷FP-tree來輸出頻繁項。參考資料[1]詳細分析了這一過程,這裏不做進一步分析,須要注意到是在Mahout中FP-tree是以數組的形式存儲。
1 /** 2 * Generate Top K Frequent Patterns for every feature in returnableFeatures 3 * given a stream of transactions and the minimum support 4 * 5 * @param transactionStream 6 * Iterator of transaction 7 * @param frequencyList 8 * list of frequent features and their support value 9 * @param minSupport 10 * minimum support of the transactions 11 * @param k 12 * Number of top frequent patterns to keep 13 * @param returnableFeatures 14 * set of features for which the frequent patterns are mined. If the 15 * set is empty or null, then top K patterns for every frequent item (an item 16 * whose support> minSupport) is generated 17 * @param output 18 * The output collector to which the the generated patterns are 19 * written 20 * @throws IOException 21 */ 22 public final void generateTopKFrequentPatterns(Iterator<Pair<List<A>,Long>> transactionStream, 23 Collection<Pair<A, Long>> frequencyList, 24 long minSupport, 25 int k, 26 Collection<A> returnableFeatures, 27 OutputCollector<A,List<Pair<List<A>,Long>>> output, 28 StatusUpdater updater) throws IOException { 29 30 Map<Integer,A> reverseMapping = Maps.newHashMap(); 31 Map<A,Integer> attributeIdMapping = Maps.newHashMap(); 32 33 int id = 0; 34 for (Pair<A,Long> feature : frequencyList) { 35 A attrib = feature.getFirst(); 36 Long frequency = feature.getSecond(); 37 if (frequency >= minSupport) { 38 attributeIdMapping.put(attrib, id); 39 reverseMapping.put(id++, attrib); 40 } 41 } 42 43 long[] attributeFrequency = new long[attributeIdMapping.size()]; 44 for (Pair<A,Long> feature : frequencyList) { 45 A attrib = feature.getFirst(); 46 Long frequency = feature.getSecond(); 47 if (frequency < minSupport) { 48 break; 49 } 50 attributeFrequency[attributeIdMapping.get(attrib)] = frequency; 51 } 52 53 log.info("Number of unique items {}", frequencyList.size()); 54 55 Collection<Integer> returnFeatures = Sets.newHashSet(); 56 if (returnableFeatures != null && !returnableFeatures.isEmpty()) { 57 for (A attrib : returnableFeatures) { 58 if (attributeIdMapping.containsKey(attrib)) { 59 returnFeatures.add(attributeIdMapping.get(attrib)); 60 log.info("Adding Pattern {}=>{}", attrib, attributeIdMapping 61 .get(attrib)); 62 } 63 } 64 } else { 65 for (int j = 0; j < attributeIdMapping.size(); j++) { 66 returnFeatures.add(j); 67 } 68 } 69 70 log.info("Number of unique pruned items {}", attributeIdMapping.size()); 71 generateTopKFrequentPatterns(new TransactionIterator<A>(transactionStream, 72 attributeIdMapping), attributeFrequency, minSupport, k, reverseMapping 73 .size(), returnFeatures, new TopKPatternsOutputConverter<A>(output, 74 reverseMapping), updater); 75 76 }
AggregatorMapper的輸入是<key, value=TopKStringPatterns>,TopKStringPatterns是一個存儲<Pair<List<String>,Long>>類型的列表,List<String>類型元素記錄了每個key=item對應的頻繁模式,Long類型元素記錄了支持度。
1 /** 2 * 3 * outputs the pattern for each item in the pattern, so that reducer can group them 4 * and select the top K frequent patterns 5 * 6 */ 7 public class AggregatorMapper extends Mapper<Text,TopKStringPatterns,Text,TopKStringPatterns> { 8 9 @Override 10 protected void map(Text key, TopKStringPatterns values, Context context) throws IOException, 11 InterruptedException { 12 for (Pair<List<String>,Long> pattern : values.getPatterns()) { 13 for (String item : pattern.getFirst()) { 14 List<Pair<List<String>,Long>> patternSingularList = Lists.newArrayList(); 15 patternSingularList.add(pattern); 16 context.setStatus("Aggregator Mapper:Grouping Patterns for " + item); 17 context.write(new Text(item), new TopKStringPatterns(patternSingularList)); 18 } 19 } 20 21 } 22 }
AggregatorReducer彙總了全部Key相同的item,而後按照支持度遞減排序,最終輸出Top K個頻繁模式。
1 /** 2 * 3 * groups all Frequent Patterns containing an item and outputs the top K patterns 4 * containing that particular item 5 * 6 */ 7 public class AggregatorReducer extends Reducer<Text,TopKStringPatterns,Text,TopKStringPatterns> { 8 9 private int maxHeapSize = 50; 10 11 @Override 12 protected void reduce(Text key, Iterable<TopKStringPatterns> values, Context context) throws IOException, 13 InterruptedException { 14 TopKStringPatterns patterns = new TopKStringPatterns(); 15 for (TopKStringPatterns value : values) { 16 context.setStatus("Aggregator Reducer: Selecting TopK patterns for: " + key); 17 patterns = patterns.merge(value, maxHeapSize); 18 } 19 context.write(key, patterns); 20 21 } 22 23 @Override 24 protected void setup(Context context) throws IOException, InterruptedException { 25 super.setup(context); 26 Parameters params = new Parameters(context.getConfiguration().get("pfp.parameters", "")); 27 maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50")); 28 29 } 30 }
3. 討論
並行化FP-Growth算法解決了大數據量時傳統FP-Growth的性能瓶頸。除了並行化FP-Growth算法外,還有許多方法能夠優化FP-Growth算法,好比並行化FP-Growth算法時考慮負載均衡,採用極大頻繁項集和閉頻繁項集表示頻繁模式。
極大頻繁項集是這樣的頻繁項集,它的直接超集都不是頻繁的。極大頻繁項集造成了能夠導出全部頻繁項集的最小項集集合,可是極大頻繁項集卻不包含它們子集的支持度信息。
若是項集的直接超集都不具備和它相同的支持度而且該項集的支持度大於或等於最小支持度閾值,則該項集是閉頻繁項集。閉頻繁項集提供了頻繁項集的一種最小表示,該表示不丟失支持度信息。
4. 參考資料
[1] 關聯分析:FP-Growth算法. Mark Lin. datahunter. 2014. [Link]
[2] PFP: Parallel FP-Growth for Query Recommendation. Haoyuan Li etc. RecSys '08 Proceedings of the 2008 ACM conference on Recommender systems. 2008. [PDF]