RNN模型因爲具備短時間記憶功能,所以自然就比較適合處理天然語言等序列問題,尤爲是引入門控機制後,可以解決長期依賴問題,捕獲輸入樣本之間的長距離聯繫。本文的模型是堆疊兩層的LSTM和GRU模型,模型的結構爲:LSTM(GRU)—dropout—LSTM(GRU)—dropout—全鏈接層—輸出層,比較簡單。關於TensorFlow搭建RNN模型有關的內容,在這篇《TensorFlow之RNN:堆疊RNN、LSTM、GRU及雙向LSTM》博客裏闡述得比較清楚了,這裏不贅述。html
儘管RNN模型自然比較適合處理天然語言的問題,但是最近CNN模型有迎頭遇上之勢。爲何呢?從此次文本分類的任務中能夠體會到,RNN模型在運行速度上絲絕不佔優點,比CNN模型要慢幾倍到十幾倍。後一個時間步的輸出依賴於前一個時間步的輸出,沒法進行並行處理,致使模型訓練的速度慢,這是一個致命的弱點。而RNN模型引覺得傲的可以捕獲序列中的長距離依賴關係,已經再也不是獨門祕訣,由於CNN模型的卷積操做就相似於N-gram,能夠捕獲上下文關係,並且經過把構建更深層的卷積層,能夠捕獲更長距離的依賴關係。此外,Transformer橫空出世,不只可以進行並行處理,並且經過自注意力機制可以在任意距離的兩個詞之間創建依賴關係,大有後浪把前浪拍死在沙灘上的趨勢。python
另外,從模型預測的準確性來說,CNN模型的準確性不比RNN模型低,甚至超過了RNN模型。git
TextRNN模型依然分爲四個模塊:一、數據處理模塊;二、模型構建模塊;三、模型訓練模快;四、模型預測模塊。github
GitHub地址:https://github.com/DengYangyong/Chinese_Text_Classification/tree/master/Text-Classification-On_RNN數組
好,下面看代碼。session
1、數據處理app
數據處理部分和上一篇CharCNN是同樣的,儘管咱們說RNN模型能夠處理任意長度的序列,可是在這個TextRNN模型中,咱們仍是把輸入處理成了固定長度的序列。dom
#coding: utf-8 import sys from collections import Counter import numpy as np import tensorflow.contrib.keras as kr if sys.version_info[0] > 2: is_py3 = True else: reload(sys) sys.setdefaultencoding("utf-8") is_py3 = False # 判斷軟件的版本,若是版本爲3.6.5,那麼sys.version_info的輸出爲:sys.version_info(major=3, minor=6, micro=5)。 """若是在python2下面使用python3訓練的模型,可考慮調用此函數轉化一下字符編碼""" def native_word(word, encoding='utf-8'): if not is_py3: return word.encode(encoding) else: return word """is_py3函數當版本爲3時返回True,不然返回False。if not 後面的值爲False則將「utf-8」編碼轉換爲'unicode'.""" def native_content(content): if not is_py3: return content.decode('utf-8') else: return content """ 經常使用文件操做,可在python2和python3間切換.""" def open_file(filename, mode='r'): if is_py3: return open(filename, mode, encoding='utf-8', errors='ignore') else: return open(filename, mode) """ 讀取文件數據""" def read_file(filename): contents, labels = [], [] with open_file(filename) as f: for line in f: try: label, content = line.strip().split('\t') if content: contents.append(list(native_content(content))) labels.append(native_content(label)) except: pass return contents, labels # line.strip().split('\t')的輸出爲兩個元素的列表:['體育', '黃蜂vs湖人首發:科比帶傷戰保羅 加索爾救贖之戰 新浪體育訊...']。 # 注意這個list()函數,把一段文字轉化爲了列表,元素爲每一個字和符號:['黃', '蜂', 'v', 's', '湖', '人', '首', '發', ':', '科', '比',...] # contents的元素爲每段新聞轉化成的列表:[['黃', '蜂', 'v', 's', '湖', '人', '首', '發', ':', '科', '比',...],[],...] # labels爲['體育', '體育',...] """根據訓練集構建詞彙表,存儲""" def build_vocab(train_dir, vocab_dir, vocab_size=5000): data_train, _ = read_file(train_dir) all_data = [] for content in data_train: all_data.extend(content) counter = Counter(all_data) count_pairs = counter.most_common(vocab_size - 1) words, _ = list(zip(*count_pairs)) words = ['<PAD>'] + list(words) open_file(vocab_dir, mode='w').write('\n'.join(words) + '\n') '''讀取詞彙表''' def read_vocab(vocab_dir): with open_file(vocab_dir) as fp: words = [native_content(_.strip()) for _ in fp.readlines()] word_to_id = dict(zip(words, range(len(words)))) return words, word_to_id # readlines()讀取全部行而後把它們做爲一個字符串列表返回:['頭\n', '天\n', ...]。strip()函數去掉"\n"。 # words: ['<PAD>', ',', '的', '。', '一', '是', '在', '0', '有',...] # word_to_id:{'<PAD>': 0, ',': 1, '的': 2, '。': 3, '一': 4, '是': 5,..},每一個類別對應的value值爲其索引ID """讀取分類目錄""" def read_category(): categories = ['體育', '財經', '房產', '家居', '教育', '科技', '時尚', '時政', '遊戲', '娛樂'] categories = [native_content(x) for x in categories] cat_to_id = dict(zip(categories, range(len(categories)))) return categories, cat_to_id # cat_to_id的輸出爲:{'體育': 0, '財經': 1, '房產': 2, '家居': 3,...},每一個類別對應的value值爲其索引ID. """ 將id表示的內容轉換爲文字 """ def to_words(content, words): return ''.join(words[x] for x in content) """ 將文件轉換爲id表示,進行pad """ def process_file(filename, word_to_id, cat_to_id, max_length=600): contents, labels = read_file(filename) data_id, label_id = [], [] #contents的形式爲:[['黃', '蜂', 'v', 's', '湖', '人',...],[],[],...],每個元素是一個列表,該列表的元素是每段新聞的字和符號。 #labels的形式爲:['體育', '體育', '體育', '體育', '體育', ...] for i in range(len(contents)): data_id.append([word_to_id[x] for x in contents[i] if x in word_to_id]) label_id.append(cat_to_id[labels[i]]) x_pad = kr.preprocessing.sequence.pad_sequences(data_id, max_length) y_pad = kr.utils.to_categorical(label_id, num_classes=len(cat_to_id)) return x_pad, y_pad # word_to_id是一個字典:{'<PAD>': 0, ',': 1, '的': 2, '。': 3, '一': 4, '是': 5,...} # 對於每一段新聞轉化的字列表,把每一個字在字典中對應的索引找到: # data_id: 將[['黃', '蜂', 'v', 's', '湖', '人',...],[],[],...] 轉化爲 [[387, 1197, 2173, 215, 110, 264,...],[],[],...]的形式 # label_id : ['體育', '體育', '體育', '體育', '體育', ...] 轉化爲[0, 0, 0, 0, 0, ...] # data_id的行數爲50000,即爲新聞的條數,每一個元素爲由每段新聞的字的數字索引構成的列表; # data_id長這樣:[[387, 1197, 2173, 215, 110, 264,...],[],[],...] # 因爲每段新聞的字數不同,所以每一個元素(列表)的長度不同,可能大於600,也可能小於600,須要統一長度爲600。 # 使用keras提供的pad_sequences來將文本pad爲固定長度,x_pad的形狀爲(50000,600). # label_id是形如[0, 0, 0, 0, 0, ...]的整形數組,cat_to_id是形如{'體育': 0, '財經': 1, '房產': 2, '家居': 3,...}的字典 # to_categorical是對標籤進行one-hot編碼,num-classes是類別數10,y_pad的維度是(50000,10) """生成批次數據""" def batch_iter(x, y, batch_size=64): data_len = len(x) num_batch = int((data_len - 1) / batch_size) + 1 indices = np.random.permutation(np.arange(data_len)) x_shuffle = x[indices] y_shuffle = y[indices] # 樣本長度爲50000 # int()能夠將其餘類型轉化爲整型,也能夠用於向下取整,這裏爲782. # indices元素的範圍是0-49999,形如[256,189,2,...]的擁有50000個元素的列表 # 用indices對樣本和標籤按照行進行從新洗牌,接着上面的例子,把第256行(從0開始計)放在第0行,第189行放在第1行. for i in range(num_batch): start_id = i * batch_size end_id = min((i + 1) * batch_size, data_len) yield x_shuffle[start_id:end_id], y_shuffle[start_id:end_id] # i=780時,end_id=781*64=49984; # 當i=781時,end_id=50000,由於782*64=50048>50000,因此最後一批取[49984:50000] # yield是生成一個迭代器,用for循環來不斷生成下一個批量。 # 爲了防止內存溢出,每次只取64個,內存佔用少。
2、模型搭建函數
模型比較簡單,下面的代碼也就是按照LSTM(GRU)—dropout—LSTM(GRU)—dropout—全鏈接層—輸出層這樣的結構來進行組織的。要注意的是對每層的LSTM或GRU核中的神經元進行dropout,還有取最後時刻和最後一層的LSTM或GRU的隱狀態做爲全鏈接層的輸入。測試
#!/usr/bin/python # -*- coding: utf-8 -*- import tensorflow as tf class TRNNConfig(object): """RNN配置參數""" embedding_dim = 64 seq_length = 600 num_classes = 10 vocab_size = 5000 num_layers= 2 hidden_dim = 128 rnn = 'gru' # 隱藏層層數爲2 # 選擇lstm 或 gru dropout_keep_prob = 0.8 learning_rate = 1e-3 batch_size = 128 num_epochs = 10 print_per_batch = 100 save_per_batch = 10 class TextRNN(object): """文本分類,RNN模型""" def __init__(self, config): self.config = config self.input_x = tf.placeholder(tf.int32, [None, self.config.seq_length], name='input_x') self.input_y = tf.placeholder(tf.float32, [None, self.config.num_classes], name='input_y') self.keep_prob = tf.placeholder(tf.float32, name='keep_prob') self.rnn() def rnn(self): """rnn模型""" def lstm_cell(): return tf.nn.rnn_cell.LSTMCell(self.config.hidden_dim, state_is_tuple=True) def gru_cell(): return tf.nn.rnn_cell.GRUCell(self.config.hidden_dim) def dropout(): if (self.config.rnn == 'lstm'): cell = lstm_cell() else: cell = gru_cell() return tf.nn.rnn_cell.DropoutWrapper(cell, output_keep_prob=self.keep_prob) # 爲每個rnn核後面加一個dropout層 with tf.device('/gpu:0'): embedding = tf.get_variable('embedding', [self.config.vocab_size, self.config.embedding_dim]) embedding_inputs = tf.nn.embedding_lookup(embedding, self.input_x) with tf.name_scope("rnn"): cells = [dropout() for _ in range(self.config.num_layers)] rnn_cell = tf.nn.rnn_cell.MultiRNNCell(cells, state_is_tuple=True) # 堆疊了2層的RNN模型。 _outputs, _ = tf.nn.dynamic_rnn(cell=rnn_cell, inputs=embedding_inputs, dtype=tf.float32) last = _outputs[:, -1, :] # 取最後一個時序輸出做爲結果,也就是最後時刻和第2層的LSTM或GRU的隱狀態。 with tf.name_scope("score"): # 全鏈接層,後面接dropout以及relu激活 fc = tf.layers.dense(last, self.config.hidden_dim, name='fc1') fc = tf.contrib.layers.dropout(fc, self.keep_prob) fc = tf.nn.relu(fc) self.logits = tf.layers.dense(fc, self.config.num_classes, name='fc2') self.y_pred_cls = tf.argmax(tf.nn.softmax(self.logits), 1) with tf.name_scope("optimize"): cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=self.input_y) self.loss = tf.reduce_mean(cross_entropy) self.optim = tf.train.AdamOptimizer(learning_rate=self.config.learning_rate).minimize(self.loss) with tf.name_scope("accuracy"): correct_pred = tf.equal(tf.argmax(self.input_y, 1), self.y_pred_cls) self.acc = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
3、模型訓練、驗證和測試
這一部分的代碼和CharCNN能夠說沒啥區別。注意在驗證和測試時不用作dropout,還有用早停防止過擬合。
# coding: utf-8 from __future__ import print_function import os import sys import time from datetime import timedelta import numpy as np import tensorflow as tf from sklearn import metrics from rnn_model import TRNNConfig, TextRNN from cnews_loader import read_vocab, read_category, batch_iter, process_file, build_vocab base_dir = 'data/cnews' train_dir = os.path.join(base_dir, 'cnews.train.txt') test_dir = os.path.join(base_dir, 'cnews.test.txt') val_dir = os.path.join(base_dir, 'cnews.val.txt') vocab_dir = os.path.join(base_dir, 'cnews.vocab.txt') save_dir = 'checkpoints/textrnn' save_path = os.path.join(save_dir, 'best_validation') # 最佳驗證結果保存路徑 def get_time_dif(start_time): """獲取已使用時間""" end_time = time.time() time_dif = end_time - start_time return timedelta(seconds=int(round(time_dif))) def feed_data(x_batch, y_batch, keep_prob): feed_dict = { model.input_x: x_batch, model.input_y: y_batch, model.keep_prob: keep_prob } return feed_dict def evaluate(sess, x_, y_): """評估在某一數據上的準確率和損失""" data_len = len(x_) batch_eval = batch_iter(x_, y_, 128) total_loss = 0.0 total_acc = 0.0 for x_batch, y_batch in batch_eval: batch_len = len(x_batch) feed_dict = feed_data(x_batch, y_batch, 1.0) # 在測試時不用進行dropout y_pred_class,loss, acc = sess.run([model.y_pred_cls,model.loss, model.acc], feed_dict=feed_dict) total_loss += loss * batch_len total_acc += acc * batch_len return y_pred_class,total_loss / data_len, total_acc / data_len def train(): print("Configuring TensorBoard and Saver...") tensorboard_dir = 'tensorboard/textrnn' if not os.path.exists(tensorboard_dir): os.makedirs(tensorboard_dir) tf.summary.scalar("loss", model.loss) tf.summary.scalar("accuracy", model.acc) merged_summary = tf.summary.merge_all() writer = tf.summary.FileWriter(tensorboard_dir) # 配置 Saver saver = tf.train.Saver() if not os.path.exists(save_dir): os.makedirs(save_dir) print("Loading training and validation data...") # 載入訓練集與驗證集 start_time = time.time() x_train, y_train = process_file(train_dir, word_to_id, cat_to_id, config.seq_length) x_val, y_val = process_file(val_dir, word_to_id, cat_to_id, config.seq_length) time_dif = get_time_dif(start_time) print("Time usage:", time_dif) # 建立session session = tf.Session() session.run(tf.global_variables_initializer()) writer.add_graph(session.graph) print('Training and evaluating...') start_time = time.time() total_batch = 0 best_acc_val = 0.0 last_improved = 0 require_improvement = 1000 # 若是超過1000輪未提高,提早結束訓練 flag = False for epoch in range(config.num_epochs): print('Epoch:', epoch + 1) batch_train = batch_iter(x_train, y_train, config.batch_size) for x_batch, y_batch in batch_train: feed_dict = feed_data(x_batch, y_batch, config.dropout_keep_prob) if total_batch % config.save_per_batch == 0: s = session.run(merged_summary, feed_dict=feed_dict) writer.add_summary(s, total_batch) if total_batch % config.print_per_batch == 0: feed_dict[model.keep_prob] = 1.0 loss_train, acc_train = session.run([model.loss, model.acc], feed_dict=feed_dict) y_pred_cls_1,loss_val, acc_val = evaluate(session, x_val, y_val) # todo if acc_val > best_acc_val: # 保存最好結果 best_acc_val = acc_val last_improved = total_batch saver.save(sess=session, save_path=save_path) improved_str = '*' else: improved_str = '' time_dif = get_time_dif(start_time) msg = 'Iter: {0:>6}, Train Loss: {1:>6.2}, Train Acc: {2:>7.2%},' \ + ' Val Loss: {3:>6.2}, Val Acc: {4:>7.2%}, Time: {5} {6}' print(msg.format(total_batch, loss_train, acc_train, loss_val, acc_val, time_dif, improved_str)) session.run(model.optim, feed_dict=feed_dict) # 運行優化 total_batch += 1 if total_batch - last_improved > require_improvement: # 驗證集正確率長期不提高,提早結束訓練 print("No optimization for a long time, auto-stopping...") flag = True break if flag: break def test(): print("Loading test data...") start_time = time.time() x_test, y_test = process_file(test_dir, word_to_id, cat_to_id, config.seq_length) session = tf.Session() session.run(tf.global_variables_initializer()) saver = tf.train.Saver() saver.restore(sess=session, save_path=save_path) # 讀取保存的模型 print('Testing...') y_pred,loss_test, acc_test = evaluate(session, x_test, y_test) msg = 'Test Loss: {0:>6.2}, Test Acc: {1:>7.2%}' print(msg.format(loss_test, acc_test)) batch_size = 128 data_len = len(x_test) num_batch = int((data_len - 1) / batch_size) + 1 y_test_cls = np.argmax(y_test, 1) y_pred_cls = np.zeros(shape=len(x_test), dtype=np.int32) for i in range(num_batch): start_id = i * batch_size end_id = min((i + 1) * batch_size, data_len) feed_dict = { model.input_x: x_test[start_id:end_id], model.keep_prob: 1.0 } y_pred_cls[start_id:end_id] = session.run(model.y_pred_cls, feed_dict=feed_dict) # 評估 print("Precision, Recall and F1-Score...") print(metrics.classification_report(y_test_cls, y_pred_cls, target_names=categories)) # 混淆矩陣 print("Confusion Matrix...") cm = metrics.confusion_matrix(y_test_cls, y_pred_cls) print(cm) time_dif = get_time_dif(start_time) print("Time usage:", time_dif) if __name__ == '__main__': print('Configuring RNN model...') config = TRNNConfig() if not os.path.exists(vocab_dir): build_vocab(train_dir, vocab_dir, config.vocab_size) categories, cat_to_id = read_category() words, word_to_id = read_vocab(vocab_dir) config.vocab_size = len(words) model = TextRNN(config) option='train' if option == 'train': train() else: test()
採用GRU核,訓練了46分鐘47秒,最好的驗證精度爲91.54%。測試精度爲94.67%。上一篇的CNN模型訓練只花了3分21秒,可見RNN模型在速度上慢了十幾倍。
Iter: 3500, Train Loss: 0.034, Train Acc: 98.44%, Val Loss: 0.35, Val Acc: 91.54%, Time: 0:46:47 *
Testing...
Test Loss: 0.2, Test Acc: 94.67%
Precision, Recall and F1-Score...
precision recall f1-score support
體育 0.99 0.99 0.99 1000
財經 0.93 0.99 0.96 1000
房產 1.00 1.00 1.00 1000
家居 0.95 0.83 0.89 1000
教育 0.88 0.93 0.90 1000
科技 0.95 0.96 0.95 1000
時尚 0.95 0.95 0.95 1000
時政 0.95 0.91 0.93 1000
遊戲 0.94 0.96 0.95 1000
娛樂 0.94 0.96 0.95 1000
micro avg 0.95 0.95 0.95 10000
macro avg 0.95 0.95 0.95 10000
weighted avg 0.95 0.95 0.95 10000
Confusion Matrix...
[[990 0 0 0 5 1 0 0 4 0]
[ 0 987 1 0 2 3 0 6 1 0]
[ 0 0 996 2 2 0 0 0 0 0]
[ 0 22 2 834 60 20 25 20 10 7]
[ 1 6 0 6 925 7 5 12 4 34]
[ 0 5 0 8 8 959 2 2 16 0]
[ 0 0 0 13 9 2 948 4 12 12]
[ 0 33 1 15 21 11 1 910 4 4]
[ 1 1 0 2 10 5 11 0 962 8]
[ 4 2 0 1 15 3 5 2 12 956]]
Time usage: 0:00:40
4、模型預測
從兩個新聞中各摘取了一段內容,進行預測。結果預測爲:科技、體育。
# coding: utf-8 from __future__ import print_function import os import tensorflow as tf import tensorflow.contrib.keras as kr from rnn_model import TRNNConfig, TextRNN from cnews_loader import read_category, read_vocab try: bool(type(unicode)) except NameError: unicode = str base_dir = 'data/cnews' vocab_dir = os.path.join(base_dir, 'cnews.vocab.txt') save_dir = 'checkpoints/textrnn' save_path = os.path.join(save_dir, 'best_validation') # 最佳驗證結果保存路徑 class RnnModel: def __init__(self): self.config = TRNNConfig() self.categories, self.cat_to_id = read_category() self.words, self.word_to_id = read_vocab(vocab_dir) self.config.vocab_size = len(self.words) self.model = TextRNN(self.config) self.session = tf.Session() self.session.run(tf.global_variables_initializer()) saver = tf.train.Saver() saver.restore(sess=self.session, save_path=save_path) # 讀取保存的模型 def predict(self, message): content = unicode(message) data = [self.word_to_id[x] for x in content if x in self.word_to_id] feed_dict = { self.model.input_x: kr.preprocessing.sequence.pad_sequences([data], self.config.seq_length), self.model.keep_prob: 1.0 } y_pred_cls = self.session.run(self.model.y_pred_cls, feed_dict=feed_dict) return self.categories[y_pred_cls[0]] if __name__ == '__main__': rnn_model = RnnModel() test_demo = ['三星ST550以全新的拍攝方式超越了以往任何一款數碼相機', '熱火vs騎士前瞻:皇帝回鄉二番戰 東部次席唾手可得新浪體育訊北京時間3月30日7:00'] for i in test_demo: print(rnn_model.predict(i))