以下是最基礎的RNN公式,當然也可以使用LSTM(Long Short-Term Memory)或GRU(Gated Recurrent Unit)來生成序列
$$ h_t=\tanh(W_{hh}h_{t-1}+W_{xh}x_t+b_h) $$
$$ y_t=W_{hy}h_t+b_y $$
# -*- coding: utf-8 -*-
import numpy as np

# Every non-empty line of the lyrics file is kept as one entry of
# `sentences`, unless 10% or more of its characters are ASCII letters.
sentences = []
with open('../lyrics.txt', 'r', encoding='utf8') as fr:
    for line in fr:
        line = line.strip()
        if not line:
            # A blank line has length 0 and would make the ratio below
            # divide by zero, so skip it.
            continue
        count = sum(
            1 for c in line
            if ('a' <= c <= 'z') or ('A' <= c <= 'Z')
        )
        if count / len(line) < 0.1:
            sentences.append(line)
print('共%d首歌' % len(sentences))
# Tally how many times each character appears across all sentences.
frequency = {}
for song in sentences:
    for ch in song:
        frequency[ch] = frequency.get(ch, 0) + 1

# Most frequent characters first; the sort is stable, so ties keep the
# order in which the characters were first seen.
ranked = sorted(frequency.items(), key=lambda pair: pair[1], reverse=True)
chars = [ch for ch, _ in ranked]
vocab_size = len(chars)
print('共%d個字' % vocab_size, chars[:20])

# Two-way lookup between a character and its rank in `chars`.
char2id = {}
id2char = {}
for idx, ch in enumerate(chars):
    char2id[ch] = idx
    id2char[idx] = ch
# Hyper-parameters of the character-level RNN.
hidden_size = 100
maxlen = 25
learning_rate = 0.1

# Cut every sentence into non-overlapping windows of `maxlen` characters.
# The target of a window is the same window shifted right by one character.
X_data = []
Y_data = []
for sentence in sentences:
    upper = len(sentence) - maxlen - 1
    for start in range(0, upper, maxlen):
        stop = start + maxlen
        X_data.append([char2id[c] for c in sentence[start:stop]])
        Y_data.append([char2id[c] for c in sentence[start + 1:stop + 1]])
print(len(X_data))

