Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。本文將剖析Alink 「特徵工程」 部分對應代碼實現。java
機器學習的特徵工程是將原始的輸入數據轉換成特徵,以便於更好的表示潛在的問題,並有助於提升預測模型準確性的過程。算法
找出合適的特徵是很困難且耗時的工做,它須要專家知識,而應用機器學習基本也能夠理解成特徵工程。可是,特徵工程對機器學習模型的應用有很大影響,有句俗話叫作「數據和特徵決定了機器學習模型的性能上限」。apache
機器學習的輸入特徵包括幾種:app
特徵工程處理技巧大概有:dom
分箱(Binning)機器學習
獨熱編碼(One-Hot Encoding)ide
特徵哈希(Hashing Trick)函數
嵌套法(Embedding)工具
取對數(Log Transformation)性能
特徵縮放(Scaling)
標準化(Normalization)
特徵交互(Feature Interaction)
本文將爲你們講解特徵縮放和特徵哈希的實現。
特徵縮放是一種用於標準化獨立變量或數據特徵範圍的方法。 在數據處理中,它也稱爲數據標準化,而且一般在數據預處理步驟期間執行。特徵縮放能夠將很大範圍的數據限定在指定範圍內。因爲原始數據的值範圍變化很大,在一些機器學習算法中,若是沒有標準化,目標函數將沒法正常工做。 例如,大多數分類器按歐幾里德距離計算兩點之間的距離。 若是其中一個要素具備寬範圍的值,則距離將受此特定要素的控制。 所以,應對全部特徵的範圍進行歸一化,以使每一個特徵大體與最終距離成比例。
應用特徵縮放的另外一個緣由是梯度降低與特徵縮放比沒有它時收斂得快得多。特徵縮放主要包括兩種:
大多數機器學習算法的輸入要求都是實數矩陣,將原始數據轉換成實數矩陣就是所謂的特徵工程,而特徵哈希(feature hashing,也稱哈希技巧,hashing trick)就是一種特徵工程技術。
特徵哈希的目標就是將一個數據點轉換成一個向量 或者 把原始的高維特徵向量壓縮成較低維特徵向量,且儘可能不損失原始特徵的表達能力。
特徵哈希利用的是哈希函數將原始數據轉換成指定範圍內的散列值,相比較獨熱模型具備不少優勢,如支持在線學習,維度減少不少。
好比咱們將梁山好漢進行特徵哈希,以關勝爲例:
姓 名:關勝
排 名:坐第5把交椅
籍 貫:運城(今山西省-運城市)
綽 號:大刀
武 器:青龍偃月刀
星 號:天勇星
相 貌:堂堂八尺五六身軀,細細三柳髭髯,兩眉入鬢,鳳眼朝天,面如重棗,脣若塗朱。
原 型:南宋初,劉豫任濟南知府,金軍攻濟南。劉豫受金人利誘,殺守將關勝,降金。這段故事被清陳忱加以演義,寫入了《水滸後傳》。此關勝可能就是小說中的原型。
出場回合:第063回
後 代:關鈴,在《說岳全傳》出場,岳雲的義弟。
上面都是原始的輸入數據,包括數值特徵,分類特徵,文本特徵等等,計算機沒法識別,必須用特徵哈希轉換成計算機能夠識別的數據。
轉換以後以下(虛構,只是展現使用 ^_^):
// 假設結果是一個 30000 大小的稀疏向量,下面格式是:"index":"value" "725":"0.8223484445229384" //姓 名 "1000":"0.8444219609970856" //排 名 "4995":"-0.18307661612028242 " //籍 貫 "8049":"0.060151616110215377" //綽 號 "8517":"-0.7340742756048447 " //武 器 "26798":":-0.734299689415312" //星 號 "24390":"0.545435" //相 貌 "25083":"0.4543543" //原 型 "25435":"-0.243432" //出場回合 "25721":"-0.7340742756048447" //後 代
這樣關勝就變成了一個能夠被程序處理的向量。
咱們的數據集和示例代碼都是從FTRLExample獲取到的。
首先看數據集。
String schemaStr = "id string, click string, dt string, C1 string, banner_pos int, site_id string, site_domain string, " + "site_category string, app_id string, app_domain string, app_category string, device_id string, " + "device_ip string, device_model string, device_type string, device_conn_type string, C14 int, C15 int, " + "C16 int, C17 int, C18 int, C19 int, C20 int, C21 int"; //打印出前面幾列看看 trainBatchData.firstN(5).print(); id|click|dt|C1|banner_pos|site_id|site_domain|site_category|app_id|app_domain|app_category|device_id|device_ip|device_model|device_type|device_conn_type|C14|C15|C16|C17|C18|C19|C20|C21 --|-----|--|--|----------|-------|-----------|-------------|------|----------|------------|---------|---------|------------|-----------|----------------|---|---|---|---|---|---|---|--- 3199889859719711212|0|14102101|1005|0|1fbe01fe|f3845767|28905ebd|ecad2386|7801e8d9|07d7df22|a99f214a|cfa82746|c6263d8a|1|0|15708|320|50|1722|0|35|-1|79 3200127078337687811|0|14102101|1005|1|e5c60a05|7256c623|f028772b|ecad2386|7801e8d9|07d7df22|a99f214a|ffb0e59a|83ca6fdb|1|0|19771|320|50|2227|0|687|100075|48 3200382705425230287|1|14102101|1005|0|85f751fd|c4e18dd6|50e219e0|98fed791|d9b5648e|0f2161f8|a99f214a|f69683cc|f51246a7|1|0|20984|320|50|2371|0|551|-1|46 320073658191290816|0|14102101|1005|0|1fbe01fe|f3845767|28905ebd|ecad2386|7801e8d9|07d7df22|a99f214a|8e5b1a31|711ee120|1|0|15706|320|50|1722|0|35|100083|79 3200823995473818776|0|14102101|1005|0|f282ab5a|61eb5bc4|f028772b|ecad2386|7801e8d9|07d7df22|a99f214a|9cf693b4|8a4875bd|1|0|18993|320|50|2161|0|35|-1|157
從示例代碼能夠看到,首先作特徵縮放,而後作特徵哈希。
String[] selectedColNames = new String[]{ "C1", "banner_pos", "site_category", "app_domain", "app_category", "device_type", "device_conn_type", "C14", "C15", "C16", "C17", "C18", "C19", "C20", "C21", "site_id", "site_domain", "device_id", "device_model"}; String[] categoryColNames = new String[]{ "C1", "banner_pos", "site_category", "app_domain", "app_category", "device_type", "device_conn_type", "site_id", "site_domain", "device_id", "device_model"}; String[] numericalColNames = new String[]{ "C14", "C15", "C16", "C17", "C18", "C19", "C20", "C21"}; // setup feature engineering pipeline Pipeline featurePipeline = new Pipeline() .add( // 特徵縮放 new StandardScaler() .setSelectedCols(numericalColNames) // 對Double類型的列作變換 ) .add( // 特徵哈希 new FeatureHasher() .setSelectedCols(selectedColNames) .setCategoricalCols(categoryColNames) .setOutputCol(vecColName) .setNumFeatures(numHashFeatures) ); // fit feature pipeline model PipelineModel featurePipelineModel = featurePipeline.fit(trainBatchData);
StandardScaler的做用是把數據集的每個特徵進行標準差(standard deviation)轉換 和/或 零均值化(zero mean)。transforms a dataset, normalizing each feature to have unit standard deviation and/or zero mean.
對於作特徵縮放的好處,網上文章說的挺好:
當x全爲正或者全爲負時,每次返回的梯度都只會沿着一個方向發生變化,即梯度變化的方向就會向圖中紅色箭頭所示,一會向上太多,一會向下太多。這樣就會使得權重收斂效率很低。
但當x正負數量「差很少」時,就能對梯度變化方向進行「修正」,加速了權重的收斂。
讓咱們想一想若是作標準化縮放,具體須要怎麼作:
StandardScalerTrainBatchOp 類作了標準化縮放相關工做。這裏只對數字類型的列作轉換。
/* StandardScaler transforms a dataset, normalizing each feature to have unit standard deviation and/or zero mean. */ public class StandardScalerTrainBatchOp extends BatchOperator<StandardScalerTrainBatchOp> implements StandardTrainParams<StandardScalerTrainBatchOp> { @Override public StandardScalerTrainBatchOp linkFrom(BatchOperator<?>... inputs) { BatchOperator<?> in = checkAndGetFirst(inputs); String[] selectedColNames = getSelectedCols(); StandardScalerModelDataConverter converter = new StandardScalerModelDataConverter(); converter.selectedColNames = selectedColNames; converter.selectedColTypes = new TypeInformation[selectedColNames.length]; // 獲取須要轉換的列 for (int i = 0; i < selectedColNames.length; i++) { converter.selectedColTypes[i] = Types.DOUBLE; } //獲得變量以下 converter = {StandardScalerModelDataConverter@9229} selectedColNames = {String[8]@9228} 0 = "C14" 1 = "C15" 2 = "C16" 3 = "C17" 4 = "C18" 5 = "C19" 6 = "C20" 7 = "C21" selectedColTypes = {TypeInformation[8]@9231} 0 = {FractionalTypeInfo@9269} "Double" 1 = {FractionalTypeInfo@9269} "Double" 2 = {FractionalTypeInfo@9269} "Double" 3 = {FractionalTypeInfo@9269} "Double" 4 = {FractionalTypeInfo@9269} "Double" 5 = {FractionalTypeInfo@9269} "Double" 6 = {FractionalTypeInfo@9269} "Double" 7 = {FractionalTypeInfo@9269} "Double" // 用獲取到的列信息經過 StatisticsHelper.summary 作總結,而後經過 BuildStandardScalerModel 進行操做 DataSet<Row> rows = StatisticsHelper.summary(in, selectedColNames) .flatMap(new BuildStandardScalerModel(converter.selectedColNames, converter.selectedColTypes, getWithMean(), getWithStd())); this.setOutput(rows, converter.getModelSchema()); return this; }
這裏調用一環套一環,因此先打印出構建執行計劃時候的調用棧給你們看看。
summarizer:277, StatisticsHelper (com.alibaba.alink.operator.common.statistics) summarizer:240, StatisticsHelper (com.alibaba.alink.operator.common.statistics) summary:71, StatisticsHelper (com.alibaba.alink.operator.common.statistics) linkFrom:49, StandardScalerTrainBatchOp (com.alibaba.alink.operator.batch.dataproc) train:22, StandardScaler (com.alibaba.alink.pipeline.dataproc) fit:34, Trainer (com.alibaba.alink.pipeline) fit:117, Pipeline (com.alibaba.alink.pipeline) main:59, FTRLExample (com.alibaba.alink)
StandardScalerTrainBatchOp.linkFrom 構建出來的執行計劃從邏輯上講是:
具體結合代碼以下
StatisticsHelper.summary 首先調用summarizer對原始輸入table作總結,對應代碼 2)
/* table summary, selectedColNames must be set. */ public static DataSet<TableSummary> summary(BatchOperator in, String[] selectedColNames) { return summarizer(in, selectedColNames, false) // 將會調用代碼 2.1) .map(new MapFunction<TableSummarizer, TableSummary>() { @Override public TableSummary map(TableSummarizer summarizer) throws Exception { return summarizer.toSummary(); // 對應代碼 2.2) } }).name("toSummary"); }
summarizer(in, selectedColNames, false)
從原始輸入中獲取到那些選中的列,而後繼續調用另外同名函數summarizer。
/** * table stat */ private static DataSet<TableSummarizer> summarizer(BatchOperator in, String[] selectedColNames, boolean calculateOuterProduct) { // 對應代碼 2.1) in = in.select(selectedColNames); // 對應代碼2.1.1) return summarizer(in.getDataSet(), calculateOuterProduct, getNumericalColIndices(in.getColTypes()), selectedColNames); //對應代碼2.1.2) }
同名函數summarizer調用 TableSummarizerPartition 對每一個partition處理,固然你們知道如今只是把執行計劃搭建起來,不是真正的執行。當對每一個partition處理完成以後,會回到這裏的reduce函數進行merge。
/* given data, return summary. numberIndices is the indices of cols which are number type in selected cols. */ private static DataSet<TableSummarizer> summarizer(DataSet<Row> data, boolean bCov, int[] numberIndices, String[] selectedColNames) { return data // mapPartition 對應代碼 2.1.2.1) .mapPartition(new TableSummarizerPartition(bCov, numberIndices, selectedColNames)) .reduce(new ReduceFunction<TableSummarizer>() { // reduce對應代碼 2.1.2.2) @Override public TableSummarizer reduce(TableSummarizer left, TableSummarizer right) { return TableSummarizer.merge(left, right); //最終會merge全部的partition處理結果 } }); }
TableSummarizerPartition針對每一個partition,讓每一個worker用來TableSummarizer.visit來作table summary,之後會合並。對應代碼 2.1.2.1.1) 。
/* It is table summary partition of one worker, will merge result later. */ public static class TableSummarizerPartition implements MapPartitionFunction<Row, TableSummarizer> { @Override public void mapPartition(Iterable<Row> iterable, Collector<TableSummarizer> collector) { TableSummarizer srt = new TableSummarizer(selectedColNames, numericalIndices, outerProduct); srt.colNames = selectedColNames; for (Row sv : iterable) { srt = (TableSummarizer) srt.visit(sv); } collector.collect(srt); } } // 變量以下 srt = {TableSummarizer@10742} "count: 0\n" sv = {Row@10764} "15708,320,50,1722,0,35,-1,79" srt.colNames = {String[8]@10733} 0 = "C14" 1 = "C15" 2 = "C16" 3 = "C17" 4 = "C18" 5 = "C19" 6 = "C20" 7 = "C21"
咱們能夠看到,上面代碼中會對 iterable 作循環調用 TableSummarizer.visit函數。即經過visit
來對輸入的每一個item(這個item就是srt.colNames對應的那些列集合起來作了一個Row)作累積計算,算出好比squareSum,min, max, normL1等等,具體在下面的變量中有體現。
this = {TableSummarizer@10742} "count: 1\nsum: 15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0\nsquareSum: 2.46741264E8 102400.0 2500.0 2965284.0 0.0 1225.0 1.0 6241.0\nmin: 15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0\nmax: 15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0" colNames = {String[8]@10733} xSum = null xSquareSum = null xyCount = null numericalColIndices = {int[8]@10734} numMissingValue = {DenseVector@10791} "0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0" sum = {DenseVector@10792} "15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0" squareSum = {DenseVector@10793} "2.46741264E8 102400.0 2500.0 2965284.0 0.0 1225.0 1.0 6241.0" min = {DenseVector@10794} "15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0" max = {DenseVector@10795} "15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0" normL1 = {DenseVector@10796} "15708.0 320.0 50.0 1722.0 0.0 35.0 1.0 79.0" vals = {Double[8]@10797} outerProduct = null count = 1 calculateOuterProduct = false
這裏的功能是生成模型 / 存儲。
/* table summary build model. */ public static class BuildStandardScalerModel implements FlatMapFunction<TableSummary, Row> { private String[] selectedColNames; private TypeInformation[] selectedColTypes; private boolean withMean; private boolean withStdDevs; @Override public void flatMap(TableSummary srt, Collector<Row> collector) throws Exception { if (null != srt) { StandardScalerModelDataConverter converter = new StandardScalerModelDataConverter(); converter.selectedColNames = selectedColNames; converter.selectedColTypes = selectedColTypes; // 業務 converter.save(new Tuple3<>(this.withMean, this.withStdDevs, srt), collector); } } }
save函數調用的是StandardScalerModelDataConverter.save,邏輯比較清晰:
/* * Serialize the model data to "Tuple3<Params, List<String>, List<Row>>". * * @param modelData The model data to serialize. * @return The serialization result. */ @Override public Tuple3<Params, Iterable<String>, Iterable<Row>> serializeModel(Tuple3<Boolean, Boolean, TableSummary> modelData) { Boolean withMean = modelData.f0; Boolean withStandarDeviation = modelData.f1; TableSummary summary = modelData.f2; String[] colNames = summary.getColNames(); double[] means = new double[colNames.length]; double[] stdDevs = new double[colNames.length]; for (int i = 0; i < colNames.length; i++) { means[i] = summary.mean(colNames[i]); // 1. 存儲mean stdDevs[i] = summary.standardDeviation(colNames[i]); // 2. 存儲stdDevs } for (int i = 0; i < colNames.length; i++) { if (!withMean) { means[i] = 0; } if (!withStandarDeviation) { stdDevs[i] = 1; } } // 3. 構建元數據Params Params meta = new Params() .set(StandardTrainParams.WITH_MEAN, withMean) .set(StandardTrainParams.WITH_STD, withStandarDeviation); // 4. 序列化 List<String> data = new ArrayList<>(); data.add(JsonConverter.toJson(means)); data.add(JsonConverter.toJson(stdDevs)); return new Tuple3<>(meta, data, new ArrayList<>()); }
調用棧和變量以下,咱們能夠看出來模型是如何構建的。
save:68, RichModelDataConverter (com.alibaba.alink.common.model) flatMap:84, StandardScalerTrainBatchOp$BuildStandardScalerModel (com.alibaba.alink.operator.batch.dataproc) flatMap:63, StandardScalerTrainBatchOp$BuildStandardScalerModel (com.alibaba.alink.operator.batch.dataproc) collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining) collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics) collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining) collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics) run:152, AllReduceDriver (org.apache.flink.runtime.operators) run:504, BatchTask (org.apache.flink.runtime.operators) invoke:369, BatchTask (org.apache.flink.runtime.operators) doRun:707, Task (org.apache.flink.runtime.taskmanager) run:532, Task (org.apache.flink.runtime.taskmanager) run:748, Thread (java.lang) // 如下是輸入 modelData = {Tuple3@10723} f0 = {Boolean@10726} true f1 = {Boolean@10726} true f2 = {TableSummary@10707} "colName|count|numMissingValue|numValidValue|sum|mean|variance|standardDeviation|min|max|normL1|normL2\r\n-------|-----|---------------|-------------|---|----|--------|-----------------|---|---|------|------\nC14|399999|0.0000|399999.0000|7257042877.0000|18142.6525|10993280.1107|3315.6116|375.0000|21705.0000|7257042877.0000|11664445.8724\nC15|399999|0.0000|399999.0000|127629988.0000|319.0758|411.3345|20.2814|120.0000|1024.0000|127629988.0000|202208.2328\nC16|399999|0.0000|399999.0000|22663266.0000|56.6583|1322.7015|36.3690|20.0000|768.0000|22663266.0000|42580.9842\nC17|399999|0.0000|399999.0000|809923879.0000|2024.8148|170166.5008|412.5124|112.0000|2497.0000|809923879.0000|1306909.3634\nC18|399999|0.0000|399999.0000|414396.0000|1.0360|1.5871|1.2598|0.0000|3.0000|414396.0000|1031.5736\nC19|399999|0.0000|399999.0000|77641159.0000|194.1034|73786.4929|271.6367|33.0000|1835.0000|77641159.0000|211151.2756\nC20|399999|0.0000|399999.0000|16665597769.0000|41664.0986|2434589745.2799|49341.5620|-1.0000|100" colNames = {String[8]@10728} numMissingValue = {DenseVector@10729} "0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0" sum = {DenseVector@10730} "7.257042877E9 1.27629988E8 2.2663266E7 8.09923879E8 414396.0 7.7641159E7 1.6665597769E10 3.0589982E7" squareSum = {DenseVector@10731} "1.36059297509295E14 4.0888169392E10 1.813140212E9 1.708012084269E12 1064144.0 4.4584861175E10 1.668188137320503E15 3.044124336E9" min = {DenseVector@10732} "375.0 120.0 20.0 112.0 0.0 33.0 -1.0 13.0" max = {DenseVector@10733} "21705.0 1024.0 768.0 2497.0 3.0 1835.0 100248.0 195.0" normL1 = {DenseVector@10734} "7.257042877E9 1.27629988E8 2.2663266E7 8.09923879E8 414396.0 7.7641159E7 1.6666064771E10 3.0589982E7" numericalColIndices = {int[8]@10735} count = 399999 // 這是輸出 model = {Tuple3@10816} "(Params {withMean=true, withStd=true},[[18142.652549131373,319.07576768941925,56.658306645766615,2024.814759536899,1.035992589981475,194.1033827584569,41664.098582746454,76.47514618786548], [3315.6115741652725,20.281383913437733,36.36896282478844,412.51242496870356,1.259797591740416,271.6366927754722,49341.56204742555,41.974829196745965]],[])" f0 = {Params@10817} "Params {withMean=true, withStd=true}" f1 = {ArrayList@10820} size = 2 0 = "[18142.652549131373,319.07576768941925,56.658306645766615,2024.814759536899,1.035992589981475,194.1033827584569,41664.098582746454,76.47514618786548]" 1 = "[3315.6115741652725,20.281383913437733,36.36896282478844,412.51242496870356,1.259797591740416,271.6366927754722,49341.56204742555,41.974829196745965]" f2 = {ArrayList@10818} size = 0
訓練好以後,當轉換時候,會對每一個item row進行map,這裏面使用以前計算出來的 means/stdDevs 進行具體標準化。
@Override public Row map(Row row) throws Exception { Row r = new Row(this.selectedColIndices.length); for (int i = 0; i < this.selectedColIndices.length; i++) { Object obj = row.getField(this.selectedColIndices[i]); if (null != obj) { if (this.stddevs[i] > 0) { double d = (((Number) obj).doubleValue() - this.means[i]) / this.stddevs[i]; r.setField(i, d); } else { r.setField(i, 0.0); } } } return this.predResultColsHelper.getResultRow(row, r); } // means,stddevs 是對應那幾列以前統計出來的整體數值,是根據這些來進行轉換的。 this = {StandardScalerModelMapper@10909} selectedColNames = {String[8]@10873} selectedColTypes = {TypeInformation[8]@10874} selectedColIndices = {int[8]@10912} means = {double[8]@10913} 0 = 18142.652549131373 ... 7 = 76.47514618786548 stddevs = {double[8]@10914} 0 = 3315.6115741652725 ... 7 = 41.974829196745965
變量以下,Row是輸入數據,r 是對那幾個須要轉換的數據進行轉換以後,生成的數據。
標準化以後,用 OutputColsHelper.getResultRow把 Row 和 r 歸併起來。
row = {Row@10865} "3200382705425230287,1,14102101,1005,0,85f751fd,c4e18dd6,50e219e0,98fed791,d9b5648e,0f2161f8,a99f214a,f69683cc,f51246a7,1,0,20984,320,50,2371,0,551,-1,46" 其中 "20984,320,50,2371,0,551,-1,46" 是須要轉換的數據。 r = {Row@10866} "0.8569602884149525,0.04557047559108551,-0.18307661612028242,0.8392116685682023,-0.8223484445229384,1.313874843618953,-0.8444219609970856,-0.7260338343491822" 這裏是上面須要轉換的數據進行標準化以後的結果
堆棧以下
getResultRow:177, OutputColsHelper (com.alibaba.alink.common.utils) map:88, StandardScalerModelMapper (com.alibaba.alink.operator.common.dataproc) map:43, ModelMapperAdapter (com.alibaba.alink.common.mapper) map:18, ModelMapperAdapter (com.alibaba.alink.common.mapper) run:103, MapDriver (org.apache.flink.runtime.operators) run:504, BatchTask (org.apache.flink.runtime.operators) invoke:369, BatchTask (org.apache.flink.runtime.operators) doRun:707, Task (org.apache.flink.runtime.taskmanager) run:532, Task (org.apache.flink.runtime.taskmanager) run:748, Thread (java.lang)
FeatureHasher完成了特徵哈希功能,這個沒有訓練,就是mapper。具體細節是:
對應代碼看。
最終生成了一個30000大小的,最後名字是"vec"的特徵矩陣。這裏是稀疏矩陣。
String vecColName = "vec"; int numHashFeatures = 30000; // setup feature engineering pipeline Pipeline featurePipeline = new Pipeline() .add( new StandardScaler() .setSelectedCols(numericalColNames) ) .add( new FeatureHasher() .setSelectedCols(selectedColNames) .setCategoricalCols(categoryColNames) .setOutputCol(vecColName) .setNumFeatures(numHashFeatures) );
傳入map函數時候,Row就是 「原始數據通過標準化處理以後的數據」。
遍歷數值特徵列,進行哈希變換;遍歷categorical特徵列,進行哈希轉換。
public class FeatureHasherMapper extends Mapper { /** * Projects a number of categorical or numerical features into a feature vector of a specified dimension. * * @param row the input Row type data * @return the output row. */ @Override public Row map(Row row) { TreeMap<Integer, Double> feature = new TreeMap<>(); // 遍歷數值特徵列,進行哈希變換; for (int key : numericColIndexes) { if (null != row.getField(key)) { double value = ((Number)row.getField(key)).doubleValue(); String colName = colNames[key]; updateMap(colName, value, feature, numFeature); } } // 遍歷categorical特徵列,進行哈希轉換 for (int key : categoricalColIndexes) { if (null != row.getField(key)) { String colName = colNames[key]; updateMap(colName + "=" + row.getField(key).toString(), 1.0, feature, numFeature); } } return outputColsHelper.getResultRow(row, Row.of(new SparseVector(numFeature, feature))); } } //運行時候打印變量以下 selectedCols = {String[19]@9817} 0 = "C1" 1 = "banner_pos" 2 = "site_category" 3 = "app_domain" 4 = "app_category" 5 = "device_type" 6 = "device_conn_type" 7 = "C14" 8 = "C15" 9 = "C16" 10 = "C17" 11 = "C18" 12 = "C19" 13 = "C20" 14 = "C21" 15 = "site_id" 16 = "site_domain" 17 = "device_id" 18 = "device_model" numericColIndexes = {int[8]@10789} 0 = 16 1 = 17 2 = 18 3 = 19 4 = 20 5 = 21 6 = 22 7 = 23 categoricalColIndexes = {int[11]@10791} 0 = 3 1 = 4 2 = 7 3 = 9 4 = 10 5 = 14 6 = 15 7 = 5 8 = 6 9 = 11 10 = 13
updateMap完成了具體哈希操做,用哈希函數生成了稀疏矩陣的index,而後把value放入對應的index中。
具體哈希函數使用 org.apache.flink.shaded.guava18.com.google.common.hash
。
/* Update the treeMap which saves the key-value pair of the final vector, use the hash value of the string as key * and the accumulate the corresponding value. * * @param s the string to hash * @param value the accumulated value */ private static void updateMap(String s, double value, TreeMap<Integer, Double> feature, int numFeature) { // HASH = {Murmur3_32HashFunction@10755} "Hashing.murmur3_32(0)" int hashValue = Math.abs(HASH.hashUnencodedChars(s).asInt()); int index = Math.floorMod(hashValue, numFeature); if (feature.containsKey(index)) { feature.put(index, feature.get(index) + value); } else { feature.put(index, value); } }
好比當以下輸入時候,獲得index是26798,因此會在vec中的 26798 中設置Value
s = "C14" value = 0.33428145187593655 feature = {TreeMap@10836} size = 1 {Integer@10895} 26798 -> {Double@10896} 0.33428145187593655 numFeature = 30000 hashValue = 23306798 index = 26798
最終特徵哈希以後,獲得的vec會附加在原始Row上的第25項(原來是24項,如今在最後附加一項),就是下面的 24 = {SparseVector@10932}
。
row = {Row@10901} fields = {Object[25]@10907} 0 = "3199889859719711212" 1 = "0" 2 = "14102101" 3 = "1005" 4 = {Integer@10912} 0 5 = "1fbe01fe" // "device_type" 是這個數值,這個是原始輸入,你們若是遺忘能夠回頭看看示例代碼輸出。 6 = "f3845767" 7 = "28905ebd" 8 = "ecad2386" 9 = "7801e8d9" 10 = "07d7df22" 11 = "a99f214a" 12 = "cfa82746" 13 = "c6263d8a" 14 = "1" 15 = "0" 16 = {Double@10924} -0.734299689415312 17 = {Double@10925} 0.04557047559108551 18 = {Double@10926} -0.18307661612028242 19 = {Double@10927} -0.7340742756048447 20 = {Double@10928} -0.8223484445229384 21 = {Double@10929} -0.5857212482334542 22 = {Double@10930} -0.8444219609970856 23 = {Double@10931} 0.060151616110215377 24 = {SparseVector@10932} "$30000$725:-0.8223484445229384 1000:1.0 3044:-0.8444219609970856 4995:-0.18307661612028242 8049:0.060151616110215377 8517:1.0 10962:1.0 17954:1.0 18556:1.0 21430:1.0 23250:1.0 24010:1.0 24390:1.0 25083:0.04557047559108551 25435:-0.5857212482334542 25721:-0.7340742756048447 26169:1.0 26798:-0.734299689415312 29671:1.0" // 30000 表示一共是30000大小的稀疏向量 // 725:-0.8223484445229384 表示第725的item中的數值是-0.8223484445229384,依次類推。