Generating Song Lyrics and Classical Chinese Poems Automatically with RNNs
RNNs are commonly used for sequence data: by learning the relationships between neighboring elements in a sequence, they can predict the next likely element given a number of consecutive ones.
Below are the most basic RNN equations; of course, an LSTM (Long Short-Term Memory) or GRU (Gated Recurrent Unit) can also be used to generate sequences.
$$ h_t = \tanh(W_{hh}h_{t-1} + W_{xh}x_t + b_h) $$

$$ y_t = W_{hy}h_t + b_y $$
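As a minimal sketch of these two equations (toy dimensions, independent of the lyrics data used later), a single recurrence step in numpy looks like this:

import numpy as np

hidden_size, vocab_size = 4, 10          # toy sizes, just for illustration
Wxh = np.random.randn(hidden_size, vocab_size) * 0.01
Whh = np.random.randn(hidden_size, hidden_size) * 0.01
Why = np.random.randn(vocab_size, hidden_size) * 0.01
bh = np.zeros((hidden_size, 1))
by = np.zeros((vocab_size, 1))

h = np.zeros((hidden_size, 1))           # previous hidden state h_{t-1}
x = np.zeros((vocab_size, 1))
x[3] = 1                                 # one-hot input x_t

h = np.tanh(np.dot(Wxh, x) + np.dot(Whh, h) + bh)   # h_t
y = np.dot(Why, h) + by                              # y_t (unnormalized scores)
p = np.exp(y) / np.sum(np.exp(y))                    # softmax over the vocabulary

The full training code below repeats exactly this step over every position of a sequence.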
As sequence data we will mainly use text, such as song lyrics and classical Chinese poems.
Let's start with the most hand-rolled version: implementing lyrics generation with numpy. The lyrics were crawled from the web, and the code mainly follows https://gist.github.com/karpathy/d4dee566867f8291f086
Load the libraries and the lyrics, dropping songs whose text is mostly English letters (probably English songs), which leaves 36,616 songs.
# -*- coding: utf-8 -*-
import numpy as np

sentences = []
with open('../lyrics.txt', 'r', encoding='utf8') as fr:
    lines = fr.readlines()
    for line in lines:
        line = line.strip()
        count = 0
        for c in line:
            if (c >= 'a' and c <= 'z') or (c >= 'A' and c <= 'Z'):
                count += 1
        if count / len(line) < 0.1:
            sentences.append(line)
print('共%d首歌' % len(sentences))
Build the mapping between characters and ids; there are 10,131 characters in total.
chars = {}
for sentence in sentences:
    for c in sentence:
        chars[c] = chars.get(c, 0) + 1
chars = sorted(chars.items(), key=lambda x: x[1], reverse=True)
chars = [char[0] for char in chars]
vocab_size = len(chars)
print('共%d個字' % vocab_size, chars[:20])

char2id = {c: i for i, c in enumerate(chars)}
id2char = {i: c for i, c in enumerate(chars)}
Define some training and model hyperparameters, and prepare the training data.
hidden_size = 100
maxlen = 25
learning_rate = 0.1

X_data = []
Y_data = []
for sentence in sentences:
    for i in range(0, len(sentence) - maxlen - 1, maxlen):
        X_data.append([char2id[c] for c in sentence[i: i + maxlen]])
        Y_data.append([char2id[c] for c in sentence[i + 1: i + maxlen + 1]])
print(len(X_data))

Wxh = np.random.randn(hidden_size, vocab_size) * 0.01
Whh = np.random.randn(hidden_size, hidden_size) * 0.01
Why = np.random.randn(vocab_size, hidden_size) * 0.01
bh = np.zeros((hidden_size, 1))
by = np.zeros((vocab_size, 1))
The loss function (forward pass plus backpropagation through time).
def lossFun(inputs, targets, hprev):
    xs, hs, ys, ps = {}, {}, {}, {}
    hs[-1] = np.copy(hprev)
    loss = 0
    # forward pass
    for t in range(len(inputs)):
        xs[t] = np.zeros((vocab_size, 1))
        xs[t][inputs[t]] = 1
        hs[t] = np.tanh(np.dot(Wxh, xs[t]) + np.dot(Whh, hs[t - 1]) + bh)
        ys[t] = np.dot(Why, hs[t]) + by
        ps[t] = np.exp(ys[t]) / np.sum(np.exp(ys[t]))
        loss += -np.log(ps[t][targets[t], 0])
    # backward pass
    dWxh, dWhh, dWhy = np.zeros_like(Wxh), np.zeros_like(Whh), np.zeros_like(Why)
    dbh, dby = np.zeros_like(bh), np.zeros_like(by)
    dhnext = np.zeros_like(hs[0])
    for t in reversed(range(len(inputs))):
        dy = np.copy(ps[t])
        dy[targets[t]] -= 1
        dWhy += np.dot(dy, hs[t].T)
        dby += dy
        dh = np.dot(Why.T, dy) + dhnext
        dhraw = (1 - hs[t] * hs[t]) * dh
        dbh += dhraw
        dWxh += np.dot(dhraw, xs[t].T)
        dWhh += np.dot(dhraw, hs[t - 1].T)
        dhnext = np.dot(Whh.T, dhraw)
    for dparam in [dWxh, dWhh, dWhy, dbh, dby]:
        np.clip(dparam, -5, 5, out=dparam)
    return loss, dWxh, dWhh, dWhy, dbh, dby, hs[len(inputs) - 1]
The sampling function, called once every so many training iterations to generate some text.
def sample(h, seed_ix, n):
    x = np.zeros((vocab_size, 1))
    x[seed_ix] = 1
    ixes = []
    for t in range(n):
        h = np.tanh(np.dot(Wxh, x) + np.dot(Whh, h) + bh)
        y = np.dot(Why, h) + by
        p = np.exp(y) / np.sum(np.exp(y))
        ix = np.random.choice(range(vocab_size), p=p.ravel())
        ixes.append(ix)
        x = np.zeros((vocab_size, 1))
        x[ix] = 1
    return ixes
Initialize the training variables. Adagrad is used as the optimization algorithm, so a few extra cache variables are needed.
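For reference, the Adagrad update that the training loop below applies to each parameter $\theta$ with gradient $g$ and cache $m$ is

$$ m \leftarrow m + g^{2}, \qquad \theta \leftarrow \theta - \eta\,\frac{g}{\sqrt{m + 10^{-8}}} $$

where $\eta$ is the learning rate; the cache variables initialized here are exactly these per-parameter $m$.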
n = 0
mWxh, mWhh, mWhy = np.zeros_like(Wxh), np.zeros_like(Whh), np.zeros_like(Why)
mbh, mby = np.zeros_like(bh), np.zeros_like(by)
smooth_loss = -np.log(1.0 / vocab_size) * maxlen
Train the model; the loop runs indefinitely.
while True:
    if n == 0 or n == len(X_data):
        hprev = np.zeros((hidden_size, 1))
        n = 0
    X = X_data[n]
    Y = Y_data[n]

    loss, dWxh, dWhh, dWhy, dbh, dby, hprev = lossFun(X, Y, hprev)
    smooth_loss = smooth_loss * 0.999 + loss * 0.001

    for param, dparam, mem in zip([Wxh, Whh, Why, bh, by],
                                  [dWxh, dWhh, dWhy, dbh, dby],
                                  [mWxh, mWhh, mWhy, mbh, mby]):
        mem += dparam * dparam
        param += -learning_rate * dparam / np.sqrt(mem + 1e-8)

    if n % 100 == 0:
        print('iter %d, loss: %f' % (n, smooth_loss))
        sample_ix = sample(hprev, X[0], 200)
        txt = ''.join(id2char[ix] for ix in sample_ix)
        print(txt)

    n += 1
After about 540,000 iterations the model produced the passage below. It is far from fluent, but it does seem to have picked up some words and phrasing patterns.
顏悲 心已中雨著街眼淚不知 留在這時祈忘的本身同樣無常 你個人歡 當時是你能止學了綻開瞥袖 前朝來去勇氣 讓你是一雙睡過之後 由於你飛雪中的街音裏飛 此模糊的愛 只有誰要再多少時 管只是無度美醉不給主題襯 曾流盲雙腳一片城自己邊 來並肩常與滿是一點和缺 好愛得也還記得證着多夢 愛 作人來 這吃碎 咱們精神蹲着你的門 口不信心終究理想透完了誰幾度 我都在憑營力的光體 賣愛不說 愛你是個人好
Keras provides an official example of text generation with an LSTM:
https://github.com/fchollet/keras/blob/master/examples/lstm_text_generation.py
With a few small changes we can reuse it; the data is still the same lyrics as before.
Load the libraries.
# -*- coding: utf-8 -*-
from keras.models import Sequential
from keras.layers import Dense, LSTM, Embedding
from keras.callbacks import LambdaCallback
import numpy as np
import random
import sys
import pickle
Load the data and build the character-to-id mappings.
sentences = []
with open('../lyrics.txt', 'r', encoding='utf8') as fr:
    lines = fr.readlines()
    for line in lines:
        line = line.strip()
        count = 0
        for c in line:
            if (c >= 'a' and c <= 'z') or (c >= 'A' and c <= 'Z'):
                count += 1
        if count / len(line) < 0.1:
            sentences.append(line)
print('共%d首歌' % len(sentences))

chars = {}
for sentence in sentences:
    for c in sentence:
        chars[c] = chars.get(c, 0) + 1
chars = sorted(chars.items(), key=lambda x: x[1], reverse=True)
chars = [char[0] for char in chars]
vocab_size = len(chars)
print('共%d個字' % vocab_size, chars[:20])

char2id = {c: i for i, c in enumerate(chars)}
id2char = {i: c for i, c in enumerate(chars)}

with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([char2id, id2char], fw)
Prepare the training data, then define and compile the model.
maxlen = 10
step = 3
embed_size = 128
hidden_size = 128
vocab_size = len(chars)
batch_size = 64
epochs = 20

X_data = []
Y_data = []
for sentence in sentences:
    for i in range(0, len(sentence) - maxlen, step):
        X_data.append([char2id[c] for c in sentence[i: i + maxlen]])
        y = np.zeros(vocab_size, dtype=np.bool)
        y[char2id[sentence[i + maxlen]]] = 1
        Y_data.append(y)
X_data = np.array(X_data)
Y_data = np.array(Y_data)
print(X_data.shape, Y_data.shape)

model = Sequential()
model.add(Embedding(input_dim=vocab_size, output_dim=embed_size, input_length=maxlen))
model.add(LSTM(hidden_size, input_shape=(maxlen, embed_size)))
model.add(Dense(vocab_size, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam')
Define the sampling function used during sequence generation.
def sample(preds, diversity=1.0):
    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds + 1e-10) / diversity
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)
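The `diversity` argument acts as a sampling temperature $T$: the predicted distribution is re-weighted as

$$ p_i \propto \exp\!\left(\frac{\log p_i}{T}\right) $$

so values below 1 concentrate probability on the most likely characters (more conservative, more repetitive text), while values above 1 flatten the distribution and give more varied but noisier output.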
Define a callback that runs at the end of every training epoch.
def on_epoch_end(epoch, logs):
    print('-' * 30)
    print('Epoch', epoch)

    # pick a random song as the seed (randint's upper bound is inclusive)
    index = random.randint(0, len(sentences) - 1)
    for diversity in [0.2, 0.5, 1.0]:
        print('----- diversity:', diversity)

        sentence = sentences[index][:maxlen]
        print('----- Generating with seed: ' + sentence)
        sys.stdout.write(sentence)

        for i in range(400):
            x_pred = np.zeros((1, maxlen))
            for t, char in enumerate(sentence):
                x_pred[0, t] = char2id[char]

            preds = model.predict(x_pred, verbose=0)[0]
            next_index = sample(preds, diversity)
            next_char = id2char[next_index]

            sentence = sentence[1:] + next_char
            sys.stdout.write(next_char)
            sys.stdout.flush()
        print()
Train the model and save it.
model.fit(X_data, Y_data, batch_size=batch_size, epochs=epochs,
          callbacks=[LambdaCallback(on_epoch_end=on_epoch_end)])
model.save('song_keras.h5')
The following code loads the model and generates lyrics; it needs a seed line of lyrics to start from.
# -*- coding: utf-8 -*-
from keras.models import load_model
import numpy as np
import pickle
import sys

maxlen = 10
model = load_model('song_keras.h5')

with open('dictionary.pkl', 'rb') as fr:
    [char2id, id2char] = pickle.load(fr)

def sample(preds, diversity=1.0):
    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds + 1e-10) / diversity
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)

sentence = '能不能給我一首歌的時間'
sentence = sentence[:maxlen]
diversity = 1.0
print('----- Generating with seed: ' + sentence)
print('----- diversity:', diversity)
sys.stdout.write(sentence)

for i in range(400):
    x_pred = np.zeros((1, maxlen))
    for t, char in enumerate(sentence):
        x_pred[0, t] = char2id[char]

    preds = model.predict(x_pred, verbose=0)[0]
    next_index = sample(preds, diversity)
    next_char = id2char[next_index]

    sentence = sentence[1:] + next_char
    sys.stdout.write(next_char)
    sys.stdout.flush()
The generated result is shown below. It seems somewhat better than the previous one, with more meaningful words and short phrases.
能不能給我一首歌的時間 要去人還有古年 你表明我所的 只願爲你作下一個成熟 從那個歌聲中 你的別思量 寫你的畫面走過了西陌上雨張 小水沒忘了 我欲再感覺 我終於你開心哭過心事流出了我心痛 就看口提幽紋太多 獨自一直行 你也在想 我感到最此的第一次 只想要閒想 穿行多高樓的星雲 看見鞍上雲 青竹瓊樓又新葉 人潮春涌成度過 幸福嗚 風雪落入麗箏悽悽 萬頃枯枝回伸離袖弦 不幸以潮 到底必經認來我不變 都想你 這星辰 暮鼓 WA Lsevemusich hey Live 走進不在意 不肯天涯 如此溫柔 不夠支離 多巧認真和你還太平行 哎呀呀呀 呀呀呀呀呀呀呀啊嘿 餓很差去哪兒呀 那個人聰明? 王王之如下 下也難改徒有愛還能敢相離 撥開你的嘴角 相識的一見 到你的世界所世 才發現我也不會躲藏 讓我決定有人擔憂善良 像一我的世界心裏長着 夜晚需來又頭 與我專車徵 戰天幾天不懂配遊戲 也是本身應嗎 你給我來的狠也
Now let's switch tools and data: implement classical Chinese poem generation with TensorFlow, using the corpus at https://github.com/chinese-poetry/chinese-poetry
Load the libraries.
# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import glob
import json
from collections import Counter
from tqdm import tqdm
from snownlp import SnowNLP
Load the data; after filtering there are 105,336 poems.
poets = []
paths = glob.glob('chinese-poetry/json/poet.*.json')
for path in paths:
    data = open(path, 'r').read()
    data = json.loads(data)
    for item in data:
        content = ''.join(item['paragraphs'])
        if len(content) >= 24 and len(content) <= 32:
            # SnowNLP's .han converts traditional characters to simplified;
            # each poem is wrapped in '[' and ']' as start/end markers
            content = SnowNLP(content)
            poets.append('[' + content.han + ']')

poets.sort(key=lambda x: len(x))
print('共%d首詩' % len(poets), poets[0], poets[-1])
Build the character-to-id mappings; there are 8,072 distinct characters.
chars = []
for item in poets:
    chars += [c for c in item]
print('共%d個字' % len(chars))

chars = sorted(Counter(chars).items(), key=lambda x: x[1], reverse=True)
print('共%d個不一樣的字' % len(chars))
print(chars[:10])
chars = [c[0] for c in chars]

# id 0 is reserved for padding, so real characters start from 1
char2id = {c: i + 1 for i, c in enumerate(chars)}
id2char = {i + 1: c for i, c in enumerate(chars)}
Prepare the training data. Since the poems were sorted by length, poems in the same batch have similar lengths and little padding is needed.
batch_size = 64
X_data = []
Y_data = []

for b in range(len(poets) // batch_size):
    start = b * batch_size
    end = b * batch_size + batch_size
    batch = [[char2id[c] for c in poets[i]] for i in range(start, end)]
    maxlen = max(map(len, batch))
    # pad every poem in the batch to the same length with id 0
    X_batch = np.full((batch_size, maxlen - 1), 0, np.int32)
    Y_batch = np.full((batch_size, maxlen - 1), 0, np.int32)

    for i in range(batch_size):
        X_batch[i, :len(batch[i]) - 1] = batch[i][:-1]
        Y_batch[i, :len(batch[i]) - 1] = batch[i][1:]

    X_data.append(X_batch)
    Y_data.append(Y_batch)

print(len(X_data), len(Y_data))
Define the model structure and the optimizer.
hidden_size = 256
num_layer = 2
embedding_size = 256

X = tf.placeholder(tf.int32, [batch_size, None])
Y = tf.placeholder(tf.int32, [batch_size, None])
learning_rate = tf.Variable(0.0, trainable=False)

cell = tf.nn.rnn_cell.MultiRNNCell(
    [tf.nn.rnn_cell.BasicLSTMCell(hidden_size, state_is_tuple=True) for i in range(num_layer)],
    state_is_tuple=True)
initial_state = cell.zero_state(batch_size, tf.float32)

embeddings = tf.Variable(tf.random_uniform([len(char2id) + 1, embedding_size], -1.0, 1.0))
embedded = tf.nn.embedding_lookup(embeddings, X)

# outputs: batch_size, max_time, hidden_size
# last_states: 2 tuple(two LSTM), 2 tuple(c and h)
#              batch_size, hidden_size
outputs, last_states = tf.nn.dynamic_rnn(cell, embedded, initial_state=initial_state)
outputs = tf.reshape(outputs, [-1, hidden_size])                  # batch_size * max_time, hidden_size
logits = tf.layers.dense(outputs, units=len(char2id) + 1)         # batch_size * max_time, len(char2id) + 1
logits = tf.reshape(logits, [batch_size, -1, len(char2id) + 1])   # batch_size, max_time, len(char2id) + 1
probs = tf.nn.softmax(logits)                                     # batch_size, max_time, len(char2id) + 1

loss = tf.reduce_mean(tf.contrib.seq2seq.sequence_loss(logits, Y, tf.ones_like(Y, dtype=tf.float32)))
params = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(loss, params), 5)
optimizer = tf.train.AdamOptimizer(learning_rate).apply_gradients(zip(grads, params))
Train the model for 50 epochs.
sess = tf.Session()
sess.run(tf.global_variables_initializer())

for epoch in range(50):
    # exponentially decay the learning rate each epoch
    sess.run(tf.assign(learning_rate, 0.002 * (0.97 ** epoch)))

    data_index = np.arange(len(X_data))
    np.random.shuffle(data_index)
    X_data = [X_data[i] for i in data_index]
    Y_data = [Y_data[i] for i in data_index]

    losses = []
    for i in tqdm(range(len(X_data))):
        ls_, _ = sess.run([loss, optimizer], feed_dict={X: X_data[i], Y: Y_data[i]})
        losses.append(ls_)

    print('Epoch %d Loss %.5f' % (epoch, np.mean(losses)))
Save the model so that it can be used later on a standalone machine.
saver = tf.train.Saver()
saver.save(sess, './poet_generation_tensorflow')

import pickle
with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([char2id, id2char], fw)
Use the model on a standalone machine to generate poems, either completely at random or as acrostic poems (where given characters start each line).
# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import pickle

with open('dictionary.pkl', 'rb') as fr:
    [char2id, id2char] = pickle.load(fr)

batch_size = 1
hidden_size = 256
num_layer = 2
embedding_size = 256

X = tf.placeholder(tf.int32, [batch_size, None])
Y = tf.placeholder(tf.int32, [batch_size, None])
learning_rate = tf.Variable(0.0, trainable=False)

cell = tf.nn.rnn_cell.MultiRNNCell(
    [tf.nn.rnn_cell.BasicLSTMCell(hidden_size, state_is_tuple=True) for i in range(num_layer)],
    state_is_tuple=True)
initial_state = cell.zero_state(batch_size, tf.float32)

embeddings = tf.Variable(tf.random_uniform([len(char2id) + 1, embedding_size], -1.0, 1.0))
embedded = tf.nn.embedding_lookup(embeddings, X)

outputs, last_states = tf.nn.dynamic_rnn(cell, embedded, initial_state=initial_state)
outputs = tf.reshape(outputs, [-1, hidden_size])
logits = tf.layers.dense(outputs, units=len(char2id) + 1)
probs = tf.nn.softmax(logits)
targets = tf.reshape(Y, [-1])
loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=targets))
params = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(loss, params), 5)
optimizer = tf.train.AdamOptimizer(learning_rate).apply_gradients(zip(grads, params))

sess = tf.Session()
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint('./'))

def generate():
    # start from the '[' marker and sample character by character until ']' is produced
    states_ = sess.run(initial_state)
    gen = ''
    c = '['
    while c != ']':
        gen += c
        x = np.zeros((batch_size, 1))
        x[:, 0] = char2id[c]
        probs_, states_ = sess.run([probs, last_states], feed_dict={X: x, initial_state: states_})
        probs_ = np.squeeze(probs_)
        pos = int(np.searchsorted(np.cumsum(probs_), np.random.rand() * np.sum(probs_)))
        c = id2char[pos]
    return gen[1:]

def generate_with_head(head):
    # same as generate(), but force the next character to come from `head`
    # at the start of the poem and after every comma or full stop
    states_ = sess.run(initial_state)
    gen = ''
    c = '['
    i = 0
    while c != ']':
        gen += c
        x = np.zeros((batch_size, 1))
        x[:, 0] = char2id[c]
        probs_, states_ = sess.run([probs, last_states], feed_dict={X: x, initial_state: states_})
        probs_ = np.squeeze(probs_)
        pos = int(np.searchsorted(np.cumsum(probs_), np.random.rand() * np.sum(probs_)))
        if (c == '[' or c == '。' or c == ',') and i < len(head):
            c = head[i]
            i += 1
        else:
            c = id2char[pos]
    return gen[1:]

print(generate())
print(generate_with_head('深度學習'))
The generated results are shown below. The character counts and punctuation come out right, and the content looks plausible enough, though admittedly hard to make much sense of.
百計無意魄可無,知君又到兩家書。自知君子有天祿,天下名通赤子虛。 深山宜數月交馳,度世曾徒有客期。學子今來能入楚,習家不癭莫辭卑。