Chinese text classification with TfidfVectorizer (dataset: the Fudan Chinese corpus)
Chinese text classification with a CNN (dataset: the Fudan Chinese corpus)
Unlike the earlier posts, some of the code has been refactored, and to make the whole pipeline clearer we preprocess the data from scratch.
From this post you can learn the complete Chinese text classification workflow: data preprocessing, model definition, training, and testing.
1. Getting to know the data
The data looks like this:
The basic directory layout is as follows:
train holds the training set and answer holds the test set. Let's look at the files under train:
There are 20 folders, one per class. Taking C3-Art as an example, the files inside look like this:
Each article corresponds to one txt file encoded in gb18030; the files under the utf8 folder are the same txt files in UTF-8 encoding.
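As a quick sanity check before preprocessing, a raw article can be opened with the gb18030 codec. A minimal sketch; the relative path below is only illustrative, and errors='ignore' matches how out-of-range characters are handled in the preprocessing code later:

# Minimal sketch: read one raw article with the gb18030 codec.
# The relative path is illustrative; point it at wherever the corpus lives.
with open("train/C3-Art/C3-Art0001.txt", "r", encoding="gb18030", errors="ignore") as f:
    text = f.read()
print(text[:200])  # peek at the first 200 characters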
Part of C3-Art0001.txt looks like this:
2. Data preprocessing
The basic preprocessing pipeline in this post: collect the txt file paths of the training and test sets, extract the text and labels (segmenting the text with jieba), train a word2vec model on the segmented text, remove stop words, build a vocabulary, map words and labels to ids, split off a validation set, and build the embedding matrix.
def _txtpath_to_txt(self):
    # save the paths of the txt files under the training and test sets
    train_txt_path = os.path.join(PATH, "process/Fudan/train.txt")
    test_txt_path = os.path.join(PATH, "process/Fudan/test.txt")
    train_list = os.listdir(os.path.join(PATH, self.trainPath))  # list all folders under this directory
    fp1 = open(train_txt_path, "w", encoding="utf-8")
    fp2 = open(test_txt_path, "w", encoding="utf-8")
    for train_dir in train_list:
        # collect the absolute paths of all txt files in the next-level directory
        for txt in glob.glob(os.path.join(PATH, self.trainPath + train_dir + "/*.txt")):
            fp1.write(txt + "\n")
    fp1.close()

    test_list = os.listdir(os.path.join(PATH, self.testPath))  # list all folders under this directory
    for test_dir in test_list:
        for txt in glob.glob(os.path.join(PATH, self.testPath + test_dir + "/*.txt")):
            fp2.write(txt + "\n")
    fp2.close()
# write the text and the label of every txt file to their own files
def _contentlabel_to_txt(self, txt_path, content_path, label_path):
    files = open(txt_path, "r", encoding="utf-8")
    content_file = open(content_path, "w", encoding="utf-8")
    label_file = open(label_path, "w", encoding="utf-8")
    for txt in files.readlines():  # read each txt path
        txt = txt.strip()  # strip the trailing \n
        content_list = []
        # split on "/" and take the last part, then split on "-" and take the last part
        label_str = txt.split("/")[-1].split("-")[-1]
        label_list = []
        # the loop below extracts the label: walk over the characters and stop at the first digit
        for s in label_str:
            if s.isalpha():
                label_list.append(s)
            elif s.isalnum():
                break
            else:
                print("unexpected character in label")
        label = "".join(label_list)  # join the character list into the label string
        # print(label)

        # the following reads the whole text
        # open the file as gb18030; errors='ignore' skips characters outside that encoding
        fp1 = open(txt, "r", encoding="gb18030", errors='ignore')
        for line in fp1.readlines():  # read each line
            # jieba segmentation, accurate mode
            line = jieba.lcut(line.strip(), cut_all=False)
            # collect the segmented words of every line in one list
            content_list.extend(line)
        fp1.close()
        content_str = " ".join(content_list)  # join into one string
        # print(content_str)
        content_file.write(content_str + "\n")  # save the text
        label_file.write(label + "\n")          # save the label
    content_file.close()
    label_file.close()
    files.close()
The stored segmented text looks like this:
The labels look like this:
from gensim.models import Word2Vec
from gensim.models.word2vec import PathLineSentences
import multiprocessing
import os
import sys
import logging

# logging setup
program = os.path.basename(sys.argv[0])
logger = logging.getLogger(program)
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(message)s')
logging.root.setLevel(level=logging.INFO)
logger.info("running %s" % ' '.join(sys.argv))

# check and process input arguments
# if len(sys.argv) < 4:
#     print(globals()['__doc__'] % locals())
#     sys.exit(1)
# input_dir, outp1, outp2 = sys.argv[1:4]

# train the model
# corpus directory: PathLineSentences(input_dir)
# embedding size: 200, window size: 10, drop words occurring fewer than 10 times,
# multi-process training, 10 iterations
model = Word2Vec(PathLineSentences('/content/drive/My Drive/transformer/process/Fudan/word2vec/data/'),
                 size=200, window=10, min_count=10,
                 workers=multiprocessing.cpu_count(), iter=10)
model.save('/content/drive/My Drive/transformer/process/Fudan/word2vec/model/Word2vec.w2v')
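Once training finishes, the saved model can be loaded back and inspected. A minimal sketch, assuming a gensim version compatible with the training call above; the query word '藝術' is only an illustration and has to occur at least min_count=10 times in the corpus:

from gensim.models import Word2Vec

# load the vectors saved above
w2v = Word2Vec.load('/content/drive/My Drive/transformer/process/Fudan/word2vec/model/Word2vec.w2v')

if '藝術' in w2v.wv:                              # '藝術' is just an illustrative query word
    print(w2v.wv['藝術'].shape)                   # 200-dimensional vector, matching size=200
    print(w2v.wv.most_similar('藝術', topn=5))    # nearest neighbours in the embedding space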
# remove stop words
def _get_clean_data(self, filePath):
    # initialise the stop-word dictionary first
    self._get_stopwords()
    sentence_list = []
    with open(filePath, 'r', encoding='utf-8') as fp:
        lines = fp.readlines()
        for line in lines:
            tmp = []
            words = line.strip().split(" ")
            for word in words:
                word = word.strip()
                if word not in self.stopWordDict and word != '':
                    tmp.append(word)
                else:
                    continue
            sentence_list.append(tmp)
    return sentence_list

# read the stop-word dictionary
def _get_stopwords(self):
    with open(os.path.join(PATH, self.stopWordSource), "r") as f:
        stopWords = f.read()
        stopWordList = set(stopWords.splitlines())
        # store the stop words in a dict so that lookups are fast
        self.stopWordDict = dict(zip(stopWordList, list(range(len(stopWordList)))))
# build the vocabulary
def _get_vocaburay(self):
    train_content = os.path.join(PATH, "process/Fudan/word2vec/data/train_content.txt")
    sentence_list = self._get_clean_data(train_content)
    # the average text length can be computed here to choose sequenceLength in the config
    # max_sequence = sum([len(s) for s in sentence_list]) / len(sentence_list)
    vocab_before = []
    for sentence in sentence_list:
        for word in sentence:
            vocab_before.append(word)
    count_vocab = Counter(vocab_before)  # count how often each word occurs
    # print(len(count_vocab))
    count_vocab = sorted(count_vocab.items(), key=lambda x: x[1], reverse=True)  # sort by frequency, descending
    vocab_after = copy.deepcopy(count_vocab[:6000])
    return dict(vocab_after)  # keep the top 6000 words; turn the list of tuples into a dict
Convert the text to ids and the labels to ids:
def _wordToIdx(self):
    # build the word-to-id mapping
    vocab = list(self._get_vocaburay().keys())  # take the dict keys, i.e. the words, as a list
    # print(vocab)
    tmp = ['PAD', 'UNK']
    vocab = tmp + vocab
    word2idx = {word: i for i, word in enumerate(vocab)}
    idx2word = {i: word for i, word in enumerate(vocab)}
    return word2idx, idx2word

def _labelToIdx(self):
    # build the label list and the label-to-id mapping
    label_path = os.path.join(PATH, "process/Fudan/train_label.txt")
    with open(os.path.join(PATH, label_path), "r") as f:
        labels = f.read()
        labelsList = sorted(set(labels.splitlines()))  # sort so the label ids do not change between runs
        label2idx = {label: i for i, label in enumerate(labelsList)}
        idx2label = {i: label for i, label in enumerate(labelsList)}
        self.labelList = [label2idx[label] for label in labelsList]
    return label2idx, idx2label
def _getData(self, contentPath, labelPath, mode=None):
    # two options for words that are not in the vocabulary: drop them or replace them with UNK; we use UNK here
    vocab = self._get_vocaburay()
    word2idx, idx2word = self._wordToIdx()
    label2idx, idx2label = self._labelToIdx()
    data = []
    content_list = self._get_clean_data(contentPath)
    for content in content_list:
        # print(content)
        tmp = []
        if len(content) >= self.config.sequenceLength:
            # truncate texts longer than the maximum length
            content = content[:self.config.sequenceLength]
        else:
            # pad texts shorter than the maximum length with PAD
            content = ['PAD'] * (self.config.sequenceLength - len(content)) + content
        for word in content:
            # map each word to its id
            if word in word2idx:
                tmp.append(word2idx[word])
            else:
                tmp.append(word2idx['UNK'])
        data.append(tmp)
    with open(labelPath, 'r', encoding='utf-8') as fp:
        labels = fp.read()
        label = [[label2idx[label]] for label in labels.splitlines()]
    return data, label
def _getTrainValData(self, dataPath, labelPath):
    trainData, trainLabel = self._getData(dataPath, labelPath)
    # for convenience we simply use the sklearn helper here
    self.trainData, self.valData, self.trainLabels, self.valLabels = train_test_split(
        trainData, trainLabel, test_size=self.rate, random_state=1)

def _getTestData(self, dataPath, labelPath):
    self.testData, self.testLabels = self._getData(dataPath, labelPath)
# fetch the word vectors for the vocabulary
def _getWordEmbedding(self):
    word2idx, idx2word = self._wordToIdx()
    vocab = sorted(word2idx.items(), key=lambda x: x[1])  # sort the words by id
    # print(vocab)
    w2vModel = Word2Vec.load(os.path.join(PATH, 'process/Fudan/word2vec/model/Word2vec.w2v'))
    self.wordEmbedding.append([0] * self.embeddingSize)  # vector for PAD
    self.wordEmbedding.append([0] * self.embeddingSize)  # vector for UNK
    for i in range(2, len(vocab)):
        self.wordEmbedding.append(list(w2vModel[vocab[i][0]]))
I won't paste every piece of code one by one.
3. Building the model
import numpy as np
import tensorflow as tf
import warnings
warnings.filterwarnings("ignore")


class Transformer(object):
    """
    Transformer encoder for text classification
    """
    def __init__(self, config, wordEmbedding):
        # model inputs
        # inputX: [None, 600], inputY: [None, 20]
        self.inputX = tf.placeholder(tf.int32, [None, config.sequenceLength], name="inputX")
        self.inputY = tf.placeholder(tf.int32, [None, config.numClasses], name="inputY")
        self.lastBatch = False
        self.dropoutKeepProb = tf.placeholder(tf.float32, name="dropoutKeepProb")
        self.config = config

        # L2 loss term
        l2Loss = tf.constant(0.0)

        # Word embedding layer. There are two ways to define the position vectors: one is to pass them in
        # as fixed one-hot vectors and concatenate them with the word vectors, which works better on this
        # dataset; the other is to implement them as in the paper, which actually performs worse here,
        # probably because it increases model complexity and does not do well on a small dataset.
        with tf.name_scope("wordEmbedding"):
            self.W = tf.Variable(tf.cast(wordEmbedding, dtype=tf.float32, name="word2vec"), name="W")
            self.wordEmbedded = tf.nn.embedding_lookup(self.W, self.inputX)

        with tf.name_scope("positionEmbedding"):
            if tf.shape(self.wordEmbedded)[0] == config.batchSize:
                self.positionEmbedded = self._positionEmbedding()
            else:
                self.positionEmbedded = self._positionEmbedding(lastBatch=tf.shape(self.wordEmbedded)[0])

        self.embeddedWords = self.wordEmbedded + self.positionEmbedded

        with tf.name_scope("transformer"):
            for i in range(config.modelConfig.numBlocks):
                with tf.name_scope("transformer-{}".format(i + 1)):
                    # shape [batch_size, sequence_length, embedding_size]
                    multiHeadAtt = self._multiheadAttention(rawKeys=self.wordEmbedded,
                                                            queries=self.embeddedWords,
                                                            keys=self.embeddedWords)
                    # shape [batch_size, sequence_length, embedding_size]
                    self.embeddedWords = self._feedForward(multiHeadAtt,
                                                           [config.modelConfig.filters,
                                                            config.modelConfig.embeddingSize])

        outputs = tf.reshape(self.embeddedWords,
                             [-1, config.sequenceLength * (config.modelConfig.embeddingSize)])
        outputSize = outputs.get_shape()[-1].value

        with tf.name_scope("dropout"):
            outputs = tf.nn.dropout(outputs, keep_prob=self.dropoutKeepProb)

        # fully-connected output layer
        with tf.name_scope("output"):
            outputW = tf.get_variable(
                "outputW",
                shape=[outputSize, config.numClasses],
                initializer=tf.contrib.layers.xavier_initializer())
            outputB = tf.Variable(tf.constant(0.1, shape=[config.numClasses]), name="outputB")
            l2Loss += tf.nn.l2_loss(outputW)
            l2Loss += tf.nn.l2_loss(outputB)
            self.logits = tf.nn.xw_plus_b(outputs, outputW, outputB, name="logits")
            if config.numClasses == 1:
                self.predictions = tf.cast(tf.greater_equal(self.logits, 0.0), tf.float32, name="predictions")
            elif config.numClasses > 1:
                self.predictions = tf.argmax(self.logits, axis=-1, name="predictions")

        # cross-entropy loss
        with tf.name_scope("loss"):
            if config.numClasses == 1:
                losses = tf.nn.sigmoid_cross_entropy_with_logits(
                    logits=self.logits,
                    labels=tf.cast(tf.reshape(self.inputY, [-1, 1]), dtype=tf.float32))
            elif config.numClasses > 1:
                print(self.logits, self.inputY)
                losses = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=self.inputY)

            self.loss = tf.reduce_mean(losses) + config.modelConfig.l2RegLambda * l2Loss

    def _layerNormalization(self, inputs, scope="layerNorm"):
        # LayerNorm differs from BatchNorm
        epsilon = self.config.modelConfig.epsilon

        inputsShape = inputs.get_shape()  # [batch_size, sequence_length, embedding_size]
        paramsShape = inputsShape[-1:]

        # LayerNorm computes the mean and variance of the input over the last dimension only,
        # whereas BatchNorm considers all dimensions.
        # mean and variance have shape [batch_size, sequence_len, 1]
        mean, variance = tf.nn.moments(inputs, [-1], keep_dims=True)
        beta = tf.Variable(tf.zeros(paramsShape))
        gamma = tf.Variable(tf.ones(paramsShape))
        normalized = (inputs - mean) / ((variance + epsilon) ** .5)
        outputs = gamma * normalized + beta
        return outputs

    def _multiheadAttention(self, rawKeys, queries, keys, numUnits=None, causality=False,
                            scope="multiheadAttention"):
        # rawKeys is used to compute the mask: keys already have the position embedding added,
        # so they no longer contain zeros at the padded positions
        numHeads = self.config.modelConfig.numHeads
        keepProp = self.config.modelConfig.keepProp

        if numUnits is None:  # if not given, use the last dimension of the input, i.e. the embedding size
            numUnits = queries.get_shape().as_list()[-1]

        # tf.layers.dense applies a non-linear projection to multi-dimensional tensors. When computing
        # self-attention the three inputs must be projected; this is the per-head weight projection of
        # Multi-Head Attention in the paper. Here we project first and split afterwards, which is
        # equivalent in principle.
        # Q, K, V all have shape [batch_size, sequence_length, embedding_size]
        Q = tf.layers.dense(queries, numUnits, activation=tf.nn.relu)
        K = tf.layers.dense(keys, numUnits, activation=tf.nn.relu)
        V = tf.layers.dense(keys, numUnits, activation=tf.nn.relu)

        # split along the last dimension into numHeads parts and concatenate along the first dimension
        # Q_, K_, V_ have shape [batch_size * numHeads, sequence_length, embedding_size / numHeads]
        Q_ = tf.concat(tf.split(Q, numHeads, axis=-1), axis=0)
        K_ = tf.concat(tf.split(K, numHeads, axis=-1), axis=0)
        V_ = tf.concat(tf.split(V, numHeads, axis=-1), axis=0)

        # dot product between queries and keys, shape [batch_size * numHeads, queries_len, key_len],
        # where the last two dimensions are the query and key sequence lengths
        similary = tf.matmul(Q_, tf.transpose(K_, [0, 2, 1]))

        # scale the dot products by the square root of the key dimension
        scaledSimilary = similary / (K_.get_shape().as_list()[-1] ** 0.5)

        # The input sequences contain padding tokens that should not contribute to the result. In principle
        # their attention weights would be zero when the padded inputs are zero, but after adding the
        # position vectors they are no longer zero, so we mask them before the softmax. Padding also
        # appears in the queries, but the result only depends on the inputs, and in self-attention
        # queries == keys, so masking one side is enough to zero out the weight.
        # See https://github.com/Kyubyong/transformer/issues/3 for a discussion of the key mask.
        # tf.tile expands the tensor to shape [batch_size * numHeads, keys_len], keys_len = key sequence length
        # tf.tile((?, 200), [8, 1])
        # sum the values at each time step; the sign marks non-padding positions
        keyMasks = tf.sign(tf.abs(tf.reduce_sum(rawKeys, axis=-1)))  # shape [batch_size, time_step]
        print(keyMasks.shape)
        keyMasks = tf.tile(keyMasks, [numHeads, 1])

        # add a dimension and tile to get shape [batch_size * numHeads, queries_len, keys_len]
        keyMasks = tf.tile(tf.expand_dims(keyMasks, 1), [1, tf.shape(queries)[1], 1])

        # tf.ones_like builds a tensor of ones with the same shape as scaledSimilary,
        # then scale it to a very large negative value
        paddings = tf.ones_like(scaledSimilary) * (-2 ** (32 + 1))

        # tf.where(condition, x, y): where condition is True take the element from x, otherwise from y,
        # so condition, x and y must have the same shape. Positions where keyMasks is 0 are replaced by paddings.
        maskedSimilary = tf.where(tf.equal(keyMasks, 0), paddings, scaledSimilary)  # [batch_size * numHeads, queries_len, key_len]

        # When generating the current word only the left context may be attended to; this appears in the
        # Transformer decoder. For text classification only the encoder is needed; the decoder is a
        # generative model used mainly for language generation.
        if causality:
            diagVals = tf.ones_like(maskedSimilary[0, :, :])  # [queries_len, keys_len]
            tril = tf.contrib.linalg.LinearOperatorTriL(diagVals).to_dense()  # [queries_len, keys_len]
            masks = tf.tile(tf.expand_dims(tril, 0), [tf.shape(maskedSimilary)[0], 1, 1])  # [batch_size * numHeads, queries_len, keys_len]
            paddings = tf.ones_like(masks) * (-2 ** (32 + 1))
            maskedSimilary = tf.where(tf.equal(masks, 0), paddings, maskedSimilary)  # [batch_size * numHeads, queries_len, keys_len]

        # softmax over the attention scores, shape [batch_size * numHeads, queries_len, keys_len]
        weights = tf.nn.softmax(maskedSimilary)

        # weighted sum, shape [batch_size * numHeads, sequence_length, embedding_size / numHeads]
        outputs = tf.matmul(weights, V_)

        # recombine the heads into the original shape [batch_size, sequence_length, embedding_size]
        outputs = tf.concat(tf.split(outputs, numHeads, axis=0), axis=2)

        outputs = tf.nn.dropout(outputs, keep_prob=keepProp)

        # residual connection around each sub-layer, i.e. H(x) = F(x) + x
        outputs += queries
        # layer normalization
        outputs = self._layerNormalization(outputs)
        return outputs

    def _feedForward(self, inputs, filters, scope="multiheadAttention"):
        # the position-wise feed-forward network, implemented with convolutions
        # inner layer
        params = {"inputs": inputs, "filters": filters[0], "kernel_size": 1,
                  "activation": tf.nn.relu, "use_bias": True}
        outputs = tf.layers.conv1d(**params)

        # outer layer
        params = {"inputs": outputs, "filters": filters[1], "kernel_size": 1,
                  "activation": None, "use_bias": True}

        # a 1-D convolution is used here; the kernel is still two-dimensional, only its height is
        # specified and its width matches the embedding size
        # shape [batch_size, sequence_length, embedding_size]
        outputs = tf.layers.conv1d(**params)

        # residual connection
        outputs += inputs

        # normalization
        outputs = self._layerNormalization(outputs)
        return outputs

    def _positionEmbedding(self, lastBatch=None, scope="positionEmbedding"):
        # build the (fixed, sinusoidal) position vectors
        if lastBatch is None:
            batchSize = self.config.batchSize  # 128
        else:
            batchSize = lastBatch
        sequenceLen = self.config.sequenceLength  # 600
        embeddingSize = self.config.modelConfig.embeddingSize  # 100

        # generate the position indices and tile them over all samples in the batch
        positionIndex = tf.tile(tf.expand_dims(tf.range(sequenceLen), 0), [batchSize, 1])

        # first part of the position embedding, based on sine and cosine functions
        positionEmbedding = np.array([[pos / np.power(10000, (i - i % 2) / embeddingSize)
                                       for i in range(embeddingSize)]
                                      for pos in range(sequenceLen)])

        # wrap the even dimensions with sin and the odd dimensions with cos
        positionEmbedding[:, 0::2] = np.sin(positionEmbedding[:, 0::2])
        positionEmbedding[:, 1::2] = np.cos(positionEmbedding[:, 1::2])

        # convert positionEmbedding into a tensor
        positionEmbedding_ = tf.cast(positionEmbedding, dtype=tf.float32)

        # look up to get the 3-D matrix [batchSize, sequenceLen, embeddingSize]
        positionEmbedded = tf.nn.embedding_lookup(positionEmbedding_, positionIndex)

        return positionEmbedded
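For reference, _multiheadAttention above implements the scaled dot-product attention from "Attention Is All You Need" (with padded key positions replaced by a large negative value before the softmax), and _positionEmbedding implements the sinusoidal position encoding:

$$\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\left(\frac{QK^{\top}}{\sqrt{d_k}}\right)V$$

$$PE_{(pos,\,2i)} = \sin\left(\frac{pos}{10000^{2i/d_{model}}}\right), \qquad PE_{(pos,\,2i+1)} = \cos\left(\frac{pos}{10000^{2i/d_{model}}}\right)$$

Here d_k is the per-head dimension (embedding_size / numHeads) and d_model the embedding size; the exponent (i - i % 2) / embeddingSize in the code is exactly 2i/d_model written over a flat dimension index.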
4. Training, testing, and prediction
import sys
import os
BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # parent directory of the current script, here "transformer"
from dataset.fudanDataset import FudanDataset
from models.transformer import Transformer
from utils.utils import *
from utils.metrics import *
from config.fudanConfig import FudanConfig
from config.globalConfig import PATH
import numpy as np
import tensorflow as tf
import time
import datetime
from tkinter import _flatten
from sklearn import metrics
import jieba


def train():
    print("Configuring the Saver...\n")
    save_dir = 'checkpoint/transformer/'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    save_path = os.path.join(save_dir, 'best_validation')  # path where the best validation model is saved

    globalStep = tf.Variable(0, name="globalStep", trainable=False)
    # configure the Saver
    saver = tf.train.Saver()

    # define the session
    """
    session_conf = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
    session_conf.gpu_options.allow_growth = True
    session_conf.gpu_options.per_process_gpu_memory_fraction = 0.9  # GPU memory fraction
    sess = tf.Session(config=session_conf)
    """
    sess = tf.Session()

    print("Defining the optimizer...\n")
    # optimizer with the configured learning rate
    optimizer = tf.train.AdamOptimizer(config.trainConfig.learningRate)
    # compute the gradients, getting (gradient, variable) pairs
    gradsAndVars = optimizer.compute_gradients(model.loss)
    # apply the gradients to the variables to build the training op
    trainOp = optimizer.apply_gradients(gradsAndVars, global_step=globalStep)

    sess.run(tf.global_variables_initializer())

    def trainStep(batchX, batchY):
        """
        a single training step
        """
        feed_dict = {
            model.inputX: batchX,
            model.inputY: batchY,
            model.dropoutKeepProb: config.modelConfig.dropoutKeepProb,
        }
        _, step, loss, predictions = sess.run([trainOp, globalStep, model.loss, model.predictions], feed_dict)

        if config.numClasses == 1:
            acc, recall, prec, f_beta = get_binary_metrics(pred_y=predictions, true_y=batchY)
        elif config.numClasses > 1:
            acc, recall, prec, f_beta = get_multi_metrics(pred_y=predictions, true_y=batchY, labels=labelList)

        return loss, acc, prec, recall, f_beta

    def valStep(batchX, batchY):
        """
        a single validation step
        """
        feed_dict = {
            model.inputX: batchX,
            model.inputY: batchY,
            model.dropoutKeepProb: 1.0,
        }
        step, loss, predictions = sess.run([globalStep, model.loss, model.predictions], feed_dict)

        if config.numClasses == 1:
            acc, recall, prec, f_beta = get_binary_metrics(pred_y=predictions, true_y=batchY)
        elif config.numClasses > 1:
            acc, recall, prec, f_beta = get_multi_metrics(pred_y=predictions, true_y=batchY, labels=labelList)

        return loss, acc, prec, recall, f_beta

    print("Start training...\n")
    best_f_beta_val = 0.0       # best validation F-score so far
    last_improved = 0           # step at which the last improvement happened
    require_improvement = 1000  # stop early if there is no improvement for 1000 steps
    flag = False
    for epoch in range(config.trainConfig.epoches):
        print('Epoch:', epoch + 1)
        batch_train = batch_iter(train_data, train_label, config.batchSize)
        for x_batch, y_batch in batch_train:
            loss, acc, prec, recall, f_beta = trainStep(x_batch, y_batch)
            currentStep = tf.train.global_step(sess, globalStep)
            # print training metrics every print_per_step iterations
            if currentStep % config.trainConfig.print_per_step == 0:
                print("train: step: {}, loss: {:.4f}, acc: {:.4f}, recall: {:.4f}, precision: {:.4f}, f_beta: {:.4f}".format(
                    currentStep, loss, acc, recall, prec, f_beta))
            if currentStep % config.trainConfig.evaluateEvery == 0:
                print("Start validation...\n")
                losses = []
                accs = []
                f_betas = []
                precisions = []
                recalls = []
                batch_val = batch_iter(val_data, val_label, config.batchSize)
                for x_batch, y_batch in batch_val:
                    loss, acc, precision, recall, f_beta = valStep(x_batch, y_batch)
                    losses.append(loss)
                    accs.append(acc)
                    f_betas.append(f_beta)
                    precisions.append(precision)
                    recalls.append(recall)

                if mean(f_betas) > best_f_beta_val:
                    # save the best result so far
                    best_f_beta_val = mean(f_betas)
                    last_improved = currentStep
                    saver.save(sess=sess, save_path=save_path)
                    improved_str = '*'
                else:
                    improved_str = ''

                time_str = datetime.datetime.now().isoformat()
                print("{}, step: {:>6}, loss: {:.4f}, acc: {:.4f}, precision: {:.4f}, recall: {:.4f}, f_beta: {:.4f} {}".format(
                    time_str, currentStep, mean(losses), mean(accs), mean(precisions), mean(recalls), mean(f_betas), improved_str))

            if currentStep - last_improved > require_improvement:
                # no improvement on the validation set for a long time: stop early
                print("No improvement for a long time, stopping training automatically")
                flag = True
                break  # leave the inner loop
        if flag:  # same as above
            break
    sess.close()


def test(test_data, test_label):
    print("Start testing...")
    save_path = 'checkpoint/transformer/best_validation'
    saver = tf.train.Saver()
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    saver.restore(sess=sess, save_path=save_path)  # restore the saved model

    data_len = len(test_data)
    test_batchsize = 128
    batch_test = batch_iter(test_data, test_label, 128, is_train=False)
    pred_label = []
    for x_batch, y_batch in batch_test:
        feed_dict = {
            model.inputX: x_batch,
            model.inputY: y_batch,
            model.dropoutKeepProb: 1.0,
        }
        predictions = sess.run([model.predictions], feed_dict)
        pred_label.append(predictions[0].tolist())
    pred_label = list(_flatten(pred_label))
    test_label = [np.argmax(item) for item in test_label]
    # evaluation
    print("Computing Precision, Recall and F1-Score...")
    print(metrics.classification_report(test_label, pred_label, target_names=true_labelList))
    sess.close()


def process_sentence(data):
    fudanDataset._get_stopwords()
    sentence_list = []
    for content in data:
        words_list = jieba.lcut(content, cut_all=False)
        tmp1 = []
        for word in words_list:
            word = word.strip()
            if word not in fudanDataset.stopWordDict and word != '':
                tmp1.append(word)
            else:
                continue
        sentence_list.append(tmp1)
    vocab = fudanDataset._get_vocaburay()
    word2idx, idx2word = fudanDataset._wordToIdx()
    label2idx, idx2label = fudanDataset._labelToIdx()
    res_data = []
    # print(content)
    for content in sentence_list:
        tmp2 = []
        if len(content) >= config.sequenceLength:
            # truncate texts longer than the maximum length
            content = content[:config.sequenceLength]
        else:
            # pad texts shorter than the maximum length with PAD
            content = ['PAD'] * (config.sequenceLength - len(content)) + content
        for word in content:
            # map each word to its id
            if word in word2idx:
                tmp2.append(word2idx[word])
            else:
                tmp2.append(word2idx['UNK'])
        res_data.append(tmp2)
    return res_data


def get_predict_content(content_path, label_path):
    use_data = 5
    txt_list = []
    label_list = []
    predict_data = []
    predict_label = []
    content_file = open(content_path, "r", encoding="utf-8")
    label_file = open(label_path, "r", encoding="utf-8")
    for txt in content_file.readlines():  # read each txt path
        txt = txt.strip()  # strip the trailing \n
        txt_list.append(txt)
    for label in label_file.readlines():
        label = label.strip()
        label_list.append(label)
    data = []
    for txt, label in zip(txt_list, label_list):
        data.append((txt, label))
    import random
    predict_data = random.sample(data, use_data)
    p_data = []
    p_label = []
    for txt, label in predict_data:
        with open(txt, "r", encoding="gb18030", errors='ignore') as fp1:
            tmp = []
            for line in fp1.readlines():  # read each line
                tmp.append(line.strip())
            p_data.append("".join(tmp))
            p_label.append(label)
    content_file.close()
    label_file.close()
    return p_data, p_label


def predict(data, label, p_data):
    print("Start predicting the class of the input texts...")
    predict_data = data
    predict_true_data = label
    save_path = 'checkpoint/transformer/best_validation'
    saver = tf.train.Saver()
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    saver.restore(sess=sess, save_path=save_path)  # restore the saved model
    feed_dict = {
        model.inputX: predict_data,
        model.inputY: predict_true_data,
        model.dropoutKeepProb: 1.0,
    }
    predictions = sess.run([model.predictions], feed_dict)
    pred_label = predictions[0].tolist()
    real_label = [np.argmax(item) for item in predict_true_data]
    for content, pre_label, true_label in zip(p_data, pred_label, real_label):
        print("Input text: {}...".format(content[:100]))
        print("Predicted class:", idx2label[pre_label])
        print("True class:", idx2label[true_label])
        print("================================================")
    sess.close()


if __name__ == '__main__':
    config = FudanConfig()
    fudanDataset = FudanDataset(config)
    word2idx, idx2word = fudanDataset._wordToIdx()
    label2idx, idx2label = fudanDataset._labelToIdx()

    print("Loading data...")
    train_content_path = os.path.join(PATH, "process/Fudan/word2vec/data/train_content.txt")
    train_label_path = os.path.join(PATH, "process/Fudan/train_label.txt")
    test_content_path = os.path.join(PATH, "process/Fudan/word2vec/data/test_content.txt")
    test_label_path = os.path.join(PATH, "process/Fudan/test_label.txt")
    fudanDataset._getTrainValData(train_content_path, train_label_path)
    fudanDataset._getTestData(test_content_path, test_label_path)
    fudanDataset._getWordEmbedding()
    train_data, val_data, train_label, val_label = fudanDataset.trainData, fudanDataset.valData, fudanDataset.trainLabels, fudanDataset.valLabels
    test_data, test_label = fudanDataset.testData, fudanDataset.testLabels
    train_label = one_hot(train_label)
    val_label = one_hot(val_label)
    test_label = one_hot(test_label)
    wordEmbedding = fudanDataset.wordEmbedding
    labelList = fudanDataset.labelList
    true_labelList = [idx2label[label] for label in labelList]

    print("Defining the model...")
    model = Transformer(config, wordEmbedding)
    test(test_data, test_label)

    print("Running prediction...")
    p_data, p_label = get_predict_content(os.path.join(PATH, "process/Fudan/test.txt"), test_label_path)
    process_data = process_sentence(p_data)
    onehot_label = np.zeros((len(p_label), config.numClasses))
    for i, value in enumerate(p_label):
        onehot_label[i][label2idx[value]] = 1
    process_label = onehot_label
    predict(process_data, process_label, p_data)
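The script uses a few helpers from utils.utils (batch_iter, one_hot, mean) that are not pasted in this post. A minimal sketch of what they might look like, assuming labels come in as the single-element id lists produced by _getData and that the number of classes defaults to the 20 Fudan categories:

import numpy as np

def one_hot(labels, num_classes=20):
    # labels: list of [label_id] lists as produced by _getData; num_classes assumed to be the 20 Fudan classes
    out = np.zeros((len(labels), num_classes), dtype=np.int32)
    for i, label in enumerate(labels):
        out[i][label[0]] = 1
    return out

def mean(values):
    # average of a list of per-batch metrics
    return sum(values) / len(values)

def batch_iter(data, labels, batch_size, is_train=True):
    # yield (x_batch, y_batch) pairs; shuffle only when training
    data = np.array(data)
    labels = np.array(labels)
    num_samples = len(data)
    if is_train:
        order = np.random.permutation(num_samples)
        data, labels = data[order], labels[order]
    for start in range(0, num_samples, batch_size):
        end = min(start + batch_size, num_samples)
        yield data[start:end], labels[start:end]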
Results:
Computing Precision, Recall and F1-Score...
               precision    recall  f1-score   support

  Agriculture       0.83      0.90      0.87      1022
          Art       0.79      0.86      0.82       742
Communication       0.00      0.00      0.00        27
     Computer       0.93      0.97      0.95      1358
      Economy       0.87      0.89      0.88      1601
    Education       0.67      0.07      0.12        61
  Electronics       0.00      0.00      0.00        28
       Energy       1.00      0.03      0.06        33
  Enviornment       0.86      0.95      0.90      1218
      History       0.68      0.66      0.67       468
          Law       0.18      0.12      0.14        52
   Literature       0.00      0.00      0.00        34
      Medical       0.19      0.06      0.09        53
     Military       0.50      0.03      0.05        76
         Mine       1.00      0.03      0.06        34
   Philosophy       0.62      0.22      0.33        45
     Politics       0.78      0.88      0.83      1026
        Space       0.91      0.81      0.85       642
       Sports       0.86      0.88      0.87      1254
    Transport       1.00      0.02      0.03        59

     accuracy                           0.84      9833
    macro avg       0.63      0.42      0.43      9833
 weighted avg       0.83      0.84      0.83      9833
Input text: 中國環境科學CHINA ENVIRONMENTAL SCIENCE1998年 第18卷 第1期 No.1 Vol.18 1998科技期刊鎘脅迫對小麥葉片細胞膜脂過氧化的影響*羅立新 孫鐵珩 靳月華(中...
Predicted class: Enviornment
True class: Enviornment
================================================
Input text: 自動化學報AGTA AUTOMATICA SINICA1999年 第25卷 第2期 Vol.25 No.2 1999TSP問題分層求解算法的複雜度研究1)盧 欣 李衍達關鍵詞 TSP,局部搜索算法,動...
Predicted class: Computer
True class: Computer
================================================
Input text: 【 文獻號 】3-5519【原文出處】人民日報【原刊地名】京【原刊期號】19960615【原刊頁號】⑵【分 類 號】D4【分 類 名】中國政治【複印期號】199606【 標 題 】中國人民政治協商會...
Predicted class: Politics
True class: Politics
================================================
Input text: 軟件學報JOURNAL OF SOFTWARE1999年 第2期 No.2 1999視覺導航中基於模糊神經網的消陰影算法研究郭木河 楊 磊 陶西平 何克忠 張 鈸摘要 在實際的應用中,因爲室外移動機器...
Predicted class: Computer
True class: Computer
================================================
Input text: 【 文獻號 】2-814【原文出處】中國鄉鎮企業會計【原刊地名】京【原刊期號】199907【原刊頁號】7~9【分 類 號】F22【分 類 名】鄉鎮企業與農場管理【複印期號】199908【 標 題 】...
Predicted class: Economy
True class: Economy
================================================
Overall directory structure:
Later I will keep adding features, such as TensorBoard visualization and other networks like LSTM and GRU.
References:
https://www.cnblogs.com/jiangxinyang/p/10210813.html
Note that the code there omits one line in the multi-head attention part of the Transformer model: