Implementing a Siamese Network in Keras for Sentence Similarity
A Siamese network is a network containing two or more completely identical sub-networks. It is commonly applied to tasks such as sentence similarity, face matching, and signature verification.
Taking sentence similarity as an example, the two sub-networks are identical from the Embedding layer through the LSTM layer; the whole model is called MaLSTM (Manhattan LSTM). A weight-sharing sketch follows below.
The final output of the LSTM layer gives a fixed-length representation for each sentence, and the similarity between the two sentences is then computed with the following formula; the similarity lies between 0 and 1.
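In Keras, the "two identical sub-networks" are usually realized by applying the same layer instances to both inputs, so the two branches literally share one set of weights. A minimal sketch of that idea (illustrative only; the layer sizes are arbitrary and the full model is defined later in this post):

```python
from keras.layers import Input, LSTM

# Both branches pass through the SAME LSTM instance, so the
# "two sub-networks" share a single set of parameters.
left_in = Input(shape=(None, 300))
right_in = Input(shape=(None, 300))
shared_lstm = LSTM(32)               # hidden size 32 is arbitrary for this sketch
left_out = shared_lstm(left_in)
right_out = shared_lstm(right_in)
```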
$$ D = \exp\left(-\left| h^{(left)} - h^{(right)} \right|_1\right) $$
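As a quick illustration of the formula (with made-up vectors, not outputs of the model): identical representations give a similarity of exactly 1, and the similarity decays toward 0 as the L1 distance grows.

```python
import numpy as np

h_left = np.array([0.2, -0.5, 0.1])     # hypothetical sentence representations
h_right = np.array([0.25, -0.45, 0.05])
similarity = np.exp(-np.sum(np.abs(h_left - h_right)))
print(similarity)  # ~0.86 here; identical vectors would give exactly 1.0
```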
We use the Quora question-pairs data from Kaggle (Quora is roughly the overseas counterpart of Zhihu): https://www.kaggle.com/c/quora-question-pairs
The training and test sets contain 404,290 and 3,563,475 records respectively. Each record contains fields including question1, question2, and is_duplicate, but the test set does not include the is_duplicate field.
Load the libraries
```python
# -*- coding: utf-8 -*-
from keras.preprocessing.sequence import pad_sequences
from keras.models import Model
from keras.layers import Input, Embedding, LSTM, Lambda
import keras.backend as K
from keras.optimizers import Adam
import pandas as pd
import numpy as np
from gensim.models import KeyedVectors
from nltk.corpus import stopwords
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
%matplotlib inline
import re
from tqdm import tqdm
import pickle
```
Load the training and test sets
```python
train_df = pd.read_csv('train.csv')
test_df = pd.read_csv('test.csv')
print(len(train_df), len(test_df))
train_df.head()
```
Load the stopwords from nltk (Natural Language Toolkit) and define a text preprocessing function
```python
# If nltk complains that stopwords are missing, download them first:
# import nltk
# nltk.download('stopwords')
stops = set(stopwords.words('english'))

def preprocess(text):
    # input:  'Hello are you ok?'
    # output: ['hello', 'are', 'you', 'ok']
    text = str(text)
    text = text.lower()
    text = re.sub(r"[^A-Za-z0-9^,!.\/'+-=]", " ", text)  # drop all other symbols
    text = re.sub(r"what's", "what is ", text)            # expand contractions
    text = re.sub(r"\'s", " is ", text)                   # expand contractions
    text = re.sub(r"\'ve", " have ", text)                # expand contractions
    text = re.sub(r"can't", "cannot ", text)              # expand contractions
    text = re.sub(r"n't", " not ", text)                  # expand contractions
    text = re.sub(r"i'm", "i am ", text)                  # expand contractions
    text = re.sub(r"\'re", " are ", text)                 # expand contractions
    text = re.sub(r"\'d", " would ", text)                # expand contractions
    text = re.sub(r"\'ll", " will ", text)                # expand contractions
    text = re.sub(r",", " ", text)                        # drop commas
    text = re.sub(r"\.", " ", text)                       # drop periods
    text = re.sub(r"!", " ! ", text)                      # keep exclamation marks
    text = re.sub(r"\/", " ", text)                       # drop slashes
    text = re.sub(r"\^", " ^ ", text)                     # other symbols
    text = re.sub(r"\+", " + ", text)                     # other symbols
    text = re.sub(r"\-", " - ", text)                     # other symbols
    text = re.sub(r"\=", " = ", text)                     # other symbols
    text = re.sub(r"\'", " ", text)                       # drop single quotes
    text = re.sub(r"(\d+)(k)", r"\g<1>000", text)         # replace e.g. 30k with 30000
    text = re.sub(r":", " : ", text)                      # other symbols
    text = re.sub(r" e g ", " eg ", text)                 # other tokens
    text = re.sub(r" b g ", " bg ", text)                 # other tokens
    text = re.sub(r" u s ", " american ", text)           # other tokens
    text = re.sub(r"\0s", "0", text)                      # other tokens
    text = re.sub(r" 9 11 ", " 911 ", text)               # other tokens
    text = re.sub(r"e - mail", "email", text)             # other tokens
    text = re.sub(r"j k", "jk", text)                     # other tokens
    text = re.sub(r"\s{2,}", " ", text)                   # collapse repeated whitespace into one space
    return text.split()
```
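A quick sanity check on a made-up question (the exact token list follows from the regex rules above):

```python
print(preprocess("What's the best way to learn 10k words?"))
# ['what', 'is', 'the', 'best', 'way', 'to', 'learn', '10000', 'words']
```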
Load Google's pre-trained 300-dimensional word vectors
```python
word2vec = KeyedVectors.load_word2vec_format('GoogleNews-vectors-negative300.bin.gz', binary=True)
```
Build the vocabulary (58,564 words in total), replace the texts with integer-sequence representations, and construct the word-vector (embedding) matrix
```python
vocabulary = []
word2id = {}
id2word = {}

for df in [train_df, test_df]:
    for i in tqdm(range(len(df))):
        row = df.iloc[i]
        for column in ['question1', 'question2']:
            q2n = []
            for word in preprocess(row[column]):
                if word in stops or word not in word2vec.vocab:
                    continue
                if word not in vocabulary:
                    word2id[word] = len(vocabulary) + 1
                    id2word[len(vocabulary) + 1] = word
                    vocabulary.append(word)
                q2n.append(word2id[word])
            df.at[i, column] = q2n

embedding_dim = 300
embeddings = np.random.randn(len(vocabulary) + 1, embedding_dim)
embeddings[0] = 0  # word vector for zero padding
for index, word in enumerate(vocabulary):
    embeddings[index + 1] = word2vec.word_vec(word)  # ids start at 1; row 0 is reserved for padding
del word2vec

print(len(vocabulary))
```
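A small sanity check (not in the original code) to confirm that word ids start at 1 and that row 0 of the embedding matrix stays reserved as the zero padding vector:

```python
some_word = vocabulary[0]
print(some_word, word2id[some_word])                        # the first word gets id 1
print(np.allclose(embeddings[0], 0))                        # True: padding row is all zeros
print(np.linalg.norm(embeddings[word2id[some_word]]) > 0)   # True: a real word vector
```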
Split the data into training and validation sets, and pad the integer sequences to a uniform length
```python
maxlen = max(train_df.question1.map(lambda x: len(x)).max(),
             train_df.question2.map(lambda x: len(x)).max(),
             test_df.question1.map(lambda x: len(x)).max(),
             test_df.question2.map(lambda x: len(x)).max())

valid_size = 40000
train_size = len(train_df) - valid_size

X = train_df[['question1', 'question2']]
Y = train_df['is_duplicate']

X_train, X_valid, Y_train, Y_valid = train_test_split(X, Y, test_size=valid_size)
X_train = {'left': X_train.question1.values, 'right': X_train.question2.values}
X_valid = {'left': X_valid.question1.values, 'right': X_valid.question2.values}
Y_train = np.expand_dims(Y_train.values, axis=-1)
Y_valid = np.expand_dims(Y_valid.values, axis=-1)

# Pad or truncate at the front
X_train['left'] = np.array(pad_sequences(X_train['left'], maxlen=maxlen))
X_train['right'] = np.array(pad_sequences(X_train['right'], maxlen=maxlen))
X_valid['left'] = np.array(pad_sequences(X_valid['left'], maxlen=maxlen))
X_valid['right'] = np.array(pad_sequences(X_valid['right'], maxlen=maxlen))

print(X_train['left'].shape, X_train['right'].shape)
print(X_valid['left'].shape, X_valid['right'].shape)
print(Y_train.shape, Y_valid.shape)
```
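For reference, pad_sequences pads (and truncates) at the front by default, which is what the "pad or truncate at the front" comment refers to; a toy example:

```python
demo = pad_sequences([[5, 8], [3, 9, 2, 7]], maxlen=4)
print(demo)
# [[0 0 5 8]
#  [3 9 2 7]]
```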
Define the model and train it
```python
hidden_size = 128
gradient_clipping_norm = 1.25
batch_size = 64
epochs = 20

def exponent_neg_manhattan_distance(args):
    # Similarity D = exp(-||h_left - h_right||_1), in (0, 1]
    left, right = args
    return K.exp(-K.sum(K.abs(left - right), axis=1, keepdims=True))

left_input = Input(shape=(None,), dtype='int32')
right_input = Input(shape=(None,), dtype='int32')

# Frozen embedding layer initialized with the pre-trained word vectors
embedding_layer = Embedding(len(embeddings), embedding_dim, weights=[embeddings],
                            input_length=maxlen, trainable=False)
embedded_left = embedding_layer(left_input)
embedded_right = embedding_layer(right_input)

# The same LSTM instance processes both branches, so the weights are shared
shared_lstm = LSTM(hidden_size)
left_output = shared_lstm(embedded_left)
right_output = shared_lstm(embedded_right)

malstm_distance = Lambda(exponent_neg_manhattan_distance,
                         output_shape=(1,))([left_output, right_output])

malstm = Model([left_input, right_input], malstm_distance)
optimizer = Adam(clipnorm=gradient_clipping_norm)
malstm.compile(loss='mean_squared_error', optimizer=optimizer, metrics=['accuracy'])

history = malstm.fit([X_train['left'], X_train['right']], Y_train,
                     batch_size=batch_size, epochs=epochs,
                     validation_data=([X_valid['left'], X_valid['right']], Y_valid))
```
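Optionally (not part of the original post), the shared-weight architecture and parameter counts can be inspected with Keras' built-in summary:

```python
malstm.summary()  # prints layers, output shapes, and trainable/non-trainable parameter counts
```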
Plot the accuracy and loss curves over the course of training
```python
# Plot accuracy
plt.plot(history.history['acc'])
plt.plot(history.history['val_acc'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

# Plot loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper right')
plt.show()
```
The training loss keeps decreasing while the validation loss flattens out, which indicates the model's generalization ability is still limited.
Training accuracy climbs above 86%, while validation accuracy stays around 80%, so the model still has room for improvement.
Save the model for later use
```python
malstm.save('malstm.h5')

with open('data.pkl', 'wb') as fw:
    pickle.dump({'word2id': word2id, 'id2word': id2word}, fw)
```
Run a simple test of the trained model on a single machine: draw a few random samples from the training set and check whether the model's predictions match the labels, mainly to get familiar with how to run inference with the model.
```python
# -*- coding: utf-8 -*-
from keras.preprocessing.sequence import pad_sequences
from keras.models import Model, load_model
import pandas as pd
import numpy as np
from nltk.corpus import stopwords
import re
import pickle

with open('data.pkl', 'rb') as fr:
    data = pickle.load(fr)
    word2id = data['word2id']
    id2word = data['id2word']

train_df = pd.read_csv('train.csv')
stops = set(stopwords.words('english'))

def preprocess(text):
    # input:  'Hello are you ok?'
    # output: ['hello', 'are', 'you', 'ok']
    text = str(text)
    text = text.lower()
    text = re.sub(r"[^A-Za-z0-9^,!.\/'+-=]", " ", text)  # drop all other symbols
    text = re.sub(r"what's", "what is ", text)            # expand contractions
    text = re.sub(r"\'s", " is ", text)                   # expand contractions
    text = re.sub(r"\'ve", " have ", text)                # expand contractions
    text = re.sub(r"can't", "cannot ", text)              # expand contractions
    text = re.sub(r"n't", " not ", text)                  # expand contractions
    text = re.sub(r"i'm", "i am ", text)                  # expand contractions
    text = re.sub(r"\'re", " are ", text)                 # expand contractions
    text = re.sub(r"\'d", " would ", text)                # expand contractions
    text = re.sub(r"\'ll", " will ", text)                # expand contractions
    text = re.sub(r",", " ", text)                        # drop commas
    text = re.sub(r"\.", " ", text)                       # drop periods
    text = re.sub(r"!", " ! ", text)                      # keep exclamation marks
    text = re.sub(r"\/", " ", text)                       # drop slashes
    text = re.sub(r"\^", " ^ ", text)                     # other symbols
    text = re.sub(r"\+", " + ", text)                     # other symbols
    text = re.sub(r"\-", " - ", text)                     # other symbols
    text = re.sub(r"\=", " = ", text)                     # other symbols
    text = re.sub(r"\'", " ", text)                       # drop single quotes
    text = re.sub(r"(\d+)(k)", r"\g<1>000", text)         # replace e.g. 30k with 30000
    text = re.sub(r":", " : ", text)                      # other symbols
    text = re.sub(r" e g ", " eg ", text)                 # other tokens
    text = re.sub(r" b g ", " bg ", text)                 # other tokens
    text = re.sub(r" u s ", " american ", text)           # other tokens
    text = re.sub(r"\0s", "0", text)                      # other tokens
    text = re.sub(r" 9 11 ", " 911 ", text)               # other tokens
    text = re.sub(r"e - mail", "email", text)             # other tokens
    text = re.sub(r"j k", "jk", text)                     # other tokens
    text = re.sub(r"\s{2,}", " ", text)                   # collapse repeated whitespace into one space
    return text.split()

malstm = load_model('malstm.h5')

correct = 0
for i in range(5):
    print('Testing Case:', i + 1)
    random_sample = dict(train_df.iloc[np.random.randint(len(train_df))])
    left = random_sample['question1']
    right = random_sample['question2']
    print('Origin Questions...')
    print('==', left)
    print('==', right)

    left = preprocess(left)
    right = preprocess(right)
    print('Preprocessing...')
    print('==', left)
    print('==', right)

    left = [word2id[w] for w in left if w in word2id]
    right = [word2id[w] for w in right if w in word2id]
    print('To ids...')
    print('==', left, [id2word[idx] for idx in left])
    print('==', right, [id2word[idx] for idx in right])

    left = np.expand_dims(left, 0)
    right = np.expand_dims(right, 0)
    maxlen = max(left.shape[-1], right.shape[-1])
    left = pad_sequences(left, maxlen=maxlen)
    right = pad_sequences(right, maxlen=maxlen)
    print('Padding...')
    print('==', left.shape)
    print('==', right.shape)

    pred = malstm.predict([left, right])
    pred = 1 if pred[0][0] > 0.5 else 0
    print('True:', random_sample['is_duplicate'])
    print('Pred:', pred)

    if pred == random_sample['is_duplicate']:
        correct += 1

print(correct / 5)
```