TypeScript:Aho–Corasick算法實現敏感詞過濾

敏感詞過濾應該是許多後端同事常常會遇到的需求,不管是評論、彈幕、文章,都須要作敏感詞過濾處理來規避風險。在前端開發中,使用replace函數來替換字符串是咱們的常規操做,在這以前我思考過若是用JavaScript來實現敏感詞過濾該怎麼作。在學習過程當中,接觸到了Trie樹,瞬間有一種撥開雲霧見青天的感受。前端

因此,我這裏算法使用的是AC(Aho–Corasick)自動機算法。會簡單的對方案進行闡述,主要是代碼實現,須要注意的是,在這裏將採用TypeScript編寫。同時代碼也上傳至GitHub,點擊此處查看本文完整代碼。文章較長,建議先馬後看。node

Aho–Corasick算法是由Alfred V. Aho和Margaret J.Corasick 發明的字符串搜索算法,用於在輸入的一串字符串中匹配有限組「字典」中的子串。它與普通字符串匹配的不一樣點在於同時與全部字典串進行匹配。算法均攤狀況下具備近似於線性的時間複雜度,約爲字符串的長度加全部匹配的數量。git

在正式進入到AC自動機算法以前,咱們須要先了解Trie樹。github

Trie樹(字典樹)

在維基百科中,Trie 樹的解釋是這樣的:算法

在計算機科學中,trie,又稱前綴樹或字典樹,是一種有序樹,用於保存關聯數組,其中的鍵一般是字符串。 與二叉查找樹不一樣,鍵不是直接保存在節點中,而是由節點在樹中的位置決定。 一個節點的全部子孫都有相同的前綴,也就是這個節點對應的字符串,而根節點對應空字符串。typescript

構建Trie樹

Trie樹應用十分常見,例如搜索提示。如當輸入一個網址,能夠自動搜索出可能的選擇。當沒有徹底匹配的搜索結果,能夠返回前綴最類似的可能,固然咱們這裏不作過多的討論。後端

Trie 樹

上圖是一個保存了8個鍵的trie結構,"A", "to", "tea", "ted", "ten", "i", "in", and "inn"。數組

但這種描述可能不太清晰,咱們舉一個例子:bash

有這麼一個過濾規則,如下詞組都要被過濾:['atd', 'aq', 'bs', 'bsc', 'qf'],須要被過濾的字符串是:acatdaabsc。那麼首先咱們須要構建一個Trie樹,下圖就是基於上述關鍵詞構建的Trie樹:函數

Trie 樹

與咱們熟悉的二叉樹不一樣的是,這裏的根節點ROOT沒有包含任何數據,子節點也沒有數量的限制,其每個分支都表明着一個完整的字符串。

若是咱們向上述過濾詞中再加入一個「atp」,那麼Trie樹就會構建成這樣:

Trie 樹

那麼咱們也能夠看到「atd」與「atp」擁有公共前綴「at」。固然,若是咱們仔細看上面的過濾詞組,會發現咱們過濾了「bs」與「bsc」,那麼他們的公共前綴就是bs,但與「atd」、「atp」不一樣的是,過濾詞組中並無「at」,那麼這種狀況咱們應該怎麼處理呢?

很簡單,由於咱們須要過濾掉「bs」,但不須要過濾「at」,那麼咱們就在「bs」的最後一個節點「s」處作一個標記,告訴程序分支到此處組成的單詞是須要過濾的。當把全部的單詞節點標記後,樹會是這樣的。

Trie 樹

那麼肯定了Trie樹是個什麼樣子,咱們就能夠用代碼去實現了。首先咱們從最基本的節點開始構建,從該圖示能夠看到,一個子節點包含了三個要素:

  • 當前的值
  • 該子節點的子節點
  • 分支到此處是不是一個單詞

那麼咱們就按照上述信息構建一個Node類,但此時這個Node類並非咱們最終須要的樣子,在這裏只是知足了構建一個Trie樹的需求:

// 子節點的接口
interface Children {
  [key: string]: Node
}

export default class Node {
    // 節點值
  public key: string
  // 是否爲單詞最後節點(重要,後面詳述)
  public word: boolean
  // 子節點的引用(重要,後面詳述)
  public children: Children = {}

  constructor (key: string, word: boolean = false) {
    this.key = key
    this.word = word
  }
}

複製代碼

