Today Tony brings you a case study: the Twitter sentiment analysis competition on Kaggle. In this case study we will use the pretrained BERT model to work through the competition's data analysis from start to finish.
import numpy as np
import pandas as pd
from math import ceil, floor

import tensorflow as tf
import tensorflow.keras.layers as L
from tensorflow.keras.initializers import TruncatedNormal
from sklearn import model_selection
from transformers import BertConfig, TFBertPreTrainedModel, TFBertMainLayer
from tokenizers import BertWordPieceTokenizer
Understanding the data is crucial in any competition, so the first thing we do is read the data and inspect its content and characteristics.
First, read the CSV files with pandas:
# drop rows with missing values from the training set
train_df = pd.read_csv('train.csv')
train_df.dropna(inplace=True)

# the test set has no ground-truth selected_text; copy text into that column
# so the same preprocessing pipeline can run on it
test_df = pd.read_csv('test.csv')
test_df.loc[:, "selected_text"] = test_df.text.values

submission_df = pd.read_csv('sample_submission.csv')
Next, check how much data we have: 27485 training rows and 3535 test rows.
print("train numbers =", train_df.shape) print("test numbers =", test_df.shape)
Then look at the first 10 rows of the training and test tables to see their fields and values. The tables contain the following fields: textID, text, sentiment, and (in the training set) selected_text.
From the data we can see that the goal is, given the sentiment label, to select the part of the original sentence that best expresses that sentiment.
train_df.head(10)
test_df.head(10)
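To make the task concrete, here is a hypothetical illustration of what a training row looks like (the values below are invented for illustration, not taken from the actual files):

# A hypothetical training row (invented values, not from train.csv):
row = {
    "text": "my phone died again, so annoying",
    "sentiment": "negative",
    "selected_text": "so annoying",   # the span the model has to recover
}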
# directory containing the pretrained BERT weights and vocabulary
PATH = "./bert-base-uncased/"

# maximum sequence length
MAX_SEQUENCE_LENGTH = 128
BERT is trained with a fixed vocabulary, so in this competition we first load that vocabulary with BertWordPieceTokenizer. The option lowercase=True means every token is lowercased; using the case-insensitive (uncased) model keeps the vocabulary smaller and reduces the model's resource usage.
TOKENIZER = BertWordPieceTokenizer(f"{PATH}/vocab.txt", lowercase=True)
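Before moving on, it can help to see what the tokenizer returns. The snippet below is only a quick sanity check; it assumes vocab.txt is present under PATH, and depending on the tokenizers version the encoding may or may not already include the [CLS]/[SEP] special tokens:

# Quick sanity check of the tokenizer (assumes ./bert-base-uncased/vocab.txt exists).
enc = TOKENIZER.encode("I love this movie")
print(enc.tokens)    # lowercased wordpiece tokens
print(enc.ids)       # vocabulary indices, used as model input below
print(enc.offsets)   # (start, end) character position of each token in the original string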
Define the data preprocessing function
def preprocess(tweet, selected_text, sentiment):
    # the raw strings arrive as byte strings; decode them back to utf-8
    tweet = tweet.decode('utf-8')
    selected_text = selected_text.decode('utf-8')
    sentiment = sentiment.decode('utf-8')

    tweet = " ".join(str(tweet).split())
    selected_text = " ".join(str(selected_text).split())

    # mark the characters that text and selected_text have in common
    idx_start, idx_end = None, None
    for index in (i for i, c in enumerate(tweet) if c == selected_text[0]):
        if tweet[index:index + len(selected_text)] == selected_text:
            idx_start = index
            idx_end = index + len(selected_text)
            break

    intersection = [0] * len(tweet)
    if idx_start is not None and idx_end is not None:
        for char_idx in range(idx_start, idx_end):
            intersection[char_idx] = 1

    # encode the raw text; this returns the vocabulary index of each wordpiece
    # and the start/end character positions of each wordpiece in the text
    enc = TOKENIZER.encode(tweet)
    input_ids_orig, offsets = enc.ids, enc.offsets
    target_idx = []
    for i, (o1, o2) in enumerate(offsets):
        if sum(intersection[o1:o2]) > 0:
            target_idx.append(i)

    target_start = target_idx[0]
    target_end = target_idx[-1]

    sentiment_map = {
        'positive': 3893,
        'negative': 4997,
        'neutral': 8699,
    }

    # combine the sentiment token with the token ids of the original sentence
    # to form the new model input
    input_ids = [101] + [sentiment_map[sentiment]] + [102] + input_ids_orig + [102]
    input_type_ids = [0] * (len(input_ids_orig) + 4)
    attention_mask = [1] * (len(input_ids_orig) + 4)
    offsets = [(0, 0), (0, 0), (0, 0)] + offsets + [(0, 0)]
    target_start += 3
    target_end += 3

    # compute the padding length; BERT takes fixed-length inputs,
    # so shorter sequences need to be padded
    padding_length = MAX_SEQUENCE_LENGTH - len(input_ids)
    if padding_length > 0:
        input_ids = input_ids + ([0] * padding_length)
        attention_mask = attention_mask + ([0] * padding_length)
        input_type_ids = input_type_ids + ([0] * padding_length)
        offsets = offsets + ([(0, 0)] * padding_length)
    elif padding_length < 0:
        # sequences longer than the maximum length are left untruncated here
        pass

    return (
        input_ids, attention_mask, input_type_ids, offsets,
        target_start, target_end, tweet, selected_text, sentiment,
    )
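As a rough sketch of what preprocess produces, you can call it directly with byte strings. The inputs below are invented; in the real pipeline they come from the dataframe via the data loader defined next:

# Hypothetical inputs; preprocess expects byte strings because it is later
# called from a tf.data generator.
features = preprocess(b"my phone died again, so annoying",
                      b"so annoying",
                      b"negative")
input_ids, attention_mask, input_type_ids, offsets, t_start, t_end, *_ = features
print(len(input_ids), len(attention_mask))  # both padded to MAX_SEQUENCE_LENGTH (128)
print(t_start, t_end)  # token indices of the selected span, shifted by the 3 prepended tokens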
Define the data loader
class TweetDataset(tf.data.Dataset):
    outputTypes = (
        tf.dtypes.int32, tf.dtypes.int32, tf.dtypes.int32,
        tf.dtypes.int32, tf.dtypes.float32, tf.dtypes.float32,
        tf.dtypes.string, tf.dtypes.string, tf.dtypes.string,
    )

    outputShapes = (
        (128,), (128,), (128,),
        (128, 2), (), (),
        (), (), (),
    )

    def _generator(tweet, selected_text, sentiment):
        for tw, st, se in zip(tweet, selected_text, sentiment):
            yield preprocess(tw, st, se)

    def __new__(cls, tweet, selected_text, sentiment):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_types=cls.outputTypes,
            output_shapes=cls.outputShapes,
            args=(tweet, selected_text, sentiment)
        )

    @staticmethod
    def create(dataframe, batch_size, shuffle_buffer_size=-1):
        dataset = TweetDataset(
            dataframe.text.values,
            dataframe.selected_text.values,
            dataframe.sentiment.values
        )
        dataset = dataset.cache()
        if shuffle_buffer_size != -1:
            dataset = dataset.shuffle(shuffle_buffer_size)
        dataset = dataset.batch(batch_size)
        dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
        return dataset
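A quick way to check that the loader works is to take a single batch and look at its shapes. This is only an illustrative check, assuming train_df has been loaded as above:

# Take one small batch from the loader and inspect it (illustrative only).
sample_ds = TweetDataset.create(train_df.head(64), batch_size=8)
for batch in sample_ds.take(1):
    input_ids, attention_mask, token_type_ids = batch[:3]
    print(input_ids.shape)   # (8, 128) padded token ids
    print(batch[3].shape)    # (8, 128, 2) character offsets
    print(batch[4].shape)    # (8,) start indices of the target span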
We use the BERT model for this competition, so here is a brief introduction to BERT.
BERT stands for Bidirectional Encoder Representation from Transformers, i.e., the encoder of a bidirectional Transformer. Only the encoder is used, because a decoder cannot access the information it is asked to predict.
The model's main innovation lies in its pre-training: it uses Masked LM and Next Sentence Prediction to capture word-level and sentence-level representations respectively.
The main characteristics of BERT are as follows:
At its core, BERT runs self-supervised learning over a massive corpus so that each word obtains a good feature representation.
For a specific downstream task, BERT's representations can then be used directly as the word embeddings for that task. In other words, BERT provides a model that other tasks can transfer-learn from: it can either be fine-tuned on the task or frozen and used as a feature extractor.
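As a minimal sketch of the "feature extractor" usage (this is not part of the competition pipeline; it assumes the same local ./bert-base-uncased/ weights directory), the pretrained encoder can be loaded on its own and its outputs used as contextual word embeddings:

from transformers import TFBertModel

# Load only the pretrained encoder and use its outputs as contextual embeddings.
feature_extractor = TFBertModel.from_pretrained(PATH)
enc = TOKENIZER.encode("I love this movie")
ids = tf.constant([enc.ids])
sequence_output = feature_extractor(ids)[0]   # (1, seq_len, 768) for bert-base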
For the competition we define a BertModel class, built on TFBertPreTrainedModel, to run inference.
BERT's output is kept in hidden_states; these hidden states are then fed into a Dense layer, and the model finally outputs the start and end positions of the text span that expresses the sentiment. These two positions tell us where in the original text the words we need to extract are located.
class BertModel(TFBertPreTrainedModel):
    # dropout rate, to prevent overfitting
    dr = 0.1
    # number of final hidden states to concatenate
    hs = 2

    def __init__(self, config, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)

        self.bert = TFBertMainLayer(config, name="bert")
        self.concat = L.Concatenate()
        self.dropout = L.Dropout(self.dr)
        self.qa_outputs = L.Dense(
            config.num_labels,
            kernel_initializer=TruncatedNormal(stddev=config.initializer_range),
            dtype='float32',
            name="qa_outputs")

    @tf.function
    def call(self, inputs, **kwargs):
        _, _, hidden_states = self.bert(inputs, **kwargs)

        hidden_states = self.concat([
            hidden_states[-i] for i in range(1, self.hs + 1)
        ])

        hidden_states = self.dropout(hidden_states, training=kwargs.get("training", False))
        logits = self.qa_outputs(hidden_states)

        start_logits, end_logits = tf.split(logits, 2, axis=-1)
        start_logits = tf.squeeze(start_logits, axis=-1)
        end_logits = tf.squeeze(end_logits, axis=-1)

        return start_logits, end_logits
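As a quick, purely illustrative shape check (reusing sample_ds from the loader check above and loading the same pretrained weights), the model maps a batch of token ids to one start logit and one end logit per position:

# Illustrative shape check only; the real training loop below builds its own model.
config = BertConfig(output_hidden_states=True, num_labels=2)
model = BertModel.from_pretrained(PATH, config=config)
batch = next(iter(sample_ds))
start_logits, end_logits = model(batch[:3])
print(start_logits.shape, end_logits.shape)   # (8, 128) and (8, 128)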
Define the training function
def train(model, dataset, loss_fn, optimizer):

    @tf.function
    def train_step(model, inputs, y_true, loss_fn, optimizer):
        with tf.GradientTape() as tape:
            y_pred = model(inputs, training=True)
            # total loss = start-position loss + end-position loss
            loss = loss_fn(y_true[0], y_pred[0])
            loss += loss_fn(y_true[1], y_pred[1])
            # scale the loss for mixed-precision training
            scaled_loss = optimizer.get_scaled_loss(loss)

        scaled_gradients = tape.gradient(scaled_loss, model.trainable_variables)
        gradients = optimizer.get_unscaled_gradients(scaled_gradients)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        return loss, y_pred

    epoch_loss = 0.
    for batch_num, sample in enumerate(dataset):
        loss, y_pred = train_step(model, sample[:3], sample[4:6], loss_fn, optimizer)
        epoch_loss += loss
        print(
            f"training ... batch {batch_num+1:03d} : "
            f"train loss {epoch_loss/(batch_num+1):.3f} ", end='\r')
Define the prediction function
def predict(model, dataset, loss_fn, optimizer):

    @tf.function
    def predict_step(model, inputs):
        return model(inputs)

    def to_numpy(*args):
        out = []
        for arg in args:
            if arg.dtype == tf.string:
                arg = [s.decode('utf-8') for s in arg.numpy()]
                out.append(arg)
            else:
                arg = arg.numpy()
                out.append(arg)
        return out

    offset = tf.zeros([0, 128, 2], dtype=tf.dtypes.int32)
    text = tf.zeros([0,], dtype=tf.dtypes.string)
    selected_text = tf.zeros([0,], dtype=tf.dtypes.string)
    sentiment = tf.zeros([0,], dtype=tf.dtypes.string)
    pred_start = tf.zeros([0, 128], dtype=tf.dtypes.float32)
    pred_end = tf.zeros([0, 128], dtype=tf.dtypes.float32)

    for batch_num, sample in enumerate(dataset):
        print(f"predicting ... batch {batch_num+1:03d}" + " " * 20, end='\r')

        y_pred = predict_step(model, sample[:3])

        # add batch to accumulators
        pred_start = tf.concat((pred_start, y_pred[0]), axis=0)
        pred_end = tf.concat((pred_end, y_pred[1]), axis=0)
        offset = tf.concat((offset, sample[3]), axis=0)
        text = tf.concat((text, sample[6]), axis=0)
        selected_text = tf.concat((selected_text, sample[7]), axis=0)
        sentiment = tf.concat((sentiment, sample[8]), axis=0)

    pred_start, pred_end, text, selected_text, sentiment, offset = \
        to_numpy(pred_start, pred_end, text, selected_text, sentiment, offset)

    return pred_start, pred_end, text, selected_text, sentiment, offset
Define the evaluation metric
The competition is scored with a word-level Jaccard score. The Jaccard score measures how much the predicted words overlap with the ground-truth words: the size of their intersection divided by the size of their union, i.e. |A ∩ B| / (|A| + |B| - |A ∩ B|).
def jaccard(str1, str2):
    a = set(str1.lower().split())
    b = set(str2.lower().split())
    c = a.intersection(b)
    return float(len(c)) / (len(a) + len(b) - len(c))
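A worked example: if the prediction and the ground truth share two of the three distinct words involved, the score is 2 / (3 + 2 - 2) = 2/3:

print(jaccard("so very annoying", "so annoying"))   # 0.666...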
Define the prediction decoding function
The decoding function takes the start and end indices predicted by the model, matches them against the character offsets of each token in the sample sentence obtained earlier, and extracts every word inside that range as our prediction.
def decode_prediction(pred_start, pred_end, text, offset, sentiment):

    def decode(pred_start, pred_end, text, offset):
        decoded_text = ""
        for i in range(pred_start, pred_end + 1):
            decoded_text += text[offset[i][0]:offset[i][1]]
            if (i + 1) < len(offset) and offset[i][1] < offset[i + 1][0]:
                decoded_text += " "
        return decoded_text

    decoded_predictions = []
    for i in range(len(text)):
        if sentiment[i] == "neutral" or len(text[i].split()) < 2:
            decoded_text = text[i]
        else:
            idx_start = np.argmax(pred_start[i])
            idx_end = np.argmax(pred_end[i])
            if idx_start > idx_end:
                idx_end = idx_start
            decoded_text = str(decode(idx_start, idx_end, text[i], offset[i]))
            if len(decoded_text) == 0:
                decoded_text = text[i]
        decoded_predictions.append(decoded_text)

    return decoded_predictions
Split the training data into 5 folds, train 5 epochs on each fold, and use the Adam optimizer with a learning rate of 3e-5 and a batch size of 32.
num_folds = 5
num_epochs = 5
batch_size = 32
learning_rate = 3e-5

optimizer = tf.keras.optimizers.Adam(learning_rate)
optimizer = tf.keras.mixed_precision.experimental.LossScaleOptimizer(
    optimizer, 'dynamic')

config = BertConfig(output_hidden_states=True, num_labels=2)
model = BertModel.from_pretrained(PATH, config=config)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

kfold = model_selection.KFold(
    n_splits=num_folds, shuffle=True, random_state=42)

test_preds_start = np.zeros((len(test_df), 128), dtype=np.float32)
test_preds_end = np.zeros((len(test_df), 128), dtype=np.float32)

for fold_num, (train_idx, valid_idx) in enumerate(kfold.split(train_df.text)):
    print("\nfold %02d" % (fold_num + 1))

    # create the train, valid and test datasets
    train_dataset = TweetDataset.create(
        train_df.iloc[train_idx], batch_size, shuffle_buffer_size=2048)
    valid_dataset = TweetDataset.create(
        train_df.iloc[valid_idx], batch_size, shuffle_buffer_size=-1)
    test_dataset = TweetDataset.create(
        test_df, batch_size, shuffle_buffer_size=-1)

    best_score = float('-inf')
    for epoch_num in range(num_epochs):
        print("\nepoch %03d" % (epoch_num + 1))

        train(model, train_dataset, loss_fn, optimizer)

        pred_start, pred_end, text, selected_text, sentiment, offset = \
            predict(model, valid_dataset, loss_fn, optimizer)

        selected_text_pred = decode_prediction(
            pred_start, pred_end, text, offset, sentiment)

        jaccards = []
        for i in range(len(selected_text)):
            jaccards.append(
                jaccard(selected_text[i], selected_text_pred[i]))
        score = np.mean(jaccards)
        print(f"valid jaccard epoch {epoch_num+1:03d}: {score}" + " " * 15)

        if score > best_score:
            best_score = score

            # predict test set
            test_pred_start, test_pred_end, test_text, _, test_sentiment, test_offset = \
                predict(model, test_dataset, loss_fn, optimizer)

    test_preds_start += test_pred_start * 0.2
    test_preds_end += test_pred_end * 0.2

    # reset the model to avoid OOM
    session = tf.compat.v1.get_default_session()
    graph = tf.compat.v1.get_default_graph()
    del session, graph, model
    model = BertModel.from_pretrained(PATH, config=config)
selected_text_pred = decode_prediction(
    test_preds_start, test_preds_end, test_text, test_offset, test_sentiment)

def f(selected):
    return " ".join(set(selected.lower().split()))

submission_df.loc[:, 'selected_text'] = selected_text_pred
submission_df['selected_text'] = submission_df['selected_text'].map(f)
submission_df.to_csv("submission.csv", index=False)
At submission time this solution ranked 153rd out of 553 teams, with a score of 0.68.
This Twitter sentiment analysis case study will later be available as a Demo image on 矩池雲, where it can be used directly. 矩池雲 also supports domestic deep learning frameworks such as Paddle, MindSpore, MegEngine and Jittor, which can be run without any installation.