TensorFlow學習筆記(六)循環神經網絡

 

 

 

1、循環神經網絡簡介

  循環神經網絡的主要用途是處理和預測序列數據。循環神經網絡刻畫了一個序列當前的輸出與以前信息的關係。從網絡結構上,循環神經網絡會記憶以前的信息,並利用以前的信息影響後面節點的輸出。python

下圖展現了一個典型的循環神經網絡。git

循環神經網絡的一個重要的概念就是時刻。上圖中循環神經網絡的主體結構A的輸入除了來自輸入層的Xt,還有一個自身當前時刻的狀態St。算法

在每個時刻,A會讀取t時刻的輸入Xt,而且獲得一個輸出Ht。同時還會獲得一個當前時刻的狀態St,傳遞給下一時刻t+1。數組

所以,循環神經網絡理論上可看做同一神經結構被無限重複的過程。(無限重複目前仍是不可行的)網絡

將循環神經網絡按照時間序列展開,以下圖所示session

 

 xt是t時刻的輸入app

St是t時刻的「記憶」,St = f(WSt-1 + Uxt),f是tanh等激活函數dom

Ot 是t時刻的輸出函數

下圖給出一個最簡單的循環體或者叫記憶體的結構圖性能

 

下圖展現了一個循環神經網絡的前向傳播算法的具體計算過程。

在獲得前向傳播計算結果以後,能夠和其餘網絡相似的定義損失函數。神經網絡的惟一區別在於它每個時刻都有一個輸出,因此循環神經網絡的總損失爲前面全部時刻的損失函數的總和。

咱們利用代碼來實現這個簡單的前向傳播過程。

import numpy as np

X = [1,2]
state = [0.0,0.0]
#定義不一樣輸入部分的權重
w_cell_state = np.asarray([[0.1,0.2],[0.3,0.4]])
w_cell_input = np.asarray([0.5,0.6])
b_cell = np.asarray([0.1,-0.1])
#定義輸出層的權重
w_output = np.asarray([[0.1],[0.2]])
b_output = 0.1
#按照時間順序執行循環神經網絡的前向傳播過程
for i in range(len(X)):
    before_activetion = np.dot(state,w_cell_state) + X[i] * w_cell_input + b_cell
    state = np.tanh(before_activetion)
    #計算當前時刻的最終輸出
    final_output = np.dot(state,w_output) + b_output
    #輸出每一時刻的信息
    print("before_activation",before_activetion)
    print("state",state)
    print("final_output",final_output)

 

2、長短時記憶網絡(LSTM)結構

循環神經網絡工做的關鍵點就是使用歷史的信息來幫助當前的決策。循環神經網絡能很好的利用傳統的神經網絡不能建模的信息,但同時,也帶來了更大的挑戰——長期依賴的問題。

  在有些問題中,模型僅僅須要短時間內的信息來執行當前的任務。但同時也會有一些上下文場景更加複雜的狀況。當間隔不斷增大時,簡單的循環神經網絡可能會喪失學習到如此遠的信息的能力。或者在複雜的語言場景中,有用的信息的間隔有大有小,長短不一,循環神經網絡的性能也會受限。

  爲了解決這類問題,設計了LSTM。與單一tanh循環結構不一樣,LSTM擁有三個門:「輸入門」、「輸出門」、「遺忘門」。 

 

  LSTM靠這些「門」的結構信息有選擇的影響循環神經網絡中每一個時刻的狀態。所謂的「門」就是一個sigmod網絡和一個按位作乘法的操做。當sigmod輸出爲1時,所有信息經過;爲0時,信息沒法經過。爲了使循環神經網絡更有效的保持長期記憶。「遺忘門「和」輸入門」就相當重要。「遺忘門」就是讓神經網絡忘記以前沒有用的信息。從當前的輸入補充新的「記憶」是「輸入門」做用。

