傳統上,咱們接觸的機器學習算法,都是被設計爲解決某一個某一類問題的肯定性算法。對於這些機器學習算法來講,惟一的靈活性體如今參數搜索空間上,向算法輸入樣本,算法藉助不一樣的優化手段,對參數進行調整,以此來獲得一個對訓練樣本和測試樣本的最佳適配參數組。php
遺傳編程算法徹底走了另外一外一條路,遺傳編程算法的目標是編寫一個程度,這個程序會嘗試自動構造出解決某一問題的最佳程度。從本質上看,遺傳編程算法構造的是一個可以構造算法的算法。html
另外一方面,咱們曾經討論過遺傳算法,遺傳算法是一種優化技術,就優化技術而言,不管是何種形式的優化,算法或度量都是預先設定好的,而優化算法所作的工做就是嘗試爲其找到最佳參數。和優化算法同樣,遺傳編程也須要一種方法來度量題解的優劣程度。但與優化算法不一樣的是,遺傳編程中的題解並不只僅是一組用於給定算法的參數,相反,在遺傳編程中,連同算法自己及其全部參數,都是須要搜索肯定的。node
從某種程度上來講,遺傳編程和遺傳算法的區別在於,進化的基本單位不一樣,python
遺傳算法是受達爾文的進化論的啓發,借鑑生物進化過程而提出的一種啓發式搜索算法,所以遺傳算法 ( GA , Genetic Algorithm ) 也稱進化算法 。 所以,在討論遺傳編程的時候,會大量借用進化論中的術語和概念,爲了更好地討論遺傳算法,咱們先介紹一些基本生物進化概念,git
生物所處的環境起到一個提供生存壓的做用(反饋),雖然縱觀整個地球歷史,環境的因素是在不斷變化的(有時甚至變化的還很快),可是在某個時間段內(例如5000年內)是基本保持不變的,而物種進化的目的就是經過一代代的繁衍,逐漸適應(擬合)當前的環境,並和其餘物種達到最優平衡(納什均衡)。github
遺傳編程算法就是模擬了生物進化的過程,簡單說來講,web
下圖是遺傳算法的流程圖,正則表達式
從大的方面看,遺傳編程的兩個重要概念是基因型和表現型,算法
其實遺傳算法領域的研究中,這兩個方面的研究都有,可是,由於遺傳編程很難直接處理程序片斷(表現型)(例如一段C++可執行代碼、或者是一段python代碼),由於基於隨機變異獲得的新代碼極可能沒法經過編譯器語法檢查。shell
可是相比之下,遺傳算法反而容易處理程序片斷的內在結構(基因型)(例如C++代碼的AST抽象語法樹)。
因此,筆者認爲基因型的遺傳算法研究纔是更有研究價值的一個方向,本文的討論也會圍繞基因型的遺傳算法展開。
根據基因型形態的不一樣,遺傳編程方法能夠分爲三種:
線性遺傳編程有廣義和狹義之分,
http://www.doc88.com/p-630428999834.html https://pdfs.semanticscholar.org/958b/f0936eda72c3fc03a09a0e6af16c072449a1.pdf
基於樹的遺傳編程的基因型是樹結構。基於樹的遺傳編程是遺傳編程最先的形態,也是遺傳編程的主流方法。
大多數編程語言,在編譯或解釋時,首先會被轉換成一棵解析樹(Lisp編程語言及其變體,本質上就是一種直接訪問解析樹的方法),例以下圖,
樹上的節點有多是枝節點也多是葉節點,
例如上圖中,圓形節點表明了應用於兩個分支(Y變量和常量值5)之上的求和操做。一旦咱們求出了此處的計算值,就會將計算結果賦予上方的節點處。相應的,這一計算過程會一直向下傳播,直到遍歷全部的葉子節點(深度優先遞歸遍歷)。
若是對整棵樹進行遍歷,咱們會發現它至關於下面這個python函數:
在遺傳變異方面,基於樹的遺傳編程的演化操做有兩種,變異和交叉,
樹是一種特殊的圖,所以人們很天然地想到將基於樹的遺傳編程擴展到基於圖的遺傳編程。下圖就是基於圖的遺傳編程的基因型的一個示例。
Relevant Link:
《Adaptation in Natural and Artificial Systems》 John Henry Holland 1992 http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e4%b8%80%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%ae%80%e4%bb%8b 《Evolving Evolutionary Algorithms using Linear Genetic Programming (2005)》 《A comparison of several linear genetic programming techniques》Oltean, Mihai, and Crina Grosan. Complex Systems 14.4 (2003): 285-314. https://www.jianshu.com/p/a953066cb2eb
這個章節,咱們用數學的形式化視角,來從新審視一下遺傳算法。
I | 種羣中的個體 |
m | 全部可能個體的數量 |
n | 種羣大小 |
pm | 變異機率 |
pc | 交叉機率 |
f(I) | 個體I的適應度。 |
p(I)t | 第t代種羣中,個體I出現的機率 |
![]() |
第t代種羣平均適應度。第t代種羣中個體適應度的平均值。 |
由於遺傳算法中有各類各樣的編碼方式、變異操做、交叉操做和選擇操做,遺傳算法的形態呈現多樣性。
爲了簡化分析,咱們這裏假設一個典型遺傳算法,即,
模式定理是遺傳算法創始人 J.Holland 在其突破性著做《Adaptation in Natural and Artificial Systems》引入的,用於分析遺傳算法的工做原理。
模式是指基因編碼空間中,由一類類似的基因組抽象獲得的pattern,好比 [0,*,*,1] 就是一個模式。染色體[0,1,0,1]和[0,0,0,1]都包含該模式。
在具體討論模式定理以前,咱們先介紹一些符號,
L(H) | 模式的長度。第一固定基因位和最後一個固定基因位的距離,其中L([0,*,*,1])=3。 |
O(H) | 模式的階。固定基因位的個數,其中O([0,*,*,1])=2。 |
![]() |
模式平均適應度。種羣中包含該模式的個體適應度的平均值。 |
p(H)t | 在第t代種羣中,模式H出現的機率。 |
【模式定理】
在本章定義的典型遺傳算法中,下面公式成立:
這個公式看起來有點複雜,其實很是簡單,咱們逐個部分來分解,
整體來講,遺傳算法須要在,選擇操做引發的收斂性和變異交叉操做引發的多樣性之間取得平衡。
模式定理的通俗說法是這樣的,低階、短序以及平均適應度高於種羣平均適應度的模式在子代中呈指數增加。
低階、短長以及平均適應度高於種羣平均適應度的模式H,
此時,
即模式H呈現指數增加。
這個小節咱們來討論一個有些理論化的問題,即:遺傳編程算法通過必定次數的迭代後,是否會收斂到某個穩態?若是會達到穩態,遺傳編程的收斂速度是多少?
要解決這個問題,咱們須要引入數學工具,馬爾柯夫鏈,有以下定義。
咱們把整個種羣的狀態當作馬爾科夫鏈的一個狀態 s,交叉、變異和選擇操做則構建了一個機率轉移矩陣。通常狀況下,0<pm<1,0<=pc<=1,即物種變異必定會發生,但不是必然100%發生。咱們來分析一下這時的機率轉移矩陣的性質。
標準的優化算法分析第一個要關心的問題是,優化算法能不能收斂到全局最優勢。假設全局最優勢的適應度值爲maxf,收斂到全局最優勢的定義以下,
一言以蔽之,典型遺傳算法並不收斂。
根據機率轉移矩陣收斂定理,咱們能夠知道典型遺傳算法會收斂到一個全部種羣狀態機率都大於0的機率分佈上(穩態)。所以以後,不包含全局最優解的種羣必定會不停出現,從而致使上面的公式不成立。
可是筆者這裏要強調的是,這章討論的典型遺傳算法在實際工程中是幾乎不存在的,實際上,幾乎全部遺傳算法代碼都會將保持已發現最優解。加了這個變化以後的遺傳算法是收斂的。
仍是根據上述機率轉移矩陣收斂定理,咱們能夠知道遺傳算法會收斂到一個全部種羣狀態機率都大於0的機率分佈上,那麼包含全局最優解的種羣必定會不停出現,保持已發現最優解的作法會使得上面的公式成立。
Relevant Link:
Adaptation in Natural and Artificial Systems: An Introductory Analysis with Applications to Biology, Control, and Artificial Intelligence Rudolph, Günter. 「Convergence analysis of canonical genetic algorithms.」 Neural Networks, IEEE Transactions on 5.1 (1994): 96-101. http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e4%b8%89%e6%95%b0%e5%ad%a6%e6%91%86%e6%91%86%e6%89%8b%ef%bc%8c%e5%be%88%e6%83%ad%e6%84%a7%ef%bc%8c%e5%8f%aa%e5%81%9a%e4%ba%86
自 John Henry Holland 在1992年提出《Adaptation in Natural and Artificial Systems》論文後,遺傳編程又獲得了大量研究者的關注和發展,提出了不少改進型的衍生算法。雖然這些算法在工業場景中不必定都適用,可是筆者以爲咱們有必要了解和學習一下它們各自的算法思想,有利於咱們在遇到各自的實際問題的時候,觸類旁通。
典型交叉變異隨機選擇兩條染色體,按照pc的機率決定是否交叉,若是選擇交叉則隨機選擇一點並將這點以後的基因交換。這種交叉方式被稱爲單點雜交。
多點雜交指定了多個交換點用於父本的基因交換重組,具體的執行過程以下圖所示,
多點雜交改進的是突變率。
單點和多點雜交算法存在一個問題,雜交的染色體中某些部分的基因會被過早地捨棄,這是因爲在交換前它們必須肯定交換父本染色體交換位前面仍是後面的基因,從而對於那些無關的基因段在交換前就已經收斂了。
均勻雜交算法(Uniform Crossover)就能夠解決上述算法的這種侷限性,該算法的主要過程以下:
洗牌雜交的最大特色是一般將染色體的中點做爲基因的交換點,即從每一個父本中取它們一半的基因重組成新的個體。
另外針對於實值編碼方式,還有離散雜交、中間雜交、線性雜交和擴展線性雜交等算法。
精英保留策略是一種典型的選擇策略。精英保留策略是指每次迭代都保留已發現的最優解。這個策略是顯而易見的,咱們不可能捨棄已發現的最優解,而只使用最後一代種羣的最優解。同時,採用精英保留策略的典型遺傳算法是保證收斂到全局最優解的。
輪盤賭選擇策略是基於機率進行選擇策略。輪盤賭算法有放回地採樣出原種羣大小的新一代種羣,個體 Ii 的採樣機率以下所示,
從機率上看,在某一輪中,即便是適應度最差的個體,也存在必定的概率能進入到下一輪,這種策略提升了多樣性,但減緩了收斂性。
錦標賽法從大小爲 n 的種羣隨機選擇 k(k小於n) 個個體,而後在 k 個個體中選擇適應度最大的個體做爲下一代種羣的一個個體。反覆屢次,直到下一代種羣有 n 個個體。
在大天然,物種的進化是以多種羣的形式併發進行的。通常來講,一個物種只有一個種羣了,意味着這個物種有滅亡的危險(例如恐龍)。
受此啓發,人們提出了多種羣遺傳算法。多種羣遺傳算法保持多個種羣同時進化,具體流程以下圖所示,
多種羣遺傳算法和遺傳算法執行屢次的區別在於移民,種羣之間會經過移民的方式交換基因。這種移民操做會帶來更多的多樣性。
遺傳算法中,決定個體變異長度的主要因素有兩個:交叉機率pc,和變異機率pm。
在實際工程問題中,須要針對不一樣的優化問題和目標,反覆實驗來肯定pc和pm,調參成本很高。
並且在遺傳算法訓練的不一樣階段,咱們須要不一樣的pc和pm,
Srinivas.M and Patnaik.L.M (1994) 爲了讓遺傳算法具有更好的自適應性,提出來自適應遺傳算法。在論文中,pc和pm的計算公式以下:
遺傳算法的全局搜索能力強,但局部搜索能力較弱。這句話怎麼理解呢?
好比對於一條染色體,遺傳算法並不會去看看這條染色體周圍局部的染色體適應度怎麼樣,是否比這條染色體好。遺傳算法會經過變異和交叉產生新的染色體,但新產生的染色體可能和舊染色差的很遠。所以遺傳算法的局部搜索能力差。
相對的,梯度法、登山法和貪心法等算法的局部搜索能力強,運算效率也高。
受此啓發,人們提出了混合遺傳算法,將遺傳算法和這些算法結合起來。混合遺傳算法的框架是遺傳算法的,只是生成新一代種羣以後,對每一個個體使用局部搜索算法尋找個體周圍的局部最優勢。
整體來講,遺傳算法和梯度法分別表明了隨機多樣性優化和漸進定向收斂性優化的兩種思潮,取各自的優勢是一種很是好的思路。
Relevant Link:
Srinivas M, Patnaik L M. Adaptive probabilities of crossover and mutation in genetic algorithms[J]. Systems, Man and Cybernetics, IEEE Transactions on, 1994, 24(4): 656-667. http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e5%9b%9b%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%9a%84%e5%8f%98%e7%a7%8d
這個章節,咱們來完成一個小實驗,咱們如今有一個數據集,數據集的生成算法以下:
# -*- coding: utf-8 -*- from random import random,randint,choice def hiddenfunction(x,y): return x**2 + 2*y + 3*x + 5 def buildhiddenset(): rows = [] for i in range(200): x=randint(0, 40) y=randint(0, 40) rows.append([x, y, hiddenfunction(x, y)]) return rows if __name__ == '__main__': print buildhiddenset()
部分數據例以下圖:
可視化以下,
# -*- coding: utf-8 -*- import matplotlib.pyplot as plt import numpy as np from sklearn.preprocessing import PolynomialFeatures from sklearn.linear_model import LinearRegression from random import random,randint,choice from mpl_toolkits.mplot3d import axes3d def hiddenfunction(x,y): return x**2 + 2*y + 3*x + 5 def buildhiddenset(): X = [] y = [] for i in range(200): x_ = randint(0, 40) y_ = randint(0, 40) X.append([x_, y_]) y.append(hiddenfunction(x_, y_)) return np.array(X), np.array(y) if __name__ == '__main__': # generate a dataset X, y = buildhiddenset() print "X:", X print "y:", y fig = plt.figure() ax = fig.gca(projection='3d') ax.set_title("3D_Curve") ax.set_xlabel("x") ax.set_ylabel("y") ax.set_zlabel("z") # draw the figure, the color is r = read figure = ax.plot(X[:, 0], X[:, 1], y, c='r') plt.show()
很顯然,一定存在一些函數,能夠將(X,Y)映射爲結果欄對應的數字,如今問題是這個(些)函數究竟是什麼?
從數據分析和數理統計分析的角度來看,這個問題彷佛也不是很複雜。咱們能夠用多元線性迴歸來嘗試擬合數據集。
# -*- coding: utf-8 -*- import matplotlib.pyplot as plt import numpy as np from sklearn.preprocessing import PolynomialFeatures from sklearn.linear_model import LinearRegression from random import random,randint,choice from mpl_toolkits.mplot3d import axes3d from sklearn.metrics import accuracy_score from sklearn.metrics import classification_report, confusion_matrix import numpy as np np.set_printoptions(threshold=np.inf) def hiddenfunction(x,y): return x**2 + 2*y + 3*x + 5 def buildhiddenset(): X = [] y = [] for i in range(100): x_ = randint(0, 40) y_ = randint(0, 40) X.append([x_, y_]) y.append(hiddenfunction(x_, y_)) return np.array(X), np.array(y) if __name__ == '__main__': # generate a dataset X, y = buildhiddenset() print "X:", X print "y:", y fig = plt.figure() ax = fig.gca(projection='3d') ax.set_title("3D_Curve") ax.set_xlabel("x") ax.set_ylabel("y") ax.set_zlabel("z") # draw the figure, the color is r = read #figure = ax.plot(X[:, 0], X[:, 1], y, c='r') #plt.show() # use Scikit-Learn PolynomialFeature class to constructing parameter terms. # a,b,degree=2: [a, b, a^2, ab, b^2] # a,b,degree=3: [a, b, a^2, ab, b^2, a^3, a^2b, ab^2, b^3] # a,b,c,degree=3: [a, b, c, a^2, ab, ac, b^2, bc, c^2, a^3, a^2b, a^2c, ab^2, ac^2, abc, b^3, b^2c, bc^2, c^3] poly_features = PolynomialFeatures(degree=2, include_bias=False) # fit the dataset with Polynomial Regression Function, and X_poly is the fitting X result X_poly = poly_features.fit_transform(X) lin_reg = LinearRegression() lin_reg.fit(X_poly, y) print(lin_reg.intercept_, lin_reg.coef_) y_predict = lin_reg.predict(X_poly) y_predict = [int(i) for i in y_predict] print "y_predict: ", y_predict print "y: ", y print 'accuracy for LinearRegression is: {0}'.format(accuracy_score(y, y_predict)) print 'error for LinearRegression is: {0}'.format(confusion_matrix(y, y_predict)) # draw the prediction curve X_new_1 = np.linspace(0, 40, 100).reshape(100, 1) X_new_2 = np.linspace(0, 40, 100).reshape(100, 1) X_new = np.hstack((X_new_1, X_new_2)) # fit the X_new dataset with Polynomial Regression Function, and X_new_poly is the fitting X result X_new_poly = poly_features.transform(X_new) y_new = lin_reg.predict(X_new_poly) y_new = [int(i) for i in y_new] #print "X_new: ", X_new #print "y_new: ", y_new #print "y: ", y # draw the prediction line figure = ax.plot(X[:, 0], X[:, 1], y, c='r') figure = ax.plot(X_new[:, 0], X_new[:, 1], y_new, c='b', linewidth=2, label="Predictions") plt.show()
紅色是數據集,藍色線是多項式擬合結果
多項式迴歸的擬合結果以下:
由於這是一個迴歸分析問題,所以精度的要求很是高,雖然實際的偏差並非很高(0.55),使用多項式擬合得到了0.45的擬合精確度,不算特別高,出錯的點還挺多的。
這麼一看,好像多項式迴歸的效果不好了?並非這樣的。
咱們來對比下模型學習到的多項式參數翻譯成多項式函數,和原始數據集背後的目標函數形式的差距:
print(lin_reg.intercept_, lin_reg.coef_) (5.0000000000009095, array([ 3.00000000e+00, 2.00000000e+00, 1.00000000e+00, , ])) a,b,degree=2: [a, b, a^2, ab, b^2] 模型學習到的:H = 3*X + 2*Y + X^2 + 4.70974246e-17*X*Y -7.81622966e-16*Y^2 + 5 原始數據集目標函數:H = 3*X + 2*Y + X^2 + 5
能夠看到,多項式迴歸出現了一些過擬合的現象,多出了兩項:X*Y、和Y^2,若是沒有這兩項,那麼多項式迴歸獲得的方程就是完美的原始方程。
從上面的實驗中,咱們能夠獲得幾點啓發:
從這個問題,繼續延伸思考,相信讀者朋友在研究和工程項目中經常會遇到一個有趣的現象:針對某個固定的數據集,咱們設計出一個很是精巧的神經網絡,擁有上千萬的參數。和另外一我的用一個只有十幾萬參數的簡單經典神經網絡進行benchmark對比,發現性能並無太大的提高,甚至沒有提高,固然,性能也沒有降低。咋一看,複雜模型和簡單的效果同樣好。
對這個問題的研究和解釋,已經開始有很是多的學者和科研機構在進行,目前提出了不少的解釋框架(例如teacher-student模型),筆者本身在項目中也一樣遇到了這個問題。一個初步的見解是,對於一個特定場景的數據集來講,是存在一個肯定的複雜度上限的。如今若是有兩個模型,一個模型剛恰好達到這個複雜度上限,另外一個模型遠遠超過了這個複雜度上限,那麼後面那個模型就會出現所謂的過擬合現象,可是,模型會經過權重分配,將高出複雜度上限以外的多餘項(神經元)都分配0或者很是低的權重,讓這部分神經元稱爲冗餘項。最終,無論咱們用多麼複雜的模型,起做用的永遠只有那一小部分神經元在發揮做用,預測的效果也同樣好,過擬合問題並無影響最終的分類預測結果。
筆者小節:
使用多項式迴歸對一個數據集進行擬合這種作法,本質上是一種先天經驗主義思惟,它的科學假設是」複雜可約性「。所謂複雜可約性是複雜理論中的一個概念,指的是一個複雜的系統能夠經過一個簡化的模型進行約簡歸納,經過歷史的樣本學習這個約簡系統的結構參數,同時假設這個約簡系統可以比真實的複雜系統推算的更快,從而藉助約簡系統對複雜系統進行將來預測。
上面的話說的有一些繞,簡單來講就是,無論數據集背後的真實目標函數是什麼,咱們均可以用像低階多項式函數這種簡單模型來進行擬合學習,並利用學習到的模型對將來可能出現的新數據集進行預測。
這種基於先驗的簡化建模思想在今天的機器學習學術界和工業界應用很是普遍,也發揮了很好的效果。但其實咱們還有另外一種看世界的角度,那就是隨機過程。隨機過程事先不作任何假設,而是基於隨機或者遵循某種策略下的隨機,不斷進行自我迭代與優化,從一種混沌狀態逐漸收斂到目標狀態。
在這個章節,咱們要討論遺傳編程,仍是上面那個問題,咱們如今換一種思惟來想:咱們所見的數據集背後無非是一個函數公式來決定的,而函數公式又是由一些基本的」數學符號「以及」數學運算符「號組合而成的,數學符號和數學運算符的組合方式咱們將其視爲一個符號搜索空間,咱們直接去搜索這個符號搜索空間便可,經過數據集做爲反饋,直到收斂爲止,即找到了完美的目標函數。
接下來咱們逐個小節來分解任務,一步步達到咱們的目標。
咱們能夠用python構造出樹狀程序的基礎數據結構。這棵樹由若干節點組成,根據與之關聯的函數的不一樣,這些節點又能夠擁有必定數量的子節點。
有些節點將會返回傳遞給程序的參數;另外一些則會返回常量;還有一些則會返回應用於其子節點之上的操做。
一個封裝類,對應於」函數型「節點上的函數。其成員變量包括了函數名稱、函數自己、以及該函數接受的參數個數(子節點)。
class fwrapper: def __init__(self,function,params,name): self.function=function self.childcount=param self.name=name
對應於函數型節點(帶子節點的節點)。咱們以一個fwrapper類對其進行初始化。當evaluate被調用時,咱們會對各個子節點進行求值運算,而後再將函數自己應用於求得的結果。
class node: def __init__(self,fw,children): self.function=fw.function self.name=fw.name self.children=children def evaluate(self,inp): results=[n.evaluate(inp) for n in self.children] return self.function(results) def display(self,indent=0): print (' '*indent)+self.name for c in self.children: c.display(indent+1)
這個類對應的節點只返回傳遞給程序的某個參數。其evaluate方法返回的是由idx指定的參數。
class paramnode: def __init__(self,idx): self.idx=idx def evaluate(self,inp): return inp[self.idx] def display(self,indent=0): print '%sp%d' % (' '*indent,self.idx)
返回常量值的節點。其evaluate方法僅返回該類被初始化時所傳入的值。
class constnode: def __init__(self,v): self.v=v def evaluate(self,inp): return self.v def display(self,indent=0): print '%s%d' % (' '*indent,self.v)
除了基礎數學符號數據結構以外,咱們還須要定義一些針對節點的操做函數。
一些簡單的符號運算符(例如add、subtract),能夠用lambda內聯方式定義,另一些稍微複雜的運算符則須要在單獨的語句塊中定義,不論哪一種狀況,都被會封裝在一個fwrapper類中。
addw=fwrapper(lambda l:l[0]+l[1],2,'add') subw=fwrapper(lambda l:l[0]-l[1],2,'subtract') mulw=fwrapper(lambda l:l[0]*l[1],2,'multiply') def iffunc(l): if l[0]>0: return l[1] else: return l[2] ifw=fwrapper(iffunc,3,'if') def isgreater(l): if l[0]>l[1]: return 1 else: return 0 gtw=fwrapper(isgreater,2,'isgreater') flist=[addw,mulw,ifw,gtw,subw]
如今,咱們能夠利用前面建立的節點類來構造一個程序樹了,咱們來嘗試寫一個符號推理程序,
# -*- coding: utf-8 -*-
import gp
def exampletree():
# if arg[0] > 3:
# return arg[1] + 5
# else:
# return arg[1] - 2
return gp.node(
gp.ifw, [
gp.node(gp.gtw, [gp.paramnode(0), gp.constnode(3)]),
gp.node(gp.addw, [gp.paramnode(1), gp.constnode(5)]),
gp.node(gp.subw, [gp.paramnode(1), gp.constnode(2)])
]
)
if __name__ == '__main__':
exampletree = exampletree()
# expected result = 1
print exampletree.evaluate([2, 3])
# expected result = 8
print exampletree.evaluate([5, 3])
至此,咱們已經成功在python中構造出了一個以樹爲基礎的語言和解釋器。
如今咱們已經有能力進行形式化符號編程了,回到咱們的目標,生成一個可以擬合數據集的函數。首先第一步是須要隨機初始化一個符號函數,即初始化種羣。
建立一個隨機程序的步驟包括:
def makerandomtree(pc,maxdepth=4,fpr=0.5,ppr=0.6): if random()<fpr and maxdepth>0: f=choice(flist) children=[makerandomtree(pc,maxdepth-1,fpr,ppr) for i in range(f.childcount)] return node(f,children) elif random()<ppr: return paramnode(randint(0,pc-1)) else: return constnode(randint(0,10))
該函數首先建立了一個節點併爲其隨機選了一個函數,而後它遍歷了隨機選中的函數所需的子節點,針對每個子節點,函數經過遞歸調用makerandomtree來建立新的節點。經過這樣的方式,一顆完整的樹就被構造出來了。
僅當被隨機選中的函數再也不要求新的子節點時(即若是函數返回的是一個常量或輸入參數時),向下建立分支的過程纔會結束。
按照遺傳編程算法的要求,每一輪迭代中都要對種羣個體進行定量評估,獲得一個個體適應性的排序。
與優化技術同樣,我摩恩必須找到一種衡量題解優劣程度的方法,不少場景下,優劣程度並不容易定量評估(例如網絡安全中經常是非黑即白的二分類)。可是在本例中,咱們是在一個數值型結果的基礎上對程序進行測試,所以能夠很容易經過絕對值偏差進行評估。
def scorefunction(tree,s): dif=0 for data in s: v=tree.evaluate([data[0],data[1]]) dif+=abs(v-data[2]) return dif
咱們來試一試初始化的隨機種羣的適應性評估結果,
# -*- coding: utf-8 -*- import gp if __name__ == '__main__': hiddenset = gp.buildhiddenset() random1 = gp.makerandomtree(2) random2 = gp.makerandomtree(2) print gp.scorefunction(random1, hiddenset) print gp.scorefunction(random2, hiddenset)
隨機初始化的函數種羣的適應性並非很好,這符合咱們的預期。
當表現最好的程序被選定以後,它們就會被複制並修改以進入到下一代。前面說到,遺傳變異有兩種方式,mutation和crossover,
變異的作法是對某個程序進行少許的修改,一個樹狀程序能夠有多種修改方式,包括:
須要注意的是,變異的次數不宜過多(基因突變不能太頻繁)。例如,咱們不宜對整棵樹上的大多數節點都實施變異,相反,咱們能夠位任何須要進行修改的節點定義一個相對較小的機率。從樹的根節點開始,若是每次生成的隨機數小於該機率值,就以如上所述的某種方式對節點進行變異。
def mutate(t,pc,probchange=0.1): if random()<probchange: return makerandomtree(pc) else: result=deepcopy(t) if hasattr(t,"children"): result.children=[mutate(c,pc,probchange) for c in t.children] return result
除了變異,另外一種修改程序的方法被稱爲交叉或配對,即:從本輪種羣中的優秀適應着中,選出兩個將其進行部分子樹交換。執行交叉操做的函數以兩棵樹做爲輸入,並同時開始向下遍歷,當到達某個隨機選定的閾值時,該函數便會返回前一棵樹的一份拷貝,樹上的某個分支會被後一棵樹上的一個分支所取代。經過同時對兩棵樹的即時遍歷,函數會在每棵樹上大體位於相同層次的節點處實施交叉操做。
def crossover(t1,t2,probswap=0.7,top=1): if random()<probswap and not top: return deepcopy(t2) else: result=deepcopy(t1) if hasattr(t1,'children') and hasattr(t2,'children'): result.children=[crossover(c,choice(t2.children),probswap,0) for c in t1.children] return result
讀者朋友可能會注意到,對於某次具體的變異或者交叉來講,新的種羣個體並不必定會帶來更好的性能,實際上,新種羣個體的性能幾乎徹底是隨機的。從生物進化論的角度來講,遺傳變異是無方向的,隨機的,遺傳變異的目標僅僅是引入多樣性,形成演化的是環境選擇壓(數據集的偏差反饋)。
如今,咱們將上面的步驟串起來,讓遺傳演化不斷的循環進行。本質上,咱們的思路是要生成一組隨機程序並擇優複製和修改,而後一直重複這一過程直到終止條件知足爲止。
def getrankfunction(dataset):
def rankfunction(population):
scores=[(scorefunction(t,dataset),t) for t in population]
scores.sort()
return scores
return rankfunction
def evolve(pc,popsize,rankfunction,maxgen=500,
mutationrate=0.1,breedingrate=0.4,pexp=0.7,pnew=0.05):
# Returns a random number, tending towards lower numbers. The lower pexp
# is, more lower numbers you will get
def selectindex():
return int(log(random())/log(pexp))
# Create a random initial population
population=[makerandomtree(pc) for i in range(popsize)]
for i in range(maxgen):
scores=rankfunction(population)
print "function score: ", scores[0][0]
if scores[0][0]==0: break
# The two best always make it
newpop=[scores[0][1],scores[1][1]]
# Build the next generation
while len(newpop)<popsize:
if random()>pnew:
newpop.append(mutate(
crossover(scores[selectindex()][1],
scores[selectindex()][1],
probswap=breedingrate),
pc,probchange=mutationrate))
else:
# Add a random node to mix things up
newpop.append(makerandomtree(pc))
population=newpop
scores[0][1].display()
return scores[0][1]
上述函數首先建立一個隨機種羣,而後循環至多maxgen次,每次循環都會調用rankfunction對程序按表現從優到劣的順序進行排列。表現優者會不加修改地自動進入到下一代,咱們稱這樣的方法爲精英選拔髮(elitism)。
至於下一代中的其餘程序,則是經過隨機選擇排名靠前者,再通過交叉和變異以後獲得的。
這個過程是一直重複下去,知道某個程序達到了完美的擬合適配(損失爲0),或者重複次數達到了maxgen次爲止。
evolve函數有多個參數,用以從不一樣方面對競爭環境加以控制,說明以下:
# -*- coding: utf-8 -*- import gp if __name__ == '__main__': rf = gp.getrankfunction(gp.buildhiddenset()) gp.evolve(2, 500, rf, mutationrate=0.2, breedingrate=0.1)
程序運行的很是慢,在筆者的mac上運行了15min才最終收斂到0。有意思的是,儘管這裏給出的解是徹底正確的,可是它明顯比咱們數據集背後的真實目標函數要複雜得多,也就是說發生了過擬合。
可是,咱們若是運用一些代數知識,將遺傳編程獲得函數進行約簡,會發現它和目標函數實際上是等價的(p0爲X,p1爲Y)。
((X+6)+Y)+X + (if( (X*Y)>0 ){X}else{X} + X*X) + (X + (Y - (X + if(6>0){1}else{0})) ) # X*Y恆大於0 2*X + Y + 6 + X + X**2 + (X + (Y - (X + if(6>0){1}else{0})) ) # 6恆大於0 2*X + Y + 6 + X + X**2 + X + Y - X + 1 X**2 + 3*X + 2*Y + 5
能夠看到,遺傳編程這種基於內顯性的構造方式,能夠在形式上獲得一個全局最優解,這點上比基於優化算法的逼近方法要好。
同時,上述例子告訴咱們遺傳編程的一個重要特徵:遺傳算法找到的題解也許是徹底正確的,亦或是很是不錯的。可是一般這些題解遠比真實的目標函數要複雜得多。在遺傳編程獲得的題解中,咱們發現有不少部分是不作任何工做的,或者對應的是形式複雜,但始終都只返回同一結果的公式,例如"if(6>0){1}else{0})",只是1的一種多餘的表達方式而已。
從上一章的遺傳優化結果咱們能夠看到,在數據集是充分典型集的狀況下,過擬合是不影響模型的收斂的。
遺傳編程的這種冗餘性和優化算法和深度神經網絡中的冗餘結構本質上是一致的,這是一個冗餘的過擬合現象,即程序的題解很是複雜,可是卻對最終的決策沒有影響,惟一的缺點就是浪費了不少cpu時鐘。
可是另外一方面,這種冗餘過擬合帶來的一個額外的風險是,」若是數據集是非典型的,那麼過擬合就會致使嚴重的後果「。
咱們須要明白的是,過擬合的罪魁禍首不是模型和優化算法,而偏偏是數據集自己。在本例中咱們清楚地看到,當咱們可以獲得一個完整的典型集訓練數據時,過擬合問題就會退化成一個冗餘魯棒可約結構。
可是反之,若是咱們的數據集由於系統噪聲或採樣不徹底等緣由,沒有拿到目標函數的典型集,那麼因爲複雜模型帶來的過擬合問題就會引起很嚴重的預測誤差。咱們來稍微修改一下代碼,將原始數據集中隨機剔除1/10的數據,使數據的充分性和典型性降低,而後再來看遺傳編程最後的函數優化結果,
# -*- coding: utf-8 -*- import gp if __name__ == '__main__': hiddenset = gp.buildhiddenset() # 按照 5% 模來採樣,即剔除1/10的數據,模擬採樣不徹底的狀況 cnt = 0 for i in hiddenset: if cnt % 10 == 0: hiddenset.remove(i) cnt += 1 print hiddenset rf = gp.getrankfunction(hiddenset) gp.evolve(2, 500, rf, mutationrate=0.2, breedingrate=0.1)
獲得的函數約簡結果爲:
if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0} + (Y+4) + X^2 + (Y + (X + (X+X))) # if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0} 不可約 if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0} + X**2 + 3*X + 2*Y + 4 # 真實的目標函數爲: X**2 + 3*X + 2*Y + 5
能夠看到,在數據集不完整的狀況下,遺傳算法就算完美擬合了訓練集,可是也沒法真正逼近目標函數,但這不是遺傳算法的問題,而是受到數據集的制約。
更重要的是,由於數據集的非典型性(數據機率分佈缺失),致使模型引入了真正的」過擬合複雜結構「,即」if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0}「,這是一個區間函數。要知道,這僅僅是一個很是小的例子,尚且引入瞭如此的不肯定性,在更復雜和更復雜的問題中,數據集的機率分佈缺失會引起更大的」多餘過擬合複雜結構問題「,影響程度的多少,根據數據缺失的程度而定。
這反過來提醒了咱們這些數據科學工做者,在解決一個實際問題的時候,不要過度糾結你的模型是否是足夠好,層數是否是足夠深,而要更多地關注你的數據集,數據集的質量直接決定了最終的模型效果。更進一步地說,若是你能找到一種方法,能100%拿到目標函數的數據全集,那麼恭喜你,隨便用一個機器學習模型均可以取得很好的效果。
對於遺傳編程,咱們還須要再談一個關鍵問題,即多樣性問題。
咱們看到evolve函數中,會將最優的個體直接進入下一代,除此以外,對排名以後的個體也會按照比例和機率選擇性地進行復制和修改以造成新的種羣,這種作法有什麼意義呢?
最大的問題在於,僅僅選擇表現最優異的少數個體,很快就會使種羣變得極端同質化(homogeneous),或稱爲近親交配。儘管種羣中所包含的題解,表現都很是不錯,可是它們彼此間不會有太大的差別,由於在這些題解間進行的交叉操做最終會致使羣內的題解變得愈來愈類似。咱們稱這一現象爲達到局部最大化(local maxima)。
對於種羣而言,局部最大化是一種不錯的狀態(即收斂了),但還稱不上最佳的狀態。由於處於這種狀態的種羣裏,任何細小的變化都不會對最終的結果產生太大的變化。這就是一個哲學上的矛盾與對立,收斂穩定與發散變化是彼此對立又統一的,徹底偏向任何一方都是不對的。
事實代表,將表現極爲優異的題解和大量成績尚可的題解組合在一塊兒,每每可以獲得更好的結果。基於這個緣由,evolve提供了兩個額外的參數,容許咱們對篩選進程中的多樣性進行調整。
這兩個參數都會有效地增長進化過程當中的多樣性,同時又不會對進程有過多的擾亂,由於,表現最差的程序最終老是會被剔除掉的(遺傳編程的馬爾科夫收斂性)。
Relevant Link:
《集體智慧編程》
咱們須要生成一段正則表達式,這個正則表達式須要可以匹配到全部的M數據集,同時不匹配全部的U數據集,且同時還要儘可能短,即不能是簡單的M數據集的並集拼接。
定義一個目標(損失)函數來評估每次題解的好壞,
,其中nM表明匹配M的個數,nU表明匹配U的個數,wI表明獎勵權重,r表明該正則表達式的長度
算法優化的目標是使上式儘可能大。
在討論遺傳編程以前,咱們先從常規思路,用一種貪婪迭代算法來解決這個問題。咱們的數據集以下,
# -*- coding: utf-8 -*- from __future__ import division import re import itertools def words(text): return set(text.split()) if __name__ == '__main__': M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') print M & U
首先,確認了U和M之間不存在交集,這道題理論上是有解的,不然無解。
咱們先準肯定義出何時意味着獲得了一個可行解,
# -*- coding: utf-8 -*- from __future__ import division import re import itertools def words(text): return set(text.split()) def mistakes(regex, M, U): "The set of mistakes made by this regex in classifying M and U." return ({"Should have matched: " + W for W in M if not re.search(regex, W)} | {"Should not have matched: " + L for L in U if re.search(regex, L)}) def verify(regex, M, U): assert not mistakes(regex, M, U) return True if __name__ == '__main__': M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') some_answer = "a*" print mistakes(some_answer, M, U)
能夠看到,當咱們輸入正則」a*「的時候出現了不少錯誤,顯然」a*「不是咱們要的答案。讀者朋友能夠試着輸入」foo「試試。
上面構造正則候選集的過程說的可能有些抽象,這裏經過代碼示例來講明一下,
# -*- coding: utf-8 -*- from __future__ import division import re import itertools OR = '|'.join # Join a sequence of strings with '|' between them cat = ''.join # Join a sequence of strings with nothing between them Set = frozenset # Data will be frozensets, so they can't be mutated. def words(text): return set(text.split()) def mistakes(regex, M, U): "The set of mistakes made by this regex in classifying M and U." return ({"Should have matched: " + W for W in M if not re.search(regex, W)} | {"Should not have matched: " + L for L in U if re.search(regex, L)}) def verify(regex, M, U): assert not mistakes(regex, M, U) return True def matches(regex, strings): "Return a set of all the strings that are matched by regex." return {s for s in strings if re.search(regex, s)} def regex_parts(M, U): "Return parts that match at least one winner, but no loser." wholes = {'^' + w + '$' for w in M} parts = {d for w in wholes for p in subparts(w) for d in dotify(p)} return wholes | {p for p in parts if not matches(p, U)} def subparts(word, N=4): "Return a set of subparts of word: consecutive characters up to length N (default 4)." return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N)) def dotify(part): "Return all ways to replace a subset of chars in part with '.'." choices = map(replacements, part) return {cat(chars) for chars in itertools.product(*choices)} def replacements(c): return c if c in '^$' else c + '.' def regex_covers(M, U): """Generate regex components and return a dict of {regex: {winner...}}. Each regex matches at least one winner and no loser.""" losers_str = '\n'.join(U) wholes = {'^'+winner+'$' for winner in M} parts = {d for w in wholes for p in subparts(w) for d in dotify(p)} reps = {r for p in parts for r in repetitions(p)} pool = wholes | parts | pairs(M) | reps searchers = {p:re.compile(p, re.MULTILINE).search for p in pool} return {p: Set(filter(searchers[p], M)) for p in pool if not searchers[p](losers_str)} def pairs(winners, special_chars=Set('*+?^$.[](){}|\\')): chars = Set(cat(winners)) - special_chars return {A+'.'+q+B for A in chars for B in chars for q in '*+?'} def repetitions(part): """Return a set of strings derived by inserting a single repetition character ('+' or '*' or '?'), after each non-special character. Avoid redundant repetition of dots.""" splits = [(part[:i], part[i:]) for i in range(1, len(part)+1)] return {A + q + B for (A, B) in splits # Don't allow '^*' nor '$*' nor '..*' nor '.*.' if not (A[-1] in '^$') if not A.endswith('..') if not (A.endswith('.') and B.startswith('.')) for q in '*+?'} def tests(): assert subparts('^it$') == {'^', 'i', 't', '$', '^i', 'it', 't$', '^it', 'it$', '^it$'} assert subparts('this') == {'t', 'h', 'i', 's', 'th', 'hi', 'is', 'thi', 'his', 'this'} assert dotify('it') == {'it', 'i.', '.t', '..'} assert dotify('^it$') == {'^it$', '^i.$', '^.t$', '^..$'} assert dotify('this') == {'this', 'thi.', 'th.s', 'th..', 't.is', 't.i.', 't..s', 't...', '.his', '.hi.', '.h.s', '.h..', '..is', '..i.', '...s', '....'} assert repetitions('a') == {'a+', 'a*', 'a?'} assert repetitions('ab') == {'a+b', 'a*b', 'a?b', 'ab+', 'ab*', 'ab?'} assert repetitions('a.c') == {'a+.c', 'a*.c', 'a?.c', 'a.c+', 'a.*c', 'a.?c', 'a.+c', 'a.c*', 'a.c?'} assert repetitions('^a..d$') == {'^a+..d$', '^a*..d$', '^a?..d$', '^a..d+$', '^a..d*$', '^a..d?$'} assert pairs({'ab', 'c'}) == { 'a.*a', 'a.*b', 'a.*c', 'a.+a', 'a.+b', 'a.+c', 'a.?a', 'a.?b', 'a.?c', 'b.*a', 'b.*b', 'b.*c', 'b.+a', 'b.+b', 'b.+c', 'b.?a', 'b.?b', 'b.?c', 'c.*a', 'c.*b', 'c.*c', 'c.+a', 'c.+b', 'c.+c', 'c.?a', 'c.?b','c.?c'} assert len(pairs({'1...2...3', '($2.34)', '42', '56', '7-11'})) == 8 * 8 * 3 return 'tests pass' if __name__ == '__main__': M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') some_answer = "a*" # print mistakes(some_answer, M, U) print tests()
筆者思考:
集合覆蓋問題(set cover problem)是一個NP問題,幾乎沒有辦法直接獲得全局最優解。對這類複雜問題,一個有效的優化逼近方式就是貪婪迭代逼近,每次都求解一個局部最優值(例如每次生成一個可以覆蓋最大M集合,可是不匹配U集合的正則子串),最後經過將全部局部最優解Ensemble起來獲得一個最終題解(集成學習思想)。
咱們已經有了生成題解候選集的函數,也有了評估題解是否正確的損失函數,咱們如今能夠來將他們組合起來,用於生成咱們的目標題解。
前面說過,咱們的算法是一個迭代式的貪婪算法,所以,咱們每次尋找一個可以最大程度匹配儘可能多M的正則子串,而後將本輪已經匹配到的M子串刪除,並對餘下的M子串繼續搜索答案,直到全部的M子串都被成功匹配爲止。
# -*- coding: utf-8 -*- from __future__ import division import re import itertools OR = '|'.join # Join a sequence of strings with '|' between them cat = ''.join # Join a sequence of strings with nothing between them Set = frozenset # Data will be frozensets, so they can't be mutated. def words(text): return set(text.split()) def mistakes(regex, M, U): "The set of mistakes made by this regex in classifying M and U." return ({"Should have matched: " + W for W in M if not re.search(regex, W)} | {"Should not have matched: " + L for L in U if re.search(regex, L)}) def verify(regex, M, U): assert not mistakes(regex, M, U) return True def matches(regex, strings): "Return a set of all the strings that are matched by regex." return {s for s in strings if re.search(regex, s)} def regex_parts(M, U): "Return parts that match at least one winner, but no loser." wholes = {'^' + w + '$' for w in M} parts = {d for w in wholes for p in subparts(w) for d in dotify(p)} return wholes | {p for p in parts if not matches(p, U)} def subparts(word, N=4): "Return a set of subparts of word: consecutive characters up to length N (default 4)." return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N)) def dotify(part): "Return all ways to replace a subset of chars in part with '.'." choices = map(replacements, part) return {cat(chars) for chars in itertools.product(*choices)} def replacements(c): return c if c in '^$' else c + '.' def regex_covers(M, U): """Generate regex components and return a dict of {regex: {winner...}}. Each regex matches at least one winner and no loser.""" losers_str = '\n'.join(U) wholes = {'^'+winner+'$' for winner in M} parts = {d for w in wholes for p in subparts(w) for d in dotify(p)} reps = {r for p in parts for r in repetitions(p)} pool = wholes | parts | pairs(M) | reps searchers = {p:re.compile(p, re.MULTILINE).search for p in pool} return {p: Set(filter(searchers[p], M)) for p in pool if not searchers[p](losers_str)} def pairs(winners, special_chars=Set('*+?^$.[](){}|\\')): chars = Set(cat(winners)) - special_chars return {A+'.'+q+B for A in chars for B in chars for q in '*+?'} def repetitions(part): """Return a set of strings derived by inserting a single repetition character ('+' or '*' or '?'), after each non-special character. Avoid redundant repetition of dots.""" splits = [(part[:i], part[i:]) for i in range(1, len(part)+1)] return {A + q + B for (A, B) in splits # Don't allow '^*' nor '$*' nor '..*' nor '.*.' if not (A[-1] in '^$') if not A.endswith('..') if not (A.endswith('.') and B.startswith('.')) for q in '*+?'} def tests(): assert subparts('^it$') == {'^', 'i', 't', '$', '^i', 'it', 't$', '^it', 'it$', '^it$'} assert subparts('this') == {'t', 'h', 'i', 's', 'th', 'hi', 'is', 'thi', 'his', 'this'} assert dotify('it') == {'it', 'i.', '.t', '..'} assert dotify('^it$') == {'^it$', '^i.$', '^.t$', '^..$'} assert dotify('this') == {'this', 'thi.', 'th.s', 'th..', 't.is', 't.i.', 't..s', 't...', '.his', '.hi.', '.h.s', '.h..', '..is', '..i.', '...s', '....'} assert repetitions('a') == {'a+', 'a*', 'a?'} assert repetitions('ab') == {'a+b', 'a*b', 'a?b', 'ab+', 'ab*', 'ab?'} assert repetitions('a.c') == {'a+.c', 'a*.c', 'a?.c', 'a.c+', 'a.*c', 'a.?c', 'a.+c', 'a.c*', 'a.c?'} assert repetitions('^a..d$') == {'^a+..d$', '^a*..d$', '^a?..d$', '^a..d+$', '^a..d*$', '^a..d?$'} assert pairs({'ab', 'c'}) == { 'a.*a', 'a.*b', 'a.*c', 'a.+a', 'a.+b', 'a.+c', 'a.?a', 'a.?b', 'a.?c', 'b.*a', 'b.*b', 'b.*c', 'b.+a', 'b.+b', 'b.+c', 'b.?a', 'b.?b', 'b.?c', 'c.*a', 'c.*b', 'c.*c', 'c.+a', 'c.+b', 'c.+c', 'c.?a', 'c.?b','c.?c'} assert len(pairs({'1...2...3', '($2.34)', '42', '56', '7-11'})) == 8 * 8 * 3 return 'tests pass' def findregex(winners, losers, k=4, addRepetition=False): "Find a regex that matches all winners but no losers (sets of strings)." # Make a pool of regex parts, then pick from them to cover winners. # On each iteration, add the 'best' part to 'solution', # remove winners covered by best, and keep in 'pool' only parts # that still match some winner. if addRepetition: pool = regex_covers(winners, losers) else: pool = regex_parts(winners, losers) solution = [] def score(part): return k * len(matches(part, winners)) - len(part) while winners: best = max(pool, key=score) solution.append(best) winners = winners - matches(best, winners) pool = {r for r in pool if matches(r, winners)} return OR(solution) if __name__ == '__main__': M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') solution = findregex(M, U, addRepetition=True) if verify(solution, M, U): print len(solution), solution solution = findregex(M, U, addRepetition=False) if verify(solution, M, U): print len(solution), solution
咱們來作一個和網絡安全相關的實驗,咱們如今有黑白兩份樣本,分別表明M和U,咱們如今嘗試用本節討論的算法來生成一段正則。
可是筆者在實際操做中發現,用regex golf這種問題的搜索空間是很是巨大的,當M和U的規模擴大時(例如大量webshell文件),所產生的正則子串候選集會是一個巨量的天文數字,該算法本質上仍是至關於在進行窮舉搜索,搜索效率十分低下。
更進一步地,筆者嘗試繼續擴大黑白樣本量(超過10的時候),算法已經沒法搜索出有效的正則題解,這說明,當黑白樣本超過必定數量的時候,alpha字符空間中黑白樣本已經存在交叉,全局解不存在。
仍是上一小節的Regex golf問題,如今咱們來嘗試用遺傳編程來優化搜索效率。
原論文的策略是基於遺傳算法生成一個」xx | xx「的雙正則子串,即每次獲得的個體最多有兩個子串,而後按照上一小節中相似的貪婪策略進行逐步OR拼接。
筆者這裏決定修改一下思路,直接基於遺傳編程對整個題解空間進行搜索,即構造一個完整題解的regex tree,這是一種全局最優解搜索的優化思路。
咱們的整體策略依然是貪婪分段策略,也就說,咱們要尋找的最終正則題解是由不少個」|「組成的分段正則表達式。如今咱們來定義咱們的題解中可能出現的基本元素,這裏,咱們依然採用樹結構做爲基礎數據結構的承載:
基於上述基本元素定義,咱們能夠將題解正則表達式抽象爲一個樹結構(regex tree),以下圖,
(foo) | (ba++r)
該樹結構能夠經過深度優先遍歷,打印出最終的題解正則串,如上圖的標題所示。
# -*- coding: utf-8 -*-
from random import random, randint, choice
import re
import itertools
# "ROOT"
class rootnode:
def __init__(self, left_child_node, right_child_node):
if left_child_node and right_child_node:
self.left_child_node = left_child_node
self.right_child_node = right_child_node
else:
self.left_child_node = node
self.right_child_node = node
def display(self):
return "|"
# universal child node
class node:
def __init__(self, node):
self.node = node
# "|"
class spliternode:
def __init__(self, left_child_dotplaceholder, right_child_dotplaceholder):
if left_child_dotplaceholder and right_child_dotplaceholder:
self.left_child_dotplaceholder = left_child_dotplaceholder
self.right_child_dotplaceholder = right_child_dotplaceholder
else:
self.left_child_dotplaceholder = node
self.right_child_dotplaceholder = node
def display(self):
return "|"
# "(.)"
class dotplaceholdernode:
def __init__(self, childnode=None):
if childnode:
self.childnode = childnode
else:
self.childnode = node
# "foo"
class charnode:
def __init__(self, charstring):
if charstring:
self.charstring = charstring
else:
self.charstring = node
def display(self):
return self.charstring
# ".."
class concat_node:
def __init__(self, left_child_node, right_child_node):
if left_child_node and right_child_node:
self.left_child_node = left_child_node
self.right_child_node = right_child_node
else:
self.left_child_node = node
self.right_child_node = node
# "++"
class qualifiernode:
def __init__(self, qualifierstrig):
if qualifierstrig:
self.qualifierstrig = qualifierstrig
else:
self.qualifierstrig = node
def display(self):
return self.qualifierstrig
def exampletree():
return rootnode(
dotplaceholdernode(
charnode("foo")
),
dotplaceholdernode(
concat_node(
concat_node(
charnode("ba"),
qualifiernode("++")
),
charnode("r")
)
)
)
# left child deep first travel
def printregextree(rootnode_i):
if rootnode_i is None:
return ""
if isinstance(rootnode_i, rootnode):
# concat the finnal regex str
finnal_regexstr = ""
finnal_regexstr += printregextree(rootnode_i.left_child_node)
finnal_regexstr += rootnode_i.display()
finnal_regexstr += printregextree(rootnode_i.right_child_node)
return finnal_regexstr
if isinstance(rootnode_i, spliternode):
# concat the finnal regex str
split_regexstr = ""
split_regexstr += printregextree(rootnode_i.left_child_dotplaceholder)
split_regexstr += rootnode_i.display()
split_regexstr += printregextree(rootnode_i.right_child_dotplaceholder)
return split_regexstr
if isinstance(rootnode_i, dotplaceholdernode):
return printregextree(rootnode_i.childnode)
if isinstance(rootnode_i, charnode):
return rootnode_i.display()
if isinstance(rootnode_i, concat_node):
concat_str = ""
concat_str += printregextree(rootnode_i.left_child_node)
concat_str += printregextree(rootnode_i.right_child_node)
return concat_str
if isinstance(rootnode_i, qualifiernode):
return rootnode_i.display()
def matches(regex, strings):
"Return a set of all the strings that are matched by regex."
return {s for s in strings if re.search(regex, s)}
def regex_parts(M, U):
"Return parts that match at least one winner, but no loser."
wholes = {'^' + w + '$' for w in M}
parts = {d for w in wholes for p in subparts(w) for d in p}
return wholes | {p for p in parts if not matches(p, U)}
def subparts(word, N=5):
"Return a set of subparts of word: consecutive characters up to length N (default 4)."
return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N))
def words(text):
return set(text.split())
def makerandomtree(M, U, parentnode=None, splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0):
if curren_level > maxdepth:
print "curren_level > maxdepth: ", curren_level
return
# ROOT node
if isinstance(parentnode, rootnode):
curren_level = 0
print "curren_level: ", curren_level
# init root node
print "init rootnode: ", curren_level
rootnode_i = rootnode(
dotplaceholdernode(None),
dotplaceholdernode(None)
)
# create left child node
print "new dotplaceholdernode"
rootnode_i.left_child_node = makerandomtree(M, U, rootnode_i.left_child_node, splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
print "new dotplaceholdernode"
# create right child node
rootnode_i.right_child_node = makerandomtree(M, U, rootnode_i.right_child_node, splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
return rootnode_i
# ".." dot placeholder node
if isinstance(parentnode, dotplaceholdernode):
curren_level += 1
print "curren_level: ", curren_level
# "|"
if random() < splitrate:
print "new spliternode"
return makerandomtree(M, U, spliternode(None, None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# ".."
elif random() < concatrate:
print "new concat_node"
return makerandomtree(M, U, concat_node(None, None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "foo"
elif random() < charrate:
print "new charnode"
return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "|" split node
if isinstance(parentnode, spliternode):
curren_level += 1
print "curren_level: ", curren_level
print "init spliternode"
splitnode_i = spliternode(
dotplaceholdernode(None),
dotplaceholdernode(None)
)
print "new dotplaceholdernode"
splitnode_i.left_child_dotplaceholder = makerandomtree(M, U, splitnode_i.left_child_dotplaceholder,
splitrate, concatrate, charrate, qualifierate,
maxdepth, curren_level)
print "new dotplaceholdernode"
splitnode_i.right_child_dotplaceholder = makerandomtree(M, U, splitnode_i.right_child_dotplaceholder,
splitrate, concatrate, charrate, qualifierate,
maxdepth, curren_level)
return splitnode_i
# ".." concat node
if isinstance(parentnode, concat_node):
curren_level += 1
print "curren_level: ", curren_level
# "foo"
if random() < charrate:
print "new charnode"
return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "++"
if random() < qualifierate:
print "new qualifiernode"
return makerandomtree(M, U, qualifiernode(None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "foo" char node
if isinstance(parentnode, charnode):
curren_level += 1
print "curren_level: ", curren_level
charnode_str = choice(list(regex_parts(M, U)))
print "charnode_str: ", charnode_str
print "new charnode"
charnode_i = charnode(charnode_str)
return charnode_i
# "++" qualifierate node
if isinstance(parentnode, qualifiernode):
curren_level += 1
print "curren_level: ", curren_level
qualifiernode_str = choice(['.', '+', '?', '*', '.*', '.+', '.*?'])
print "qualifiernode_str: ", qualifiernode_str
print "new qualifiernode"
qualifiernode_i = qualifiernode(qualifiernode_str)
return qualifiernode_i
if __name__ == '__main__':
exampletree = exampletree()
print type(exampletree), exampletree
print printregextree(exampletree)
有了基本的數據結構,如今定義一下regex tree的生長準則,
這裏須要用到代價敏感學習的訓練思路,若是直接按照下面公式進行損失訓練,
那麼很快就會收斂到最優解:」|「上,緣由很顯然,全字符匹配中,nm和nu都是相等的,相減爲0,而後減去字符串長度1,就是-1,這是算法能找到的最好答案了。
爲了解決這個問題,咱們須要對TP和FP採起不一樣的懲罰力度,
def scorefunction(tree, M, U, w=1):
dif = 0
regex_str = printregextree(tree)
M_cn, U_cn = 0, 0
for s in list(M):
try:
if re.search(regex_str, s):
M_cn += 1
except Exception, e:
print e.message, "regex_str: ", regex_str
# this regex tree is illegal, low socre!!
return -8
for u in list(U):
if re.search(regex_str, u):
U_cn += 1
# print "M_cn: ", M_cn
# print "U_cn: ", U_cn
dif = w * (M_cn - U_cn) - len(regex_str)
return dif
上面代碼中有一點值得注意,因爲regex tree的生成具備必定的隨機性,所以極可能產生不合法的正則串,所以對不合法的正則串給予較低的分值,驅使它淘汰。
有了損失函數的定義,就能夠很容易算出一個種羣中全部個體的適應度排名。
def rankfunction(M, U, population):
scores = [(scorefunction(t, M, U), t) for t in population]
scores.sort()
return scores
按照遺傳編程的定義,咱們先隨機初始化一棵符合題解規約的regex tree,
# -*- coding: utf-8 -*- from random import random, randint, choice import re import itertools # "ROOT" class rootnode: def __init__(self, left_child_node, right_child_node): if left_child_node and right_child_node: self.left_child_node = left_child_node self.right_child_node = right_child_node else: self.left_child_node = node self.right_child_node = node def display(self): return "|" # universal child node class node: def __init__(self, node): self.node = node # "|" class spliternode: def __init__(self, left_child_dotplaceholder, right_child_dotplaceholder): if left_child_dotplaceholder and right_child_dotplaceholder: self.left_child_dotplaceholder = left_child_dotplaceholder self.right_child_dotplaceholder = right_child_dotplaceholder else: self.left_child_dotplaceholder = node self.right_child_dotplaceholder = node def display(self): return "|" # "(.)" class dotplaceholdernode: def __init__(self, childnode=None): if childnode: self.childnode = childnode else: self.childnode = node # "foo" class charnode: def __init__(self, charstring): if charstring: self.charstring = charstring else: self.charstring = node def display(self): return self.charstring # ".." class concat_node: def __init__(self, left_child_node, right_child_node): if left_child_node and right_child_node: self.left_child_node = left_child_node self.right_child_node = right_child_node else: self.left_child_node = node self.right_child_node = node # "++" class qualifiernode: def __init__(self, qualifierstrig): if qualifierstrig: self.qualifierstrig = qualifierstrig else: self.qualifierstrig = node def display(self): return self.qualifierstrig def exampletree(): return rootnode( dotplaceholdernode( charnode("foo") ), dotplaceholdernode( concat_node( concat_node( charnode("ba"), qualifiernode("++") ), charnode("r") ) ) ) # left child deep first travel def printregextree(rootnode_i): if rootnode_i is None: return "" if isinstance(rootnode_i, rootnode): # concat the finnal regex str finnal_regexstr = "" finnal_regexstr += printregextree(rootnode_i.left_child_node) finnal_regexstr += rootnode_i.display() finnal_regexstr += printregextree(rootnode_i.right_child_node) return finnal_regexstr if isinstance(rootnode_i, spliternode): # concat the finnal regex str split_regexstr = "" split_regexstr += printregextree(rootnode_i.left_child_dotplaceholder) split_regexstr += rootnode_i.display() split_regexstr += printregextree(rootnode_i.right_child_dotplaceholder) return split_regexstr if isinstance(rootnode_i, dotplaceholdernode): return printregextree(rootnode_i.childnode) if isinstance(rootnode_i, charnode): return rootnode_i.display() if isinstance(rootnode_i, concat_node): concat_str = "" concat_str += printregextree(rootnode_i.left_child_node) concat_str += printregextree(rootnode_i.right_child_node) return concat_str if isinstance(rootnode_i, qualifiernode): return rootnode_i.display() def matches(regex, strings): "Return a set of all the strings that are matched by regex." return {s for s in strings if re.search(regex, s)} def regex_parts(M, U): "Return parts that match at least one winner, but no loser." wholes = {'^' + w + '$' for w in M} parts = {d for w in wholes for p in subparts(w) for d in p} return wholes | {p for p in parts if not matches(p, U)} def subparts(word, N=5): "Return a set of subparts of word: consecutive characters up to length N (default 4)." return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N)) def words(text): return set(text.split()) def makerandomtree(M, U, parentnode=None, splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0): if curren_level > maxdepth: print "curren_level > maxdepth: ", curren_level return # ROOT node if isinstance(parentnode, rootnode): curren_level = 0 print "curren_level: ", curren_level # init root node print "init rootnode: ", curren_level rootnode_i = rootnode( dotplaceholdernode(None), dotplaceholdernode(None) ) # create left child node print "new dotplaceholdernode" rootnode_i.left_child_node = makerandomtree(M, U, rootnode_i.left_child_node, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) print "new dotplaceholdernode" # create right child node rootnode_i.right_child_node = makerandomtree(M, U, rootnode_i.right_child_node, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) return rootnode_i # ".." dot placeholder node if isinstance(parentnode, dotplaceholdernode): curren_level += 1 print "curren_level: ", curren_level # "|" if random() < splitrate: print "new spliternode" return makerandomtree(M, U, spliternode(None, None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # ".." elif random() < concatrate: print "new concat_node" return makerandomtree(M, U, concat_node(None, None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "foo" elif random() < charrate: print "new charnode" return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "|" split node if isinstance(parentnode, spliternode): curren_level += 1 print "curren_level: ", curren_level print "init spliternode" splitnode_i = spliternode( dotplaceholdernode(None), dotplaceholdernode(None) ) print "new dotplaceholdernode" splitnode_i.left_child_dotplaceholder = makerandomtree(M, U, splitnode_i.left_child_dotplaceholder, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) print "new dotplaceholdernode" splitnode_i.right_child_dotplaceholder = makerandomtree(M, U, splitnode_i.right_child_dotplaceholder, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) return splitnode_i # ".." concat node if isinstance(parentnode, concat_node): curren_level += 1 print "curren_level: ", curren_level # "foo" if random() < charrate: print "new charnode" return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "++" if random() < qualifierate: print "new qualifiernode" return makerandomtree(M, U, qualifiernode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "foo" char node if isinstance(parentnode, charnode): curren_level += 1 print "curren_level: ", curren_level charnode_str = choice(list(regex_parts(M, U))) print "charnode_str: ", charnode_str print "new charnode" charnode_i = charnode(charnode_str) return charnode_i # "++" qualifierate node if isinstance(parentnode, qualifiernode): curren_level += 1 print "curren_level: ", curren_level qualifiernode_str = choice(['.', '+', '?', '*', '.*', '.+', '.*?']) print "qualifiernode_str: ", qualifiernode_str print "new qualifiernode" qualifiernode_i = qualifiernode(qualifiernode_str) return qualifiernode_i if __name__ == '__main__': exampletree = exampletree() print type(exampletree), exampletree print printregextree(exampletree) M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') print regex_parts(M, U) rd1 = makerandomtree(M, U, parentnode=rootnode(None, None), splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0) print rd1 print printregextree(rd1)
能夠看到,隨機產生的題解並非一個好的題解,可是它倒是一個有效的題解。如今咱們有了一個好的開始,接下來咱們能夠着手進行遺傳變異了。
遺傳編程的遺傳變異分爲兩種:1)mutate;2)crossover,咱們分別來定義。
mutate變異遍歷整個regex tree,針對不一樣節點採起不一樣的變異策略:
def mutate(M, U, t, probchange=0.2):
if random() < probchange:
return makerandomtree(M, U)
else:
result = deepcopy(t)
if hasattr(t, "left_concatchildnode"):
result.left_concatchildnode = mutate(M, U, t.left_concatchildnode, probchange)
if hasattr(t, "right_concatchildnode"):
result.right_concatchildnode = mutate(M, U, t.right_concatchildnode, probchange)
if hasattr(t, "childnode"):
result.childnode = mutate(M, U, t.childnode, probchange)
if hasattr(t, "qualifierstrig"):
result.qualifierstrig = qualifiernode(choice(['.', '+', '?', '*', '.*', '.+', '.*?']))
if hasattr(t, "charstring"):
result.charstring = charnode(choice(list(regex_parts(M, U))))
return result
整體來講,mutate的做用在於向種羣中引入更多的多樣性,隨機性和多樣性是物種進化的原動力。
crossover交叉是同序遍歷兩棵regex tree,按照必定的機率決定是否要將各自的節點進行互換。
def crossover(t1, t2, probswap=0.7): if random() < probswap: return deepcopy(t2) else: result = deepcopy(t1) if hasattr(t1, 'left_childnode') and hasattr(t2, 'left_childnode'): result.left_childnode = crossover(t1.left_childnode, t2.left_childnode, probswap) if hasattr(t1, 'right_childnode') and hasattr(t2, 'right_childnode'): result.right_childnode = crossover(t1.right_childnode, t2.right_childnode, probswap) if hasattr(t1, 'childnode') and hasattr(t2, 'childnode'): result.childnode = crossover(t1.childnode, t2.childnode, probswap) if hasattr(t1, 'qualifierstrig') and hasattr(t2, 'qualifierstrig'): result.qualifierstrig = t2.qualifierstrig if hasattr(t1, 'charstring') and hasattr(t2, 'charstring'): result.charstring = t2.charstring return result
整體來講,crossover的做用在於加速優秀基因和保留,和劣質基因的淘汰。由於能夠這麼理解,由於crossover的存在,同一個基因模式在種羣中會有擴大的趨勢,而若是是優秀的基因則會不斷被保留。
至此,regex tree遺傳進化的全部元素都已經準備穩當了,咱們如今能夠開始編寫遺傳進化算法主程序了,讓程序自動生成一段符合題解的正則。
# -*- coding: utf-8 -*- from random import random, randint, choice import re from copy import deepcopy import itertools from math import log import numpy as np import os # "ROOT" class rootnode: def __init__(self, left_childnode, right_childnode): if left_childnode and right_childnode: self.left_childnode = left_childnode self.right_childnode = right_childnode else: self.left_childnode = node self.right_childnode = node def display(self): return "|" # universal child node class node: def __init__(self, node): self.node = node # "|" class spliternode: def __init__(self, left_childnode, right_childnode): if left_childnode and right_childnode: self.left_childnode = left_childnode self.right_childnode = right_childnode else: self.left_childnode = node self.right_childnode = node def display(self): return "|" # "(.)" class dotplaceholdernode: def __init__(self, childnode=None): if childnode: self.childnode = childnode else: self.childnode = node # "foo" class charnode: def __init__(self, charstring): if charstring: self.charstring = charstring else: self.charstring = node def display(self): return self.charstring # ".." class concat_node: def __init__(self, left_concatchildnode, right_concatchildnode): if left_concatchildnode and right_concatchildnode: self.left_concatchildnode = left_concatchildnode self.right_concatchildnode = right_concatchildnode else: self.left_concatchildnode = node self.right_concatchildnode = node # "++" class qualifiernode: def __init__(self, qualifierstrig): if qualifierstrig: self.qualifierstrig = qualifierstrig else: self.qualifierstrig = node def display(self): return self.qualifierstrig def exampletree(): return rootnode( dotplaceholdernode( charnode("foo") ), dotplaceholdernode( concat_node( concat_node( charnode("ba"), qualifiernode("++") ), charnode("r") ) ) ) # left child deep first travel def printregextree(rootnode_i): if rootnode_i is None: return "" if isinstance(rootnode_i, rootnode): # concat the finnal regex str finnal_regexstr = "" finnal_regexstr += printregextree(rootnode_i.left_childnode) finnal_regexstr += rootnode_i.display() finnal_regexstr += printregextree(rootnode_i.right_childnode) return finnal_regexstr if isinstance(rootnode_i, spliternode): # concat the finnal regex str split_regexstr = "" split_regexstr += printregextree(rootnode_i.left_childnode) split_regexstr += rootnode_i.display() split_regexstr += printregextree(rootnode_i.right_childnode) return split_regexstr if isinstance(rootnode_i, dotplaceholdernode): return printregextree(rootnode_i.childnode) if isinstance(rootnode_i, charnode): return rootnode_i.display() if isinstance(rootnode_i, concat_node): concat_str = "" concat_str += printregextree(rootnode_i.left_concatchildnode) concat_str += printregextree(rootnode_i.right_concatchildnode) return concat_str if isinstance(rootnode_i, qualifiernode): return rootnode_i.display() def matches(regex, strings): "Return a set of all the strings that are matched by regex." return {s for s in strings if re.search(regex, s)} def regex_parts(M, U): "Return parts that match at least one winner, but no loser." wholes = {'^' + w + '$' for w in M} parts = {d for w in wholes for p in subparts(w) for d in p} return wholes | {p for p in parts if not matches(p, U)} def subparts(word, N=5): "Return a set of subparts of word: consecutive characters up to length N (default 4)." return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N)) def words(text): return set(text.split()) def makerandomtree(M, U, charnode_pool, parentnode=rootnode(None, None), splitrate=0.7, concatrate=0.7, charrate=0.7, qualifierate=0.7, maxdepth=6, curren_level=0, stopearly=0.1): if curren_level > maxdepth: #print "curren_level > maxdepth: ", curren_level return if random() < stopearly: return # ROOT node if isinstance(parentnode, rootnode): curren_level = 0 #print "curren_level: ", curren_level # init root node #print "init rootnode: ", curren_level rootnode_i = rootnode( dotplaceholdernode(None), dotplaceholdernode(None) ) # create left child node #print "new dotplaceholdernode" rootnode_i.left_childnode = makerandomtree(M, U, charnode_pool, rootnode_i.left_childnode, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) #print "new dotplaceholdernode" # create right child node rootnode_i.right_childnode = makerandomtree(M, U, charnode_pool, rootnode_i.right_childnode, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) return rootnode_i # ".." dot placeholder node if isinstance(parentnode, dotplaceholdernode): curren_level += 1 #print "curren_level: ", curren_level # "|" if random() < splitrate: #print "new spliternode" return makerandomtree(M, U, charnode_pool, spliternode(None, None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # ".." elif random() < concatrate: #print "new concat_node" return makerandomtree(M, U, charnode_pool, concat_node(None, None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "foo" elif random() < charrate: #print "new charnode" return makerandomtree(M, U, charnode_pool, charnode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "|" split node if isinstance(parentnode, spliternode): curren_level += 1 #print "curren_level: ", curren_level #print "init spliternode" splitnode_i = spliternode( dotplaceholdernode(None), dotplaceholdernode(None) ) #print "new dotplaceholdernode" splitnode_i.left_childnode = makerandomtree(M, U, charnode_pool, splitnode_i.left_childnode, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) #print "new dotplaceholdernode" splitnode_i.right_childnode = makerandomtree(M, U, charnode_pool, splitnode_i.right_childnode, splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) return splitnode_i # ".." concat node if isinstance(parentnode, concat_node): curren_level += 1 #print "curren_level: ", curren_level # "foo" if random() < charrate: #print "new charnode" return makerandomtree(M, U, charnode_pool, charnode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "++" if random() < qualifierate: #print "new qualifiernode" return makerandomtree(M, U, charnode_pool, qualifiernode(None), splitrate, concatrate, charrate, qualifierate, maxdepth, curren_level) # "foo" char node if isinstance(parentnode, charnode): curren_level += 1 #print "curren_level: ", curren_level charnode_str = choice(charnode_pool) #print "charnode_str: ", charnode_str #print "new charnode" charnode_i = charnode(charnode_str) return charnode_i # "++" qualifierate node if isinstance(parentnode, qualifiernode): curren_level += 1 #print "curren_level: ", curren_level qualifiernode_str = choice(['.', '+', '?', '*', '.*', '.+', '.*?']) #print "qualifiernode_str: ", qualifiernode_str #print "new qualifiernode" qualifiernode_i = qualifiernode(qualifiernode_str) return qualifiernode_i def scorefunction(tree, M, U, w=1): dif = 0 regex_str = printregextree(tree) M_cn, U_cn = 0, 0 for s in list(M): try: if re.search(regex_str, s): M_cn += 1 except Exception, e: # print e.message, "regex_str: ", regex_str # this regex tree is illegal, low socre!! return -32 for u in list(U): if re.search(regex_str, u): U_cn += 1 # print "M_cn: ", M_cn # print "U_cn: ", U_cn dif = w * (M_cn - 2*U_cn) - len(regex_str) return dif def rankfunction_(M, U, population): scores = [(scorefunction(t, M, U), t) for t in population] # remove illegal regex scores_ = [] for i in scores: if i[1]: scores_.append(i) scores_.sort(reverse=True) return scores_ def mutate(M, U, charnode_pool, t, probchange=0.4): if random() < probchange: return makerandomtree(M, U, charnode_pool) else: result = deepcopy(t) if hasattr(t, "left_concatchildnode"): result.left_concatchildnode = mutate(M, U, charnode_pool, t.left_concatchildnode, probchange) if hasattr(t, "right_concatchildnode"): result.right_concatchildnode = mutate(M, U, charnode_pool, t.right_concatchildnode, probchange) if hasattr(t, "childnode"): result.childnode = mutate(M, U, charnode_pool, t.childnode, probchange) if hasattr(t, "qualifierstrig"): result.qualifierstrig = qualifiernode(choice(['.', '+', '?', '*', '.*', '.+', '.*?'])) if hasattr(t, "charstring"): result.charstring = charnode(choice(charnode_pool)) return result def crossover(t1, t2, probswap=0.5): if random() < probswap: return deepcopy(t2) else: result = deepcopy(t1) if hasattr(t1, 'left_childnode') and hasattr(t2, 'left_childnode'): result.left_childnode = crossover(t1.left_childnode, t2.left_childnode, probswap) if hasattr(t1, 'right_childnode') and hasattr(t2, 'right_childnode'): result.right_childnode = crossover(t1.right_childnode, t2.right_childnode, probswap) if hasattr(t1, 'childnode') and hasattr(t2, 'childnode'): result.childnode = crossover(t1.childnode, t2.childnode, probswap) if hasattr(t1, 'qualifierstrig') and hasattr(t2, 'qualifierstrig'): result.qualifierstrig = t2.qualifierstrig if hasattr(t1, 'charstring') and hasattr(t2, 'charstring'): result.charstring = t2.charstring return result def evolve(M, U, charnode_pool, popsize=128, rankfunction=rankfunction_, maxgen=500, mutationrate=0.6, probswap=0.5, pexp=0.3, pnew=0.8): # Returns a random number, tending towards lower numbers. # The lower pexp is, more lower numbers you will get # probexp:表示在構造新種羣時,」選擇評價較低的程序「這一律率的遞減比例。該值越大,相應的篩選過程就越嚴格,即只選擇評價最高的多少比例的個體做爲複製對象 def selectindex(): return int(log(random()) / log(pexp)) # Create a random initial population population = [makerandomtree(M, U, charnode_pool) for i in range(popsize)] scores = [] for i in range(maxgen): scores = rankfunction(M, U, population) print scores[0] print "evole round: {0}, top score: {1}, regex_str: {2}".format(i, scores[0][0], printregextree(scores[0][1])) if scores[0][0] > 0: print "found good solution: {0}".format(printregextree(scores[0][1])) break # The top 20% always make it # newpop = np.array(scores)[:int(len(scores) * 0.2), 1].tolist() newpop = [scores[0][1], scores[1][1]] # Build the next generation # probnew:表示在構造新種羣時,」引入一個全新的隨機程序「的機率,該參數和probexp是」種羣多樣性「的重要決定參數 while len(newpop) < popsize: if random() < pnew: newpop.append( mutate( M, U, charnode_pool, crossover( scores[selectindex()][1], scores[selectindex()][1], probswap ), mutationrate ) ) else: # Add a random node to mix things up new_tree = makerandomtree(M, U, charnode_pool) # print "evole round: {0}, add new tree: {1}".format(i, printregextree(new_tree)) newpop.append(new_tree) population = newpop # return the evolutionary results return scores[0][1] def test_regex(M, U, regex_str): dif = 0 M_cn, U_cn = 0, 0 for s in list(M): try: if re.search(regex_str, s): M_cn += 1 except Exception, e: # print e.message, "regex_str: ", regex_str # this regex tree is illegal, low socre!! dif = -32 for u in list(U): try: if re.search(regex_str, u): U_cn += 1 except Exception, e: # print e.message, "regex_str: ", regex_str # this regex tree is illegal, low socre!! dif = -32 print "M_cn: ", M_cn print "U_cn: ", U_cn dif = 1 * (M_cn - 4 * U_cn) - 4 * len(regex_str) print "dif: ", dif def test_regex_golf(): # exampletree = exampletree() # print type(exampletree), exampletree # print printregextree(exampletree) M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''') U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''') charnode_pool = list(regex_parts(M, U)) print charnode_pool # rd1 = makerandomtree(M, U, parentnode=rootnode(None, None), splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0) # rd2 = makerandomtree(M, U, parentnode=rootnode(None, None), splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0) # print "rd1: " # print printregextree(rd1) # print "rd2: " # print printregextree(rd2) # dif = scorefunction(tree=rd1, M=M, U=U, w=1) # print "dif: ", dif # population = [makerandomtree(M, U) for i in range(10)] # for i in population: # print printregextree(i) # scores = rankfunction_(M, U, population) # print "function score: ", scores # print np.array(scores)[:int(len(scores) * 0.2), 1].tolist() # rd1_mutate = mutate(M, U, rd1, probchange=0.2) # print "rd1_mutate: " # print printregextree(rd1_mutate) # rd1_rd2_crossover = crossover(rd1, rd2, probswap=0.7) # print "rd1_rd2_crossover: " # print printregextree(rd1_rd2_crossover) evolutionary_regex_str = evolve(M, U, charnode_pool) print printregextree(evolutionary_regex_str) def load_data(): M, U = [], [] rootDir = "./blacksamples" for lists in os.listdir(rootDir): if lists == '.DS_Store': continue filepath = os.path.join(rootDir, lists) filecontent = open(filepath, 'r').read() # only remain English word cop = re.compile("[^^a-z^A-Z^0-9^\s]") # remove space filecontent = re.sub(r'\s+', '', filecontent).strip() filecontent = cop.sub('', filecontent) M.append(filecontent) rootDir = "./whitesamples" for lists in os.listdir(rootDir): if lists == '.DS_Store': continue filepath = os.path.join(rootDir, lists) filecontent = open(filepath, 'r').read() # only remain English word cop = re.compile("[^^a-z^A-Z^0-9^\s]") filecontent = cop.sub('', filecontent) # remove space filecontent = re.sub(r'\s+', '', filecontent).strip() U.append(filecontent) M = set(M) U = set(U) return M, U def test_webshell(): M, U = load_data() # print M charnode_pool = list(regex_parts(M, U)) print charnode_pool print len(charnode_pool) evolutionary_regex_str = evolve(M, U, charnode_pool) print printregextree(evolutionary_regex_str) print "test_regex: " test_regex(M, U, charnode_pool) if __name__ == '__main__': test_webshell() # test_regex_golf()
代碼中的blacksamples、whitesamples請讀者朋友自行準備。
Relevant Link:
http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e4%ba%8c%e6%84%9a%e5%bc%84%e6%b7%b1%e5%ba%a6%e5%ad%a6%e4%b9%a0%e7%9a%84%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95 Bartoli, Alberto, et al. 「Playing regex golf with genetic programming.」 Proceedings of the 2014 conference on Genetic and evolutionary computation. ACM, 2014. https://alf.nu/RegexGolf http://www.doc88.com/p-0387699026353.html http://regex.inginf.units.it/golf/# https://github.com/norvig/pytudes https://github.com/norvig/pytudes/blob/master/ipynb/xkcd1313.ipynb https://github.com/norvig/pytudes/blob/master/ipynb/xkcd1313-part2.ipynb
咱們來回顧一下要應用遺傳編程在某個具體場景中,須要的兩個必要條件:
基於php token生成一個php ast tree,而且在損失函數中找到必定定量評估方法,判斷當前文件的惡意程度。用遺傳編程自動生成一些能夠繞過當前檢測機制的php webshell
筆者提醒:
本質上來講,遺傳編程的優化方向是隨機的,和梯度驅動的SGD優化算法相比,遺傳編程每次的迭代優化並不明確朝某個方向前進,而是被動由環境來進行淘汰和篩選,因此是一種環境選擇壓驅動的優化算法。
遺傳編程的這種的優化特性特別適合像「惡意樣本檢測」這種「階躍損失」的分類問題,由於對於惡意樣原本說,只有兩種狀態,「黑或白」,損失函數的值也只有兩種,「0或者1」,所以,咱們沒法用SGD相似的算法來優化這種階躍損失函數問題,由於階躍點處的梯度要麼不存在(左極限),要麼是無窮大的(右極限)。
可是遺傳編程依然能在每輪篩選出「優勝者」,並按照必定的策略保留優勝者,進行交叉和變異以進入下一輪,同時也會按照必定的機率挑選部分的「失敗者」也進入下一輪進化,這麼作的目的是引入多樣性。