Converting Continuous Features to Discrete Features in Spark

When the data volume is large, classification tasks typically ensemble [discrete features + LR] with [continuous features + XGBoost]; feeding continuous features directly into LR or a decision tree easily leads to overfitting.
If you still want to use the continuous features, ensembling several algorithms is one option, but it complicates the pipeline and makes training very time-consuming. Without losing much feature information, you can instead convert the continuous features into discrete features and feed them into the LR model.

Feature conversion falls into two cases:

  • Case 1: The features have not yet been assembled into the vector format required for training; each feature is still a separate column, and these individual columns need to be discretized into buckets.
  • Case 2: All features have already been assembled into the vector format required for training, but the discrete feature codes are messy, e.g. [10, 15, 128, ...], and need to be mapped to [0, 1, 2, ...]. Here all features are merged into a single vector column that mixes discrete and continuous features, so the discrete features must first be identified and then normalized.

1. Case 1

1.1. Binarization

Binarization is the process of thresholding numerical features to binary (0/1) features.

Binarizer takes the common parameters inputCol and outputCol, as well as the threshold for binarization. Feature values greater than the threshold are binarized to 1.0; values equal to or less than the threshold are binarized to 0.0. Both Vector and Double types are supported for inputCol.

import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)

println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
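
With the threshold at 0.5, only the 0.8 value is above it, so the output should look roughly like this:

+---+-------+-----------------+
|id |feature|binarized_feature|
+---+-------+-----------------+
|0  |0.1    |0.0              |
|1  |0.8    |1.0              |
|2  |0.2    |0.0              |
+---+-------+-----------------+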

1.2. Bucketing (Bucketizer)

Bucketizer transforms a column of continuous features to a column of feature buckets, where the buckets are specified by users. It takes a parameter:
splits: Parameter for mapping continuous features into buckets. With n+1 splits, there are n buckets. A bucket defined by splits x,y holds values in the range [x,y) except the last bucket, which also includes y. Splits should be strictly increasing. Values at -inf, inf must be explicitly provided to cover all Double values; otherwise, values outside the splits specified will be treated as errors. Two examples of splits are Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) and Array(0.0, 1.0, 2.0).

Binarization needs a single threshold; in the multi-way conversion, splitting into n buckets requires an array of n+1 split points. Every value then falls into the interval between two adjacent split points, as if dropped into the bucket it belongs to, hence the name bucketing.
For two adjacent split points x and y, the bucket covers the half-open interval [x, y); only the last bucket is closed on both ends.

The elements of the splits array must be strictly increasing. If a value is not covered by any interval during the transformation, an error is raised, so if you are unsure of the minimum and maximum feature values, add Double.NegativeInfinity and Double.PositiveInfinity at the two ends of the array.

Note that if you have no idea of the upper and lower bounds of the targeted column, you should add Double.NegativeInfinity and Double.PositiveInfinity as the bounds of your splits to prevent a potential out of Bucketizer bounds exception.
Note also that the splits that you provided have to be in strictly increasing order, i.e. s0 < s1 < s2 < ... < sn.

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
println(s"Bucketizer output with ${bucketizer.getSplits.length - 1} buckets")
bucketedData.show()

val splitsArray = Array(
  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
  Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))

val data2 = Array(
  (-999.9, -999.9),
  (-0.5, -0.2),
  (-0.3, -0.1),
  (0.0, 0.0),
  (0.2, 0.4),
  (999.9, 999.9))
val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2")

val bucketizer2 = new Bucketizer()
  .setInputCols(Array("features1", "features2"))
  .setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
  .setSplitsArray(splitsArray)

// Transform original data into its bucket indices.
val bucketedData2 = bucketizer2.transform(dataFrame2)
println(s"Bucketizer output with [" +
  s"${bucketizer2.getSplitsArray(0).length - 1}, " +
  s"${bucketizer2.getSplitsArray(1).length - 1}] buckets for each input column")
bucketedData2.show()
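
For the single-column example, with split points at -0.5, 0.0 and 0.5 the expected bucket indices are roughly:

+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
|-999.9  |0.0             |
|-0.5    |1.0             |
|-0.3    |1.0             |
|0.0     |2.0             |
|0.2     |2.0             |
|999.9   |3.0             |
+--------+----------------+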

Wrapped as a reusable function:

  // Discretize a continuous feature into multiple buckets
  def QuantileDiscretizer_multi_class(df: DataFrame, InputCol: String, OutputCol: String, NumBuckets: Int): DataFrame = {
    import org.apache.spark.ml.feature.QuantileDiscretizer
    val discretizer = new QuantileDiscretizer()
      .setHandleInvalid("skip")
      .setInputCol(InputCol)
      .setOutputCol(OutputCol)
      .setNumBuckets(NumBuckets)
    println("\n\n*********Number of buckets: " + NumBuckets + "***********Input column: " + InputCol + "**********Output column: " + OutputCol + "**********\n\n")
    val result = discretizer.fit(df).transform(df)
    result.show(false)
    result
  }

1.3. QuantileDiscretizer (quantile discretization)

QuantileDiscretizer takes a column with continuous features and outputs a column with binned categorical features. The number of bins is set by the numBuckets parameter. It is possible that the number of buckets used will be smaller than this value, for example, if there are too few distinct values of the input to create enough distinct quantiles.

NaN values: NaN values will be removed from the column during QuantileDiscretizer fitting. This will produce a Bucketizer model for making predictions. During the transformation, Bucketizer will raise an error when it finds NaN values in the dataset, but the user can also choose to either keep or remove NaN values within the dataset by setting handleInvalid. If the user chooses to keep NaN values, they will be handled specially and placed into their own bucket, for example, if 4 buckets are used, then non-NaN data will be put into buckets[0-3], but NaNs will be counted in a special bucket[4].

Algorithm: The bin ranges are chosen using an approximate algorithm (see the documentation for approxQuantile for a detailed description). The precision of the approximation can be controlled with the relativeError parameter. When set to zero, exact quantiles are calculated (Note: Computing exact quantiles is an expensive operation). The lower and upper bin bounds will be -Infinity and +Infinity covering all real values.

QuantileDiscretizer (quantile discretization) takes a sample of the data and divides it into roughly equal-sized parts to determine the bin ranges; the lower bound is -Infinity and the upper bound is +Infinity.
The number of buckets is set by the numBuckets parameter, but the actual number may be smaller: if the sample data can only be split into n intervals, setting numBuckets to n+1 still yields only n buckets.
The bin ranges are determined by an approximate algorithm, whose precision is controlled by the relativeError parameter. When relativeError is set to 0, exact quantiles are computed (this is expensive; the default is usually fine). relativeError must lie in [0, 1] and defaults to 0.001.
By default, the bucketizer raises an error when it encounters a NaN value. The handleInvalid parameter lets you keep or drop NaN values; if you choose to keep them, the NaN rows are placed in their own extra bucket.
The handleInvalid options are 'skip' (filter out rows with invalid values), 'error' (throw an error) and 'keep' (keep invalid values in a special extra bucket); the default is 'error'.

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setHandleInvalid("skip")
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show(false)
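
With three buckets on this small dataset, the split points land near 5.0 and 18.0, so the bucket assignments should look roughly like hour 2.2 → 0.0, 5.0 → 1.0, 8.0 → 1.0, 18.0 → 2.0, 19.0 → 2.0 (the exact boundaries may vary slightly because the quantiles are approximate).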

Wrapped for reuse:

  // Discretize a continuous feature into multiple buckets
  def QuantileDiscretizer_multi_class(df: DataFrame, InputCol: String, OutputCol: String, NumBuckets: Int): DataFrame = {
    import org.apache.spark.ml.feature.QuantileDiscretizer
    val discretizer = new QuantileDiscretizer()
      .setHandleInvalid("skip")
      .setInputCol(InputCol)
      .setOutputCol(OutputCol)
      .setNumBuckets(NumBuckets)
    println("\n\n*********Number of buckets: " + NumBuckets + "***********Input column: " + InputCol + "**********Output column: " + OutputCol + "**********\n\n")
    val result = discretizer.fit(df).transform(df)
    result.show(false)
    result
  }
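
An illustrative call on the df from the previous example (the column names and bucket count are just placeholders):

  // Adds a "result" column with bucket indices 0.0 to 2.0
  val discretized = QuantileDiscretizer_multi_class(df, "hour", "result", 3)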

In practice it is not advisable to run this directly on the full dataset: the full dataset is usually very large and this function tends to cause all kinds of cluster problems. Instead, fit only on the training set, or on a sample of the full data, then save the fitted model and use it to transform the full dataset.
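
A minimal sketch of that workflow, assuming a DataFrame fullDf with a continuous "hour" column and a placeholder save path (both are assumptions, not from the original post):

import org.apache.spark.ml.feature.{Bucketizer, QuantileDiscretizer}

// Fit the discretizer on a small sample instead of the full dataset (fraction is a placeholder)
val sampleDf = fullDf.sample(withReplacement = false, fraction = 0.1, seed = 42)

val discretizer = new QuantileDiscretizer()
  .setHandleInvalid("keep")
  .setInputCol("hour")
  .setOutputCol("hour_bucket")
  .setNumBuckets(10)

// fit() returns a Bucketizer model that holds the learned split points
val bucketizerModel: Bucketizer = discretizer.fit(sampleDf)

// Persist the fitted model so it can be reused later (path is a placeholder)
bucketizerModel.write.overwrite().save("/tmp/hour_bucketizer")

// Reload and apply to the full dataset; transform() is cheap compared to fit()
val loadedModel = Bucketizer.load("/tmp/hour_bucketizer")
val fullBucketed = loadedModel.transform(fullDf)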

2. Case 2

2.1. Normalizing discrete features inside a vector: VectorIndexer

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorIndexer

val featureIndexerModel = new VectorIndexer()
  .setInputCol("features")        // the assembled feature vector column
  .setMaxCategories(5)            // slots with more than 5 distinct values are treated as continuous and left unchanged
  .setOutputCol("indexedFeatures")
  .fit(rawData)

// Add the indexer to a Pipeline (labelIndexerModel, dtClassifier and converter are stages defined elsewhere)
val pipeline = new Pipeline()
  .setStages(Array(labelIndexerModel, featureIndexerModel, dtClassifier, converter))
pipeline.fit(rawData).transform(rawData).select("features", "indexedFeatures").show(20, false)
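
As a self-contained illustration of what maxCategories does, here is a sketch on made-up data (the toy vectors and the maxCategories value of 3 are assumptions, not from the original post): the first vector slot has only two distinct values and is re-indexed as a categorical feature, while the second slot has many distinct values and is left untouched as a continuous feature.

import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.linalg.Vectors

// Toy data: slot 0 takes values {-1.0, 1.0} (categorical), slot 1 is continuous
val toyData = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(-1.0, 0.11)),
  Tuple1(Vectors.dense(1.0, 2.35)),
  Tuple1(Vectors.dense(-1.0, -4.70)),
  Tuple1(Vectors.dense(1.0, 6.02)),
  Tuple1(Vectors.dense(1.0, 9.18))
)).toDF("features")

val indexerModel = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(3)      // slots with more than 3 distinct values stay continuous
  .fit(toyData)

// Show which vector slots were treated as categorical and how their values were re-mapped
println(s"Categorical slots and their value maps: ${indexerModel.categoryMaps}")
indexerModel.transform(toyData).show(false)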

2.2. Converting strings to discrete features: StringIndexer

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")          // labels are indexed by frequency: the most frequent label gets index 0
  .setOutputCol("categoryIndex")
  .setHandleInvalid("skip")         // for labels unseen during fitting, 'error' throws an exception while 'skip' drops those rows

val indexed = indexer.fit(df).transform(df)
indexed.show()
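
Since "a" appears three times, "c" twice and "b" once, the most frequent label gets the lowest index, so the expected mapping is roughly a → 0.0, c → 1.0, b → 2.0 for each row.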