用機器學習打造聊天機器人(四) 代碼篇

本文是用機器學習打造聊天機器人系列的第四篇,將先對主要模塊的代碼進行展現和解讀,末尾會給出完整代碼的地址。建議先看主要模塊的代碼解讀,有助於理解核心代碼的思路,而後瀏覽完整項目代碼的README文檔,將項目跑起來體驗如下,再針對性的根據接口去閱讀各模塊代碼的實現。web

主要模塊代碼

構造特徵向量

特徵向量的構造有兩種思想,一種是one-hot,一種是Dristributed Representation(這裏用word2vec實現),通常來講後者可以更好的表示詞的含義,可是有時候咱們使用的句子來自特殊的領域,word2vec模型的預訓練語料未必可以表示的很好,因此這個時候用one-hot就可能會表現的更好。算法

  • one-hot
def build_feature(self, sentence, w_i_dict):
    """
    根據詞彙表構造句子向量,其中用到的'w_i_dict'參數會經過如下方法先構造好:
    # 構建訓練語料庫
    build_corpus_vocabulary()
    # 訓練語料庫分詞
    cut_corpus_vocabulary()
    # 構建訓練語料庫詞彙反向索引
    word_index_dict_ = load_vocabulary()
    # 存儲訓練語料庫詞彙反向索引
    dump_word_index(word_index_dict_)
    :param sentence: 句子
    :param w_i_dict: 詞彙-位置索引字典
    :return: one-hot 向量
    """
    # 分詞
    sentence_seg = jieba.cut(sentence)
    # 用0初始化one-hot向量,維數爲詞彙表的詞的個數
    sen_vec = np.zeros(len(w_i_dict))
    # 詞彙表的詞的列表
    w_i_dict_keys = w_i_dict.keys()
    # one-hot向量對應詞在詞典中的位置至1
    for word in sentence_seg:
        if w_i_dict_keys.__contains__(word):
            sen_vec[w_i_dict[word]] = 1
    return sen_vec
  • Dristributed Representation
def sum_vecs_avg(self, text):
    """
    根據詞向量模型構建句子向量
    :param text: 句子
    :return:
    """
    # 加載詞向量模型
    word_vec_model = ModelsLoader().sf_words_vec_model
    # 用0值初始化一個同維數的向量,若是你知道你的詞向量模型是多少維的,能夠直接指定,不用採用下面的野路子
    vec = np.zeros(word_vec_model['是'].shape[0])
    # 分詞
    words_list = list(jieba.cut(text))
    for w in words_list:
        try:
            # 將全部詞的向量累加
            vec = vec + word_vec_model[w]
        except KeyError as e:
            logging.warning('詞‘%s’,不在詞向量模型詞彙表中', w)
            continue
        except ValueError as e:
            logging.error('Error:', e)
            break
    # 計算平均向量
    vec = vec / len(words_list)
    return vec

意圖分類

和特徵向量的構建同樣,分兩種方式,一種是基於貝葉斯算法(對應上面的one-hot特徵),另外一種是基於句子向量各份量的算數平均值構成的向量和輸入向量的夾角餘弦類似度來分類(對應上面的詞向量特徵)。前者的訓練是根據樣本計算機率模型,後者的訓練是提早計算好每一個類別的中心向量。json

def train_clf(self):
    """
    基於貝葉斯算法訓練意圖分類器,並存儲爲文件,以便下次使用
    :return: 
    """
    dump_path = "%s/classifier_mnb.m" % get_resources_trained_models()
    # 加載訓練樣本數據
    features_np, labels_np = load_train_data()
    features_np = np.array(features_np)
    labels_np = np.array(labels_np)
    # 開始訓練
    starttime = datetime.datetime.now()
    print("開始訓練分類器...")
    # 使用多項式樸素貝葉斯算法訓練模型
    clf = MultinomialNB(alpha=0.1, fit_prior=True, class_prior=None)
    # 從第10個開始歸入訓練,前10將作爲驗證集評估模型的表現
    clf.fit(features_np[10:], labels_np[10:])
    endtime = datetime.datetime.now()
    print("===========訓練耗時: %s" % (endtime - starttime).seconds)
    # 評估分類器在驗證集上的表現
    print("評估結果:%s" % clf.score(features_np[:10], labels_np[:10]))
    self.clf_nb = clf
    # 存儲分類器
    dump_clf(self)
    print("分類器存儲位置:%s" % dump_path)
    return self

def predict(self, feature_vec, clf):
    """
    預測(基於貝葉斯模型)
    :param feature_vec: 輸入句子的特徵向量
    :param clf: 訓練好的貝葉斯模型
    :return:
    """
    proba_pred_np = clf.clf_nb.predict_proba(np.array([feature_vec]))[0]
    logging.debug("預測結果的機率:%s", proba_pred_np)
    # 加載類別集合
    labels_set = load_labels_set()
    label_score_list = []
    for i, num in enumerate(proba_pred_np):
        # if num != 0.00000000e+00:
        if num >= current_app.config['THRESHOLD_INTENT_RECOGNITION']:
            label_score_list.append((labels_set[i], num))
    if len(label_score_list) == 0:  # 正常閾值下沒有匹配項,就降級匹配
        logging.debug("意圖識別在正常分數閾值下沒有匹配到任何項,進行降級匹配...")
        for i, num in enumerate(proba_pred_np):
            # if num != 0.00000000e+00:
            if num >= current_app.config['MINIMUM_THRESHOLD_INTENT_RECOGNITION']:
                label_score_list.append((labels_set[i], num))
    rs = sorted(label_score_list, key=lambda item: item[1], reverse=True)
    return rs, [c for c, v in rs]

def train_clf(self):
    """
    訓練分類器(基於中心向量的方式)
    :return: 
    """
    data = DataLoader().load_train_data()
    logging.info("開始訓練...")
    _, labels_centroids_dict = self.cal_centroid_vec(data)
    self.labels_centroids_dict = labels_centroids_dict
    self.labels = list(labels_centroids_dict.keys())
    logging.info("訓練完成!")
    # 存儲分類器模型
    self.dump(self)
    return self

def cal_centroid_vec(self, data):
    """
    構建「類別-中心向量」字典
    :param data: {'類別':{examples:'句子樣本',centroid:'中心向量'}}
    :return: 
    """
    labels_centroids_dict = {}
    for the_label in data.keys():
        centroid = self.get_centroid(data[the_label]["examples"])
        data[the_label]["centroid"] = centroid
        labels_centroids_dict[the_label] = centroid
    return data, labels_centroids_dict

def get_centroid(self, examples):
    """
    獲取當前意圖類別的中心向量。中心向量由examples中全部句子向量各份量上的算數平均數表示
    :param examples: 當前類別下的全部樣本句子
    :return:
    """
    word_vec_model = ModelsLoader().sf_words_vec_model
    word_dim = word_vec_model['是'].shape[0]
    C = np.zeros((len(examples), word_dim))
    for idx, text in enumerate(examples):
        C[idx, :] = self.sum_vecs_avg(text)
    centroid = np.mean(C, axis=0)
    assert centroid.shape[0] == word_dim
    return centroid
    
def predict(self, feature_vec, clf):
    """
    預測意圖類別(基於向量夾角餘弦值)
    :param feature_vec: 輸入句子的特徵向量
    :param clf: 從接口繼承下來的參數,這裏用不到
    :return: 
    """
    intents = self.labels
    # 分數計算規則:計算新句子的向量和當前意圖類別的中心向量的夾角餘弦值,下面其實能夠改進如下,用矩陣並行計算代替for循環,可是由於類別目前很少,影響暫時不大。
    scores = [(label_, np.dot(feature_vec, self.labels_centroids_dict[label_]) / (
            np.linalg.norm(feature_vec) * np.linalg.norm(self.labels_centroids_dict[label_]))) for label_ in
              intents]
    rs = sorted(scores, key=lambda item: item[1], reverse=True)
    top1scores = rs[0][1]
    top1label = rs[0][0]
    logging.debug("top1的分數:%s,label:%s", top1scores, top1label)
    if top1scores >= current_app.config['THRESHOLD_INTENT_RECOGNITION']:
        rs = rs[:1]
    elif top1scores >= current_app.config['MINIMUM_THRESHOLD_INTENT_RECOGNITION']:
        logging.debug("意圖識別在正常分數閾值下沒有匹配到任何項,進行降級匹配...")
    elif top1scores < current_app.config['MINIMUM_THRESHOLD_INTENT_RECOGNITION']:
        logging.debug("意圖識別在最小分數閾值下沒有匹配到任何項...")
        rs = []
    return rs, [c for c, v in rs]

語義匹配

def compare(self, statement, statement_vec):
    """
    比較夾角餘弦值
    :param statement: 輸入句子對象
    :param statement_vec: 句子樣本特徵向量,是一個二維list
    :return: 輸入句子和各句子樣本的類似度構成的二維數組
    """
    statement_text_vec = statement.text_vector
    statement_vec = np.array(statement_vec)
    # 向量化並行計算餘弦值
    similarity = np.dot(statement_text_vec, statement_vec.T) / (
                np.linalg.norm(statement_text_vec) * np.linalg.norm(statement_vec, axis=1)).T
    print("similarity.shape %s" % similarity.shape)
    return similarity