# Weights start as small Gaussian noise, biases as zeros.
init_scale = 0.01
Wxh = np.random.randn(hidden_size, vocab_size) * init_scale
Whh = np.random.randn(hidden_size, hidden_size) * init_scale
Why = np.random.randn(vocab_size, hidden_size) * init_scale
bh = np.zeros((hidden_size, 1))
by = np.zeros((vocab_size, 1))
def lossFun(inputs, targets, hprev):
    """Run one forward and backward pass of the RNN over a sequence.

    Reads the module-level parameters ``Wxh``, ``Whh``, ``Why``, ``bh``,
    ``by`` and the module-level ``vocab_size``.

    Args:
        inputs: sequence of input character ids.
        targets: sequence of target character ids, indexed in step with
            ``inputs``.
        hprev: hidden state to start from (column vector).

    Returns:
        A tuple ``(loss, dWxh, dWhh, dWhy, dbh, dby, h_last)``: the
        cross-entropy summed over all time steps, the gradients of the five
        parameters (each clipped element-wise to ``[-5, 5]``), and the
        hidden state after the last time step (``hprev`` itself when
        ``inputs`` is empty).
    """
    xs, hs, ys, ps = {}, {}, {}, {}
    hs[-1] = np.copy(hprev)
    loss = 0

    # forward pass
    for t in range(len(inputs)):
        # One-hot encode the input character.
        xs[t] = np.zeros((vocab_size, 1))
        xs[t][inputs[t]] = 1
        hs[t] = np.tanh(np.dot(Wxh, xs[t]) + np.dot(Whh, hs[t - 1]) + bh)
        ys[t] = np.dot(Why, hs[t]) + by
        # Subtracting the max logit leaves the softmax unchanged
        # mathematically but keeps np.exp from overflowing.
        exp_scores = np.exp(ys[t] - np.max(ys[t]))
        ps[t] = exp_scores / np.sum(exp_scores)
        loss += -np.log(ps[t][targets[t], 0])

    # backward pass
    dWxh, dWhh, dWhy = np.zeros_like(Wxh), np.zeros_like(Whh), np.zeros_like(Why)
    dbh, dby = np.zeros_like(bh), np.zeros_like(by)
    # Shaped from hprev rather than hs[0], which is absent for empty input.
    dhnext = np.zeros_like(hprev)
    for t in reversed(range(len(inputs))):
        # Gradient of softmax + cross-entropy w.r.t. the logits.
        dy = np.copy(ps[t])
        dy[targets[t]] -= 1
        dWhy += np.dot(dy, hs[t].T)
        dby += dy
        # Gradient reaching h_t: from the output and from step t + 1.
        dh = np.dot(Why.T, dy) + dhnext
        # Back through tanh: d tanh(z) / dz = 1 - tanh(z) ** 2.
        dhraw = (1 - hs[t] * hs[t]) * dh
        dbh += dhraw
        dWxh += np.dot(dhraw, xs[t].T)
        dWhh += np.dot(dhraw, hs[t - 1].T)
        dhnext = np.dot(Whh.T, dhraw)

    # Clip in place to limit exploding gradients.
    for dparam in [dWxh, dWhh, dWhy, dbh, dby]:
        np.clip(dparam, -5, 5, out=dparam)

    return loss, dWxh, dWhh, dWhy, dbh, dby, hs[len(inputs) - 1]
def sample(h, seed_ix, n):
    """Sample ``n`` character ids from the RNN.

    Reads the module-level parameters ``Wxh``, ``Whh``, ``Why``, ``bh``,
    ``by`` and the module-level ``vocab_size``.

    Args:
        h: hidden state to start from.
        seed_ix: id of the character fed in at the first step.
        n: number of ids to generate.

    Returns:
        List of ``n`` sampled character ids.
    """
    # One-hot encode the seed character.
    x = np.zeros((vocab_size, 1))
    x[seed_ix] = 1
    ixes = []
    for _ in range(n):
        h = np.tanh(np.dot(Wxh, x) + np.dot(Whh, h) + bh)
        y = np.dot(Why, h) + by
        # Subtracting the max logit leaves the softmax unchanged
        # mathematically but keeps np.exp from overflowing.
        exp_y = np.exp(y - np.max(y))
        p = exp_y / np.sum(exp_y)
        ix = np.random.choice(range(vocab_size), p=p.ravel())
        ixes.append(ix)
        # Feed the sampled character back in as the next input.
        x = np.zeros((vocab_size, 1))
        x[ix] = 1
    return ixes
n = 0

# Adagrad accumulators: running sum of squared gradients per parameter.
mWxh = np.zeros_like(Wxh)
mWhh = np.zeros_like(Whh)
mWhy = np.zeros_like(Why)
mbh = np.zeros_like(bh)
mby = np.zeros_like(by)

# Start the smoothed loss at the loss of a uniform prediction over one window.
smooth_loss = -np.log(1.0 / vocab_size) * maxlen

parameters = [Wxh, Whh, Why, bh, by]
accumulators = [mWxh, mWhh, mWhy, mbh, mby]

# Runs until interrupted, cycling through the training windows.
while True:
    # At the first step and after each full pass, restart from window 0
    # with a zero hidden state.
    if n in (0, len(X_data)):
        hprev = np.zeros((hidden_size, 1))
        n = 0

    X, Y = X_data[n], Y_data[n]
    loss, *gradients, hprev = lossFun(X, Y, hprev)
    smooth_loss = smooth_loss * 0.999 + loss * 0.001

    # Adagrad update, applied in place to the module-level arrays.
    for param, dparam, mem in zip(parameters, gradients, accumulators):
        mem += dparam * dparam
        param -= learning_rate * dparam / np.sqrt(mem + 1e-8)

    # Every 100 steps print the smoothed loss and 200 sampled characters.
    if n % 100 == 0:
        print('iter %d, loss: %f' % (n, smooth_loss))
        sampled_ids = sample(hprev, X[0], 200)
        print(''.join(id2char[ix] for ix in sampled_ids))

    n += 1
