Attention模型形象的比喻就是「圖像對焦」。git
上圖是Encoder-Decoder模型,Decoder中每一個單詞生成過程以下:數組
其中C是「語義編碼C」,f是Decoder的非線性變換函數。由此,咱們能夠看出生成目標句子的每一個單詞都使用同一個語義編碼C,即:源句子中的每一個單詞的影響力都是同樣的,這如同圖像沒有對焦的狀況,現實項目中也存在明顯的不合理。好比一個機器翻譯模型,輸入是「Tom chase Jerry」,模型輸出:「湯姆」,「追逐」,「傑瑞」。在翻譯「傑瑞」的時候顯然「Jerry」的貢獻值最大,若是每一個單詞的貢獻值相同明顯不合理。這個問題在輸入句子長度較短時問題不大,可是當輸入句子較長時會丟失不少細節信息(我的以爲此處相似平均池化和最大值池化)。正由於如此,咱們引入了Attention思想。app
使用Attention模型翻譯「傑瑞」的時候,咱們能夠獲得輸入句子中的每一個單詞對輸出當前單詞的貢獻值大小如:(Tom,0.3)(Chase,0.2) (Jerry,0.5)。這意味着生成每一個單詞yi時再也不使用同一個語義編碼C,而是根據yi使用不一樣的Ci。在引入Attention模型後yi的計算過程改變以下所示:函數
每一個Ci對應源句子中每一個單詞的注意力分配機率,示例以下:post
f2是Encoder對每一個單詞的變換函數,g函數表明整個源句子的中間語義表示的變換函數,通常形式是加權求和:優化
aji表明注意力分配係數,hj表明源句子中某個單詞的語義編碼,Lx表明源句子中單詞數量。g函數的計算過程以下圖所示:編碼
若是所示,當咱們要生成yi單詞,此時咱們用i-1時刻的隱藏節點輸出值Hi-1去和源句子中的每一個單詞對應RNN隱藏節點狀態hj依次進行對比,即:經過函數F(hj,Hi-1)來得到yi對源句子中每一個單詞對應的對齊可能性,函數F常見方法以下圖所示:lua
而後使用Softmax函數進行數值歸一化處理。如對「對齊機率」不理解的朋友,能夠查看下圖英語-德語翻譯系統中加入Attention機制後,Encoder和Decoder兩個句子中每一個單詞對應注意力分配機率分佈。spa
在Soft Attention模型中,Attention機制發生在Decoder中Yi和Encoder中的全部元素之間。Self Attention模型不是在二者之間,而是Decoder內部元素之間或者Encoder內部元素之間發生的Attention機制,計算方法和Soft Attention模型一致。那麼Self Attention模型有什麼好處?咱們依然以機器翻譯爲例:翻譯
如圖所示,Self Attention模型在內部能夠捕獲一些句法特徵或語義特徵。Self Attention模型相比傳統RNN模型須要依次序序列計算,它的感覺野更大,能夠直接將句子中的任意兩個單詞的聯繫經過一個計算步驟聯繫起來,能夠捕獲遠距離的相互依賴特徵(就像列表和數組的區別)。此外,Self Attention模型對於增長計算的並行性也有幫助。
咱們使用的語言數據集是「英語-西班牙語」,數據集樣本以下圖所示:
# 數據下載 path_to_zip=tf.keras.utils.get_file( fname='spa-eng.zip', origin='http://download.tensorflow.org/data/spa-eng.zip', # 解壓tar zip文件 extract=True ) path_to_file=os.path.dirname(path_to_zip)+'/spa-eng/spa.txt'
轉碼:
def unicode_to_ascii(sen): return ''.join( char for char in unicodedata.normalize('NFD',sen) if unicodedata.category(char) != 'Mn' )
def preprocess_sentence(w): w = unicode_to_ascii(w.lower().strip()) # 在單詞和標點之間建立空格 # 如: "he is a boy." => "he is a boy ." w = re.sub(r"([?.!,¿])", r" \1 ", w) w = re.sub(r'[" "]+', " ", w) # 特殊字符以空格代替 w = re.sub(r"[^a-zA-Z?.!,¿]+", " ", w) w = w.rstrip().strip() # 添加開始和結束標記 w = '<start> ' + w + ' <end>' return w
建立數據集:
def create_dataset(path, num_examples): lines = open(path, encoding='UTF-8').read().strip().split('\n') word_pairs = [[preprocess_sentence(w) for w in l.split('\t')] for l in lines[:num_examples]] # 返回格式:[ENGLISH, SPANISH] return word_pairs
字符轉ID,ID轉字符,並排序:
class LanguageIndex(): def __init__(self,lang): self.lang=lang self.wrod2idx={} self.id2word={} self.vacab=set() self.create_index() def create_index(self): for phrase in self.lang: # 添加到集合中,重複內容不添加 self.vacab.update(phrase.split(' ')) self.vacab=sorted(self.vacab) self.wrod2idx['<pad>']=0 #字符-ID轉換 for index,word in enumerate(self.vacab): self.wrod2idx[word]=index+1 for word,index in self.wrod2idx.items(): self.id2word[index]=word
加載數據集:
# 計算最大長度 def max_length(tensor): return max(len(t) for t in tensor)
def load_dataset(path,num_example): #get inputs outputs pairs=create_dataset(path,num_example) # 獲取ID表示 inp_lang=LanguageIndex(sp for en,sp in pairs) targ_lang=LanguageIndex(en for en,sp in pairs) # LanguageIndex 不包含重複值,如下包含重複值 input_tensor=[[inp_lang.wrod2idx[s]for s in sp.split(' ')]for en,sp in pairs] target_tensor=[[targ_lang.wrod2idx[s]for s in en.split(' ')]for en,sp in pairs] max_length_inp,max_length_tar=max_length(input_tensor),max_length(target_tensor) # 將句子補長到預設的最大長度 # padding: post:後補長,pre:前補長 input_tensor=tf.keras.preprocessing.sequence.pad_sequences( sequences=input_tensor, maxlen=max_length_inp, padding='post' ) target_tensor=tf.keras.preprocessing.sequence.pad_sequences( sequences=target_tensor, maxlen=max_length_tar, padding='post' ) return input_tensor,target_tensor,inp_lang,targ_lang,max_length_inp,max_length_tar
建立訓練集驗證集:
# 本次項目只使用前30000條數據 num_examples = 30000 input_tensor, target_tensor, inp_lang, targ_lang, max_length_inp, max_length_targ = load_dataset(path_to_file, num_examples)
# 訓練集80%,驗證集20% input_tensor_train, input_tensor_val, target_tensor_train, target_tensor_val = train_test_split(input_tensor, target_tensor, test_size=0.2)
# 打亂數據集 BUFFER_SIZE=len(input_tensor_train) BATCH_SIZE=64 # 每一個epoch迭代次數 N_BATCH=BUFFER_SIZE // BATCH_SIZE # 詞嵌入維度 embedding_dim=256 # 隱藏神經元數量 units=1024 vocab_inp_size=len(inp_lang.wrod2idx) vocab_tar_size=len(targ_lang.wrod2idx) dataset=tf.data.Dataset.from_tensor_slices((input_tensor_train,target_tensor_train)).shuffle(BUFFER_SIZE) # drop_remainder 當剩餘數據量小於batch_size時候,是否丟棄 dataset=dataset.batch(BATCH_SIZE,drop_remainder='True')
文章開始咱們介紹了Attention模型的計算過程,相信你會很容易理解上圖的內容。對每一個節點具體方程實現以下:
FC=全鏈接層,EO=編碼器輸出,H=隱藏層狀態,X=解碼器輸入,模型計算過程以下表示:
GRU配置:
def gru(units): # 使用GPU加速運算 if tf.test.is_gpu_available(): return tf.keras.layers.CuDNNGRU(units, return_sequences=True, return_state=True, # 循環核的初始化方法 # glorot_uniform是sqrt(2 / (fan_in + fan_out))的正態分佈產生 # 其中fan_in和fan_out是權重張量的扇入扇出(即輸入和輸出單元數目) recurrent_initializer='glorot_uniform') else: return tf.keras.layers.GRU(units, return_sequences=True, return_state=True, # hard_sigmoid <= -1 輸出0,>=1 輸出1 ,中間爲線性 recurrent_activation='sigmoid', recurrent_initializer='glorot_uniform')
編碼器:
class Encoder(tf.keras.Model): def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz): super(Encoder, self).__init__() self.batch_sz = batch_sz self.enc_units = enc_units self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) self.gru = gru(self.enc_units) def call(self, x, hidden): x = self.embedding(x) output, state = self.gru(x, initial_state = hidden) return output, state def initialize_hidden_state(self): return tf.zeros((self.batch_sz, self.enc_units))
解碼器:
class Decoder(tf.keras.Model): def __init__(self,vocab_size,embedding_dim,dec_units,batch_sz): super(Decoder, self).__init__() self.batch_sz=batch_sz self.dec_units=dec_units self.embedding=tf.keras.layers.Embedding( input_shape=vocab_size, output_dim=embedding_dim ) self.gru=gru(self.dec_units) self.fc=tf.keras.layers.Dense(units=vocab_size) # 用於計算score,即:注意力權重係數 self.W1=tf.keras.layers.Dense(self.dec_units) self.W2=tf.keras.layers.Dense(self.dec_units) self.V=tf.keras.layers.Dense(units=1) def __call__(self,x,hidden,ec_output): # tf.expand_dims:在指定索引出增長一維度,值爲1,從索引0開始 # axis: 取值範圍是[-階數,階數],二維的時候0指的是列,1指的是行, # 更高維度的時候,數值是由外向裏增長,如:3維向量,外向內依次是:0,1,2 # 經過計算score公式可得,須要將hidden維度擴展至:[batch_size,1,hidden_size] hidden_with_time_axis=tf.expand_dims(hidden,axis=1) # score=[batch_size, max_length, 1] score=self.V(tf.nn.tanh(self.W1(ec_output)+self.W2(hidden_with_time_axis))) # 數值歸一化和爲1的機率分佈值 attention_weight=tf.nn.softmax(score,axis=1) context_vetor=attention_weight*ec_output # 求和平均 context_vetor=tf.reduce_sum(context_vetor,axis=1) X=self.embedding(x) # 合併解碼器embedding輸出和context vector x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1) # output shape=(batch_size,time_step,hidden_size) # state shape=(batch_size,hidden_size) output,state=self.gru(x) # output[batch_size*1,hidden_size] output=tf.reshape(output,shape=(-1,output.shape[2])) x-self.fc(output) return x,state,attention_weight def initilize_hidden_size(self): return tf.zeros((self.batch_sz,self.dec_units))
實例化模型:
encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE) decoder = Decoder(vocab_tar_size, embedding_dim, units, BATCH_SIZE)
損失函數,優化器:
optimizer = tf.train.AdamOptimizer() def loss_function(real, pred): mask = 1 - np.equal(real, 0) loss_ = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=real, logits=pred) * mask return tf.reduce_mean(loss_)
模型保存:
checkpoint_dir = './training_checkpoints' checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt") checkpoint = tf.train.Checkpoint(optimizer=optimizer, encoder=encoder, decoder=decoder)
因爲咱們使用Teacher Forcing進行訓練,因此咱們簡單介紹下。
如圖所示Teacher Forcing與Free-running不一樣,在訓練過程當中再也不是前一時刻的hidden-output做爲當前輸入,
而是在Ground Truth中找到對應的上一項做爲當前輸入。早期的RNN很弱,若是生成了很是差的結果Free-running的運行方式會致使後面的hidden-output都受到影響。Teacher Forcing運行方式就能夠避免這種問題,缺點也很明顯它嚴重依賴標籤數據。
# 迭代10次訓練集 EPOCHS = 10 for epoch in range(EPOCHS): start = time.time() hidden = encoder.initialize_hidden_state() total_loss = 0 for (batch, (inp, targ)) in enumerate(dataset): loss = 0 # 先記錄梯度 with tf.GradientTape() as tape: # 編碼器輸出 enc_output, enc_hidden = encoder(inp, hidden) dec_hidden = enc_hidden dec_input = tf.expand_dims([targ_lang.word2idx['<start>']] * BATCH_SIZE, 1) # 使用Teacher forcing運行方式 for t in range(1, targ.shape[1]): # 解碼器輸出 predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output) loss += loss_function(targ[:, t], predictions) # 樣本標籤做爲輸入 dec_input = tf.expand_dims(targ[:, t], 1) batch_loss = (loss / int(targ.shape[1])) # one_loss++;batch_loss++ total_loss += batch_loss variables = encoder.variables + decoder.variables gradients = tape.gradient(loss, variables) optimizer.apply_gradients(zip(gradients, variables)) if batch % 100 == 0: print('Epoch {} Batch {} Loss {:.4f}'.format(epoch + 1, batch, batch_loss.numpy())) # 每迭代2次訓練集保存一次模型 if (epoch + 1) % 2 == 0: checkpoint.save(file_prefix = checkpoint_prefix)
評估函數咱們不使用teacher-forcing模式,解碼器的每步輸入是它前一時刻的hidden-state和編碼器輸出,當模型遇到 <end>標記中止運行。
# 和訓練模型函數代碼基本一致 def evaluate(sentence, encoder, decoder, inp_lang, targ_lang, max_length_inp, max_length_targ): attention_plot = np.zeros((max_length_targ, max_length_inp)) # 數據預處理 sentence = preprocess_sentence(sentence) # 向量化表示輸入數據 inputs = [inp_lang.word2idx[i] for i in sentence.split(' ')] # 後置補長 inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs], maxlen=max_length_inp, padding='post') inputs = tf.convert_to_tensor(inputs) result = '' hidden = [tf.zeros((1, units))] enc_out, enc_hidden = encoder(inputs, hidden) dec_hidden = enc_hidden # 維度擴展batch_size dec_input = tf.expand_dims([targ_lang.word2idx['<start>']], 0) for t in range(max_length_targ): predictions, dec_hidden, attention_weights = decoder(dec_input, dec_hidden, enc_out) # 保存權重用於稍後可視化展現 attention_weights = tf.reshape(attention_weights, (-1, )) attention_plot[t] = attention_weights.numpy() predicted_id = tf.argmax(predictions[0]).numpy() # 獲取文本翻譯結果 result += targ_lang.idx2word[predicted_id] + ' ' # 預設的結束標記 if targ_lang.idx2word[predicted_id] == '<end>': return result, sentence, attention_plot # 預測值做爲輸入,以此輸出下一時刻單詞 dec_input = tf.expand_dims([predicted_id], 0) return result, sentence, attention_plot
可視化權重值:
fig = plt.figure(figsize=(10,10)) ax = fig.add_subplot(1, 1, 1) ax.matshow(attention, cmap='viridis') fontdict = {'fontsize': 14} ax.set_xticklabels([''] + sentence, fontdict=fontdict, rotation=90) ax.set_yticklabels([''] + predicted_sentence, fontdict=fontdict) plt.show()
本篇文章篇幅較多,不過項目的重點是Attention思想的理解,Self Attention模型具備更長的感覺野,更容易捕獲長距離的相互依賴特徵,目前Google機器翻譯模型就大量使用到Self Attention。Attention模型目前在機器翻譯,圖片描述任務,語音識別都有大量應用,熟練使用Attention對於解決實際問題會有很大的幫助。
文章部份內容參考 Yash Katariya 和 張俊林,在此表示感謝。