使用LSTM結構的循環神經網絡的前向傳播時一個比較複雜的計算過程。在TensorFlow中能夠被很簡單的實現。例以下面的僞代碼:

import tensorflow as tf

#定義一個LSTM結構。TF經過一句簡單的命令就能夠定義一個LSTM循環體
#LSTM中使用的變量也會自動聲明

lstm = tf.nn.rnn_cell.BasicLSTMCell(lstm_hidden_size)
#將LSTM中的狀態初始化問哦全0數組。
#BasicLSTMCell類提供了zero_state函數來生成全0 的初始狀態
state = lstm.zero_state(batch_size,tf.float32)
current_input = "hello"
#定義損失函數
loss = 0.0
#雖然rnn理論上能夠處理任意長度的序列,可是在訓練時爲了不梯度消散的問題,會規定一個最大的循環長度num_temps
for i in range(num_temps):
    #在第一個時刻聲明LSTM結構中使用的變量,在以後的時刻都須要服用以前的定義好的變量。
    if i > 0:
        tf.get_variable_scope().reuse_variables()
    #每一步處理時間序列中的一個時刻
    lstm_output,state = lstm(current_input,state)
    #將當前時刻LSTM結構的輸出傳入一個全鏈接層獲得最後的輸出
    final_output = full_connected(lstm_output)
    #計算當前時刻的輸出的損失
    loss += calc_loss(final_output,expected_output)

#利用BP後向傳播算法訓練模型

3、循環神經網絡的變種

一、雙向循環神經網絡和深層循環神經網絡

在經典的循環神經網絡中,狀態的傳輸時從前向後單向的。然而,在有些問題中,當前時刻的輸出不只和以前的狀態有關,也和以後的轉檯有關。只是後就須要使用雙向循環神經網絡來解決此類問題。雙向循環神經網絡時由連個神經網絡上下疊加在一塊兒組成的。輸出有這兩個神經網絡的轉檯共同決定的。下圖展現了一個雙向循環神經網絡。

深層循環神經網絡是循環神經網絡的另一種變體。爲了加強模型的表達能力,能夠將每一時刻上的循環體重複屢次。深層循環神經網絡在每一時刻上將循環體結構重複了屢次。 每一層循環體中的參數是一致的,不一樣層的循環體參數能夠不一致。TF提供了MultiRNNCell類來實現深層循環神經網絡的前向傳播過程。

 

import tensorflow as tf

#定義一個基本的LSTM結構做爲循環體的基礎結構,深層循環神經網絡也能夠支持其餘的循環提結構
lstm = tf.nn.rnn_cell.BasicLSTMCell(lstm_size)

#經過MultiRNNCell類來實現深層循環神經網絡中每一時刻的前向傳播過程。其中。number_of_layers 表示了有多少層,也就是圖
#中從xi到hi須要通過多少個LSTM結構。
stacked_lstm = tf.nn.rnn_cell.MultiRNNCell([lstm]*number_of_layers)
#和經典神經網絡同樣,能夠經過zero_state函數得到初始狀態。
state = stacked_lstm.zero_state(batch_size,tf.float32)
#計算每一時刻的前向傳播過程
for i in range(num_steps):
    if i > 0:
        tf.get_variable_scope().reuse_variables()
    stacked_lstm_output  ,state = stacked_lstm(current_input,state)
    final_output =  fully_connected(stacked_lstm_output)
    loss += calc_loss(final_output,expected_output)
    

二、循環神經網絡的dropout

  dropout能夠樣循環神經網絡更加的健壯。dropout通常只在不一樣層循環體之間使用。也就是說從t-1時刻傳遞到時刻t,RNN不會進行狀態的dropout,而在同一時刻t,不一樣層循環體之間會使用dropout。

在TF中,使用tf.nn.rnn_cell.DropoutWrapper類能夠很容易實現dropout功能。

