Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式、流式算法的機器學習平臺。本文和下文將帶領你們來分析Alink中 Word2Vec 的實現。html
由於Alink的公開資料太少,因此如下均爲自行揣測,確定會有疏漏錯誤,但願你們指出,我會隨時更新。java
one-hot編碼就是保證每一個樣本中的單個特徵只有1位處於狀態1,其餘的都是0。 具體編碼舉例以下,把語料庫中,杭州、上海、寧波、北京每一個都對應一個向量,向量中只有一個值爲1,其他都爲0。node
杭州 [0,0,0,0,0,0,0,1,0,……,0,0,0,0,0,0,0] 上海 [0,0,0,0,1,0,0,0,0,……,0,0,0,0,0,0,0] 寧波 [0,0,0,1,0,0,0,0,0,……,0,0,0,0,0,0,0] 北京 [0,0,0,0,0,0,0,0,0,……,1,0,0,0,0,0,0]
其缺點是:git
因此,人們想對獨熱編碼作以下改進:github
簡單說,要尋找一個空間映射,把高維詞向量嵌入到一個低維空間。而後就能夠繼續處理。算法
分佈式表示(Distributed Representation)其實Hinton 最先在1986年就提出了,基本思想是將每一個詞表達成 n 維稠密、連續的實數向量。而實數向量之間的關係能夠表明詞語之間的類似度,好比向量的夾角cosine或者歐氏距離。數組
有一個專門的術語來描述詞向量的分佈式表示技術——詞嵌入【word embedding】。從名稱上也能夠看出來,獨熱編碼至關於對詞進行編碼,而分佈式表示則是將詞從稀疏的大維度壓縮嵌入到較低維度的向量空間中。網絡
Distributed representation 最大的貢獻就是讓相關或者類似的詞,在距離上更接近了。其核心思想是:上下文類似的詞,其語義也類似。這就是著名的詞空間模型(word space model)。數據結構
Distributed representation 相較於One-hot方式另外一個區別是維數降低極多,對於一個100W的詞表,咱們能夠用100維的實數向量來表示一個詞,而One-hot得要100W維。dom
爲何映射到向量空間當中的詞向量就能表示是肯定的哪一個詞而且還能知道它們之間的類似度呢?
詞向量的分佈式表示的核心思想由兩部分組成:
事實上,無論是神經網絡的隱層,仍是多個潛在變量的機率主題模型,都是應用分佈式表示。
在word2vec出現以前,已經有用神經網絡DNN來用訓練詞向量進而處理詞與詞之間的關係了。採用的方法通常是一個三層的神經網絡結構(固然也能夠多層),分爲輸入層,隱藏層和輸出層(softmax層)。
這個模型是如何定義數據的輸入和輸出呢?通常分爲CBOW(Continuous Bag-of-Words Model) 和 Skip-gram (Continuous Skip-gram Model)兩種模型。
CBOW經過上下文來預測當前值。至關於一句話中扣掉一個詞,讓你猜這個詞是什麼。CBOW就是根據某個詞前面的C個詞或者先後C個連續的詞,來計算某個詞出現的機率。
CBOW的訓練過程以下:
Skip-gram用當前詞來預測上下文。至關於給你一個詞,讓你猜前面和後面可能出現什麼詞。即根據某個詞,而後分別計算它先後出現某幾個詞的各個機率。從這裏能夠看出,對於每個詞,Skip-gram要訓練C次,這裏C是預設的窗口大小,而CBOW只須要計算一次,所以CBOW計算量是Skip-gram的1/C,但也正由於Skip-gram同時擬合了C個詞,所以在避免過擬合上比CBOW效果更好,所以在訓練大型語料庫的時候,Skip-gram的效果比CBOW更好。
Skip-gram的訓練方法與CBOW一模一樣,惟一區別就是Skip-gram的輸入是單個詞的向量,而不是C個詞的求和平均。同時,訓練的話對於一箇中心詞,要訓練C次,每一次是一個不一樣的上下文詞,好比中心詞是北京,窗口詞是來到、天安門這兩個,那麼Skip-gram要對北京-來到、北京-天安門進行分別訓練。
目前的實現有一個問題:從隱藏層到輸出的softmax層的計算量很大,由於要計算全部詞的softmax機率,再去找機率最大的值。好比Vocab大小有10^5,那麼每算一個機率都要計算10^5次矩陣乘法,不現實。因而就引入了Word2vec。
所謂的語言模型,就是指對天然語言進行假設和建模,使得可以用計算機可以理解的方式來表達天然語言。word2vec採用的是n元語法模型(n-gram model),即假設一個詞只與周圍n個詞有關,而與文本中的其餘詞無關。
若是 把詞當作特徵,那麼就能夠把特徵映射到 K 維向量空間,能夠爲文本數據尋求更加深層次的特徵表示 。因此 Word2vec的基本思想是 經過訓練將每一個詞映射成 K 維實數向量(K 通常爲模型中的超參數),經過詞之間的距離(好比 cosine 類似度、歐氏距離等)來判斷它們之間的語義類似度。
其採用一個 三層的神經網絡 ,輸入層-隱層-輸出層。有個核心的技術是 根據詞頻用Huffman編碼 ,使得全部詞頻類似的詞隱藏層激活的內容基本一致,出現頻率越高的詞語,他們激活的隱藏層數目越少,這樣有效的下降了計算的複雜度。
這個三層神經網絡自己是 對語言模型進行建模 ,但也同時 得到一種單詞在向量空間上的表示,而這個反作用纔是Word2vec的真正目標。
word2vec對以前的模型作了改進,
Word2vec計算能夠用 層次Softmax算法 ,這種算法結合了Huffman編碼,其實藉助了分類問題中,使用一連串二分類近似多分類的思想。例如咱們是把全部的詞都做爲輸出,那麼「桔子」、「汽車」都是混在一塊兒。給定w_t的上下文,先讓模型判斷w_t是否是名詞,再判斷是否是食物名,再判斷是否是水果,再判斷是否是「桔子」。
取一個適當大小的窗口當作語境,輸入層讀入窗口內的詞,將它們的向量(K維,初始隨機)加和在一塊兒,造成隱藏層K個節點。輸出層是一個巨大的二叉樹,葉節點表明語料裏全部的詞(語料含有V個獨立的詞,則二叉樹有|V|個葉節點)。而這整顆二叉樹構建的算法就是Huffman樹。
這樣,語料庫中的某個詞w_t 都對應着二叉樹的某個葉子節點,這樣每一個詞 w 均可以從樹的根結點root沿着惟一一條路徑被訪問到,其路徑也就造成了其全局惟一的二進制編碼code,如"010011"。
不妨記左子樹爲1,右子樹爲0。接下來,隱層的每個節點都會跟二叉樹的內節點有連邊,因而對於二叉樹的每個內節點都會有K條連邊,每條邊上也會有權值。假設 n(w, j)爲這條路徑上的第 j 個結點,且 L(w)爲這條路徑的長度, j 從 1 開始編碼,即 n(w, 1)=root,n(w, L(w)) = w。對於第 j 個結點,層次 Softmax 定義的Label 爲 1 - code[j]。
在訓練階段,當給定上下文,要預測後面的詞w_t的時候,咱們就從二叉樹的根節點開始遍歷,這裏的目標就是預測這個詞的二進制編號的每一位。即對於給定的上下文,咱們的目標是使得預測詞的二進制編碼機率最大。形象地說,對於 "010011",咱們但願在根節點,詞向量和與根節點相連通過logistic計算獲得bit=1的機率儘可能接近0,在第二層,但願其bit=1的機率儘可能接近1,這麼一直下去,咱們把一路上計算獲得的機率相乘,即獲得目標詞w_t在當前網絡下的機率P(w_t),那麼對於當前這個sample的殘差就是1-P(w_t),因而就可使用梯度降低法訓練這個網絡獲得全部的參數值了。顯而易見,按照目標詞的二進制編碼計算到最後的機率值就是歸一化的。
在訓練過程當中,模型會賦予這些抽象的中間結點一個合適的向量,這個向量表明了它對應的全部子結點。由於真正的單詞公用了這些抽象結點的向量,因此Hierarchical Softmax方法和原始問題並非等價的,可是這種近似並不會顯著帶來性能上的損失同時又使得模型的求解規模顯著上升。
傳統的Softmax能夠當作是一個線性表,平均查找時間O(n)。HS方法將Softmax作成一顆平衡的滿二叉樹,維護詞頻後,變成Huffman樹。
因爲咱們把以前全部都要計算的從輸出softmax層的機率計算變成了一顆二叉霍夫曼樹,那麼咱們的softmax機率計算只須要沿着樹形結構進行就能夠了。咱們能夠沿着霍夫曼樹從根節點一直走到咱們的葉子節點的詞w2。
和以前的神經網絡語言模型相比,咱們的霍夫曼樹的全部內部節點就相似以前神經網絡隱藏層的神經元,其中,根節點的詞向量對應咱們的投影后的詞向量,而全部葉子節點就相似於以前神經網絡softmax輸出層的神經元,葉子節點的個數就是詞彙表的大小。在霍夫曼樹中,隱藏層到輸出層的softmax映射不是一會兒完成的,而是沿着霍夫曼樹一步步完成的,所以這種softmax取名爲"Hierarchical Softmax"。
如何「沿着霍夫曼樹一步步完成」呢?在word2vec中,咱們採用了二元邏輯迴歸的方法,即規定沿着左子樹走,那麼就是負類(霍夫曼樹編碼1),沿着右子樹走,那麼就是正類(霍夫曼樹編碼0)。判別正類和負類的方法是使用sigmoid函數即:
其中xw是當前內部節點的詞向量,而θ則是咱們須要從訓練樣本求出的邏輯迴歸的模型參數。
使用霍夫曼樹有什麼好處呢?
容易理解,被劃分爲左子樹而成爲負類的機率爲P(−)=1−P(+)。在某一個內部節點,要判斷是沿左子樹仍是右子樹走的標準就是看P(−),P(+)誰的機率值大。而控制P(−),P(+)誰的機率值大的因素一個是當前節點的詞向量,另外一個是當前節點的模型參數θ。
對於上圖中的w2,若是它是一個訓練樣本的輸出,那麼咱們指望對於裏面的隱藏節點n(w2,1)的P(−)機率大,n(w2,2)的P(−)機率大,n(w2,3)的P(+)機率大。
回到基於Hierarchical Softmax的word2vec自己,咱們的目標就是找到合適的全部節點的詞向量和全部內部節點θ, 使訓練樣本達到最大似然。
定義 w 通過的霍夫曼樹某一個節點j的邏輯迴歸機率爲:
那麼對於某一個目標輸出詞w,其最大似然爲:
在word2vec中,因爲使用的是隨機梯度上升法,因此並無把全部樣本的似然乘起來獲得真正的訓練集最大似然,僅僅每次只用一個樣本更新梯度,這樣作的目的是減小梯度計算量。
能夠求出的梯度表達式以下:
有了梯度表達式,咱們就能夠用梯度上升法進行迭代來一步步的求解咱們須要的全部的θwj−1和xw。
注意!word2vec要訓練兩組參數:一個是網絡隱藏層的參數,一個是輸入單詞的參數(1 * dim)
在skip gram和CBOW中,中心詞詞向量在迭代過程當中是不會更新的,只更新窗口詞向量,這個中心詞對應的詞向量須要下一次在做爲非中心詞的時候才能進行迭代更新。
Alink的實現核心是以 https://github.com/tmikolov/word2vec 爲基礎進行修改,實際上若是不是對C語言很是抵觸,建議先閱讀這個代碼。由於Alink的並行處理代碼真的挺難理解,尤爲是數據預處理部分。
以問題爲導向:
咱們把Alink的測試代碼修改下。須要說明的是Word2vec也吃內存,因此個人機器上須要配置VM啓動參數:-Xms256m -Xmx640m -XX:PermSize=128m -XX:MaxPermSize=512m
。
public class Word2VecTest { public static void main(String[] args) throws Exception { TableSchema schema = new TableSchema( new String[] {"docid", "content"}, new TypeInformation <?>[] {Types.LONG(), Types.STRING()} ); List <Row> rows = new ArrayList <>(); rows.add(Row.of(0L, "老王 是 咱們 團隊 裏 最胖 的")); rows.add(Row.of(1L, "老黃 是 第二 胖 的")); rows.add(Row.of(2L, "胖")); rows.add(Row.of(3L, "胖 胖 胖")); MemSourceBatchOp source = new MemSourceBatchOp(rows, schema); Word2Vec word2Vec = new Word2Vec() .setSelectedCol("content") .setOutputCol("output") .setMinCount(1); List<Row> result = word2Vec.fit(source).transform(source).collect(); System.out.println(result); } }
程序輸出是
[0,老王 是 咱們 團隊 裏 最胖 的,0.8556591824716802 0.4185472857807756 0.5917632873908979 0.445803358747732 0.5351499521578621 0.6559828965377957 0.5965739474021792 0.473846881662404 0.516117276817363 0.3434555277582306 0.38403383919352685 ..., 1,老黃 是 第二 胖 的,0.9227240557894372 0.5697617202790405 0.42338677208067105 0.5483285740408497 0.5950012315151869 0.4155926470754411 0.6283449603326386 0.47098108241809644 0.2874100346124693 0.41205111525453264 0.59972461077888 ..., 3,胖 胖 胖,0.9220798404216994 0.8056990255747927 0.166767439210223 0.1651382099869762 0.7498624766177563 0.12363837145024788 0.16301554444226507 0.5992360550912706 0.6408649011941911 0.5504539398019214 0.4935531765920934 0.13805809361251292 0.2869384374291237 0.47796081976004645 0.6305720374272978 0.1745491550099714 ...]
Word2VecTrainBatchOp 類是訓練的代碼實現,其linkFrom函數體現了程序的整體邏輯,其省略版代碼以下,具體後期咱們會一一詳述。
public Word2VecTrainBatchOp linkFrom(BatchOperator<?>... inputs) { BatchOperator<?> in = checkAndGetFirst(inputs); final int vectorSize = getVectorSize(); // 計算單詞出現次數 DataSet <Row> wordCnt = WordCountUtil .splitDocAndCount(in, getSelectedCol(), getWordDelimiter()) .filter("cnt >= " + String.valueOf(getMinCount())) .getDataSet(); // 根據詞頻對單詞進行排序 DataSet <Row> sorted = sortedIndexVocab(wordCnt); // 計算排序以後單詞數目 DataSet <Long> vocSize = DataSetUtils .countElementsPerPartition(sorted) .sum(1) .map(new MapFunction <Tuple2 <Integer, Long>, Long>() { @Override public Long map(Tuple2 <Integer, Long> value) throws Exception { return value.f1; } }); // 創建字典和二叉樹 DataSet <Tuple3 <Integer, String, Word>> vocab = sorted .reduceGroup(new CreateVocab()) .withBroadcastSet(vocSize, "vocSize") .rebalance(); // 再次分割單詞 DataSet <String[]> split = in .select("`" + getSelectedCol() + "`") .getDataSet() .flatMap(new WordCountUtil.WordSpliter(getWordDelimiter())) .rebalance(); // 生成訓練數據 DataSet <int[]> trainData = encodeContent(split, vocab) .rebalance(); final long seed = System.currentTimeMillis(); // 獲取精簡詞典 DataSet <Tuple2 <Integer, Word>> vocabWithoutWordStr = vocab .map(new UseVocabWithoutWordString()); // 初始化模型 DataSet <Tuple2 <Integer, double[]>> initialModel = vocabWithoutWordStr .mapPartition(new initialModel(seed, vectorSize)) .rebalance(); // 計算迭代次數 DataSet <Integer> syncNum = DataSetUtils .countElementsPerPartition(trainData) .sum(1) .map(new MapFunction <Tuple2 <Integer, Long>, Integer>() { @Override public Integer map(Tuple2 <Integer, Long> value) throws Exception { return Math.max((int) (value.f1 / 100000L), 5); } }); // 迭代訓練 DataSet <Row> model = new IterativeComQueue() .initWithPartitionedData("trainData", trainData) .initWithBroadcastData("vocSize", vocSize) .initWithBroadcastData("initialModel", initialModel) .initWithBroadcastData("vocabWithoutWordStr", vocabWithoutWordStr) .initWithBroadcastData("syncNum", syncNum) .add(new InitialVocabAndBuffer(getParams())) .add(new UpdateModel(getParams())) .add(new AllReduce("input")) .add(new AllReduce("output")) .add(new AvgInputOutput()) .setCompareCriterionOfNode0(new Criterion(getParams())) .closeWith(new SerializeModel(getParams())) .exec(); // 輸出模型 model = model .map(new MapFunction <Row, Tuple2 <Integer, DenseVector>>() { @Override public Tuple2 <Integer, DenseVector> map(Row value) throws Exception { return Tuple2.of((Integer) value.getField(0), (DenseVector) value.getField(1)); } }) .join(vocab) .where(0) .equalTo(0) .with(new JoinFunction <Tuple2 <Integer, DenseVector>, Tuple3 <Integer, String, Word>, Row>() { @Override public Row join(Tuple2 <Integer, DenseVector> first, Tuple3 <Integer, String, Word> second) throws Exception { return Row.of(second.f1, first.f1); } }) .mapPartition(new MapPartitionFunction <Row, Row>() { @Override public void mapPartition(Iterable <Row> values, Collector <Row> out) throws Exception { Word2VecModelDataConverter model = new Word2VecModelDataConverter(); model.modelRows = StreamSupport .stream(values.spliterator(), false) .collect(Collectors.toList()); model.save(model, out); } }); setOutput(model, new Word2VecModelDataConverter().getModelSchema()); return this; }
此部分是最複雜的,也是和 C 代碼 差別最大的地方。由於Alink須要考慮處理大規模輸入數據,因此進行了分佈式處理,而一旦分佈式處理,就會各類細節糾纏在一塊兒。
這部分代碼以下,具體又分爲兩個部分。
DataSet <Row> wordCnt = WordCountUtil .splitDocAndCount(in, getSelectedCol(), getWordDelimiter()) .filter("cnt >= " + String.valueOf(getMinCount())) .getDataSet();
此處邏輯相對清晰,就是 分割單詞 splitDoc, 而後計數 count。
public static BatchOperator<?> splitDocAndCount(BatchOperator<?> input, String docColName, String wordDelimiter) { return count(splitDoc(input, docColName, wordDelimiter), WORD_COL_NAME, COUNT_COL_NAME); }
分割單詞使用 DocWordSplitCount 這個UDTF。
public static BatchOperator splitDoc(BatchOperator<?> input, String docColName, String wordDelimiter) { return input.udtf( docColName, new String[] {WORD_COL_NAME, COUNT_COL_NAME}, new DocWordSplitCount(wordDelimiter), new String[] {} ); }
DocWordSplitCount的功能就是分割單詞,計數。
public class DocWordSplitCount extends TableFunction <Row> { private String delimiter; public DocWordSplitCount(String delimiter) { this.delimiter = delimiter; } public void eval(String content) { String[] words = content.split(this.delimiter); // 分割單詞 HashMap <String, Long> map = new HashMap <>(0); for (String word : words) { if (word.length() > 0) { map.merge(word, 1L, Long::sum); // 計數 } } for (Map.Entry <String, Long> entry : map.entrySet()) { collect(Row.of(entry.getKey(), entry.getValue())); // 發送二元組<單詞,個數> } } } // runtime時候,變量以下: content = "老王 是 咱們 團隊 裏 最胖 的" words = {String[7]@10021} 0 = "老王" 1 = "是" 2 = "咱們" 3 = "團隊" 4 = "裏" 5 = "最胖" 6 = "的" map = {HashMap@10024} size = 7 "最胖" -> {Long@10043} 1 "的" -> {Long@10043} 1 "裏" -> {Long@10043} 1 "老王" -> {Long@10043} 1 "團隊" -> {Long@10043} 1 "咱們" -> {Long@10043} 1 "是" -> {Long@10043} 1
此處會把分佈式計算出來的 二元組<單詞,個數> 作 groupBy,這樣就獲得了最終的 單詞出現次數。其中 Flink 的groupBy起到了關鍵做用,你們有興趣能夠閱讀 [ 源碼解析] Flink的groupBy和reduce究竟作了什麼 。
public static BatchOperator count(BatchOperator input, String wordColName) { return count(input, wordColName, null); } public static BatchOperator count(BatchOperator input, String wordColName, String wordValueColName) { if (null == wordValueColName) { return input.groupBy(wordColName, wordColName + " AS " + WORD_COL_NAME + ", COUNT(" + wordColName + ") AS " + COUNT_COL_NAME); } else { return input.groupBy(wordColName, wordColName + " AS " + WORD_COL_NAME + ", SUM(" + wordValueColName + ") AS " + COUNT_COL_NAME); } }
若是單詞出現次數太少,就沒有加入字典的必要,因此須要過濾。
Word2VecTrainBatchOp 須要實現配置參數 Word2VecTrainParams,具體以下:
public interface Word2VecTrainParams<T> extends HasNumIterDefaultAs1<T>, HasSelectedCol <T>, HasVectorSizeDv100 <T>, HasAlpha <T>, HasWordDelimiter <T>, HasMinCount <T>, HasRandomWindow <T>, HasWindow <T> { }
其中 HasMinCount 就是用來配置低頻單詞的閾值。
public interface HasMinCount<T> extends WithParams<T> { ParamInfo <Integer> MIN_COUNT = ParamInfoFactory .createParamInfo("minCount", Integer.class) .setDescription("minimum count of word") .setHasDefaultValue(5) .build(); default Integer getMinCount() { return get(MIN_COUNT); } default T setMinCount(Integer value) { return set(MIN_COUNT, value); } }
在實例代碼中有以下,就是設置最低閾值是 1,這是由於咱們的輸入不多,不會過濾低頻詞。若是詞彙量多,能夠設置爲 5。
.setMinCount(1);
咱們再取出使用代碼.
DataSet <Row> wordCnt = WordCountUtil .splitDocAndCount(in, getSelectedCol(), getWordDelimiter()) .filter("cnt >= " + String.valueOf(getMinCount())) .getDataSet();
能夠看到,.filter("cnt >= " + String.valueOf(getMinCount()))
這部分是過濾。這是簡單的SQL用法。
而後會返回 DataSet
過濾低頻單詞以後,會對獲得的單詞進行排序。
DataSet <Row> sorted = sortedIndexVocab(wordCnt);
此處比較艱深晦澀,須要仔細梳理,大體邏輯是:
SortUtils.pSort
對<單詞,頻次> 進行大規模並行排序;sorted.f0.partitionCustom
, 由於上一步返回值的 f0 是 <partition id, Row> ,得倒數據集 partitioned。countElementsPerPartition(partitioned)
; 得倒 Tuple2
; 得倒的結果數據集 cnt 會廣播出來,下一步計算時候會用到;DataSet
;注1,pSort 能夠參見 Alink漫談(六) : TF-IDF算法的實現。SortUtils.pSort是大規模並行排序。pSort返回值是:
@return f0: dataset which is indexed by partition id, f1: dataset which has partition id and count.
。
具體實現以下:
private static DataSet <Row> sortedIndexVocab(DataSet <Row> vocab) { final int sortIdx = 1; Tuple2 <DataSet <Tuple2 <Integer, Row>>, DataSet <Tuple2 <Integer, Long>>> sorted = SortUtils.pSort(vocab, sortIdx); // 進行大規模並行排序 DataSet <Tuple2 <Integer, Row>> partitioned = sorted.f0.partitionCustom(new Partitioner <Integer>() { @Override public int partition(Integer key, int numPartitions) { return key; // 利用分區 idx 進行分區 } }, 0); DataSet <Tuple2 <Integer, Long>> cnt = DataSetUtils.countElementsPerPartition(partitioned); return partitioned.mapPartition(new RichMapPartitionFunction <Tuple2 <Integer, Row>, Row>() { int start; int curLen; int total; @Override public void open(Configuration parameters) throws Exception { List <Tuple2 <Integer, Long>> cnts = getRuntimeContext().getBroadcastVariable("cnt"); int taskId = getRuntimeContext().getIndexOfThisSubtask(); start = 0; curLen = 0; total = 0; for (Tuple2 <Integer, Long> val : cnts) { if (val.f0 < taskId) { start += val.f1; // 本區單詞起始位置 } if (val.f0 == taskId) { // 只計算本分區對應的記錄,由於 f0 是分區idx curLen = val.f1.intValue(); // 本區單詞數目curLen } total += val.f1.intValue(); // 得倒 本分區內 全部單詞的總數total } // runtime 打印以下 val = {Tuple2@10585} "(7,0)" f0 = {Integer@10586} 7 f1 = {Long@10587} 0 } @Override public void mapPartition(Iterable <Tuple2 <Integer, Row>> values, Collector <Row> out) throws Exception { Row[] all = new Row[curLen]; int i = 0; for (Tuple2 <Integer, Row> val : values) { all[i++] = val.f1; // 得倒全部的單詞 } Arrays.sort(all, (o1, o2) -> (int) ((Long) o1.getField(sortIdx) - (Long) o2.getField(sortIdx))); // 排序 i = start; for (Row row : all) { // 歸併 & 發送 out.collect(RowUtil.merge(row, -(i - total + 1))); ++i; } // runtime時的變量以下: all = {Row[2]@10655} 0 = {Row@13346} "咱們,1" 1 = {Row@13347} "裏,1" i = 0 total = 10 start = 0 } }).withBroadcastSet(cnt, "cnt"); // 廣播進來的變量 }
此處是計算排序後每一個分區的單詞數目,相對邏輯簡單,其結果數據集 會廣播出來給下一步使用。
DataSet <Long> vocSize = DataSetUtils // vocSize是詞彙的個數 .countElementsPerPartition(sorted) .sum(1) // 累計第一個key .map(new MapFunction <Tuple2 <Integer, Long>, Long>() { @Override public Long map(Tuple2 <Integer, Long> value) throws Exception { return value.f1; } });
本部分會利用上兩步得倒的結果:"排序好的單詞"&"每一個分區的單詞數目" 來創建 詞典 和 二叉樹。
DataSet <Tuple3 <Integer, String, Word>> vocab = sorted // 排序後的單詞數據集 .reduceGroup(new CreateVocab()) .withBroadcastSet(vocSize, "vocSize") // 廣播上一步產生的結果集 .rebalance();
CreateVocab 完成了具體工做,結果集是:Tuple3<單詞在詞典的idx,單詞,單詞在詞典中對應的元素>。
private static class CreateVocab extends RichGroupReduceFunction <Row, Tuple3 <Integer, String, Word>> { int vocSize; @Override public void open(Configuration parameters) throws Exception { vocSize = getRuntimeContext().getBroadcastVariableWithInitializer("vocSize", new BroadcastVariableInitializer <Long, Integer>() { @Override public Integer initializeBroadcastVariable(Iterable <Long> data) { return data.iterator().next().intValue(); } }); } @Override public void reduce(Iterable <Row> values, Collector <Tuple3 <Integer, String, Word>> out) throws Exception { String[] words = new String[vocSize]; Word[] vocab = new Word[vocSize]; // 創建詞典 for (Row row : values) { Word word = new Word(); word.cnt = (long) row.getField(1); vocab[(int) row.getField(2)] = word; words[(int) row.getField(2)] = (String) row.getField(0); } // runtime變量以下 words = {String[10]@10606} 0 = "胖" 1 = "的" 2 = "是" 3 = "團隊" 4 = "老王" 5 = "第二" 6 = "最胖" 7 = "老黃" 8 = "裏" 9 = "咱們" // 創建二叉樹,創建過程當中會更新詞典內容 createBinaryTree(vocab); // runtime變量以下 vocab = {Word2VecTrainBatchOp$Word[10]@10669} 0 = {Word2VecTrainBatchOp$Word@13372} cnt = 5 point = {int[2]@13382} 0 = 8 1 = 7 code = {int[2]@13383} 0 = 1 1 = 1 1 = {Word2VecTrainBatchOp$Word@13373} cnt = 2 point = {int[3]@13384} 0 = 8 1 = 7 2 = 5 code = {int[3]@13385} 0 = 1 1 = 0 2 = 1 for (int i = 0; i < vocab.length; ++i) { // 結果集是:Tuple3<單詞在詞典的idx,單詞,單詞對應的詞典元素> out.collect(Tuple3.of(i, words[i], vocab[i])); } } }
詞典的數據結構以下:
private static class Word implements Serializable { public long cnt; // 詞頻,左右兩個輸入節點的詞頻之和 public int[] point; //在樹中的節點序列, 即從根結點到葉子節點的路徑 public int[] code; //霍夫曼碼, HuffmanCode }
一個容易混淆的地方:
好比vocab[word].point[0]確定是Root結點,而 vocab[word].code[0]確定是Root結點走到下一個點的編碼。
這裏基於語料訓練樣本創建霍夫曼樹(基於詞頻)。
Alink這裏基本就是c語言的java實現。可能不少兄弟還不熟悉,因此須要講解下。
Word2vec 利用數組下標的移動就完成了構建、編碼。它最重要的是隻用了parent這個數組來標記生成的Parent結點( 範圍 VocabSize,VocabSize∗2−2 )。最後對Parent結點減去VocabSize,獲得從0開始的Point路徑數組。
基本套路是:
private static void createBinaryTree(Word[] vocab) { int vocabSize = vocab.length; int[] point = new int[MAX_CODE_LENGTH]; int[] code = new int[MAX_CODE_LENGTH]; // 首先定義了3個長度爲vocab_size*2+1的數組 // count數組中前vocab_size存儲的是每一個詞的相應的詞頻。後面初始化的是很是大的數,已知詞庫中的詞是依照降序排列的。 long[] count = new long[vocabSize * 2 - 1]; int[] binary = new int[vocabSize * 2 - 1]; int[] parent = new int[vocabSize * 2 - 1]; // 前半部分初始化爲每一個詞出現的次數 for (int i = 0; i < vocabSize; ++i) { count[i] = vocab[i].cnt; } // 後半部分初始化爲一個固定的常數 Arrays.fill(count, vocabSize, vocabSize * 2 - 1, Integer.MAX_VALUE); // pos1, pos2 能夠理解爲 下一步 將要構建的左右兩個節點 // min1i, min2i 是當前正在構建的左右兩個節點 int min1i, min2i, pos1, pos2; pos1 = vocabSize - 1; // pos1指向前半截的尾部 pos2 = vocabSize; // pos2指向後半截的開始 // 每次增長一個節點,構建Huffman樹 for (int a = 0; a < vocabSize - 1; ++a) { // First, find two smallest nodes 'min1, min2' // 選擇最小的節點min1 // 根據pos1, pos2找到目前的 左 min1i 的位置,而且調整下一次的pos1, pos2 if (pos1 >= 0) { if (count[pos1] < count[pos2]) { min1i = pos1; pos1--; } else { min1i = pos2; pos2++; } } else { min1i = pos2; pos2++; } // 選擇最小的節點min2 // 根據上一步調整的pos1, pos2找到目前的 右 min2i 的位置,而且調整下一次的pos1, pos2 if (pos1 >= 0) { if (count[pos1] < count[pos2]) { min2i = pos1; pos1--; } else { min2i = pos2; pos2++; } } else { min2i = pos2; pos2++; } // 新生成的節點的機率是兩個輸入節點的機率之和,其左右子節點即爲輸入的兩個節點。值得注意的是,新生成的節點確定不是葉節點,而非葉結點的value值是中間向量,初始化爲零向量。 count[vocabSize + a] = count[min1i] + count[min2i]; parent[min1i] = vocabSize + a; // 設置父節點 parent[min2i] = vocabSize + a; binary[min2i] = 1; // 設置一個子樹的編碼爲1 } // runtime變量以下: binary = {int[19]@13405} 0 = 1 1 = 1 2 = 0 3 = 0 4 = 1 5 = 0 6 = 1 7 = 0 8 = 1 9 = 0 10 = 1 11 = 0 12 = 1 13 = 0 14 = 1 15 = 0 16 = 0 17 = 1 18 = 0 parent = {int[19]@13406} 0 = 17 1 = 15 2 = 15 3 = 13 4 = 12 5 = 12 6 = 11 7 = 11 8 = 10 9 = 10 10 = 13 11 = 14 12 = 14 13 = 16 14 = 16 15 = 17 16 = 18 17 = 18 18 = 0 count = {long[19]@13374} 0 = 5 1 = 2 2 = 2 3 = 1 4 = 1 5 = 1 6 = 1 7 = 1 8 = 1 9 = 1 10 = 2 11 = 2 12 = 2 13 = 3 14 = 4 15 = 4 16 = 7 17 = 9 18 = 16 // Now assign binary code to each vocabulary word // 生成Huffman碼,即找到每個字的code,和對應的在樹中的節點序列,在生成Huffman編碼的過程當中。針對每一個詞(詞都在葉子節點上),從葉子節點開始。將編碼存入到code數組中,如對於上圖中的「R」節點來講。其code數組爲{1,0}。再對其反轉即是Huffman編碼: for (int a = 0; a < vocabSize; ++a) { // 爲每個詞分配二進制編碼,即Huffman編碼 int b = a; int i = 0; do { code[i] = binary[b]; // 找到當前的節點的編碼 point[i] = b; // 記錄從葉子節點到根結點的序列 i++; b = parent[b]; // 找到當前節點的父節點 } while (b != vocabSize * 2 - 2); // 已經找到了根結點,根節點是沒有編碼的 vocab[a].code = new int[i]; for (b = 0; b < i; ++b) { vocab[a].code[i - b - 1] = code[b]; // 編碼的反轉 } vocab[a].point = new int[i]; vocab[a].point[0] = vocabSize - 2; for (b = 1; b < i; ++b) { vocab[a].point[i - b] = point[b] - vocabSize; // 記錄的是從根結點到葉子節點的路徑 } } }
最終二叉樹結果以下:
vocab = {Word2VecTrainBatchOp$Word[10]@10608} 0 = {Word2VecTrainBatchOp$Word@13314} cnt = 5 point = {int[2]@13329} 0 = 8 1 = 7 code = {int[2]@13330} 0 = 1 1 = 1 1 = {Word2VecTrainBatchOp$Word@13320} cnt = 2 point = {int[3]@13331} 0 = 8 1 = 7 2 = 5 code = {int[3]@13332} 0 = 1 1 = 0 2 = 1 2 = {Word2VecTrainBatchOp$Word@13321} 3 = {Word2VecTrainBatchOp$Word@13322} ...... 9 = {Word2VecTrainBatchOp$Word@13328}
此處會再次對原始輸入作單詞分割,這裏總感受是能夠把此步驟和前面步驟放在一塊兒作優化。
DataSet <String[]> split = in .select("`" + getSelectedCol() + "`") .getDataSet() .flatMap(new WordCountUtil.WordSpliter(getWordDelimiter())) .rebalance();
生成訓練數據代碼以下,此處也比較晦澀。
DataSet <int[]> trainData = encodeContent(split, vocab).rebalance();
最終目的是,把每一個句子都翻譯成了一個詞典idx的序列,好比:
原始輸入 : "老王 是 咱們 團隊 裏 最胖 的"
編碼以後 : 「4,1,9,3,8,6,2」 , 這裏每一個數字是 本句子中每一個單詞在詞典中的序列號。
encodeContent 的輸入是:
流程邏輯以下:
content.mapPartition
,獲得數據集 Tuple4 <>(taskId, localCnt, i, val[i])
,分別是 Tuple4 <>(taskId, 本分區句子數目, 本單詞在本句子中的idx, 本單詞)
,因此此處發送的核心是單詞。Flink coGroup
功能完成了雙流匹配合並功能,將單詞流和詞典篩選合併(where(3).equalTo(1))
,其中上步處理中,f3是word,vocab.f1 是word,因此就是在兩個流中找到相同的單詞而後作操做。得倒 Tuple4.of(tuple.f0, tuple.f1, tuple.f2, row.getField(0)))
,即 結果集是 Tuple4 <taskId, 本分區句子數目, 本單詞在本句子中的idx,單詞在詞典的idx>
。groupBy(0, 1).reduceGroup
,而後排序(根據本單詞在本句子中的idx來排序);結果集是 DataSet <int[]>
,即返回 「本單詞在詞典的idx」,好比 [4,1,9,3,8,6,2]
。就是本句子中每一個單詞在詞典中的序列號。具體代碼以下:
private static DataSet <int[]> encodeContent( DataSet <String[]> content, DataSet <Tuple3 <Integer, String, Word>> vocab) { return content .mapPartition(new RichMapPartitionFunction <String[], Tuple4 <Integer, Long, Integer, String>>() { @Override public void mapPartition(Iterable <String[]> values, Collector <Tuple4 <Integer, Long, Integer, String>> out) throws Exception { int taskId = getRuntimeContext().getIndexOfThisSubtask(); long localCnt = 0L; for (String[] val : values) { if (val == null || val.length == 0) { continue; } for (int i = 0; i < val.length; ++i) { // 核心是發送單詞 out.collect(new Tuple4 <>(taskId, localCnt, i, val[i])); } ++localCnt; // 這裏注意,發送時候 localCnt 尚未更新 // runtime 的數據以下: val = {String[7]@10008} 0 = "老王" 1 = "是" 2 = "咱們" 3 = "團隊" 4 = "裏" 5 = "最胖" 6 = "的" } } }).coGroup(vocab) .where(3) // 上步處理中,f3是word .equalTo(1) // vocab.f1 是word .with(new CoGroupFunction <Tuple4 <Integer, Long, Integer, String>, Tuple3 <Integer, String, Word>, Tuple4 <Integer, Long, Integer, Integer>>() { @Override public void coGroup(Iterable <Tuple4 <Integer, Long, Integer, String>> first, Iterable <Tuple3 <Integer, String, Word>> second, Collector <Tuple4 <Integer, Long, Integer, Integer>> out) { for (Tuple3 <Integer, String, Word> row : second) { for (Tuple4 <Integer, Long, Integer, String> tuple : first) { out.collect( Tuple4.of(tuple.f0, tuple.f1, tuple.f2, row.getField(0))); // 將單詞和詞典篩選合併, 返回 <taskId, 本分區句子數目, 本單詞在本句子中的idx,單詞在詞典的idx> // runtime的變量是: row = {Tuple3@10640} // Tuple3<單詞在詞典的idx,單詞,單詞在詞典中對應的元素> f0 = {Integer@10650} 7 f1 = "老黃" f2 = {Word2VecTrainBatchOp$Word@10652} tuple = {Tuple4@10641} // (taskId, 本分區句子數目, 本單詞在本句子中的idx, 本單詞) f0 = {Integer@10642} 1 f1 = {Long@10643} 0 f2 = {Integer@10644} 0 f3 = "老黃" } } } }).groupBy(0, 1) // 分組排序 .reduceGroup(new GroupReduceFunction <Tuple4 <Integer, Long, Integer, Integer>, int[]>() { @Override public void reduce(Iterable <Tuple4 <Integer, Long, Integer, Integer>> values, Collector <int[]> out) { ArrayList <Tuple2 <Integer, Integer>> elements = new ArrayList <>(); for (Tuple4 <Integer, Long, Integer, Integer> val : values) { // 獲得 (本單詞在本句子中的idx, 本單詞在詞典的idx) elements.add(Tuple2.of(val.f2, val.f3)); } // runtime變量以下: val = {Tuple4@10732} "(2,0,0,0)" // <taskId, 本分區句子數目, 本單詞在本句子中的idx,單詞在詞典的idx> f0 = {Integer@10737} 2 f1 = {Long@10738} 0 f2 = {Integer@10733} 0 f3 = {Integer@10733} 0 elements = {ArrayList@10797} size = 7 0 = {Tuple2@10803} "(0,4)" 1 = {Tuple2@10804} "(1,1)" 2 = {Tuple2@10805} "(2,9)" 3 = {Tuple2@10806} "(3,3)" 4 = {Tuple2@10807} "(4,8)" 5 = {Tuple2@10808} "(5,6)" 6 = {Tuple2@10809} "(6,2)" Collections.sort(elements, new Comparator <Tuple2 <Integer, Integer>>() { @Override public int compare(Tuple2 <Integer, Integer> o1, Tuple2 <Integer, Integer> o2) { return o1.f0.compareTo(o2.f0); } }); int[] ret = new int[elements.size()]; for (int i = 0; i < elements.size(); ++i) { ret[i] = elements.get(i).f1; // 返回 "本單詞在詞典的idx" } // runtime變量以下: ret = {int[7]@10799} 0 = 4 1 = 1 2 = 9 3 = 3 4 = 8 5 = 6 6 = 2 out.collect(ret); } }); }
這裏使用了 Flink coGroup 功能完成了雙流匹配合並功能。coGroup 和 Join 的區別是:
在 coGroup 的 CoGroupFunction 中,想輸出什麼形式的元素都行,徹底看使用者的具體實現。
到了這一步,已經把每一個句子都翻譯成了一個詞典idx的序列,好比:
原始輸入 : "老王 是 咱們 團隊 裏 最胖 的"
編碼以後 : 「4,1,9,3,8,6,2」 , 這裏每一個數字是 本句子中每一個單詞在詞典中的序列號。
接下來Alink換了一條路,精簡詞典, 就是去掉單詞原始文字。
DataSet <Tuple2 <Integer, Word>> vocabWithoutWordStr = vocab .map(new UseVocabWithoutWordString());
原始詞典是 Tuple3<單詞在詞典的idx,單詞,單詞在詞典中對應的元素>
:
"(1,的,com.alibaba.alink.operator.batch.nlp.Word2VecTrainBatchOp$Word@13099fc)"
精簡以後的詞典是 Tuple2<單詞在詞典的idx,單詞在詞典中對應的元素>
:
"(1, com.alibaba.alink.operator.batch.nlp.Word2VecTrainBatchOp$Word@13099fc)"
代碼以下:
private static class UseVocabWithoutWordString implements MapFunction <Tuple3 <Integer, String, Word>, Tuple2 <Integer, Word>> { @Override public Tuple2 <Integer, Word> map(Tuple3 <Integer, String, Word> value) throws Exception { return Tuple2.of(value.f0, value.f2); // 去掉單詞原始文字 f1 } } // runtime變量以下: value = {Tuple3@10692} "(1,的,com.alibaba.alink.operator.batch.nlp.Word2VecTrainBatchOp$Word@13099fc)" f0 = {Integer@10693} 1 value = 1 f1 = "的" value = {char[1]@10700} hash = 0 f2 = {Word2VecTrainBatchOp$Word@10694} cnt = 2 point = {int[3]@10698} 0 = 8 1 = 7 2 = 5 code = {int[3]@10699} 0 = 1 1 = 0 2 = 1
用精簡後的詞典初始化模型,即隨機初始化全部的模型權重參數θ,全部的詞向量w。
DataSet <Tuple2 <Integer, double[]>> initialModel = vocabWithoutWordStr .mapPartition(new initialModel(seed, vectorSize)) .rebalance();
如今詞典是:Tuple2<每一個單詞在詞典的idx,每一個單詞在詞典中對應的元素>,這裏只用到了 idx。
最後初始化的模型是 :<每一個單詞在詞典中的idx,隨機初始化的權重係數>。權重大小默認是 100。
具體代碼是
private static class initialModel extends RichMapPartitionFunction <Tuple2 <Integer, Word>, Tuple2 <Integer, double[]>> { private final long seed; private final int vectorSize; Random random; public initialModel(long seed, int vectorSize) { this.seed = seed; this.vectorSize = vectorSize; random = new Random(); } @Override public void open(Configuration parameters) throws Exception { random.setSeed(seed + getRuntimeContext().getIndexOfThisSubtask()); } @Override public void mapPartition(Iterable <Tuple2 <Integer, Word>> values, Collector <Tuple2 <Integer, double[]>> out) throws Exception { for (Tuple2 <Integer, Word> val : values) { double[] inBuf = new double[vectorSize]; for (int i = 0; i < vectorSize; ++i) { inBuf[i] = random.nextFloat(); } // 發送 <每一個單詞在詞典中的idx,隨機初始化的係數> out.collect(Tuple2.of(val.f0, inBuf)); } } }
如今計算迭代訓練的次數,就是 "訓練語料中全部單詞數目 / 100000L" 和 5 之間的最大值。
DataSet <Integer> syncNum = DataSetUtils .countElementsPerPartition(trainData) .sum(1) .map(new MapFunction <Tuple2 <Integer, Long>, Integer>() { @Override public Integer map(Tuple2 <Integer, Long> value) throws Exception { return Math.max((int) (value.f1 / 100000L), 5); } });
至此,完成了預處理節點:對輸入的處理,以及詞典、二叉樹的創建。下一步就是要開始訓練。
word2vec原理(二) 基於Hierarchical Softmax的模型
word2vec原理(一) CBOW與Skip-Gram模型基礎
word2vec原理(三) 基於Negative Sampling的模型