char RNN代碼來源於https://github.com/hzy46/Char-RNN-TensorFlowpython
本人在學習char RNN的過程當中,遇到了不少的問題,可是依然選擇一行代碼一行代碼的啃下來,而且註釋好,我在啃代碼的過程當中,就想要是有一位大神在我旁邊就行了,我在看代碼的過程當中,不懂那裏,就問那裏,但是現實中並無,全部問題都要本身解決,今日我終於把代碼所有弄懂了,也把代碼分享給下一位想要學習char RNN的人。開源才能進步,中國加油。覺有有用但願你們能夠點個贊,關注我,這將給我莫大的動力。若是我文中有錯誤的地方,歡迎指出,我也須要學習和進步。多一點包容,多一點努力。git
# -*- coding:utf-8 -*- import tensorflow as tf from read_utils import TextConverter, batch_generator from model import CharRNN import os import codecs FLAGS = tf.flags.FLAGS tf.flags.DEFINE_string('name', 'default', '模型名') tf.flags.DEFINE_integer('num_seqs', 32, '一個batch裏面的序列數量') # 32 tf.flags.DEFINE_integer('num_steps', 26, '序列的長度') # 26 tf.flags.DEFINE_integer('lstm_size', 128, 'LSTM隱層的大小') tf.flags.DEFINE_integer('num_layers', 2, 'LSTM的層數') tf.flags.DEFINE_boolean('use_embedding', False, '是否使用 embedding') tf.flags.DEFINE_integer('embedding_size', 128, 'embedding的大小') tf.flags.DEFINE_float('learning_rate', 0.001, '學習率') tf.flags.DEFINE_float('train_keep_prob', 0.5, '訓練期間的dropout比率') tf.flags.DEFINE_string('input_file', '', 'utf8編碼過的text文件') tf.flags.DEFINE_integer('max_steps', 10000, '一個step 是運行一個batch, max_steps固定了最大的運行步數') tf.flags.DEFINE_integer('save_every_n', 1000, '每隔1000步會將模型保存下來') tf.flags.DEFINE_integer('log_every_n', 10, '每隔10步會在屏幕上打出曰志') # 使用的字母(漢字)的最大個數。默認爲3500 。程序會自動挑選出使用最多的字,井將剩下的字歸爲一類,並標記爲<unk> tf.flags.DEFINE_integer('max_vocab', 10000, '最大字符數量') # python train.py --use_embedding --input_file data/poetry.txt --name poetry --learning_rate 0.005 --num_steps 26 --num_seqs 32 --max_steps 10000 # python train.py \ # --use_embedding \ # --input_file data/poetry.txt \ # --name poetry \ # --learning_rate 0.005 \ # --num_steps 26 \ # --num_seqs 32 \ # --max_steps 10000 def main(_): model_path = os.path.join('model', FLAGS.name) if os.path.exists(model_path) is False: os.makedirs(model_path) with codecs.open(FLAGS.input_file, encoding='utf-8') as f: # 打開訓練數據集poetry.txt text = f.read() converter = TextConverter(text, FLAGS.max_vocab) # 最大字符數量10000 converter.save_to_file(os.path.join(model_path, 'converter.pkl')) arr = converter.text_to_arr(text) g = batch_generator(arr, FLAGS.num_seqs, FLAGS.num_steps) # 句子數量、句子長度 print(converter.vocab_size) # 3501 model = CharRNN(converter.vocab_size, num_seqs=FLAGS.num_seqs, num_steps=FLAGS.num_steps, lstm_size=FLAGS.lstm_size, num_layers=FLAGS.num_layers, learning_rate=FLAGS.learning_rate, train_keep_prob=FLAGS.train_keep_prob, use_embedding=FLAGS.use_embedding, embedding_size=FLAGS.embedding_size) model.train(g, FLAGS.max_steps, model_path, FLAGS.save_every_n, FLAGS.log_every_n) if __name__ == '__main__': tf.app.run()
# coding: utf-8 import os import time import numpy as np import tensorflow as tf def pick_top_n(preds, vocab_size, top_n=5): p = np.squeeze(preds) # p[np.argsort(p)]將p從小到大排序 p[np.argsort(p)[:-top_n]] = 0 # 將除了top_n個預測值的位置都置爲0 p = p / np.sum(p) # 歸一化機率 # 以p的機率從vocab_size中隨機選取一個字符,p是列表,vocab_size也是列表,p表明vocab_size中每一個字的機率 c = np.random.choice(vocab_size, 1, p=p)[0] return c class CharRNN: def __init__(self, num_classes, num_seqs=32, num_steps=26, lstm_size=128, num_layers=2, learning_rate=0.001, grad_clip=5, sampling=False, train_keep_prob=0.5, use_embedding=False, embedding_size=128): if sampling is True: # 若是是測試 num_seqs, num_steps = 1, 1 else: num_seqs, num_steps = num_seqs, num_steps self.num_classes = num_classes # 一共分3501類,每一個字是一類,判斷下一個字出現的機率,是下一個類的機率,分類任務 self.num_seqs = num_seqs # 一個batch裏面句子的數量32 self.num_steps = num_steps # 句子的長度26 self.lstm_size = lstm_size # 隱藏層大小 (batch_size, state_size) self.num_layers = num_layers # LSTM層數量 self.learning_rate = learning_rate # 學習率 self.grad_clip = grad_clip self.train_keep_prob = train_keep_prob self.use_embedding = use_embedding self.embedding_size = embedding_size # embedding的大小128 tf.reset_default_graph() self.build_inputs() self.build_lstm() self.build_loss() self.build_optimizer() self.saver = tf.train.Saver() def build_inputs(self): with tf.name_scope('inputs'): # shape = (batch_size, num_steps) = (句子數量,句子長度)=(32, 26) self.inputs = tf.placeholder(tf.int32, shape=(self.num_seqs, self.num_steps), name='inputs') # 輸出shape=輸入shape,內容是self.inputs每一個字母對應的下一個字母(32, 26) self.targets = tf.placeholder(tf.int32, shape=(self.num_seqs, self.num_steps), name='targets') self.keep_prob = tf.placeholder(tf.float32, name='keep_prob') # 對於漢字生成,使用embedding層會取得更好的效果。 # 英文字母沒有必要用embedding層 if self.use_embedding is False: self.lstm_inputs = tf.one_hot(self.inputs, self.num_classes) else: with tf.device("/cpu:0"): # 先定義一個embedding變量,embedding纔是咱們的訓練數據(字的總類別,每一個字的向量)=(3501, 128) embedding = tf.get_variable('embedding', [self.num_classes, self.embedding_size]) # 使用tf.nn.embedding lookup查找embedding,讓self.input從embedding中查數據 # 請注意embedding變量也是能夠訓練的,所以是經過訓練獲得embedding的具體數值。 # embedding.shape=[self.num_classes, self.embedding_size]=(3501, 128) # self.inputs.shape=(num_seqs, num_steps)=(句子數量,句子長度)=(32, 26) # self.lstm_inputs是直接輸入LSTM的數據。 # self.lstm_inputs.shape=(batch_size, time_step, input_size)=(num_seqs, num_steps, embedding_size)=(句子數量,句子長度,詞向量)=(32, 26, 128) self.lstm_inputs = tf.nn.embedding_lookup(embedding, self.inputs) def build_lstm(self): """定義多層N vs N LSTM模型""" # 建立單個cell函數 def get_a_cell(lstm_size, keep_prob): lstm = tf.nn.rnn_cell.BasicLSTMCell(lstm_size) drop = tf.nn.rnn_cell.DropoutWrapper(lstm, output_keep_prob=keep_prob) return drop # 將LSTMCell進行堆疊 with tf.name_scope('lstm'): cell = tf.nn.rnn_cell.MultiRNNCell( [get_a_cell(self.lstm_size, self.keep_prob) for _ in range(self.num_layers)]) # 隱藏層的初始化 shape=batch_size,計入筆記中,你的博客漏掉了 self.initial_state = cell.zero_state(self.num_seqs, tf.float32) # (batch_size, state_size) print("self.initial_state.shape", self.initial_state) # (LSTMStateTuple( # c= <tf.Tensor 'lstm/MultiRNNCellZeroState/DropoutWrapperZeroState/BasicLSTMCellZeroState/zeros:0' shape = (32, 128) dtype = float32 >, # h = < tf.Tensor 'lstm/MultiRNNCellZeroState/DropoutWrapperZeroState/BasicLSTMCellZeroState/zeros_1:0' shape = (32, 128) dtype = float32 >), # LSTMStateTuple( # c= < tf.Tensor 'lstm/MultiRNNCellZeroState/DropoutWrapperZeroState_1/BasicLSTMCellZeroState/zeros:0' shape = (32, 128) dtype = float32 >, # h = < tf.Tensor 'lstm/MultiRNNCellZeroState/DropoutWrapperZeroState_1/BasicLSTMCellZeroState/zeros_1:0' shape = (32, 128) dtype = float32 >)) # 將咱們建立的LSTMCell經過dynamic_rnn對cell展開時間維度,否則只是在時間上走"一步" # inputs_shape = (batch_size, time_steps, input_size) # initial_state_shape = (batch_size, cell.state_size) # output_shape=(batch_size, time_steps, cell.output_size)=(32, 26, 128) time_steps步裏全部輸出,是個列表 self.lstm_outputs, self.final_state = tf.nn.dynamic_rnn(cell, self.lstm_inputs, initial_state=self.initial_state) # 經過lstm_outputs獲得機率 seq_output = tf.concat(self.lstm_outputs, 1) # 合併全部time_step獲得輸出,lstm_outputs只有一個,所以仍是原shape=32, 26, 128) x = tf.reshape(seq_output, [-1, self.lstm_size]) # (batch_size*time_steps, cell.output_size)=(32*26, 128) # softmax層 with tf.variable_scope('softmax'): softmax_w = tf.Variable(tf.truncated_normal([self.lstm_size, self.num_classes], stddev=0.1)) softmax_b = tf.Variable(tf.zeros(self.num_classes)) self.logits = tf.matmul(x, softmax_w) + softmax_b # 預測值 self.proba_prediction = tf.nn.softmax(self.logits, name='predictions') # 變成下一個詞出現的機率 def build_loss(self): with tf.name_scope('loss'): y_one_hot = tf.one_hot(self.targets, self.num_classes) y_reshaped = tf.reshape(y_one_hot, self.logits.get_shape()) loss = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=y_reshaped) self.loss = tf.reduce_mean(loss) def build_optimizer(self): # 使用截斷梯度降低 clipping gradients tvars = tf.trainable_variables() grads, _ = tf.clip_by_global_norm(tf.gradients(self.loss, tvars), self.grad_clip) train_op = tf.train.AdamOptimizer(self.learning_rate) self.optimizer = train_op.apply_gradients(zip(grads, tvars)) def train(self, batch_generator, max_steps, save_path, save_every_n, log_every_n): self.session = tf.Session() with self.session as sess: sess.run(tf.global_variables_initializer()) # Train network step = 0 new_state = sess.run(self.initial_state) for x, y in batch_generator: step += 1 start = time.time() feed = {self.inputs: x, self.targets: y, self.keep_prob: self.train_keep_prob, self.initial_state: new_state} batch_loss, new_state, _ = sess.run([self.loss, self.final_state, self.optimizer], feed_dict=feed) end = time.time() # control the print lines if step % log_every_n == 0: print('step: {}/{}... '.format(step, max_steps), 'loss: {:.4f}... '.format(batch_loss), '{:.4f} sec/batch'.format((end - start))) if step % save_every_n == 0: self.saver.save(sess, os.path.join(save_path, 'model'), global_step=step) if step >= max_steps: break self.saver.save(sess, os.path.join(save_path, 'model'), global_step=step) def sample(self, n_samples, prime, vocab_size): """ :param n_samples: 生成多少詞 :param prime: 開始字符串 :param vocab_size: 一共有多少字符 """ samples = [c for c in prime] # [6, 14]=[風, 水] sess = self.session new_state = sess.run(self.initial_state) preds = np.ones((vocab_size,)) # for prime=[] for c in prime: print("輸入的單詞是:", c) x = np.zeros((1, 1)) # 輸入單個字符 x[0, 0] = c feed = {self.inputs: x, self.keep_prob: 1., self.initial_state: new_state} # preds是機率, preds, new_state = sess.run([self.proba_prediction, self.final_state], feed_dict=feed) c = pick_top_n(preds, vocab_size) print("預測出的詞是", c) # 18-->中 samples.append(c) # 添加字符到samples中 # 不斷生成字符,直到達到指定數目 for i in range(n_samples): # 30 x = np.zeros((1, 1)) x[0, 0] = c feed = {self.inputs: x, self.keep_prob: 1., self.initial_state: new_state} preds, new_state = sess.run([self.proba_prediction, self.final_state], feed_dict=feed) c = pick_top_n(preds, vocab_size) # c 爲詞索引 samples.append(c) return np.array(samples) def load(self, checkpoint): self.session = tf.Session() self.saver.restore(self.session, checkpoint) print('Restored from: {}'.format(checkpoint))
# Author:凌逆戰 # -*- coding:utf-8 -*- import tensorflow as tf from read_utils import TextConverter from model import CharRNN import os FLAGS = tf.flags.FLAGS tf.flags.DEFINE_integer('lstm_size', 128, 'size of hidden state of lstm') tf.flags.DEFINE_integer('num_layers', 2, 'number of lstm layers') tf.flags.DEFINE_boolean('use_embedding', False, 'whether to use embedding') tf.flags.DEFINE_integer('embedding_size', 128, 'size of embedding') tf.flags.DEFINE_string('converter_path', '', 'model/name/converter.pkl') tf.flags.DEFINE_string('checkpoint_path', '', 'checkpoint path') tf.flags.DEFINE_string('start_string', '', 'use this string to start generating') tf.flags.DEFINE_integer('max_length', 30, 'max length to generate') # --use_embedding --start_string "風水" --converter_path model/poetry/converter.pkl --checkpoint_path model/poetry/ --max_length 30 def main(_): FLAGS.start_string = FLAGS.start_string converter = TextConverter(filename=FLAGS.converter_path) if os.path.isdir(FLAGS.checkpoint_path): FLAGS.checkpoint_path = tf.train.latest_checkpoint(FLAGS.checkpoint_path) model = CharRNN(converter.vocab_size, sampling=True, lstm_size=FLAGS.lstm_size, num_layers=FLAGS.num_layers, use_embedding=FLAGS.use_embedding, embedding_size=FLAGS.embedding_size) model.load(FLAGS.checkpoint_path) start = converter.text_to_arr(FLAGS.start_string) arr = model.sample(FLAGS.max_length, start, converter.vocab_size) print("arr裝的是每一個單詞的位置", arr) print(converter.arr_to_text(arr)) if __name__ == '__main__': tf.app.run()
# Author:凌逆戰 # -*- coding:utf-8 -*- import numpy as np import copy import pickle def batch_generator(arr, n_seqs, n_steps): """ :param arr: 訓練集數據 :param n_seqs:一個batch的句子數量,32 :param n_steps: 句子長度,26 :return: x, y 的生成器 """ arr = copy.copy(arr) # 把數據備份一份 batch_size = n_seqs * n_steps # 一個batch的句子數量*句子長度=一個batch的總字數 n_batches = int(len(arr) / batch_size) # 取到了batch的整數 arr = arr[:batch_size * n_batches] # [:n_seqs * n_steps * n_batches] arr = arr.reshape((n_seqs, -1)) # # [n_seqs: n_steps * n_batches] while True: np.random.shuffle(arr) # 每次循環是一次batch for n in range(0, arr.shape[1], n_steps): x = arr[:, n:n + n_steps] # 一個句子,句子的每一個詞 y = np.zeros_like(x) # y[:, -1]全部行的最後一列=x[:, 0] 全部行的第0列 y[:, :-1], y[:, -1] = x[:, 1:], x[:, 0] yield x, y class TextConverter(object): def __init__(self, text=None, max_vocab=5000, filename=None): if filename is not None: with open(filename, 'rb') as f: self.vocab = pickle.load(f) else: vocab = set(text) # 變成集和,去重 print("數據集總共用到了多少詞", len(vocab)) # 5387 # max_vocab_process # 計算每一個詞出現的次數 vocab_count = {} for word in vocab: vocab_count[word] = 0 for word in text: vocab_count[word] += 1 vocab_count_list = [] # [(詞,詞數量), (詞,詞數量)...] for word in vocab_count: # 字典循環,獲得的是鍵 vocab_count_list.append((word, vocab_count[word])) vocab_count_list.sort(key=lambda x: x[1], reverse=True) # 按照詞數量倒序 大-->小 if len(vocab_count_list) > max_vocab: vocab_count_list = vocab_count_list[:max_vocab] vocab = [x[0] for x in vocab_count_list] self.vocab = vocab # 裝載全部詞的列表 self.word_to_int_table = {c: i for i, c in enumerate(self.vocab)} self.int_to_word_table = dict(enumerate(self.vocab)) # {(索引,單詞),(索引,單詞)...} for item in list(self.int_to_word_table.items())[:50]: # 遍歷字典中的元素 print(item) # (0, ',') # (1, '。') # (2, '\n') # (3, '不') # (4, '人') # (5, '山') # (6, '風') # (7, '日') # (8, '雲') # (9, '無') # (10, '何') # (11, '一') # (12, '春') # (13, '月') # (14, '水') # (15, '花') @property def vocab_size(self): return len(self.vocab) + 1 def word_to_int(self, word): if word in self.word_to_int_table: return self.word_to_int_table[word] # 返回這是第幾個詞 else: return len(self.vocab) def int_to_word(self, index): if index == len(self.vocab): return '<unk>' elif index < len(self.vocab): return self.int_to_word_table[index] # 返回第幾個詞所對應的詞 else: raise Exception('Unknown index!') def text_to_arr(self, text): arr = [] for word in text: arr.append(self.word_to_int(word)) # text中的詞,出如今vocab中的索引 return np.array(arr) def arr_to_text(self, arr): words = [] for index in arr: words.append(self.int_to_word(index)) return "".join(words) def save_to_file(self, filename): with open(filename, 'wb') as f: pickle.dump(self.vocab, f)