Spark數據挖掘-FPGrowth算法

Spark數據挖掘-FPGrowth算法

主要內容

  1. 什麼是關聯規則挖掘?
  2. 關聯規則有哪些術語?
  3. 什麼是FP-Growth算法?

1.1 FPGrowth算法

1.1.1 基本概念

關聯規則挖掘的一個典型例子是購物籃分析。關聯規則研究有助於發現交易數據庫中不一樣商品(項)之間的聯繫,找出顧客購買行爲模式,如購買了某一商品對購買其餘商品的影響,分析結果能夠應用於商品貨架佈局、貨存安排以及根據購買模式對用戶進行分類。node

關聯規則的相關術語以下:算法

**(1)項與項集 **
這是一個集合的概念,在一籃子商品中的一件消費品即爲一項(Item),則若干項的集合爲項集,如{啤酒,尿布}構成一個二元項集。數據庫

**(2)關聯規則 **
通常記爲的形式,X爲先決條件,Y爲相應的關聯結果,用於表示數據內隱含的關聯性。如:表示購買了尿布的消費者每每也會購買啤酒。 關聯性強度如何,由三個概念——支持度、置信度、提高度來控制和評價。 例:有10000個消費者購買了商品,其中購買尿布1000個,購買啤酒2000個,購買麪包500個,同時購買尿布和麪包800個,同時購買尿布和麪包100個。微信

(3)支持度(Support)
支持度是指在全部項集中{X, Y}出現的可能性,即項集中同時含有X和Y的機率: 該指標做爲創建強關聯規則的第一個門檻,衡量了所考察關聯規則在「量」上的多少。經過設定最小閾值(minsup),剔除「出鏡率」較低的無心義規則,保留出現較爲頻繁的項集所隱含的規則。 設定最小閾值爲5%,因爲{尿布,啤酒}的支持度爲800/10000=8%,知足基本輸了要求,成爲頻繁項集,保留規則;而{尿布,麪包}的支持度爲100/10000=1%,被剔除。機器學習

(4)置信度(Confidence)
置信度表示在先決條件X發生的條件下,關聯結果Y發生的機率: 這是生成強關聯規則的第二個門檻,衡量了所考察的關聯規則在「質」上的可靠性。類似的,咱們須要對置信度設定最小閾值(mincon)來實現進一步篩選。 具體的,當設定置信度的最小閾值爲70%時,置信度爲800/1000=80%,而的置信度爲800/2000=40%,被剔除。ide

(5)提高度(lift)
提高度表示在含有X的條件下同時含有Y的可能性與沒有X這個條件下項集中含有Y的可能性之比:公式爲confidence(artichok => cracker)/support(cracker) = 80%/50% = 1.6。該指標與置信度一樣衡量規則的可靠性,能夠看做是置信度的一種互補指標。源碼分析

1.1.2 FP-Growth算法

FP-Growth(頻繁模式增加)算法是韓家煒老師在2000年提出的關聯分析算法,它採起以下分治策略:將提供頻繁項集的數據庫壓縮到一棵頻繁模式樹(FP-Tree),但仍保留項集關聯信息;該算法和Apriori算法最大的不一樣有兩點:第一,不產生候選集,第二,只須要兩次遍歷數據庫,大大提升了效率。佈局

**(1)按如下步驟構造FP-樹 **
(a) 掃描事務數據庫D一次。收集頻繁項的集合F和它們的支持度。對F按支持度降序排序,結果爲頻繁項表L。
(b) 建立FP-樹的根結點,以「null」標記它。對於D 中每一個事務Trans,執行:選擇 Trans 中的頻繁項,並按L中的次序排序。設排序後的頻繁項表爲[p | P],其中,p 是第一個元素,而P 是剩餘元素的表。調用insert_tree([p | P], T)。該過程執行狀況以下。若是T有子女N使得N.item-name = p.item-name,則N 的計數增長1;不然建立一個新結點N將其計數設置爲1,連接到它的父結點T,而且經過結點鏈結構將其連接到具備相同item-name的結點。若是P非空,遞歸地調用insert_tree(P, N)。學習

