In this article we introduce a new algorithm for keyword search and replacement: Flashtext. Flashtext is an efficient string search and replacement algorithm whose running time does not depend on the number of terms being searched or replaced. For a document of N characters and a dictionary of M keywords, the time complexity is O(N). This is much faster than the usual regular-expression approach, whose time complexity is O(M * N). The algorithm also differs slightly from Aho-Corasick, because it does not match substrings.
Flashtext is designed to match only complete words. For example, if we add the keyword {Apple}, the algorithm will not match the "apple" inside "I like Pineapple". It is also designed to match the longest string first. For example, given the keyword set {Machine, Learning, Machine Learning} and the document "I like Machine Learning", the algorithm will match only "Machine Learning", since that is the longest match.
We have implemented this algorithm on GitHub, in Python.
In information retrieval, keyword search and replacement are both very common problems. We often want to search a given text for specific keywords, or to replace specific keywords in it (for example, replacing "java script" with "javascript", as we do below).

Regular expressions are the most common technique for solving these problems. They handle the problem well, but they become very slow as the data grows: for documents numbering in the millions, the running time can stretch into days. As Figure 1 shows, searching a document built from a 10k-word corpus for 15k keywords takes regular expressions about 0.165 seconds, while Flashtext needs only 0.002 seconds. On this problem, Flashtext is therefore roughly 82x faster than regular expressions.
As the number of characters to process grows, the running time of regular expressions increases roughly linearly, while Flashtext's stays nearly constant. In this article we focus on the performance difference between regular expressions and Flashtext, describe the Flashtext algorithm and how it works in detail, and present some benchmarks.
1.1 Regular Expressions for Keyword Search
Regular expressions are a very flexible and useful way to match patterns. For example, searching a text for "\d{4}" matches any 4-digit number, such as 2017. We can implement this in Python as follows:
```python
import re

compiled_regex = re.compile(r'\b2017\b|\b\d{4}\b')
compiled_regex.findall('In 2017 2311 is my birthday.')
# output: ['2017', '2311']
```
Here '\b' denotes a word boundary; it matches against special characters such as a space, a period, or a newline.
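A quick illustration of the boundary behavior, using the Pineapple example from the introduction (the sentence itself is made up):

```python
import re

# with \b anchoring both sides, the "apple" inside "Pineapple" is skipped
matches = re.findall(r'\bapple\b', 'I like Pineapple and apple pie')
print(matches)  # ['apple']
```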
1.2 Regular Expressions for Keyword Replacement
We can also use regular expressions to write a replacement script that standardizes terms. For example, the following Python snippet replaces "java script" with "javascript":
```python
import re

re.sub(r'\bjava script\b', 'javascript', 'java script is awesome.')
# output: 'javascript is awesome.'
```
Flashtext is an algorithm based on the Trie data structure and on Aho-Corasick. It works by first taking all the relevant keywords as input and building a trie dictionary from them, as shown in Figure 3:
start and eot are two special characters that mark word boundaries, just like the regular expressions above. This trie dictionary is the data structure we later use for search and replacement.
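In Python terms, the trie is just nested dicts, one level per character, with an eot key holding the standardized name. A minimal sketch (the keywords "Bay" and "Big" are illustrative, not taken from Figure 3):

```python
eot = '_keyword_'  # the <eot> marker: end of term, stores the standardized name

# nested-dict trie for the keywords "bay" and "big"
trie = {
    'b': {
        'a': {'y': {eot: 'Bay'}},
        'i': {'g': {eot: 'Big'}},
    },
}

# walking the branch b -> a -> y reaches the stored standardized name
node = trie
for ch in 'bay':
    node = node[ch]
print(node[eot])  # Bay
```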
2.1 Searching with Flashtext
For an input string (the document), we walk through the characters one by one. When a character sequence <b>word<b> in the document matches <start>word<eot> in the dictionary (start and eot being the start and end tags of the sequence), we consider it a complete match, and we output the standardized keyword that corresponds to the matched sequence:
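The traversal just described can be condensed into a standalone sketch. This is a simplification of the library code shown later in section 2.3.2: it assumes a prebuilt nested-dict trie and treats any non-alphanumeric character as a word boundary.

```python
def find_keywords(trie, sentence, eot='_keyword_'):
    # simplified longest-match scan: at each word start, walk the trie as
    # far as possible and commit the longest branch ending on a boundary
    found = []
    n, idx = len(sentence), 0
    while idx < n:
        at_word_start = idx == 0 or not sentence[idx - 1].isalnum()
        node, best, end, j = trie, None, idx, idx
        while j < n and sentence[j] in node:
            node = node[sentence[j]]
            j += 1
            # an <eot> here is a candidate match, valid only on a word boundary
            if eot in node and (j == n or not sentence[j].isalnum()):
                best, end = node[eot], j
        if best is not None and at_word_start:
            found.append(best)
            idx = end
        else:
            idx += 1
    return found

# build a tiny trie for the keyword set used as an example in the text
trie = {}
for kw, name in [('machine', 'Machine'), ('learning', 'Learning'),
                 ('machine learning', 'Machine Learning')]:
    node = trie
    for ch in kw:
        node = node.setdefault(ch, {})
    node['_keyword_'] = name

print(find_keywords(trie, 'i like machine learning'))  # ['Machine Learning']
```

Note how the longest match wins: neither "Machine" nor "Learning" is reported separately once "Machine Learning" is found.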
2.2 Replacement with Flashtext
For an input string (the document), we again walk through the characters one by one, building up an initially empty return string. When a character sequence <b>word<b> has no match in the trie dictionary, we simply copy the original characters into the return string. When we do find a match in the trie dictionary, we copy the matched sequence's standardized characters into the return string instead. The return string is therefore a copy of the input string in which only the matched character sequences have been replaced:
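The same walk drives replacement. Here is a simplified standalone sketch of that copy-or-substitute loop (again assuming a nested-dict trie and alphanumeric word characters; the more careful library code appears in section 2.3.3):

```python
def replace_keywords(trie, sentence, eot='_keyword_'):
    # copy characters through unchanged, except where the longest trie
    # match ending on a word boundary is swapped for its standardized name
    out = []
    n, idx = len(sentence), 0
    while idx < n:
        at_word_start = idx == 0 or not sentence[idx - 1].isalnum()
        node, best, end, j = trie, None, idx, idx
        while j < n and sentence[j] in node:
            node = node[sentence[j]]
            j += 1
            if eot in node and (j == n or not sentence[j].isalnum()):
                best, end = node[eot], j
        if best is not None and at_word_start:
            out.append(best)
            idx = end
        else:
            out.append(sentence[idx])
            idx += 1
    return ''.join(out)

# trie with the single mapping "java script" -> "javascript"
trie = {}
node = trie
for ch in 'java script':
    node = node.setdefault(ch, {})
node['_keyword_'] = 'javascript'

print(replace_keywords(trie, 'java script is awesome.'))
# javascript is awesome.
```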
2.3 The Flashtext Algorithm
The Flashtext algorithm has three main parts, which we analyze one by one:
2.3.1 Building the Trie Dictionary
To build the trie dictionary, we start with an empty root node pointing to an empty dict; this node is used as the starting point for every word. To insert a word into the dictionary, we take the word's next character as a key in the current dict and point that key at another empty dict. We repeat this process until we reach the word's last character, and then insert a special key (eot) to mark the end of the term.
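The insertion step above, written as a standalone function (a hypothetical helper mirroring the add_keyword method shown next, not the library API itself):

```python
def add_keyword(trie, keyword, clean_name=None, eot='_keyword_'):
    # walk (or create) one nested dict per character of the keyword,
    # then mark the end of the term with the standardized name s
    node = trie
    for ch in keyword.lower():
        node = node.setdefault(ch, {})
    node[eot] = clean_name or keyword
    return trie

trie = {}
add_keyword(trie, 'NY', 'New York')
print(trie)  # {'n': {'y': {'_keyword_': 'New York'}}}
```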
Input

Keyword w = c1c2c3...cn, where ci is an input character. We denote the standardized name by s.

Code: initializing Flashtext and adding keywords to the dictionary
```python
import string


class FlashText(object):

    def __init__(self, case_sensitive=False):
        self._keyword = '_keyword_'  # end of term (eot) and key to store standardized name
        self._white_space_chars = set(['.', '\t', '\n', '\a', ' ', ','])
        # characters treated as part of a word (used by search/replace below)
        self.non_word_boundaries = set(string.digits + string.ascii_letters + '_')
        self.keyword_trie_dict = dict()
        self.case_sensitive = case_sensitive

    def add_keyword(self, keyword, clean_name=None):
        if not clean_name and keyword:
            clean_name = keyword
        if keyword and clean_name:
            # if both keyword and clean_name are not empty
            if not self.case_sensitive:
                # if not case_sensitive then lowercase the keyword
                keyword = keyword.lower()
            current_dict = self.keyword_trie_dict
            for letter in keyword:
                current_dict = current_dict.setdefault(letter, {})
            current_dict[self._keyword] = clean_name
```
Output

The program above builds a dictionary like the one shown in Figure 3.
2.3.2 Keyword Search

Once all the words have been added to the trie dictionary, we can find keywords in an input string.
Input

String x = a1a2...an, where ai is the i-th character of the input string x.

Code: Python code to extract the dictionary's keywords from the input string.
```python
    def extract_keywords(self, sentence):
        keywords_extracted = []
        if not self.case_sensitive:
            # if not case_sensitive then lowercase the sentence
            sentence = sentence.lower()
        current_dict = self.keyword_trie_dict
        sequence_end_pos = 0
        idx = 0
        sentence_len = len(sentence)
        while idx < sentence_len:
            char = sentence[idx]
            # when we reach a character that might denote word end
            if char not in self.non_word_boundaries:
                # if eot is present in current_dict
                if self._keyword in current_dict or char in current_dict:
                    # update longest sequence found
                    sequence_found = None
                    longest_sequence_found = None
                    is_longer_seq_found = False
                    if self._keyword in current_dict:
                        sequence_found = current_dict[self._keyword]
                        longest_sequence_found = current_dict[self._keyword]
                        sequence_end_pos = idx
                    # re look for longest_sequence from this position
                    if char in current_dict:
                        current_dict_continued = current_dict[char]
                        idy = idx + 1
                        while idy < sentence_len:
                            inner_char = sentence[idy]
                            if inner_char not in self.non_word_boundaries and self._keyword in current_dict_continued:
                                # update longest sequence found
                                longest_sequence_found = current_dict_continued[self._keyword]
                                sequence_end_pos = idy
                                is_longer_seq_found = True
                            if inner_char in current_dict_continued:
                                current_dict_continued = current_dict_continued[inner_char]
                            else:
                                break
                            idy += 1
                        else:
                            # end of sentence reached
                            if self._keyword in current_dict_continued:
                                # update longest sequence found
                                longest_sequence_found = current_dict_continued[self._keyword]
                                sequence_end_pos = idy
                                is_longer_seq_found = True
                        if is_longer_seq_found:
                            idx = sequence_end_pos
                    current_dict = self.keyword_trie_dict
                    if longest_sequence_found:
                        keywords_extracted.append(longest_sequence_found)
                else:
                    # we reset current_dict
                    current_dict = self.keyword_trie_dict
            elif char in current_dict:
                # char is present in current dictionary position
                current_dict = current_dict[char]
            else:
                # we reset current_dict
                current_dict = self.keyword_trie_dict
                # skip to end of word
                idy = idx + 1
                while idy < sentence_len:
                    char = sentence[idy]
                    if char not in self.non_word_boundaries:
                        break
                    idy += 1
                idx = idy
            # if we are at the end of the sentence and have a sequence discovered
            if idx + 1 >= sentence_len:
                if self._keyword in current_dict:
                    sequence_found = current_dict[self._keyword]
                    keywords_extracted.append(sequence_found)
            idx += 1
        return keywords_extracted
```
Output

Returns a list of all the standardized words found in string x, as shown in Figure 4.
2.3.3 Keyword Replacement

We use the same dictionary to replace keywords in the input string with their standardized names.
Input

Input string x = a1a2...an, where ai is the i-th character.

Code: Python code to replace keywords in the input string with standardized words.
```python
    def replace_keywords(self, sentence):
        new_sentence = ''
        orig_sentence = sentence
        if not self.case_sensitive:
            sentence = sentence.lower()
        current_word = ''
        current_dict = self.keyword_trie_dict
        current_white_space = ''
        sequence_end_pos = 0
        idx = 0
        sentence_len = len(sentence)
        while idx < sentence_len:
            char = sentence[idx]
            current_word += orig_sentence[idx]
            # when we reach whitespace
            if char not in self.non_word_boundaries:
                current_white_space = char
                # if end is present in current_dict
                if self._keyword in current_dict or char in current_dict:
                    # update longest sequence found
                    sequence_found = None
                    longest_sequence_found = None
                    is_longer_seq_found = False
                    if self._keyword in current_dict:
                        sequence_found = current_dict[self._keyword]
                        longest_sequence_found = current_dict[self._keyword]
                        sequence_end_pos = idx
                    # re look for longest_sequence from this position
                    if char in current_dict:
                        current_dict_continued = current_dict[char]
                        current_word_continued = current_word
                        idy = idx + 1
                        while idy < sentence_len:
                            inner_char = sentence[idy]
                            current_word_continued += orig_sentence[idy]
                            if inner_char not in self.non_word_boundaries and self._keyword in current_dict_continued:
                                # update longest sequence found
                                current_white_space = inner_char
                                longest_sequence_found = current_dict_continued[self._keyword]
                                sequence_end_pos = idy
                                is_longer_seq_found = True
                            if inner_char in current_dict_continued:
                                current_dict_continued = current_dict_continued[inner_char]
                            else:
                                break
                            idy += 1
                        else:
                            # end of sentence reached
                            if self._keyword in current_dict_continued:
                                # update longest sequence found
                                current_white_space = ''
                                longest_sequence_found = current_dict_continued[self._keyword]
                                sequence_end_pos = idy
                                is_longer_seq_found = True
                        if is_longer_seq_found:
                            idx = sequence_end_pos
                            current_word = current_word_continued
                    current_dict = self.keyword_trie_dict
                    if longest_sequence_found:
                        new_sentence += longest_sequence_found + current_white_space
                        current_word = ''
                        current_white_space = ''
                    else:
                        new_sentence += current_word
                        current_word = ''
                        current_white_space = ''
                else:
                    # we reset current_dict
                    current_dict = self.keyword_trie_dict
                    new_sentence += current_word
                    current_word = ''
                    current_white_space = ''
            elif char in current_dict:
                # we can continue from this char
                current_dict = current_dict[char]
            else:
                # we reset current_dict
                current_dict = self.keyword_trie_dict
                # skip to end of word
                idy = idx + 1
                while idy < sentence_len:
                    char = sentence[idy]
                    current_word += orig_sentence[idy]
                    if char not in self.non_word_boundaries:
                        break
                    idy += 1
                idx = idy
                new_sentence += current_word
                current_word = ''
                current_white_space = ''
            # if we are at the end of the sentence and have a sequence discovered
            if idx + 1 >= sentence_len:
                if self._keyword in current_dict:
                    sequence_found = current_dict[self._keyword]
                    new_sentence += sequence_found
            idx += 1
        return new_sentence
```
Output

The words to be replaced are found in string x and replaced with their standardized names in the output, as shown in Figure 5.
As Figures 1 and 2 show, Flashtext is much faster than regular expressions. We now run some benchmarks to make this point concrete.

3.1 Keyword Search

We implement the keyword-search benchmark in Python. First, we randomly generate a corpus of 100K words; then we pick 1K words from that list to build a document.

We then pick k terms from the corpus, with k ∈ {0, 1000, 2000, ..., 20000}, and search the document for those keywords with both regular expressions and Flashtext, comparing the results. The Python code:
```python
from flashtext.keyword import KeywordProcessor
import random
import re
import string
import time


def get_word_of_length(str_length):
    # generate a random word of given length
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(str_length))

# generate a list of 100K words of randomly chosen size
all_words = [get_word_of_length(random.choice([3, 4, 5, 6, 7, 8])) for i in range(100000)]

print('Count | FlashText | Regex ')
print('------------------------------')

for keywords_length in [0, 1000, 5000, 10000, 15000]:
    # choose 1000 terms and create a string to search in
    all_words_chosen = random.sample(all_words, 1000)
    story = ' '.join(all_words_chosen)

    # get unique keywords from the list of words generated
    unique_keywords_sublist = list(set(random.sample(all_words, keywords_length)))

    # compile Regex
    compiled_re = re.compile('|'.join([r'\b' + keyword + r'\b' for keyword in unique_keywords_sublist]))

    # add keywords to Flashtext
    keyword_processor = KeywordProcessor()
    keyword_processor.add_keywords_from_list(unique_keywords_sublist)

    # time the modules
    start = time.time()
    _ = keyword_processor.extract_keywords(story)
    mid = time.time()
    _ = compiled_re.findall(story)
    end = time.time()

    # print output
    print(str(keywords_length).ljust(6), '|',
          "{0:.5f}".format(mid - start).ljust(9), '|',
          "{0:.5f}".format(end - mid).ljust(9), '|')

# output: Data for Figure 1
```
3.2 Keyword Replacement

The following Python code runs the keyword-replacement benchmark.
```python
from flashtext.keyword import KeywordProcessor
import random
import string
import re
import time


def get_word_of_length(str_length):
    # generate a random word of given length
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(str_length))

# generate a list of 100K words of randomly chosen size
all_words = [get_word_of_length(random.choice([3, 4, 5, 6, 7, 8])) for i in range(100000)]

print('Count | FlashText | Regex ')
print('-------------------------------')

for keywords_length in range(1, 20002, 1000):
    # choose 5000 terms and create a string to search in
    all_words_chosen = random.sample(all_words, 5000)
    story = ' '.join(all_words_chosen)

    # get unique keywords from the list of words generated
    unique_keywords_sublist = list(set(random.sample(all_words, keywords_length)))

    # compile regex
    # source: https://stackoverflow.com/questions/6116978/python-replace-multiple-strings
    rep = dict([(key, '_keyword_') for key in unique_keywords_sublist])
    compiled_re = re.compile("|".join(rep.keys()))

    # add keywords to flashtext
    keyword_processor = KeywordProcessor()
    for keyword in unique_keywords_sublist:
        keyword_processor.add_keyword(keyword, '_keyword_')

    # time the modules
    start = time.time()
    _ = keyword_processor.replace_keywords(story)
    mid = time.time()
    _ = compiled_re.sub(lambda m: rep[re.escape(m.group(0))], story)
    end = time.time()

    # print output
    print(str(keywords_length).ljust(6), '|',
          "{0:.5f}".format(mid - start).ljust(9), '|',
          "{0:.5f}".format(end - mid).ljust(9), '|')

# output: Data for Figure 2
```
3.3 Conclusion

As the comparisons above show, Flashtext is much faster than regular expressions at both keyword search and replacement, and this advantage only grows more pronounced as the data scales up.
4.1 Installation
pip install flashtext
4.2 Usage Examples

4.2.1 Keyword Extraction
```python
>>> from flashtext import KeywordProcessor
>>> keyword_processor = KeywordProcessor()
>>> # keyword_processor.add_keyword(<unclean name>, <standardised name>)
>>> keyword_processor.add_keyword('Big Apple', 'New York')
>>> keyword_processor.add_keyword('Bay Area')
>>> keywords_found = keyword_processor.extract_keywords('I love Big Apple and Bay Area.')
>>> keywords_found
>>> # ['New York', 'Bay Area']
```
4.2.2 Keyword Replacement

```python
>>> keyword_processor.add_keyword('New Delhi', 'NCR region')
>>> new_sentence = keyword_processor.replace_keywords('I love Big Apple and new delhi.')
>>> new_sentence
>>> # 'I love New York and NCR region.'
```
4.2.3 Case Sensitivity

```python
>>> from flashtext import KeywordProcessor
>>> keyword_processor = KeywordProcessor(case_sensitive=True)
>>> keyword_processor.add_keyword('Big Apple', 'New York')
>>> keyword_processor.add_keyword('Bay Area')
>>> keywords_found = keyword_processor.extract_keywords('I love big Apple and Bay Area.')
>>> keywords_found
>>> # ['Bay Area']
```
4.2.4 Keywords Without a Standardized Name

```python
>>> from flashtext import KeywordProcessor
>>> keyword_processor = KeywordProcessor()
>>> keyword_processor.add_keyword('Big Apple')
>>> keyword_processor.add_keyword('Bay Area')
>>> keywords_found = keyword_processor.extract_keywords('I love big Apple and Bay Area.')
>>> keywords_found
>>> # ['Big Apple', 'Bay Area']
```
4.2.5 Adding Multiple Keywords at Once

```python
>>> from flashtext import KeywordProcessor
>>> keyword_processor = KeywordProcessor()
>>> keyword_dict = {
>>>     "java": ["java_2e", "java programing"],
>>>     "product management": ["PM", "product manager"]
>>> }
>>> # {'clean_name': ['list of unclean names']}
>>> keyword_processor.add_keywords_from_dict(keyword_dict)
>>> # Or add keywords from a list:
>>> keyword_processor.add_keywords_from_list(["java", "python"])
>>> keyword_processor.extract_keywords('I am a product manager for a java_2e platform')
>>> # output ['product management', 'java']
```
4.2.6 Removing Keywords

```python
>>> from flashtext import KeywordProcessor
>>> keyword_processor = KeywordProcessor()
>>> keyword_dict = {
>>>     "java": ["java_2e", "java programing"],
>>>     "product management": ["PM", "product manager"]
>>> }
>>> keyword_processor.add_keywords_from_dict(keyword_dict)
>>> print(keyword_processor.extract_keywords('I am a product manager for a java_2e platform'))
>>> # output ['product management', 'java']
>>> keyword_processor.remove_keyword('java_2e')
>>> # you can also remove keywords from a list/ dictionary
>>> keyword_processor.remove_keywords_from_dict({"product management": ["PM"]})
>>> keyword_processor.remove_keywords_from_list(["java programing"])
>>> keyword_processor.extract_keywords('I am a product manager for a java_2e platform')
>>> # output ['product management']
```
有時候咱們會將一些特殊符號做爲字符邊界,好比 空格, 等等。爲了從新設定字邊界,咱們須要添加一些符號告訴算法,這是單詞字符的一部分。
```python
>>> from flashtext import KeywordProcessor
>>> keyword_processor = KeywordProcessor()
>>> keyword_processor.add_keyword('Big Apple')
>>> print(keyword_processor.extract_keywords('I love Big Apple/Bay Area.'))
>>> # ['Big Apple']
>>> keyword_processor.add_non_word_boundary('/')
>>> print(keyword_processor.extract_keywords('I love Big Apple/Bay Area.'))
>>> # []
```
4.3 API Documentation

For the full API documentation, you can view it here.