吳裕雄--天生天然 pythonTensorFlow天然語言處理:Attention模型--測試

import sys
import codecs
import tensorflow as tf

# 1.參數設置。
# 讀取checkpoint的路徑。9000表示是訓練程序在第9000步保存的checkpoint。
CHECKPOINT_PATH = "F:\\temp\\attention_ckpt-9000"

# 模型參數。必須與訓練時的模型參數保持一致。
HIDDEN_SIZE = 1024                          # LSTM的隱藏層規模。
DECODER_LAYERS = 2                          # 解碼器中LSTM結構的層數。
SRC_VOCAB_SIZE = 10000                      # 源語言詞彙表大小。
TRG_VOCAB_SIZE = 4000                       # 目標語言詞彙表大小。
SHARE_EMB_AND_SOFTMAX = True                # 在Softmax層和詞向量層之間共享參數。

# 詞彙表文件
SRC_VOCAB = "F:\\TensorFlowGoogle\\201806-github\\TensorFlowGoogleCode\\Chapter09\\en.vocab"
TRG_VOCAB = "F:\\TensorFlowGoogle\\201806-github\\TensorFlowGoogleCode\\Chapter09\\zh.vocab"

# 詞彙表中<sos>和<eos>的ID。在解碼過程當中須要用<sos>做爲第一步的輸入,並將檢查
# 是不是<eos>,所以須要知道這兩個符號的ID。
SOS_ID = 1
EOS_ID = 2
# 2.定義NMT模型和解碼步驟。
# 定義NMTModel類來描述模型。
class NMTModel(object):
    # 在模型的初始化函數中定義模型要用到的變量。
    def __init__(self):
        # 定義編碼器和解碼器所使用的LSTM結構。
        self.enc_cell_fw = tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE)
        self.enc_cell_bw = tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE)
        self.dec_cell = tf.nn.rnn_cell.MultiRNNCell([tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE) for _ in range(DECODER_LAYERS)])

        # 爲源語言和目標語言分別定義詞向量。   
        self.src_embedding = tf.get_variable("src_emb", [SRC_VOCAB_SIZE, HIDDEN_SIZE])
        self.trg_embedding = tf.get_variable("trg_emb", [TRG_VOCAB_SIZE, HIDDEN_SIZE])

        # 定義softmax層的變量
        if SHARE_EMB_AND_SOFTMAX:
            self.softmax_weight = tf.transpose(self.trg_embedding)
        else:
            self.softmax_weight = tf.get_variable("weight", [HIDDEN_SIZE, TRG_VOCAB_SIZE])
        self.softmax_bias = tf.get_variable("softmax_bias", [TRG_VOCAB_SIZE])

    def inference(self, src_input):
        # 雖然輸入只有一個句子,但由於dynamic_rnn要求輸入是batch的形式,所以這裏
        # 將輸入句子整理爲大小爲1的batch。
        src_size = tf.convert_to_tensor([len(src_input)], dtype=tf.int32)
        src_input = tf.convert_to_tensor([src_input], dtype=tf.int32)
        src_emb = tf.nn.embedding_lookup(self.src_embedding, src_input)

        with tf.variable_scope("encoder"):
            # 使用bidirectional_dynamic_rnn構造編碼器。這一步與訓練時相同。
            enc_outputs, enc_state = tf.nn.bidirectional_dynamic_rnn(self.enc_cell_fw, self.enc_cell_bw, src_emb, src_size, dtype=tf.float32)
            # 將兩個LSTM的輸出拼接爲一個張量。
            enc_outputs = tf.concat([enc_outputs[0], enc_outputs[1]], -1)   
        
        with tf.variable_scope("decoder"):
            # 定義解碼器使用的注意力機制。
            attention_mechanism = tf.contrib.seq2seq.BahdanauAttention(HIDDEN_SIZE, enc_outputs,memory_sequence_length=src_size)

            # 將解碼器的循環神經網絡self.dec_cell和注意力一塊兒封裝成更高層的循環神經網絡。
            attention_cell = tf.contrib.seq2seq.AttentionWrapper(self.dec_cell, attention_mechanism,attention_layer_size=HIDDEN_SIZE)
   
        # 設置解碼的最大步數。這是爲了不在極端狀況出現無限循環的問題。
        MAX_DEC_LEN=100

        with tf.variable_scope("decoder/rnn/attention_wrapper"):
            # 使用一個變長的TensorArray來存儲生成的句子。
            init_array = tf.TensorArray(dtype=tf.int32, size=0,dynamic_size=True, clear_after_read=False)
            # 填入第一個單詞<sos>做爲解碼器的輸入。
            init_array = init_array.write(0, SOS_ID)
            # 調用attention_cell.zero_state構建初始的循環狀態。循環狀態包含
            # 循環神經網絡的隱藏狀態,保存生成句子的TensorArray,以及記錄解碼
            # 步數的一個整數step。
            init_loop_var = (attention_cell.zero_state(batch_size=1, dtype=tf.float32),init_array, 0)

            # tf.while_loop的循環條件:
            # 循環直到解碼器輸出<eos>,或者達到最大步數爲止。
            def continue_loop_condition(state, trg_ids, step):
                return tf.reduce_all(tf.logical_and(tf.not_equal(trg_ids.read(step), EOS_ID),tf.less(step, MAX_DEC_LEN-1)))

            def loop_body(state, trg_ids, step):
                # 讀取最後一步輸出的單詞,並讀取其詞向量。
                trg_input = [trg_ids.read(step)]
                trg_emb = tf.nn.embedding_lookup(self.trg_embedding,trg_input)
                # 調用attention_cell向前計算一步。
                dec_outputs, next_state = attention_cell.call(state=state, inputs=trg_emb)
                # 計算每一個可能的輸出單詞對應的logit,並選取logit值最大的單詞做爲
                # 這一步的而輸出。
                output = tf.reshape(dec_outputs, [-1, HIDDEN_SIZE])
                logits = (tf.matmul(output, self.softmax_weight)+ self.softmax_bias)
                next_id = tf.argmax(logits, axis=1, output_type=tf.int32)
                # 將這一步輸出的單詞寫入循環狀態的trg_ids中。
                trg_ids = trg_ids.write(step+1, next_id[0])
                return next_state, trg_ids, step+1

            # 執行tf.while_loop,返回最終狀態。
            state, trg_ids, step = tf.while_loop(continue_loop_condition, loop_body, init_loop_var)
            return trg_ids.stack()