**(2)FP-樹的挖掘 **
經過調用FP_growth(FP_tree, null)實現。該過程實現以下:大數據

  • FP_growth(Tree, α)
  • if Tree 含單個路徑P then
  • for 路徑 P 中結點的每一個組合(記做β)
  • 產生模式β ∪ α,其支持度support = β中結點的最小支持度;
  • else for each ai在Tree的頭部(按照支持度由低到高順序進行掃描) {
  • 產生一個模式β = ai ∪ α,其支持度support = ai .support;
  • 構造β的條件模式基,而後構造β的條件FP-樹Treeβ;
  • if Treeβ ≠ ∅ then
  • 調用 FP_growth (Treeβ, β);}
  • end

1.1.3 FP-Growth算法演示—構造FP-樹

  • **事務數據庫創建 **
    0

  • **建立根結點和頻繁項目表 **
    1

  • 加入第一個事務(I2,I1,I5)
    2

  • 加入第二個事務(I2,I4)
    3

  • 加入第三個事務(I2,I3)
    4

以此類推加入第五、六、七、八、9個事務。

  • 加入第九個事務(I2,I1,I3)
    5

1.1.4 FP-Growth算法演示—FP-樹挖掘

FP-樹建好後,就能夠進行頻繁項集的挖掘,挖掘算法稱爲FpGrowth(Frequent Pattern Growth)算法,挖掘從表頭header的最後一個項開始,以此類推。本文以I五、I3爲例進行挖掘。

(1)挖掘I5:
對於I5,獲得條件模式基:<(I2,I1:1)>、<I2,I1,I3:1>
構造條件FP-tree:
6
獲得I5頻繁項集:{{I2,I5:2},{I1,I5:2},{I2,I1,I5:2}}
I四、I1的挖掘與I5相似,條件FP-樹都是單路徑。
(2)挖掘I3:
I5的狀況是比較簡單的,由於I5對應的條件FP-樹是單路徑的,I3稍微複雜一點。I3的條件模式基是(I2 I1:2), (I2:2), (I1:2),生成的條件FP-樹以下圖:
6
I3的條件FP-樹仍然是一個多路徑樹,首先把模式後綴I3和條件FP-樹中的項頭表中的每一項取並集,獲得一組模式{I2 I3:4, I1 I3:4},可是這一組模式不是後綴爲I3的全部模式。還須要遞歸調用FP-growth,模式後綴爲{I1,I3},{I1,I3}的條件模式基爲{I2:2},其生成的條件FP-樹以下圖所示。
6
在FP_growth中把I2和模式後綴{I1,I3}取並獲得模式{I1 I2 I3:2}。 理論上還應該計算一下模式後綴爲{I2,I3}的模式集,可是{I2,I3}的條件模式基爲空,遞歸調用結束。最終模式後綴I3的支持度>2的全部模式爲:{ I2 I3:4, I1 I3:4, I1 I2 I3:2}。

1.2 Spark Mllib FPGrowth源碼分析

FPGrowth源碼包括:FPGrowth、FPTree兩部分。 其中FPGrowth中包括:run方法、genFreqItems方法、genFreqItemsets方法、genCondTransactions方法; FPTree中包括:add方法、merge方法、project方法、getTransactions方法、extract方法。

// run 計算頻繁項集
  /**
   * Computes an FP-Growth model that contains frequent itemsets.
   * @param data input data set, each element contains a transaction
   * @return an [[FPGrowthModel]]
   */
  def run[Item: ClassTag](data: RDD[Array[Item]]): FPGrowthModel[Item] = {
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("Input data is not cached.")
    }
    val count = data.count()//計算事務總數
    val minCount = math.ceil(minSupport * count).toLong//計算最小支持度
    val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
val partitioner = new HashPartitioner(numParts)
//freqItems計算知足最小支持度的Items項
val freqItems = genFreqItems(data, minCount, partitioner)
//freqItemsets計算頻繁項集
    val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
    new FPGrowthModel(freqItemsets)
}
// genFreqItems計算知足最小支持度的Items項
/**
   * Generates frequent items by filtering the input data using minimal support level.
   * @param minCount minimum count for frequent itemsets
   * @param partitioner partitioner used to distribute items
   * @return array of frequent pattern ordered by their frequencies
   */
  privatedef genFreqItems[Item: ClassTag](
      data: RDD[Array[Item]],
      minCount: Long,
      partitioner: Partitioner): Array[Item] = {
    data.flatMap { t =>
      val uniq = t.toSet
      if (t.size != uniq.size) {
        thrownew SparkException(s"Items in a transaction must be unique but got ${t.toSeq}.")
      }
      t
    }.map(v => (v, 1L))
      .reduceByKey(partitioner, _ + _)
      .filter(_._2 >= minCount)
      .collect()
      .sortBy(-_._2)
      .map(_._1)
}//統計每一個Items項的頻次,對小於minCount的Items項過濾,返回Items項。