#定義LSTM結構
lstm  = tf.nn.rnn_cell.BasicLSTMCell(lstm_size)
#經過DropoutWrapper來實現dropout功能。input_keep_drop參數用來控制輸入的dropout的機率,output_keep_drop參數用來控制輸出的dropout的機率,
dropout_lstm = tf.nn.rnn_cell.DropoutWrapper(lstm,input_keep_prob=0.5,output_keep_prob=0.5)
#在使用了dropout的基礎上定義深層RNN
stacked_lstm = tf.nn.rnn_cell.MultiRNNCell([dropout_lstm]* 5)

 

4、循環神經網絡的樣例應用

一、天然語言建模

  簡單的說,語言模型的目的就是爲了計算一個句子的出現機率。在這裏把句子當作單詞的序列S = (w1,w2,w3....wm),其中m爲句子的長度,它的機率能夠表示爲

P(S) = p(w1,w2,w3.....wm) = p(w1)p(w2|w1)p(w3|w1,w2)p(wm| w1,w2...wm)

等式右邊的每一項都是語言模型中的一個參數。爲了估計這些參數的取值,經常使用的方法有n-gram、決策樹、最大熵模型、條件隨機場、神經網絡模型。

  語言模型效果的好壞的經常使用的評價指標是複雜度(perplexity)。簡單來講,perplexity刻畫的就是經過某一語言模型估計一句話出現的機率。值越小越好。複雜度的計算公式:

下面就利用語言模型來處理PTB數據集。

爲了讓PTB數據集使用更方便,TF提供了兩個函數來預處理PTB數據集。ptb_raw_data用來讀取原始數據,並將原始數據的單詞轉化爲單詞ID,造成一個很是長的序列。ptb_iterator將序列按照某固定的長度來截斷,並將數據組成batch。

 

使用循環神經網絡實現語言模型

# -*- coding:utf-8 -*-

import numpy as np
import tensorflow as tf
from tensorflow.models.rnn.ptb import reader
from tensorflow.contrib.legacy_seq2seq import sequence_loss_by_example
DATA_PATH = "path/to/ptb/data"
HIDDEN_SIZE = 200 #隱藏層的規模
NUM_LAYERS = 2 #DRNN中LSTM結構的層數
VOCAB_SIZE = 10000 #詞典規模,加上語句結束符和稀有單詞結束符總共10000
LEARNING_RATE = 1.0
TRAIN_BATCH_SIZE = 20  #訓練數據BATCH大小
TRAIN_NUM_STEPS = 35    #訓練數據截斷長度
#在測試的時候不須要使用截斷
EVAL_BATCH_SIZE = EVAL_NUM_STEP = 1
NUM_EPOCH = 2 #使用訓練數據的輪數
KEEP_DROP =0.5 #節點不被dropout的機率
MAX_GRAD_NORM =5 #用於控制梯度膨脹的參數