# 3.翻譯一個測試句子。
def main():
    # 定義訓練用的循環神經網絡模型。
    with tf.variable_scope("nmt_model", reuse=None):
        model = NMTModel()

    # 定義個測試句子。
    test_en_text = "This is a test . <eos>"
    print(test_en_text)
    
    # 根據英文詞彙表,將測試句子轉爲單詞ID。
    with codecs.open(SRC_VOCAB, "r", "utf-8") as f_vocab:
        src_vocab = [w.strip() for w in f_vocab.readlines()]
        src_id_dict = dict((src_vocab[x], x) for x in range(len(src_vocab)))
    test_en_ids = [(src_id_dict[token] if token in src_id_dict else src_id_dict['<unk>'])for token in test_en_text.split()]
    print(test_en_ids)

    # 創建解碼所需的計算圖。
    output_op = model.inference(test_en_ids)
    sess = tf.Session()
    saver = tf.train.Saver()
    saver.restore(sess, CHECKPOINT_PATH)

    # 讀取翻譯結果。
    output_ids = sess.run(output_op)
    print(output_ids)
    
    # 根據中文詞彙表,將翻譯結果轉換爲中文文字。
    with codecs.open(TRG_VOCAB, "r", "utf-8") as f_vocab:
        trg_vocab = [w.strip() for w in f_vocab.readlines()]
    output_text = ''.join([trg_vocab[x] for x in output_ids])
    
    # 輸出翻譯結果。
    print(output_text.encode('utf8').decode(sys.stdout.encoding))
    sess.close()

if __name__ == "__main__":
    main()

相關文章
相關標籤/搜索