在上面咱們就已經知道了,Trie樹根節點不保存數據,那麼咱們如今能夠構建一個基礎的Tree類,同時咱們知道該類應該有一個插入和搜索方法,但此時咱們不去實現這兩個方法:

import Node from './node'

// 子節點的接口
interface Children {
  [key: string]: Node
}


export default class Tree {
  // 保存子節點的引用
  public root: Node
  constructor () {
    this.root = new Node('root')
  }

  /** * 插入數據 */
  insert () {}

  /** * 搜索節點 */
  search () {}
}

複製代碼

在以前的圖中能夠看到,在Trie樹十分簡單,通俗的說就是將一個關鍵詞抽離成單字符,並構建出這個單字符的依賴順序,重複這樣的操做就構成了Trie樹。

好了,上面咱們已經構建了基本的Trie樹,也明白了該怎樣操做,那麼咱們就從insert方法開始吧:

export default class Tree {
  // ...省略其餘代碼

  /**
   * 插入節點/第1層
   */
  insert (key: string): boolean {
    if (!key) return false
    // 須要注意的是,插入的關鍵詞key多是單字符,也可能不是
    // 將key打散成數組方便操做
    let keyArr = key.split('')
    // 獲取key的第一個單字符
    let firstKey = keyArr.shift()
    // 獲取root的子節點,this.root是Node的實例,因此children是一個對象
    let children = this.root.children
    let len = keyArr.length
    
    // 這裏是樹第一層的處理
    // 關鍵詞第一個單字符在不在root的children裏,不在的話咱們就添加,這裏之因此把第一個單字符提出來單獨處理,是爲了後續操做方便
    if (!children[firstKey]) {
      // 同時這裏要判斷剩餘數組的長度,若是說傳入的自己就是個單字符,就證實該單字符就是咱們須要過濾的,咱們須要給他打上word標記
      children[firstKey] = len
        ? new Node(firstKey)
        : new Node(firstKey, true)
    } else if (!len) {
      // 若是後續傳入的是個單字符關鍵詞(位於樹第一層),咱們須要打上word標記
      firstNode.word = true
    }
    
    // 這裏是樹N+1曾的處理
    // 其餘多餘的key使用insertNode遞歸寫入樹中
    if (keyArr.length >= 1) {
      this.insertNode(children[firstKey], keyArr)
    }
    return true
  } 
  
  /**
   * 插入節點/N+1層
   * @param node
   * @param word
   */
  insertNode(node: Node, word: string[]) {
    let len = word.length
    
    // 由於是一個遞歸,這裏的帝國條件是word長度 >= 0
    if (len) {
      let children: Children
      children = node.children

      const key = word.shift()
      let item = children[key]
      const isWord = len === 1
      
      // 這裏判斷該節點有沒有相應子節點
      if (!item) {
        // 沒有即插入新的
        item = new Node(key, isWord)
      } else {
        // 有則更新它的word標記
        item.word = isWord
      }
      
      // 將結果重置到樹的相應位置
      children[key] = item
      
      // 下一輪遞歸
      this.insertNode(item, word)
    }
  }
  
  // ...省略其餘代碼
}
複製代碼

至此,咱們已經完整的構建了一棵Trie樹的結構,並定義了insert方法,構建完成後,就須要查找,那麼下面咱們就去定義它的查找方法。

查找

既然咱們知道也構建好了Trie樹的結構,那麼怎麼去查找相關關鍵字並實現過濾呢?咱們先定義這樣一些數據備用:

  • 過濾詞組:['atd', 'atp', 'aq', 'bs', 'bsc', 'gf']
  • 過濾字符串:acatdaabsc

咱們將全部數據圖形化:

Trie 樹

同時,咱們再定義三個索引/指針:

  • startIndex:將保存匹配到的關鍵詞起始位置索引/初始指向a
  • endIndex:將保存匹配到的關鍵詞結束位置索引/初始指向a
  • treeIndex:將保存Trie樹位置索引/初始指向ROOT

完成後,咱們就來看看Trie樹怎麼實現查找的,

第一步:endIndex位置指向a時(這是初始值)

一開始,程序詢問Trie樹treeIndex指向的ROOT節點有沒有a這個子節點,顯然是有的。那麼startIndex賦值爲a位置的索引0(雖然一開始也是0,但這不重要)。同時改變treeIndex指向,讓其指向a節點。判斷此節點是不是一個完整的單詞(即須要過濾關鍵詞的最後一個字符),顯然不是。

第二步:endIndex後移指向c時