chatterbot訓練

本項目裏,做者把訓練語料的類型分紅了閒聊和業務兩大類,下面你會看到不少SF關鍵字,就是指業務,至於爲何叫SF,是歷史遺留(lan)的問題,沒必要過於糾結。閒聊類目前咱們不拆分,因此代碼和上面介紹chatterbot的時候的代碼相似,可是對於業務類的樣本,因爲咱們須要分紅多個類型,因此這裏要建立多個chatterbot實例,下面展現的是業務類的chatbot的實例化過程:數組

def train_sf_chatbot():
    data_root_dir = path_configer.get_classifier_train_samples()
    for file_name in os.listdir(data_root_dir):
        if file_name.startswith("QA_sf_"):
            __train(('%s/%s' % (get_chatter_corpus(), file_name)), file_name[:file_name.find('-')])
        
def __train(corpus_path, collection_name):
    print("開始訓練SF...")
    starttime = datetime.now()
    chatbot = SF().chatters[collection_name]
    chatbot.set_trainer(ListTrainer)
    chatbot.train(read_custom(corpus_path))
    print("SF訓練完成!")
    endtime = datetime.now()
    print("===========訓練耗時: %s秒" % (endtime - starttime).seconds)
    
@singleton
class SF(object):
    def __init__(self):
        logging.info('預加載sf詞向量模型...')
        logging.info('預加載SF全部實例...')
        labels = [file_name[:file_name.find("-")] for file_name in os.listdir(path_configer.get_chatter_corpus()) if
                  file_name.startswith("QA_sf_")]
        chatters = {}
        bot_name = current_app.config['DATABASE']
        # 根據不一樣的類型,建立不一樣的ChatBot實例
        for label in labels:
            chatters[label] = (
                ChatBot(
                    bot_name,
                    database=bot_name,
                    database_uri=current_app.config['DATABASE_URI'],
                    # 使用合適的詞向量模型時開啓
                    preprocessors=[
                        'kbqa_sf.train.chatter.sf.sf_preprocessors.sum_vecs_avg'
                    ],
                    statement_comparison_function=WordVecComparator(),
                    # statement_comparison_function=levenshtein_distance,
                    logic_adapters=[{'import_path': 'kbqa_sf.train.chatter.sf.sf_adapter.BestMatchExtLogicAdapter'}],
                    storage_adapter="kbqa_sf.train.chatter.sf.sf_mongo_storage.MongoDatabaseExtAdapter",
                    ext_collection_name=label,
                    read_only=True)
            )
        self.chatters = chatters
        logging.info('SF全部實例預加載完成!')

在線學習

chatterbot提供了學習接口,就是方便之後再追加新的問答對,代碼以下:微信

# a:問題對象Statement,q:回答對象Statement
chatbot_.learn_response(a, q)

可是光是執行上面的代碼,在咱們的項目中是不夠的,由於當樣本庫變更了,咱們的意圖分類器,詞彙-索引字典,句子-句向量字典都要從新生成。若是你的樣本庫數量不大,那麼這個過程仍是很快的,可是若是數據量比較大的話,好比上萬條,那麼這個過程須要幾十秒到幾分鐘。因此不建議讓用戶可以直接經過web頁面就使用這個學習的接口,而是採用異步的方式,先記錄下用戶提交的反饋,而後定時由程序在後臺執行比較合適。固然,若是你是本身隨便玩玩,數據量不大的話,直接經過web頁面使用這個接口是最方便的了。在線學習的代碼以下,分爲記錄和學習2個接口:app

@qac.route('/record', methods=['POST'])
def record():
    """
    將要學習的問題、答案、類別,寫入文件learn目錄下的wait-learn.txt、history-learn.txt
    :return:
    """
    qac_list = request.get_json()
    learn_path = path_configer.get_learn()
    wait_learn_path = "%s/%s" % (learn_path, "wait-learn.txt")
    history_learn_path = "%s/%s" % (learn_path, "history-learn.txt")
    with __record_lock:
        fa_wait = codecs.open(wait_learn_path, "a", encoding="utf-8")
        fa_history = codecs.open(history_learn_path, "a", encoding="utf-8")
        for qac_item in qac_list:
            q = qac_item["q"]
            a = qac_item["a"]
            c = qac_item["c"]
            if 0 < len(a) <= 300 and len(q) > 0 and len(c) > 0:
                content = 'Q %s\nA %s\nC %s\n' % (q, a, c)
                fa_wait.write(content)
                fa_history.write(
                    '%sT %s\n' % (content, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))))
            else:
                return make_response(jsonify({'error': '參數不符合要求,請檢查!'}), 400)
        fa_wait.close()
        fa_history.close()
        logging.debug("=========待學習問題記錄完成!")
    return "success"


