參照當Bert趕上Kerashttps://spaces.ac.cn/archives/6736此示例準確率達到95.5%+
https://github.com/CyberZHG/keras-bert/blob/master/README.zh-CN.mdpython
# ! -*- coding:utf-8 -*- import json import numpy as np import pandas as pd from random import choice from keras_bert import load_trained_model_from_checkpoint, Tokenizer import codecs maxlen = 100 config_path = 'model/bert_config.json' checkpoint_path = 'model/bert_model.ckpt' dict_path = 'model/vocab.txt' token_dict = {} with codecs.open(dict_path, 'r', 'utf8') as reader: for line in reader: token = line.strip() token_dict[token] = len(token_dict) class OurTokenizer(Tokenizer): def __init__(self, token_dict): super(OurTokenizer, self).__init__(token_dict) def _tokenize(self, text): R = [] for c in text: if c in self._token_dict: R.append(c) elif self._is_space(c): R.append('[unused1]') # space類用未經訓練的[unused1]表示 else: R.append('[UNK]') # 剩餘的字符是[UNK] return R tokenizer = OurTokenizer(token_dict) neg = pd.read_excel('neg.xls', header=None) pos = pd.read_excel('pos.xls', header=None) data = [] for d in neg[0]: data.append((d, 0)) for d in pos[0]: data.append((d, 1)) # 按照9:1的比例劃分訓練集和驗證集 random_order = list(range(len(data))) np.random.shuffle(random_order) train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0] valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0] def seq_padding(X, padding=0): L = [len(x) for x in X] ML = max(L) return np.array([ np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X ]) class data_generator: def __init__(self, data, batch_size=32): self.data = data self.batch_size = batch_size self.steps = len(self.data) // self.batch_size if len(self.data) % self.batch_size != 0: self.steps += 1 def __len__(self): return self.steps def __iter__(self): while True: idxs = list(range(len(self.data))) np.random.shuffle(idxs) X1, X2, Y = [], [], [] for i in idxs: d = self.data[i] text = d[0][:maxlen] x1, x2 = tokenizer.encode(first=text) y = d[1] X1.append(x1) X2.append(x2) Y.append([y]) if len(X1) == self.batch_size or i == idxs[-1]: X1 = seq_padding(X1) X2 = seq_padding(X2) Y = seq_padding(Y) yield [X1, X2], Y [X1, X2, Y] = [], [], [] from keras.layers import * from keras.models import Model from keras.optimizers import Adam bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None) for l in bert_model.layers: l.trainable = False x1_in = Input(shape=(None,)) x2_in = Input(shape=(None,)) x = bert_model([x1_in, x2_in]) x = Lambda(lambda x: x[:, 0])(x) p = Dense(1, activation='sigmoid')(x) model = Model([x1_in, x2_in], p) model.compile( loss='binary_crossentropy', optimizer=Adam(1e-5), # 用足夠小的學習率 metrics=['accuracy'] ) model.summary() train_D = data_generator(train_data) valid_D = data_generator(valid_data) test = [train_data[0]] test_D = data_generator(test) model.fit_generator( train_D.__iter__(), steps_per_epoch=len(train_D), epochs=1, validation_data=valid_D.__iter__(), validation_steps=len(valid_D) ) #保存模型權重值 model.save('model.h5')
模型在保持完以後再進行加載時提示存在自定義層和激活方法的問題,暫沒找到解決辦法,若有知道辦法的小夥伴請留言私信git
# ! -*- coding:utf-8 -*- import json import numpy as np import pandas as pd from random import choice from keras_bert import load_trained_model_from_checkpoint, Tokenizer, get_custom_objects import re, os import codecs from keras.models import load_model maxlen = 100 config_path = 'model/bert_config.json' checkpoint_path = 'model/bert_model.ckpt' dict_path = 'model/vocab.txt' token_dict = {} with codecs.open(dict_path, 'r', 'utf8') as reader: for line in reader: token = line.strip() token_dict[token] = len(token_dict) class OurTokenizer(Tokenizer): def __init__(self, token_dict): super(OurTokenizer, self).__init__(token_dict) def _tokenize(self, text): R = [] for c in text: if c in self._token_dict: R.append(c) elif self._is_space(c): R.append('[unused1]') # space類用未經訓練的[unused1]表示 else: R.append('[UNK]') # 剩餘的字符是[UNK] return R tokenizer = OurTokenizer(token_dict) neg = pd.read_excel('neg.xls', header=None) pos = pd.read_excel('pos.xls', header=None) data = [] for d in neg[0]: data.append((d, 0)) for d in pos[0]: data.append((d, 1)) # 按照9:1的比例劃分訓練集和驗證集 random_order = list(range(len(data))) np.random.shuffle(random_order) train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0] valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0] def seq_padding(X, padding=0): L = [len(x) for x in X] ML = max(L) return np.array([ np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X ]) class data_generator: def __init__(self, data, batch_size=32): self.data = data self.batch_size = batch_size self.steps = len(self.data) // self.batch_size if len(self.data) % self.batch_size != 0: self.steps += 1 def __len__(self): return self.steps def __iter__(self): while True: idxs = list(range(len(self.data))) np.random.shuffle(idxs) X1, X2, Y = [], [], [] for i in idxs: d = self.data[i] text = d[0][:maxlen] x1, x2 = tokenizer.encode(first=text) y = d[1] X1.append(x1) X2.append(x2) Y.append([y]) if len(X1) == self.batch_size or i == idxs[-1]: X1 = seq_padding(X1) X2 = seq_padding(X2) Y = seq_padding(Y) yield [X1, X2], Y [X1, X2, Y] = [], [], [] from keras.layers import * from keras.models import Model import keras.backend as K from keras.optimizers import Adam bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None) for l in bert_model.layers: l.trainable = False x1_in = Input(shape=(None,)) x2_in = Input(shape=(None,)) x = bert_model([x1_in, x2_in]) print(bert_model.layers) x = Lambda(lambda x: x[:, 0])(x) p = Dense(1, activation='sigmoid')(x) model = Model([x1_in, x2_in], p) model.compile( loss='binary_crossentropy', optimizer=Adam(1e-5), # 用足夠小的學習率 metrics=['accuracy'] ) model.summary() train_D = data_generator(train_data) valid_D = data_generator(valid_data) ''' model.fit_generator( train_D.__iter__(), steps_per_epoch=len(train_D), epochs=5, validation_data=valid_D.__iter__(), validation_steps=len(valid_D) ) model.save('save_path.h5') ''' # 定義生成器將數據集解析爲 class data_token_generator: def __init__(self, data, batch_size=32): self.data = data self.batch_size = batch_size self.steps = len(self.data) # self.batch_size if len(self.data) % self.batch_size != 0: self.steps += 1 def __len__(self): return self.steps def get_data(self): idxs = list(range(len(self.data))) np.random.shuffle(idxs) X1, X2, Y = [], [], [] for i in idxs: d = self.data[i] text = d[0][:maxlen] print(text) x1, x2 = tokenizer.encode(first=text) y = d[1] X1.append(x1) X2.append(x2) Y.append([y]) X1 = seq_padding(X1) X2 = seq_padding(X2) Y = seq_padding(Y) return X1, X2, Y new_model = load_model('save_path.h5', custom_objects=get_custom_objects()) test_T = data_token_generator(valid_data[0:10]) X_test1, X_test2, Y_test = test_T.get_data() print(Y_test) print(new_model.predict([X_test1, X_test2]))
# ! -*- coding:utf-8 -*- import numpy as np import pandas as pd from random import choice from keras_bert import load_trained_model_from_checkpoint, Tokenizer, get_checkpoint_paths import codecs from keras.layers import * from keras.models import Model from keras.optimizers import Adam # 評價文本最大長度 maxlen = 100 dict_path = 'model/vocab.txt' token_dict = {} EPOCHS = 30 BATCH_SIZE = 128 # 初始化令牌字典 with codecs.open(dict_path, 'r', 'utf8') as reader: for line in reader: token = line.strip() # print(token, len(token_dict)) token_dict[token] = len(token_dict) # 定義令牌解析器 class OurTokenizer(Tokenizer): def _tokenize(self, text): R = [] for c in text: if c in self._token_dict: R.append(c) elif self._is_space(c): R.append('[unused1]') # space類用未經訓練的[unused1]表示 else: R.append('[UNK]') # 剩餘的字符是[UNK] return R # 初始化令牌解析器 tokenizer = OurTokenizer(token_dict) # 讀取數據集 neg = pd.read_excel('neg.xls', header=None) pos = pd.read_excel('pos.xls', header=None) data = [] for d in neg[0]: data.append((d, 0)) for d in pos[0]: data.append((d, 1)) # 按照9:1的比例劃分訓練集和驗證集 random_order = list(range(len(data))) np.random.shuffle(random_order) train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0] valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0] # 令牌序列長度補全 def seq_padding(X, padding=0): L = [len(x) for x in X] ML = max(L) t = [ np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X ] return t # 定義生成器將數據集解析爲 class data_token_generator: def __init__(self, data, batch_size=32, print_text=False): self.data = data self.batch_size = batch_size self.steps = len(self.data) # self.batch_size self.print_text = print_text if len(self.data) % self.batch_size != 0: self.steps += 1 # bert中文模型路徑 paths = get_checkpoint_paths('model') # bert中文模型加載 self.bert_model = load_trained_model_from_checkpoint(paths.config, paths.checkpoint, seq_len=None) for l in self.bert_model.layers: l.trainable = True def __len__(self): return self.steps def get_data(self): data_x = [] data_y = [] idxs = list(range(len(self.data))) # 隨機 np.random.shuffle(idxs) indices, segments, Y = [], [], [] for i in idxs: d = self.data[i] # 截取數據 text = d[0][:maxlen] if self.print_text: print(text) # 生成指標及段 indice, segment = tokenizer.encode(first=text) y = d[1] # 數據放入數組中 indices.append(indice) segments.append(segment) Y.append([y]) # 轉化成批次 if len(indices) == self.batch_size or i == idxs[-1]: indices = seq_padding(indices) segments = seq_padding(segments) Y = seq_padding(Y) # 產生詞向量 x = self.bert_model.predict([np.array(indices), np.array(segments)]) j_idxs = list(range(len(x))) for j in j_idxs: data_x.append(x[j]) data_y.append(Y[j]) print(len(data_y)) [indices, segments, Y] = [], [], [] return np.array(data_x), np.array(data_y) # 定義二分類網絡 x_in = Input(shape=(None, 768)) x = Lambda(lambda x: x[:, 0])(x_in) p = Dense(1, activation='sigmoid')(x) model = Model(x_in, p) model.compile( loss='binary_crossentropy', optimizer=Adam(1e-5), # 用足夠小的學習率 metrics=['accuracy'] ) # 打印模型結構 model.summary() # 開始訓練 print('Training -----------') train_T = data_token_generator(train_data) train_x, train_y = train_T.get_data() valid_T = data_token_generator(valid_data) validation_data = valid_T.get_data() model.fit( train_x, train_y, epochs=EPOCHS, batch_size=BATCH_SIZE, validation_data=validation_data ) model.save('new_model.h5') # 加載模型驗證 import keras test_T = data_token_generator(valid_data[0:10], print_text=True) X_test, Y_test = test_T.get_data() print(Y_test) new_model = keras.models.load_model('new_model.h5') y = new_model.predict(X_test) print(y)
採用哈工大版權重,準確率在80%左右github