endIndex後移一位指向c:

endIndex後移指向c時

程序詢問Trie樹treeIndex指向的a節點有沒有c這個子節點,顯然是沒有的。那麼startIndex賦值爲c位置的索引1。同時改變treeIndex指向,讓其從新指向ROOT節點。

第三步:endIndex後移指向a時

endIndex後移一位指向a:

endIndex後移指向a時

這裏會徹底重複第一步的操做,可是startIndex指向的時第二個a的索引2。

此時,startIndex = endIndex = 2,他們都指向了a,treeIndex又從新指向了樹節點a。

第四步:endIndex後移指向t時

endIndex後移一位指向t:

endIndex後移指向t時

程序詢問Trie樹treeIndex指向的a節點有沒有t這個子節點,咱們知道有,符合需求。startIndex位置不變,同時改變treeIndex指向,讓其指向新找到的t節點。判斷此節點是不是一個完整的單詞。

第五步:endIndex後移指向d時

endIndex後移一位指向d:

endIndex後移指向d時

程序詢問Trie樹treeIndex指向的t節點有沒有d這個子節點,這裏有d/p兩個子節點,符合需求。startIndex位置不變,同時改變treeIndex指向,讓其指向新找到的d節點。

treeIndex指向d

判斷此節點是不是一個完整的單詞,很幸運,此次是一個完整待過濾關鍵詞atd。至此,咱們就找到了字符串中第一個關鍵詞。

找到後,咱們endIndex後移,並使startIndex = endIndex,treeIndex從新指向ROOT,開啓新一輪的匹配。重複這個過程,就完成了查找。

那麼在瞭解了上述查找過程以後,咱們能夠先完成一個基本查找,查找單個節點存不存在以作備用:

export default class Tree {
  // ...省略其餘代碼
  
  /**
   * 搜索節點
   * @param key
   * @param node
   */
  search(key: string, node: Children = this.root.children): Node | undefined {
    // 這個搜索十分簡單,只傳入的子節點是否有相應的節點
    return node[key]
  }
  
  // ...省略其餘代碼
}
複製代碼

Aho–Corasick算法(也稱AC自動機/狀態機)

尋找failure指針/索引

Trie樹是AC算法的基礎,AC算法有三個特別重要的概念,網上有不少文章,但搜索出來大多都是同樣的,有些關鍵點沒寫明白,看着十分吃力。在這裏,我會嘗試去讓這些概念性的東西具體化。

咱們先把上面的Node類拿下來:

export default class Node {
    // 節點值
  public key: string
  // 是否爲單詞最後節點(重要,後面詳述)
  public word: boolean
  // 子節點的引用(重要,後面詳述)
  public children: Children = {}

  constructor (key: string,  word: boolean = false) {
    this.key = key
    this.word = word
  }
}
複製代碼

那麼AC算法的三個關鍵是什麼呢?與Node類有什麼關係呢?通俗的講是這麼三個狀態(有的稱之爲函數,有的稱之爲表):

  1. success/output狀態:表示節點到此處就已經構成了個完整的關鍵詞(Node類的word標記)。
  2. goto狀態:表示此節點構成的關鍵詞還不完整,須要進入他的下一個子節點匹配(Node類的children)
  3. failure狀態(也稱失去匹配,下面簡稱【失配】狀態):表示此節點構成的關鍵詞還不完整,但沒法進入到下一個子節點(在當前children裏找不到了)。須要告訴程序,失配後怎麼走。

在這以前,失配咱們直接就返回到了ROOT,但AC算法不同,它利用【failure狀態/函數/表】指定程序在失配後的表現,沒必要每次失配都從新開始,這樣能節省很多的時間。

好的,咱們看到AC算法的兩個狀態Trie樹都具有,只有failure狀態是新加的,那麼咱們就着重講一下failure狀態,在這以前,咱們從新構造一下Node類:

export default class Node {
    // 節點值
  public key: string
  // 是否爲單詞最後節點
  public word: boolean
  // 子節點的引用
  public children: Children = {}
  // 父節點的引用
  public parent: Node | undefined
  // failure表,用於失配後的跳轉
  public failure: Node | undefined = undefined

  constructor (key: string, parent: Node | undefined = undefined, word: boolean = false) {
    this.key = key
    this.parent = parent
    this.word = word
  }
}
複製代碼

