python利用Trie(前綴樹)實現搜索引擎中關鍵字輸入提示(學習Hash Trie和Double-array Trie)

python利用Trie(前綴樹)實現搜索引擎中關鍵字輸入提示(學習Hash Trie和Double-array Trie)

(2)按照darts-java的方法,用python實現Double-array Trie

(2)Double-array Trie是Trie的高效實現,時間複雜度達到O(n),但是實現相對較難




這些問題都可以用單詞樹/前綴樹/Trie來解決,關於Trie的介紹,看【小白詳解 Trie 樹】這篇文章就夠了




#!/usr/bin/env python
# encoding: utf-8
"""
@date:    20131001
@version: 0.2
@author:
@desc:    Search drop-down suggestions. Builds a prefix tree (trie) from data
          supplied by the backend; when the user types a query prefix, the
          completions for that prefix can be suggested.
@update:
    20131001 basic structure: add, search and other basic features
    20131005 added caching: when the cache is on and a prefix has been
             searched more than a certain number of times, its result is
             cached to reduce search time
    20140309 code changes to lower memory usage
@TODO:
    test cases
    adding pinyin keys makes memory usage double; think about how nodes
    could share memory
"""
# This is one way of implementing a cache; redis/memcached could also be used
# as an external cache.
# Once enabled, search() caches its result on the node it searched from; when
# nodes are added or deleted the caches on their path are cleared. Search time
# drops by about an order of magnitude.
# Cost: memory. Turn it off when it is not needed, or tune the amount of
# caching through CACHED_THREHOLD.

# on
#CACHED = True
# off
CACHED = False

# Note: CACHED_SIZE must be >= the ``limit`` passed to search(), so that a
# cached answer always holds enough results.
# NOTE(review): search() reads CACHED_SIZE and CACHED_THREHOLD but this file
# never defined them, so enabling CACHED raised NameError. The value 10 is a
# chosen default -- confirm against the deployment's needs.
CACHED_SIZE = 10
# Number of searches of a prefix after which its result gets cached.
CACHED_THREHOLD = 10

############### start ######################

class Node(dict):
    """Trie node: a dict that maps a child character to the child ``Node``.

    Besides the children (the dict content) every node carries ``key``,
    ``is_leaf``, ``weight``, ``cache`` and ``search_count`` attributes.

    Note that a node without children is an empty dict and therefore falsy;
    always compare against ``None`` instead of relying on truthiness.
    """

    def __init__(self, key, is_leaf=False, weight=0, kwargs=None):
        """
        :param key: character stored in this node
        :param is_leaf: True when a keyword ends at this node
        :param weight: node weight; only meaningful on the node holding the
            last character of a keyword
        :param kwargs: optional dict of extra attributes to set on the node
        """
        self.key = key
        self.is_leaf = is_leaf
        self.weight = weight
        # Cached search results for this prefix (references to nodes).
        self.cache = []
        # How many times this prefix was searched; usable for query analysis.
        self.search_count = 0
        # Extra, content-related attributes that do not affect the tree.
        if kwargs:
            # .items() works on Python 2 and 3 (iteritems() is Python 2 only);
            # distinct loop names avoid shadowing the ``key`` parameter.
            for attr_name, attr_value in kwargs.items():
                setattr(self, attr_name, attr_value)

    def __str__(self):
        return '<Node key:%s is_leaf:%s weight:%s Subnodes: %s>' % (
            self.key, self.is_leaf, self.weight, list(self.items()))

    def add_subnode(self, node):
        """
        Add a child node, replacing any existing child with the same key.

        :param node: child ``Node`` object
        """
        self[node.key] = node

    def get_subnode(self, key):
        """
        Get a child node.

        :param key: key (character) of the child
        :return: the child ``Node``, or None when there is no such child
        """
        return self.get(key)

    def has_subnode(self):
        """
        Tell whether this node has any child.

        :return: bool
        """
        return len(self) > 0

    def get_top_node(self, prefix):
        """
        Follow ``prefix`` from this node and return the node of its last
        character (the top node of all completions of that prefix).

        :param prefix: prefix string; an empty prefix returns this node
        :return: ``Node`` object, or None when the prefix is not in the tree
        """
        top = self
        for k in prefix:
            top = top.get_subnode(k)
            if top is None:
                return None
        return top
def depth_walk(node):
    """
    Recursively walk a node depth-first and collect every keyword below it.

    The walk is read-only. The previous version cleared ``is_leaf`` on a
    keyword node that also had children, so the keyword disappeared from
    every later search; it also made a recursive call whose result was
    thrown away.

    :param node: Node object to start from
    :return: list of ``(suffix, leaf_node)`` tuples, where ``suffix`` is the
        string of characters leading from ``node`` to ``leaf_node`` (empty
        when ``node`` itself ends a keyword)
    """
    result = []
    if node.is_leaf:
        # The prefix itself is a keyword, whether or not it has children.
        result.append(('', node))
    for k in node:
        for subkey, snode in depth_walk(node[k]):
            result.append((k + subkey, snode))
    return result

def search(node, prefix, limit=None, is_case_sensitive=False):
    """
    List the keywords stored below a prefix, heaviest first.

    :param node: root node of the tree
    :param prefix: prefix typed so far
    :param limit: maximum number of suggestions to return; None returns all
    :param is_case_sensitive: when False the prefix is lower-cased first
    :return: [(key, node)], tuples of the suggested keyword and the node that
        ends it
    """
    query = prefix if is_case_sensitive else prefix.lower()

    top = node.get_top_node(query)
    if top is None:
        return []

    # Count every search of this prefix.
    top.search_count += 1

    if CACHED and top.cache:
        hits = top.cache
    else:
        hits = sorted(
            [(query + suffix, leaf) for suffix, leaf in depth_walk(top)],
            key=lambda pair: pair[1].weight,
            reverse=True)
        if CACHED and top.search_count >= CACHED_THREHOLD:
            top.cache = hits[:CACHED_SIZE]

    if limit is None:
        return hits
    return hits[:limit]
def add(node, keyword, weight=0, **kwargs):
    """
    Add one keyword to the tree.

    :param node: root node
    :param keyword: keyword to store; an empty keyword is ignored
    :param weight: weight of the keyword
    :param kwargs: any extra attributes to store on the keyword's last node
    """
    if not keyword:
        return

    if CACHED:
        # A search with an empty prefix caches on the root itself, so the
        # root's cache has to be invalidated as well.
        node.cache = []

    last_index = len(keyword) - 1
    one_node = node
    for index, c in enumerate(keyword):
        is_last = index == last_index
        if c not in one_node:
            if is_last:
                child = Node(c, is_leaf=True, weight=weight, kwargs=kwargs)
            else:
                child = Node(c, weight=weight)
            one_node.add_subnode(child)
        else:
            child = one_node.get_subnode(c)
            if CACHED:
                # Results cached for this prefix are now out of date.
                child.cache = []
            if is_last:
                child.is_leaf = True
                child.weight = weight
                # Iterate over items(): iterating the dict itself yields only
                # the keys, which cannot be unpacked into (name, value).
                for attr_name, attr_value in kwargs.items():
                    setattr(child, attr_name, attr_value)
        one_node = child

def delete(node, keyword, judge_leaf=False):
    """
    Remove one keyword from the tree.

    :param node: root node
    :param keyword: keyword to remove
    :param judge_leaf: internal flag used by the upward recursion; external
        callers keep the default
    """
    # Empty keyword: bad argument, or the recursion reached the root.
    if not keyword:
        return

    top_node = node.get_top_node(keyword)
    if top_node is None:
        return

    if CACHED:
        if judge_leaf:
            top_node.cache = []
        else:
            # Every prefix of the keyword may have cached it, so clear the
            # cache along the whole path (root included). Clearing only the
            # visited nodes left stale results on the ancestors.
            path_node = node
            path_node.cache = []
            for c in keyword:
                path_node = path_node.get_subnode(c)
                path_node.cache = []

    if judge_leaf:
        # Walking upward: stop at a node that ends another keyword.
        if top_node.is_leaf:
            return
    else:
        # External call: nothing to do if ``keyword`` is not a stored keyword.
        if not top_node.is_leaf:
            return

    if top_node.has_subnode():
        # Longer keywords still pass through this node: only drop the flag.
        top_node.is_leaf = False
        return

    # No children: remove the node, then check its ancestors level by level.
    prefix = keyword[:-1]
    parent = node.get_top_node(prefix)
    del parent[top_node.key]
    delete(node, prefix, judge_leaf=True)

##############################
# Extra: build the tree from #
# a data file                #
##############################