#定義一個PTBMODEL類來描述模型,方便維護循環神經網絡中的狀態
class PTBMODEL:
    def __init__(self,batch_size,num_steps,is_training = True):
        self.batch_size = batch_size
        self.num_steps = num_steps
        #定義輸入層,維度爲batch_size* num_steps
        self.input_data = tf.placeholder(tf.int32,shape=[batch_size,num_steps])
        #定義預期輸出。它的維度和ptb_iterrattor輸出的正確答案維度是同樣的。
        self.targets = tf.placeholder(tf.int32,[batch_size,num_steps])
        #定義使用LSTM結構爲循環體結構且使用dropout的深層循環神經網絡
        lstm_cell = tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE)
        if is_training:
            lstm_cell = tf.nn.rnn_cell.DropoutWrapper(lstm_cell,output_keep_prob=KEEP_DROP)
        cell = tf.nn.rnn_cell.MultiRNNCell(lstm_cell)
        #初始化初始狀態
        self.initial_state = cell.zero_state(batch_size,tf.float32)
        #將單詞ID轉換爲單詞向量,總共有VOCAB_SIZE個單詞,每一個單詞向量的維度爲HIDDEN_SIZE,因此embedding參數的維度爲
        #VOCAB_SIZE*HIDDEN_SIZE
        embedding = tf.get_variable("embedding",[VOCAB_SIZE,HIDDEN_SIZE])
        #將本來batch_size * num_steps個單詞ID轉化爲單詞向量,轉化後的輸入層維度爲batch_size * num_steps * HIDDEN_SIZE
        inputs = tf.nn.embedding_lookup(embedding,self.input_data)
        #只在訓練時使用dropout
        if is_training:
            inputs  = tf.nn.dropout(inputs,KEEP_DROP)
        #定義輸出列表,在這裏現將不一樣時刻LSTM結構的輸出收集起來,再經過一個全鏈接層獲得最終輸出
        output = []
        #state 存儲不一樣batch中LSTM的狀態,而且初始化爲0.
        state = self.initial_state
        with tf.variable_scope("RNN"):
            for time_step  in range(num_steps):
                if time_step > 0 :
                    tf.get_variable_scope().reuse_variables()
                cell_output,state = cell(inputs[:,time_step,:],state)
                #將當前輸出加入輸出隊列
                output.append(cell_output)
        #把輸出隊列展開成[batch,hidden_size*num_steps]的形狀,而後再reshape成【batch*num_steps,hidden_size】的形狀。
        output = tf.reshape(tf.concat(output,1),[-1,HIDDEN_SIZE])
        #將從LSTM中獲得的輸出再通過一個全鏈接層獲得最後的預測結果,最終的預測結果在每一時刻上都是一個長度爲VOCAB_SIZE的數組
        #通過SoftMax層以後表示下一個位置是不一樣單詞的機率。
        weight = tf.get_variable("weight",[HIDDEN_SIZE,VOCAB_SIZE])
        baias  =  tf.get_variable("bias",[VOCAB_SIZE])
        logits = tf.matmul(output,weight) + baias
        #定義交叉熵損失函數
        loss  = sequence_loss_by_example([logits],[tf.reshape(self.targets,[-1])],
                                                                   [tf.ones([batch_size*num_steps],dtype=tf.float32)]
                                                                   )
        #計算獲得每一個batch的平均損失
        self.cost = tf.reduce_sum(loss)/batch_size
        self.final_state = state
        #只在訓練模型是定義反向傳播操做
        if not is_training:
            return

        trainable_variables = tf.trainable_variables()
        #經過clip_by_global_norm函數控制梯度的大小,避免梯度膨脹的問題
        grads,_ = tf.clip_by_global_norm(tf.gradients(self.cost,trainable_variables),MAX_GRAD_NORM)
        #定義優化方法
        optimizer = tf.train.GradientDescentOptimizer(LEARNING_RATE)
        #定義訓練步驟
        self.train_op = optimizer.apply_gradients(zip(grads,trainable_variables))

#使用給定的模型model在數據data上運行train_op並返回所有數據上的perplexity值

def run_epoch(session,model,data,train_op,output_log):
    #計算perplexity的輔助變量
    total_costs = 0.0
    iters = 0
    state = session.run(model.initial_state)
    #使用當前數據訓練或者測試模型
    for step ,(x,y) in  enumerate(reader.ptb_iterator( data,model.batch_size,model.num_steps)):
        cost,state,_ = session.run([model.cost,model.final_output,model.train_op],{
            model.input_data:x,model.targets:y,
            model.initial_state:state
        })
        total_costs += cost
        iters += model.num_steps
        #只有在訓練時輸出日誌
        if output_log and step % 100 == 0:
            print("After %s steps ,perplexity is %.3f"%(step,np.exp(total_costs/iters)))

    #返回給定模型在給定數據上的perplexity
    return np.exp(total_costs/iters)