能夠看到,我這裏新增了兩個公共屬性parent父節點及failure失配(失去匹配)後指向的節點。那麼Node類的結構變化之後,Tree類的也須要相應的改變。

export default class Tree {
  // ...省略其餘代碼

  insert (key: string): boolean {
    if (!key) return false
    let keyArr = key.split('')
    let firstKey = keyArr.shift()
    let children = this.root.children
    let len = keyArr.length
    if (!children[firstKey]) {
      // 變化處
      children[firstKey] = len
        ? new Node(firstKey)
        : new Node(firstKey, undefined, true)
    } else if (!len) {
      firstNode.word = true
    }
    if (keyArr.length >= 1) {
      this.insertNode(children[firstKey], keyArr)
    }
    return true
  } 
  
  insertNode(node: Node, word: string[]) {
    let len = word.length
    if (len) {
      let children: Children
      children = node.children

      const key = word.shift()
      let item = children[key]
      const isWord = len === 1
      if (!item) {
        // 變化處
        item = new Node(key, node, isWord)
      } else {
        item.word = isWord
      }
      children[key] = item
      this.insertNode(item, word)
    }
  }
  
  // ...省略其餘代碼
}
複製代碼

很簡單,只有兩個地方變化了,目的是實例化時傳入parent屬性。將兩個基礎類構造完成以後,咱們就要詳細說一說failure狀態了,先看['HER', 'HEQ', 'SHR']構建的樹:

AC狀態機

在下面的描述中,我將failure指針/索引,爲便於敘述,我通俗的說成「failure指向」

在這張圖中,虛線表示failure後的指向,上面咱們也說到failure狀態的做用,就是在失配的時候告訴程序往哪裏走,爲何要這麼作,從這張表咱們能夠很清楚的看到,當咱們匹配SHER時,程序會走右邊的分支,當走到S > H > E時,會出現失配,怎麼辦?可能有小夥伴會想到回滾到ROOT從H開始從新匹配,但這樣回溯是有成本的,咱們既然走了H節點,爲何要回溯呢?

這個時候failure就發揮做用了,咱們看到右分支的H有一條虛線指向了左分支的H,咱們也知道這就是failure的指向,經過這個指向,咱們很輕鬆的將當前狀態移交過去。程序繼續匹配E > R,加上移交過來的H,咱們能夠輕鬆的匹配到HER。

到了這裏,我想小夥伴已經體會到了AC算法的美妙之處,那麼就有人會問了,這個failure的指向怎麼拿到呢?其實就是一句話:

問:假設有一個節點爲currNode,它的子節點是childNode,那麼子節點childNode的failure指向怎麼求?

解:首先,咱們須要找到childNode父節點currNode的failure指向,假設這個指向是Q的話,咱們就要看看Q的孩子(children屬性)中有沒有與childNode字符相同(key相同)的節點,若是有的話,這個節點就是childNode的failure指向。若是沒有,咱們就須要沿着currNode -> failure -> failure重複上述過程,若是一直沒找到,就將其指向root。

那麼以上,就是尋找failure指向的思路,具體爲何這麼作能夠查閱相關資料。

須要注意的是,咱們在構建Trie樹時,並不知道failure指向到哪裏的,因此failure指向須要在Trie樹構建完成後插入。

那麼咱們再定義一個方法構建failure指向,但咱們須要先看下面這幅圖:

構建failure指向

從圖中能夠看到,failure指向的構建是從上至下一層一層的完成的,第一層都是指向root:

export default class Tree {
  // ...省略其餘代碼
  
  /**
   * 建立Failure表
   */
  _createFailureTable() {
    // 獲取樹第一層
    let currQueue: Array<Node> = Object.values(this.root.children)

    while (currQueue.length > 0) {
      let nextQueue: Array<Node> = []

      for (let i = 0; i < currQueue.length; i++) {
        let node: Node = currQueue[i]
        let key = node.key
        let parent = node.parent
        node.failure = this.root
        // 獲取樹下一層
        for (let k in node.children) {
          nextQueue.push(node.children[k])
        }

        if (parent) {
          let failure: any = parent.failure
          while (failure) {
            let children: any = failure.children[key]

            // 判斷是否到了根節點
            if (children) {
              node.failure = children
              break
            }
            failure = failure.failure
          }
        }
      }

      currQueue = nextQueue
    }
  }
  
  // ...省略其餘代碼
}

複製代碼

完成上面代碼,咱們就完全完成了整個AC算法的前置準備工做也是核心部分。

字符串匹配