def build(file_path, is_case_sensitive=False):
    """
    Build the tree from a data file.

    The file must be UTF-8 encoded. Each line holds ``keyword<TAB>weight``.
    Blank lines are skipped and a line without a weight column gets weight 0;
    a weight that is not an integer still raises ValueError.

    :param file_path: path of the data file
    :param is_case_sensitive: when False keywords are stored lower-cased
    :return: root Node of the new tree
    """
    import io  # io.open decodes to text identically on Python 2 and 3

    node = Node("")
    # ``with`` closes the file even when a line fails to parse.
    with io.open(file_path, encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            parts = line.split('\t')
            name = parts[0]
            if not is_case_sensitive:
                name = name.lower()
            weight = int(parts[1]) if len(parts) > 1 else 0
            add(node, name, weight)
    return node


import time

if __name__ == '__main__':
    # time.clock() was removed in Python 3.8; use it only where
    # perf_counter() does not exist (Python 2).
    timer = getattr(time, 'perf_counter', None) or time.clock

    start = timer()
    print('============ test2 ===============')
    tree = build("./shanxinpoi.txt", is_case_sensitive=False)
    print('%s time: %s' % (len(tree), timer() - start))
    startline = timer()
    print(u'search 秦嶺')
    for key, node in search(tree, u'秦嶺', limit=10):
        print('%s %s' % (key, node.weight))
    print(timer() - startline)

2、Trie的Double-array Trie實現

Trie的Double-array Trie的實現參考【小白詳解 Trie 樹】和【雙數組Trie樹(DoubleArrayTrie)Java實現】

(1)Comero根據komiya-atsushi/darts-java,進行了Double-array Trie的python實現。komiya-atsushi的實現巧妙地使用了文字的編碼,以文字的UTF-8編碼(一個漢字三個字節,每個字節取值0-255)作爲【小白詳解 Trie 樹】中的字符編碼。


(3)實現中使用了base[s]+c=t & check[t]=base[s],而非【小白詳解 Trie 樹】中的base[s]+c=t & check[t]=s





# -*- coding:utf-8 -*-

# base # # #  # (komiya-atsushi/darts-java | 先創建Trie樹,再構造DAT,爲siblings先找到合適的空間) # # #

# 不須要構造真正的Trie樹,直接用字符串,構造對應node,由於words是排過序的 # todo : error info # todo : performance test # todo : resize # warning: code=0表示葉子節點可能會有隱患(正常詞彙的狀況下是ok的) # 修正: 因爲想要回溯字符串的效果,葉子節點和base不能重合(這樣葉子節點能夠繼續記錄其餘值好比頻率),葉子節點code: 0->-1 # 可是如此的話,葉子節點可能會與正常節點衝突? 找begin的使用應該是考慮到的? #from __future__ import print_function
class DATrie(object):
    """Double-array trie over the UTF-8 bytes of the stored words.

    Layout (follows komiya-atsushi/darts-java, with a back-pointer added):

    * a node lives at slot ``t = begin + code`` where ``begin`` is the base
      value of its parent and ``code`` is the byte value (1..255);
      ``check[t] == begin`` marks the slot as belonging to that parent;
    * the end of a word is an extra child with ``code == -1`` (slot
      ``begin - 1``) whose base is ``-(word_index) - 1``;
    * slot ``begin`` itself is reserved (``used[begin]``) and ``base[begin]``
      stores the slot of the parent node, which lets ``get_string`` walk from
      a node back to the root.

    Words may be given as text or as byte strings; text is encoded as UTF-8.
    Lookup methods that take a word return results of the same type as their
    argument. ``GetAllChildWord`` and ``get_string`` return byte strings.
    """

    class Node(object):
        """Temporary trie node, used only while the arrays are being built.

        ``left``/``right`` delimit the half-open range of sorted words that
        pass through the node; ``depth`` is the number of bytes consumed.
        """

        def __init__(self, code, depth, left, right):
            self.code = code
            self.depth = depth
            self.left = left
            self.right = right

    def __init__(self):
        self.MAX_SIZE = 2097152  # 65536 * 32
        self.base = [0] * self.MAX_SIZE
        self.check = [-1] * self.MAX_SIZE  # -1 means the slot is empty
        self.used = [False] * self.MAX_SIZE
        # Slots before this position are skipped when looking for free room.
        self.nextCheckPos = 0
        self.size = 0  # number of slots in use, counted from slot 0

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_bytes(word):
        """Return ``word`` as a byte string (text is encoded as UTF-8)."""
        if isinstance(word, (bytes, bytearray)):
            return bytes(word)
        return word.encode('utf-8')

    @staticmethod
    def _like(raw, template):
        """Convert byte string ``raw`` to the type of ``template``."""
        if isinstance(template, (bytes, bytearray)):
            return raw
        return raw.decode('utf-8')

    def _step(self, index, code):
        """Return the child slot of ``index`` for ``code``, or -1 if none."""
        begin = self.base[index]
        if begin <= 0:  # leaf slot, or a trie that was never built
            return -1
        nxt = begin + code
        if nxt < self.MAX_SIZE and self.check[nxt] == begin:
            return nxt
        return -1

    def _walk(self, raw):
        """Follow the bytes of ``raw`` from the root; return slot or -1."""
        p = 0
        for c in bytearray(raw):
            p = self._step(p, c)
            if p == -1:
                return -1
        return p

    def _is_word_end(self, index):
        """Tell whether a stored word ends exactly at slot ``index``."""
        begin = self.base[index]
        return (begin > 0 and self.check[begin - 1] == begin
                and self.base[begin - 1] < 0)

    # ------------------------------------------------------------------
    # construction
    # ------------------------------------------------------------------
    def resize(self, size):
        """Re-allocate empty arrays of ``size`` slots.

        Existing data is not copied, so this is meant to be called before
        ``build``.
        """
        self.MAX_SIZE = size
        self.base = [0] * self.MAX_SIZE
        self.check = [-1] * self.MAX_SIZE
        self.used = [False] * self.MAX_SIZE
        self.nextCheckPos = 0
        self.size = 0

    def fetch(self, parent):
        """Collect the children of ``parent`` and their word ranges.

        Requires ``self.words`` to be sorted and free of duplicates, so that
        words sharing the next byte are adjacent and a word ending at
        ``parent`` comes first.

        :param parent: Node whose children are wanted
        :return: list of child Nodes (never empty for a valid ``parent``)
        """
        depth = parent.depth
        siblings = []
        for i in range(parent.left, parent.right):
            word = self.words[i]
            if len(word) == depth:
                # The word ends here: terminator child.
                siblings.append(
                    self.Node(code=-1, depth=depth + 1, left=i, right=i + 1))
                continue
            c = word[depth]
            if siblings and siblings[-1].code == c:
                # Same next byte as the previous word: extend its range.
                siblings[-1].right = i + 1
            else:
                siblings.append(
                    self.Node(code=c, depth=depth + 1, left=i, right=i + 1))
        return siblings

    def build(self, words):
        """Build the double array from ``words``.

        Duplicates and empty words are dropped. Any previous content of the
        trie is discarded.

        :param words: iterable of text or byte strings
        :raises ValueError: if a word contains a NUL byte (code 0 would
            collide with the slot reserved for the back-pointer)
        :raises Exception: "no room" when MAX_SIZE is too small
        """
        encoded = set()
        for word in words:
            raw = self._to_bytes(word)
            if not raw:
                continue
            if b'\x00' in raw:
                raise ValueError("words must not contain NUL bytes")
            encoded.add(raw)

        if self.size:
            # The arrays hold an earlier build; start from a clean state.
            self.resize(self.MAX_SIZE)

        self.base[0] = 1
        self.used[0] = True  # slot 0 is the root and must never be reused
        # bytearray gives integer byte values on both Python 2 and 3.
        self.words = [bytearray(raw) for raw in sorted(encoded)]
        try:
            if self.words:
                _root = self.Node(code=0, depth=0, left=0,
                                  right=len(self.words))
                self.base[0] = self.insert(self.fetch(_root), 0)
        finally:
            del self.words
        print("DATrie builded.")

    def insert(self, siblings, parent_base_idx):
        """Place one group of sibling nodes and, recursively, their subtrees.

        :param siblings: children of one node, as returned by ``fetch``
        :param parent_base_idx: slot of the parent node
        :return: the ``begin`` value chosen for the group
        :raises Exception: "no room" when MAX_SIZE is too small
        """
        first_code = siblings[0].code
        last_code = siblings[-1].code
        pos = max(first_code + 1, self.nextCheckPos) - 1
        nonzero_num = 0  # occupied slots seen during the scan
        first = 0

        # Find a ``begin`` such that every sibling slot and the ``begin``
        # slot itself are free.
        while True:
            pos += 1
            if pos >= self.MAX_SIZE:
                raise Exception("no room, may be resize it.")
            if self.check[pos] != -1 or self.used[pos]:
                nonzero_num += 1
                continue
            if first == 0:
                # First free slot seen: later scans may start from here.
                self.nextCheckPos = pos
                first = 1

            begin = pos - first_code
            if begin + max(last_code, 0) >= self.MAX_SIZE:
                raise Exception("no room, may be resize it.")
            # ``begin`` must be unused as a base and must not be a node
            # slot, because base[begin] will hold the back-pointer.
            if self.used[begin] or self.check[begin] != -1:
                continue
            if all(self.check[begin + sibling.code] == -1
                   and not self.used[begin + sibling.code]
                   for sibling in siblings[1:]):
                break

        # When at least 95% of the slots between nextCheckPos and pos are
        # occupied, start the next scan directly from pos.
        if float(nonzero_num) / (pos - self.nextCheckPos + 1) >= 0.95:
            self.nextCheckPos = pos

        self.used[begin] = True
        # Back-pointer to the parent slot, used by get_string().
        self.base[begin] = parent_base_idx
        self.size = max(self.size, begin + max(last_code, 0) + 1)

        # Claim every sibling slot before recursing, so that descendants
        # cannot take them.
        for sibling in siblings:
            self.check[begin + sibling.code] = begin

        for sibling in siblings:
            slot = begin + sibling.code
            if sibling.code == -1:
                # Terminator: negative value encodes the word index.
                self.base[slot] = -1 * sibling.left - 1
            else:
                new_siblings = self.fetch(sibling)
                self.base[slot] = self.insert(new_siblings, slot)

        return begin

    # ------------------------------------------------------------------
    # queries
    # ------------------------------------------------------------------
    def search(self, word):
        """Tell whether ``word`` is stored in the trie.

        :param word: text or byte string
        :return: bool; the empty word is never found
        """
        raw = self._to_bytes(word)
        if not raw:
            return False
        p = self._walk(raw)
        return p != -1 and self._is_word_end(p)

    def common_prefix_search(self, content):
        """Return every stored word that is a prefix of ``content``.

        :param content: text or byte string
        :return: list of prefixes, shortest first, same type as ``content``
        """
        raw = self._to_bytes(content)
        result = []
        p = 0  # start at the root
        for consumed, c in enumerate(bytearray(raw)):
            if self._is_word_end(p):
                result.append(self._like(raw[:consumed], content))
            p = self._step(p, c)
            if p == -1:  # no next node
                return result
        # Check the last node reached.
        if self._is_word_end(p):
            result.append(self._like(raw, content))
        return result

    def Find_Last_Base_index(self, word):
        """Return the slot of the node reached by ``word``.

        :param word: text or byte string
        :return: slot index (0, the root, for an empty word), or -1 when the
            trie holds no word starting with ``word``
        """
        return self._walk(self._to_bytes(word))

    def GetAllChildWord(self, index):
        """Return all word endings that can follow the node at ``index``.

        :param index: slot of a node, e.g. from ``Find_Last_Base_index``
        :return: list of byte strings; an empty byte string means a word
            ends at ``index`` itself
        """
        result = []
        if self.base[index] <= 0:
            return result
        if self._is_word_end(index):
            result.append(b"")
        # Code 0 is never stored (its slot holds the back-pointer).
        for i in range(1, 256):
            child = self._step(index, i)
            if child == -1:
                continue
            head = bytes(bytearray([i]))
            for tail in self.GetAllChildWord(child):
                result.append(head + tail)
        return result

    def FindAllWords(self, word):
        """Return every stored word that starts with ``word``.

        :param word: text or byte string prefix; empty returns all words
        :return: list of words of the same type as ``word``
        """
        result = []
        raw = self._to_bytes(word)
        last_index = self._walk(raw)
        if last_index == -1:
            return result
        for end in self.GetAllChildWord(last_index):
            result.append(self._like(raw + end, word))
        return result

    def get_string(self, chr_id):
        """
        Return the whole byte string that leads from the root to a node.
        todo: make private

        :param chr_id: slot of a node (or of a word terminator)
        :return: byte string
        :raises Exception: when the slot holds no node
        """
        if self.check[chr_id] == -1:
            raise Exception("不存在該字符。")
        codes = []
        child = chr_id
        while 0 != child:
            begin = self.check[child]
            code = child - begin
            if code >= 0:  # skip the -1 terminator
                codes.append(code)
            child = self.base[begin]  # back-pointer to the parent slot
        codes.reverse()
        return bytes(bytearray(codes))

    def get_use_rate(self):
        """
        Return the fraction of the allocated slots that is in use.
        """
        return self.size / float(self.MAX_SIZE)


if __name__ == '__main__':
    # Inline sample data; replaced below by the contents of 1000.txt.
    words = ["一舉","一舉一動",'11', "一鳴驚人", "一鳴驚人天下知","洛陽市西工區中州中路","人民東路2號","中州東", "洛陽市","洛陽","洛神1","洛神賦","萬科","萬達3","萬科翡翠","萬達廣場", "洛川","洛川蘋果","商洛","商洛市","商朝","商業","商業模","商業模式", "萬能", "萬能膠"]
    # One Chinese character occupies three bytes in UTF-8.
    words = []
    with open('1000.txt', 'rb') as word_file:
        for line in word_file:
            words.append(line.strip())
    datrie = DATrie()
    datrie.build(words)
    for ii in datrie.FindAllWords(u'中州中路'.encode('utf-8')):
        print(ii.decode('utf-8'))