@qac.route('/learn/batch', methods=['GET'])
def learn_batch():
    """
    批量學習給定的問題和答案:
    重命名wait-learn.txt爲learning.txt,讀取learning.txt的內容進行學習
    :return:
    """
    _learn_new_batch_lock = threading.Lock()
    logging.debug("開始學習...")
    starttime = datetime.datetime.now()
    learn_path = path_configer.get_learn()
    wait_learn_path = "%s/%s" % (learn_path, "wait-learn.txt")
    learning_path = "%s/%s" % (learn_path, "learning.txt")
    with __record_lock:
        if os.path.exists(learning_path):
            # 若上一次的臨時文件未能刪除,就在這裏刪除。
            os.remove(learning_path)
            logging.info("=========發現上一次的臨時文件未能刪除,已刪除!")
        if not os.path.exists(wait_learn_path):
            msg = "nothing"
            logging.info(msg)
            return msg
        os.rename(wait_learn_path, learning_path)
        logging.debug("重命名wait-learn.txt爲learning.txt ...")
    with _learn_new_batch_lock:
        logging.debug("讀取learning.txt的內容進行學習 ...")
        with codecs.open(learning_path, "r", encoding="utf-8") as fr:
            q = fr.readline().strip("\n\r")
            while q != "":
                a = fr.readline().strip("\n\r")
                assert a.strip("\n\r") != "", 'q,a,c格式沒法匹配!缺乏a!'
                c = fr.readline().strip("\n\r")
                assert c.strip("\n\r") != "", 'q,a,c格式沒法匹配!缺乏a!'
                # 添加q,a到指定的c類別文件;訓練c對應的chatterbot
                logging.debug("添加%s,%s到指定的%s類別文件;訓練對應的chatterbot ...", q, a, c)
                # 開始學習
                learn_(q, a, c[c.find(" ") + 1:])
                q = fr.readline().strip("\n\r")
        logging.debug("learning.txt學習所有完成...")
        logging.debug("完整的從新訓練分類器模型 ...")
        IntentClassifier().full_retrain_clf()
        logging.debug("構建文本-向量索引文件,並存儲 ...")
        IntentClassifier().build_text_vec_indx()
        logging.debug("加載文本向量索引文件 ...")
        IntentClassifier().load_text_vec_indx()
        # 刪除臨時的學習文件
        os.remove(learning_path)
        endtime = datetime.datetime.now()
        print("===========本次學習耗時: %s秒" % (endtime - starttime).seconds)
        logging.info("=========本次學習已所有完成!")
    return "success"
    
def learn_(q, a, c):
    """
    添加q,a到指定的c類別文件;訓練c對應的chatterbot
    :param q: 問題
    :param a: 答案
    :param c: 分類
    :return:
    """
    file_names = [file_name for file_name in os.listdir(path_configer.get_chatter_corpus()) if
                  file_name.startswith(c)]
    if not file_names:
        logging.warning("未知的類別:%s,已忽略", c)
        return
    file_name = file_names[0]
    file_path = "%s/%s" % (path_configer.get_chatter_corpus(), file_name)
    # 追加到c對應的意圖分類文件中
    with codecs.open(file_path, "a", encoding="utf-8") as fa:
        if len(q) > 0 and len(a) > 0:
            if os.path.getsize(file_path) == 0:
                fa.write('%s' % q)
            else:
                fa.write('\n%s' % q)
            fa.write('\n%s' % a)
    # 學習問答
    qa_learn(q, a, c)
    return "success"

def qa_learn(q, a, c):
    a_statement = Statement(a)
    q_statement = Statement(q)
    if c.startswith("QA_talk"):
        chat_bot = Talk().chat
    else:
        chat_bot = SF().chatters[c]
    chat_bot.learn_response(a_statement, q_statement)

以上是主要功能的代碼,若要獲取可運行的完整代碼,能夠加做者微信(jiabao512859468)獲取,有任何相關技術問題,都歡迎和做者探討O(∩_∩)O~機器學習

ok,有了代碼,下一篇將介紹如何將聊天機器人項目應用到不一樣的業務領域,以及如何接入其餘項目中。異步

本篇就這麼多內容啦~,感謝閱讀O(∩_∩)O。學習

相關文章
相關標籤/搜索