TensorFlow Bi-LSTM實現文本分詞

本節咱們來嘗試使用 TensorFlow 搭建一個雙向 LSTM (Bi-LSTM) 深度學習模型來處理序列標註(分詞)問題,主要目的是學習 Bi-LSTM 的用法。git

Bi-LSTM

咱們知道 RNN 是能夠學習到文本上下文之間的聯繫的,輸入是上文,輸出是下文,但這樣的結果是模型能夠根據上文推出下文,而若是輸入下文,想要推出上文就沒有那麼簡單了,爲了彌補這個缺陷,咱們可讓模型從兩個方向來學習,這就構成了雙向 RNN。在某些任務中,雙向 RNN 的表現比單向 RNN 要好,本文要實現的文本分詞就是其中之一。不過本文使用的模型不是簡單的雙向 RNN,而是 RNN 的變種 -- LSTM。github

圖片

如圖所示爲 Bi-LSTM 的基本原理,輸入層的數據會通過向前和向後兩個方向推算,最後輸出的隱含狀態再進行 concat,再做爲下一層的輸入,原理其實和 LSTM 是相似的,就是多了雙向計算和 concat 過程。數組

數據處理

本文的訓練和測試數據使用的是已經作好序列標註的中文文本數據。序列標註,就是給一個漢語句子做爲輸入,以「BEMS」組成的序列串做爲輸出,而後再進行切詞,進而獲得輸入句子的劃分。其中,B 表明該字是詞語中的起始字,M 表明是詞語中的中間字,E 表明是詞語中的結束字,S 則表明是單字成詞。網絡

這裏的原始數據樣例以下:數據結構

人/b  們/e  常/s  說/s  生/b  活/e  是/s  一/s  部/s  教/b  科/m  書/eapp

這裏一個字對應一個標註,咱們首先須要對數據進行預處理,預處理的流程以下:
dom

  • 將句子切分ide

  • 將句子的的標點符號去掉學習

  • 將每一個字及對應的標註切分測試

  • 去掉長度爲 0 的無效句子

首先咱們將句子切分開來並去掉標點符號,代碼實現以下:

# Read origin data

text = open('data/data.txt', encoding='utf-8').read()

# Get split sentences

sentences = re.split('[,。!?、‘’「」]/[bems]', text)

# Filter sentences whose length is 0

sentences = list(filter(lambda x: x.strip(), sentences))

# Strip sentences

sentences = list(map(lambda x: x.strip(), sentences))

這樣咱們就能夠將句子切分開來並作好了清洗,接下來咱們還須要把每一個句子中的字及標註轉爲 Numpy 數組,便於下一步製做詞表和數據集,代碼實現以下:

import re

# To numpy array

words, labels = [], []

print('Start creating words and labels...')

for sentence in sentences:

    groups = re.findall('(.)/(.)', sentence)

    arrays = np.asarray(groups)

    words.append(arrays[:, 0])

    labels.append(arrays[:, 1])

print('Words Length', len(words), 'Labels Length', len(labels))

print('Words Example', words[0])

print('Labels Example', labels[0])

這裏咱們利用正則 re 庫的 findall() 方法將字及標註分開,並分別添加到 words 和 labels 數組中,運行效果以下:

Words Length 321533 Labels Length 321533

Words Example ['人' '們' '常' '說' '生' '活' '是' '一' '部' '教' '科' '書']

Labels Example ['b' 'e' 's' 's' 'b' 'e' 's' 's' 's' 'b' 'm' 'e']

接下來咱們有了這些數據就要開始製做詞表了,詞表製做起來無非就是輸入詞表和輸出詞表的不重複的正逆對應,製做詞表的目的就是將輸入的文字或標註轉爲 index,同時還能反向根據 index 獲取對應的文字或標註,因此咱們這裏須要製做 word2id、id2word、tag2id、id2tag 四個字典。

爲了解決 OOV 問題,咱們還須要將無效字符也進行標註,這裏咱們統一取 0。製做時咱們藉助於 pandas 庫的 Series 進行了去重和轉換,另外還限制了每一句的最大長度,這裏設置爲 32,若是大於32,則截斷,不然進行 padding,代碼以下:

from itertools import chain

import pandas as pd

import numpy as np

# Merge all words

all_words = list(chain(*words))

# All words to Series

all_words_sr = pd.Series(all_words)

# Get value count, index changed to set

all_words_counts = all_words_sr.value_counts()

# Get words set

all_words_set = all_words_counts.index

# Get words ids

all_words_ids = range(1, len(all_words_set) + 1)


# Dict to transform

word2id = pd.Series(all_words_ids, index=all_words_set)

id2word = pd.Series(all_words_set, index=all_words_ids)


# Tag set and ids

tags_set = ['x', 's', 'b', 'm', 'e']

tags_ids = range(len(tags_set))


# Dict to transform

tag2id = pd.Series(tags_ids, index=tags_set)

id2tag = pd.Series(tags_set, index=tag2id)


max_length = 32


def x_transform(words):

    ids = list(word2id[words])

    if len(ids) >= max_length:

        ids = ids[:max_length]

    ids.extend([0] * (max_length - len(ids)))

    return ids


def y_transform(tags):

    ids = list(tag2id[tags])

    if len(ids) >= max_length:

        ids = ids[:max_length]

    ids.extend([0] * (max_length - len(ids)))

    return ids


print('Starting transform...')

data_x = list(map(lambda x: x_transform(x), words))

data_y = list(map(lambda y: y_transform(y), labels))

data_x = np.asarray(data_x)

data_y = np.asarray(data_y)

這樣咱們就完成了 word2id、id2word、tag2id、id2tag 四個字典的製做,並製做好了 Numpy 數組類型的 data_x 和 data_y,這裏 data_x 和 data_y 單句示例以下:

Data X Example: [8, 43, 320, 88, 36, 198, 7, 2, 41, 163, 124, 245, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

Data Y Example: [2, 4, 1, 1, 2, 4, 1, 1, 1, 2, 3, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

能夠看到數據的 x 部分,原始文字和標註結果都轉化成了詞表中的 index,同時不夠 32 個字符就以 0 補全。

接下來咱們將其保存成 pickle 文件,以備訓練和測試使用,代碼以下:

with open(join(path, 'data.pkl'), 'wb') as f:

    pickle.dump(data_x, f)

    pickle.dump(data_y, f)

    pickle.dump(word2id, f)

    pickle.dump(id2word, f)

    pickle.dump(tag2id, f)

    pickle.dump(id2tag, f)

print('Pickle finished')

好,如今數據預處理部分就完成了。

構造模型

接下來咱們就須要利用 pickle 文件中的數據來構建模型了,首先進行 pickle 文件的讀取,而後將數據分爲訓練集、開發集、測試集,詳細流程再也不贅述,賦值爲以下變量:

# Load data

data_x, data_y, word2id, id2word, tag2id, id2tag = load_data()

# Split data

train_x, train_y, dev_x, dev_y, test_x, test_y = get_data(data_x, data_y)

接下來咱們使用 TensorFlow 自帶的 Dataset 數據結構構造輸入輸出,利用 Dataset 咱們能夠構造一個 iterator 迭代器,每調用一次 get_next() 方法,咱們就能夠獲得一個 batch,這裏 Dataset 的初始化咱們使用 from_tensor_slices() 方法,而後調用其 batch() 方法來初始化每一個數據集的 batch_size,接着初始化同一個 iterator,並綁定到三個數據集上聲明爲三個 initializer,這樣每調用 initializer,就會將 iterator 切換到對應的數據集上,代碼實現以下:

# Train and dev dataset

train_dataset = tf.data.Dataset.from_tensor_slices((train_x, train_y))

train_dataset = train_dataset.batch(FLAGS.train_batch_size)

dev_dataset = tf.data.Dataset.from_tensor_slices((dev_x, dev_y))

dev_dataset = dev_dataset.batch(FLAGS.dev_batch_size)

test_dataset = tf.data.Dataset.from_tensor_slices((test_x, test_y))

test_dataset = test_dataset.batch(FLAGS.test_batch_size)

# A reinitializable iterator

iterator = tf.data.Iterator.from_structure(train_dataset.output_types, train_dataset.output_shapes)

# Initializer

train_initializer = iterator.make_initializer(train_dataset)

dev_initializer = iterator.make_initializer(dev_dataset)

test_initializer = iterator.make_initializer(test_dataset)

有了 Dataset 的 iterator,咱們只須要調用一次 get_next() 方法便可獲得 x 和 y_label 了,就不須要使用 placeholder 來聲明瞭,代碼以下:

# Input Layer

with tf.variable_scope('inputs'):

    x, y_label = iterator.get_next()

接下來咱們須要實現 embedding 層,調用 TensorFlow 的 embedding_lookup 便可實現,這裏沒有使用 Pre Train 的 embedding,代碼實現以下:

# Embedding Layer

with tf.variable_scope('embedding'):

    embedding = tf.Variable(tf.random_normal([vocab_size, FLAGS.embedding_size]), dtype=tf.float32)

inputs = tf.nn.embedding_lookup(embedding, x)

接下來咱們就須要實現雙向 LSTM 了,這裏咱們要構造一個 2 層的 Bi-LSTM 網絡,實現的時候咱們首先須要聲明 LSTM Cell 的列表,而後調用 stack_bidirectional_rnn() 方法便可:

cell_fw = [lstm_cell(FLAGS.num_units, keep_prob) for _ in range(FLAGS.num_layer)]

cell_bw = [lstm_cell(FLAGS.num_units, keep_prob) for _ in range(FLAGS.num_layer)]

inputs = tf.unstack(inputs, FLAGS.time_step, axis=1)

output, _, _ = tf.contrib.rnn.stack_bidirectional_rnn(cell_fw, cell_bw, inputs=inputs, dtype=tf.float32)

這個方法內部是首先對每一層的 LSTM 進行正反向計算,而後對輸出隱層進行 concat,而後輸入下一層再進行計算,這裏值得注意的地方是,咱們不能把 LSTM Cell 提早組合成 MultiRNNCell 再調用 bidirectional_dynamic_rnn() 進行計算,這樣至關於只有最後一層才進行 concat,是錯誤的。

如今咱們獲得的 output 就是 Bi-LSTM 的最後輸出結果了。

接下來咱們須要對輸出結果進行一下 stack() 操做轉化爲一個 Tensor,而後將其 reshape() 一下,轉化爲 [-1, num_units * 2] 的 shape:

output = tf.stack(output, axis=1)

output = tf.reshape(output, [-1, FLAGS.num_units * 2])

這樣咱們再通過一層全鏈接網絡將維度進行轉換:

# Output Layer

with tf.variable_scope('outputs'):

    w = weight([FLAGS.num_units * 2, FLAGS.category_num])

    b = bias([FLAGS.category_num])

    y = tf.matmul(output, w) + b

    y_predict = tf.cast(tf.argmax(y, axis=1), tf.int32)

    print('Output Y', y_predict)

這樣獲得的最後的 y_predict 即爲預測結果,shape 爲 [batch_size],即每一句都獲得了一個最可能的結果標註。

接下來咱們須要計算一下準確率和 Loss,準確率其實就是比較 y_predict 和 y_label 的類似度,Loss 即爲兩者交叉熵:

# Reshape y_label

y_label_reshape = tf.cast(tf.reshape(y_label, [-1]), tf.int32)

# Prediction

correct_prediction = tf.equal(y_predict, y_label_reshape)

accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

# Loss

cross_entropy = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y_label_reshape, logits=tf.cast(y, tf.float32)))

# Train

train = tf.train.AdamOptimizer(FLAGS.learning_rate).minimize(cross_entropy, global_step=global_step)

這裏計算交叉熵使用的是 sparse_softmax_cross_entropy_with_logits() 方法,Optimizer 使用的是 Adam。

最後指定訓練過程和測試過程便可,訓練過程以下:

for epoch in range(FLAGS.epoch_num):

    tf.train.global_step(sess, global_step_tensor=global_step)

    # Train

    sess.run(train_initializer)

    for step in range(int(train_steps)):

        smrs, loss, acc, gstep, _ = sess.run([summaries, cross_entropy, accuracy, global_step, train], feed_dict={keep_prob: FLAGS.keep_prob})

        # Print log

        if step % FLAGS.steps_per_print == 0:

            print('Global Step', gstep, 'Step', step, 'Train Loss', loss, 'Accuracy', acc)


    if epoch % FLAGS.epochs_per_dev == 0:

        # Dev

        sess.run(dev_initializer)

        for step in range(int(dev_steps)):

            if step % FLAGS.steps_per_print == 0:

                print('Dev Accuracy', sess.run(accuracy, feed_dict={keep_prob: 1}), 'Step', step)

這裏訓練時首先調用了 train_initializer,將 iterator 指向訓練數據,這樣每調用一次 get_next(),x 和 y_label 就會被賦值爲訓練數據的一個 batch,接下來打印輸出了 Loss,Accuracy 等內容。另外對於開發集來講,每次進行驗證的時候也須要從新調用 dev_initializer,這樣 iterator 會再次指向開發集,這樣每調用一次 get_next(),x 和 y_label 就會被賦值爲開發集的一個 batch,而後進行驗證。

對於測試來講,咱們能夠計算其準確率,而後將測試的結果輸出出來,代碼實現以下:

sess.run(test_initializer)

for step in range(int(test_steps)):

    x_results, y_predict_results, acc = sess.run([x, y_predict, accuracy], feed_dict={keep_prob: 1})

    print('Test step', step, 'Accuracy', acc)

    y_predict_results = np.reshape(y_predict_results, x_results.shape)

    for i in range(len(x_results)):

        x_result, y_predict_result = list(filter(lambda x: x, x_results[i])), list(

            filter(lambda x: x, y_predict_results[i]))

        x_text, y_predict_text = ''.join(id2word[x_result].values), ''.join(id2tag[y_predict_result].values)

        print(x_text, y_predict_text)

這裏打印輸出了當前測試的準確率,而後獲得了測試結果,而後再結合詞表將測試的真正結果打印出來便可。

運行結果

在訓練過程當中,咱們須要構建模型圖,而後調用訓練部分的代碼進行訓練,輸出結果相似以下:

Global Step 0 Step 0 Train Loss 1.67181 Accuracy 0.1475

Global Step 100 Step 100 Train Loss 0.210423 Accuracy 0.928125

Global Step 200 Step 200 Train Loss 0.208561 Accuracy 0.920625

Global Step 300 Step 300 Train Loss 0.185281 Accuracy 0.939375

Global Step 400 Step 400 Train Loss 0.186069 Accuracy 0.938125

Global Step 500 Step 500 Train Loss 0.165667 Accuracy 0.94375

Global Step 600 Step 600 Train Loss 0.201692 Accuracy 0.9275

Global Step 700 Step 700 Train Loss 0.13299 Accuracy 0.954375

...

隨着訓練的進行,準確率能夠達到 96% 左右。

在測試階段,輸出了當前模型的準確率及真實測試輸出結果,輸出結果相似以下:

Test step 0 Accuracy 0.946125

據新華社北京7月9日電連日來 sbmebebmmesbes

董新輝爲本身今生不能侍奉母親而難過 bmesbebebebmmesbe

...

可見測試準確率在 95% 左右,對於測試數據,此處還輸出了每句話的序列標註結果,如第一行結果中,「據」字對應的標註就是 s,表明單字成詞,「新」字對應的標註是 b,表明詞的起始,「華」字對應標註是 m,表明詞的中間,「社」字對應的標註是 e,表明結束,這樣 「據」、「新華社」 就能夠被分紅兩個詞了,可見仍是有必定效果的。

結語

本節經過搭建一個 Bi-LSTM 網絡實現了序列標註,並可實現分詞,準確率可達到 95% 左右,可是最主要的仍是學習 Bi-LSTM 的用法,本實例代碼較多,部分代碼已經省略,完整代碼見:https://github.com/AIDeepLearning/BiLSTMWordBreaker

相關文章
相關標籤/搜索