Spark ML 特徵轉換及處理算子實戰技巧-Spark商業ML實戰

本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。版權聲明:禁止轉載,歡迎學習。QQ郵箱地址:1120746959@qq.com,若有任何商業交流,可隨時聯繫。java

1 燃燒吧特徵轉換

1.1 Tokenization 分詞器技術(RegexTokenizer)

Tokenization是將文本(例如句子)分割成單詞,默認是空格分割 RegexTokenizer是基於正則表達式進行單詞分割,默認打分割方式是'\s+',正則表達式

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val sentenceDataFrame = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")


=>  默認匹配的是空格
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val tokenized = tokenizer.transform(sentenceDataFrame)

val countTokens = udf { (words: Seq[String]) => words.length }
tokenized.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show(false)
    
 +-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+   
 
    
=> W 表示匹配出的是除單詞以外的任何分隔符
val regexTokenizer = new
RegexTokenizer().setInputCol("sentence").setOutputCol("words").setPattern("\\W")       
  
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show(false)

scala> regexTokenized.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show(false)
+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+-----------------------------------+------------------------------------------+------+



=>  w 表示爲 [a-zA-Z0-9],setGaps(false)表示匹配的不是空格,而是直接匹配出單詞
val regexTokenizer = new RegexTokenizer().setInputCol("sentence").setOutputCol("words").setPattern("\\w+").setGaps(false) 
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)

regexTokenized.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show(false)

 regexTokenized.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show(false)
 +-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+-----------------------------------+------------------------------------------+------+
複製代碼

1.2 移除停用詞

發現 I ,and ,had , a 被移除算法

import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover().setInputCol("raw").setOutputCol("filtered")

val dataSet = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "balloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show(false)

scala> remover.transform(dataSet).show(false)
+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+

## 要求的默認停用詞列表
requirement failed: US is not in the supported language list: french, spanish, german, finnish, turkish, english, russian, norwegian, dutch, danish, hungarian, italian, swedish, portuguese.

scala> StopWordsRemover.loadDefaultStopWords("english")

