機器翻譯和語音識別是最先開展的兩項人工智能研究。今天也取得了最顯著的商業成果。
早先的機器翻譯實際脫胎於電子詞典,能力更擅長於詞或者短語的翻譯。那時候的翻譯一般會將一句話打斷爲一系列的片斷,隨後經過複雜的程序邏輯對每個片斷進行翻譯,最終組合在一塊兒。所獲得的翻譯結果應當說似是而非,最大的問題是可讀性和連貫性很是差。
實際從機器學習的觀點來說,這種翻譯方式,也不符合人類在作語言翻譯時所作的動做。其實以神經網絡爲表明的機器學習,更多的都是在「模仿」人類的行爲習慣。
一名職業翻譯一般是這樣作:首先完整聽懂要翻譯的語句,將語義充分理解,隨後把理解到的內容,用目標語言複述出來。
而如今的機器翻譯,也正是這樣作的,谷歌的seq2seq是這一模式的開創者。
若是用計算機科學的語言來講,這一過程很像一個編解碼過程。原始的語句進入編碼器,獲得一組用於表明原始語句「內涵」的數組。這些數組中的數字就是原始語句所表明的含義,只是這個含義人類沒法讀懂,是須要由神經網絡模型去理解的。隨後解碼過程,將「有含義的數字」解碼爲對應的目標語言。從而完成整個翻譯過程。這樣的獲得的翻譯結果,很是流暢,具備更好的可讀性。
(圖片來自谷歌NMT文檔)python
注意力機制是人類特有的大腦思惟方式,好比看到下面這幅照片:
(圖片來自互聯網)
照片的內容實際不少,甚至若是從數學上說,背景樹林的複雜度要高於前景。但看到照片的人,都會先注意到迎面而來的飛盤,隨後是投擲者,接着是圖像右側的小孩子。其它的信息都被忽略了。
這是人類在上萬年的進化中所造成的本能。對於快速向本身移動的物體首先會看到、識別危險、而且快速應對。接着是可能對本身形成威脅的同類或者生物。爲了作到集註,不得不忽略看起來可有可無的東西。
在機器學習中引入注意力模型,在圖像處理、機器翻譯、策略博弈等各個領域中都有應用。這裏的注意力機制有兩個做用:一是下降模型的複雜度或者計算量,把主要資源分配給更重要的內容。二是對應把最相關的輸入導出到相關的輸出,更有針對性的獲得結果。git
在機器翻譯領域,前面咱們已經肯定和解釋了編碼、解碼模型。那麼第二點的輸入輸出相關性就顯得更重要。
咱們舉例來講明:好比英文「I love you」,翻譯爲中文是「我愛你」。在一個編碼解碼模型中,首先由編碼器處理「I love you」,從而獲得中間語義,好比咱們稱爲C:github
C = Encoder("I love you")
解碼的時候,若是沒有注意力機制,那序列輸出則是:算法
"我" = Decoder(C) "愛" = Decoder(C) "你" = Decoder(C)
由於C至關於「I love you」三個單詞共同的做用。那麼解碼的時候,每個字的輸出,都至關於3個單詞共同做用的結果。這顯然是不合理的,並且也不大可能獲得一個理想、順暢的結果。
一個理想的解碼模型應當相似這樣的方式:後端
"我" = Decoder(C+"I") "愛" = Decoder(C+"love") "你" = Decoder(C+"you")
固然,機器學習不是人。人經過大量的學習、經驗的積累,一眼就能看出來「I」對應翻譯成「我」,「love」翻譯成「愛」。機器不可能提早知道這一切,因此咱們比較切實的方法,只能是增長一套權重邏輯,在不一樣的翻譯處理中,對應不一樣的權重屬性。這就好像下面這樣的方式:api
"我" = Decoder(C+0.8x"I"+0.1x"love"+0.2x"you") "愛" = Decoder(C+0.1x"I"+0.7x"love"+0.1x"you") "你" = Decoder(C+0.2x"I"+0.1x"love"+0.8x"you")
沒錯了,這個權重值,好比翻譯「我」的時候的權重序列:(0.8,0.1,0.2),就是注意力機制。在翻譯某個目標單詞輸出的時候,經過注意力機制,模型集註在對應的某個輸入單詞。
固然,注意力機制還包含上面示意性的表達式沒有顯示出來的一個重要操做:結合解碼器的當前狀態、和編碼器輸入內容以後的狀態,在每一次翻譯解碼操做中更新注意力的權重值。數組
回到上面的編解碼模型示意圖。編碼器、解碼器在咱們的機器學習中,實際都是神經網絡模型。那麼把上面的示意圖展開,一個沒有注意力機制的編碼、解碼翻譯模型是這個樣子:
(圖片來自谷歌NMT文檔)bash
隨後,咱們爲這個模型增長解碼時候的權重機制。模型在處理每一個單詞輸出的時候,會在權重的幫助下,把重點放在對應的輸入單詞上。示意圖以下:
(圖片來自谷歌NMT文檔)網絡
最終,結合權重生成的過程,成爲完整的注意力機制。注意力機制主要做用於解碼,在每個輸出步驟中都要從新計算注意力權重,並更新到解碼模型從而對輸出產生影響。模型的示意圖以下:
(圖片來自谷歌NMT文檔)
圖片中注意力權重的來源和去向箭頭,要注意看清楚,這對你下面閱讀實現的代碼會頗有幫助。app
前面的編解碼模型示意圖,還有模擬的表達式,固然都作了不少簡化。實際上中間還有不少工做要作,首先是翻譯樣本庫。
本例中使用http://www.manythings.org/anki/提供的英文對比西班牙文樣本庫,網站上還有不少其它語言的對比樣本能夠下載,有興趣的讀者不妨在作完這個練習後嘗試一下其它語言的機器翻譯。
這個樣本是文本格式,包含不少行,每一行都是一個完整的句子,包含英文和西班牙文兩部分,兩種文字之間使用製表符隔開,好比:
May I borrow this book? ¿Puedo tomar prestado este libro?
對於樣本庫,咱們要進行如下幾項預處理:
<start>
和結束標誌<end>
。看過《從鍋爐工到AI專家(10)》的話,你應當理解這種作法。通過訓練後,模型會根據這兩個標誌做爲翻譯的開始和結束。作完上面的處理後,剛纔的那行樣本看起來會是這個樣子:
<start> may i borrow this book ? <end> <start> ¿ puedo tomar prestado este libro ? <end>
注意標點符號也是語言的組成部分,每一個部分用空格隔開,都須要單獨數字化。因此你能看到,上面的兩行例句,標點符號以前也添加了空格。
一行句子數字化以後,編碼同單詞之間的對照關係可能相似下面的樣子:
Input Language; index to word mapping 1 ----> <start> 8 ----> no 38 ----> puedo 804 ----> confiar 20 ----> en 1000 ----> vosotras 3 ----> . 2 ----> <end> Target Language; index to word mapping 1 ----> <start> 4 ----> i 25 ----> can 12 ----> t 345 ----> trust 6 ----> you 3 ----> . 2 ----> <end>
你可能注意到了,「can't」中的單引號做爲不支持的字符被過濾掉了,不過你放心,這並不會影響模型的訓練。固然在一個完善的翻譯系統中,這樣的字符都應當單獨處理,本例中就忽略了。
本例中使用了編碼器、解碼器、注意力機制三個網絡模型,都繼承自keras.Model,屬於三個自定義的Keras模型。
三個模型共同組成了完整的翻譯模型。完整模型的組裝,是在訓練過程和翻譯(預測)過程當中,經過相應子程序把他們組裝在一塊兒的。這是由於它們三者之間的邏輯機制相對比較複雜。沒法用前面經常使用的keras.models.Sequential方法直接耦合在一塊兒。
自定義Keras模型在本系列中是第一次遇到,因此着重講一下。實現自定義模型有三個基本要求:
__init__
方法,用於實現類的初始化,同全部面向對象的語言同樣,這裏主要完成基類和類成員的初始化工做。自定義模型之因此有這些要求,主要是爲了自定義的模型,能夠跟Keras原生層同樣,互相兼容,支持多種模型的組合、互聯,從而共同造成更復雜的模型。
Encoder/Decoder主體都使用GRU網絡,讀起來應當比較容易理解。有須要的話,複習一下《從鍋爐工到AI專家(10)》。
注意力機制的BahdanauAttention模型就很使人費解了,困惑的關鍵在於其中的算法。算法的計算部分只有兩行代碼,代碼自己都知道是在作什麼,但徹底不明白組合在一塊兒是什麼功能以及爲何這樣作。其實閱讀由數學公式推導、轉換而來的程序代碼都有這種感受。因此如今不少的知識保護,根本不在於源代碼,而在於公式自己。沒有公式,不少源代碼很是難以讀懂。
這部分推薦閱讀Dzmitry Bahdanau的論文《Neural Machine Translation by Jointly Learning to Align and Translate》和以後Minh-Thang Luong改進的算法《Effective Approaches to Attention-based Neural Machine Translation》。論文中對於理論作了詳盡解釋,也有公式的推導過程。
這裏的BahdanauAttention模型實際就是公式的程序實現。若是精力不夠的話,死記公式也算一種學習方法。
咱們以往碰到的模型,訓練和預測基本都是一行代碼,幾乎沒有什麼須要解釋的。
今天的模型涉及了帶有注意力機制的自定義模型,主要的邏輯,是經過程序代碼,在訓練和評估子程序中把模型組合起來完成的。
程序若是隻是編碼器和解碼器串聯的邏輯,徹底能夠同之前同樣,一條keras.Sequential函數完成組裝,那就一點難度沒有了。而加上注意力機制,複雜度高了不少,也是最難理解的地方。作一個簡單的分析:
<start>
起始標誌開始,到<end>
標誌結束。預測時,沒有人知道這一句翻譯的結果是多少個單詞,就是逐個獲取Decoder的輸出,直到獲得一個<end>
標誌。attention_weights * values
,而後將結果跟前一個輸出詞一塊兒做爲Decoder的GRU輸入,values實際就是編碼器輸出enc_output。下面是完整的可執行源代碼,請參考註釋閱讀:
#!/usr/bin/env python3 from __future__ import absolute_import, division, print_function, unicode_literals import tensorflow as tf import matplotlib.pyplot as plt from sklearn.model_selection import train_test_split import unicodedata import re import numpy as np import os import io import time import sys # 若是命令行增長了參數'train'則進入訓練模式,不然按照翻譯模式執行 TRAIN = False if len(sys.argv) == 2 and sys.argv[1] == 'train': TRAIN = True # 下載樣本集,下載後自動解壓。數據保存在路徑:~/.keras/datasets/ path_to_zip = tf.keras.utils.get_file( 'spa-eng.zip', origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip', extract=True) # 指向解壓後的樣本文件 path_to_file = os.path.dirname(path_to_zip)+"/spa-eng/spa.txt" # 將文本從unicode編碼轉換爲ascii編碼 def unicode_to_ascii(s): return ''.join( c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn') # 對全部的句子作預處理 def preprocess_sentence(w): w = unicode_to_ascii(w.lower().strip()) # 在單詞和標點之間增長空格 # 好比: "he is a boy." => "he is a boy ." # 參考: https://stackoverflow.com/questions/3645931/python-padding-punctuation-with-white-spaces-keeping-punctuation w = re.sub(r"([?.!,¿])", r" \1 ", w) w = re.sub(r'[" "]+', " ", w) # 用空格替換掉除了大小寫字母和"."/ "?"/ "!"/ ","以外的字符 w = re.sub(r"[^a-zA-Z?.!,¿]+", " ", w) # 截斷兩端的空白 w = w.rstrip().strip() # 在句子兩端增長開始和結束標誌 # 這樣通過訓練後,模型知道何時開始和何時結束 w = '<start> ' + w + ' <end>' return w # 載入樣本集,對句子進行預處理 # 最終返回(英文,西班牙文)這樣的配對元組 def create_dataset(path, num_examples): lines = io.open(path, encoding='UTF-8').read().strip().split('\n') word_pairs = [[preprocess_sentence(w) for w in l.split('\t')] for l in lines[:num_examples]] return zip(*word_pairs) # 至此的輸出爲: # <start> go away ! <end> # <start> salga de aqui ! <end> # 這樣的形式。 # 獲取最長的句子長度 def max_length(tensor): return max(len(t) for t in tensor) # 將單詞數字化以後的數字<->單詞雙向對照表 def tokenize(lang): lang_tokenizer = tf.keras.preprocessing.text.Tokenizer( filters='') lang_tokenizer.fit_on_texts(lang) tensor = lang_tokenizer.texts_to_sequences(lang) tensor = tf.keras.preprocessing.sequence.pad_sequences( tensor, padding='post') return tensor, lang_tokenizer def load_dataset(path, num_examples=None): # 載入樣本,兩種語言分別保存到兩個數組 targ_lang, inp_lang = create_dataset(path, num_examples) # 把句子數字化,兩種語言是兩套對照編碼 input_tensor, inp_lang_tokenizer = tokenize(inp_lang) target_tensor, targ_lang_tokenizer = tokenize(targ_lang) return input_tensor, target_tensor, inp_lang_tokenizer, targ_lang_tokenizer # 訓練的樣本集數量,越大翻譯效果越好,但訓練耗時越長 num_examples = 80000 input_tensor, target_tensor, inp_lang, targ_lang = load_dataset(path_to_file, num_examples) # 至此,input_tensor/target_tensor 是數字化以後的樣本(數字數組) # inp_lang/targ_lang 是數字<->單詞編碼對照表 # 計算兩種語言中最長句子的長度 max_length_targ, max_length_inp = max_length(target_tensor), max_length(input_tensor) # 將樣本按照8:2分爲訓練集和驗證集 input_tensor_train, input_tensor_val, target_tensor_train, target_tensor_val = train_test_split(input_tensor, target_tensor, test_size=0.2) ############################################## BUFFER_SIZE = len(input_tensor_train) BATCH_SIZE = 64 steps_per_epoch = len(input_tensor_train)//BATCH_SIZE embedding_dim = 256 units = 1024 vocab_inp_size = len(inp_lang.word_index)+1 vocab_tar_size = len(targ_lang.word_index)+1 dataset = tf.data.Dataset.from_tensor_slices((input_tensor_train, target_tensor_train)).shuffle(BUFFER_SIZE) dataset = dataset.batch(BATCH_SIZE, drop_remainder=True) # 編碼器模型 class Encoder(tf.keras.Model): def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz): super(Encoder, self).__init__() self.batch_sz = batch_sz self.enc_units = enc_units self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) self.gru = tf.keras.layers.GRU( self.enc_units, return_sequences=True, return_state=True, recurrent_initializer='glorot_uniform') def call(self, x, hidden): x = self.embedding(x) output, state = self.gru(x, initial_state=hidden) return output, state def initialize_hidden_state(self): return tf.zeros((self.batch_sz, self.enc_units)) encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE) # 注意力模型 class BahdanauAttention(tf.keras.Model): def __init__(self, units): super(BahdanauAttention, self).__init__() self.W1 = tf.keras.layers.Dense(units) self.W2 = tf.keras.layers.Dense(units) self.V = tf.keras.layers.Dense(1) def call(self, query, values): # query爲上次的GRU隱藏層 # values爲編碼器的編碼結果enc_output hidden_with_time_axis = tf.expand_dims(query, 1) # 計算注意力權重值 score = self.V(tf.nn.tanh( self.W1(values) + self.W2(hidden_with_time_axis))) attention_weights = tf.nn.softmax(score, axis=1) # 使用注意力權重*編碼器輸出做爲返回值,未來會做爲解碼器的輸入 context_vector = attention_weights * values context_vector = tf.reduce_sum(context_vector, axis=1) return context_vector, attention_weights # 解碼器模型 class Decoder(tf.keras.Model): def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz): super(Decoder, self).__init__() self.batch_sz = batch_sz self.dec_units = dec_units self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) self.gru = tf.keras.layers.GRU( self.dec_units, return_sequences=True, return_state=True, recurrent_initializer='glorot_uniform') self.fc = tf.keras.layers.Dense(vocab_size) self.attention = BahdanauAttention(self.dec_units) def call(self, x, hidden, enc_output): # 使用上次的隱藏層(第一次使用編碼器隱藏層)、編碼器輸出計算注意力權重 context_vector, attention_weights = self.attention(hidden, enc_output) x = self.embedding(x) # 將上一循環的預測結果跟注意力權重值結合在一塊兒做爲本次的GRU網絡輸入 x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1) # state實際是GRU的隱藏層 output, state = self.gru(x) output = tf.reshape(output, (-1, output.shape[2])) x = self.fc(output) return x, state, attention_weights decoder = Decoder(vocab_tar_size, embedding_dim, units, BATCH_SIZE) optimizer = tf.keras.optimizers.Adam() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) # 損失函數 def loss_function(real, pred): mask = tf.math.logical_not(tf.math.equal(real, 0)) loss_ = loss_object(real, pred) mask = tf.cast(mask, dtype=loss_.dtype) loss_ *= mask return tf.reduce_mean(loss_) # 保存中間訓練結果 checkpoint_dir = './training_checkpoints' checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt") checkpoint = tf.train.Checkpoint(optimizer=optimizer, encoder=encoder, decoder=decoder) # 一次訓練 @tf.function def train_step(inp, targ, enc_hidden): loss = 0 with tf.GradientTape() as tape: # 輸入源語言句子進行編碼 enc_output, enc_hidden = encoder(inp, enc_hidden) # 保留編碼器隱藏層用於第一次的注意力權重計算 dec_hidden = enc_hidden # 解碼器第一次的輸入一定是<start>,targ_lang.word_index['<start>']是轉換爲對應的數字編碼 dec_input = tf.expand_dims([targ_lang.word_index['<start>']] * BATCH_SIZE, 1) # 循環整個目標句子(用於對比每一次解碼器輸出一樣本的對比) for t in range(1, targ.shape[1]): # 使用本單詞、隱藏層、編碼器輸出共同預測下一個單詞,同事保留本次的隱藏層做爲下一次輸入 predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output) # 計算損失值,最終的損失值是整個句子全部單詞損失值的合計 loss += loss_function(targ[:, t], predictions) # 在訓練時,每次解碼器的輸入並非上次解碼器的輸出,而是樣本目標語言對應單詞 # 這稱爲teach forcing dec_input = tf.expand_dims(targ[:, t], 1) # 全部單詞的平均損失值 batch_loss = (loss / int(targ.shape[1])) # 最終的訓練參量是編碼器和解碼的集合 variables = encoder.trainable_variables + decoder.trainable_variables # 根據代價值計算下一次的參量值 gradients = tape.gradient(loss, variables) # 將新的參量應用到模型 optimizer.apply_gradients(zip(gradients, variables)) return batch_loss def training(): EPOCHS = 10 for epoch in range(EPOCHS): start = time.time() # 初始化隱藏層和損失值 enc_hidden = encoder.initialize_hidden_state() total_loss = 0 # 一個批次的訓練 for (batch, (inp, targ)) in enumerate(dataset.take(steps_per_epoch)): batch_loss = train_step(inp, targ, enc_hidden) total_loss += batch_loss # 每100次顯示一下模型損失值 if batch % 100 == 0: print('Epoch {} Batch {} Loss {:.4f}'.format( epoch + 1, batch, batch_loss.numpy())) # 每兩次迭代保存一次數據 if (epoch + 1) % 2 == 0: checkpoint.save(file_prefix=checkpoint_prefix) # 顯示每次迭代的損失值和消耗時間 print('Epoch {} Loss {:.4f}'.format(epoch + 1, total_loss / steps_per_epoch)) print('Time taken for 1 epoch {} sec\n'.format(time.time() - start)) # 根據命令行參數選擇本次是否進行訓練 if TRAIN: training() ################################################ # 評估(翻譯)一行句子 def evaluate(sentence): # 清空注意力圖 attention_plot = np.zeros((max_length_targ, max_length_inp)) # 句子預處理 sentence = preprocess_sentence(sentence) # 句子數字化 inputs = [inp_lang.word_index[i] for i in sentence.split(' ')] # 按照最長句子長度補齊 inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs], maxlen=max_length_inp, padding='post') inputs = tf.convert_to_tensor(inputs) result = '' # 句子作編碼 hidden = [tf.zeros((1, units))] enc_out, enc_hidden = encoder(inputs, hidden) # 編碼器隱藏層做爲第一次解碼器的隱藏層值 dec_hidden = enc_hidden # 解碼第一個單詞必然是<start>,表示啓動解碼 dec_input = tf.expand_dims([targ_lang.word_index['<start>']], 0) # 假設翻譯結果不超過最長的樣本句子 for t in range(max_length_targ): # 逐個單詞翻譯 predictions, dec_hidden, attention_weights = decoder(dec_input, dec_hidden, enc_out) # 保留注意力權重用於繪製注意力圖 # 注意每次循環的每一個單詞注意力權重是不一樣的 attention_weights = tf.reshape(attention_weights, (-1, )) attention_plot[t] = attention_weights.numpy() # 獲得預測值 predicted_id = tf.argmax(predictions[0]).numpy() # 從數字查錶轉換爲對應單詞,累加到上一次結果,最終組成句子 result += targ_lang.index_word[predicted_id] + ' ' # 若是是<end>表示翻譯結束 if targ_lang.index_word[predicted_id] == '<end>': return result, sentence, attention_plot # 上次的預測值,將做爲下次解碼器的輸入 dec_input = tf.expand_dims([predicted_id], 0) # 若是超過樣本中最長的句子仍然沒有翻譯結束標誌,則返回當前全部翻譯結果 return result, sentence, attention_plot # 繪製注意力圖 def plot_attention(attention, sentence, predicted_sentence): fig = plt.figure(figsize=(10,10)) ax = fig.add_subplot(1, 1, 1) ax.matshow(attention, cmap='viridis') fontdict = {'fontsize': 14} ax.set_xticklabels([''] + sentence, fontdict=fontdict, rotation=90) ax.set_yticklabels([''] + predicted_sentence, fontdict=fontdict) plt.show() # 翻譯一句文本 def translate(sentence): result, sentence, attention_plot = evaluate(sentence) print('Input: %s' % (sentence)) print('Predicted translation: {}'.format(result)) attention_plot = attention_plot[:len(result.split(' ')), :len(sentence.split(' '))] plot_attention(attention_plot, sentence.split(' '), result.split(' ')) # 恢復保存的訓練結果 checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir)) # 測試如下翻譯 translate(u'hace mucho frio aqui.') translate(u'esta es mi vida.') translate(u'¿todavia estan en casa?') # 聽說這句話的翻譯結果不對,不懂西班牙文,不作評論 translate(u'trata de averiguarlo.')
第一次執行的時候要加參數tain:
$ ./translate_spa2en.py train Epoch 1 Batch 0 Loss 4.5296 Epoch 1 Batch 100 Loss 2.2811 Epoch 1 Batch 200 Loss 1.7985 Epoch 1 Batch 300 Loss 1.6724 Epoch 1 Loss 2.0235 Time taken for 1 epoch 149.3063322815 sec ...訓練過程略... Input: <start> hace mucho frio aqui . <end> Predicted translation: it s very cold here . <end> Input: <start> esta es mi vida . <end> Predicted translation: this is my life . <end> Input: <start> ¿ todavia estan en casa ? <end> Predicted translation: are you still at home ? <end> Input: <start> trata de averiguarlo . <end> Predicted translation: try to figure it out . <end>
之後若是隻是想測試翻譯效果,能夠不帶train參數執行,直接看翻譯結果。
對於每個翻譯句子,程序都會繪製注意力矩陣圖:
一般語法不是很複雜的句子,基本是順序對應關係,因此注意力亮點基本落在對角線上。
圖中X座標是西班牙文單詞,Y座標是英文單詞。每一個英文單詞,沿X軸看,亮點對應的X軸單詞,表示對於翻譯出這個英文單詞,是哪個西班牙文單詞權重最大。
(待續...)