def main(_):
    #獲取原始數據
    train_data,valid_data,test_data = reader.ptb_raw_data(DATA_PATH)
    #定義初始化函數
    initializer = tf.random_uniform_initializer(-0.05,0.05)
    #定義訓練用的循環神經網絡模型
    with tf.variable_scope("language_model",reuse=True,initializer=initializer):
        train_model = PTBMODEL(TRAIN_BATCH_SIZE,TRAIN_NUM_STEPS,is_training=True)
    #定義評估用的循環神經網絡模型
    with tf.variable_scope("language_model",reuse=True,initializer=initializer):
        eval_model = PTBMODEL(EVAL_BATCH_SIZE,EVAL_NUM_STEP,is_training=False)
    with tf.Session() as sess:
        tf.global_variables_initializer().run()
        #使用訓練數據訓練模型
        for i in range(NUM_EPOCH):
            print("In iteration:%s"%(i+1))
            #在全部訓練數據上訓練RNN
            run_epoch(sess,train_model,train_data,train_model.train_op,True)
            #使用驗證集評測模型效果
            valid_perplexity = run_epoch(sess,eval_model,valid_data,tf.no_op(),False)
            print("Epoch %s ,Validation perplexity :%.3f"%(i+1,valid_perplexity))
        # 最後使用測試集驗證模型效果
        test_perplexity = run_epoch(sess,eval_model,valid_data,tf.no_op(),False)
        print("TEST perplexity :%.3f"%(test_perplexity))

if __name__ == '__main__':
    tf.app.run()

 4、時間序列預測

   怎麼用循環神經網絡來預測正弦函數,可利用TF的高級封裝--TFLearn.

  一、使用TFLearn自定義模型

  

from sklearn  import cross_validation
from sklearn import datasets
from sklearn import metrics
import tensorflow as tf
from tensorflow.contrib.learn import models,Estimator,SKCompat
from tensorflow.contrib import layers,framework
import numpy as np
#導入TFLearn

#自定義模型,對於給定的輸入數據以及其對應的正確答案,返回在這些輸入上的預測值、損失值以及訓練步驟
def my_model(feature,target):
    #將預測的模型轉換爲one-hot編碼的形式,由於共有三個類別,因此向量長度爲3.通過轉化後,三個個類別(1,0,0),(0,1,0),(0,0,1)
    target = tf.one_hot(target,3,1,0)
    #定義模型以及其在給定數據上的損失函數。TFLearn經過logistic_regression封裝了一個單層全連接神經網絡
    logits,loss = models.logistic_regression(feature,target)
    #建立模型的優化器,並獲得優化步驟
    train_op = layers.optimize_loss(loss,   #損失函數
                                    framework.get_global_step(), #獲取訓練步數並在訓練時更新
                                    optimizer="Adagrad",  #定義優化器
                                    learning_rate=0.1 #定義學習率
                                    )
    #返回在給定數據上的預測結果、損失值以及優化步驟
    return tf.argmax(logits,1) ,loss,train_op

#加載iris數據集,並劃分爲訓練集合和測試集合
iris  = datasets.load_iris()
x_train,x_test,y_train,y_test = cross_validation.train_test_split(iris.data,iris.target,test_size=0.2,random_state=0)
#對自定義的模型進行封裝
classifier =Estimator(model_fn=my_model)
classifier = SKCompat(classifier)
#使用封裝好的模型和訓練數據執行100輪的迭代
classifier.fit(x_train,y_train,steps=100)
#使用訓練好的模型進行預測
y_predicted = classifier.predict(x_test)



#計算模型的準確度
score  = metrics.accuracy_score(y_test,y_predicted)
print("Accuracy: %.2f %%"%(score * 100))

二、預測正選函數

  由於標準的RNN預測的是離散值,因此程序須要將連續的sin函數曲線離散化。

  每一個SAMPLE_ITERVAL對sin函數進行一次採樣,採樣獲得的序列就是sin函數離散化以後的結果

 

import numpy as np
import tensorflow as tf
import matplotlib as mpl
from matplotlib import pyplot as plt
from tensorflow.contrib.learn.python.learn.estimators.estimator import SKCompat