最後對字符串進行關鍵詞匹配,思路不難但有點龐雜,是核心點,關鍵點在於獲取failure指針的定位,當匹配成功後,獲取整個字符串。但如何獲取匹配成功的關鍵詞,我看到有些方案是回溯分支,但我是以爲不必,由於匹配成功,程序已經走了以前的分支,爲何還要再次回溯呢?下面咱們直接再代碼上看:

_filterFn(word: string, every: boolean = false, replace: boolean = true): FilterValue {
    let startIndex = 0
    let endIndex = startIndex
    const wordLen = word.length
    
    // 由於英文匹配我直接轉換成了全大寫,就須要保存一個原始文本
    let originalWord: string = word
    
    // 保存過濾的關鍵字
    let filterKeywords: Array<string> = []
    
    // 所有轉換成大寫
    word = word.toLocaleUpperCase()

    // 是否過濾文本
    let isReplace = replace
    let filterText: string = ''

    // 是否經過,字符串無敏感詞
    let isPass = true

    // 正在進行劃詞判斷
    let isJudge: boolean = false
    let judgeText: string = ''

    // 上一個Node與下一個Node
    let prevNode: Node = this.root
    let currNode: Node | boolean

    for (endIndex; endIndex <= wordLen; endIndex++) {
      let key: string = word[endIndex]
      let originalKey: string = originalWord[endIndex]
      
      // 查找當前Node
      currNode = this.search(key, prevNode.children)

      // 判斷是否處於正在判斷狀態isJudge,且還能繼續匹配(currNode爲Node)
      if (isJudge && currNode) { // ①
        judgeText += originalKey
        prevNode = currNode
        continue
      } else if (isJudge && prevNode.word) {
        // 處於正在匹配狀態,不能繼續匹配,上一個Node有word標記,證實已經匹配成功了
        
       // 這裏用做快速查找方法every
       isPass = false
        if (every) break

        // 原始字符串在這裏作關鍵詞替換,並保存被替換的關鍵詞
        if (isReplace) filterText += '*'.repeat(endIndex - startIndex)
        filterKeywords.push(word.slice(startIndex, endIndex))
      } else {
        // 將①保存的臨時文本從新添加到過濾文本中,由於可能最後狀態是失配
        filterText += judgeText
      }

      // 直接在分支上找不到,須要走failure,這裏的查找也與構建failure時類似
      if (!currNode) {
        let failure: Node = prevNode.failure

        while (failure) {
          currNode = this.search(key, failure.children)
          if (currNode) break
          failure = failure.failure
        }
      }

      if (currNode) {
        judgeText = originalKey
        isJudge = true
        prevNode = currNode
      } else {
        judgeText = ''
        isJudge = false
        prevNode = this.root
        if (isReplace && key !== undefined) filterText += originalKey
      }
      startIndex = endIndex
    }

    return {
      text: isReplace ? filterText : originalWord,
      filter: [...new Set(filterKeywords)],
      pass: isPass
    }
  }
複製代碼

使用:

let m = new Mint(['淘寶', '拼多多', '京東'])

console.log(m.filterSync('雙十一在淘寶買東西,618在京東買東西,固然你也能夠在拼多多買東西。'))
/* { 
    text: '雙十一在**買東西,618在**買東西,固然你也能夠在***買東西。',
    filter: [ '淘寶', '京東', '拼多多' ],
    pass: false
} */

console.log(m.everySync('測試這條語句是否能經過'))  // true

console.log(m.everySync('測試這條語句是否能經過,加上任意一個關鍵詞京東'))  // false
複製代碼

那麼自此,整個流程就通了,固然這只是關鍵代碼,具體代碼我已上傳至Github,固然,因本人能力及知識水平有限,不免有所錯誤,如若發現,歡迎你們指正。

性能

測試字符串包含隨機生成的漢字、字母、數字。 如下測試均在20000個隨機敏感詞構建的樹下進行測試,每組測試6次取平均值:

編號 字符串長度 不替換敏感詞 替換敏感詞
1 1000 0.987ms 1.088ms
2 5000 3.095ms 3.252ms
3 10000 9.133ms 9.881ms
4 20000 10.569ms 12.032ms
5 50000 15.741ms 23.606ms
6 100000 31.072ms 46.681ms

須要注意的是,生產實際運行速度會比上面測試數據更快。

GitHub地址:github.com/ZhelinCheng…

相關文章
相關標籤/搜索