顏悲 心已中雨著街眼淚不知 留在這時祈忘的本身同樣無常 你個人歡 當時是你能止學了綻開瞥袖 前朝來去勇氣 讓你是一雙睡過之後 由於你飛雪中的街音裏飛 此模糊的愛 只有誰要再多少時 管只是無度美醉不給主題襯 曾流盲雙腳一片城自己邊 來並肩常與滿是一點和缺 好愛得也還記得證着多夢 愛 作人來 這吃碎 咱們精神蹲着你的門 口不信心終究理想透完了誰幾度 我都在憑營力的光體 賣愛不說 愛你是個人好
# -*- coding: utf-8 -*- from keras.models import Sequential from keras.layers import Dense, LSTM, Embedding from keras.callbacks import LambdaCallback import numpy as np import random import sys import pickle
# Every non-empty line of the lyrics file is kept as one entry of
# `sentences`, unless 10% or more of its characters are ASCII letters.
sentences = []
with open('../lyrics.txt', 'r', encoding='utf8') as fr:
    for line in fr:
        line = line.strip()
        if not line:
            # A blank line has length 0 and would make the ratio below
            # divide by zero, so skip it.
            continue
        count = sum(
            1 for c in line
            if ('a' <= c <= 'z') or ('A' <= c <= 'Z')
        )
        if count / len(line) < 0.1:
            sentences.append(line)
print('共%d首歌' % len(sentences))

# Character frequencies, then the characters ordered most frequent first.
chars = {}
for sentence in sentences:
    for c in sentence:
        chars[c] = chars.get(c, 0) + 1
chars = sorted(chars.items(), key=lambda x: x[1], reverse=True)
chars = [char[0] for char in chars]
vocab_size = len(chars)
print('共%d個字' % vocab_size, chars[:20])

# Two-way lookup between a character and its rank in `chars`.
char2id = {c: i for i, c in enumerate(chars)}
id2char = {i: c for i, c in enumerate(chars)}

# Save both mappings so they can be reloaded later without the lyrics file.
with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([char2id, id2char], fw)
# Hyper-parameters.
maxlen = 10
step = 3
embed_size = 128
hidden_size = 128
vocab_size = len(chars)
batch_size = 64
epochs = 20

# Slide a window of `maxlen` characters over every sentence, `step`
# characters at a time. The input is the window's character ids and the
# target is a one-hot vector for the character right after the window.
X_data = []
Y_data = []
for sentence in sentences:
    for i in range(0, len(sentence) - maxlen, step):
        X_data.append([char2id[c] for c in sentence[i: i + maxlen]])
        # Builtin `bool` replaces the `np.bool` alias, which NumPy
        # deprecated in 1.20 and removed in 1.24.
        y = np.zeros(vocab_size, dtype=bool)
        y[char2id[sentence[i + maxlen]]] = 1
        Y_data.append(y)
X_data = np.array(X_data)
Y_data = np.array(Y_data)
print(X_data.shape, Y_data.shape)

