序列標註(sequence labelling),輸入序列每一幀預測一個類別。OCR(Optical Character Recognition 光學字符識別)。數組
MIT口語系統研究組Rob Kassel收集,斯坦福大學人工智能實驗室Ben Taskar預處理OCR數據集(http://ai.stanford.edu/~btaskar/ocr/ ),包含大量單獨手寫小寫字母,每一個樣本對應16X8像素二值圖像。字線組合序列,序列對應單詞。6800個,長度不超過14字母的單詞。gzip壓縮,內容用Tab分隔文本文件。Python csv模塊直接讀取。文件每行一個歸一化字母屬性,ID號、標籤、像素值、下一字母ID號等。微信
下一字母ID值排序,按照正確順序讀取每一個單詞字母。收集字母,直到下一個ID對應字段未被設置爲止。讀取新序列。讀取完目標字母及數據像素,用零圖像填充序列對象,能歸入兩個較大目標字母全部像素數據NumPy數組。網絡
時間步之間共享softmax層。數據和目標數組包含序列,每一個目標字母對應一個圖像幀。RNN擴展,每一個字母輸出添加softmax分類器。分類器對每幀數據而非整個序列評估預測結果。計算序列長度。一個softmax層添加到全部幀:或者爲全部幀添加幾個不一樣分類器,或者令全部幀共享同一個分類器。共享分類器,權值在訓練中被調整次數更多,訓練單詞每一個字母。一個全鏈接層權值矩陣維數batch_size*in_size*out_size。現須要在兩個輸入維度batch_size、sequence_steps更新權值矩陣。令輸入(RNN輸出活性值)扁平爲形狀batch_size*sequence_steps*in_size。權值矩陣變成較大的批數據。結果反扁平化(unflatten)。架構
代價函數,序列每一幀有預測目標對,在相應維度平均。依據張量長度(序列最大長度)歸一化的tf.reduce_mean沒法使用。須要按照實際序列長度歸一化,手工調用tf.reduce_sum和除法運算均值。app
損失函數,tf.argmax針對軸2非軸1,各幀填充,依據序列實際長度計算均值。tf.reduce_mean對批數據全部單詞取均值。dom
TensorFlow自動導數計算,可以使用序列分類相同優化運算,只須要代入新代價函數。對全部RNN梯度裁剪,防止訓練發散,避免負面影響。函數
訓練模型,get_sataset下載手寫體圖像,預處理,小寫字母獨熱編碼向量。隨機打亂數據順序,分偏劃分訓練集、測試集。性能
單詞相鄰字母存在依賴關係(或互信息),RNN保存同一單詞所有輸入信息到隱含活性值。前幾個字母分類,網絡無大量輸入推斷額外信息,雙向RNN(bidirectional RNN)克服缺陷。
兩個RNN觀測輸入序列,一個按照一般順序從左端讀取單詞,另外一個按照相反順序從右端讀取單詞。每一個時間步獲得兩個輸出活性值。送入共享softmax層前,拼接。分類器從每一個字母獲取完整單詞信息。tf.modle.rnn.bidirectional_rnn已實現。測試
實現雙向RNN。劃分預測屬性到兩個函數,只關注較少內容。_shared_softmax函數,傳入函數張量data推斷輸入尺寸。複用其餘架構函數,相同扁平化技巧在全部時間步共享同一個softmax層。rnn.dynamic_rnn建立兩個RNN。
序列反轉,比實現新反向傳遞RNN運算容易。tf.reverse_sequence函數反轉幀數據中sequence_lengths幀。數據流圖節點有名稱。scope參數是rnn_dynamic_cell變量scope名稱,默認值RNN。兩個參數不一樣RNN,須要不一樣域。
反轉序列送入後向RNN,網絡輸出反轉,和前向輸出對齊。沿RNN神經元輸出維度拼接兩個張量,返回。雙向RNN模型性能更優。優化
import gzip import csv import numpy as np from helpers import download class OcrDataset: URL = 'http://ai.stanford.edu/~btaskar/ocr/letter.data.gz' def __init__(self, cache_dir): path = download(type(self).URL, cache_dir) lines = self._read(path) data, target = self._parse(lines) self.data, self.target = self._pad(data, target) @staticmethod def _read(filepath): with gzip.open(filepath, 'rt') as file_: reader = csv.reader(file_, delimiter='\t') lines = list(reader) return lines @staticmethod def _parse(lines): lines = sorted(lines, key=lambda x: int(x[0])) data, target = [], [] next_ = None for line in lines: if not next_: data.append([]) target.append([]) else: assert next_ == int(line[0]) next_ = int(line[2]) if int(line[2]) > -1 else None pixels = np.array([int(x) for x in line[6:134]]) pixels = pixels.reshape((16, 8)) data[-1].append(pixels) target[-1].append(line[1]) return data, target @staticmethod def _pad(data, target): max_length = max(len(x) for x in target) padding = np.zeros((16, 8)) data = [x + ([padding] * (max_length - len(x))) for x in data] target = [x + ([''] * (max_length - len(x))) for x in target] return np.array(data), np.array(target) import tensorflow as tf from helpers import lazy_property class SequenceLabellingModel: def __init__(self, data, target, params): self.data = data self.target = target self.params = params self.prediction self.cost self.error self.optimize @lazy_property def length(self): used = tf.sign(tf.reduce_max(tf.abs(self.data), reduction_indices=2)) length = tf.reduce_sum(used, reduction_indices=1) length = tf.cast(length, tf.int32) return length @lazy_property def prediction(self): output, _ = tf.nn.dynamic_rnn( tf.nn.rnn_cell.GRUCell(self.params.rnn_hidden), self.data, dtype=tf.float32, sequence_length=self.length, ) # Softmax layer. max_length = int(self.target.get_shape()[1]) num_classes = int(self.target.get_shape()[2]) weight = tf.Variable(tf.truncated_normal( [self.params.rnn_hidden, num_classes], stddev=0.01)) bias = tf.Variable(tf.constant(0.1, shape=[num_classes])) # Flatten to apply same weights to all time steps. output = tf.reshape(output, [-1, self.params.rnn_hidden]) prediction = tf.nn.softmax(tf.matmul(output, weight) + bias) prediction = tf.reshape(prediction, [-1, max_length, num_classes]) return prediction @lazy_property def cost(self): # Compute cross entropy for each frame. cross_entropy = self.target * tf.log(self.prediction) cross_entropy = -tf.reduce_sum(cross_entropy, reduction_indices=2) mask = tf.sign(tf.reduce_max(tf.abs(self.target), reduction_indices=2)) cross_entropy *= mask # Average over actual sequence lengths. cross_entropy = tf.reduce_sum(cross_entropy, reduction_indices=1) cross_entropy /= tf.cast(self.length, tf.float32) return tf.reduce_mean(cross_entropy) @lazy_property def error(self): mistakes = tf.not_equal( tf.argmax(self.target, 2), tf.argmax(self.prediction, 2)) mistakes = tf.cast(mistakes, tf.float32) mask = tf.sign(tf.reduce_max(tf.abs(self.target), reduction_indices=2)) mistakes *= mask # Average over actual sequence lengths. mistakes = tf.reduce_sum(mistakes, reduction_indices=1) mistakes /= tf.cast(self.length, tf.float32) return tf.reduce_mean(mistakes) @lazy_property def optimize(self): gradient = self.params.optimizer.compute_gradients(self.cost) try: limit = self.params.gradient_clipping gradient = [ (tf.clip_by_value(g, -limit, limit), v) if g is not None else (None, v) for g, v in gradient] except AttributeError: print('No gradient clipping parameter specified.') optimize = self.params.optimizer.apply_gradients(gradient) return optimize import random import tensorflow as tf import numpy as np from helpers import AttrDict from OcrDataset import OcrDataset from SequenceLabellingModel import SequenceLabellingModel from batched import batched params = AttrDict( rnn_cell=tf.nn.rnn_cell.GRUCell, rnn_hidden=300, optimizer=tf.train.RMSPropOptimizer(0.002), gradient_clipping=5, batch_size=10, epochs=5, epoch_size=50 ) def get_dataset(): dataset = OcrDataset('./ocr') # Flatten images into vectors. dataset.data = dataset.data.reshape(dataset.data.shape[:2] + (-1,)) # One-hot encode targets. target = np.zeros(dataset.target.shape + (26,)) for index, letter in np.ndenumerate(dataset.target): if letter: target[index][ord(letter) - ord('a')] = 1 dataset.target = target # Shuffle order of examples. order = np.random.permutation(len(dataset.data)) dataset.data = dataset.data[order] dataset.target = dataset.target[order] return dataset # Split into training and test data. dataset = get_dataset() split = int(0.66 * len(dataset.data)) train_data, test_data = dataset.data[:split], dataset.data[split:] train_target, test_target = dataset.target[:split], dataset.target[split:] # Compute graph. _, length, image_size = train_data.shape num_classes = train_target.shape[2] data = tf.placeholder(tf.float32, [None, length, image_size]) target = tf.placeholder(tf.float32, [None, length, num_classes]) model = SequenceLabellingModel(data, target, params) batches = batched(train_data, train_target, params.batch_size) sess = tf.Session() sess.run(tf.initialize_all_variables()) for index, batch in enumerate(batches): batch_data = batch[0] batch_target = batch[1] epoch = batch[2] if epoch >= params.epochs: break feed = {data: batch_data, target: batch_target} error, _ = sess.run([model.error, model.optimize], feed) print('{}: {:3.6f}%'.format(index + 1, 100 * error)) test_feed = {data: test_data, target: test_target} test_error, _ = sess.run([model.error, model.optimize], test_feed) print('Test error: {:3.6f}%'.format(100 * error)) import tensorflow as tf from helpers import lazy_property class BidirectionalSequenceLabellingModel: def __init__(self, data, target, params): self.data = data self.target = target self.params = params self.prediction self.cost self.error self.optimize @lazy_property def length(self): used = tf.sign(tf.reduce_max(tf.abs(self.data), reduction_indices=2)) length = tf.reduce_sum(used, reduction_indices=1) length = tf.cast(length, tf.int32) return length @lazy_property def prediction(self): output = self._bidirectional_rnn(self.data, self.length) num_classes = int(self.target.get_shape()[2]) prediction = self._shared_softmax(output, num_classes) return prediction def _bidirectional_rnn(self, data, length): length_64 = tf.cast(length, tf.int64) forward, _ = tf.nn.dynamic_rnn( cell=self.params.rnn_cell(self.params.rnn_hidden), inputs=data, dtype=tf.float32, sequence_length=length, scope='rnn-forward') backward, _ = tf.nn.dynamic_rnn( cell=self.params.rnn_cell(self.params.rnn_hidden), inputs=tf.reverse_sequence(data, length_64, seq_dim=1), dtype=tf.float32, sequence_length=self.length, scope='rnn-backward') backward = tf.reverse_sequence(backward, length_64, seq_dim=1) output = tf.concat(2, [forward, backward]) return output def _shared_softmax(self, data, out_size): max_length = int(data.get_shape()[1]) in_size = int(data.get_shape()[2]) weight = tf.Variable(tf.truncated_normal( [in_size, out_size], stddev=0.01)) bias = tf.Variable(tf.constant(0.1, shape=[out_size])) # Flatten to apply same weights to all time steps. flat = tf.reshape(data, [-1, in_size]) output = tf.nn.softmax(tf.matmul(flat, weight) + bias) output = tf.reshape(output, [-1, max_length, out_size]) return output @lazy_property def cost(self): # Compute cross entropy for each frame. cross_entropy = self.target * tf.log(self.prediction) cross_entropy = -tf.reduce_sum(cross_entropy, reduction_indices=2) mask = tf.sign(tf.reduce_max(tf.abs(self.target), reduction_indices=2)) cross_entropy *= mask # Average over actual sequence lengths. cross_entropy = tf.reduce_sum(cross_entropy, reduction_indices=1) cross_entropy /= tf.cast(self.length, tf.float32) return tf.reduce_mean(cross_entropy) @lazy_property def error(self): mistakes = tf.not_equal( tf.argmax(self.target, 2), tf.argmax(self.prediction, 2)) mistakes = tf.cast(mistakes, tf.float32) mask = tf.sign(tf.reduce_max(tf.abs(self.target), reduction_indices=2)) mistakes *= mask # Average over actual sequence lengths. mistakes = tf.reduce_sum(mistakes, reduction_indices=1) mistakes /= tf.cast(self.length, tf.float32) return tf.reduce_mean(mistakes) @lazy_property def optimize(self): gradient = self.params.optimizer.compute_gradients(self.cost) try: limit = self.params.gradient_clipping gradient = [ (tf.clip_by_value(g, -limit, limit), v) if g is not None else (None, v) for g, v in gradient] except AttributeError: print('No gradient clipping parameter specified.') optimize = self.params.optimizer.apply_gradients(gradient) return optimize
參考資料:
《面向機器智能的TensorFlow實踐》
歡迎加我微信交流:qingxingfengzi個人微信公衆號:qingxingfengzigz我老婆張幸清的微信公衆號:qingqingfeifangz