res17: Array[String] = Array(i, me, my, myself, we, our, ours, ourselves, you, your, yours, yourself, yourselves, he, him, his, himself, she, her, hers, herself, it, its, itself, they, them, their, theirs, themselves, what, which, who, whom, this, that, these, those, am, is, are, was, were, be, been, being, have, has, had, having, do, does, did, doing, a, an, the, and, but, if, or, because, as, until, while, of, at, by, for, with, about, against, between, into, through, during, before, after, above, below, to, from, up, down, in, out, on, off, over, under, again, further, then, once, here, there, when, where, why, how, all, any, both, each, few, more, most, other, some, such, no, nor, not, only, own, same, so, than, too, very, s, t, can, will, just, don, should, now, i'll, you'll, he'll...
複製代碼

1.3 n-gram (獲得組合序列)

每一個n-gram由n個連續字的空格分隔的字符串表示。 若是輸入序列包含少於n個字符串,則不會生成輸出。 import org.apache.spark.ml.feature.NGramsql

val wordDataFrame = spark.createDataFrame(Seq(
      (0, Array("Hi", "I", "heard", "about", "Spark")),
      (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
      (2, Array("Logistic", "regression", "models", "are", "neat"))
    )).toDF("id", "words")

    val ngram = new NGram().setN(3).setInputCol("words").setOutputCol("ngrams")

    val ngramDataFrame = ngram.transform(wordDataFrame)
    ngramDataFrame.select("ngrams").show(false)

    +--------------------------------------------------------------------------------+
    |ngrams                                                                          |
    +--------------------------------------------------------------------------------+
    |[Hi I heard, I heard about, heard about Spark]                                  |
    |[I wish Java, wish Java could, Java could use, could use case, use case classes]|
    |[Logistic regression models, regression models are, models are neat]            |
    +--------------------------------------------------------------------------------+
複製代碼

1.4 二值化(仍是比較厲害的)

二值化是將數字特徵閾值爲二進制(0/1)特徵的過程。 Binarizer接受通用參數inputCol和outputCol以及二進制閾值。 大於閾值的特徵值被二進制化爲1.0; 等於或小於閾值的值被二值化爲0.0。 inputCol支持Vector和Double類型。apache

import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")

val binarizer: Binarizer = new Binarizer().setInputCol("feature").setOutputCol("binarized_feature").setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)

scala> println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
Binarizer output with Threshold = 0.5

scala>binarizedDataFrame.show()
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+
複製代碼

1.5 規範化(StandardScaler)

StandardScaler處理的對象是每一列,也就是每一維特徵,將特徵標準化爲單位標準差或是0均值,或是0均值單位標準差。學習

StandardScaler=(x-u)/標準差Sn

主要有兩個參數能夠設置:

  • withStd: 默認爲真。將數據標準化到單位標準差。測試

  • withMean: 默認爲假。是否變換爲0均值。 (此種方法將產出一個稠密輸出,因此不適用於稀疏輸入。)ui

  • StandardScaler須要fit數據,獲取每一維的均值和標準差,來縮放每一維特徵。 StandardScaler是一個Estimator,它能夠fit數據集產生一個StandardScalerModel,用來計算彙總統計。this

  • 而後產生的模能夠用來轉換向量至統一的標準差以及(或者)零均值特徵。注意若是特徵的標準差爲零,則該特徵在向量中返回的默認值爲0.0。編碼

    import org.apache.spark.ml.linalg.Vectors
       
      val dataFrame = spark.createDataFrame(Seq(
      (0, Vectors.dense(1.0, 0.5, -1.0)),
      (1, Vectors.dense(2.0, 1.0, 1.0)),
      (2, Vectors.dense(4.0, 10.0, 2.0))
      )).toDF("id", "features")
       
      dataFrame.show
      
      +---+--------------+
      | id|      features|
      +---+--------------+
      |  0|[1.0,0.5,-1.0]|
      |  1| [2.0,1.0,1.0]|
      |  2|[4.0,10.0,2.0]|
      +---+--------------+
      
      val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false)
      val scalerModel = scaler.fit(dataFrame)
      val scaledData = scalerModel.transform(dataFrame)
      scaledData.show
      
      +---+--------------+--------------------+
      | id|      features|      scaledFeatures|
      +---+--------------+--------------------+
      |  0|[1.0,0.5,-1.0]|[0.65465367070797...|
      |  1| [2.0,1.0,1.0]|[1.30930734141595...|
      |  2|[4.0,10.0,2.0]|[2.61861468283190...|
      +---+--------------+--------------------+
      
      scala>     scaledData.rdd.foreach(println)
      [0,[1.0,0.5,-1.0],[0.6546536707079772,0.09352195295828246,-0.6546536707079771]]
      [1,[2.0,1.0,1.0],[1.3093073414159544,0.18704390591656492,0.6546536707079771]]
      [2,[4.0,10.0,2.0],[2.618614682831909,1.8704390591656492,1.3093073414159542]]
      
      val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(true)
      val scalerModel = scaler.fit(dataFrame)
      val scaledData = scalerModel.transform(dataFrame)
      scaledData.show
      
      scala>     scaledData.rdd.foreach(println)
      [0,[1.0,0.5,-1.0],[-0.8728715609439697,-0.6234796863885498,-1.0910894511799618]]
      [1,[2.0,1.0,1.0],[-0.2182178902359925,-0.5299577334302673,0.2182178902359924]]
      [2,[4.0,10.0,2.0],[1.0910894511799618,1.1534374198188169,0.8728715609439696]
    複製代碼

    1.6 正則化(Normalizer) --面向行

    範數是一種強化了的距離概念,它在定義上比距離多了一條數乘的運算法則。有時候爲了便於理解,咱們能夠把範數看成距離來理解。

  • L1範數是咱們常常見到的一種範數,它的定義以下:

  • L2範數是咱們最多見最經常使用的範數了,咱們用的最多的度量距離歐氏距離就是一種L2範數,它的定義以下:

  • 是L無窮範數,它主要被用來度量向量元素的最大值,與L0同樣,一般狀況下表示爲:

    import org.apache.spark.ml.feature.Normalizer
       
      正則化每一個向量到1階範數
      將每一行的規整爲1階範數爲1的向量,1階範數即全部值絕對值之和
      val normalizer = new Normalizer().setInputCol("features") .setOutputCol("normFeatures").setP(1.0)
      l1NormData.show()
    
      +---+--------------+------------------+
      | id| features| normFeatures|
      +---+--------------+------------------+
      | 0|[1.0,0.5,-1.0]| [0.4,0.2,-0.4]|
      | 1| [2.0,1.0,1.0]| [0.5,0.25,0.25]|
      | 2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
      +---+--------------+------------------+
       
      正則化每一個向量到無窮階範數,向量的無窮階範數即向量中全部值中的最大值
      val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
      lInfNormData.show()
       
      +---+--------------+--------------+
      | id| features| normFeatures|
      +---+--------------+--------------+
      | 0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
      | 1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
      | 2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
      +---+--------------+--------------+
    複製代碼

    1.7 最大最小值縮放 MinMaxScaler --面向列 (value-Emin/(Emax-Emin))*[max-min]+min

    Emin最小值爲每一列最小值
      Emax最小值爲每一列最大值
      MinMaxScaler做用一樣是每一列,即每一維特徵。將每一維特徵線性地映射到指定的區間,一般是[0, 1]。
      
      MinMaxScaler計算數據集的彙總統計量,併產生一個MinMaxScalerModel。
      
      注意由於零值轉換後可能變爲非零值,因此即使爲稀疏輸入,輸出也可能爲稠密向量。
      
      該模型能夠將獨立的特徵的值轉換到指定的範圍內。
      它也有兩個參數能夠設置:
      
      min: 默認爲0。指定區間的下限。
      max: 默認爲1。指定區間的上限。
      import org.apache.spark.ml.feature.MinMaxScaler
      
      val dataFrame = spark.createDataFrame(Seq(
      (0, Vectors.dense(1.0, 0.5, -1.0)),
      (1, Vectors.dense(2.0, 1.0, 1.0)),
      (2, Vectors.dense(4.0, 10.0, 2.0))
      )).toDF("id", "features")
       
      val scaler = new MinMaxScaler() .setInputCol("features").setOutputCol("scaledFeatures")
       
      // Compute summary statistics and generate MinMaxScalerModel
      val scalerModel = scaler.fit(dataFrame)
       
      // rescale each feature to range [min, max].
      val scaledData = scalerModel.transform(dataFrame)
      scaledData.select("features", "scaledFeatures").show
       
       每維特徵線性地映射,最小值映射到0,最大值映射到1。
      +--------------+-----------------------------------------------------------+
      |features |scaledFeatures |
      +--------------+-----------------------------------------------------------+
      |[1.0,0.5,-1.0]|[0.0,0.0,0.0] |
      |[2.0,1.0,1.0] |[0.3333333333333333,0.05263157894736842,0.6666666666666666]|
      |[4.0,10.0,2.0]|[1.0,1.0,1.0] |
      +--------------+-----------------------------------------------------------+
    複製代碼

    1.8 最大值-絕對值縮放MaxAbsScaler(面向列-value除以絕對值最大值)

    MaxAbsScaler將每一維的特徵變換到[-1,1]閉區間上,經過除以每一維特徵上的最大的絕對值,它不會平移整個分佈,也不會破壞原來每個特徵向量的稀疏性。由於它不會轉移/集中數據,因此不會破壞數據的稀疏性。

    import org.apache.spark.ml.feature.MaxAbsScaler
       
      val scaler = new MaxAbsScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
       
      val scalerModel = scaler.fit(dataFrame)
       1]
      val scaledData = scalerModel.transform(dataFrame)
      scaledData.select("features", "scaledFeatures").show()
       
      // 每一維的絕對值的最大值爲[4, 10, 2]
      +--------------+----------------+
      | features| scaledFeatures|
      +--------------+----------------+
      |[1.0,0.5,-1.0]|[0.25,0.05,-0.5]|
      | [2.0,1.0,1.0]| [0.5,0.1,0.5]|
      |[4.0,10.0,2.0]| [1.0,1.0,1.0]|
      +--------------+----------------+
    複製代碼

1.9 獨熱編碼(OneHotEncoderEstimator)- 面向列

把每一列的全部可能編碼成向量形式:如: 0被編碼爲:(1,0) 也即一個值變成一個向量。

import org.apache.spark.ml.feature.OneHotEncoderEstimator
val df = spark.createDataFrame(Seq(
  (0.0, 1.0),
  (1.0, 0.0),
  (2.0, 1.0),
  (0.0, 2.0),
  (0.0, 1.0),
  (2.0, 0.0)
)).toDF("categoryIndex1", "categoryIndex2")

val encoder = new OneHotEncoderEstimator().setInputCols(Array("categoryIndex1", "categoryIndex2")) .setOutputCols(Array("categoryVec1", "categoryVec2"))
val model = encoder.fit(df)

val encoded = model.transform(df)
encoded.show()

最終結果:
+--------------+--------------+-------------+-------------+
|categoryIndex1|categoryIndex2| categoryVec1| categoryVec2|
+--------------+--------------+-------------+-------------+
|           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
|           1.0|           0.0|(2,[1],[1.0])|(2,[0],[1.0])|
|           2.0|           1.0|    (2,[],[])|(2,[1],[1.0])|
|           0.0|           2.0|(2,[0],[1.0])|    (2,[],[])|
|           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
|           2.0|           0.0|    (2,[],[])|(2,[0],[1.0])|
+--------------+--------------+-------------+-------------+


把每一列的全部可能編碼成向量形式:如:第一行0被編碼爲:(1,0,0)
val encoder = new OneHotEncoderEstimator().setInputCols(Array("categoryIndex1", "categoryIndex2")) .setOutputCols(Array("categoryVec1", "categoryVec2")).setDropLast(false)
val model = encoder.fit(df)

val encoded = model.transform(df)
encoded.show()

+--------------+--------------+-------------+-------------+
|categoryIndex1|categoryIndex2| categoryVec1| categoryVec2|
+--------------+--------------+-------------+-------------+
|           0.0|           1.0|(3,[0],[1.0])|(3,[1],[1.0])|
|           1.0|           0.0|(3,[1],[1.0])|(3,[0],[1.0])|
|           2.0|           1.0|(3,[2],[1.0])|(3,[1],[1.0])|
|           0.0|           2.0|(3,[0],[1.0])|(3,[2],[1.0])|
|           0.0|           1.0|(3,[0],[1.0])|(3,[1],[1.0])|
|           2.0|           0.0|(3,[2],[1.0])|(3,[0],[1.0])|
+--------------+--------------+-------------+-------------+
複製代碼

1.9 字符串-索引變換

根據字符串出現的頻率來定義標籤列,出現頻率最高者爲0,所以能夠看到0,1,2.....,可是若出現訓練的模型中沒有測試的值,將會報錯,此時須要setHandleInvalid,跳過便可。

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex")
val model =indexer.fit(df)

val indexed = model.transform(df)
indexed.show()
scala> indexed.show
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+


訓練模型中沒有d時,會報錯,因此須要設置setHandleInvalid
 val df_test = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"),(5, "d"))
).toDF("id", "category")

 val indexer_test = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex").setHandleInvalid("skip")
 val model =indexer_test.fit(df)
 
 val indexed_test = model.transform(df_test)
複製代碼

結語

今天是個人CSDN技術專欄突破50篇的日子,值得慶祝!

秦凱新 於深圳 2018 11 18 1:47
相關文章
相關標籤/搜索