# Embedding -> LSTM -> softmax over the vocabulary.
model = Sequential()
model.add(Embedding(input_dim=vocab_size, output_dim=embed_size, input_length=maxlen))
model.add(LSTM(hidden_size, input_shape=(maxlen, embed_size)))
model.add(Dense(vocab_size, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam')
def sample(preds, diversity=1.0):
    """Draw one index from ``preds`` after rescaling it by ``diversity``.

    The log-probabilities are divided by ``diversity`` and re-normalised,
    so values below 1 sharpen the distribution and values above 1 flatten
    it.

    Args:
        preds: sequence of non-negative probabilities.
        diversity: divisor applied to the log-probabilities.

    Returns:
        Index of the drawn entry.
    """
    probabilities = np.asarray(preds).astype('float64')
    # The small constant keeps log() finite for zero probabilities.
    scaled_logs = np.log(probabilities + 1e-10) / diversity
    weights = np.exp(scaled_logs)
    distribution = weights / np.sum(weights)
    # One multinomial trial gives a one-hot row; argmax finds the hot entry.
    draw = np.random.multinomial(1, distribution, 1)
    return np.argmax(draw)
def on_epoch_end(epoch, logs):
    """Print text generated by the current model.

    Picks one random training sentence, uses its first ``maxlen``
    characters as the seed, and writes 400 generated characters to stdout
    for each of three diversity values.

    Reads the module-level ``sentences``, ``maxlen``, ``char2id``,
    ``id2char``, ``model`` and ``sample``.

    Args:
        epoch: epoch index, printed in the header.
        logs: unused here.
    """
    print('-' * 30)
    print('Epoch', epoch)

    # randrange excludes its upper bound. randint(0, len(sentences)) could
    # return len(sentences) and raise IndexError below.
    index = random.randrange(len(sentences))
    for diversity in [0.2, 0.5, 1.0]:
        print('----- diversity:', diversity)
        sentence = sentences[index][:maxlen]
        print('----- Generating with seed: ' + sentence)
        sys.stdout.write(sentence)

        for i in range(400):
            # Encode the current window as a row of character ids.
            x_pred = np.zeros((1, maxlen))
            for t, char in enumerate(sentence):
                x_pred[0, t] = char2id[char]

            preds = model.predict(x_pred, verbose=0)[0]
            next_index = sample(preds, diversity)
            next_char = id2char[next_index]

            # Slide the window: drop the oldest character, append the new one.
            sentence = sentence[1:] + next_char

            sys.stdout.write(next_char)
            sys.stdout.flush()
# Train the model, calling on_epoch_end after every epoch, then save it
# to disk.
epoch_end_callback = LambdaCallback(on_epoch_end=on_epoch_end)
model.fit(
    X_data,
    Y_data,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[epoch_end_callback],
)
model.save('song_keras.h5')
# -*- coding: utf-8 -*-
from keras.models import load_model
import numpy as np
import pickle
import sys

# Length of the character window fed to the model.
maxlen = 10

# Load the saved model and the character <-> id mappings.
model = load_model('song_keras.h5')
with open('dictionary.pkl', 'rb') as fr:
    char2id, id2char = pickle.load(fr)


def sample(preds, diversity=1.0):
    """Draw one index from ``preds`` after rescaling it by ``diversity``.

    The log-probabilities are divided by ``diversity`` and re-normalised
    before a single multinomial draw is made.
    """
    probabilities = np.asarray(preds).astype('float64')
    # The small constant keeps log() finite for zero probabilities.
    scaled_logs = np.log(probabilities + 1e-10) / diversity
    weights = np.exp(scaled_logs)
    distribution = weights / np.sum(weights)
    draw = np.random.multinomial(1, distribution, 1)
    return np.argmax(draw)


# Seed text, cut down to the window length.
sentence = '能不能給我一首歌的時間'
sentence = sentence[:maxlen]
diversity = 1.0

print('----- Generating with seed: ' + sentence)
print('----- diversity:', diversity)
sys.stdout.write(sentence)

# Generate 400 characters, sliding the window one character at a time.
for _ in range(400):
    x_pred = np.zeros((1, maxlen))
    for position, ch in enumerate(sentence):
        x_pred[0, position] = char2id[ch]

    distribution = model.predict(x_pred, verbose=0)[0]
    next_char = id2char[sample(distribution, diversity)]
    sentence = sentence[1:] + next_char

    sys.stdout.write(next_char)
    sys.stdout.flush()
能不能給我一首歌的時間 要去人還有古年 你表明我所的 只願爲你作下一個成熟 從那個歌聲中 你的別思量 寫你的畫面走過了西陌上雨張 小水沒忘了 我欲再感覺 我終於你開心哭過心事流出了我心痛 就看口提幽紋太多 獨自一直行 你也在想 我感到最此的第一次 只想要閒想 穿行多高樓的星雲 看見鞍上雲 青竹瓊樓又新葉 人潮春涌成度過 幸福嗚 風雪落入麗箏悽悽 萬頃枯枝回伸離袖弦 不幸以潮 到底必經認來我不變 都想你 這星辰 暮鼓 WA Lsevemusich hey Live 走進不在意 不肯天涯 如此溫柔 不夠支離 多巧認真和你還太平行 哎呀呀呀 呀呀呀呀呀呀呀啊嘿 餓很差去哪兒呀 那個人聰明? 王王之如下 下也難改徒有愛還能敢相離 撥開你的嘴角 相識的一見 到你的世界所世 才發現我也不會躲藏 讓我決定有人擔憂善良 像一我的世界心裏長着 夜晚需來又頭 與我專車徵 戰天幾天不懂配遊戲 也是本身應嗎 你給我來的狠也
# -*- coding: utf-8 -*- import tensorflow as tf import numpy as np import glob import json from collections import Counter from tqdm import tqdm from snownlp import SnowNLP
# Collect every poem whose joined text is 24 to 32 characters long and
# wrap it in '[' ... ']'.
poets = []
paths = glob.glob('chinese-poetry/json/poet.*.json')
for path in paths:
    # The context manager closes the file. The encoding is given explicitly
    # because the platform default is not necessarily UTF-8.
    with open(path, 'r', encoding='utf-8') as fr:
        data = json.load(fr)
    for item in data:
        content = ''.join(item['paragraphs'])
        if 24 <= len(content) <= 32:
            # NOTE(review): SnowNLP(...).han presumably converts the text
            # to simplified Chinese -- confirm against the snownlp docs.
            content = SnowNLP(content)
            poets.append('[' + content.han + ']')

# Shortest poems first.
poets.sort(key=len)
print('共%d首詩' % len(poets), poets[0], poets[-1])
# Total number of characters over all poems, markers included.
total_chars = sum(len(poem) for poem in poets)
print('共%d個字' % total_chars)

# Frequency of every distinct character, most frequent first. The sort is
# stable, so ties keep first-seen order.
counts = Counter()
for poem in poets:
    counts.update(poem)
ranked = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
print('共%d個不一樣的字' % len(ranked))
print(ranked[:10])

chars = [ch for ch, _ in ranked]

# Ids start at 1, so id 0 is never assigned to a character.
char2id = {}
id2char = {}
for idx, ch in enumerate(chars, start=1):
    char2id[ch] = idx
    id2char[idx] = ch
batch_size = 64

# Group consecutive poems into batches of `batch_size`; leftover poems
# that do not fill a batch are dropped. Each batch is zero-padded to its
# longest poem. Inputs leave off the last character of a poem and targets
# leave off the first, so the target is the input shifted by one.
X_data = []
Y_data = []
num_batches = len(poets) // batch_size
for b in range(num_batches):
    chunk = poets[b * batch_size:(b + 1) * batch_size]
    batch = [[char2id[c] for c in poem] for poem in chunk]
    maxlen = max(map(len, batch))

    X_batch = np.zeros((batch_size, maxlen - 1), dtype=np.int32)
    Y_batch = np.zeros((batch_size, maxlen - 1), dtype=np.int32)
    for row, ids in enumerate(batch):
        width = len(ids) - 1
        X_batch[row, :width] = ids[:-1]
        Y_batch[row, :width] = ids[1:]

    X_data.append(X_batch)
    Y_data.append(Y_batch)

print(len(X_data), len(Y_data))
# Model hyper-parameters.
hidden_size = 256
num_layer = 2
embedding_size = 256

# One output class per character id, plus one for the extra id 0.
num_classes = len(char2id) + 1

# Input and target id matrices; the time dimension is left unspecified.
X = tf.placeholder(tf.int32, [batch_size, None])
Y = tf.placeholder(tf.int32, [batch_size, None])

# Not trainable: assigned explicitly by the training code.
learning_rate = tf.Variable(0.0, trainable=False)

# Stack of LSTM layers.
lstm_layers = [
    tf.nn.rnn_cell.BasicLSTMCell(hidden_size, state_is_tuple=True)
    for _ in range(num_layer)
]
cell = tf.nn.rnn_cell.MultiRNNCell(lstm_layers, state_is_tuple=True)
initial_state = cell.zero_state(batch_size, tf.float32)

# Embedding table and lookup of the input ids.
embedding_init = tf.random_uniform([num_classes, embedding_size], -1.0, 1.0)
embeddings = tf.Variable(embedding_init)
embedded = tf.nn.embedding_lookup(embeddings, X)

# rnn_outputs: batch_size, max_time, hidden_size
# last_states: 2 tuple(two LSTM), 2 tuple(c and h)
# batch_size, hidden_size
rnn_outputs, last_states = tf.nn.dynamic_rnn(
    cell, embedded, initial_state=initial_state)

# batch_size * max_time, hidden_size
outputs = tf.reshape(rnn_outputs, [-1, hidden_size])
# batch_size * max_time, len(char2id) + 1
flat_logits = tf.layers.dense(outputs, units=num_classes)
# batch_size, max_time, len(char2id) + 1
logits = tf.reshape(flat_logits, [batch_size, -1, num_classes])
# batch_size, max_time, len(char2id) + 1
probs = tf.nn.softmax(logits)

# Every position gets weight 1 in the sequence loss.
loss_weights = tf.ones_like(Y, dtype=tf.float32)
loss = tf.reduce_mean(
    tf.contrib.seq2seq.sequence_loss(logits, Y, loss_weights))

# Adam step on gradients clipped to a global norm of 5.
params = tf.trainable_variables()
raw_grads = tf.gradients(loss, params)
grads, _ = tf.clip_by_global_norm(raw_grads, 5)
optimizer = tf.train.AdamOptimizer(learning_rate).apply_gradients(
    zip(grads, params))
sess = tf.Session()
sess.run(tf.global_variables_initializer())

for epoch in range(50):
    # Learning rate starts at 0.002 and is multiplied by 0.97 each epoch.
    decayed_rate = 0.002 * (0.97 ** epoch)
    sess.run(tf.assign(learning_rate, decayed_rate))

    # Shuffle the batches, applying the same order to inputs and targets.
    data_index = np.arange(len(X_data))
    np.random.shuffle(data_index)
    X_data = [X_data[k] for k in data_index]
    Y_data = [Y_data[k] for k in data_index]

    # One optimisation step per batch; report the mean loss of the epoch.
    losses = []
    for step in tqdm(range(len(X_data))):
        feed = {X: X_data[step], Y: Y_data[step]}
        ls_, _ = sess.run([loss, optimizer], feed_dict=feed)
        losses.append(ls_)

    print('Epoch %d Loss %.5f' % (epoch, np.mean(losses)))
import pickle

# Save the session's variables as a checkpoint.
saver = tf.train.Saver()
saver.save(sess, './poet_generation_tensorflow')

# Save both character mappings alongside the checkpoint.
mappings = [char2id, id2char]
with open('dictionary.pkl', 'wb') as fw:
    pickle.dump(mappings, fw)
# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import pickle

# Character <-> id mappings saved next to the checkpoint.
with open('dictionary.pkl', 'rb') as fr:
    [char2id, id2char] = pickle.load(fr)

# Poems are generated one at a time.
batch_size = 1
hidden_size = 256
num_layer = 2
embedding_size = 256

# NOTE(review): the graph below is rebuilt statement for statement,
# including the loss and optimizer that generation never runs. The unnamed
# variables are presumably matched to the checkpoint by creation order, so
# keep this in sync with the training script -- confirm before trimming.
X = tf.placeholder(tf.int32, [batch_size, None])
Y = tf.placeholder(tf.int32, [batch_size, None])
learning_rate = tf.Variable(0.0, trainable=False)

cell = tf.nn.rnn_cell.MultiRNNCell(
    [tf.nn.rnn_cell.BasicLSTMCell(hidden_size, state_is_tuple=True) for i in range(num_layer)],
    state_is_tuple=True)
initial_state = cell.zero_state(batch_size, tf.float32)

embeddings = tf.Variable(tf.random_uniform([len(char2id) + 1, embedding_size], -1.0, 1.0))
embedded = tf.nn.embedding_lookup(embeddings, X)

outputs, last_states = tf.nn.dynamic_rnn(cell, embedded, initial_state=initial_state)

outputs = tf.reshape(outputs, [-1, hidden_size])
logits = tf.layers.dense(outputs, units=len(char2id) + 1)
probs = tf.nn.softmax(logits)
targets = tf.reshape(Y, [-1])

loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=targets))
params = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(loss, params), 5)
optimizer = tf.train.AdamOptimizer(learning_rate).apply_gradients(zip(grads, params))

