1 Overview
This text-classification series will have around ten posts, covering classification based on word2vec pre-trained word vectors as well as on the latest pre-trained models (ELMo, BERT, etc.). The series covers:
word2vec pre-trained word vectors
textCNN model
Bi-LSTM model
RCNN model
All of the code is available in the textClassifier repository.
2 Dataset
The dataset is the IMDB movie review dataset. It consists of three files under /data/rawData: unlabeledTrainData.tsv, labeledTrainData.tsv, and testData.tsv. Text classification needs labeled data, so labeledTrainData is used. Preprocessing is similar to that in Text Classification in Practice (1): word2vec pre-trained word vectors; the only difference is that punctuation must be kept, otherwise the model struggles to converge. The preprocessed file is /data/preProcess/labeledCharTrain.csv.
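The cleaning code itself lives in the word2vec post; the snippet below is only a minimal sketch of what the character-level variant might look like (the clean_review helper and its regex are assumptions, the file paths are taken from above), the key point being that punctuation is deliberately not stripped:

import re
import pandas as pd

df = pd.read_csv("/data/rawData/labeledTrainData.tsv", sep="\t")

def clean_review(text):
    # Hypothetical cleaning step: lowercase and drop HTML line breaks, but keep punctuation
    text = text.lower()
    text = re.sub(r"<br\s*/?>", " ", text)
    return text

df["review"] = df["review"].apply(clean_review)
df[["review", "sentiment"]].to_csv("/data/preProcess/labeledCharTrain.csv", index=False)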
3 charCNN model structure
The charCNN paper, Character-level Convolutional Networks for Text Classification, proposes an architecture of 6 convolutional layers followed by 3 fully connected layers.
Two sets of structural parameters are proposed for datasets of different sizes (a sketch of both parameter sets is given after this list):
1) Convolutional layers
2) Fully connected layers
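The parameter tables from the paper are not reproduced here; written in the same [num_filters, kernel_height, pool_size] list format that the configuration below uses, they are roughly the following (a sketch from memory of the paper, so treat the exact numbers with care):

# Small (256 feature maps) and large (1024 feature maps) variants; None means no pooling.
convLayersSmall = [[256, 7, 3], [256, 7, 3], [256, 3, None],
                   [256, 3, None], [256, 3, None], [256, 3, 3]]
convLayersLarge = [[1024, 7, 3], [1024, 7, 3], [1024, 3, None],
                   [1024, 3, None], [1024, 3, None], [1024, 3, 3]]

# Two hidden fully connected layers of 1024 (small) / 2048 (large) units,
# plus an output layer whose size depends on the task.
fcLayersSmall = [1024, 1024]
fcLayersLarge = [2048, 2048]

The implementation below trims this down to 3 convolutional layers and a single 512-unit fully connected layer.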
4 Configuration parameters
import os
import time
import datetime
import csv
import json
from math import sqrt
import warnings

import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score

warnings.filterwarnings("ignore")
# Parameter configuration
class TrainingConfig(object):
    epoches = 10
    evaluateEvery = 100
    checkpointEvery = 100
    learningRate = 0.001


class ModelConfig(object):
    # Each sub-list holds [number of kernels, kernel height, pooling size]
    convLayers = [[256, 7, 4],
                  [256, 7, 4],
                  [256, 3, 4]]
                  # [256, 3, None],
                  # [256, 3, None],
                  # [256, 3, 3]]

    fcLayers = [512]
    dropoutKeepProb = 0.5

    epsilon = 1e-3  # small value added in the BN layer to avoid a zero denominator
    decay = 0.999   # decay used for the moving averages in the BN layer


class Config(object):
    # Use the 69 characters proposed in the paper to represent the input data
    alphabet = "abcdefghijklmnopqrstuvwxyz0123456789-,;.!?:'\"/\\|_@#$%^&*~`+-=<>()[]{}"
    # alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"

    sequenceLength = 1014  # length of the character-level sequence
    batchSize = 128
    rate = 0.8  # proportion of the data used for training

    dataSource = "../data/preProcess/labeledCharTrain.csv"

    training = TrainingConfig()

    model = ModelConfig()


config = Config()
5 Generating the training data
1) Load the data and split every sentence into a sequence of characters
2) Build the character-to-index mapping table and save it in JSON format so it can be loaded at inference time
3) Convert the characters into one-hot embeddings, which serve as the initialization of the model's embedding layer (a toy illustration follows this list)
4) Split the dataset into a training set and a validation set
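As a toy illustration of step 3 (using a three-character alphabet instead of the real 69-character one), the embedding matrix is just an all-zero row for "pad" plus a one-hot row for "UNK" and for every character:

import numpy as np

toyAlphabet = "abc"
vocab = ["pad", "UNK"] + list(toyAlphabet)          # index 0 is reserved for padding
embedding = np.vstack([np.zeros(len(vocab) - 1),    # "pad" -> all zeros
                       np.eye(len(vocab) - 1)])     # one-hot rows for "UNK", "a", "b", "c"
print(dict(zip(vocab, range(len(vocab)))))          # {'pad': 0, 'UNK': 1, 'a': 2, 'b': 3, 'c': 4}
print(embedding.shape)                              # (5, 4)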
# Data preprocessing class that generates the training and evaluation sets
class Dataset(object):
    def __init__(self, config):
        self._dataSource = config.dataSource
        self._sequenceLength = config.sequenceLength
        self._rate = config.rate

        self.trainReviews = []
        self.trainLabels = []

        self.evalReviews = []
        self.evalLabels = []

        self._alphabet = config.alphabet
        self.charEmbedding = None

        self._charToIndex = {}
        self._indexToChar = {}

    def _readData(self, filePath):
        """
        Read the dataset from the csv file
        """
        df = pd.read_csv(filePath)
        labels = df["sentiment"].tolist()
        review = df["review"].tolist()
        reviews = [[char for char in line if char != " "] for line in review]

        return reviews, labels

    def _reviewProcess(self, review, sequenceLength, charToIndex):
        """
        Represent every review in the dataset as a sequence of indices;
        in the mapping, "pad" corresponds to index 0
        """
        reviewVec = np.zeros((sequenceLength))
        sequenceLen = sequenceLength

        # Check whether the current sequence is shorter than the fixed sequence length
        if len(review) < sequenceLength:
            sequenceLen = len(review)

        for i in range(sequenceLen):
            if review[i] in charToIndex:
                reviewVec[i] = charToIndex[review[i]]
            else:
                reviewVec[i] = charToIndex["UNK"]

        return reviewVec

    def _genTrainEvalData(self, x, y, rate):
        """
        Generate the training and validation sets
        """
        reviews = []
        labels = []

        # Iterate over all texts and convert the characters in each text to index representation
        for i in range(len(x)):
            reviewVec = self._reviewProcess(x[i], self._sequenceLength, self._charToIndex)
            reviews.append(reviewVec)
            labels.append([y[i]])

        trainIndex = int(len(x) * rate)

        trainReviews = np.asarray(reviews[:trainIndex], dtype="int64")
        trainLabels = np.array(labels[:trainIndex], dtype="float32")

        evalReviews = np.asarray(reviews[trainIndex:], dtype="int64")
        evalLabels = np.array(labels[trainIndex:], dtype="float32")

        return trainReviews, trainLabels, evalReviews, evalLabels

    def _genVocabulary(self, reviews):
        """
        Generate the character embedding and the character-index mapping dictionaries
        """
        chars = [char for char in self._alphabet]

        vocab, charEmbedding = self._getCharEmbedding(chars)
        self.charEmbedding = charEmbedding

        self._charToIndex = dict(zip(vocab, list(range(len(vocab)))))
        self._indexToChar = dict(zip(list(range(len(vocab))), vocab))

        # Save the vocabulary-index mapping as json so it can be loaded directly at inference time
        with open("../data/charJson/charToIndex.json", "w", encoding="utf-8") as f:
            json.dump(self._charToIndex, f)

        with open("../data/charJson/indexToChar.json", "w", encoding="utf-8") as f:
            json.dump(self._indexToChar, f)

    def _getCharEmbedding(self, chars):
        """
        Map every character to a one-hot vector
        """
        alphabet = ["UNK"] + [char for char in self._alphabet]
        vocab = ["pad"] + alphabet
        charEmbedding = []
        charEmbedding.append(np.zeros(len(alphabet), dtype="float32"))

        for i, alpha in enumerate(alphabet):
            onehot = np.zeros(len(alphabet), dtype="float32")

            # Generate the vector for each character
            onehot[i] = 1

            # Build the character embedding matrix
            charEmbedding.append(onehot)

        return vocab, np.array(charEmbedding)

    def dataGen(self):
        """
        Initialize the training and validation sets
        """
        # Read the dataset
        reviews, labels = self._readData(self._dataSource)

        # Initialize the vocabulary-index mapping and the character embedding matrix
        self._genVocabulary(reviews)

        # Initialize the training and evaluation sets
        trainReviews, trainLabels, evalReviews, evalLabels = self._genTrainEvalData(reviews, labels, self._rate)
        self.trainReviews = trainReviews
        self.trainLabels = trainLabels

        self.evalReviews = evalReviews
        self.evalLabels = evalLabels


data = Dataset(config)
data.dataGen()
6 Generating batches of data
# Output batches of data
def nextBatch(x, y, batchSize):
    """
    Generate batches of data and yield them with a generator
    """
    perm = np.arange(len(x))
    np.random.shuffle(perm)
    x = x[perm]
    y = y[perm]

    numBatches = len(x) // batchSize

    for i in range(numBatches):
        start = i * batchSize
        end = start + batchSize
        batchX = np.array(x[start: end], dtype="int64")
        batchY = np.array(y[start: end], dtype="float32")

        yield batchX, batchY
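Since nextBatch is a generator, it is consumed in a for loop; a minimal usage sketch on made-up data (not drawn from the dataset above):

dummyX = np.random.randint(0, 70, size=(10, config.sequenceLength))
dummyY = np.random.randint(0, 2, size=(10, 1)).astype("float32")

for batchX, batchY in nextBatch(dummyX, dummyY, batchSize=4):
    print(batchX.shape, batchY.shape)   # (4, 1014) (4, 1); the 2 leftover samples are dropped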
7 The charCNN model
In the charCNN model we also introduced a BN layer, but the effect was not obvious and it even caused some convergence problems; this is left for later investigation.
# Define the char-CNN classifier
class CharCNN(object):
    """
    char-CNN for text classification
    """
    def __init__(self, config, charEmbedding):

        # placeholders for input, output and dropout
        self.inputX = tf.placeholder(tf.int32, [None, config.sequenceLength], name="inputX")
        self.inputY = tf.placeholder(tf.float32, [None, 1], name="inputY")

        self.dropoutKeepProb = tf.placeholder(tf.float32, name="dropoutKeepProb")
        self.isTraining = tf.placeholder(tf.bool, name="isTraining")

        self.epsilon = config.model.epsilon
        self.decay = config.model.decay

        # Character embedding
        with tf.name_scope("embedding"):

            # Use the one-hot character vectors to initialize the embedding matrix
            self.W = tf.Variable(tf.cast(charEmbedding, dtype=tf.float32, name="charEmbedding"), name="W")

            # Look up the character embeddings
            self.embededChars = tf.nn.embedding_lookup(self.W, self.inputX)

            # Add a channel dimension
            self.embededCharsExpand = tf.expand_dims(self.embededChars, -1)

        for i, cl in enumerate(config.model.convLayers):
            print("Processing convolutional layer " + str(i + 1))

            # Use the name_scope namespace to reuse variable names
            with tf.name_scope("convLayer-%s" % (i + 1)):
                # Get the width of the character vectors
                filterWidth = self.embededCharsExpand.get_shape()[2].value

                # filterShape = [height, width, in_channels, out_channels]
                filterShape = [cl[1], filterWidth, 1, cl[0]]

                stdv = 1 / sqrt(cl[0] * cl[1])

                # Initialize the values of w and b
                wConv = tf.Variable(tf.random_uniform(filterShape, minval=-stdv, maxval=stdv),
                                    dtype='float32', name='w')
                bConv = tf.Variable(tf.random_uniform(shape=[cl[0]], minval=-stdv, maxval=stdv), name='b')
                # w_conv = tf.Variable(tf.truncated_normal(filter_shape, stddev=0.05), name="w")
                # b_conv = tf.Variable(tf.constant(0.1, shape=[cl[0]]), name="b")

                # Build the convolutional layer; the kernel initializer (w_conv) can be passed in directly
                conv = tf.nn.conv2d(self.embededCharsExpand, wConv, strides=[1, 1, 1, 1],
                                    padding="VALID", name="conv")

                # Add the bias
                hConv = tf.nn.bias_add(conv, bConv)

                # Apply relu directly: tf.nn.conv2d performs the convolution, the bias is added
                # to its output, and the result is fed into the relu activation
                hConv = tf.nn.relu(hConv)

                # with tf.name_scope("batchNormalization"):
                #     hConvBN = self._batchNorm(hConv)

                if cl[-1] is not None:
                    ksizeShape = [1, cl[2], 1, 1]
                    hPool = tf.nn.max_pool(hConv, ksize=ksizeShape, strides=ksizeShape,
                                           padding="VALID", name="pool")
                else:
                    hPool = hConv

                print(hPool.shape)

                # Transpose the dimensions so the output matches the input layout of the next conv layer
                self.embededCharsExpand = tf.transpose(hPool, [0, 1, 3, 2], name="transpose")

                print(self.embededCharsExpand)

        with tf.name_scope("reshape"):
            fcDim = self.embededCharsExpand.get_shape()[1].value * self.embededCharsExpand.get_shape()[2].value
            self.inputReshape = tf.reshape(self.embededCharsExpand, [-1, fcDim])

        weights = [fcDim] + config.model.fcLayers

        for i, fl in enumerate(config.model.fcLayers):
            with tf.name_scope("fcLayer-%s" % (i + 1)):
                print("Processing fully connected layer " + str(i + 1))
                stdv = 1 / sqrt(weights[i])

                # Initialize w and b of the fully connected layer with a uniform distribution
                wFc = tf.Variable(tf.random_uniform([weights[i], fl], minval=-stdv, maxval=stdv),
                                  dtype="float32", name="w")
                bFc = tf.Variable(tf.random_uniform(shape=[fl], minval=-stdv, maxval=stdv),
                                  dtype="float32", name="b")
                # w_fc = tf.Variable(tf.truncated_normal([weights[i], fl], stddev=0.05), name="W")
                # b_fc = tf.Variable(tf.constant(0.1, shape=[fl]), name="b")

                self.fcInput = tf.nn.relu(tf.matmul(self.inputReshape, wFc) + bFc)

                with tf.name_scope("dropOut"):
                    self.fcInputDrop = tf.nn.dropout(self.fcInput, self.dropoutKeepProb)

            self.inputReshape = self.fcInputDrop

        with tf.name_scope("outputLayer"):
            stdv = 1 / sqrt(weights[-1])

            # Initialize the weights and bias from the last hidden layer to the output layer
            # w_out = tf.Variable(tf.truncated_normal([fc_layers[-1], num_classes], stddev=0.1), name="W")
            # b_out = tf.Variable(tf.constant(0.1, shape=[num_classes]), name="b")
            wOut = tf.Variable(tf.random_uniform([config.model.fcLayers[-1], 1], minval=-stdv, maxval=stdv),
                               dtype="float32", name="w")
            bOut = tf.Variable(tf.random_uniform(shape=[1], minval=-stdv, maxval=stdv), name="b")

            # tf.nn.xw_plus_b computes x times w plus b
            self.predictions = tf.nn.xw_plus_b(self.inputReshape, wOut, bOut, name="predictions")

            # Binary classification: threshold the logit at 0
            self.binaryPreds = tf.cast(tf.greater_equal(self.predictions, 0.0), tf.float32, name="binaryPreds")

        with tf.name_scope("loss"):
            # Define the loss as the sigmoid cross-entropy on the logits
            losses = tf.nn.sigmoid_cross_entropy_with_logits(logits=self.predictions, labels=self.inputY)
            self.loss = tf.reduce_mean(losses)

    def _batchNorm(self, x):
        # Implementation of the BN layer
        gamma = tf.Variable(tf.ones([x.get_shape()[3].value]))
        beta = tf.Variable(tf.zeros([x.get_shape()[3].value]))

        self.popMean = tf.Variable(tf.zeros([x.get_shape()[3].value]), trainable=False, name="popMean")
        self.popVariance = tf.Variable(tf.ones([x.get_shape()[3].value]), trainable=False, name="popVariance")

        def batchNormTraining():
            # Use the right axes so the mean and variance are computed per feature map,
            # not over all the nodes in the network
            batchMean, batchVariance = tf.nn.moments(x, [0, 1, 2], keep_dims=False)

            decay = 0.99
            trainMean = tf.assign(self.popMean, self.popMean * self.decay + batchMean * (1 - self.decay))
            trainVariance = tf.assign(self.popVariance,
                                      self.popVariance * self.decay + batchVariance * (1 - self.decay))

            with tf.control_dependencies([trainMean, trainVariance]):
                return tf.nn.batch_normalization(x, batchMean, batchVariance, beta, gamma, self.epsilon)

        def batchNormInference():
            return tf.nn.batch_normalization(x, self.popMean, self.popVariance, beta, gamma, self.epsilon)

        batchNormalizedOutput = tf.cond(self.isTraining, batchNormTraining, batchNormInference)

        return tf.nn.relu(batchNormalizedOutput)
8 Metric functions
Output the metrics commonly used for classification problems.
# Define the metric functions
def mean(item):
    return sum(item) / len(item)


def genMetrics(trueY, predY, binaryPredY):
    """
    Compute acc and auc values
    """
    auc = roc_auc_score(trueY, predY)
    accuracy = accuracy_score(trueY, binaryPredY)
    precision = precision_score(trueY, binaryPredY, average='macro')
    recall = recall_score(trueY, binaryPredY, average='macro')

    return round(accuracy, 4), round(auc, 4), round(precision, 4), round(recall, 4)
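A quick sanity check with made-up logits and labels (not produced by the model):

trueY = np.array([1, 0, 1, 1, 0])
predY = np.array([2.1, -0.3, 0.8, -0.2, -1.5])   # raw logits
binaryPredY = (predY >= 0).astype("float32")     # same thresholding as binaryPreds
print(genMetrics(trueY, predY, binaryPredY))     # (0.8, 1.0, 0.8333, 0.8333)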
9 Training the model
During training we define tensorBoard summaries and two different ways of saving the model.
# Train the model

# Get the training and validation sets
trainReviews = data.trainReviews
trainLabels = data.trainLabels
evalReviews = data.evalReviews
evalLabels = data.evalLabels

charEmbedding = data.charEmbedding

# Define the computation graph
with tf.Graph().as_default():

    session_conf = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
    session_conf.gpu_options.allow_growth = True
    session_conf.gpu_options.per_process_gpu_memory_fraction = 0.9  # cap the gpu memory usage

    sess = tf.Session(config=session_conf)

    # Define the session
    with sess.as_default():
        cnn = CharCNN(config, charEmbedding)

        globalStep = tf.Variable(0, name="globalStep", trainable=False)
        # Define the optimizer and pass in the learning rate
        optimizer = tf.train.RMSPropOptimizer(config.training.learningRate)
        # Compute the gradients, getting (gradient, variable) pairs
        gradsAndVars = optimizer.compute_gradients(cnn.loss)
        # Apply the gradients to the variables, producing the training op
        trainOp = optimizer.apply_gradients(gradsAndVars, global_step=globalStep)

        # Use summaries to draw tensorBoard graphs
        gradSummaries = []
        for g, v in gradsAndVars:
            if g is not None:
                tf.summary.histogram("{}/grad/hist".format(v.name), g)
                tf.summary.scalar("{}/grad/sparsity".format(v.name), tf.nn.zero_fraction(g))

        outDir = os.path.abspath(os.path.join(os.path.curdir, "summarys"))
        print("Writing to {}\n".format(outDir))

        lossSummary = tf.summary.scalar("trainLoss", cnn.loss)
        summaryOp = tf.summary.merge_all()

        trainSummaryDir = os.path.join(outDir, "train")
        trainSummaryWriter = tf.summary.FileWriter(trainSummaryDir, sess.graph)

        evalSummaryDir = os.path.join(outDir, "eval")
        evalSummaryWriter = tf.summary.FileWriter(evalSummaryDir, sess.graph)

        # Saver for checkpoint files
        saver = tf.train.Saver(tf.global_variables(), max_to_keep=5)

        # One way of saving the model: export it as a pb file
        builder = tf.saved_model.builder.SavedModelBuilder("../model/charCNN/savedModel")

        # Initialize all variables
        sess.run(tf.global_variables_initializer())

        def trainStep(batchX, batchY):
            """
            Training step
            """
            feed_dict = {
                cnn.inputX: batchX,
                cnn.inputY: batchY,
                cnn.dropoutKeepProb: config.model.dropoutKeepProb,
                cnn.isTraining: True
            }
            _, summary, step, loss, predictions, binaryPreds = sess.run(
                [trainOp, summaryOp, globalStep, cnn.loss, cnn.predictions, cnn.binaryPreds],
                feed_dict)

            timeStr = datetime.datetime.now().isoformat()
            acc, auc, precision, recall = genMetrics(batchY, predictions, binaryPreds)
            print("{}, step: {}, loss: {}, acc: {}, auc: {}, precision: {}, recall: {}".format(
                timeStr, step, loss, acc, auc, precision, recall))
            trainSummaryWriter.add_summary(summary, step)

        def devStep(batchX, batchY):
            """
            Validation step
            """
            feed_dict = {
                cnn.inputX: batchX,
                cnn.inputY: batchY,
                cnn.dropoutKeepProb: 1.0,
                cnn.isTraining: False
            }
            summary, step, loss, predictions, binaryPreds = sess.run(
                [summaryOp, globalStep, cnn.loss, cnn.predictions, cnn.binaryPreds],
                feed_dict)

            acc, auc, precision, recall = genMetrics(batchY, predictions, binaryPreds)
            evalSummaryWriter.add_summary(summary, step)

            return loss, acc, auc, precision, recall

        for i in range(config.training.epoches):
            # Train the model
            print("start training model")
            for batchTrain in nextBatch(trainReviews, trainLabels, config.batchSize):
                trainStep(batchTrain[0], batchTrain[1])

                currentStep = tf.train.global_step(sess, globalStep)
                if currentStep % config.training.evaluateEvery == 0:
                    print("\nEvaluation:")

                    losses = []
                    accs = []
                    aucs = []
                    precisions = []
                    recalls = []

                    for batchEval in nextBatch(evalReviews, evalLabels, config.batchSize):
                        loss, acc, auc, precision, recall = devStep(batchEval[0], batchEval[1])
                        losses.append(loss)
                        accs.append(acc)
                        aucs.append(auc)
                        precisions.append(precision)
                        recalls.append(recall)

                    time_str = datetime.datetime.now().isoformat()
                    print("{}, step: {}, loss: {}, acc: {}, auc: {}, precision: {}, recall: {}".format(
                        time_str, currentStep, mean(losses), mean(accs), mean(aucs),
                        mean(precisions), mean(recalls)))

                if currentStep % config.training.checkpointEvery == 0:
                    # The other way of saving the model: save checkpoint files
                    path = saver.save(sess, "../model/charCNN/model/my-model", global_step=currentStep)
                    print("Saved model checkpoint to {}\n".format(path))

        inputs = {"inputX": tf.saved_model.utils.build_tensor_info(cnn.inputX),
                  "keepProb": tf.saved_model.utils.build_tensor_info(cnn.dropoutKeepProb)}

        outputs = {"binaryPreds": tf.saved_model.utils.build_tensor_info(cnn.binaryPreds)}

        prediction_signature = tf.saved_model.signature_def_utils.build_signature_def(
            inputs=inputs, outputs=outputs,
            method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME)
        legacy_init_op = tf.group(tf.tables_initializer(), name="legacy_init_op")
        builder.add_meta_graph_and_variables(sess, [tf.saved_model.tag_constants.SERVING],
                                             signature_def_map={"predict": prediction_signature},
                                             legacy_init_op=legacy_init_op)

        builder.save()
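After training, the exported SavedModel can be loaded back for inference. A minimal sketch (the tensor names are inferred from the graph defined above; if the BN layer were enabled, isTraining would also have to be fed):

with tf.Session(graph=tf.Graph()) as sess:
    # Load the pb-format model exported by SavedModelBuilder
    tf.saved_model.loader.load(sess, [tf.saved_model.tag_constants.SERVING],
                               "../model/charCNN/savedModel")
    graph = sess.graph
    inputX = graph.get_tensor_by_name("inputX:0")
    dropoutKeepProb = graph.get_tensor_by_name("dropoutKeepProb:0")
    binaryPreds = graph.get_tensor_by_name("outputLayer/binaryPreds:0")

    # Run inference on a couple of already-indexed reviews
    preds = sess.run(binaryPreds,
                     feed_dict={inputX: evalReviews[:2], dropoutKeepProb: 1.0})
    print(preds)   # e.g. [[1.], [0.]]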