# TensorFlow的高層封裝TFLearn
learn = tf.contrib.learn

# 神經網絡參數
HIDDEN_SIZE = 30  # LSTM隱藏節點個數
NUM_LAYERS = 2  # LSTM層數
TIMESTEPS = 10  # 循環神經網絡截斷長度
BATCH_SIZE = 32  # batch大小

# 數據參數
TRAINING_STEPS = 3000  # 訓練輪數
TRAINING_EXAMPLES = 10000  # 訓練數據個數
TESTING_EXAMPLES = 1000  # 測試數據個數
SAMPLE_GAP = 0.01  # 採樣間隔


def generate_data(seq):
    # 序列的第i項和後面的TIMESTEPS-1項合在一塊兒做爲輸入,第i+TIMESTEPS項做爲輸出
    X = []
    y = []
    for i in range(len(seq) - TIMESTEPS - 1):
        X.append([seq[i:i + TIMESTEPS]])
        y.append([seq[i + TIMESTEPS]])
    return np.array(X, dtype=np.float32), np.array(y, dtype=np.float32)


# LSTM結構單元
def LstmCell():
    lstm_cell = tf.contrib.rnn.BasicLSTMCell(HIDDEN_SIZE)
    return lstm_cell


def lstm_model(X, y):
    # 使用多層LSTM,不能用lstm_cell*NUM_LAYERS的方法,會致使LSTM的tensor名字都同樣
    cell = tf.contrib.rnn.MultiRNNCell([LstmCell() for _ in range(NUM_LAYERS)])

    # 將多層LSTM結構鏈接成RNN網絡並計算前向傳播結果
    output, _ = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)
    output = tf.reshape(output, [-1, HIDDEN_SIZE])

    # 經過無激活函數的全聯接層計算線性迴歸,並將數據壓縮成一維數組的結構
    predictions = tf.contrib.layers.fully_connected(output, 1, None)

    # 將predictions和labels調整爲統一的shape
    y = tf.reshape(y, [-1])
    predictions = tf.reshape(predictions, [-1])

    # 計算損失值
    loss = tf.losses.mean_squared_error(predictions, y)

    # 建立模型優化器並獲得優化步驟
    train_op = tf.contrib.layers.optimize_loss(
        loss,
        tf.train.get_global_step(),
        optimizer='Adagrad',
        learning_rate=0.1)

    return predictions, loss, train_op


# 用sin生成訓練和測試數據集
test_start = TRAINING_EXAMPLES * SAMPLE_GAP
test_end = (TRAINING_EXAMPLES + TESTING_EXAMPLES) * SAMPLE_GAP
train_X, train_y = generate_data(
    np.sin(np.linspace(0, test_start, TRAINING_EXAMPLES, dtype=np.float32)))
test_X, test_y = generate_data(
    np.sin(
        np.linspace(test_start, test_end, TESTING_EXAMPLES, dtype=np.float32)))

# 創建深層循環網絡模型
regressor = SKCompat(learn.Estimator(model_fn=lstm_model, model_dir='model/'))

# 調用fit函數訓練模型
regressor.fit(train_X, train_y, batch_size=BATCH_SIZE, steps=TRAINING_STEPS)

# 使用訓練好的模型對測試集進行預測
predicted = [[pred] for pred in regressor.predict(test_X)]

# 計算rmse做爲評價指標
rmse = np.sqrt(((predicted - test_y)**2).mean(axis=0))
print('Mean Square Error is: %f' % (rmse[0]))

# 對預測曲線繪圖,並存儲到sin.jpg
fit = plt.figure()
plot_predicted = plt.plot(predicted,label = "predicted")
plot_test = plt.plot(test_y,label = "real_sin")
plt.legend([plot_predicted, plot_test], ['predicted', 'real_sin'])

plt.savefig("sin.png")
相關文章
相關標籤/搜索