sess = tf.Session()
sess.run(tf.global_variables_initializer())

saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint('./'))

# Characters after which generate_with_head() inserts the next head
# character. NOTE(review): the corpus presumably uses the full-width comma;
# both comma forms are accepted so either punctuation style works.
_LINE_STARTS = ('[', '。', ',', ',')

# True for every output index that id2char can decode. The network has
# len(char2id) + 1 outputs, so any index missing from id2char must never
# be sampled.
_valid_ids = np.array([i in id2char for i in range(len(char2id) + 1)])


def _sample_char(probs_):
    """Draw one character from the network output, skipping undecodable ids."""
    weights = np.where(_valid_ids, np.squeeze(probs_).astype(np.float64), 0.0)
    pos = int(np.random.choice(len(weights), p=weights / np.sum(weights)))
    return id2char[pos]


def _generate(head):
    """Generate one poem, forcing the characters of ``head`` at line starts."""
    states_ = sess.run(initial_state)
    pieces = []
    c = '['
    i = 0
    while c != ']':
        pieces.append(c)
        x = np.zeros((batch_size, 1))
        x[:, 0] = char2id[c]
        # Feed one character and carry the LSTM state to the next step.
        probs_, states_ = sess.run([probs, last_states],
                                   feed_dict={X: x, initial_state: states_})
        sampled = _sample_char(probs_)
        if c in _LINE_STARTS and i < len(head):
            # Start of a line: use the next head character, not the sample.
            c = head[i]
            i += 1
        else:
            c = sampled
    # Drop the leading '[' start marker.
    return ''.join(pieces)[1:]


def generate():
    """Generate one poem by sampling until the end marker ']' is drawn.

    Returns:
        The generated text without the '[' and ']' markers.
    """
    return _generate('')


def generate_with_head(head):
    """Generate an acrostic poem whose lines start with the characters of ``head``.

    Args:
        head: characters to place, in order, at the start of the poem and
            after each '。' or comma. Every character must be a key of
            ``char2id``, otherwise a KeyError is raised.

    Returns:
        The generated text without the '[' and ']' markers.
    """
    return _generate(head)


print(generate())
print(generate_with_head('深度學習'))
百計無意魄可無,知君又到兩家書。自知君子有天祿,天下名通赤子虛。 深山宜數月交馳,度世曾徒有客期。學子今來能入楚,習家不癭莫辭卑。