1. Parallelizing FP-Growth
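There are many ways to parallelize FP-Growth. The first to do it with MapReduce was probably Haoyuan Li et al. of Google Beijing Research (see reference [2]). They proposed parallelizing FP-Growth with three MapReduce jobs, and the whole process can be roughly divided into five steps: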
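Step 1: Sharding
The transaction database is divided into consecutive parts, called shards, which are stored on HDFS so that each mapper can read some of them.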
Step 2: Parallel Counting
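As in WordCount, a single MapReduce job computes the support of every item. Specifically, each mapper reads several shards of the transaction database from HDFS, so the mapper input is <key, value=Ti>, where Ti is one transaction in a shard. For each item aj in Ti, the mapper outputs <key=aj, value=1>. Once all mappers in the cluster have finished, all pairs with key=aj are sent to the same reducer, so the reducer input is <key=aj, value={1, 1, ..., 1}>. The reducer only has to sum the values and output <key=aj, value=sum{1, 1, ..., 1}>. Sorting these results by descending support yields a list called the F-List.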
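To make the map, shuffle and reduce phases of Step 2 concrete, the data flow can be simulated in a single JVM. The sketch below is hypothetical (ParallelCountingSketch and its methods are not part of Mahout): map emits (item, 1) once per distinct item of a transaction, the shuffle groups the pairs by item, and reduce sums them.

```java
import java.util.*;

// Hypothetical single-JVM simulation of Step 2 (Parallel Counting); not Mahout code.
final class ParallelCountingSketch {

    // map: emit (item, 1) once for each distinct, non-empty item of one transaction
    static List<Map.Entry<String, Long>> map(String transaction, String splitRegex) {
        Set<String> unique = new LinkedHashSet<>();
        for (String item : transaction.split(splitRegex)) {
            String trimmed = item.trim();
            if (!trimmed.isEmpty()) {
                unique.add(trimmed);
            }
        }
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String item : unique) {
            out.add(new AbstractMap.SimpleEntry<>(item, 1L));
        }
        return out;
    }

    // reduce: sum all values received for one key
    static long reduce(Iterable<Long> values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return sum;
    }

    // map every transaction, shuffle (group the pairs by key), then reduce each group
    static Map<String, Long> run(List<String> transactions, String splitRegex) {
        Map<String, List<Long>> shuffled = new TreeMap<>();
        for (String t : transactions) {
            for (Map.Entry<String, Long> kv : map(t, splitRegex)) {
                shuffled.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        Map<String, Long> support = new TreeMap<>();
        for (Map.Entry<String, List<Long>> e : shuffled.entrySet()) {
            support.put(e.getKey(), reduce(e.getValue()));
        }
        return support;
    }
}
```

For the transactions "a,b,c", "a,c" and "a,a,d" split on ",", run returns a=3, b=1, c=2, d=1. The repeated a in the third transaction is counted once.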
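Step 3: Grouping Items
All items in the F-List are divided into Q groups, and each group is given a unique group-id. The result is called the G-List, which maps each item to its group-id.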
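Items can be grouped in several ways. A simple choice, and the one the Mahout code analyzed below effectively uses, is to cut the F-List into Q contiguous blocks of at most ceil(|F-List| / Q) items, so that an item's group-id is its F-List position divided by that block size. The following is a minimal sketch under that assumption. GroupingSketch and its methods are hypothetical names.

```java
import java.util.*;

// Hypothetical sketch of Step 3: contiguous grouping of the F-List into Q groups.
final class GroupingSketch {

    // ceil(fListSize / numGroups): the largest number of items any group may hold
    static int maxPerGroup(int fListSize, int numGroups) {
        int max = fListSize / numGroups;
        if (fListSize % numGroups != 0) {
            max++;
        }
        return max;
    }

    // fList must already be sorted by descending support; returns item -> group-id
    static Map<String, Integer> buildGList(List<String> fList, int numGroups) {
        int perGroup = maxPerGroup(fList.size(), numGroups);
        Map<String, Integer> gList = new LinkedHashMap<>();
        for (int pos = 0; pos < fList.size(); pos++) {
            gList.put(fList.get(pos), pos / perGroup); // group-id = position / maxPerGroup
        }
        return gList;
    }
}
```

With seven items and Q = 3, maxPerGroup is 3, so the three groups hold 3, 3 and 1 items.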
Step 4: Parallel FP-Growth
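This step is the key to parallelizing FP-Growth and also the hardest part of the algorithm to understand. It uses one MapReduce job. Each mapper's input comes from the shards produced in Step 1, so the mapper input is <key, value=Ti>. Before processing its shards, the mapper loads the G-List produced in Step 3. The G-List is essentially a hash map whose keys are items and whose values are the corresponding group-ids; it is usually small enough to fit in memory. The items of Ti, ordered as in the F-List, are then scanned from the last item to the first, that is, from right to left. If the group-id of item aL in the G-List is encountered for the first time, the mapper outputs <key=group-id, value={a0, a1, …, aL}>; otherwise it outputs nothing. Taking the data shown in Figure 1 as an example, with a support threshold of 1 and Q = 3, one obtains a G-List that assigns the F-List items to three groups.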
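The Step 4 mapper logic for a single transaction can be sketched as follows. This sketch rests on several assumptions: the F-List and G-List are passed in as plain Java collections, every F-List item has an entry in the G-List, and the transaction is deduplicated, filtered to frequent items and sorted by F-List position before the right-to-left scan (as the Mahout code does). GroupDependentShardsSketch is a hypothetical name.

```java
import java.util.*;

// Hypothetical sketch of the Step 4 mapper: one transaction -> (group-id, prefix) pairs.
final class GroupDependentShardsSketch {

    static Map<Integer, List<String>> map(List<String> transaction,
                                          List<String> fList,
                                          Map<String, Integer> gList) {
        // F-List position of every frequent item, used for filtering and sorting
        Map<String, Integer> position = new HashMap<>();
        for (int i = 0; i < fList.size(); i++) {
            position.put(fList.get(i), i);
        }

        // keep distinct frequent items only, in F-List order (descending support)
        List<String> items = new ArrayList<>();
        for (String item : new LinkedHashSet<>(transaction)) {
            if (position.containsKey(item)) {
                items.add(item);
            }
        }
        items.sort(Comparator.comparingInt(position::get));

        // right-to-left scan: the first time a group-id is seen, emit the prefix a0..aj
        Map<Integer, List<String>> out = new LinkedHashMap<>();
        for (int j = items.size() - 1; j >= 0; j--) {
            int gid = gList.get(items.get(j));
            if (!out.containsKey(gid)) {
                out.put(gid, new ArrayList<>(items.subList(0, j + 1)));
            }
        }
        return out;
    }
}
```

Suppose the F-List f, c, a, b, m, p is split into three groups of two items and the transaction is f, a, c, d, g, i, m, p. The frequent items in F-List order are f, c, a, m, p, and the sketch emits {f, c, a, m, p} for group 2, {f, c, a} for group 1 and {f, c} for group 0.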
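All data with the same group-id are sent to the same reducer, so the reducer input is <key=group-id, value={{ValueList1}, {ValueList2}, …, {ValueListN}}>. The reducer builds an FP-tree locally and then, as in traditional FP-Growth, recursively builds conditional FP-trees and mines frequent patterns. Unlike traditional FP-Growth, the reducer does not output every pattern it mines directly. Instead it keeps the patterns in a heap of capacity K ordered by support and finally outputs the K patterns with the highest support: <key=item, value={Top K Frequent Patterns containing the item}>.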
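Keeping only the K best patterns is usually done with a bounded heap whose head is the pattern with the lowest support, so a new pattern can enter a full heap only by evicting that head. The sketch below is a hypothetical illustration of this idea using java.util.PriorityQueue; TopKPatterns and its nested Pattern class are invented names, and Mahout's own implementation may differ.

```java
import java.util.*;

// Hypothetical bounded top-K container: a min-heap on support with capacity k.
final class TopKPatterns {

    static final class Pattern {
        final List<String> items;
        final long support;

        Pattern(List<String> items, long support) {
            this.items = items;
            this.support = support;
        }
    }

    private final int k;
    // head of the queue = pattern with the lowest support
    private final PriorityQueue<Pattern> heap =
        new PriorityQueue<>(Comparator.comparingLong((Pattern p) -> p.support));

    TopKPatterns(int k) {
        this.k = k;
    }

    void offer(Pattern p) {
        if (heap.size() < k) {
            heap.add(p);
        } else if (k > 0 && heap.peek().support < p.support) {
            heap.poll(); // evict the pattern with the lowest support
            heap.add(p);
        }
    }

    // the retained patterns, highest support first
    List<Pattern> result() {
        List<Pattern> out = new ArrayList<>(heap);
        out.sort(Comparator.comparingLong((Pattern p) -> p.support).reversed());
        return out;
    }
}
```

Each insertion costs O(log K), and memory stays bounded by K no matter how many patterns the reducer mines.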
Step 5: Aggregating
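The Top K Frequent Patterns mined in the previous step already cover all the frequent patterns. However, because the previous MapReduce job partitioned the data by groupID, the patterns for a given key=item can be spread across the reducers of several groups. To merge all pairs with the same key=item and present the results more cleanly, the mined patterns are post-processed, relying on MapReduce's default sorting and grouping by key: each item of a Top K Frequent Pattern is used in turn as the key, and the pattern containing that key is output. Thus each mapper outputs <key=item, value={frequent patterns on this node that contain the item}>, and the reducer aggregates all mapper outputs and emits the final result <key=item, value={all frequent patterns containing the item}>.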
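Step 5 can also be simulated in memory. In this hypothetical sketch (AggregatingSketch is an invented name), the mined patterns arrive as a map from item list to support, so each distinct pattern appears once. The "map" phase files each pattern under every item it contains, and the "reduce" phase sorts each item's patterns by descending support and keeps at most K of them.

```java
import java.util.*;

// Hypothetical in-memory sketch of Step 5 (Aggregating); not Mahout code.
final class AggregatingSketch {

    static Map<String, List<Map.Entry<List<String>, Long>>> aggregate(
            Map<List<String>, Long> patterns, int k) {
        // map + shuffle: list each pattern under every item it contains
        Map<String, List<Map.Entry<List<String>, Long>>> byItem = new TreeMap<>();
        for (Map.Entry<List<String>, Long> p : patterns.entrySet()) {
            for (String item : p.getKey()) {
                byItem.computeIfAbsent(item, x -> new ArrayList<>()).add(p);
            }
        }
        // reduce: highest support first, keep at most k patterns per item
        for (List<Map.Entry<List<String>, Long>> list : byItem.values()) {
            list.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
            if (list.size() > k) {
                list.subList(k, list.size()).clear();
            }
        }
        return byItem;
    }
}
```

With the patterns {a}:5, {a, b}:3, {b}:4 and {a, c}:2 and K = 2, item a keeps {a}:5 and {a, b}:3, item b keeps {b}:4 and {a, b}:3, and item c keeps {a, c}:2.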
2. Parallel FP-Growth Source Code Analysis
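Mahout provides implementations of several classic machine learning algorithms. Parallel FP-Growth is no longer included from Mahout 0.9 onward, so this article analyzes the Parallel FP-Growth source code in Mahout 0.8. The entry point is FPGrowthDriver, whose run() method parses the command-line options and then runs either the sequential or the MapReduce version.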
```java
public final class FPGrowthDriver extends AbstractJob {

  private static final Logger log = LoggerFactory.getLogger(FPGrowthDriver.class);

  private FPGrowthDriver() {
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner's static run() method uses a GenericOptionsParser, whose getRemainingArgs()
    // returns the command-line arguments passed in. ToolRunner.run() then calls FPGrowthDriver.run().
    ToolRunner.run(new Configuration(), new FPGrowthDriver(), args);
  }

  /**
   * Run TopK FPGrowth given the input file,
   */
  @Override
  public int run(String[] args) throws Exception {
    addInputOption();  // add the default input directory option
    addOutputOption(); // add the default output directory option

    addOption("minSupport", "s", "(Optional) The minimum number of times a co-occurrence must be present."
        + " Default Value: 3", "3"); // support threshold
    addOption("maxHeapSize", "k", "(Optional) Maximum Heap Size k, to denote the requirement to mine top K items."
        + " Default value: 50", "50"); // size K of the top-K heap
    addOption("numGroups", "g", "(Optional) Number of groups the features should be divided in the map-reduce version."
        + " Doesn't work in sequential version Default Value:" + PFPGrowth.NUM_GROUPS_DEFAULT,
        Integer.toString(PFPGrowth.NUM_GROUPS_DEFAULT)); // number of groups g
    addOption("splitterPattern", "regex", "Regular Expression pattern used to split given string transaction into"
        + " itemsets. Default value splits comma separated itemsets. Default Value:"
        + " \"[ ,\\t]*[,|\\t][ ,\\t]*\" ", "[ ,\t]*[,|\t][ ,\t]*"); // item separator
    addOption("numTreeCacheEntries", "tc", "(Optional) Number of entries in the tree cache to prevent duplicate"
        + " tree building. (Warning) a first level conditional FP-Tree might consume a lot of memory, "
        + "so keep this value small, but big enough to prevent duplicate tree building. "
        + "Default Value:5 Recommended Values: [5-10]", "5");
    addOption("method", "method", "Method of processing: sequential|mapreduce", "sequential"); // sequential or parallel execution
    addOption("encoding", "e", "(Optional) The file encoding. Default value: UTF-8", "UTF-8"); // file encoding
    addFlag("useFPG2", "2", "Use an alternate FPG implementation");

    // exit if the command-line arguments cannot be parsed
    if (parseArguments(args) == null) {
      return -1;
    }

    Parameters params = new Parameters();

    if (hasOption("minSupport")) {
      String minSupportString = getOption("minSupport");
      params.set("minSupport", minSupportString);
    }
    if (hasOption("maxHeapSize")) {
      String maxHeapSizeString = getOption("maxHeapSize");
      params.set("maxHeapSize", maxHeapSizeString);
    }
    if (hasOption("numGroups")) {
      String numGroupsString = getOption("numGroups");
      params.set("numGroups", numGroupsString);
    }

    if (hasOption("numTreeCacheEntries")) {
      String numTreeCacheString = getOption("numTreeCacheEntries");
      params.set("treeCacheSize", numTreeCacheString);
    }

    if (hasOption("splitterPattern")) {
      String patternString = getOption("splitterPattern");
      params.set("splitPattern", patternString);
    }

    String encoding = "UTF-8";
    if (hasOption("encoding")) {
      encoding = getOption("encoding");
    }
    params.set("encoding", encoding);

    if (hasOption("useFPG2")) {
      params.set(PFPGrowth.USE_FPG2, "true");
    }

    Path inputDir = getInputPath();
    Path outputDir = getOutputPath();

    params.set("input", inputDir.toString());
    params.set("output", outputDir.toString());

    String classificationMethod = getOption("method");
    if ("sequential".equalsIgnoreCase(classificationMethod)) {
      runFPGrowth(params);
    } else if ("mapreduce".equalsIgnoreCase(classificationMethod)) {
      Configuration conf = new Configuration();
      HadoopUtil.delete(conf, outputDir);
      PFPGrowth.runPFPGrowth(params);
    }

    return 0;
  }

  // remaining members (such as runFPGrowth) are not shown
}
```
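PFPGrowth is the driver class of the parallel FP-Growth algorithm. runPFPGrowth(params) creates a Configuration object and then calls runPFPGrowth(params, conf), which contains the five key steps of parallel FP-Growth. startParallelCounting(params, conf) corresponds to Steps 1 and 2: it counts the support of every item in a WordCount-like way, and its output is used by readFList() and saveFList() to produce the F-List. Next, the number of items per group (maxPerGroup) is computed from the NUM_GROUPS command-line parameter by dividing the F-List size by the number of groups and rounding up, and the result is stored in params. startParallelFPGrowth(params, conf) corresponds to Steps 3 and 4, and startAggregating(params, conf) corresponds to Step 5.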
```java
public static void runPFPGrowth(Parameters params, Configuration conf)
  throws IOException, InterruptedException, ClassNotFoundException {
  conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
      + "org.apache.hadoop.io.serializer.WritableSerialization");

  startParallelCounting(params, conf); // Steps 1 and 2

  // save feature list to dcache
  List<Pair<String,Long>> fList = readFList(params);
  saveFList(fList, params, conf);

  // set param to control group size in MR jobs
  int numGroups = params.getInt(NUM_GROUPS, NUM_GROUPS_DEFAULT);
  int maxPerGroup = fList.size() / numGroups;
  if (fList.size() % numGroups != 0) {
    maxPerGroup++;
  }
  params.set(MAX_PER_GROUP, Integer.toString(maxPerGroup));

  startParallelFPGrowth(params, conf); // Steps 3 and 4

  startAggregating(params, conf); // Step 5
}
```
```java
/**
 * Count the frequencies of various features in parallel using Map/Reduce
 */
public static void startParallelCounting(Parameters params, Configuration conf)
  throws IOException, InterruptedException, ClassNotFoundException {
  conf.set(PFP_PARAMETERS, params.toString());

  conf.set("mapred.compress.map.output", "true");
  conf.set("mapred.output.compression.type", "BLOCK");

  String input = params.get(INPUT);
  Job job = new Job(conf, "Parallel Counting Driver running over input: " + input);
  job.setJarByClass(PFPGrowth.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);

  FileInputFormat.addInputPath(job, new Path(input));
  Path outPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);
  FileOutputFormat.setOutputPath(job, outPath);

  HadoopUtil.delete(conf, outPath);

  job.setInputFormatClass(TextInputFormat.class);
  job.setMapperClass(ParallelCountingMapper.class);
  job.setCombinerClass(ParallelCountingReducer.class);
  job.setReducerClass(ParallelCountingReducer.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);

  boolean succeeded = job.waitForCompletion(true);
  if (!succeeded) {
    throw new IllegalStateException("Job failed!");
  }

}
```
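The inputs of ParallelCountingMapper.map are the byte offset and one line (one transaction) of the transaction database, input. An item that appears several times in the same line is counted only once, so the split line is stored in a HashSet. The output of map is <key=item, value=one>.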
```java
public class ParallelCountingMapper extends Mapper<LongWritable,Text,Text,LongWritable> {

  private static final LongWritable ONE = new LongWritable(1);

  private Pattern splitter;

  @Override
  protected void map(LongWritable offset, Text input, Context context) throws IOException, InterruptedException {

    String[] items = splitter.split(input.toString());
    Set<String> uniqueItems = Sets.newHashSet(Arrays.asList(items));
    for (String item : uniqueItems) {
      if (item.trim().isEmpty()) {
        continue;
      }
      context.setStatus("Parallel Counting Mapper: " + item);
      context.write(new Text(item), ONE);
    }
  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
    splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN, PFPGrowth.SPLITTER.toString()));
  }
}
```
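The input of ParallelCountingReducer.reduce is <key=item, value={one, one, ..., one}>. All pairs with the same key=item are sent to the same machine, so summing the values gives the support of that item. Because startParallelCounting also registers ParallelCountingReducer as the combiner, some of the values may already be partial sums computed on the map side; summing them still gives the correct total.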
```java
public class ParallelCountingReducer extends Reducer<Text,LongWritable,Text,LongWritable> {

  @Override
  protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
      InterruptedException {
    long sum = 0;
    for (LongWritable value : values) {
      context.setStatus("Parallel Counting Reducer :" + key);
      sum += value.get();
    }
    context.setStatus("Parallel Counting Reducer: " + key + " => " + sum);
    context.write(key, new LongWritable(sum));

  }
}
```
```java
/**
 * read the feature frequency List which is built at the end of the Parallel counting job
 *
 * @return Feature Frequency List
 */
public static List<Pair<String,Long>> readFList(Parameters params) {
  int minSupport = Integer.valueOf(params.get(MIN_SUPPORT, "3"));
  Configuration conf = new Configuration();

  Path parallelCountingPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);

  PriorityQueue<Pair<String,Long>> queue = new PriorityQueue<Pair<String,Long>>(11,
      new Comparator<Pair<String,Long>>() {
        @Override
        public int compare(Pair<String,Long> o1, Pair<String,Long> o2) {
          int ret = o2.getSecond().compareTo(o1.getSecond());
          if (ret != 0) {
            return ret;
          }
          return o1.getFirst().compareTo(o2.getFirst());
        }
      });

  for (Pair<Text,LongWritable> record
       : new SequenceFileDirIterable<Text,LongWritable>(new Path(parallelCountingPath, FILE_PATTERN),
                                                        PathType.GLOB, null, null, true, conf)) {
    long value = record.getSecond().get();
    if (value >= minSupport) {
      queue.add(new Pair<String,Long>(record.getFirst().toString(), value));
    }
  }
  List<Pair<String,Long>> fList = Lists.newArrayList();
  while (!queue.isEmpty()) {
    fList.add(queue.poll());
  }
  return fList;
}
```
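The filtering and ordering performed by readFList can be reproduced with standard Java collections. The sketch below (FListSketch is a hypothetical name, and Map.Entry stands in for Mahout's Pair) uses the same ordering as readFList: descending support, with ties broken by item name. It drains the PriorityQueue with poll() because a PriorityQueue's iterator does not return elements in priority order.

```java
import java.util.*;

// Hypothetical re-implementation of readFList's filter-and-sort logic; not Mahout code.
final class FListSketch {

    static List<Map.Entry<String, Long>> build(Map<String, Long> support, long minSupport) {
        // higher support first; equal supports ordered by item name
        Comparator<Map.Entry<String, Long>> order = (o1, o2) -> {
            int ret = o2.getValue().compareTo(o1.getValue());
            return ret != 0 ? ret : o1.getKey().compareTo(o2.getKey());
        };
        PriorityQueue<Map.Entry<String, Long>> queue = new PriorityQueue<>(11, order);
        for (Map.Entry<String, Long> e : support.entrySet()) {
            if (e.getValue() >= minSupport) { // drop infrequent items
                queue.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
            }
        }
        // poll() returns elements in priority order; iterating the queue would not
        List<Map.Entry<String, Long>> fList = new ArrayList<>();
        while (!queue.isEmpty()) {
            fList.add(queue.poll());
        }
        return fList;
    }
}
```

For supports a=3, b=1, c=3, d=2, e=5 and minSupport 2, the resulting F-List order is e, a, c, d.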
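saveFList first deletes any existing file at the F-List output path (HadoopUtil.delete(conf, flistPath)) and then writes the fList to HDFS as a SequenceFile. For files stored on HDFS, DistributedCache provides file caching: before the slave nodes start computing, the cached files are copied from HDFS to those nodes. saveFList registers the F-List with DistributedCache.addCacheFile, so that the mappers and reducers of the later jobs can read it locally.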
```java
/**
 * Serializes the fList and returns the string representation of the List
 */
public static void saveFList(Iterable<Pair<String,Long>> flist, Parameters params, Configuration conf)
  throws IOException {
  Path flistPath = new Path(params.get(OUTPUT), F_LIST);
  FileSystem fs = FileSystem.get(flistPath.toUri(), conf);
  flistPath = fs.makeQualified(flistPath);
  HadoopUtil.delete(conf, flistPath);
  SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, flistPath, Text.class, LongWritable.class);
  try {
    for (Pair<String,Long> pair : flist) {
      writer.append(new Text(pair.getFirst()), new LongWritable(pair.getSecond()));
    }
  } finally {
    writer.close();
  }
  DistributedCache.addCacheFile(flistPath.toUri(), conf);
}
```
```java
/**
 * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards
 */
public static void startParallelFPGrowth(Parameters params, Configuration conf)
  throws IOException, InterruptedException, ClassNotFoundException {
  conf.set(PFP_PARAMETERS, params.toString());
  conf.set("mapred.compress.map.output", "true");
  conf.set("mapred.output.compression.type", "BLOCK");
  Path input = new Path(params.get(INPUT));
  Job job = new Job(conf, "PFP Growth Driver running over input" + input);
  job.setJarByClass(PFPGrowth.class);

  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(TransactionTree.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(TopKStringPatterns.class);

  FileInputFormat.addInputPath(job, input);
  Path outPath = new Path(params.get(OUTPUT), FPGROWTH);
  FileOutputFormat.setOutputPath(job, outPath);

  HadoopUtil.delete(conf, outPath);

  job.setInputFormatClass(TextInputFormat.class);
  job.setMapperClass(ParallelFPGrowthMapper.class);
  job.setCombinerClass(ParallelFPGrowthCombiner.class);
  job.setReducerClass(ParallelFPGrowthReducer.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);

  boolean succeeded = job.waitForCompletion(true);
  if (!succeeded) {
    throw new IllegalStateException("Job failed!");
  }
}
```
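ParallelFPGrowthMapper's setup method runs before map, and setup calls readFList. Note that this readFList takes a different parameter (a Configuration) from the one analyzed earlier, so it is an entirely different overloaded method. It obtains the cached flist file through HadoopUtil.getCachedFiles(conf) and stores it in fMap, with each item as the key and the item's position in flist as the value. For example, the first item in flist becomes <key=item, value=0> in fMap. The position is needed later to divide the items into Q groups.

In map, the input is the byte offset and one line of the transaction database, which is split with the user-specified splitter. To filter out infrequent items, fMap.containsKey(item) checks whether each item is in the fList; if it is, its position is added to itemSet, otherwise the item is discarded. itemArr copies the positions in itemSet and sorts them in ascending order of position, which is the same as descending order of support. The for loop then walks itemArr backward from its last element. If the element's groupID is not yet in groups, a TransactionTree holding itemArr[0], itemArr[1], …, itemArr[j] is created and written out with the groupID as its key. Computing the groupID is simple: the position divided by maxPerGroup. TransactionTree implements the Writable and Iterable<Pair<IntArrayList, Long>> interfaces. The constructor used here stores its arguments in the data member List<Pair<IntArrayList, Long>> transactionSet, where each Pair holds a list of positions and the count 1.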
```java
/**
 * maps each transaction to all unique items groups in the transaction. mapper
 * outputs the group id as key and the transaction as value
 *
 */
public class ParallelFPGrowthMapper extends Mapper<LongWritable,Text,IntWritable,TransactionTree> {

  private final OpenObjectIntHashMap<String> fMap = new OpenObjectIntHashMap<String>();
  private Pattern splitter;
  private int maxPerGroup;
  private final IntWritable wGroupID = new IntWritable();

  @Override
  protected void map(LongWritable offset, Text input, Context context)
    throws IOException, InterruptedException {

    String[] items = splitter.split(input.toString());

    OpenIntHashSet itemSet = new OpenIntHashSet();

    for (String item : items) {
      if (fMap.containsKey(item) && !item.trim().isEmpty()) {
        itemSet.add(fMap.get(item));
      }
    }

    IntArrayList itemArr = new IntArrayList(itemSet.size());
    itemSet.keys(itemArr);
    itemArr.sort();

    OpenIntHashSet groups = new OpenIntHashSet();
    for (int j = itemArr.size() - 1; j >= 0; j--) {
      // generate group dependent shards
      int item = itemArr.get(j);
      int groupID = PFPGrowth.getGroup(item, maxPerGroup);

      if (!groups.contains(groupID)) {
        IntArrayList tempItems = new IntArrayList(j + 1);
        tempItems.addAllOfFromTo(itemArr, 0, j);
        context.setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: " + item);
        wGroupID.set(groupID);
        context.write(wGroupID, new TransactionTree(tempItems, 1L));
      }
      groups.add(groupID);
    }

  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);

    int i = 0;
    for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) {
      fMap.put(e.getFirst(), i++);
    }

    Parameters params =
      new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));

    splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN,
                                          PFPGrowth.SPLITTER.toString()));

    maxPerGroup = params.getInt(PFPGrowth.MAX_PER_GROUP, 0);
  }
}
```
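The input of ParallelFPGrowthReducer is <key=groupID, value={TransactionTree1, TransactionTree2, …, TransactionTreeN}>. setup reads the parameters params and, through PFPGrowth.readFList(conf), the cached flist file, storing the frequent items in featureReverseMap and their supports in freqList. As analyzed above, each TransactionTree emitted by ParallelFPGrowthMapper is essentially a List<Pair<IntArrayList, Long>> transactionSet. ParallelFPGrowthReducer creates a new TransactionTree, cTree. Although it is the same class as the mapper's TransactionTree, here it is a tree implemented with arrays, including a two-dimensional array. For reasons of space, tree construction is not analyzed here. Once the tree is built, cTree.generateFList traverses it and returns a Map<Integer, MutableLong> frequency list. The traversal is not analyzed in detail either; briefly, the call chain is as follows. TransactionTree implements the Iterable<Pair<IntArrayList, Long>> interface and overrides iterator(), and generateFList uses that iterator to walk the whole tree. iterator() returns a TransactionTreeIterator, which extends AbstractIterator<Pair<IntArrayList, Long>> and implements the traversal of the TransactionTree. localFList collects the result of generateFList and is sorted by descending support.

There are two ways to generate the frequent patterns: depending on the useFPG2 option, the reducer calls either FPGrowthIds.generateTopKFrequentPatterns or fpGrowth.generateTopKFrequentPatterns. This article analyzes the latter. ParallelFPGrowthReducer also contains an IteratorAdapter class, a direct application of the classic Adapter design pattern: it lets an Iterator<Pair<IntArrayList, Long>> be used where an Iterator<Pair<List<Integer>, Long>> is expected, decoupling the two iterator types. The output of ParallelFPGrowthReducer is <key=item, value={Top K Frequent Patterns}>.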
```java
/**
 * takes each group of transactions and runs Vanilla FPGrowth on it and
 * outputs the the Top K frequent Patterns for each group.
 *
 */
public final class ParallelFPGrowthReducer extends Reducer<IntWritable,TransactionTree,Text,TopKStringPatterns> {

  private final List<String> featureReverseMap = Lists.newArrayList();
  private final LongArrayList freqList = new LongArrayList();
  private int maxHeapSize = 50;
  private int minSupport = 3;
  private int numFeatures;
  private int maxPerGroup;
  private boolean useFP2;

  private static final class IteratorAdapter implements Iterator<Pair<List<Integer>,Long>> {
    private final Iterator<Pair<IntArrayList,Long>> innerIter;

    private IteratorAdapter(Iterator<Pair<IntArrayList,Long>> transactionIter) {
      innerIter = transactionIter;
    }

    @Override
    public boolean hasNext() {
      return innerIter.hasNext();
    }

    @Override
    public Pair<List<Integer>,Long> next() {
      Pair<IntArrayList,Long> innerNext = innerIter.next();
      return new Pair<List<Integer>,Long>(innerNext.getFirst().toList(), innerNext.getSecond());
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }
  }

  @Override
  protected void reduce(IntWritable key, Iterable<TransactionTree> values, Context context) throws IOException {
    TransactionTree cTree = new TransactionTree();
    for (TransactionTree tr : values) {
      for (Pair<IntArrayList,Long> p : tr) {
        cTree.addPattern(p.getFirst(), p.getSecond());
      }
    }

    List<Pair<Integer,Long>> localFList = Lists.newArrayList();
    for (Entry<Integer,MutableLong> fItem : cTree.generateFList().entrySet()) {
      localFList.add(new Pair<Integer,Long>(fItem.getKey(), fItem.getValue().toLong()));
    }

    Collections.sort(localFList, new CountDescendingPairComparator<Integer,Long>());

    if (useFP2) {
      FPGrowthIds.generateTopKFrequentPatterns(
          cTree.iterator(),
          freqList,
          minSupport,
          maxHeapSize,
          PFPGrowth.getGroupMembers(key.get(), maxPerGroup, numFeatures),
          new IntegerStringOutputConverter(
              new ContextWriteOutputCollector<IntWritable, TransactionTree, Text, TopKStringPatterns>(context),
              featureReverseMap),
          new ContextStatusUpdater<IntWritable, TransactionTree, Text, TopKStringPatterns>(context));
    } else {
      FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>();
      fpGrowth.generateTopKFrequentPatterns(
          new IteratorAdapter(cTree.iterator()),
          localFList,
          minSupport,
          maxHeapSize,
          Sets.newHashSet(PFPGrowth.getGroupMembers(key.get(),
                                                    maxPerGroup,
                                                    numFeatures).toList()),
          new IntegerStringOutputConverter(
              new ContextWriteOutputCollector<IntWritable,TransactionTree,Text,TopKStringPatterns>(context),
              featureReverseMap),
          new ContextStatusUpdater<IntWritable,TransactionTree,Text,TopKStringPatterns>(context));
    }
  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {

    super.setup(context);
    Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));

    for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) {
      featureReverseMap.add(e.getFirst());
      freqList.add(e.getSecond());
    }

    maxHeapSize = Integer.valueOf(params.get(PFPGrowth.MAX_HEAPSIZE, "50"));
    minSupport = Integer.valueOf(params.get(PFPGrowth.MIN_SUPPORT, "3"));

    maxPerGroup = params.getInt(PFPGrowth.MAX_PER_GROUP, 0);
    numFeatures = featureReverseMap.size();
    useFP2 = "true".equals(params.get(PFPGrowth.USE_FPG2));
  }
}
```
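The Adapter idea behind IteratorAdapter can be shown without Mahout's types. In this sketch an int[] stands in for IntArrayList and Map.Entry stands in for Mahout's Pair; both substitutions are assumptions made to keep the example self-contained, and TransactionIteratorAdapter is a hypothetical name. Each element is converted lazily when next() is called, so the wrapped iterator's data is never copied up front.

```java
import java.util.*;

// Hypothetical adapter: exposes Iterator<Entry<int[], Long>> as Iterator<Entry<List<Integer>, Long>>.
final class TransactionIteratorAdapter implements Iterator<Map.Entry<List<Integer>, Long>> {
    private final Iterator<Map.Entry<int[], Long>> inner;

    TransactionIteratorAdapter(Iterator<Map.Entry<int[], Long>> inner) {
        this.inner = inner;
    }

    @Override
    public boolean hasNext() {
        return inner.hasNext(); // delegate directly to the wrapped iterator
    }

    @Override
    public Map.Entry<List<Integer>, Long> next() {
        Map.Entry<int[], Long> raw = inner.next(); // throws NoSuchElementException when exhausted
        List<Integer> items = new ArrayList<>(raw.getKey().length);
        for (int v : raw.getKey()) {
            items.add(v); // box each primitive id
        }
        return new AbstractMap.SimpleImmutableEntry<>(items, raw.getValue());
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
```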
```java
private int[] attribute;           // item (attribute) stored at each node
private int[] childCount;          // number of children of each node
private int[][] nodeChildren;      // 2-D array: the child node ids of each node
private long[] nodeCount;          // support count of each node
private int nodes;
private boolean representedAsList; // true: stored as a list of transactions; false: stored as a tree
private List<Pair<IntArrayList,Long>> transactionSet;

public int addPattern(IntArrayList myList, long addCount) {
  int temp = ROOTNODEID;
  int ret = 0;
  boolean addCountMode = true;
  for (int idx = 0; idx < myList.size(); idx++) {
    int attributeValue = myList.get(idx);
    int child;
    if (addCountMode) {
      child = childWithAttribute(temp, attributeValue);
      if (child == -1) {
        addCountMode = false;
      } else {
        addCount(child, addCount);
        temp = child;
      }
    }
    if (!addCountMode) {
      child = createNode(temp, attributeValue, addCount);
      temp = child;
      ret++;
    }
  }
  return ret;
}
```
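addPattern follows the existing path while a child with the next attribute exists, adding the count to each node it passes, and then creates new nodes for the rest of the pattern; it returns the number of nodes created. This logic can be tried out on a simplified array-backed tree. The class below is a hypothetical sketch that mirrors the field names in the excerpt. Unlike Mahout's code, it finds children by linear scan and grows its arrays by doubling; both are assumptions of this sketch.

```java
import java.util.*;

// Hypothetical simplified array-backed prefix tree; node 0 is the root.
final class ArrayPrefixTree {
    static final int ROOT = 0;
    private int[] attribute = new int[8];      // item stored at each node
    private int[] childCount = new int[8];     // number of children of each node
    private int[][] nodeChildren = new int[8][]; // child node ids of each node
    private long[] nodeCount = new long[8];    // count of each node
    private int nodes = 1;                     // the root already exists

    ArrayPrefixTree() {
        nodeChildren[ROOT] = new int[2];
        attribute[ROOT] = -1;
    }

    // id of the child of parent holding attr, or -1 if there is none (linear scan)
    int childWithAttribute(int parent, int attr) {
        for (int i = 0; i < childCount[parent]; i++) {
            int child = nodeChildren[parent][i];
            if (attribute[child] == attr) {
                return child;
            }
        }
        return -1;
    }

    private int createNode(int parent, int attr, long count) {
        if (nodes == attribute.length) { // grow every per-node array
            int n = nodes * 2;
            attribute = Arrays.copyOf(attribute, n);
            childCount = Arrays.copyOf(childCount, n);
            nodeChildren = Arrays.copyOf(nodeChildren, n);
            nodeCount = Arrays.copyOf(nodeCount, n);
        }
        int id = nodes++;
        attribute[id] = attr;
        nodeCount[id] = count;
        nodeChildren[id] = new int[2];
        if (childCount[parent] == nodeChildren[parent].length) {
            nodeChildren[parent] = Arrays.copyOf(nodeChildren[parent], childCount[parent] * 2);
        }
        nodeChildren[parent][childCount[parent]++] = id;
        return id;
    }

    // same control flow as TransactionTree.addPattern; returns the number of nodes created
    int addPattern(int[] items, long addCount) {
        int temp = ROOT;
        int created = 0;
        boolean addCountMode = true;
        for (int attr : items) {
            if (addCountMode) {
                int child = childWithAttribute(temp, attr);
                if (child == -1) {
                    addCountMode = false; // the rest of the pattern needs new nodes
                } else {
                    nodeCount[child] += addCount;
                    temp = child;
                }
            }
            if (!addCountMode) {
                temp = createNode(temp, attr, addCount);
                created++;
            }
        }
        return created;
    }

    long count(int node) { return nodeCount[node]; }

    int size() { return nodes; }
}
```

Inserting {0, 1, 2}, then {0, 1, 3}, then {0, 1, 2} with count 2 creates 3, 1 and 0 new nodes respectively, after which the node for item 0 has count 4.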
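The parameters of generateTopKFrequentPatterns are transactionStream, frequencyList, minSupport, k, Collection<A> returnableFeatures, OutputCollector<A, List<Pair<List<A>, Long>>> output and StatusUpdater updater. transactionStream iterates over cTree, which was built from the values for the current key=groupID and yields Pair<List<A>, Long> elements: the first element of each Pair is a list of positions and the second is its count. frequencyList is localFList from ParallelFPGrowthReducer, whose first element is a position and whose second is the support. Collection<A> returnableFeatures is the set of positions belonging to the current key=group-id.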
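attributeIdMapping, built from frequencyList, keeps only the frequent items and assigns each one a new id, giving the mapping <key=position, value=id>; TransactionIterator later uses it to translate transactionStream. reverseMapping inverts the attributeIdMapping mapping. attributeFrequency stores the support of the item with index id. The positions in returnableFeatures are then iterated, infrequent ones are skipped, and the remaining frequent items are recorded in returnFeatures (if returnableFeatures is null or empty, every id is returned). Finally, the overloaded generateTopKFrequentPatterns is called to build the local FP-tree and its header table, and to traverse the FP-tree and output the frequent patterns. Reference [1] analyzes this process in detail, so it is not repeated here; note only that Mahout stores the FP-tree in arrays.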
```java
/**
 * Generate Top K Frequent Patterns for every feature in returnableFeatures
 * given a stream of transactions and the minimum support
 *
 * @param transactionStream
 *          Iterator of transaction
 * @param frequencyList
 *          list of frequent features and their support value
 * @param minSupport
 *          minimum support of the transactions
 * @param k
 *          Number of top frequent patterns to keep
 * @param returnableFeatures
 *          set of features for which the frequent patterns are mined. If the
 *          set is empty or null, then top K patterns for every frequent item (an item
 *          whose support> minSupport) is generated
 * @param output
 *          The output collector to which the the generated patterns are
 *          written
 * @throws IOException
 */
public final void generateTopKFrequentPatterns(Iterator<Pair<List<A>,Long>> transactionStream,
                                               Collection<Pair<A, Long>> frequencyList,
                                               long minSupport,
                                               int k,
                                               Collection<A> returnableFeatures,
                                               OutputCollector<A,List<Pair<List<A>,Long>>> output,
                                               StatusUpdater updater) throws IOException {

  Map<Integer,A> reverseMapping = Maps.newHashMap();
  Map<A,Integer> attributeIdMapping = Maps.newHashMap();

  int id = 0;
  for (Pair<A,Long> feature : frequencyList) {
    A attrib = feature.getFirst();
    Long frequency = feature.getSecond();
    if (frequency >= minSupport) {
      attributeIdMapping.put(attrib, id);
      reverseMapping.put(id++, attrib);
    }
  }

  long[] attributeFrequency = new long[attributeIdMapping.size()];
  for (Pair<A,Long> feature : frequencyList) {
    A attrib = feature.getFirst();
    Long frequency = feature.getSecond();
    if (frequency < minSupport) {
      break;
    }
    attributeFrequency[attributeIdMapping.get(attrib)] = frequency;
  }

  log.info("Number of unique items {}", frequencyList.size());

  Collection<Integer> returnFeatures = Sets.newHashSet();
  if (returnableFeatures != null && !returnableFeatures.isEmpty()) {
    for (A attrib : returnableFeatures) {
      if (attributeIdMapping.containsKey(attrib)) {
        returnFeatures.add(attributeIdMapping.get(attrib));
        log.info("Adding Pattern {}=>{}", attrib, attributeIdMapping
          .get(attrib));
      }
    }
  } else {
    for (int j = 0; j < attributeIdMapping.size(); j++) {
      returnFeatures.add(j);
    }
  }

  log.info("Number of unique pruned items {}", attributeIdMapping.size());
  generateTopKFrequentPatterns(new TransactionIterator<A>(transactionStream,
      attributeIdMapping), attributeFrequency, minSupport, k, reverseMapping
      .size(), returnFeatures, new TopKPatternsOutputConverter<A>(output,
      reverseMapping), updater);

}
```
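The id remapping at the start of generateTopKFrequentPatterns can be isolated as follows. This hypothetical sketch (IdRemapping is an invented class) reuses the local variable names from the excerpt but represents Pair<A, Long> as Map.Entry<A, Long>. It looks each id up instead of breaking out of the second loop at the first infrequent entry, so filling attributeFrequency does not depend on frequencyList being sorted.

```java
import java.util.*;

// Hypothetical sketch of the feature -> dense id remapping; not Mahout code.
final class IdRemapping<A> {
    final Map<A, Integer> attributeIdMapping = new HashMap<>(); // feature -> id
    final Map<Integer, A> reverseMapping = new HashMap<>();     // id -> feature
    final long[] attributeFrequency;                             // support of each id
    final Set<Integer> returnFeatures = new HashSet<>();        // ids to mine patterns for

    IdRemapping(List<Map.Entry<A, Long>> frequencyList, long minSupport,
                Collection<A> returnableFeatures) {
        // frequent features get ids 0..n-1 in frequencyList order
        int id = 0;
        for (Map.Entry<A, Long> f : frequencyList) {
            if (f.getValue() >= minSupport) {
                attributeIdMapping.put(f.getKey(), id);
                reverseMapping.put(id++, f.getKey());
            }
        }
        attributeFrequency = new long[attributeIdMapping.size()];
        for (Map.Entry<A, Long> f : frequencyList) {
            Integer fid = attributeIdMapping.get(f.getKey());
            if (fid != null) {
                attributeFrequency[fid] = f.getValue();
            }
        }
        if (returnableFeatures != null && !returnableFeatures.isEmpty()) {
            for (A a : returnableFeatures) {
                Integer fid = attributeIdMapping.get(a);
                if (fid != null) { // infrequent features are skipped
                    returnFeatures.add(fid);
                }
            }
        } else {
            for (int j = 0; j < attributeIdMapping.size(); j++) {
                returnFeatures.add(j); // no restriction: return every frequent id
            }
        }
    }
}
```

With frequencyList (7, 5), (3, 4), (9, 1), minSupport 2 and returnable features {3, 9}, feature 7 gets id 0, feature 3 gets id 1, feature 9 is dropped, and returnFeatures is {1}.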
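The input of AggregatorMapper is <key, value=TopKStringPatterns>. TopKStringPatterns is a list of Pair<List<String>, Long> elements: the List<String> holds a frequent pattern for key=item, and the Long holds its support. For every pattern, the mapper outputs the pattern once under each item it contains.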
```java
/**
 *
 * outputs the pattern for each item in the pattern, so that reducer can group them
 * and select the top K frequent patterns
 *
 */
public class AggregatorMapper extends Mapper<Text,TopKStringPatterns,Text,TopKStringPatterns> {

  @Override
  protected void map(Text key, TopKStringPatterns values, Context context) throws IOException,
                                                                          InterruptedException {
    for (Pair<List<String>,Long> pattern : values.getPatterns()) {
      for (String item : pattern.getFirst()) {
        List<Pair<List<String>,Long>> patternSingularList = Lists.newArrayList();
        patternSingularList.add(pattern);
        context.setStatus("Aggregator Mapper:Grouping Patterns for " + item);
        context.write(new Text(item), new TopKStringPatterns(patternSingularList));
      }
    }

  }
}
```
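AggregatorReducer merges all patterns that share the same key (item), keeping them ordered by descending support, and finally outputs the top K frequent patterns for that item, where K is maxHeapSize.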
```java
/**
 *
 * groups all Frequent Patterns containing an item and outputs the top K patterns
 * containing that particular item
 *
 */
public class AggregatorReducer extends Reducer<Text,TopKStringPatterns,Text,TopKStringPatterns> {

  private int maxHeapSize = 50;

  @Override
  protected void reduce(Text key, Iterable<TopKStringPatterns> values, Context context) throws IOException,
                                                                                       InterruptedException {
    TopKStringPatterns patterns = new TopKStringPatterns();
    for (TopKStringPatterns value : values) {
      context.setStatus("Aggregator Reducer: Selecting TopK patterns for: " + key);
      patterns = patterns.merge(value, maxHeapSize);
    }
    context.write(key, patterns);

  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Parameters params = new Parameters(context.getConfiguration().get("pfp.parameters", ""));
    maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));

  }
}
```
4. References
[1] Association Analysis: The FP-Growth Algorithm (關聯分析:FP-Growth算法). Mark Lin. datahunter. 2014.
[2] PFP: Parallel FP-Growth for Query Recommendation. Haoyuan Li et al. RecSys '08: Proceedings of the 2008 ACM Conference on Recommender Systems. 2008.