// genFreqItemsets計算頻繁項集:生成FP-Trees,挖掘FP-Trees
/**
   * Generate frequent itemsets by building FP-Trees, the extraction is done on each partition.
   * @param data transactions
   * @param minCount minimum count for frequent itemsets
   * @param freqItems frequent items
   * @param partitioner partitioner used to distribute transactions
   * @return an RDD of (frequent itemset, count)
   */
  privatedef genFreqItemsets[Item: ClassTag](
      data: RDD[Array[Item]],
      minCount: Long,
      freqItems: Array[Item],
      partitioner: Partitioner): RDD[FreqItemset[Item]] = {
    val itemToRank = freqItems.zipWithIndex.toMap//表頭
    data.flatMap { transaction =>
      genCondTransactions(transaction, itemToRank, partitioner)
    }.aggregateByKey(new FPTree[Int], partitioner.numPartitions)( //生成FP樹
      (tree, transaction) => tree.add(transaction, 1L), //FP樹增長一條事務
      (tree1, tree2) => tree1.merge(tree2)) //FP樹合併
    .flatMap { case (part, tree) =>
      tree.extract(minCount, x => partitioner.getPartition(x) == part)//FP樹挖掘頻繁項
    }.map { case (ranks, count) =>
      new FreqItemset(ranks.map(i => freqItems(i)).toArray, count)
    }
}
// add FP-Trees增長一條事務數據
/** Adds a transaction with count. */
  def add(t: Iterable[T], count: Long = 1L): this.type = {
    require(count > 0)
    var curr = root
    curr.count += count
    t.foreach { item =>
      val summary = summaries.getOrElseUpdate(item, new Summary)
      summary.count += count
      val child = curr.children.getOrElseUpdate(item, {
        val newNode = new Node(curr)
        newNode.item = item
        summary.nodes += newNode
        newNode
      })
      child.count += count
      curr = child
    }
    this
}
// merge FP-Trees合併
  /** Merges another FP-Tree. */
  def merge(other: FPTree[T]): this.type = {
    other.transactions.foreach { case (t, c) =>
      add(t, c)
    }
    this
}
// extract FP-Trees挖掘,返回全部頻繁項集
  /** Extracts all patterns with valid suffix and minimum count. */
  def extract(
      minCount: Long,
      validateSuffix: T => Boolean = _ => true): Iterator[(List[T], Long)] = {
    summaries.iterator.flatMap { case (item, summary) =>
      if (validateSuffix(item) && summary.count >= minCount) {
        Iterator.single((item :: Nil, summary.count)) ++
          project(item).extract(minCount).map { case (t, c) =>
            (item :: t, c)
          }
      } else {
        Iterator.empty
      }
    }
  }
}

1.3 Mllib FPGrowth實例

一、數據

數據格式爲:物品1物品2物品3…
r z h k p
z y x w v u t s
s x o n r
x z y m t s q e
z
x z y r q t p

二、代碼

//讀取樣本數據
 valdata_path = "/home/tmp/sample_fpgrowth.txt"
 valdata = sc.textFile(data_path)
 valexamples = data.map(_.split(" ")).cache()
 //創建模型
 valminSupport = 2
 valnumPartition = 10
 valmodel = new FPGrowth()
   .setMinSupport(minSupport)
   .setNumPartitions(numPartition)
   .run(examples)
 //打印結果
 println(s"Number of frequent itemsets: ${model.freqItemsets.count()}")
 model.freqItemsets.collect().foreach { itemset =>
   println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
 }

我的微信公衆號

歡迎關注本人微信公衆號,會定時發送關於大數據、機器學習、Java、Linux 等技術的學習文章,並且是一個系列一個系列的發佈,無任何廣告,純屬我的興趣。
Clebeg能量集結號

相關文章
相關標籤/搜索