ANN的前身技術是NN(Neighbor Search),簡單地說,最近鄰檢索就是根據數據的類似性,從數據集中尋找與目標數據最類似的項目,而這種類似性一般會被量化到空間上數據之間的距離,例如歐幾里得距離(Euclidean distance),NN認爲數據在空間中的距離越近,則數據之間的類似性越高。html
當須要查找離目標數據最近的前k個數據項時,就是k最近鄰檢索(K-NN)。python
近些年的研究中涌現出大量以最近鄰檢索爲基本思想的方法,主要可分爲兩類:git
儘管出現了不少針對NN算法的改進措施,可是在實際工業場景中,NN算法遇到最大阻礙是:github
數據通過向量化(即特徵工程)以後,由於特徵空間特別高維(上百/上千/甚至上萬),致使在空間距離上特別稀疏,維度越高這個現象越明顯,這直接致使了NN的近鄰搜索效果很差。筆者本身也一樣在項目開發中嘗試使用過NN算法,當發現NN搜索效果不佳時,反過來調整特徵工程,而後再繼續NN搜索,如此反覆迭代,最終效果難以保證,由於你沒法保證每一次的特徵工程都能精確地表徵出業務場景的類似性。web
舉個例子來講,咱們有一批惡意文件如今要對其進行聚類分析,首先咱們對其進行文本方面的特徵工程,獲得一個向量集合。由於基於專家經驗獲得的特徵維度之間是彼此「正交」的,所以每一個特徵向量之間的餘弦類似性都不強,基於「空間距離度量」的聚類算法效果天然也不會很好。算法
並且另外一方面,由於特徵空間的維度過高了(幾百維、幾千維),一些原本頗有用的」強貢獻特徵「可能會被淹沒在大量的「弱貢獻特徵」中,這很好理解,看一下歐幾里得空間的距離度量公式:網頁爬蟲
從公式中能夠看到,全部維度都被「公平看待」,平方和開根起到了一個均值的做用,弱特徵越多,強特徵被「稀釋」的影響就越大。特徵不是越多越好,有時候太多無用的特徵可能還會引發反效果。windows
換句話說,即便原本可能很「類似的文件」(例如同一個病毒家族的變種),可是在咱們設計的特徵上卻不能很好地體現。數組
面對這些問題,如何解決呢?安全
一個很天然的想法是,若是能有一種算法,能將類似的字符串,從高維空間降維到一個相對低維的空間中。同時,在這個低維空間中,語法/語義相近的字符串的夾角餘弦相對較小,也即語法/語義相近的字符串在降維後彼此較爲接近。
若是能實現上述兩個目標,咱們不只能夠有效實現對高維向量的降維,同時由於低維空間的向量間具有類似彙集性,咱們能夠在接近線性的時間內,進行向量間距離評估,以及找到類似的文本。
這個章節咱們按照歷史時間線來討論學術界在面對語言模型中文本類似性這個課題分支時,一路走來遇到了哪些問題,整個時間線學術成果很是豐富,咱們這裏只能摘取其主要節點進行推導式的討論。
假如咱們有兩段輸入文本:
1. how are u? 2. how are you?
如今計算這兩段文本的類似度,也即須要計算這兩段文本的區別度,一個最簡單直觀的想法是直接基於原始的ascii序列逐位計算最小編輯距離:
1. u -> u 2. ? -> o 3. N/A -> u 3. N/A -> ?
即第一段文本經過4次修改便可獲得第二段文本,因此這兩個文本的類似度爲:
(1 - 4 / (len(第一段文本) + len(第二段文本))) * 100% = (1 - 4 / (10 + 12)) * 100% = 81.81%
類似度爲81%,這個結論怎麼樣?準嗎?勉強好像可用,可是效果顯然不太好,怎麼辦呢?
咱們開始思考,原始ascii字符空間對變換的感知很是敏感,有兩個主要緣由:
從向量空間的角度來看,原始的ascii字符空間能夠抽象爲一個 N * 2的列向量組(ascii bytes vec,position vec),這裏N表明着輸入文本的length長度。
沿着線性空間的這個思考路線,咱們應該去找一個新的向量空間,該新向量空間與原始ascii字符空間相比,對變化的敏感度更低(包括對ascii修改、ascii位置變化)。
那對ascii字符變化的敏感度更低,怎麼用數學思惟來理解這個概念呢?
這裏須要引入線性映射的概念:
設 S 和 S' 是兩個集合,若是存在一個法則f,使得集合S中每個元素a,都有集合 S' 中惟一肯定的元素b與它對應,則稱 f 是S到 S' 的一個映射,記做:
咱們須要找到一種線性映射,將原始ascii序列中的 N * 2(ascii byte,position)向量組,降維映射成一個 M * 1(ascii sequence windows)向量組,這裏 M 是新向量空間中的維度。
從投影降維理論視角咱們知道,降維後,原始空間中的position維度被徹底忽略了,而ascii byte這個維度被轉換爲ascii sequence windows這個新維度,這顯然不是一個正交投影,即不是單射,由於原始輸入文本中的一個ascii修改,可能會引發新空間裏多個ascii sequence window的變化。
好,接下來的問題是,如何找到這個線性空間映射呢?這就是接下來要討論的ngram分詞算法。
假設咱們如今有3段輸入文本:
[ 'This is the first document.', 'This is the second document.', 'Is this the first document?', ]
以單個word爲一個slice window進行切詞,即1-gram(unigram),獲得:
[ { u'This', u'is', u'the', u'first', u'document' }, { u'This', u'is', u'the', u'second', u'document' }, { u'This', u'is', u'the', u'first', u'document' } ]
從1-gram slice結果中,咱們能夠看到幾點信息:
顯然,1-gram的分詞方案形成了信息的過分失真,致使了原始輸入文本的語法結構被丟失了,這個問題怎麼解決呢?顯然,咱們須要引入相對位置(relative position)這個特徵維度。
使用2-gram算法進行切詞,獲得:
[ { u'This', u'This is', u'is the', u'the first', u'first document', u'document' }, { u'This', u'This is', u'is the', u'the second', u'second document', u'document' }, { u'is', u'is This', , u'This the', u'the first', u'first document', u'document' } ]
從1-gram slice結果中,咱們能夠看到幾點信息:
從線性映射的角度看,在2-gram算法下,原始ascii序列中的 N * 2(ascii byte,position)向量組,降維映射成一個 M * N(ascii sequence windows,relative position)向量組,這裏 M 表明了2-gram後的 gram token數量,N 表明了每一個gram分組內的word組合,2-gram token內的組合維度數爲2,若是是3-gram,則組合維度數位6。
問題到這裏就結束了嗎?顯然不是的,ngram算法雖然比純粹的ascii逐字符比對各方面效果要好,可是還存在幾個問題:
那解決問題的思路是什麼呢?答案仍是降維,咱們須要繼續尋找一個新的映射函數,將原始ascii字符空間映射到一個低維向量空間中。可是要注意,這個新的低維向量空間有幾個技術指標須要知足:
基於上個章節討論的3個技術指標,學術界開始了學術的研究和創新,演化出了兩個不一樣的方向:
LSH的核心是哈希散列、其次是降維、其次是語法/語義一致性、再其次是算法過程簡單高效適合在大規模高併發場景中使用。
能夠這麼說,LSH經過犧牲了一部分的信息熵,即犧牲了一部分的語法/語義一致性,換取了超級高效的時間/計算複雜度,是一種很是優秀的算法思想,值得咱們不斷深刻思考和學習。
可是換一個角度,若是對時間/空間複雜度沒有那麼高的需求,而是對語法/語義一致性有很高的要求,LSH算法家族可能就不必定很是適合了。
這個時候,另外一條思考脈輪就呈如今咱們的面前,即詞向量/句子向量/文檔向量,具體來講,就是從2007年開始逐漸被提出的各類詞向量降維表徵方法,包括:
詞向量普遍地被運用於NLP相關的任務中,關於這部分的詳細討論,筆者在另外一篇文章有所涉及。
從筆者本身經驗來看,在大數據時代,LSH的使用場景相比詞向量要相對少一些,筆者我的以爲問題核心在於現代NLP任務中,對語義的精確表徵能力要求愈來愈高,工程師和數據科學家經過不斷地引入更龐大的數據集,引入更復雜的詞向量算法,也是但願儘量提升信息的利用率,儘可能少的丟失信息。
而相對的,對時間/空間複雜度有極高要求的場景可能只存在於一些極端的場景中,例如搜索引擎等。
Relevant Link:
https://www2007.cpsc.ucalgary.ca/papers/paper215.pdf google Detecting Near-Duplicates for Web Crawling. WWW2007 http://www2007.org/ https://www.iw3c2.org/blog/category/www2007/
在不少應用領域中,咱們面對和須要處理的數據每每是海量而且具備很高的維度(high dimensional spaces),同時數據中又廣泛存在着近似相同的狀況(例如類似的對話、類似的網頁、類似的URl等),怎樣快速地從海量的高維數據集合中找到與某個數據近似類似(approximate or exact Near Neighbor)的一個數據或多個數據,成爲了一個難點和問題。
若是是低維的小數據集,咱們經過線性查找(Linear Search)就能夠容易解決,但若是是對一個海量的高維數據集採用線性查找匹配的話,會很是耗時,這成爲ANN被研究和發展的原動力。
在筆者所在的網絡安全學科中,也經常會遇到不少局部不一樣(locality change)的近似文本的識別與檢測問題,例如:
面對海量高維數據背景下,還要進行高效的數據類似性搜索的需求,該從哪些方面進行思考解決方案呢?
要實現上述目標,咱們須要能找到一整套綜合技術,能綜合實現如下幾個技術指標:
符合以上四點技術指標的算法被統稱爲ANN(Approximate Nearest Neighbor)算法。
Relevant Link:
https://www2007.cpsc.ucalgary.ca/papers/paper215.pdf
值得注意的是,對於第二章提到的ANN技術指標中的前兩個,詞向量和LSH均可以實現一樣的效果,可是咱們本文的討論對象LSH局部敏感哈希。
局部敏感哈希(LSH)核心思想是:在高維空間相鄰的數據通過局部敏感哈希函數的映射投影轉化到低維空間後,他們落入同一個吊桶(空間區間)的機率很大而不相鄰的數據映射到同一個吊桶的機率則很小。
這種方法的主要難點在於如何尋找適合的局部敏感哈希函數,在原論文中,做者提出了局部敏感Hash函數的通常性定義:
咱們設定x和y的距離測定函數爲d(x,y),這個d()函數能夠是Jaccard函數/Hamming度量函數,也能夠其餘具有一樣性能的函數。
在這個距離測定標準下,設定兩個距離閾值d1,d2,且 d1 < d2。
若是一個函數族F的每個函數 f 知足:
那麼稱F爲(d1,d2,p1,p2)-敏感的函數族,實際上,simhash就是一種(d1,d2,p1,p2)-敏感的函數。
左圖是傳統Hash算法,右圖是LSH。紅色點和綠色點距離相近,橙色點和藍色點距離相近。
按照LSH的發展順序,LSH家族的演變史以下:
咱們接下來逐個討論其算法流程及其背後的思惟方式。
Relevant Link:
https://www.cnblogs.com/wt869054461/p/9234184.html http://people.csail.mit.edu/gregory/annbook/introduction.pdf https://www.cnblogs.com/wt869054461/p/9234184.html http://infolab.stanford.edu/~ullman/mmds/ch3.pdf https://www.cnblogs.com/fengfenggirl/p/lsh.html http://sawyersun.top/2016/Locality-Sensitive-Hashing.html
2008年IEEE Signal Process上有一篇文章Locality-Sensitive Hashing for Finding Nearest Neighbors是一篇較爲容易理解的基於Stable Dsitrubution的投影方法的Tutorial。
其思想在於高維空間中相近的物體,投影(降維)後也相近。
三維空間中的四個點,紅色圓形在三圍空間中相近,綠色方塊在三圍空間中相距較遠,那麼投影后仍是紅色圓形相距較近,綠色方塊相距較遠。
基於Stable Distribution的投影LSH,就是產生知足Stable Distribution的分佈進行投影,最後將量化後的投影值做爲value輸出。
具體數學表示形式以下:給定特徵向量v,Hash的每一bit的生成公式爲:
其中:
須要注意的是,若是 x 抽樣於高斯分佈,那麼ϕ(u,v)衡量的是L2 norm;若是 x 抽樣於柯西分佈,那麼ϕ(u,v)衡量的是L1 norm。
更詳細的介紹在Alexandr Andoni維護的LSH主頁中,這就是LSH方法的鼻祖。
關於隨機抽樣涉及到隨機過程方面的知識,能夠參閱這篇帖子。關於高斯隨機投影和柯西分佈隨機投影的討論,能夠參閱另外一篇blog。
Relevant Link:
http://www.slaney.org/malcolm/yahoo/Slaney2008-LSHTutorial.pdf https://www.cnblogs.com/LittleHann/p/6558575.html#_label2 https://www.zhihu.com/question/26694486/answer/242650962
Stable Distribution Projection從原理上沒有什麼大問題,其實後來改進的隨機超平面和球面哈希算法,其底層思想上和Stable Distribution Projection沒有太大的區別。
可是在實際操做中,Stable Distribution Projection存在幾個比較明顯的問題:
面對上述問題,Charikar改進了這種狀況,提出了一種隨機超平面投影LSH。能夠參考論文《Multi-probe LSH: efficient indexing for high-dimensional similarity search》。
假設有一個M維高維數據向量x,咱們在M維空間中隨機選擇一個超平面,經過這個超平面來對數據進行切分。
這個動做總共進行N次,即經過N個隨機超平面單位向量來對原始數據集進行切分,這裏N就是降維後的向量維度。
超平面的選擇是隨機過程,不須要提早參數設定。
以下圖所示,隨機在空間裏劃幾個超平面,就能夠把數據分到不一樣空間裏,好比中間這個小三角的區域就能夠賦值爲110.
Hash的每一bit的數學定義式爲:
x 是隨機超平面單位向量,sgn是符號函數:
接下來咱們來討論在隨機超平面投影算法下,LSH哈希的產生原理是什麼。
這時ϕ(u,v),也就是上述公式中的內積點乘計算,衡量的就是u和v的cosine距離,θ(u,v)表示向量u和v的夾角。
hyperplane projection的核心假設就是,兩個向量越類似,則他們的cosine距離越小:
下圖說明了該公式原理
能夠看到,給定兩個向量(圖中的黑色箭頭),只有在其法線的交疊區域(深藍色區域)投影后的方向(sgn函數的值)纔不相等,因此有:
,即藍色區域面積佔比整個圓,的比率等於u與v的夾角。
經過sgn符號函數的歸一化,只要兩個向量是同方向,無論距離遠近,都統一歸一化爲1。
這樣計算後的hash value值是比特形式的1和0,雖然帶來了必定的信息丟失,可是免去了使用時須要再次歸一化。
Relevant Link:
https://www.jiqizhixin.com/articles/2018-06-26-15 http://delivery.acm.org/10.1145/1330000/1325958/p950-lv.pdf?ip=42.120.75.135&id=1325958&acc=ACTIVE%20SERVICE&key=C8BAF422464E9FCC%2EC8BAF422464E9FCC%2E4D4702B0C3E38B35%2E4D4702B0C3E38B35&__acm__=1560846249_128df98f27ef856192df883b1ce48987 http://yangyi-bupt.github.io/ml/2015/08/28/lsh.html
spherical hash是在前人hyperplane hash的基礎之上改進而來的,因此這裏咱們首先來一塊兒思考下hyperplane-base哈希算法都存在哪些問題。
sphericalplane hash超球體哈希算法就在這個背景下,在2012 CVPR上提出的。
面對上述幾個問題,sphericalplane hash進行了算法理論和公式層面上的創新,咱們接下來詳細討論具體細節。
咱們知道,利用kernel space核空間技術,咱們能夠將線性超平面映射爲一個非線性超平面,這是創建在覈函數的理論基礎上的。可是研究發現,使用sphericalplane,由於球平面天生的封閉性,能夠直接對高維空間進行partition分類,並得到比non-linear hyperplane更好的效果。
理論上說,若是須要切割出一個d維封閉空間,至少須要d+1個超平面,可是若是使用超球體,則最少值須要1個超球體便可,例以下圖
值得注意的是,c個超球體劃分出的有界封閉區域數是能夠計算的,即:
同時,球哈希劃分的區域是封閉且更緊湊的,每一個區域內樣本的最大距離的平均值(bounding power)會更小,說明各個區域的樣本是更緊湊的,以下圖所示:
Average of maximum distances within a partition: ‐ Hyper‐spheres gives tighter bound!
經過漸進逼近的方法,迭代優化算法超參數,獲得符合算法約束條件的近似最優解。
這裏的約束條件指的是:
1. 咱們但願每一個超球體把樣本都是均分的,就是球內球外各佔一半 2. 但願每一個超球體的交叉部分不要太多,最多1/4,也就是每一個哈希函數相對獨立
優化過程最重要的一個前提就是設定約束條件(constraint condition)。
這裏首先先定義一些數學標記:
設爲數據向量在單個超球體(單個hash function)內部(+1)仍是外部(-1)的機率。
設爲單個超球體的半徑。
Spherical Hashing是由c個不一樣位置,不一樣的大小的超球體組成的,對於c個超球體的總約束條件以下:
注意,約束2個1/4是一個理論極限值了,經過空間幾何的相關知識能夠證實,當兩個球都近似將空間一分爲二時,這兩個球的交集的最小值就是1/4。
優化過程的僞碼以下,咱們接下來逐步討論:
採用隨機採樣的方式從樣本集中採樣m個樣本,用於進行後續的優化過程。固然,若是你的算力足夠,也能夠將全部樣本都做爲訓練集進行優化訓練。
從樣本集S中隨機選擇c個數據點做爲初始的超球體中心。
值得注意的是,做者在使用kmeans獲得c個聚類中心做爲初始的超球體中心後,並無很明顯提高實驗結果,這反映了求哈希算法對初始值不是很是敏感。
接下來的迭代會不斷會動態調整半徑,以及動態移動球心的位置。爲了方便計算,咱們定義下面兩個輔助變量:
,1 ≤ i, j ≤ c。
對於來講,咱們只要使其知足
便可。
對於來講,咱們的目標是使其靠近m/4,經過計算當前值和目標值之間的殘差累積和,獲得一個迴歸值,原論文中使用了力的概念形象地說明了這個過程。
對於交叉樣本太多的兩個球心,賦予一個repulsive的力,對離得太遠的兩個球賦予一個attractive 的力。而後計算這些力的累加做用,更新球心,再根據目標一更新半徑。對照上面算法僞碼很容易理解該思想。
重複這個過程,直到知足收斂條件。
理論上說,優化的最終結果應該是的均值爲m/4,方差爲0,即徹底收斂,可是這很容易致使過擬合。
和不少漸進逼近的優化算法同樣(例如gradient descent),球哈希算法設置了一個收斂近似精度,來提早中止優化,避免過擬合的發生。
算法對均值和方差設置了一個容忍精度閾值 和
,只要優化在一段步驟區間中,達到了這個容忍精度,即代表優化結果,中止優化。
在原論文中,做者對和
的值實驗最佳值分別是10%和15%。
Relevant Link:
https://engineering.stanford.edu/people/moses-charikar http://xueshu.baidu.com/s?wd=charikar+%E2%80%9CRandom+Hyperplane%E2%80%9D&tn=SE_baiduxueshu_c1gjeupa&cl=3&ie=utf-8&bs=charikar+Random+Hyperplane&f=8&rsv_bp=1&rsv_sug2=0&sc_f_para=sc_tasktype%3D%7BfirstSimpleSearch%7D http://sglab.kaist.ac.kr/Spherical_Hashing/ https://blog.csdn.net/u014624632/article/details/79972100 http://sglab.kaist.ac.kr/Spherical_Hashing/Spherical_Hashing.pdf https://blog.csdn.net/u014624632/article/details/79972100 https://blog.csdn.net/zwwkity/article/details/8565485 https://www.bbsmax.com/A/LPdojpBG53/
Simhash是一種降維投影方法,它將一段文本映射爲一段固定位數的二進制指紋(fixed length fingerprint),同時,這種fingerprint具備較好的語法/語義一致性。
它由google的Moses Charikar提出。整個算法很是簡單精巧,咱們這章來闡述一下其算法過程。
使用ngram對文檔進行token化分詞,值得注意的是,n值的選取具備必定的技巧:
在分詞後,計算每一個token的權重,能夠經過ngram token詞頻統計獲得w,也能夠經過TF-IDF計算。無論用什麼方式,核心是將ngram token的權重表徵出來。
例如對」How are you?「這段話進行4-gram的切詞能夠獲得:
ngram tokens frequency list:
{u'owar': 1, u'reyo': 1, u'howa': 1, u'eyou': 1, u'ware': 1, u'arey': 1}
注意,這裏權重w爲1,只是咱們舉例比較簡單,恰好的巧合。
將每一個ngram token都被轉換爲了一個散列hash,這個散列hash是隨機均勻分佈的,例如MD五、SHA-1算法。
(u'owar', 1): 333172464361321106773216808497407930520 (u'reyo', 1): 310879434437019318776469684649603935114 (u'howa', 1): 98593099505511350710740956016689849066 (u'eyou', 1): 32675000308058660898513414756955031020 (u'ware', 1): 325869966946114134008620588371145019154 (u'arey', 1): 110781832133915061990833609831166700777
這個哈希化過程主要是完成字符串的數字化。由於對token進行哈希處理的散列函數是像MD五、SHA1這種隨機散列函數,散列後的空間是隨機均勻的。所以不一樣的token獲得的散列值自己不包含任何信息熵。
那什麼東西包含信息熵呢?筆者認爲這裏的傳遞下一步的信息熵有兩項:
對每一個token hash進行逐位掃描,對某一個token hash來講,若是某一位爲1,則賦值一個該token的正權重;若是某位是0,則賦值爲該token的負權重。
獲得一個N x M矩陣,N爲token數量,M爲fingerprint向量V的長度,原論文中默認爲64bit,咱們在實際開發中大多數也使用64bit,這是一個效率與效果比較折中的配置。
(u'owar', 1): [-1, -1, -1, 1, 1, -1, -1, 1, -1, -1, -1, -1, 1, 1, -1, -1, 1, -1, 1, 1, 1, 1, 1, -1, -1, -1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, -1, -1, 1, 1, -1, -1, 1, 1, 1, 1, 1, 1, -1, 1, -1, 1, -1, -1, 1, -1, 1, -1, 1, 1, 1, 1, -1] (u'reyo', 1): [-1, 1, -1, 1, -1, -1, -1, 1, 1, 1, -1, 1, -1, 1, -1, 1, -1, -1, -1, 1, 1, 1, 1, 1, -1, -1, 1, -1, 1, 1, -1, 1, -1, 1, -1, -1, -1, -1, -1, 1, 1, -1, 1, 1, 1, 1, 1, 1, 1, -1, 1, 1, -1, 1, -1, -1, 1, 1, 1, -1, 1, 1, -1, -1] (u'howa', 1): [-1, 1, -1, 1, -1, 1, 1, 1, -1, 1, -1, -1, -1, 1, -1, 1, -1, -1, 1, -1, 1, -1, -1, -1, -1, 1, 1, 1, -1, 1, -1, 1, 1, -1, 1, -1, 1, 1, -1, 1, -1, -1, -1, -1, -1, -1, -1, 1, 1, -1, 1, -1, -1, 1, 1, -1, 1, 1, 1, 1, -1, 1, -1, -1] (u'eyou', 1): [-1, -1, 1, 1, -1, 1, 1, 1, 1, -1, 1, -1, -1, -1, -1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, 1, 1, 1, -1, -1, -1, -1, 1, 1, 1, 1, -1, -1, 1, 1, -1, -1, 1, 1, -1, -1, -1, 1, -1, -1, -1, -1, 1, -1, 1, -1, -1, -1, 1, -1, -1, -1, -1, -1] (u'ware', 1): [-1, 1, -1, -1, 1, -1, -1, -1, 1, 1, 1, 1, -1, -1, 1, -1, -1, 1, 1, -1, 1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, -1, -1, -1, -1, 1, 1, -1, -1, -1, -1, 1, 1, 1, -1, -1, 1, -1, 1, -1, -1, 1, 1, -1, 1, -1, -1, 1, 1, -1, 1, 1, -1, 1] (u'arey', 1): [1, -1, -1, 1, -1, 1, 1, 1, -1, -1, 1, 1, -1, 1, 1, 1, -1, 1, 1, 1, -1, -1, 1, 1, 1, 1, 1, -1, -1, 1, 1, 1, 1, -1, 1, 1, -1, -1, 1, -1, 1, 1, -1, 1, -1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1, 1, -1, -1, -1]
這步有一個細節須要注意,即無論上一步token hash的位數多長,這一步都只進行fingerprint V長度的逐位掃描與翻譯,這實際上使用裁剪cutoff的方式實現了降維,這種壓縮映射會損失一部分準確性,引入必定的信息損失和誤報,不過這和咱們選擇的fingerprint V長度有關,咱們選的V越長,例如128bit,這種信息損失就越小。
從信息熵的角度來講,這一步實際就是在將上一步傳入的token權重這一信息進行翻譯。
上一步獲得的V是一個由token w組成的N x M矩陣,咱們逐位進行縱向的列維度sum壓縮:
v: [-4, 0, -4, 4, -2, 0, 0, 4, 0, 0, 0, 0, -4, 2, -2, 2, -2, 0, 4, 0, 2, 0, 0, 0, -4, 2, 2, 2, -4, 4, -4, 2, 0, 0, 0, 2, -2, -4, -2, 2, 0, -2, 0, 4, -2, 0, 2, 4, 4, -6, 0, -2, 0, -2, 0, -2, 0, 4, 4, 0, 2, 2, -4, -4]
這一步經過將每一bit上的全部信息都壓縮綜合起來,獲得最終的信息表達。
上一步獲得的V是一個1 x M,這裏M已經就是fingerprint長度的向量,默認爲64bit,最後一步進行歸一化。
逐位bit掃描當前fingerprint向量 V,若是其值>0,則歸一化爲1;若是其小於零,則歸一化爲0
v_: [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 0]
用一張圖來總結梳理一下上述的幾個步驟:
這裏筆者拋出一個問題來一塊兒思考下,看起來在第四步已經獲得了已經降維後的定長向量了,並且向量的每一個元素也都是由全部token綜合起來獲得的,應該可以表明原始的輸入文本了,那爲啥第五步還要畫蛇添足進行一次0/1歸一化呢?背後的原理是啥呢?
# -*- coding: utf-8 -*- from __future__ import division, unicode_literals import re import sys import hashlib import logging import numbers import collections from itertools import groupby if sys.version_info[0] >= 3: basestring = str unicode = str long = int else: range = xrange def _hashfunc(x): return int(hashlib.md5(x).hexdigest(), 16) class Simhash(object): def __init__( self, value, f=64, reg=r'[\w\u4e00-\u9fcc]+', hashfunc=None, log=None ): """ `f` is the dimensions of fingerprints `reg` is meaningful only when `value` is basestring and describes what is considered to be a letter inside parsed string. Regexp object can also be specified (some attempt to handle any letters is to specify reg=re.compile(r'\w', re.UNICODE)) `hashfunc` accepts a utf-8 encoded string and returns a unsigned integer in at least `f` bits. """ self.f = f self.reg = reg self.value = None if hashfunc is None: self.hashfunc = _hashfunc else: self.hashfunc = hashfunc if log is None: self.log = logging.getLogger("simhash") else: self.log = log if isinstance(value, Simhash): self.value = value.value elif isinstance(value, basestring): self.build_by_text(unicode(value)) elif isinstance(value, collections.Iterable): self.build_by_features(value) elif isinstance(value, numbers.Integral): self.value = value else: raise Exception('Bad parameter with type {}'.format(type(value))) def __eq__(self, other): """ Compare two simhashes by their value. :param Simhash other: The Simhash object to compare to """ return self.value == other.value def _slide(self, content, width=4): return [content[i:i + width] for i in range(max(len(content) - width + 1, 1))] def _tokenize(self, content): content = content.lower() content = ''.join(re.findall(self.reg, content)) ans = self._slide(content) # ngram slide into tokens list return ans def build_by_text(self, content): # 1. ngram分詞 features = self._tokenize(content) # 2. ngram token詞頻統計,統計獲得的詞頻將做爲權重 features = {k:sum(1 for _ in g) for k, g in groupby(sorted(features))} print "ngram tokens frequency list: ", features return self.build_by_features(features) def build_by_features(self, features): """ `features` might be a list of unweighted tokens (a weight of 1 will be assumed), a list of (token, weight) tuples or a token -> weight dict. """ v = [0] * self.f # 初始化simhash fingerprint V,默認爲64bit,每一個元素初始化爲0 # 逐位爲1的掩碼,即[1], [10], [100]....[100000(64個)],這個掩碼數組的做用是後面進行逐位提取 masks = [1 << i for i in range(self.f)] print "masks: ", masks if isinstance(features, dict): features = features.items() for f in features: v_ = [0] * self.f # 若是傳入的是一個token string list,則默認每一個token string的權重都爲1 if isinstance(f, basestring): # 經過散列哈希算法將每一個ngram token轉換爲一個hash序列 h = self.hashfunc(f.encode('utf-8')) w = 1 # 若是傳入的是一個(token, wight)的list,則按照預約的weight進行計算,咱們本文默認採用ngram詞頻統計方式獲得weight else: assert isinstance(f, collections.Iterable) h = self.hashfunc(f[0].encode('utf-8')) w = f[1] # 每一個ngram token都被轉換爲了一個散列hash,這個散列hash是隨機均勻分佈的 #print "{0}: ".format(f), h # 循環f次(本文是64bit),逐位進行掃描,若是某一位是1,則賦值爲該token的正權重;若是某位是0,則賦值爲該token的負權重 for i in range(self.f): #print "h & masks[i]: ", h & masks[i] v[i] += w if h & masks[i] else -w v_[i] += w if h & masks[i] else -w print "{0}: ".format(f), v_ # 在完成對全部ngram token的掃描後,fingerprint向量 V 的每一位bit都是全部token hash在該bit上的權重加和結果。 print "v: ", v ans = 0 # 逐位bit掃描當前fingerprint向量 V,若是其值>0,則歸一化爲1;若是其小於零,則歸一化爲0 v_ = [0] * self.f for i in range(self.f): if v[i] > 0: ans |= masks[i] v_[i] = 1 else: v_[i] = 0 print "v_: ", v_ self.value = ans def distance(self, another): assert self.f == another.f x = (self.value ^ another.value) & ((1 << self.f) - 1) ans = 0 while x: ans += 1 x &= x - 1 return ans class SimhashIndex(object): def __init__(self, objs, f=64, k=2, log=None): """ `objs` is a list of (obj_id, simhash) obj_id is a string, simhash is an instance of Simhash `f` is the same with the one for Simhash `k` is the tolerance """ self.k = k self.f = f count = len(objs) if log is None: self.log = logging.getLogger("simhash") else: self.log = log self.log.info('Initializing %s data.', count) self.bucket = collections.defaultdict(set) for i, q in enumerate(objs): if i % 10000 == 0 or i == count - 1: self.log.info('%s/%s', i + 1, count) self.add(*q) def get_near_dups(self, simhash): """ `simhash` is an instance of Simhash return a list of obj_id, which is in type of str """ assert simhash.f == self.f ans = set() for key in self.get_keys(simhash): dups = self.bucket[key] self.log.debug('key:%s', key) if len(dups) > 200: self.log.warning('Big bucket found. key:%s, len:%s', key, len(dups)) for dup in dups: sim2, obj_id = dup.split(',', 1) sim2 = Simhash(long(sim2, 16), self.f) d = simhash.distance(sim2) if d <= self.k: ans.add(obj_id) return list(ans) def add(self, obj_id, simhash): """ `obj_id` is a string `simhash` is an instance of Simhash """ assert simhash.f == self.f for key in self.get_keys(simhash): v = '%x,%s' % (simhash.value, obj_id) self.bucket[key].add(v) def delete(self, obj_id, simhash): """ `obj_id` is a string `simhash` is an instance of Simhash """ assert simhash.f == self.f for key in self.get_keys(simhash): v = '%x,%s' % (simhash.value, obj_id) if v in self.bucket[key]: self.bucket[key].remove(v) @property def offsets(self): """ You may optimize this method according to <http://www.wwwconference.org/www2007/papers/paper215.pdf> """ return [self.f // (self.k + 1) * i for i in range(self.k + 1)] def get_keys(self, simhash): for i, offset in enumerate(self.offsets): if i == (len(self.offsets) - 1): m = 2 ** (self.f - offset) - 1 else: m = 2 ** (self.offsets[i + 1] - offset) - 1 c = simhash.value >> offset & m yield '%x:%x' % (c, i) def bucket_size(self): return len(self.bucket)
使用時,import引入便可:
# -*- coding: utf-8 -*- from simhash import Simhash, SimhashIndex if __name__ == '__main__': sh = Simhash('How are you? I Am fine. ablar ablar xyz blar blar blar blar blar blar blar Thanks.') sh2 = Simhash('How are you i am fine.ablar ablar xyz blar blar blar blar blar blar blar than') dis = sh.distance(sh2) print "sh: ", sh.value print "sh2: ", sh2.value print "dis: ", dis
Relevant Link:
https://www.mit.edu/~andoni/LSH/ http://www.cs.princeton.edu/courses/archive/spr04/cos598B/bib/CharikarEstim.pdf http://people.csail.mit.edu/indyk/ https://blog.csdn.net/laobai1015/article/details/78011870 https://github.com/LittleHann/simhash https://www.cnblogs.com/hxsyl/p/4518506.html https://zhuanlan.zhihu.com/p/32078737 https://www.kancloud.cn/kancloud/the-art-of-programming/41614 https://wizardforcel.gitbooks.io/the-art-of-programming-by-july/content/06.03.html http://yanyiwu.com/work/2014/01/30/simhash-shi-xian-xiang-jie.html https://www.cnblogs.com/maybe2030/p/5203186.html
筆者認爲Simhash之因此能夠實現局部敏感,主要原有有兩個:
simhash的hash不是直接經過原始輸入文本計算獲得的,而是經過ngram分片,將原始輸入文本經過滑動窗口分片獲得slice token列表,對每個slice token分別經過某種合理的方式計算一段hash,而後經過某種合理的方式將全部hash綜合起來,獲得最終的hash。
咱們經過一個例子來講明,假設有兩段文本:
1. how are u? 2. how are you?
分別使用4-gram進行切片,獲得:
1. [u'howa', u'owar', u'ware', u'areu'] 2. [u'howa', u'owar', u'ware', u'arey', u'reyo', u'eyou']
能夠看到,由於ngram切片的緣由,輸入文本中的修改隻影響到最終ngram list中的最後3個slice token,從而輸入文本對最終Hash的影響也從整個散列空間縮小到了最後3個slice token中,這就是所謂的局部敏感算法。
其實基於ngram的切片式特徵工程自己就是一個有損信息抽取的特徵提取方式,這種信息損失,一方面損失了精度,可是另外一方面也帶來了對輸入局部修改的容忍度。
可是simhash僅僅是感知局部slice token的變化嗎?不是,光一個rooling slice checksum是沒法提供足夠的局部修改容忍度的。
除了rooling piece wise分片思想以外,Simhash還引入了」Sice Token權重思想「,即每一個slice Token具體對最終的Hash能產生多大的影響,取決於這些slice Token的權重。
咱們仍是用一個例子來講明這句話的意思,假設有三段文本:
1. how are u? 2. how are you? 3. how are u? and u? and u? and u? and u?
能夠看到,這3段文本都不同,可是若是咱們以第一段文本爲基準,能夠發現另外2段文本的修改程度是不同的。
仍是使用4-gram進行切片,獲得slice token list:
1. [u'howa', u'owar', u'ware', u'areu'] 2. [u'howa', u'owar', u'ware', u'arey', u'reyo', u'eyou'] 3. [u'howa', u'owar', u'ware', u'areu', u'reua', u'euan', u'uand', u'andu', u'ndua', u'duan', u'uand', u'andu', u'ndua', u'duan', u'uand', u'andu', u'ndua', u'duan', u'uand', u'andu']
能夠看到,後兩個輸入文本都形成了不少slice token的改變。那最終的simhash受了多少影響呢?影響slice token數量多就是影響多嗎?
simhash在slice token之上,還引入了slice token weight一維度信息,simhash不只統計受影響的slice token,還會統計每一個slice token的權重(例如是詞頻統計,也能夠是TF-iDF)。
例如對上面的slice token list進行詞頻統計得:
1. {u'ware': 1, u'owar': 1, u'howa': 1, u'areu': 1} 2. {u'ware': 1, u'owar': 1, u'howa': 1, u'arey': 1, u'reyo': 1, , u'eyou': 1} 3. {u'ware': 1, u'owar': 1, u'howa': 1, u'areu': 1, u'reua': 1, , u'euan': 1, u'ndua': 3, , u'duan': 3, u'andu': 4, u'uand': 4}
能夠看到,第二個文本雖然變更了2個slice token,可是權重不高,對最終的hash的影響有限。可是第三個文本中,不只出現了較多token變更,並且每一個token的權重比較高,它們對最終hash的影響就相對很大了。
爲了說明上述的觀點,咱們來運行一段示例代碼:
# -*- coding: utf-8 -*- from simhash import Simhash, SimhashIndex if __name__ == '__main__': sh1 = Simhash('how are u?') sh2 = Simhash('how are you?') sh3 = Simhash('how are u? and u? and u? and u? and u?') dis_1_2 = sh1.distance(sh2) dis_1_3 = sh1.distance(sh3) print "sh1: ", sh1.value print "sh2: ", sh2.value print "sh3: ", sh3.value print "dis_1_2: ", dis_1_2 print "dis_1_3: ", dis_1_3
能夠看到,文本2和文本1的距離,小於文本3和文本1的距離。
筆者認爲,Simhash比Ssdeep效果好的主要緣由之一就在於這第二點,即Slice Token權重思想,藉助權重均值化這種hash化方法,使得Simhash對多處少許的局部能夠具有更大的容忍度。
這裏提醒讀者朋友注意一個細節,simhash的降維過程分紅了2個環節。第一個環節中,原始ascii特徵空間被降維到了ngram token特徵空間;第二個環節中,ngram token特徵空間被降維到了一個定長的fingerprint hashbit空間中,第二步降維的本質上也是一個線性變換的過程,從矩陣列向量的角度能夠看的很是明顯。
Simhash算法與上文提到隨機超平面哈希之間是什麼關係呢?一言以蔽之:Simhash是隨機超平面投影的一種特殊實現,本質上屬於隨機超平面投影的一種。
怎麼理解這句話呢?筆者帶領你們從列向量的視角來從新審視一下simhash的計算過程。simhash的具體原理這裏再也不贅述,文章前面已經詳細討論過了,這裏直接進入正題。
假設輸入文本通過ngram以後獲得5個詞token,並經過詞頻統計獲得這5個詞token的權重向量d,d = (w1=1,w2=2,w3=0,w4=3,w5=0)
simhash中是經過散列哈希的方法獲得每一個詞token的一個向量化表示,這裏咱們抓住其本質,即散列哈希每個詞token的本質目的就是爲了定義一個低維的向量空間。
假設這5個詞token對應的3維向量分別爲:
h(w1) = (1, -1, 1) h(w2) = (-1, 1, 1) h(w3) = (1, -1, -1) h(w4) = (-1, -1, 1) h(w5) = (1, 1, -1)
按照simhash的算法,是將每一個詞token向量乘上對應的權重w,而後再按照列相加起來,即
m = w1 * h(w1) + w2 * h(w2) + w3 * h(w3) + w4 * h(w4) + w5 * h(w5) = 1 * h(w1) + 2 * h(w2) + 0 * h(w3) + 3 * h(w4) + 0 * h(w5) = (-4, -2, 6)
實際上,上述過程可使用列向量矩陣的方式來一步完成:
接下來simhash的0/1歸一化,其實就是sgn符號函數。
能夠看到,simhash算法產生的結果與隨機超平面投影的結果是一致的。
更進一步地說,在simhash中,隨機超平面,被詞token的權重向量代替了,詞token權重向量做爲超平面和原始向量進行內積計算,計算其夾角。
simhash算法獲得的兩個簽名的漢明距離,能夠用來衡量原始向量的夾角。
Relevant Link:
http://www.cs.princeton.edu/courses/archive/spr04/cos598B/bib/CharikarEstim.pdf
這個章節,咱們繼續深刻討論simhash的算法的底層思想。無論simhash流程如何複雜,其本質是對原始數據應用一個矩陣變換,經過線性變換的方式轉換向量基,將原始數據轉換到另外一個向量空間中。
假設輸入文本中包含 n 個字符,原始字符向量空間維度爲m,原始的數據向量矩陣爲:
,M * N的矩陣。
左乘上一個 K*M 矩陣:
,K * N的矩陣。
左乘上一個 K*N 對角矩陣:
左乘上一個 P*K 矩陣:
能夠看到,simhash中不一樣步驟,分別對應了不一樣的矩陣運算:
Relevant Link:
https://www.cnblogs.com/LittleHann/p/10859016.html#_label7
前面討論的幾種LSH算法,基本能夠解決通常狀況下的問題,不過對於某些特定狀況仍是不行,好比:
其實若是咱們從計算公式的角度來看前面討論的幾種LSH,發現其形式均可以表示成內積的形式,提到內積天然會想到kernel方法,LSH也一樣可使用kernel核方法,關於Kernel LSH的工做可參看下面這三篇文章。
Relevant Link:
http://www.robots.ox.ac.uk/~vgg/rg/papers/klsh.pdf 2009年ICCV上的 Kernelized Locality-Sensitive Hashing for Scalable Image Search http://machinelearning.wustl.edu/mlpapers/paper_files/NIPS2009_0146.pdf 2009年NIPS上的Locality-Sensitive Binary Codes From Shift-Invariant Kernels http://pages.cs.wisc.edu/~brecht/papers/07.rah.rec.nips.pdf 2007年NIPS上的Random Features for Large-Scale Kernel Machines
模糊哈希算法,又叫基於內容分割的分片分片哈希算法(context triggered piecewise hashing, CTPH)。
筆者認爲,ssdeep算法的主要思想有如下幾點:
Dynamic piecewise hashing動態分片哈希思想:基於輸入文本的長度,進行動態分片,將局部的改變限制在一個有限長度的窗口內。
ssdeep的分片不是ngram那種固定size的滑動窗口機制,而是根據輸入文本的長度動態算出的一個n值。
咱們知道,即使對弱哈希,都具有隨機均勻散列的性質,即產生的結果在其映射空間上是接近於均勻分佈的。
在ssdeep中,n的值始終取2的整數次方,這樣Alder-32哈希值(每一個byte的滾動hash)除以n的餘數也接近於均勻分佈。僅當餘數等於n-1時分片,就至關於只有差很少1/n的狀況下會分片。也就是說,對一個文件,沒往前讀取一個byte,就有1/n的可能要分片。
在ssdeep中,每次都是將n除以或者乘以2,來調整,使最終的片數儘量在32到64之間。
bs = 3 while bs * MAX_LENGTH < length: bs *= 2
同時在ssdeep中,n的值會做爲一個最終結果的一部分出現,在比較的時候,n會做爲一個考量因素被計入考量,具體細節後面會討論。
上述策略下,一個新問題出現了。這是一種比較極端的狀況。假設一個文件使用的分片值n。在該文件中改動一個字節(修改、插入、刪除等),且這個改動影響了分片的數量,使得分片數增長或減小,例如把n乘以或者除以2。所以,即使對文件的一個字節改動,也可能致使分片條件n的變化,從而致使分片數相差近一倍,而獲得的結果可能會發生巨大的變化,如何解決這個問題?
ssdeep解決這種問題的思考是加入冗餘因子,將邊界狀況也歸入進來。
對每個文件,它同時使用n和n/2做爲分片值,算得兩個不一樣的模糊哈希值,而這兩個值都使用。所以,最後獲得的一個文件的模糊哈希值是:
n : h(n) : h(n/2)
而在比較時,若是兩個文件的分片值分別爲n和m,則判斷是否有n==m, n==2m, 2n==m三種狀況,若是有之一,則將二者相應的模糊哈希值進行比較。例如,若是n==2m,則比較h(n/2)與h(m)是否類似。這樣,在必定程序上解決了分片值變化的問題。
ssdeep逐字節讀取輸入文本內容,並採用滾動哈希算法(rolling hashing)不斷疊加式計算最新的hash,在ssdeep中,使用Alder-32 [4] 算法做爲弱哈希。它實際是一種用於校驗和的弱哈希,相似於CRC32,不能用於密碼學算法,但計算快速,生成4字節哈希值,而且是滾動哈希。
獲得了當前byte對應的滾動hash值後,ssdeep基於動態分片閾值(上一節討論過)以及滾動Hash的當前State狀態值動態決定每一步(byte)是否分片。
哈希值除以n的餘數剛好等於n-1時,就在當前位置分片
和simhash同樣,對每一個token進行隨機散列哈希化,可使用傳統的哈希算法,例如MD5。在ssdeep中,使用一個名爲Fowler-Noll-Vo hash的哈希算法。
這一步沒有什麼特別意義,純粹是一個信息傳遞過程。
對每個文件分片,計算獲得一個哈希值之後,能夠選擇將結果壓縮短。例如,在ssdeep中,只取FNV(Fowler-Noll-Vo hash的哈希算法)哈希結果的最低6位,並用一個ASCII字符表示出來,做爲這個分片的最終哈希結果。
這一步的壓縮映射損失了一部分的信息,可是帶來了必定的冗餘度的提高。
將每片壓縮後的哈希值鏈接到一塊兒,就獲得這個文件的模糊哈希值了(hash)。若是分片條件參數n對不一樣文件可能不一樣,還應該將n歸入模糊哈希值中。
':'.join([str(bs), hash1, hash2])
注意,上文提到的h(n)和h(n/2)都要拼接進來
在ssdeep中,採用的以下思路。因爲ssdeep對每一片獲得的哈希值是一個ASCII字符,最終獲得的文件模糊哈希值就是一個字符串了。假設是s一、s2,將s1到s2的「加權編輯距離」(weighted edit distance)做爲評價其類似性的依據。
接下來,ssdeep將這個距離除以s1和s2的長度和,以將絕對結果變爲相對結果,再映射到0-100的一個整數值上,其中,100表示兩個字符串徹底一致,而0表示徹底不類似。
咱們來模擬分析一下模糊哈希是如何面對不一樣程度的文本修改,以及又是如何在各類修改狀況下進行類似性分析的,經過這個例子咱們能夠更清晰地理解ssdeep的工做原理。
咱們以概括推理的方法來展開分析,無論對原始輸入文本進行如何程度的修改,均可以從單個字符的修改這裏推演獲得,複雜的增刪改查是簡單原子的修改的組合與疊加,這是部分與總體的關係。
若是在一個輸入文本中修改一個字節,對ssdeep hash來講,有幾種狀況:
# -*- coding: utf-8 -*- import numpy as np import collections import doctest import pprint def INSERTION(A, cost=1): return cost def DELETION(A, cost=1): return cost def SUBSTITUTION(A, B, cost=1): return cost Trace = collections.namedtuple("Trace", ["cost", "ops"]) class WagnerFischer(object): # Initializes pretty printer (shared across all class instances). pprinter = pprint.PrettyPrinter(width=75) def __init__(self, A, B, insertion=INSERTION, deletion=DELETION, substitution=SUBSTITUTION): # Stores cost functions in a dictionary for programmatic access. self.costs = {"I": insertion, "D": deletion, "S": substitution} # Initializes table. self.asz = len(A) self.bsz = len(B) self._table = [[None for _ in range(self.bsz + 1)] for _ in range(self.asz + 1)] # From now on, all indexing done using self.__getitem__. ## Fills in edges. self[0][0] = Trace(0, {"O"}) # Start cell. for i in range(1, self.asz + 1): self[i][0] = Trace(self[i - 1][0].cost + self.costs["D"](A[i - 1]), {"D"}) for j in range(1, self.bsz + 1): self[0][j] = Trace(self[0][j - 1].cost + self.costs["I"](B[j - 1]), {"I"}) ## Fills in rest. for i in range(len(A)): for j in range(len(B)): # Cleans it up in case there are more than one check for match # first, as it is always the cheapest option. if A[i] == B[j]: self[i + 1][j + 1] = Trace(self[i][j].cost, {"M"}) # Checks for other types. else: costD = self[i][j + 1].cost + self.costs["D"](A[i]) costI = self[i + 1][j].cost + self.costs["I"](B[j]) costS = self[i][j].cost + self.costs["S"](A[i], B[j]) min_val = min(costI, costD, costS) trace = Trace(min_val, set()) # Adds _all_ operations matching minimum value. if costD == min_val: trace.ops.add("D") if costI == min_val: trace.ops.add("I") if costS == min_val: trace.ops.add("S") self[i + 1][j + 1] = trace # Stores optimum cost as a property. self.cost = self[-1][-1].cost def __repr__(self): return self.pprinter.pformat(self._table) def __iter__(self): for row in self._table: yield row def __getitem__(self, i): """ Returns the i-th row of the table, which is a list and so can be indexed. Therefore, e.g., self[2][3] == self._table[2][3] """ return self._table[i] # Stuff for generating alignments. def _stepback(self, i, j, trace, path_back): """ Given a cell location (i, j) and a Trace object trace, generate all traces they point back to in the table """ for op in trace.ops: if op == "M": yield i - 1, j - 1, self[i - 1][j - 1], path_back + ["M"] elif op == "I": yield i, j - 1, self[i][j - 1], path_back + ["I"] elif op == "D": yield i - 1, j, self[i - 1][j], path_back + ["D"] elif op == "S": yield i - 1, j - 1, self[i - 1][j - 1], path_back + ["S"] elif op == "O": return # Origin cell, so we"re done. else: raise ValueError("Unknown op {!r}".format(op)) def alignments(self): """ Generate all alignments with optimal-cost via breadth-first traversal of the graph of all optimal-cost (reverse) paths implicit in the dynamic programming table """ # Each cell of the queue is a tuple of (i, j, trace, path_back) # where i, j is the current index, trace is the trace object at # this cell, and path_back is a reversed list of edit operations # which is initialized as an empty list. queue = collections.deque( self._stepback(self.asz, self.bsz, self[-1][-1], [])) while queue: (i, j, trace, path_back) = queue.popleft() if trace.ops == {"O"}: # We have reached the origin, the end of a reverse path, so # yield the list of edit operations in reverse. yield path_back[::-1] continue queue.extend(self._stepback(i, j, trace, path_back)) def IDS(self): """ Estimates insertions, deletions, and substitution _count_ (not costs). Non-integer values arise when there are multiple possible alignments with the same cost. """ npaths = 0 opcounts = collections.Counter() for alignment in self.alignments(): # Counts edit types for this path, ignoring "M" (which is free). opcounts += collections.Counter(op for op in alignment if op != "M") npaths += 1 # Averages over all paths. return collections.Counter({o: c / npaths for (o, c) in opcounts.items()}) FNV_PRIME = 0x01000193 FNV_INIT = 0x28021967 MAX_LENGTH = 64 B64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/" class Last7chars(object): def __init__(self): self._reset_rollhash() def _reset_rollhash(self): self.roll_h1 = 0 self.roll_h2 = 0 self.roll_h3 = 0 self.ringbuffer = [0] * 7 self.writeindex = 0 def _roll_hash(self, char): char7bf = self.readwrite(char) self.roll_h2 += 7 * char - self.roll_h1 self.roll_h1 += char - char7bf self.roll_h3 <<= 5 self.roll_h3 &= 0xffffffff self.roll_h3 ^= char return self.roll_h1 + self.roll_h2 + self.roll_h3 def readwrite(self, num): retval = self.ringbuffer[self.writeindex] self.ringbuffer[self.writeindex] = num self.writeindex = (self.writeindex + 1) % 7 return retval def __repr__(self): arr = self.ringbuffer[ self.writeindex:] + self.ringbuffer[:self.writeindex] return " ".join(map(str, arr)) def _update_fnv(fnvhasharray, newchar): fnvhasharray *= FNV_PRIME fnvhasharray &= 0xffffffff fnvhasharray ^= newchar return fnvhasharray def _calc_initbs(length): bs = 3 while bs * MAX_LENGTH < length: bs *= 2 if bs > 3: #proably checking for integer overflow here? return bs return 3 def ssdeep_hash(content): bs = _calc_initbs(len(content)) #print "bs: ", bs hash1 = '' hash2 = '' last7chars = Last7chars() while True: last7chars._reset_rollhash() fnv1 = FNV_INIT fnv2 = FNV_INIT hash1 = '' hash2 = '' fnvarray = np.array([fnv1, fnv2]) for i in range(len(content)): # 逐bytes掃描 c = ord(content[i]) # 使用Alder-32 [4] 算法做爲弱哈希。它實際是一種用於校驗和的弱哈希,相似於CRC32,不能用於密碼學算法,但計算快速,生成4字節哈希值,而且是滾動哈希。 h = last7chars._roll_hash(c) #print "h_roll_hash: ", h fnvarray = _update_fnv(fnvarray, c) # 當Alder-32哈希值除以n的餘數剛好等於n-1時,就在當前位置分片;不然,不分片,窗口日後滾動一個字節,而後再次計算Alder-32哈希值並判斷,如此繼續 # 1. 使用bs做爲分片值 if h % bs == (bs - 1) and len(hash1) < (MAX_LENGTH - 1): # 對每片分別計算哈希了。可使用傳統的哈希算法,例如MD5。在ssdeep中,使用一個名爲Fowler-Noll-Vo hash的哈希算法 b64char = B64[fnvarray[0] & 63] hash1 += b64char fnvarray[0] = FNV_INIT # 2. 使用2*bs做爲分片值 if h % (2 * bs) == (2 * bs - 1) and len(hash2) < ( MAX_LENGTH / 2 - 1): b64char = B64[fnvarray[1] & 63] hash2 += b64char fnvarray[1] = FNV_INIT # 將每片壓縮後的哈希值鏈接到一塊兒,就獲得這個文件的模糊哈希值了 hash1 += B64[fnvarray[0] & 63] # 對每個文件分片,計算獲得一個哈希值之後,能夠選擇將結果壓縮短。例如,在ssdeep中,只取FNV哈希結果的最低6位,並用一個ASCII字符表示出來,做爲這個分片的最終哈希結果 hash2 += B64[fnvarray[1] & 63] # 這裏 &63,等價於取最低6bit if bs <= 3 or len(hash1) > (MAX_LENGTH / 2): break bs = int(bs / 2) if bs < 3: bs = 3 # 對每個文件,它同時使用n和n/2做爲分片值,算得兩個不一樣的模糊哈希值,而這兩個值都使用。所以,最後獲得的一個文件的模糊哈希值是: n:h(n):h(n/2) return ':'.join([str(bs), hash1, hash2]) #from https://en.wikibooks.org/wiki/Algorithm_Implementation/Strings/Longest_common_substring#Python_2 def longest_common_substring(s1, s2): m = [[0] * (1 + len(s2)) for i in xrange(1 + len(s1))] longest, x_longest = 0, 0 for x in xrange(1, 1 + len(s1)): for y in xrange(1, 1 + len(s2)): if s1[x - 1] == s2[y - 1]: m[x][y] = m[x - 1][y - 1] + 1 if m[x][y] > longest: longest = m[x][y] x_longest = x else: m[x][y] = 0 return s1[x_longest - longest:x_longest] def _likeliness(min_lcs, a, b): # 若是最長公共子串長度不知足要求,則直接退出 if longest_common_substring(a, b) < min_lcs: return 0 # Wagner Fischer算法(字符串編輯距離,Edit Distance) dist = WagnerFischer(a, b).cost # ssdeep將這個距離除以s1和s2的長度和,以將絕對結果變爲相對結果,再映射到0-100的一個整數值上,其中,100表示兩個字符串徹底一致,而0表示徹底不類似 dist = int(dist * MAX_LENGTH / (len(a) + len(b))) dist = int(100 * dist / 64) if dist > 100: dist = 100 return 100 - dist def ssdeep_compare(hashA, hashB, min_lcs=7): bsA, hs1A, hs2A = hashA.split(':') #blocksize, hash1, hash2 bsB, hs1B, hs2B = hashB.split(':') bsA = int(bsA) bsB = int(bsB) like = 0 # 在比較時,若是兩個文件的分片值分別爲n和m,則判斷是否有n==m, n==2m, 2n==m三種狀況,若是有之一,則將二者相應的模糊哈希值進行比較。例如,若是n==2m,則比較h(n/2)與h(m)是否類似 #block size comparison if bsA == bsB: #compare both hashes like1 = _likeliness(min_lcs, hs1A, hs1B) like2 = _likeliness(min_lcs, hs2A, hs2B) like = max(like1, like2) elif bsA == 2 * bsB: # Compare hash_bsA with hash_2*bsB like = _likeliness(min_lcs, hs1A, hs2B) elif 2 * bsA == bsB: # Compare hash_2*bsA with hash_bsB like = _likeliness(min_lcs, hs2A, hs1B) else: #nothing suitable to compare like = 0 return like if __name__ == '__main__': import sys content1 = "this is a test!" content2 = "this is a test." hash1 = ssdeep_hash(content1) print hash1 hash2 = ssdeep_hash(content2) print hash2 similarity = ssdeep_compare(hash1, hash2) print similarity
Relevant Link:
https://github.com/LittleHann/ssdeeppy https://ssdeep-project.github.io/ssdeep/ https://www.claudxiao.net/2012/02/fuzzy_hashing/#comment-457473