For choosing a preprocessing scheme, see: [Scikit-learn] 4.3 Preprocessing data
For code examples, see: [ML] Pyspark ML tutorial for beginners
This article covers: Feature Transformers
For features whose numeric value carries no meaning in itself, binarization can be considered.
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

continuousDataFrame.show()

+---+-------+
| id|feature|
+---+-------+
|  0|    0.1|
|  1|    0.8|
|  2|    0.2|
+---+-------+
# Binarizer is a pure Transformer: no fit needed, just transform
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
Binarizer output with Threshold = 0.500000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+
Sometimes categories or grades are represented by letters rather than numbers.
from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])
# StringIndexer
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")
Transformed string column 'category' to indexed column 'categoryIndex'
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+
# StringIndexer will store labels in output column metadata
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using labels in metadata"
      % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()

Transformed indexed column 'categoryIndex' back to original string column 'originalCategory' using labels in metadata
+---+-------------+----------------+
| id|categoryIndex|originalCategory|
+---+-------------+----------------+
|  0|          0.0|               a|
|  1|          2.0|               b|
|  2|          1.0|               c|
|  3|          0.0|               a|
|  4|          0.0|               a|
|  5|          1.0|               c|
+---+-------------+----------------+
OneHotEncoderEstimator may only be a temporary API.
from pyspark.ml.feature import OneHotEncoderEstimator

df = spark.createDataFrame([
    (0, 3),
    (2, 0)
], ["categoryIndex1", "categoryIndex2"])

encoder = OneHotEncoderEstimator(inputCols=["categoryIndex1", "categoryIndex2"],
                                 outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()

+--------------+--------------+-------------+-------------+
|categoryIndex1|categoryIndex2| categoryVec1| categoryVec2|
+--------------+--------------+-------------+-------------+
|             0|             3|(2,[0],[1.0])|    (3,[],[])|
|             2|             0|    (2,[],[])|(3,[0],[1.0])|
+--------------+--------------+-------------+-------------+
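As anticipated, it was indeed short-lived: in Spark 3.0, OneHotEncoderEstimator was renamed to OneHotEncoder, which is now itself an estimator with the same fit/transform flow. A minimal sketch of the equivalent call on Spark 3.x, reusing the df above:

from pyspark.ml.feature import OneHotEncoder

# Spark 3.x: OneHotEncoder replaces OneHotEncoderEstimator but keeps the same parameters
encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
                        outputCols=["categoryVec1", "categoryVec2"])
encoded = encoder.fit(df).transform(df)
encoded.show()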
VectorAssembler
Combines multiple numeric columns, in order, into a single vector column.
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+
This section works in conjunction with: [Feature] Compare the effect of different scalers
Below are the common scaling ("dimension removal") strategies provided by sklearn.
distributions = [
    ('Unscaled data', X),
    ('Data after standard scaling',
        StandardScaler().fit_transform(X)),
    ('Data after min-max scaling',
        MinMaxScaler().fit_transform(X)),
    ('Data after max-abs scaling',
        MaxAbsScaler().fit_transform(X)),
    ('Data after robust scaling',
        RobustScaler(quantile_range=(25, 75)).fit_transform(X)),
    # [not yet supported in Spark ML]
    ('Data after power transformation (Yeo-Johnson)',
        PowerTransformer(method='yeo-johnson').fit_transform(X)),
    ('Data after power transformation (Box-Cox)',
        PowerTransformer(method='box-cox').fit_transform(X)),
    ('Data after quantile transformation (gaussian pdf)',
        QuantileTransformer(output_distribution='normal').fit_transform(X)),
    ('Data after quantile transformation (uniform pdf)',
        QuantileTransformer(output_distribution='uniform').fit_transform(X)),
    ('Data after sample-wise L2 normalizing',
        Normalizer().fit_transform(X)),
]
Why scale: if one feature's variance is far larger than the others', it will dominate during learning and keep the learner from paying attention to the remaining features as we would expect, which slows convergence or even prevents it. Such feature data therefore needs to be standardized/normalized.
from pyspark.ml.feature import StandardScaler
# libsvm files use a sparse representation
dataFrame = spark.read.format("libsvm").load("file:///usr/local/spark/data/mllib/sample_libsvm_data.txt")
dataFrame.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
scalerModel = scaler.fit(dataFrame)
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
+-----+--------------------+--------------------+
only showing top 20 rows
MinMaxScaler rescales each feature to a specified range (usually [0, 1]): (1) it improves stability for features with very small variance, and (2) it preserves the zero entries of a sparse matrix.
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

dataFrame.show()

+---+--------------+
| id|      features|
+---+--------------+
|  0|[1.0,0.1,-1.0]|
|  1| [2.0,1.1,1.0]|
|  2|[3.0,10.1,3.0]|
+---+--------------+
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(dataFrame)
scaledData = scalerModel.transform(dataFrame)

print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
Features scaled to range: [0.000000, 1.000000]
+--------------+--------------+
|      features|scaledFeatures|
+--------------+--------------+
|[1.0,0.1,-1.0]| [0.0,0.0,0.0]|
| [2.0,1.1,1.0]| [0.5,0.1,0.5]|
|[3.0,10.1,3.0]| [1.0,1.0,1.0]|
+--------------+--------------+
MaxAbsScaler rescales each feature to the range [-1, 1] by dividing through by the feature's maximum absolute value.
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(dataFrame)
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()

+--------------+----------------+
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]|  [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]|   [1.0,1.0,1.0]|
+--------------+----------------+
Ref: The difference between standardization and normalization
[Normalization] (min-max scaling) suits linear models: it puts features of different scales on a comparable numeric footing, which can greatly improve classifier accuracy. However, when new data arrives, the max and min may change and have to be recomputed.

For example, if two features differ in magnitude (say, a count of tigers versus a count of sparrows), the trained model ends up with a large weight w for the tiger feature and a small weight w for the sparrow-count feature, so the small-weight feature's influence on the objective function is easily drowned out.

Each feature should therefore be normalized, to keep features of larger magnitude from masking the contribution of other features to the objective function.

[Standardization] removes the measurement bias introduced by differing distributions. For example, in a class exam the math scores fall between 90 and 100 while the Chinese scores fall between 60 and 100. Xiao Ming scores 90 in math and 100 in Chinese; Xiao Hua scores 95 in both. Standardization is the mathematical treatment for judging whose overall result is better.
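As a back-of-the-envelope illustration of the exam example, standardization compares each score through its z-score, (x - mean) / std. The class means and standard deviations below are made-up assumptions, used only to show the arithmetic:

# Hypothetical class statistics (assumed values, for illustration only)
math_mean, math_std = 95, 3         # math scores cluster in 90-100
chinese_mean, chinese_std = 80, 10  # Chinese scores spread over 60-100

def z(x, mean, std):
    return (x - mean) / std

# Xiao Ming: math 90, Chinese 100; Xiao Hua: math 95, Chinese 95
xiao_ming = z(90, math_mean, math_std) + z(100, chinese_mean, chinese_std)  # -1.67 + 2.0 ≈ 0.33
xiao_hua  = z(95, math_mean, math_std) + z(95, chinese_mean, chinese_std)   #  0.0  + 1.5 = 1.5
print(xiao_ming, xiao_hua)  # after standardization, Xiao Hua's combined result comes out ahead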
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
# Normalize each Vector using $L^1$ norm.
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()
Normalized using L^1 norm
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+
# Normalize each Vector using L∞ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
Normalized using L^inf norm
+---+--------------+--------------+
| id|      features|  normFeatures|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+
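For completeness, the commonly used Euclidean (L^2) norm can be requested the same way, either via p=2.0 at construction time or as a transform-time parameter override; a minimal sketch reusing the normalizer and dataFrame above:

# Normalize each Vector using the L^2 (Euclidean) norm.
l2NormData = normalizer.transform(dataFrame, {normalizer.p: 2.0})
print("Normalized using L^2 norm")
l2NormData.show()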
Ref: Map data to a normal distribution
A skewed data distribution has many negative effects.

We can use feature-engineering techniques, statistical or mathematical transformations, to mitigate the impact of skew: values in originally dense intervals are spread out as much as possible, and values in originally sparse intervals are pulled together.

The log transform tends to stretch the range of values falling in the lower magnitudes and compress or shrink the range of values in the higher magnitudes, bringing a skewed distribution as close to normal as possible.
First, check whether the feature data is skewed.
# Here's how you check skewness (we will do it for the 'balance' feature only).
fraud_pd.agg({'balance': 'skewness'}).show()
+------------------+
| skewness(balance)|
+------------------+
|1.1818315552993002|
+------------------+
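The positive skewness above indicates a long right tail, so the log transform described earlier can be applied. A minimal sketch using pyspark.sql.functions.log1p, i.e. log(1 + x), which tolerates zero values (negative balances, if present, would need an offset first); it reuses the fraud_pd DataFrame and 'balance' column from the example above:

from pyspark.sql import functions as F

# Compress the long right tail of 'balance' with log(1 + x)
fraud_log = fraud_pd.withColumn("balance_log", F.log1p("balance"))

# The skewness of the transformed column should be much closer to 0
fraud_log.agg(F.skewness("balance_log")).show()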
Whitening is a necessary data preprocessing step before operations such as PCA or ICA.

For example, suppose the training data are images: because adjacent pixels in an image are strongly correlated, the raw input used for training is redundant.

The purpose of whitening is to reduce this redundancy in the input; more formally, we want the whitening process to give the learning algorithm an input with the following properties:

(i) the features are weakly correlated with each other;

(ii) all features have the same variance.
Code: Unsupervised Feature Learning and Deep Learning [MATLAB code]
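Spark ML has no dedicated whitening transformer, but an approximate PCA-whitening pipeline can be sketched from the building blocks in this article: center the features, project them onto the principal components, then rescale each component to unit variance. This is only a sketch under those assumptions (it reuses a DataFrame df with a 'features' vector column, such as the one built in the PCA example below), not a library-provided API:

from pyspark.ml.feature import PCA, StandardScaler

# 1) Center the features (whitening assumes zero-mean data)
centerer = StandardScaler(inputCol="features", outputCol="centered",
                          withMean=True, withStd=False)
centered = centerer.fit(df).transform(df)

# 2) Project onto the principal components (decorrelates the features)
pca = PCA(k=3, inputCol="centered", outputCol="pcaFeatures")
projected = pca.fit(centered).transform(centered)

# 3) Rescale every principal component to unit variance
equalizer = StandardScaler(inputCol="pcaFeatures", outputCol="whitened",
                           withMean=False, withStd=True)
whitened = equalizer.fit(projected).transform(projected)
whitened.select("whitened").show(truncate=False)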
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

df.show()

+--------------------+
|            features|
+--------------------+
| (5,[1,3],[1.0,7.0])|
|[2.0,0.0,3.0,4.0,...|
|[4.0,0.0,0.0,6.0,...|
+--------------------+
# define model, and fit, and transform
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)

+-----------------------------------------------------------+
|pcaFeatures                                                 |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+
End.