遺傳編程(GA,genetic programming)算法初探,以及用遺傳編程自動生成符合題解的正則表達式的實踐

1. 遺傳編程簡介

0x1:什麼是遺傳編程算法,和傳統機器學習算法有什麼區別

傳統上,咱們接觸的機器學習算法,都是被設計爲解決某一個某一類問題的肯定性算法。對於這些機器學習算法來講,惟一的靈活性體如今參數搜索空間上,向算法輸入樣本,算法藉助不一樣的優化手段,對參數進行調整,以此來獲得一個對訓練樣本和測試樣本的最佳適配參數組。php

遺傳編程算法徹底走了另外一外一條路,遺傳編程算法的目標是編寫一個程度,這個程序會嘗試自動構造出解決某一問題的最佳程度。從本質上看,遺傳編程算法構造的是一個可以構造算法的算法html

另外一方面,咱們曾經討論過遺傳算法,遺傳算法是一種優化技術,就優化技術而言,不管是何種形式的優化,算法或度量都是預先設定好的,而優化算法所作的工做就是嘗試爲其找到最佳參數。和優化算法同樣,遺傳編程也須要一種方法來度量題解的優劣程度。但與優化算法不一樣的是,遺傳編程中的題解並不只僅是一組用於給定算法的參數,相反,在遺傳編程中,連同算法自己及其全部參數,都是須要搜索肯定的。node

從某種程度上來講,遺傳編程和遺傳算法的區別在於,進化的基本單位不一樣,python

  • 遺傳優化:進化的基本單位是模型可變參數
  • 遺傳編程:進化的基本單位是新算法以及新算法的參數

0x2:遺傳編程和進化論的關係

遺傳算法是受達爾文的進化論的啓發,借鑑生物進化過程而提出的一種啓發式搜索算法,所以遺傳算法 ( GA , Genetic Algorithm ) 也稱進化算法 。 所以,在討論遺傳編程的時候,會大量借用進化論中的術語和概念,爲了更好地討論遺傳算法,咱們先介紹一些基本生物進化概念,git

  • 基因 ( Gene ):一個遺傳因子,種羣中的最基本單元。
  • 染色體 ( Chromosome ):一組的基因。
  • 個體 ( individual ):單個生物。在遺傳算法中,個體通常只包含一條染色體。
  • 種羣 ( Population ):由個體組成的羣體。生物的進化以種羣的形式進化。
  • 適者生存 ( The survival of the fittest ):對環境適應度高的個體參與繁殖的機會比較多,後代就會愈來愈多。適應度低的個體參與繁殖的機會比較少,後代就會愈來愈少。

生物所處的環境起到一個提供生存壓的做用(反饋),雖然縱觀整個地球歷史,環境的因素是在不斷變化的(有時甚至變化的還很快),可是在某個時間段內(例如5000年內)是基本保持不變的,而物種進化的目的就是經過一代代的繁衍,逐漸適應(擬合)當前的環境,並和其餘物種達到最優平衡(納什均衡)。github

遺傳編程算法就是模擬了生物進化的過程,簡單說來講,web

  • 生物進化的環境由一個用戶定義的任務(user-defined task)所決定,算法由一組初始的題解(程序)開始展開競爭。這裏所謂的任務能夠是多種形式,
    • 一種競賽(game):各個題解(程序)在競賽中直接展開競爭
    • 個體測試:測出哪一個題解(程序)的執行效果更好
  • 遺傳算法將基因抽象爲題解中最小的隨機變量因子(例如模型中的可變參數)
  • 一個問題的解由不少這樣的隨機變化因子組成,算法將問題的解編碼成個體的染色體(染色體是基因的集合)
  • 單個個體包含若干個染色體,個體包含的染色體(題解)越多和越好,則個體的適應度就越好。在實際工程中,爲了簡化算法,經常假設一個個體只有一條染色體
  • 多個個體組成種羣,種羣中適應度(Fitness)高的個體得到較高几率的繁殖機會,從而致使適應度高的個體會愈來愈多,通過N代的天然選擇後,保存下來的個體都是適應度很高的
  • 繁殖過程當中,算法會評估並挑選出本輪表現最好的一部分題解題解(程序),並對程序的某些部分以隨機(必定機率)的方式進行修改,包括:    
    • 基因交叉(Acrossover):在最優題解之間,挑選部分隨機變量因子進行彼此互換。遺傳算法交叉比人體內染色體交叉要簡單許多。遺傳算法的染色體是單倍體,而人體內的真正的染色體是雙倍體。下圖是遺傳算法中兩條染色體在中間進行交叉的示意圖,
    • 基因突變(Mutation):在最優題解上,直接對某些隨機變量因子(基因位)進行隨機修改。下圖是遺傳算法中一條染色體在第二位發生基因變異的示意圖,
  • 通過繁殖過程,新的種羣(即新的一組解)產生,稱爲「下一代」,理論上,這些新的題解基於原來的最優程序,但又不一樣於它們。這些新產生的題解和舊的最優題解會一塊兒進入下一輪天然選擇階段
  • 上述繁殖過程重複屢次,直到達到收斂條件,包括,
    • 找到了全局最優解
    • 找到了表現足夠好的解
    • 題解在歷經數代以後都沒有獲得任何改善
    • 繁衍的代數達到了規定的限制
  • 最終,歷史上適應度最高個體所包含的解,做爲遺傳算法的輸出

下圖是遺傳算法的流程圖,正則表達式

0x3:遺傳編程的不一樣類型

從大的方面看,遺傳編程的兩個重要概念是基因型表現型算法

  • 基因型就是種羣個體的編碼;
  • 表現型是種羣個體所表示的程序片斷;

其實遺傳算法領域的研究中,這兩個方面的研究都有,可是,由於遺傳編程很難直接處理程序片斷(表現型)(例如一段C++可執行代碼、或者是一段python代碼),由於基於隨機變異獲得的新代碼極可能沒法經過編譯器語法檢查。shell

可是相比之下,遺傳算法反而容易處理程序片斷的內在結構(基因型)(例如C++代碼的AST抽象語法樹)。

因此,筆者認爲基因型的遺傳算法研究纔是更有研究價值的一個方向,本文的討論也會圍繞基因型的遺傳算法展開。

根據基因型形態的不一樣,遺傳編程方法能夠分爲三種:

  • 線性遺傳編程
  • 基於樹的遺傳編程
  • 基於圖的遺傳編程

1. 線性遺傳編程

線性遺傳編程有廣義狹義之分,

  • 廣義線性遺傳編程將候選程序編碼進定長或者變長的字符串,即基因型是線性字符串,包括
    • Multi-Expression Programming (MEP)
    • Grammatical Evolution (GE)
    • Gene Expression Programming (GEP)
    • Cartesian Genetic Programming (CGP):該算法是一種很適合電路設計的遺傳編程算法,好比咱們要用兩個加操做兩個減操做和兩個乘操做獲得以下運算,
      • 笛卡爾遺傳編程將下面的一個候選程序編寫進字符串"001 100 131 201 044 254 2573"。字符串中的三位數字「xyz"表示x操做的輸入是y和z兩個連線,字符串中最後的四位數字"opqr"表示輸出opqr四個連線。笛卡爾遺傳編程只用變異操做,而不用交叉操做。
    • Genetic Algorithm for Deriving Software (GADS)
  • 狹義線性遺傳編程中的候選程序是彙編語言或者高級編程語言程序(例如C程序)。一個狹義線性遺傳編程的個體能夠是一段簡單 C 語言指令,這些指令做用在必定數量預先定義的變量或者常量上(變量數量通常爲指令個數的4倍)。下圖是一個狹義線性遺傳編程候選程序的示例,
    • ,能夠看到,變量數量和指令數量都是固定的,經過不一樣的排列組合方式獲得不一樣的代碼表現形式
http://www.doc88.com/p-630428999834.html
https://pdfs.semanticscholar.org/958b/f0936eda72c3fc03a09a0e6af16c072449a1.pdf

2. 基於樹的遺傳編程 

基於樹的遺傳編程的基因型是樹結構。基於樹的遺傳編程是遺傳編程最先的形態,也是遺傳編程的主流方法。

大多數編程語言,在編譯或解釋時,首先會被轉換成一棵解析樹(Lisp編程語言及其變體,本質上就是一種直接訪問解析樹的方法),例以下圖,

樹上的節點有多是枝節點也多是葉節點

  • 枝節點表明了應用於其子節點之上的某一種操做
  • 葉節點表明了某個參數或常量值

例如上圖中,圓形節點表明了應用於兩個分支(Y變量和常量值5)之上的求和操做。一旦咱們求出了此處的計算值,就會將計算結果賦予上方的節點處。相應的,這一計算過程會一直向下傳播,直到遍歷全部的葉子節點(深度優先遞歸遍歷)。 

若是對整棵樹進行遍歷,咱們會發現它至關於下面這個python函數:

在遺傳變異方面,基於樹的遺傳編程的演化操做有兩種,變異和交叉,

  • 變異:基於樹的遺傳編程的變異操做有兩種(區別在於變異的範圍不一樣),
    • 一種是隨機變換樹中的符號或者操做符
    • 另外一種是隨機變換子樹
    • ,該圖左下角是變換符號或者操做符的結果,右下角是變換子樹的結果。 
  • 交叉:兩個顆樹之間隨機交換子樹
    • ,兩棵樹之間的部分節點發生了隨機互換

3. 基於圖的遺傳編程

樹是一種特殊的圖,所以人們很天然地想到將基於樹的遺傳編程擴展到基於圖的遺傳編程。下圖就是基於圖的遺傳編程的基因型的一個示例。 

 

Relevant Link: 

《Adaptation in Natural and Artificial Systems》 John Henry Holland 1992
http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e4%b8%80%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%ae%80%e4%bb%8b
《Evolving Evolutionary Algorithms using Linear Genetic Programming (2005)》
《A comparison of several linear genetic programming techniques》Oltean, Mihai, and Crina Grosan.  Complex Systems 14.4 (2003): 285-314.
https://www.jianshu.com/p/a953066cb2eb 

 

2. 遺傳編程的數學基礎

這個章節,咱們用數學的形式化視角,來從新審視一下遺傳算法。

0x1:基本數學符號定義 

I 種羣中的個體
m 全部可能個體的數量
n 種羣大小
pm 變異機率
pc 交叉機率
f(I) 個體I的適應度。
p(I)t 第t代種羣中,個體I出現的機率
第t代種羣平均適應度。第t代種羣中個體適應度的平均值。

由於遺傳算法中有各類各樣的編碼方式、變異操做、交叉操做和選擇操做,遺傳算法的形態呈現多樣性。

爲了簡化分析,咱們這裏假設一個典型遺傳算法,即,

  • 編碼方式是二進制編碼:基因的取值只能是0或者1
  • 變異操做將全部染色體全部基因位以恆定 pm 的機率翻轉
  • 交叉操做選擇選擇相鄰的個體,以 pc 的機率決定是否須要交叉。若是須要交叉,隨機選擇一個基因位,並交換這個基因位以及以後的全部基因
  • 每一代的新種羣選擇操做採用輪盤賭算法(依據機率大小):有放回地採樣出原種羣大小的新一代種羣,個體 Ii 的採樣機率以下所示,
    •  

0x2:模式定理 - 機率視角看基因模式的遺傳

模式定理是遺傳算法創始人 J.Holland 在其突破性著做《Adaptation in Natural and Artificial Systems》引入的,用於分析遺傳算法的工做原理。

模式是指基因編碼空間中,由一類類似的基因組抽象獲得的pattern,好比 [0,*,*,1] 就是一個模式。染色體[0,1,0,1]和[0,0,0,1]都包含該模式。

在具體討論模式定理以前,咱們先介紹一些符號,

L(H) 模式的長度。第一固定基因位和最後一個固定基因位的距離,其中L([0,*,*,1])=3。
O(H) 模式的階。固定基因位的個數,其中O([0,*,*,1])=2。

模式平均適應度。種羣中包含該模式的個體適應度的平均值。
p(H)t 在第t代種羣中,模式H出現的機率。

【模式定理】

在本章定義的典型遺傳算法中,下面公式成立:

這個公式看起來有點複雜,其實很是簡單,咱們逐個部分來分解,

  • 選擇操做對模式H在下一代出現的影響是固定的,即:
  • 某個模式在繁衍中,既有可能發生變異,也有可能發生交叉,因此有後面兩個括號相乘
  • 某個模式在變異中,變異操做將全部基因位以 pm 的機率翻轉,所以模式H不被破壞的機率爲(1pm)O(H)。當0<=x<=1和n=1,...時,不等式(1pm)O(H>= 1O(H)pm成立,從而通過變異操做,模式H的出現機率爲,
  • 某個模式在交叉中,交叉操做選擇選擇相鄰的個體,以 pc 的機率決定是否須要交叉。若是須要交叉,隨機選擇一個基因位,並交換這個基因位以及以後的全部基因。所以模式H不被破壞的機率爲(1pc)(1L(H)/L1>= − pcL(H)/L1。通過交叉操做,模式H的出現機率爲,

整體來講,遺傳算法須要在,選擇操做引發的收斂性和變異交叉操做引發的多樣性之間取得平衡

模式定理的通俗說法是這樣的,低階短序以及平均適應度高於種羣平均適應度的模式在子代中呈指數增加。

低階、短長以及平均適應度高於種羣平均適應度的模式H,

此時,

 

即模式H呈現指數增加。

0x3:馬爾柯夫鏈分析 - 遺傳編程收斂性分析

這個小節咱們來討論一個有些理論化的問題,即:遺傳編程算法通過必定次數的迭代後,是否會收斂到某個穩態?若是會達到穩態,遺傳編程的收斂速度是多少?

要解決這個問題,咱們須要引入數學工具,馬爾柯夫鏈,有以下定義。

  • 用 pt 表示第 t 時刻的不一樣狀態的機率
  • 表示轉移機率矩陣,其中 Pi,j表示從第 i 個狀態轉移到第 j 個狀態的機率
  • 齊次馬爾科夫鏈的第 t+1 時刻的狀態只和第 t 時刻有關,能夠用公式 pt+1=pt表示
  • 若存在一個天然數 k,使得 P中的全部元素大於0,則稱 爲素矩陣。隨着 k 趨近於無窮,Pk 收斂於 P=1Tp, 其中p=plimkPk=p0 是和初始狀態無關的惟一值,而且全部元素大於0。這實際上是由馬爾柯夫鏈穩態定理決定的。 

咱們把整個種羣的狀態當作馬爾科夫鏈的一個狀態 s,交叉、變異和選擇操做則構建了一個機率轉移矩陣。通常狀況下,0<pm<1,0<=pc<=1,即物種變異必定會發生,但不是必然100%發生。咱們來分析一下這時的機率轉移矩陣的性質。

  • 讓 C,M,分別表示交叉、變異和選擇操做帶來的機率轉移,總體機率轉移矩陣 P=CMS
    • 通過變異操做,種羣狀態 si 轉化成種羣狀態 sj 的機率 Mi,j=(pm)h(1pm)nl-h>0,其中h是兩個種羣之間不一樣值的基因位數量。也就是說,是素矩陣
    • 通過選擇操做,種羣狀態 si 保持不變的機率也就是說, 的全部列一定有一元素大於0。咱們也能夠知道機率轉移矩陣 是素矩陣

標準的優化算法分析第一個要關心的問題是,優化算法能不能收斂到全局最優勢。假設全局最優勢的適應度值爲maxf,收斂到全局最優勢的定義以下,

一言以蔽之,典型遺傳算法並不收斂

根據機率轉移矩陣收斂定理,咱們能夠知道典型遺傳算法會收斂到一個全部種羣狀態機率都大於0的機率分佈上(穩態)。所以以後,不包含全局最優解的種羣必定會不停出現,從而致使上面的公式不成立。

可是筆者這裏要強調的是,這章討論的典型遺傳算法在實際工程中是幾乎不存在的,實際上,幾乎全部遺傳算法代碼都會將保持已發現最優解。加了這個變化以後的遺傳算法是收斂的。

仍是根據上述機率轉移矩陣收斂定理,咱們能夠知道遺傳算法會收斂到一個全部種羣狀態機率都大於0的機率分佈上,那麼包含全局最優解的種羣必定會不停出現,保持已發現最優解的作法會使得上面的公式成立。

Relevant Link: 

Adaptation in Natural and Artificial Systems: An Introductory Analysis with Applications to Biology, Control, and Artificial Intelligence
Rudolph, Günter. 「Convergence analysis of canonical genetic algorithms.」 Neural Networks, IEEE Transactions on 5.1 (1994): 96-101.
http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e4%b8%89%e6%95%b0%e5%ad%a6%e6%91%86%e6%91%86%e6%89%8b%ef%bc%8c%e5%be%88%e6%83%ad%e6%84%a7%ef%bc%8c%e5%8f%aa%e5%81%9a%e4%ba%86

 

3. 典型遺傳算法的一些變種衍生算法

自 John Henry Holland 在1992年提出《Adaptation in Natural and Artificial Systems》論文後,遺傳編程又獲得了大量研究者的關注和發展,提出了不少改進型的衍生算法。雖然這些算法在工業場景中不必定都適用,可是筆者以爲咱們有必要了解和學習一下它們各自的算法思想,有利於咱們在遇到各自的實際問題的時候,觸類旁通。

0x1:交叉變種

典型交叉變異隨機選擇兩條染色體,按照pc的機率決定是否交叉,若是選擇交叉則隨機選擇一點並將這點以後的基因交換。這種交叉方式被稱爲單點雜交

1. 多點雜交

多點雜交指定了多個交換點用於父本的基因交換重組,具體的執行過程以下圖所示,

多點雜交改進的是突變率。 

2. 均勻雜交

單點和多點雜交算法存在一個問題,雜交的染色體中某些部分的基因會被過早地捨棄,這是因爲在交換前它們必須肯定交換父本染色體交換位前面仍是後面的基因,從而對於那些無關的基因段在交換前就已經收斂了。

均勻雜交算法(Uniform Crossover)就能夠解決上述算法的這種侷限性,該算法的主要過程以下:

  • 首先隨機選擇染色體上的交換位
  • 而後隨機肯定交換的基因是父本染色體上交換位的前部分基因,仍是後部分基因(隨機過程)
  • 最後對父本染色體的基因進行重組從而產生新的下一代個體

3. 洗牌雜交

洗牌雜交的最大特色是一般將染色體的中點做爲基因的交換點,即從每一個父本中取它們一半的基因重組成新的個體。

另外針對於實值編碼方式,還有離散雜交、中間雜交、線性雜交和擴展線性雜交等算法。

0x2:選擇策略變種

精英保留策略是一種典型的選擇策略。精英保留策略是指每次迭代都保留已發現的最優解。這個策略是顯而易見的,咱們不可能捨棄已發現的最優解,而只使用最後一代種羣的最優解。同時,採用精英保留策略的典型遺傳算法是保證收斂到全局最優解的。

1. 輪盤賭選擇策略

輪盤賭選擇策略是基於機率進行選擇策略。輪盤賭算法有放回地採樣出原種羣大小的新一代種羣,個體 Ii 的採樣機率以下所示,

從機率上看,在某一輪中,即便是適應度最差的個體,也存在必定的概率能進入到下一輪,這種策略提升了多樣性,但減緩了收斂性。

2. 錦標賽選擇策略

錦標賽法從大小爲 n 的種羣隨機選擇 k(k小於n) 個個體,而後在 k 個個體中選擇適應度最大的個體做爲下一代種羣的一個個體。反覆屢次,直到下一代種羣有 n 個個體。 

0x3:種羣繁衍策略變種 - 多種羣並行

在大天然,物種的進化是以多種羣的形式併發進行的。通常來講,一個物種只有一個種羣了,意味着這個物種有滅亡的危險(例如恐龍)。

受此啓發,人們提出了多種羣遺傳算法。多種羣遺傳算法保持多個種羣同時進化,具體流程以下圖所示,

多種羣遺傳算法和遺傳算法執行屢次的區別在於移民,種羣之間會經過移民的方式交換基因。這種移民操做會帶來更多的多樣性。

0x4:自適應遺傳算法

遺傳算法中,決定個體變異長度的主要因素有兩個:交叉機率pc,和變異機率pm。

在實際工程問題中,須要針對不一樣的優化問題和目標,反覆實驗來肯定pc和pm,調參成本很高。

並且在遺傳算法訓練的不一樣階段,咱們須要不一樣的pc和pm,

  • 當種羣中各個個體適應度趨於一致或者趨於局部最優時,使pc和pm增長,增長擾動。使得種羣具備更大的多樣性,跳出潛在的局部最優陷阱
  • 當羣體適應度比較分散時,使pc和pm減小。使得適應度高的個體和適應度低的個體保持分開,加快種羣收斂速度
  • 不一樣個體也應該有不一樣的pc和pm:
    • 對於適應度高的個體,咱們應該減小pc和pm以保護他進入下一代
    • 反之對適應度低的個體,咱們應該增長pc和pm以增長擾動,提升個體多樣性

Srinivas.M and Patnaik.L.M (1994) 爲了讓遺傳算法具有更好的自適應性,提出來自適應遺傳算法。在論文中,pc和pm的計算公式以下:

0x5:混合遺傳算法 

遺傳算法的全局搜索能力強,但局部搜索能力較弱。這句話怎麼理解呢?

好比對於一條染色體,遺傳算法並不會去看看這條染色體周圍局部的染色體適應度怎麼樣,是否比這條染色體好。遺傳算法會經過變異和交叉產生新的染色體,但新產生的染色體可能和舊染色差的很遠。所以遺傳算法的局部搜索能力差。

相對的,梯度法、登山法和貪心法等算法的局部搜索能力強,運算效率也高。

受此啓發,人們提出了混合遺傳算法,將遺傳算法和這些算法結合起來。混合遺傳算法的框架是遺傳算法的,只是生成新一代種羣以後,對每一個個體使用局部搜索算法尋找個體周圍的局部最優勢。

整體來講,遺傳算法和梯度法分別表明了隨機多樣性優化和漸進定向收斂性優化的兩種思潮,取各自的優勢是一種很是好的思路。

Relevant Link: 

Srinivas M, Patnaik L M. Adaptive probabilities of crossover and mutation in genetic algorithms[J]. Systems, Man and Cybernetics, IEEE Transactions on, 1994, 24(4): 656-667. 
http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e5%9b%9b%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%9a%84%e5%8f%98%e7%a7%8d 

 

4. 用遺傳編程自動生成一個可以擬合特定數據集的函數

0x1:用多項式迴歸擬合一個數據集

這個章節,咱們來完成一個小實驗,咱們如今有一個數據集,數據集的生成算法以下:

# -*- coding: utf-8 -*-

from random import random,randint,choice

def hiddenfunction(x,y):
    return x**2 + 2*y + 3*x + 5


def buildhiddenset():
    rows = []
    for i in range(200):
        x=randint(0, 40)
        y=randint(0, 40)
        rows.append([x, y, hiddenfunction(x, y)])
    return rows

if __name__ == '__main__':
    print buildhiddenset()
    

部分數據例以下圖:

可視化以下,

# -*- coding: utf-8 -*-

import matplotlib.pyplot as plt
import numpy as np
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from random import random,randint,choice
from mpl_toolkits.mplot3d import axes3d


def hiddenfunction(x,y):
    return x**2 + 2*y + 3*x + 5


def buildhiddenset():
    X = []
    y = []
    for i in range(200):
        x_ = randint(0, 40)
        y_ = randint(0, 40)
        X.append([x_, y_])
        y.append(hiddenfunction(x_, y_))
    return np.array(X), np.array(y)


if __name__ == '__main__':
    # generate a dataset
    X, y = buildhiddenset()

    print "X:", X
    print "y:", y

    fig = plt.figure()
    ax = fig.gca(projection='3d')
    ax.set_title("3D_Curve")
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    ax.set_zlabel("z")
    # draw the figure, the color is r = read
    figure = ax.plot(X[:, 0], X[:, 1], y, c='r')
    plt.show()

 

很顯然,一定存在一些函數,能夠將(X,Y)映射爲結果欄對應的數字,如今問題是這個(些)函數究竟是什麼?

從數據分析和數理統計分析的角度來看,這個問題彷佛也不是很複雜。咱們能夠用多元線性迴歸來嘗試擬合數據集。

# -*- coding: utf-8 -*-

import matplotlib.pyplot as plt
import numpy as np
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from random import random,randint,choice
from mpl_toolkits.mplot3d import axes3d
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np
np.set_printoptions(threshold=np.inf)


def hiddenfunction(x,y):
    return x**2 + 2*y + 3*x + 5


def buildhiddenset():
    X = []
    y = []
    for i in range(100):
        x_ = randint(0, 40)
        y_ = randint(0, 40)
        X.append([x_, y_])
        y.append(hiddenfunction(x_, y_))
    return np.array(X), np.array(y)


if __name__ == '__main__':
    # generate a dataset
    X, y = buildhiddenset()

    print "X:", X
    print "y:", y

    fig = plt.figure()
    ax = fig.gca(projection='3d')
    ax.set_title("3D_Curve")
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    ax.set_zlabel("z")
    # draw the figure, the color is r = read
    #figure = ax.plot(X[:, 0], X[:, 1], y, c='r')
    #plt.show()

    # use Scikit-Learn PolynomialFeature class to constructing parameter terms.
    # a,b,degree=2: [a, b, a^2, ab, b^2]
    # a,b,degree=3: [a, b, a^2, ab, b^2, a^3, a^2b, ab^2, b^3]
    # a,b,c,degree=3: [a, b, c, a^2, ab, ac, b^2, bc, c^2, a^3, a^2b, a^2c, ab^2, ac^2, abc, b^3, b^2c, bc^2, c^3]
    poly_features = PolynomialFeatures(degree=2, include_bias=False)
    # fit the dataset with Polynomial Regression Function, and X_poly is the fitting X result
    X_poly = poly_features.fit_transform(X)
    lin_reg = LinearRegression()
    lin_reg.fit(X_poly, y)
    print(lin_reg.intercept_, lin_reg.coef_)
    y_predict = lin_reg.predict(X_poly)
    y_predict = [int(i) for i in y_predict]
    print "y_predict: ", y_predict
    print "y: ", y
    print 'accuracy for LinearRegression is: {0}'.format(accuracy_score(y, y_predict))
    print 'error for LinearRegression is: {0}'.format(confusion_matrix(y, y_predict))

    # draw the prediction curve
    X_new_1 = np.linspace(0, 40, 100).reshape(100, 1)
    X_new_2 = np.linspace(0, 40, 100).reshape(100, 1)
    X_new = np.hstack((X_new_1, X_new_2))
    # fit the X_new dataset with Polynomial Regression Function, and X_new_poly is the fitting X result
    X_new_poly = poly_features.transform(X_new)
    y_new = lin_reg.predict(X_new_poly)
    y_new = [int(i) for i in y_new]
    #print "X_new: ", X_new
    #print "y_new: ", y_new
    #print "y: ", y

    # draw the prediction line
    figure = ax.plot(X[:, 0], X[:, 1], y, c='r')
    figure = ax.plot(X_new[:, 0], X_new[:, 1], y_new, c='b', linewidth=2, label="Predictions")
    plt.show()

紅色是數據集,藍色線是多項式擬合結果

多項式迴歸的擬合結果以下:

  • mean_squared_error for LinearRegression is: 0.55
  • accuracy for LinearRegression is: 0.45

由於這是一個迴歸分析問題,所以精度的要求很是高,雖然實際的偏差並非很高(0.55),使用多項式擬合得到了0.45的擬合精確度,不算特別高,出錯的點還挺多的。

這麼一看,好像多項式迴歸的效果不好了?並非這樣的。

咱們來對比下模型學習到的多項式參數翻譯成多項式函數,和原始數據集背後的目標函數形式的差距:

print(lin_reg.intercept_, lin_reg.coef_)
(5.0000000000009095, array([ 3.00000000e+00, 2.00000000e+00, 1.00000000e+00, ,
]))
a,b,degree=2: [a, b, a^2, ab, b^2]

模型學習到的:H = 3*X + 2*Y + X^2 + 4.70974246e-17*X*Y -7.81622966e-16*Y^2 + 5    
原始數據集目標函數:H = 3*X + 2*Y + X^2 +  5

能夠看到,多項式迴歸出現了一些過擬合的現象,多出了兩項:X*Y、和Y^2,若是沒有這兩項,那麼多項式迴歸獲得的方程就是完美的原始方程。

從上面的實驗中,咱們能夠獲得幾點啓發

  • 過擬合問題廣泛存在:即便在一個簡單的數據集上,用這麼簡單的多項式迴歸參數,都仍是出現了過擬合問題。可見過擬合問題在實際機器學習工程項目中,的確是可能大量存在的。
  • 冗餘結構廣泛存在:在上面的例子中,X*Y、和Y^2這兩項是多出來的項,模型給這兩個項分配的參數都是一個很是小的數字,小到幾乎能夠忽略(分別是4.70974246e-17和7.81622966e-16)。基本上來講,這兩項對最終模型的預測結果的影響是很微小的,若是咱們的場景是一個分類任務,那麼這麼小的權重幾乎不會影響到最終的分類預測結果。所以,咱們將這種被模型分配權重極小的項,稱爲冗餘結構,冗餘結構對分類任務的影響每每很是小。某種程度上來講,冗餘結構緩解了過擬合問題

從這個問題,繼續延伸思考,相信讀者朋友在研究和工程項目中經常會遇到一個有趣的現象:針對某個固定的數據集,咱們設計出一個很是精巧的神經網絡,擁有上千萬的參數。和另外一我的用一個只有十幾萬參數的簡單經典神經網絡進行benchmark對比,發現性能並無太大的提高,甚至沒有提高,固然,性能也沒有降低。咋一看,複雜模型和簡單的效果同樣好。

對這個問題的研究和解釋,已經開始有很是多的學者和科研機構在進行,目前提出了不少的解釋框架(例如teacher-student模型),筆者本身在項目中也一樣遇到了這個問題。一個初步的見解是,對於一個特定場景的數據集來講,是存在一個肯定的複雜度上限的。如今若是有兩個模型,一個模型剛恰好達到這個複雜度上限,另外一個模型遠遠超過了這個複雜度上限,那麼後面那個模型就會出現所謂的過擬合現象,可是,模型會經過權重分配,將高出複雜度上限以外的多餘項(神經元)都分配0或者很是低的權重,讓這部分神經元稱爲冗餘項。最終,無論咱們用多麼複雜的模型,起做用的永遠只有那一小部分神經元在發揮做用,預測的效果也同樣好,過擬合問題並無影響最終的分類預測結果。

筆者小節

使用多項式迴歸對一個數據集進行擬合這種作法,本質上是一種先天經驗主義思惟,它的科學假設是」複雜可約性「。所謂複雜可約性是複雜理論中的一個概念,指的是一個複雜的系統能夠經過一個簡化的模型進行約簡歸納,經過歷史的樣本學習這個約簡系統的結構參數,同時假設這個約簡系統可以比真實的複雜系統推算的更快,從而藉助約簡系統對複雜系統進行將來預測。

上面的話說的有一些繞,簡單來講就是,無論數據集背後的真實目標函數是什麼,咱們均可以用像低階多項式函數這種簡單模型來進行擬合學習,並利用學習到的模型對將來可能出現的新數據集進行預測。

這種基於先驗的簡化建模思想在今天的機器學習學術界和工業界應用很是普遍,也發揮了很好的效果。但其實咱們還有另外一種看世界的角度,那就是隨機過程。隨機過程事先不作任何假設,而是基於隨機或者遵循某種策略下的隨機,不斷進行自我迭代與優化,從一種混沌狀態逐漸收斂到目標狀態。

0x2:用遺傳編程擬合同一個數據集

在這個章節,咱們要討論遺傳編程,仍是上面那個問題,咱們如今換一種思惟來想:咱們所見的數據集背後無非是一個函數公式來決定的,而函數公式又是由一些基本的」數學符號「以及」數學運算符「號組合而成的,數學符號和數學運算符的組合方式咱們將其視爲一個符號搜索空間,咱們直接去搜索這個符號搜索空間便可,經過數據集做爲反饋,直到收斂爲止,即找到了完美的目標函數。

接下來咱們逐個小節來分解任務,一步步達到咱們的目標。

1. 構造樹狀程序(tree programs)基礎數據結構及操做函數

咱們能夠用python構造出樹狀程序的基礎數據結構。這棵樹由若干節點組成,根據與之關聯的函數的不一樣,這些節點又能夠擁有必定數量的子節點。

有些節點將會返回傳遞給程序的參數;另外一些則會返回常量;還有一些則會返回應用於其子節點之上的操做。

1)fwrapper

一個封裝類,對應於」函數型「節點上的函數。其成員變量包括了函數名稱、函數自己、以及該函數接受的參數個數(子節點)。

class fwrapper:
  def __init__(self,function,params,name):
    self.function=function
    self.childcount=param
    self.name=name

2)node

對應於函數型節點(帶子節點的節點)。咱們以一個fwrapper類對其進行初始化。當evaluate被調用時,咱們會對各個子節點進行求值運算,而後再將函數自己應用於求得的結果。

class node:
  def __init__(self,fw,children):
    self.function=fw.function
    self.name=fw.name
    self.children=children

  def evaluate(self,inp):    
    results=[n.evaluate(inp) for n in self.children]
    return self.function(results)

  def display(self,indent=0):
    print (' '*indent)+self.name
    for c in self.children:
      c.display(indent+1)

3)paramnode

這個類對應的節點只返回傳遞給程序的某個參數。其evaluate方法返回的是由idx指定的參數。

class paramnode:
  def __init__(self,idx):
    self.idx=idx

  def evaluate(self,inp):
    return inp[self.idx]
  
  def display(self,indent=0):
    print '%sp%d' % (' '*indent,self.idx)

4)constnode

返回常量值的節點。其evaluate方法僅返回該類被初始化時所傳入的值。

class constnode:
  def __init__(self,v):
    self.v=v
      
  def evaluate(self,inp):
    return self.v
  
  def display(self,indent=0):
    print '%s%d' % (' '*indent,self.v)

5)節點操做函數

除了基礎數學符號數據結構以外,咱們還須要定義一些針對節點的操做函數。

一些簡單的符號運算符(例如add、subtract),能夠用lambda內聯方式定義,另一些稍微複雜的運算符則須要在單獨的語句塊中定義,不論哪一種狀況,都被會封裝在一個fwrapper類中。

addw=fwrapper(lambda l:l[0]+l[1],2,'add')
subw=fwrapper(lambda l:l[0]-l[1],2,'subtract') 
mulw=fwrapper(lambda l:l[0]*l[1],2,'multiply')

def iffunc(l):
  if l[0]>0: return l[1]
  else: return l[2]
ifw=fwrapper(iffunc,3,'if')

def isgreater(l):
  if l[0]>l[1]: return 1
  else: return 0
gtw=fwrapper(isgreater,2,'isgreater')

flist=[addw,mulw,ifw,gtw,subw]

如今,咱們能夠利用前面建立的節點類來構造一個程序樹了,咱們來嘗試寫一個符號推理程序,

# -*- coding: utf-8 -*-

import gp

def exampletree():
# if arg[0] > 3:
# return arg[1] + 5
# else:
# return arg[1] - 2
return gp.node(
gp.ifw, [
gp.node(gp.gtw, [gp.paramnode(0), gp.constnode(3)]),
gp.node(gp.addw, [gp.paramnode(1), gp.constnode(5)]),
gp.node(gp.subw, [gp.paramnode(1), gp.constnode(2)])
]
)

if __name__ == '__main__':
exampletree = exampletree()

# expected result = 1
print exampletree.evaluate([2, 3])

# expected result = 8
print exampletree.evaluate([5, 3])

至此,咱們已經成功在python中構造出了一個以樹爲基礎的語言和解釋器。 

2. 初始化一個隨機種羣(函數初始化)

如今咱們已經有能力進行形式化符號編程了,回到咱們的目標,生成一個可以擬合數據集的函數。首先第一步是須要隨機初始化一個符號函數,即初始化種羣。

建立一個隨機程序的步驟包括:

  • 建立根節點併爲其隨機指定一個關聯函數,而後再隨機建立儘量多的子節點
  • 遞歸地,父節點建立的子節點也可能會有它們本身的隨機關聯子節點
def makerandomtree(pc,maxdepth=4,fpr=0.5,ppr=0.6):
  if random()<fpr and maxdepth>0:
    f=choice(flist)
    children=[makerandomtree(pc,maxdepth-1,fpr,ppr) 
              for i in range(f.childcount)]
    return node(f,children)
  elif random()<ppr:
    return paramnode(randint(0,pc-1))
  else:
    return constnode(randint(0,10))

該函數首先建立了一個節點併爲其隨機選了一個函數,而後它遍歷了隨機選中的函數所需的子節點,針對每個子節點,函數經過遞歸調用makerandomtree來建立新的節點。經過這樣的方式,一顆完整的樹就被構造出來了。

僅當被隨機選中的函數再也不要求新的子節點時(即若是函數返回的是一個常量或輸入參數時),向下建立分支的過程纔會結束。

3. 衡量種羣個體的好壞

按照遺傳編程算法的要求,每一輪迭代中都要對種羣個體進行定量評估,獲得一個個體適應性的排序。

與優化技術同樣,我摩恩必須找到一種衡量題解優劣程度的方法,不少場景下,優劣程度並不容易定量評估(例如網絡安全中經常是非黑即白的二分類)。可是在本例中,咱們是在一個數值型結果的基礎上對程序進行測試,所以能夠很容易經過絕對值偏差進行評估。

def scorefunction(tree,s):
  dif=0
  for data in s:
    v=tree.evaluate([data[0],data[1]])
    dif+=abs(v-data[2])
  return dif

咱們來試一試初始化的隨機種羣的適應性評估結果,

# -*- coding: utf-8 -*-

import gp

if __name__ == '__main__':
    hiddenset = gp.buildhiddenset()

    random1 = gp.makerandomtree(2)
    random2 = gp.makerandomtree(2)

    print gp.scorefunction(random1, hiddenset)
    print gp.scorefunction(random2, hiddenset)

隨機初始化的函數種羣的適應性並非很好,這符合咱們的預期。 

4. 對程序進行變異

當表現最好的程序被選定以後,它們就會被複制並修改以進入到下一代。前面說到,遺傳變異有兩種方式,mutation和crossover,  

1)mutation

變異的作法是對某個程序進行少許的修改,一個樹狀程序能夠有多種修改方式,包括:

  • 改變節點上的函數
  • 改變節點的分支
    • 改變節點所需子節點數目
    • 刪除舊分支
    • 增長新的分支
    • 用全新的樹來替換某一子樹

須要注意的是,變異的次數不宜過多(基因突變不能太頻繁)。例如,咱們不宜對整棵樹上的大多數節點都實施變異,相反,咱們能夠位任何須要進行修改的節點定義一個相對較小的機率。從樹的根節點開始,若是每次生成的隨機數小於該機率值,就以如上所述的某種方式對節點進行變異。

def mutate(t,pc,probchange=0.1):
  if random()<probchange:
    return makerandomtree(pc)
  else:
    result=deepcopy(t)
    if hasattr(t,"children"):
      result.children=[mutate(c,pc,probchange) for c in t.children]
    return result

2)crossover

除了變異,另外一種修改程序的方法被稱爲交叉或配對,即:從本輪種羣中的優秀適應着中,選出兩個將其進行部分子樹交換。執行交叉操做的函數以兩棵樹做爲輸入,並同時開始向下遍歷,當到達某個隨機選定的閾值時,該函數便會返回前一棵樹的一份拷貝,樹上的某個分支會被後一棵樹上的一個分支所取代。經過同時對兩棵樹的即時遍歷,函數會在每棵樹上大體位於相同層次的節點處實施交叉操做。

def crossover(t1,t2,probswap=0.7,top=1):
  if random()<probswap and not top:
    return deepcopy(t2) 
  else:
    result=deepcopy(t1)
    if hasattr(t1,'children') and hasattr(t2,'children'):
      result.children=[crossover(c,choice(t2.children),probswap,0) 
                       for c in t1.children]
    return result

讀者朋友可能會注意到,對於某次具體的變異或者交叉來講,新的種羣個體並不必定會帶來更好的性能,實際上,新種羣個體的性能幾乎徹底是隨機的。從生物進化論的角度來講,遺傳變異是無方向的,隨機的,遺傳變異的目標僅僅是引入多樣性,形成演化的是環境選擇壓(數據集的偏差反饋)。

5. 持續迭代演化

如今,咱們將上面的步驟串起來,讓遺傳演化不斷的循環進行。本質上,咱們的思路是要生成一組隨機程序並擇優複製和修改,而後一直重複這一過程直到終止條件知足爲止。

def getrankfunction(dataset):
def rankfunction(population):
scores=[(scorefunction(t,dataset),t) for t in population]
scores.sort()
return scores
return rankfunction



def evolve(pc,popsize,rankfunction,maxgen=500,
mutationrate=0.1,breedingrate=0.4,pexp=0.7,pnew=0.05):
# Returns a random number, tending towards lower numbers. The lower pexp
# is, more lower numbers you will get
def selectindex():
return int(log(random())/log(pexp))

# Create a random initial population
population=[makerandomtree(pc) for i in range(popsize)]
for i in range(maxgen):
scores=rankfunction(population)
print "function score: ", scores[0][0]
if scores[0][0]==0: break

# The two best always make it
newpop=[scores[0][1],scores[1][1]]

# Build the next generation
while len(newpop)<popsize:
if random()>pnew:
newpop.append(mutate(
crossover(scores[selectindex()][1],
scores[selectindex()][1],
probswap=breedingrate),
pc,probchange=mutationrate))
else:
# Add a random node to mix things up
newpop.append(makerandomtree(pc))

population=newpop
scores[0][1].display()
return scores[0][1]

上述函數首先建立一個隨機種羣,而後循環至多maxgen次,每次循環都會調用rankfunction對程序按表現從優到劣的順序進行排列。表現優者會不加修改地自動進入到下一代,咱們稱這樣的方法爲精英選拔髮(elitism)。

至於下一代中的其餘程序,則是經過隨機選擇排名靠前者,再通過交叉和變異以後獲得的。

這個過程是一直重複下去,知道某個程序達到了完美的擬合適配(損失爲0),或者重複次數達到了maxgen次爲止。

evolve函數有多個參數,用以從不一樣方面對競爭環境加以控制,說明以下:

  • rankfunction:對應一個函數,將一組程序從優到劣的順序進行排列
  • mutationrate:表明發生變異的機率
  • breedingrate:表明發生交叉的機率
  • popsize:初始種羣的大小
  • probexp:表示在構造新種羣時,」選擇評價較低的程序「這一律率的遞減比例。該值越大,相應的篩選過程就越嚴格,即只選擇評價最高的多少比例的個體做爲複製對象
  • probnew:表示在構造新種羣時,」引入一個全新的隨機程序「的機率,該參數和probexp是」種羣多樣性「的重要決定參數
# -*- coding: utf-8 -*-

import gp

if __name__ == '__main__':
    rf = gp.getrankfunction(gp.buildhiddenset())
    gp.evolve(2, 500, rf, mutationrate=0.2, breedingrate=0.1)

程序運行的很是慢,在筆者的mac上運行了15min才最終收斂到0。有意思的是,儘管這裏給出的解是徹底正確的,可是它明顯比咱們數據集背後的真實目標函數要複雜得多,也就是說發生了過擬合。

可是,咱們若是運用一些代數知識,將遺傳編程獲得函數進行約簡,會發現它和目標函數實際上是等價的(p0爲X,p1爲Y)。

((X+6)+Y)+X + (if( (X*Y)>0 ){X}else{X} + X*X) + (X + (Y - (X + if(6>0){1}else{0})) )
# X*Y恆大於0
2*X + Y + 6 + X + X**2 + (X + (Y - (X + if(6>0){1}else{0})) )
# 6恆大於0
2*X + Y + 6 + X + X**2 + X + Y - X + 1
X**2 + 3*X + 2*Y + 5

能夠看到,遺傳編程這種基於內顯性的構造方式,能夠在形式上獲得一個全局最優解,這點上比基於優化算法的逼近方法要好。

同時,上述例子告訴咱們遺傳編程的一個重要特徵:遺傳算法找到的題解也許是徹底正確的,亦或是很是不錯的。可是一般這些題解遠比真實的目標函數要複雜得多。在遺傳編程獲得的題解中,咱們發現有不少部分是不作任何工做的,或者對應的是形式複雜,但始終都只返回同一結果的公式,例如"if(6>0){1}else{0})",只是1的一種多餘的表達方式而已。

0x3:從遺傳編程優化結果看過擬合問題的本質

從上一章的遺傳優化結果咱們能夠看到,在數據集是充分典型集的狀況下,過擬合是不影響模型的收斂的。

遺傳編程的這種冗餘性和優化算法和深度神經網絡中的冗餘結構本質上是一致的,這是一個冗餘的過擬合現象,即程序的題解很是複雜,可是卻對最終的決策沒有影響,惟一的缺點就是浪費了不少cpu時鐘。

可是另外一方面,這種冗餘過擬合帶來的一個額外的風險是,」若是數據集是非典型的,那麼過擬合就會致使嚴重的後果「。

咱們須要明白的是,過擬合的罪魁禍首不是模型和優化算法,而偏偏是數據集自己。在本例中咱們清楚地看到,當咱們可以獲得一個完整的典型集訓練數據時,過擬合問題就會退化成一個冗餘魯棒可約結構。

可是反之,若是咱們的數據集由於系統噪聲或採樣不徹底等緣由,沒有拿到目標函數的典型集,那麼因爲複雜模型帶來的過擬合問題就會引起很嚴重的預測誤差。咱們來稍微修改一下代碼,將原始數據集中隨機剔除1/10的數據,使數據的充分性和典型性降低,而後再來看遺傳編程最後的函數優化結果,

# -*- coding: utf-8 -*-

import gp

if __name__ == '__main__':
    hiddenset = gp.buildhiddenset()
    # 按照 5% 模來採樣,即剔除1/10的數據,模擬採樣不徹底的狀況
    cnt = 0
    for i in hiddenset:
        if cnt % 10 == 0:
            hiddenset.remove(i)
        cnt += 1
    print hiddenset

    rf = gp.getrankfunction(hiddenset)
    gp.evolve(2, 500, rf, mutationrate=0.2, breedingrate=0.1)

獲得的函數約簡結果爲: 

if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0} + (Y+4) + X^2 + (Y + (X + (X+X)))
# if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0} 不可約
if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0} + X**2 + 3*X + 2*Y + 4

# 真實的目標函數爲:
X**2 + 3*X + 2*Y + 5

能夠看到,在數據集不完整的狀況下,遺傳算法就算完美擬合了訓練集,可是也沒法真正逼近目標函數,但這不是遺傳算法的問題,而是受到數據集的制約。

更重要的是,由於數據集的非典型性(數據機率分佈缺失),致使模型引入了真正的」過擬合複雜結構「,即」if( ((Y+4) + if(Y>X){1}else{0} ) > 0){1}else{0}「,這是一個區間函數。要知道,這僅僅是一個很是小的例子,尚且引入瞭如此的不肯定性,在更復雜和更復雜的問題中,數據集的機率分佈缺失會引起更大的」多餘過擬合複雜結構問題「,影響程度的多少,根據數據缺失的程度而定。

這反過來提醒了咱們這些數據科學工做者,在解決一個實際問題的時候,不要過度糾結你的模型是否是足夠好,層數是否是足夠深,而要更多地關注你的數據集,數據集的質量直接決定了最終的模型效果。更進一步地說,若是你能找到一種方法,能100%拿到目標函數的數據全集,那麼恭喜你,隨便用一個機器學習模型均可以取得很好的效果。

0x4:多樣性的重要性

對於遺傳編程,咱們還須要再談一個關鍵問題,即多樣性問題。

咱們看到evolve函數中,會將最優的個體直接進入下一代,除此以外,對排名以後的個體也會按照比例和機率選擇性地進行復制和修改以造成新的種羣,這種作法有什麼意義呢?

最大的問題在於,僅僅選擇表現最優異的少數個體,很快就會使種羣變得極端同質化(homogeneous),或稱爲近親交配。儘管種羣中所包含的題解,表現都很是不錯,可是它們彼此間不會有太大的差別,由於在這些題解間進行的交叉操做最終會致使羣內的題解變得愈來愈類似。咱們稱這一現象爲達到局部最大化(local maxima)。

對於種羣而言,局部最大化是一種不錯的狀態(即收斂了),但還稱不上最佳的狀態。由於處於這種狀態的種羣裏,任何細小的變化都不會對最終的結果產生太大的變化。這就是一個哲學上的矛盾與對立,收斂穩定與發散變化是彼此對立又統一的,徹底偏向任何一方都是不對的。

事實代表,將表現極爲優異的題解和大量成績尚可的題解組合在一塊兒,每每可以獲得更好的結果。基於這個緣由,evolve提供了兩個額外的參數,容許咱們對篩選進程中的多樣性進行調整。

  • 經過下降probexp的值,咱們容許表現較差的題解進入最終的種羣之中,從而將」適者生存(survival of fittest)「的篩選過程調整爲」最適應者及其最幸運者生存(survival of the fittest and luckiest)「
  • 經過增長probnew的值,咱們還容許全新的程序被隨機地加入到種羣中

這兩個參數都會有效地增長進化過程當中的多樣性,同時又不會對進程有過多的擾亂,由於,表現最差的程序最終老是會被剔除掉的(遺傳編程的馬爾科夫收斂性)。

Relevant Link:  

《集體智慧編程》 

 

5. 用遺傳編程自動獲得正則表達式生成器 - Regex Golf Problem

0x1:問題描述

咱們須要生成一段正則表達式,這個正則表達式須要可以匹配到全部的M數據集,同時不匹配全部的U數據集,且同時還要儘可能短,即不能是簡單的M數據集的並集拼接。

定義一個目標(損失)函數來評估每次題解的好壞,

,其中nM表明匹配M的個數,nU表明匹配U的個數,wI表明獎勵權重,r表明該正則表達式的長度

算法優化的目標是使上式儘可能大。 

0x2:從一個貪婪算法提及 

在討論遺傳編程以前,咱們先從常規思路,用一種貪婪迭代算法來解決這個問題。咱們的數據集以下,

# -*- coding: utf-8 -*-

from __future__ import division
import re
import itertools

def words(text):
    return set(text.split())

if __name__ == '__main__':
    M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage
        foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''')

    U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked
        fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''')

    print M & U

首先,確認了U和M之間不存在交集,這道題理論上是有解的,不然無解。

1. 定義可行解判斷條件

咱們先準肯定義出何時意味着獲得了一個可行解,

# -*- coding: utf-8 -*-

from __future__ import division
import re
import itertools


def words(text):
    return set(text.split())


def mistakes(regex, M, U):
    "The set of mistakes made by this regex in classifying M and U."
    return ({"Should have matched: " + W for W in M if not re.search(regex, W)} |
            {"Should not have matched: " + L for L in U if re.search(regex, L)})


def verify(regex, M, U):
    assert not mistakes(regex, M, U)
    return True


if __name__ == '__main__':
    M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage
        foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''')

    U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked
        fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''')

    some_answer = "a*"

    print mistakes(some_answer, M, U)

能夠看到,當咱們輸入正則」a*「的時候出現了不少錯誤,顯然」a*「不是咱們要的答案。讀者朋友能夠試着輸入」foo「試試。

2. 尋找可行解的策略

  • 對M中的每一個詞(短語)都進行一次正則候選集構造,包括如下步驟:
    • 遍歷M中的每個詞的每一次字符,並過濾掉特殊字符(*+?^$.[](){}|\\),而後在中間遍歷插入」*+?「,這是字符級別的混合交叉
    • 對M中的每個詞都加上首尾定界符,例如」^it$「,獲得一個wholes詞集
    • 對wholes詞集進行ngram切分,獲得一個ngram詞集,例如對於詞」^it$「來講,能夠獲得{'^', 'i', 't', '$', '^i', 'it', 't$', '^it', 'it$', '^it$'},做爲一個正則串池。能夠這麼理解,這個池中的每一個正則串都至少能命中一個M中的元素
    • 遍歷上一步獲得的正則串池中全部元素,逐字符用」.「字符進行替換,例如對於"^it$"來講,能夠獲得{'^it$', '^i.$', '^.t$', '^..$'}
    • 遍歷上一步dotify的詞集,逐字符遍歷插入」*+?「這種repetition控制符,例如對於」a.c「來講,能夠獲得{'a+.c', 'a*.c', 'a?.c','a.c+', 'a.*c', 'a.?c','a.+c', 'a.c*', 'a.c?'},須要注意的是,在首位定界符先後不要加repetition控制符,同時不要同時加入2個repetition控制符
  • 從題解候選集中,篩選出至少可以匹配一個以上M,可是不匹配U的正則子串,這一步獲得一個題解正則候選子集。這是一個貪婪迭代式的思想,它不求一步獲得一條可以匹配全部M的正則,而是尋找一些可以解決一部分問題的正則子串,將困難問題分而治之
  • 使用OR拼接將題解正則候選子集拼接起來,例如」ab | cd「

上面構造正則候選集的過程說的可能有些抽象,這裏經過代碼示例來講明一下,

# -*- coding: utf-8 -*-

from __future__ import division
import re
import itertools


OR  = '|'.join # Join a sequence of strings with '|' between them
cat = ''.join  # Join a sequence of strings with nothing between them
Set = frozenset # Data will be frozensets, so they can't be mutated.


def words(text):
    return set(text.split())


def mistakes(regex, M, U):
    "The set of mistakes made by this regex in classifying M and U."
    return ({"Should have matched: " + W for W in M if not re.search(regex, W)} |
            {"Should not have matched: " + L for L in U if re.search(regex, L)})


def verify(regex, M, U):
    assert not mistakes(regex, M, U)
    return True


def matches(regex, strings):
    "Return a set of all the strings that are matched by regex."
    return {s for s in strings if re.search(regex, s)}


def regex_parts(M, U):
    "Return parts that match at least one winner, but no loser."
    wholes = {'^' + w + '$' for w in M}
    parts = {d for w in wholes for p in subparts(w) for d in dotify(p)}
    return wholes | {p for p in parts if not matches(p, U)}


def subparts(word, N=4):
    "Return a set of subparts of word: consecutive characters up to length N (default 4)."
    return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N))


def dotify(part):
    "Return all ways to replace a subset of chars in part with '.'."
    choices = map(replacements, part)
    return {cat(chars) for chars in itertools.product(*choices)}


def replacements(c):
    return c if c in '^$' else c + '.'


def regex_covers(M, U):
    """Generate regex components and return a dict of {regex: {winner...}}.
    Each regex matches at least one winner and no loser."""
    losers_str = '\n'.join(U)
    wholes = {'^'+winner+'$' for winner in M}
    parts  = {d for w in wholes for p in subparts(w) for d in dotify(p)}
    reps   = {r for p in parts for r in repetitions(p)}
    pool   = wholes | parts | pairs(M) | reps
    searchers = {p:re.compile(p, re.MULTILINE).search for p in pool}
    return {p: Set(filter(searchers[p], M))
            for p in pool
            if not searchers[p](losers_str)}


def pairs(winners, special_chars=Set('*+?^$.[](){}|\\')):
    chars = Set(cat(winners)) - special_chars
    return {A+'.'+q+B
            for A in chars for B in chars for q in '*+?'}


def repetitions(part):
    """Return a set of strings derived by inserting a single repetition character
    ('+' or '*' or '?'), after each non-special character.
    Avoid redundant repetition of dots."""
    splits = [(part[:i], part[i:]) for i in range(1, len(part)+1)]
    return {A + q + B
            for (A, B) in splits
            # Don't allow '^*' nor '$*' nor '..*' nor '.*.'
            if not (A[-1] in '^$')
            if not A.endswith('..')
            if not (A.endswith('.') and B.startswith('.'))
            for q in '*+?'}


def tests():
    assert subparts('^it$') == {'^', 'i', 't', '$', '^i', 'it', 't$', '^it', 'it$', '^it$'}
    assert subparts('this') == {'t', 'h', 'i', 's', 'th', 'hi', 'is', 'thi', 'his', 'this'}

    assert dotify('it') == {'it', 'i.', '.t', '..'}
    assert dotify('^it$') == {'^it$', '^i.$', '^.t$', '^..$'}
    assert dotify('this') == {'this', 'thi.', 'th.s', 'th..', 't.is', 't.i.', 't..s', 't...',
                              '.his', '.hi.', '.h.s', '.h..', '..is', '..i.', '...s', '....'}
    assert repetitions('a') == {'a+', 'a*', 'a?'}
    assert repetitions('ab') == {'a+b', 'a*b', 'a?b', 'ab+', 'ab*', 'ab?'}
    assert repetitions('a.c') == {'a+.c', 'a*.c', 'a?.c',
                                  'a.c+', 'a.*c', 'a.?c',
                                  'a.+c', 'a.c*', 'a.c?'}
    assert repetitions('^a..d$') == {'^a+..d$', '^a*..d$', '^a?..d$',
                                     '^a..d+$', '^a..d*$', '^a..d?$'}
    assert pairs({'ab', 'c'}) == {
        'a.*a', 'a.*b', 'a.*c',
        'a.+a', 'a.+b', 'a.+c',
        'a.?a', 'a.?b', 'a.?c',
        'b.*a', 'b.*b', 'b.*c',
        'b.+a', 'b.+b', 'b.+c',
        'b.?a', 'b.?b', 'b.?c',
        'c.*a', 'c.*b', 'c.*c',
        'c.+a', 'c.+b', 'c.+c',
        'c.?a', 'c.?b','c.?c'}
    assert len(pairs({'1...2...3', '($2.34)', '42', '56', '7-11'})) == 8 * 8 * 3

    return 'tests pass'


if __name__ == '__main__':
    M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage
        foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''')

    U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked
        fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''')

    some_answer = "a*"

    # print mistakes(some_answer, M, U)

    print tests()

筆者思考

集合覆蓋問題(set cover problem)是一個NP問題,幾乎沒有辦法直接獲得全局最優解。對這類複雜問題,一個有效的優化逼近方式就是貪婪迭代逼近,每次都求解一個局部最優值(例如每次生成一個可以覆蓋最大M集合,可是不匹配U集合的正則子串),最後經過將全部局部最優解Ensemble起來獲得一個最終題解(集成學習思想)

3. 窮舉獲得最終題解

咱們已經有了生成題解候選集的函數,也有了評估題解是否正確的損失函數,咱們如今能夠來將他們組合起來,用於生成咱們的目標題解。

前面說過,咱們的算法是一個迭代式的貪婪算法,所以,咱們每次尋找一個可以最大程度匹配儘可能多M的正則子串,而後將本輪已經匹配到的M子串刪除,並對餘下的M子串繼續搜索答案,直到全部的M子串都被成功匹配爲止。

# -*- coding: utf-8 -*-

from __future__ import division
import re
import itertools


OR  = '|'.join # Join a sequence of strings with '|' between them
cat = ''.join  # Join a sequence of strings with nothing between them
Set = frozenset # Data will be frozensets, so they can't be mutated.


def words(text):
    return set(text.split())


def mistakes(regex, M, U):
    "The set of mistakes made by this regex in classifying M and U."
    return ({"Should have matched: " + W for W in M if not re.search(regex, W)} |
            {"Should not have matched: " + L for L in U if re.search(regex, L)})


def verify(regex, M, U):
    assert not mistakes(regex, M, U)
    return True


def matches(regex, strings):
    "Return a set of all the strings that are matched by regex."
    return {s for s in strings if re.search(regex, s)}


def regex_parts(M, U):
    "Return parts that match at least one winner, but no loser."
    wholes = {'^' + w + '$' for w in M}
    parts = {d for w in wholes for p in subparts(w) for d in dotify(p)}
    return wholes | {p for p in parts if not matches(p, U)}


def subparts(word, N=4):
    "Return a set of subparts of word: consecutive characters up to length N (default 4)."
    return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N))


def dotify(part):
    "Return all ways to replace a subset of chars in part with '.'."
    choices = map(replacements, part)
    return {cat(chars) for chars in itertools.product(*choices)}


def replacements(c):
    return c if c in '^$' else c + '.'


def regex_covers(M, U):
    """Generate regex components and return a dict of {regex: {winner...}}.
    Each regex matches at least one winner and no loser."""
    losers_str = '\n'.join(U)
    wholes = {'^'+winner+'$' for winner in M}
    parts  = {d for w in wholes for p in subparts(w) for d in dotify(p)}
    reps   = {r for p in parts for r in repetitions(p)}
    pool   = wholes | parts | pairs(M) | reps
    searchers = {p:re.compile(p, re.MULTILINE).search for p in pool}
    return {p: Set(filter(searchers[p], M))
            for p in pool
            if not searchers[p](losers_str)}


def pairs(winners, special_chars=Set('*+?^$.[](){}|\\')):
    chars = Set(cat(winners)) - special_chars
    return {A+'.'+q+B
            for A in chars for B in chars for q in '*+?'}


def repetitions(part):
    """Return a set of strings derived by inserting a single repetition character
    ('+' or '*' or '?'), after each non-special character.
    Avoid redundant repetition of dots."""
    splits = [(part[:i], part[i:]) for i in range(1, len(part)+1)]
    return {A + q + B
            for (A, B) in splits
            # Don't allow '^*' nor '$*' nor '..*' nor '.*.'
            if not (A[-1] in '^$')
            if not A.endswith('..')
            if not (A.endswith('.') and B.startswith('.'))
            for q in '*+?'}


def tests():
    assert subparts('^it$') == {'^', 'i', 't', '$', '^i', 'it', 't$', '^it', 'it$', '^it$'}
    assert subparts('this') == {'t', 'h', 'i', 's', 'th', 'hi', 'is', 'thi', 'his', 'this'}

    assert dotify('it') == {'it', 'i.', '.t', '..'}
    assert dotify('^it$') == {'^it$', '^i.$', '^.t$', '^..$'}
    assert dotify('this') == {'this', 'thi.', 'th.s', 'th..', 't.is', 't.i.', 't..s', 't...',
                              '.his', '.hi.', '.h.s', '.h..', '..is', '..i.', '...s', '....'}
    assert repetitions('a') == {'a+', 'a*', 'a?'}
    assert repetitions('ab') == {'a+b', 'a*b', 'a?b', 'ab+', 'ab*', 'ab?'}
    assert repetitions('a.c') == {'a+.c', 'a*.c', 'a?.c',
                                  'a.c+', 'a.*c', 'a.?c',
                                  'a.+c', 'a.c*', 'a.c?'}
    assert repetitions('^a..d$') == {'^a+..d$', '^a*..d$', '^a?..d$',
                                     '^a..d+$', '^a..d*$', '^a..d?$'}
    assert pairs({'ab', 'c'}) == {
        'a.*a', 'a.*b', 'a.*c',
        'a.+a', 'a.+b', 'a.+c',
        'a.?a', 'a.?b', 'a.?c',
        'b.*a', 'b.*b', 'b.*c',
        'b.+a', 'b.+b', 'b.+c',
        'b.?a', 'b.?b', 'b.?c',
        'c.*a', 'c.*b', 'c.*c',
        'c.+a', 'c.+b', 'c.+c',
        'c.?a', 'c.?b','c.?c'}
    assert len(pairs({'1...2...3', '($2.34)', '42', '56', '7-11'})) == 8 * 8 * 3

    return 'tests pass'


def findregex(winners, losers, k=4, addRepetition=False):
    "Find a regex that matches all winners but no losers (sets of strings)."
    # Make a pool of regex parts, then pick from them to cover winners.
    # On each iteration, add the 'best' part to 'solution',
    # remove winners covered by best, and keep in 'pool' only parts
    # that still match some winner.
    if addRepetition:
        pool = regex_covers(winners, losers)
    else:
        pool = regex_parts(winners, losers)

    solution = []

    def score(part):
        return k * len(matches(part, winners)) - len(part)

    while winners:
        best = max(pool, key=score)
        solution.append(best)
        winners = winners - matches(best, winners)
        pool = {r for r in pool if matches(r, winners)}
    return OR(solution)


if __name__ == '__main__':
    M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage
        foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''')

    U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked
        fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''')

    solution = findregex(M, U, addRepetition=True)
    if verify(solution, M, U):
        print len(solution), solution
    solution = findregex(M, U, addRepetition=False)
    if verify(solution, M, U):
        print len(solution), solution

 

4. 嘗試生成一段描述惡意webshell樣本的零誤報正則

咱們來作一個和網絡安全相關的實驗,咱們如今有黑白兩份樣本,分別表明M和U,咱們如今嘗試用本節討論的算法來生成一段正則。

可是筆者在實際操做中發現,用regex golf這種問題的搜索空間是很是巨大的,當M和U的規模擴大時(例如大量webshell文件),所產生的正則子串候選集會是一個巨量的天文數字,該算法本質上仍是至關於在進行窮舉搜索,搜索效率十分低下。

更進一步地,筆者嘗試繼續擴大黑白樣本量(超過10的時候),算法已經沒法搜索出有效的正則題解,這說明,當黑白樣本超過必定數量的時候,alpha字符空間中黑白樣本已經存在交叉,全局解不存在。

0x3:用遺傳編程來自動搜索最優正則題解 

仍是上一小節的Regex golf問題,如今咱們來嘗試用遺傳編程來優化搜索效率。

1. 解題策略分析

論文的策略是基於遺傳算法生成一個」xx | xx「的雙正則子串,即每次獲得的個體最多有兩個子串,而後按照上一小節中相似的貪婪策略進行逐步OR拼接。

筆者這裏決定修改一下思路,直接基於遺傳編程對整個題解空間進行搜索,即構造一個完整題解的regex tree,這是一種全局最優解搜索的優化思路。

2. 基礎數據結構定義

咱們的整體策略依然是貪婪分段策略,也就說,咱們要尋找的最終正則題解是由不少個」|「組成的分段正則表達式。如今咱們來定義咱們的題解中可能出現的基本元素,這裏,咱們依然採用樹結構做爲基礎數據結構的承載:

  • 」ROOT「:根節點,一棵樹有且只有一個根節點,根節點必定是一個」|「分裂節點,即一個題解必須包含2個及2個以上的正則子串
  • 」|「:表明一個分裂符號,樹結構從這裏分裂一次,分裂符號的參數分別是兩個placeholder佔位符
  • dot(」.「):表明一個佔位符,用於保存子節點信息,每一個佔位符解析完畢後都會在頭尾加入定界符」^「和」$「,例如」^ab$ | ^cd$「
  • 字符串:由M序列的ngram序列組成的集合(2 <= n <= 4),例如」foo「
  • 修飾符:包括
    • 」.*+「
    • 」.++「
    • 」.?+「
    • 」.{.,.}+「:花括號內部包含兩個佔位符,定義域爲正整數,且參數2大於等於參數1
    • 」(.)「:組,括號內部包含一個佔位符
    • 」[.]「:中括號內部包含一個佔位符
    • 」[^.]「:取非的字符類
  • 」..「:鏈接符,表明一種操做,將兩個子節點拼接起來

基於上述基本元素定義,咱們能夠將題解正則表達式抽象爲一個樹結構(regex tree),以下圖,

(foo) | (ba++r)

該樹結構能夠經過深度優先遍歷,打印出最終的題解正則串,如上圖的標題所示。 

# -*- coding: utf-8 -*-

from random import random, randint, choice
import re
import itertools


# "ROOT"
class rootnode:
def __init__(self, left_child_node, right_child_node):
if left_child_node and right_child_node:
self.left_child_node = left_child_node
self.right_child_node = right_child_node
else:
self.left_child_node = node
self.right_child_node = node

def display(self):
return "|"


# universal child node
class node:
def __init__(self, node):
self.node = node


# "|"
class spliternode:
def __init__(self, left_child_dotplaceholder, right_child_dotplaceholder):
if left_child_dotplaceholder and right_child_dotplaceholder:
self.left_child_dotplaceholder = left_child_dotplaceholder
self.right_child_dotplaceholder = right_child_dotplaceholder
else:
self.left_child_dotplaceholder = node
self.right_child_dotplaceholder = node

def display(self):
return "|"


# "(.)"
class dotplaceholdernode:
def __init__(self, childnode=None):
if childnode:
self.childnode = childnode
else:
self.childnode = node


# "foo"
class charnode:
def __init__(self, charstring):
if charstring:
self.charstring = charstring
else:
self.charstring = node

def display(self):
return self.charstring


# ".."
class concat_node:
def __init__(self, left_child_node, right_child_node):
if left_child_node and right_child_node:
self.left_child_node = left_child_node
self.right_child_node = right_child_node
else:
self.left_child_node = node
self.right_child_node = node


# "++"
class qualifiernode:
def __init__(self, qualifierstrig):
if qualifierstrig:
self.qualifierstrig = qualifierstrig
else:
self.qualifierstrig = node

def display(self):
return self.qualifierstrig


def exampletree():
return rootnode(
dotplaceholdernode(
charnode("foo")
),
dotplaceholdernode(
concat_node(
concat_node(
charnode("ba"),
qualifiernode("++")
),
charnode("r")
)
)
)


# left child deep first travel
def printregextree(rootnode_i):
if rootnode_i is None:
return ""

if isinstance(rootnode_i, rootnode):
# concat the finnal regex str
finnal_regexstr = ""
finnal_regexstr += printregextree(rootnode_i.left_child_node)
finnal_regexstr += rootnode_i.display()
finnal_regexstr += printregextree(rootnode_i.right_child_node)
return finnal_regexstr

if isinstance(rootnode_i, spliternode):
# concat the finnal regex str
split_regexstr = ""
split_regexstr += printregextree(rootnode_i.left_child_dotplaceholder)
split_regexstr += rootnode_i.display()
split_regexstr += printregextree(rootnode_i.right_child_dotplaceholder)
return split_regexstr

if isinstance(rootnode_i, dotplaceholdernode):
return printregextree(rootnode_i.childnode)

if isinstance(rootnode_i, charnode):
return rootnode_i.display()

if isinstance(rootnode_i, concat_node):
concat_str = ""
concat_str += printregextree(rootnode_i.left_child_node)
concat_str += printregextree(rootnode_i.right_child_node)
return concat_str

if isinstance(rootnode_i, qualifiernode):
return rootnode_i.display()


def matches(regex, strings):
"Return a set of all the strings that are matched by regex."
return {s for s in strings if re.search(regex, s)}


def regex_parts(M, U):
"Return parts that match at least one winner, but no loser."
wholes = {'^' + w + '$' for w in M}
parts = {d for w in wholes for p in subparts(w) for d in p}
return wholes | {p for p in parts if not matches(p, U)}


def subparts(word, N=5):
"Return a set of subparts of word: consecutive characters up to length N (default 4)."
return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N))


def words(text):
return set(text.split())


def makerandomtree(M, U, parentnode=None, splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0):
if curren_level > maxdepth:
print "curren_level > maxdepth: ", curren_level
return
# ROOT node
if isinstance(parentnode, rootnode):
curren_level = 0
print "curren_level: ", curren_level
# init root node
print "init rootnode: ", curren_level
rootnode_i = rootnode(
dotplaceholdernode(None),
dotplaceholdernode(None)
)
# create left child node
print "new dotplaceholdernode"
rootnode_i.left_child_node = makerandomtree(M, U, rootnode_i.left_child_node, splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
print "new dotplaceholdernode"
# create right child node
rootnode_i.right_child_node = makerandomtree(M, U, rootnode_i.right_child_node, splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
return rootnode_i

# ".." dot placeholder node
if isinstance(parentnode, dotplaceholdernode):
curren_level += 1
print "curren_level: ", curren_level
# "|"
if random() < splitrate:
print "new spliternode"
return makerandomtree(M, U, spliternode(None, None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# ".."
elif random() < concatrate:
print "new concat_node"
return makerandomtree(M, U, concat_node(None, None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "foo"
elif random() < charrate:
print "new charnode"
return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)

# "|" split node
if isinstance(parentnode, spliternode):
curren_level += 1
print "curren_level: ", curren_level
print "init spliternode"
splitnode_i = spliternode(
dotplaceholdernode(None),
dotplaceholdernode(None)
)
print "new dotplaceholdernode"
splitnode_i.left_child_dotplaceholder = makerandomtree(M, U, splitnode_i.left_child_dotplaceholder,
splitrate, concatrate, charrate, qualifierate,
maxdepth, curren_level)
print "new dotplaceholdernode"
splitnode_i.right_child_dotplaceholder = makerandomtree(M, U, splitnode_i.right_child_dotplaceholder,
splitrate, concatrate, charrate, qualifierate,
maxdepth, curren_level)
return splitnode_i

# ".." concat node
if isinstance(parentnode, concat_node):
curren_level += 1
print "curren_level: ", curren_level
# "foo"
if random() < charrate:
print "new charnode"
return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)
# "++"
if random() < qualifierate:
print "new qualifiernode"
return makerandomtree(M, U, qualifiernode(None), splitrate, concatrate, charrate,
qualifierate, maxdepth, curren_level)

# "foo" char node
if isinstance(parentnode, charnode):
curren_level += 1
print "curren_level: ", curren_level
charnode_str = choice(list(regex_parts(M, U)))
print "charnode_str: ", charnode_str
print "new charnode"
charnode_i = charnode(charnode_str)

return charnode_i

# "++" qualifierate node
if isinstance(parentnode, qualifiernode):
curren_level += 1
print "curren_level: ", curren_level
qualifiernode_str = choice(['.', '+', '?', '*', '.*', '.+', '.*?'])
print "qualifiernode_str: ", qualifiernode_str
print "new qualifiernode"
qualifiernode_i = qualifiernode(qualifiernode_str)

return qualifiernode_i


if __name__ == '__main__':
exampletree = exampletree()
print type(exampletree), exampletree
print printregextree(exampletree)

3. Regex Tree生長策略

有了基本的數據結構,如今定義一下regex tree的生長準則,

  • 每棵樹都從ROOT根節點開始生長,根節點就是一個「|」節點
  • 」|「的左右子節點必須是」.「dot placeholder節點
  • 」.「dot placeholder節點的子節點能夠是如下幾種節點類型:
    • 字符串節點
    • 」..「:concat節點
    • 」|「:新的分裂節點
  • 字符串節點從M的ngram詞彙表中隨機選取,ngram list生成原理參考上一小節
  • 「..」拼接節點的左右子節點能夠是如下幾種節點類型:
    • 字符串節點
    • 修飾符節點

4. 損失函數定義

這裏須要用到代價敏感學習的訓練思路,若是直接按照下面公式進行損失訓練,

那麼很快就會收斂到最優解:」|「上,緣由很顯然,全字符匹配中,nm和nu都是相等的,相減爲0,而後減去字符串長度1,就是-1,這是算法能找到的最好答案了。

爲了解決這個問題,咱們須要對TP和FP採起不一樣的懲罰力度,

def scorefunction(tree, M, U, w=1):
dif = 0
regex_str = printregextree(tree)
M_cn, U_cn = 0, 0
for s in list(M):
try:
if re.search(regex_str, s):
M_cn += 1
except Exception, e:
print e.message, "regex_str: ", regex_str
# this regex tree is illegal, low socre!!
return -8
for u in list(U):
if re.search(regex_str, u):
U_cn += 1

# print "M_cn: ", M_cn
# print "U_cn: ", U_cn

dif = w * (M_cn - U_cn) - len(regex_str)

return dif

上面代碼中有一點值得注意,因爲regex tree的生成具備必定的隨機性,所以極可能產生不合法的正則串,所以對不合法的正則串給予較低的分值,驅使它淘汰。

有了損失函數的定義,就能夠很容易算出一個種羣中全部個體的適應度排名。

def rankfunction(M, U, population):
scores = [(scorefunction(t, M, U), t) for t in population]
scores.sort()
return scores

5. 隨機初始化regex tree

按照遺傳編程的定義,咱們先隨機初始化一棵符合題解規約的regex tree,

# -*- coding: utf-8 -*-

from random import random, randint, choice
import re
import itertools


# "ROOT"
class rootnode:
    def __init__(self, left_child_node, right_child_node):
        if left_child_node and right_child_node:
            self.left_child_node = left_child_node
            self.right_child_node = right_child_node
        else:
            self.left_child_node = node
            self.right_child_node = node

    def display(self):
        return "|"


# universal child node
class node:
    def __init__(self, node):
        self.node = node


# "|"
class spliternode:
    def __init__(self, left_child_dotplaceholder, right_child_dotplaceholder):
        if left_child_dotplaceholder and right_child_dotplaceholder:
            self.left_child_dotplaceholder = left_child_dotplaceholder
            self.right_child_dotplaceholder = right_child_dotplaceholder
        else:
            self.left_child_dotplaceholder = node
            self.right_child_dotplaceholder = node

    def display(self):
        return "|"


# "(.)"
class dotplaceholdernode:
    def __init__(self, childnode=None):
        if childnode:
            self.childnode = childnode
        else:
            self.childnode = node


# "foo"
class charnode:
    def __init__(self, charstring):
        if charstring:
            self.charstring = charstring
        else:
            self.charstring = node

    def display(self):
        return self.charstring


# ".."
class concat_node:
    def __init__(self, left_child_node, right_child_node):
        if left_child_node and right_child_node:
            self.left_child_node = left_child_node
            self.right_child_node = right_child_node
        else:
            self.left_child_node = node
            self.right_child_node = node


# "++"
class qualifiernode:
    def __init__(self, qualifierstrig):
        if qualifierstrig:
            self.qualifierstrig = qualifierstrig
        else:
            self.qualifierstrig = node

    def display(self):
        return self.qualifierstrig


def exampletree():
  return rootnode(
            dotplaceholdernode(
                charnode("foo")
            ),
            dotplaceholdernode(
                concat_node(
                    concat_node(
                        charnode("ba"),
                        qualifiernode("++")
                    ),
                    charnode("r")
                )
            )
        )


# left child deep first travel
def printregextree(rootnode_i):
    if rootnode_i is None:
        return ""

    if isinstance(rootnode_i, rootnode):
        # concat the finnal regex str
        finnal_regexstr = ""
        finnal_regexstr += printregextree(rootnode_i.left_child_node)
        finnal_regexstr += rootnode_i.display()
        finnal_regexstr += printregextree(rootnode_i.right_child_node)
        return finnal_regexstr

    if isinstance(rootnode_i, spliternode):
        # concat the finnal regex str
        split_regexstr = ""
        split_regexstr += printregextree(rootnode_i.left_child_dotplaceholder)
        split_regexstr += rootnode_i.display()
        split_regexstr += printregextree(rootnode_i.right_child_dotplaceholder)
        return split_regexstr

    if isinstance(rootnode_i, dotplaceholdernode):
        return printregextree(rootnode_i.childnode)

    if isinstance(rootnode_i, charnode):
        return rootnode_i.display()

    if isinstance(rootnode_i, concat_node):
        concat_str = ""
        concat_str += printregextree(rootnode_i.left_child_node)
        concat_str += printregextree(rootnode_i.right_child_node)
        return concat_str

    if isinstance(rootnode_i, qualifiernode):
        return rootnode_i.display()


def matches(regex, strings):
    "Return a set of all the strings that are matched by regex."
    return {s for s in strings if re.search(regex, s)}


def regex_parts(M, U):
    "Return parts that match at least one winner, but no loser."
    wholes = {'^' + w + '$' for w in M}
    parts = {d for w in wholes for p in subparts(w) for d in p}
    return wholes | {p for p in parts if not matches(p, U)}


def subparts(word, N=5):
    "Return a set of subparts of word: consecutive characters up to length N (default 4)."
    return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N))


def words(text):
    return set(text.split())


def makerandomtree(M, U, parentnode=None, splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0):
    if curren_level > maxdepth:
        print "curren_level > maxdepth: ", curren_level
        return
    # ROOT node
    if isinstance(parentnode, rootnode):
        curren_level = 0
        print "curren_level: ", curren_level
        # init root node
        print "init rootnode: ", curren_level
        rootnode_i = rootnode(
            dotplaceholdernode(None),
            dotplaceholdernode(None)
        )
        # create left child node
        print "new dotplaceholdernode"
        rootnode_i.left_child_node = makerandomtree(M, U, rootnode_i.left_child_node, splitrate, concatrate, charrate,
                                                    qualifierate, maxdepth, curren_level)
        print "new dotplaceholdernode"
        # create right child node
        rootnode_i.right_child_node = makerandomtree(M, U, rootnode_i.right_child_node, splitrate, concatrate, charrate,
                                                     qualifierate, maxdepth, curren_level)
        return rootnode_i

    # ".." dot placeholder node
    if isinstance(parentnode, dotplaceholdernode):
        curren_level += 1
        print "curren_level: ", curren_level
        # "|"
        if random() < splitrate:
            print "new spliternode"
            return makerandomtree(M, U, spliternode(None, None), splitrate, concatrate, charrate,
                                  qualifierate, maxdepth, curren_level)
        # ".."
        elif random() < concatrate:
            print "new concat_node"
            return makerandomtree(M, U, concat_node(None, None), splitrate, concatrate, charrate,
                                  qualifierate, maxdepth, curren_level)
        # "foo"
        elif random() < charrate:
            print "new charnode"
            return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate,
                                  qualifierate, maxdepth, curren_level)

    # "|" split node
    if isinstance(parentnode, spliternode):
        curren_level += 1
        print "curren_level: ", curren_level
        print "init spliternode"
        splitnode_i = spliternode(
            dotplaceholdernode(None),
            dotplaceholdernode(None)
        )
        print "new dotplaceholdernode"
        splitnode_i.left_child_dotplaceholder = makerandomtree(M, U, splitnode_i.left_child_dotplaceholder,
                                                               splitrate, concatrate, charrate, qualifierate,
                                                               maxdepth, curren_level)
        print "new dotplaceholdernode"
        splitnode_i.right_child_dotplaceholder = makerandomtree(M, U, splitnode_i.right_child_dotplaceholder,
                                                                splitrate, concatrate, charrate, qualifierate,
                                                                maxdepth, curren_level)
        return splitnode_i

    # ".." concat node
    if isinstance(parentnode, concat_node):
        curren_level += 1
        print "curren_level: ", curren_level
        # "foo"
        if random() < charrate:
            print "new charnode"
            return makerandomtree(M, U, charnode(None), splitrate, concatrate, charrate,
                                  qualifierate, maxdepth, curren_level)
        # "++"
        if random() < qualifierate:
            print "new qualifiernode"
            return makerandomtree(M, U, qualifiernode(None), splitrate, concatrate, charrate,
                                  qualifierate, maxdepth, curren_level)

    # "foo" char node
    if isinstance(parentnode, charnode):
        curren_level += 1
        print "curren_level: ", curren_level
        charnode_str = choice(list(regex_parts(M, U)))
        print "charnode_str: ", charnode_str
        print "new charnode"
        charnode_i = charnode(charnode_str)

        return charnode_i

    # "++" qualifierate node
    if isinstance(parentnode, qualifiernode):
        curren_level += 1
        print "curren_level: ", curren_level
        qualifiernode_str = choice(['.', '+', '?', '*', '.*', '.+', '.*?'])
        print "qualifiernode_str: ", qualifiernode_str
        print "new qualifiernode"
        qualifiernode_i = qualifiernode(qualifiernode_str)

        return qualifiernode_i


if __name__ == '__main__':
    exampletree = exampletree()
    print type(exampletree), exampletree
    print printregextree(exampletree)

    M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage
            foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''')

    U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked
            fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''')

    print regex_parts(M, U)

    rd1 = makerandomtree(M, U, parentnode=rootnode(None, None), splitrate=0.5, concatrate=0.5, charrate=0.5,
                         qualifierate=0.5, maxdepth=12, curren_level=0)

    print rd1
    print printregextree(rd1)

能夠看到,隨機產生的題解並非一個好的題解,可是它倒是一個有效的題解。如今咱們有了一個好的開始,接下來咱們能夠着手進行遺傳變異了。

6. 遺傳變異

遺傳編程的遺傳變異分爲兩種:1)mutate;2)crossover,咱們分別來定義。

1)mutate

mutate變異遍歷整個regex tree,針對不一樣節點採起不一樣的變異策略:

  • 」concat node「:根據必定的機率決定是否隨機用一顆新樹代替兩個子節點
  • 」char node「節點:根據必定的機率決定是否隨機從ngram候選列表中選一個新的char sring代替
  • "qualifiernode node"節點:根據必定的機率決定是否隨機從修飾符候選集中選一個新的qualifiernode string代替
def mutate(M, U, t, probchange=0.2):
if random() < probchange:
return makerandomtree(M, U)
else:
result = deepcopy(t)
if hasattr(t, "left_concatchildnode"):
result.left_concatchildnode = mutate(M, U, t.left_concatchildnode, probchange)
if hasattr(t, "right_concatchildnode"):
result.right_concatchildnode = mutate(M, U, t.right_concatchildnode, probchange)
if hasattr(t, "childnode"):
result.childnode = mutate(M, U, t.childnode, probchange)
if hasattr(t, "qualifierstrig"):
result.qualifierstrig = qualifiernode(choice(['.', '+', '?', '*', '.*', '.+', '.*?']))
if hasattr(t, "charstring"):
result.charstring = charnode(choice(list(regex_parts(M, U))))

return result

整體來講,mutate的做用在於向種羣中引入更多的多樣性,隨機性和多樣性是物種進化的原動力。

2)crossover

crossover交叉是同序遍歷兩棵regex tree,按照必定的機率決定是否要將各自的節點進行互換。

def crossover(t1, t2, probswap=0.7):
    if random() < probswap:
        return deepcopy(t2)
    else:
        result = deepcopy(t1)
        if hasattr(t1, 'left_childnode') and hasattr(t2, 'left_childnode'):
            result.left_childnode = crossover(t1.left_childnode, t2.left_childnode, probswap)
        if hasattr(t1, 'right_childnode') and hasattr(t2, 'right_childnode'):
            result.right_childnode = crossover(t1.right_childnode, t2.right_childnode, probswap)
        if hasattr(t1, 'childnode') and hasattr(t2, 'childnode'):
            result.childnode = crossover(t1.childnode, t2.childnode, probswap)
        if hasattr(t1, 'qualifierstrig') and hasattr(t2, 'qualifierstrig'):
            result.qualifierstrig = t2.qualifierstrig
        if hasattr(t1, 'charstring') and hasattr(t2, 'charstring'):
            result.charstring = t2.charstring

    return result 

整體來講,crossover的做用在於加速優秀基因和保留,和劣質基因的淘汰。由於能夠這麼理解,由於crossover的存在,同一個基因模式在種羣中會有擴大的趨勢,而若是是優秀的基因則會不斷被保留。

7. 自動遺傳迭代進化

至此,regex tree遺傳進化的全部元素都已經準備穩當了,咱們如今能夠開始編寫遺傳進化算法主程序了,讓程序自動生成一段符合題解的正則。

# -*- coding: utf-8 -*-

from random import random, randint, choice
import re
from copy import deepcopy
import itertools
from math import log
import numpy as np
import os


# "ROOT"
class rootnode:
    def __init__(self, left_childnode, right_childnode):
        if left_childnode and right_childnode:
            self.left_childnode = left_childnode
            self.right_childnode = right_childnode
        else:
            self.left_childnode = node
            self.right_childnode = node

    def display(self):
        return "|"


# universal child node
class node:
    def __init__(self, node):
        self.node = node


# "|"
class spliternode:
    def __init__(self, left_childnode, right_childnode):
        if left_childnode and right_childnode:
            self.left_childnode = left_childnode
            self.right_childnode = right_childnode
        else:
            self.left_childnode = node
            self.right_childnode = node

    def display(self):
        return "|"


# "(.)"
class dotplaceholdernode:
    def __init__(self, childnode=None):
        if childnode:
            self.childnode = childnode
        else:
            self.childnode = node


# "foo"
class charnode:
    def __init__(self, charstring):
        if charstring:
            self.charstring = charstring
        else:
            self.charstring = node

    def display(self):
        return self.charstring


# ".."
class concat_node:
    def __init__(self, left_concatchildnode, right_concatchildnode):
        if left_concatchildnode and right_concatchildnode:
            self.left_concatchildnode = left_concatchildnode
            self.right_concatchildnode = right_concatchildnode
        else:
            self.left_concatchildnode = node
            self.right_concatchildnode = node


# "++"
class qualifiernode:
    def __init__(self, qualifierstrig):
        if qualifierstrig:
            self.qualifierstrig = qualifierstrig
        else:
            self.qualifierstrig = node

    def display(self):
        return self.qualifierstrig


def exampletree():
  return rootnode(
            dotplaceholdernode(
                charnode("foo")
            ),
            dotplaceholdernode(
                concat_node(
                    concat_node(
                        charnode("ba"),
                        qualifiernode("++")
                    ),
                    charnode("r")
                )
            )
        )


# left child deep first travel
def printregextree(rootnode_i):
    if rootnode_i is None:
        return ""

    if isinstance(rootnode_i, rootnode):
        # concat the finnal regex str
        finnal_regexstr = ""
        finnal_regexstr += printregextree(rootnode_i.left_childnode)
        finnal_regexstr += rootnode_i.display()
        finnal_regexstr += printregextree(rootnode_i.right_childnode)
        return finnal_regexstr

    if isinstance(rootnode_i, spliternode):
        # concat the finnal regex str
        split_regexstr = ""
        split_regexstr += printregextree(rootnode_i.left_childnode)
        split_regexstr += rootnode_i.display()
        split_regexstr += printregextree(rootnode_i.right_childnode)
        return split_regexstr

    if isinstance(rootnode_i, dotplaceholdernode):
        return printregextree(rootnode_i.childnode)

    if isinstance(rootnode_i, charnode):
        return rootnode_i.display()

    if isinstance(rootnode_i, concat_node):
        concat_str = ""
        concat_str += printregextree(rootnode_i.left_concatchildnode)
        concat_str += printregextree(rootnode_i.right_concatchildnode)
        return concat_str

    if isinstance(rootnode_i, qualifiernode):
        return rootnode_i.display()


def matches(regex, strings):
    "Return a set of all the strings that are matched by regex."
    return {s for s in strings if re.search(regex, s)}


def regex_parts(M, U):
    "Return parts that match at least one winner, but no loser."
    wholes = {'^' + w + '$' for w in M}
    parts = {d for w in wholes for p in subparts(w) for d in p}
    return wholes | {p for p in parts if not matches(p, U)}


def subparts(word, N=5):
    "Return a set of subparts of word: consecutive characters up to length N (default 4)."
    return set(word[i:i + n + 1] for i in range(len(word)) for n in range(N))


def words(text):
    return set(text.split())


def makerandomtree(M, U, charnode_pool, parentnode=rootnode(None, None), splitrate=0.7, concatrate=0.7, charrate=0.7, qualifierate=0.7, maxdepth=6, curren_level=0, stopearly=0.1):
    if curren_level > maxdepth:
        #print "curren_level > maxdepth: ", curren_level
        return
    if random() < stopearly:
        return
    # ROOT node
    if isinstance(parentnode, rootnode):
        curren_level = 0
        #print "curren_level: ", curren_level
        # init root node
        #print "init rootnode: ", curren_level
        rootnode_i = rootnode(
            dotplaceholdernode(None),
            dotplaceholdernode(None)
        )
        # create left child node
        #print "new dotplaceholdernode"
        rootnode_i.left_childnode = makerandomtree(M, U, charnode_pool, rootnode_i.left_childnode, splitrate, concatrate, charrate,
                                                    qualifierate, maxdepth, curren_level)
        #print "new dotplaceholdernode"
        # create right child node
        rootnode_i.right_childnode = makerandomtree(M, U, charnode_pool, rootnode_i.right_childnode, splitrate, concatrate, charrate,
                                                     qualifierate, maxdepth, curren_level)
        return rootnode_i

    # ".." dot placeholder node
    if isinstance(parentnode, dotplaceholdernode):
        curren_level += 1
        #print "curren_level: ", curren_level
        # "|"
        if random() < splitrate:
            #print "new spliternode"
            return makerandomtree(M, U, charnode_pool, spliternode(None, None), splitrate, concatrate, charrate,
                                  qualifierate, maxdepth, curren_level)
        # ".."
        elif random() < concatrate:
            #print "new concat_node"
            return makerandomtree(M, U, charnode_pool, concat_node(None, None), splitrate, concatrate, charrate,
                                  qualifierate, maxdepth, curren_level)
        # "foo"
        elif random() < charrate:
            #print "new charnode"
            return makerandomtree(M, U, charnode_pool, charnode(None), splitrate, concatrate, charrate,
                                  qualifierate, maxdepth, curren_level)

    # "|" split node
    if isinstance(parentnode, spliternode):
        curren_level += 1
        #print "curren_level: ", curren_level
        #print "init spliternode"
        splitnode_i = spliternode(
            dotplaceholdernode(None),
            dotplaceholdernode(None)
        )
        #print "new dotplaceholdernode"
        splitnode_i.left_childnode = makerandomtree(M, U, charnode_pool, splitnode_i.left_childnode,
                                                               splitrate, concatrate, charrate, qualifierate,
                                                               maxdepth, curren_level)
        #print "new dotplaceholdernode"
        splitnode_i.right_childnode = makerandomtree(M, U, charnode_pool, splitnode_i.right_childnode,
                                                                splitrate, concatrate, charrate, qualifierate,
                                                                maxdepth, curren_level)
        return splitnode_i

    # ".." concat node
    if isinstance(parentnode, concat_node):
        curren_level += 1
        #print "curren_level: ", curren_level
        # "foo"
        if random() < charrate:
            #print "new charnode"
            return makerandomtree(M, U, charnode_pool, charnode(None), splitrate, concatrate, charrate,
                                  qualifierate, maxdepth, curren_level)
        # "++"
        if random() < qualifierate:
            #print "new qualifiernode"
            return makerandomtree(M, U, charnode_pool, qualifiernode(None), splitrate, concatrate, charrate,
                                  qualifierate, maxdepth, curren_level)

    # "foo" char node
    if isinstance(parentnode, charnode):
        curren_level += 1
        #print "curren_level: ", curren_level
        charnode_str = choice(charnode_pool)
        #print "charnode_str: ", charnode_str
        #print "new charnode"
        charnode_i = charnode(charnode_str)

        return charnode_i

    # "++" qualifierate node
    if isinstance(parentnode, qualifiernode):
        curren_level += 1
        #print "curren_level: ", curren_level
        qualifiernode_str = choice(['.', '+', '?', '*', '.*', '.+', '.*?'])
        #print "qualifiernode_str: ", qualifiernode_str
        #print "new qualifiernode"
        qualifiernode_i = qualifiernode(qualifiernode_str)

        return qualifiernode_i


def scorefunction(tree, M, U, w=1):
    dif = 0
    regex_str = printregextree(tree)
    M_cn, U_cn = 0, 0
    for s in list(M):
        try:
            if re.search(regex_str, s):
                M_cn += 1
        except Exception, e:
            # print e.message, "regex_str: ", regex_str
            # this regex tree is illegal, low socre!!
            return -32
    for u in list(U):
        if re.search(regex_str, u):
            U_cn += 1

    # print "M_cn: ", M_cn
    # print "U_cn: ", U_cn

    dif = w * (M_cn - 2*U_cn) - len(regex_str)

    return dif


def rankfunction_(M, U, population):
    scores = [(scorefunction(t, M, U), t) for t in population]
    # remove illegal regex
    scores_ = []
    for i in scores:
        if i[1]:
            scores_.append(i)
    scores_.sort(reverse=True)
    return scores_


def mutate(M, U, charnode_pool, t, probchange=0.4):
    if random() < probchange:
        return makerandomtree(M, U, charnode_pool)
    else:
        result = deepcopy(t)
        if hasattr(t, "left_concatchildnode"):
            result.left_concatchildnode = mutate(M, U, charnode_pool, t.left_concatchildnode, probchange)
        if hasattr(t, "right_concatchildnode"):
            result.right_concatchildnode = mutate(M, U, charnode_pool, t.right_concatchildnode, probchange)
        if hasattr(t, "childnode"):
            result.childnode = mutate(M, U, charnode_pool, t.childnode, probchange)
        if hasattr(t, "qualifierstrig"):
            result.qualifierstrig = qualifiernode(choice(['.', '+', '?', '*', '.*', '.+', '.*?']))
        if hasattr(t, "charstring"):
            result.charstring = charnode(choice(charnode_pool))

        return result


def crossover(t1, t2, probswap=0.5):
    if random() < probswap:
        return deepcopy(t2)
    else:
        result = deepcopy(t1)
        if hasattr(t1, 'left_childnode') and hasattr(t2, 'left_childnode'):
            result.left_childnode = crossover(t1.left_childnode, t2.left_childnode, probswap)
        if hasattr(t1, 'right_childnode') and hasattr(t2, 'right_childnode'):
            result.right_childnode = crossover(t1.right_childnode, t2.right_childnode, probswap)
        if hasattr(t1, 'childnode') and hasattr(t2, 'childnode'):
            result.childnode = crossover(t1.childnode, t2.childnode, probswap)
        if hasattr(t1, 'qualifierstrig') and hasattr(t2, 'qualifierstrig'):
            result.qualifierstrig = t2.qualifierstrig
        if hasattr(t1, 'charstring') and hasattr(t2, 'charstring'):
            result.charstring = t2.charstring

    return result


def evolve(M, U, charnode_pool, popsize=128, rankfunction=rankfunction_, maxgen=500, mutationrate=0.6, probswap=0.5, pexp=0.3, pnew=0.8):
    # Returns a random number, tending towards lower numbers.
    # The lower pexp is, more lower numbers you will get
    # probexp:表示在構造新種羣時,」選擇評價較低的程序「這一律率的遞減比例。該值越大,相應的篩選過程就越嚴格,即只選擇評價最高的多少比例的個體做爲複製對象
    def selectindex():
        return int(log(random()) / log(pexp))

    # Create a random initial population
    population = [makerandomtree(M, U, charnode_pool) for i in range(popsize)]
    scores = []
    for i in range(maxgen):
        scores = rankfunction(M, U, population)
        print scores[0]
        print "evole round: {0}, top score: {1}, regex_str: {2}".format(i, scores[0][0], printregextree(scores[0][1]))
        if scores[0][0] > 0:
            print "found good solution: {0}".format(printregextree(scores[0][1]))
            break

        # The top 20% always make it
        # newpop = np.array(scores)[:int(len(scores) * 0.2), 1].tolist()
        newpop = [scores[0][1], scores[1][1]]

        # Build the next generation
        # probnew:表示在構造新種羣時,」引入一個全新的隨機程序「的機率,該參數和probexp是」種羣多樣性「的重要決定參數
        while len(newpop) < popsize:
            if random() < pnew:
                newpop.append(
                    mutate(
                        M, U, charnode_pool,
                        crossover(
                            scores[selectindex()][1],
                            scores[selectindex()][1],
                            probswap
                        ),
                        mutationrate
                    )
                )
            else:
                # Add a random node to mix things up
                new_tree = makerandomtree(M, U, charnode_pool)
                # print "evole round: {0}, add new tree: {1}".format(i, printregextree(new_tree))
                newpop.append(new_tree)

        population = newpop
    # return the evolutionary results
    return scores[0][1]


def test_regex(M, U, regex_str):
    dif = 0
    M_cn, U_cn = 0, 0
    for s in list(M):
        try:
            if re.search(regex_str, s):
                M_cn += 1
        except Exception, e:
            # print e.message, "regex_str: ", regex_str
            # this regex tree is illegal, low socre!!
            dif = -32
    for u in list(U):
        try:
            if re.search(regex_str, u):
                U_cn += 1
        except Exception, e:
            # print e.message, "regex_str: ", regex_str
            # this regex tree is illegal, low socre!!
            dif = -32

    print "M_cn: ", M_cn
    print "U_cn: ", U_cn

    dif = 1 * (M_cn - 4 * U_cn) - 4 * len(regex_str)
    print "dif: ", dif


def test_regex_golf():
    # exampletree = exampletree()
    # print type(exampletree), exampletree
    # print printregextree(exampletree)

    M = words('''afoot catfoot dogfoot fanfoot foody foolery foolish fooster footage
                foothot footle footpad footway hotfoot jawfoot mafoo nonfood padfoot prefool sfoot unfool''')

    U = words('''Atlas Aymoro Iberic Mahran Ormazd Silipan altared chandoo crenel crooked
                fardo folksy forest hebamic idgah manlike marly palazzi sixfold tarrock unfold''')

    charnode_pool = list(regex_parts(M, U))
    print charnode_pool

    # rd1 = makerandomtree(M, U, parentnode=rootnode(None, None), splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0)
    # rd2 = makerandomtree(M, U, parentnode=rootnode(None, None), splitrate=0.5, concatrate=0.5, charrate=0.5, qualifierate=0.5, maxdepth=12, curren_level=0)

    # print "rd1: "
    # print printregextree(rd1)
    # print "rd2: "
    # print printregextree(rd2)

    # dif = scorefunction(tree=rd1, M=M, U=U, w=1)
    # print "dif: ", dif
    # population = [makerandomtree(M, U) for i in range(10)]
    # for i in population:
    #      print printregextree(i)
    # scores = rankfunction_(M, U, population)
    # print "function score: ", scores

    # print np.array(scores)[:int(len(scores) * 0.2), 1].tolist()

    # rd1_mutate = mutate(M, U, rd1, probchange=0.2)
    # print "rd1_mutate: "
    # print printregextree(rd1_mutate)

    # rd1_rd2_crossover = crossover(rd1, rd2, probswap=0.7)
    # print "rd1_rd2_crossover: "
    # print printregextree(rd1_rd2_crossover)

    evolutionary_regex_str = evolve(M, U, charnode_pool)
    print printregextree(evolutionary_regex_str)


def load_data():
    M, U = [], []
    rootDir = "./blacksamples"
    for lists in os.listdir(rootDir):
        if lists == '.DS_Store':
            continue
        filepath = os.path.join(rootDir, lists)
        filecontent = open(filepath, 'r').read()
        # only remain English word
        cop = re.compile("[^^a-z^A-Z^0-9^\s]")
        # remove space
        filecontent = re.sub(r'\s+', '', filecontent).strip()
        filecontent = cop.sub('', filecontent)
        M.append(filecontent)
    rootDir = "./whitesamples"
    for lists in os.listdir(rootDir):
        if lists == '.DS_Store':
            continue
        filepath = os.path.join(rootDir, lists)
        filecontent = open(filepath, 'r').read()
        # only remain English word
        cop = re.compile("[^^a-z^A-Z^0-9^\s]")
        filecontent = cop.sub('', filecontent)
        # remove space
        filecontent = re.sub(r'\s+', '', filecontent).strip()
        U.append(filecontent)

    M = set(M)
    U = set(U)

    return M, U


def test_webshell():
    M, U = load_data()

    # print M

    charnode_pool = list(regex_parts(M, U))
    print charnode_pool
    print len(charnode_pool)

    evolutionary_regex_str = evolve(M, U, charnode_pool)
    print printregextree(evolutionary_regex_str)

    print "test_regex: "
    test_regex(M, U, charnode_pool)


if __name__ == '__main__':
    test_webshell()
    # test_regex_golf()

代碼中的blacksamples、whitesamples請讀者朋友自行準備。 

Relevant Link:  

http://www.algorithmdog.com/%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95%e7%b3%bb%e5%88%97%e4%b9%8b%e4%ba%8c%e6%84%9a%e5%bc%84%e6%b7%b1%e5%ba%a6%e5%ad%a6%e4%b9%a0%e7%9a%84%e9%81%97%e4%bc%a0%e7%ae%97%e6%b3%95
Bartoli, Alberto, et al. 「Playing regex golf with genetic programming.」 Proceedings of the 2014 conference on Genetic and evolutionary computation. ACM, 2014. 
https://alf.nu/RegexGolf
http://www.doc88.com/p-0387699026353.html
http://regex.inginf.units.it/golf/# 
https://github.com/norvig/pytudes
https://github.com/norvig/pytudes/blob/master/ipynb/xkcd1313.ipynb
https://github.com/norvig/pytudes/blob/master/ipynb/xkcd1313-part2.ipynb

 

6. 遺傳編程可以應用在網絡安全攻防上?

咱們來回顧一下要應用遺傳編程在某個具體場景中,須要的兩個必要條件:

  • 可以明肯定義出可數值化的損失函數:針對每一次變種後的結果都可以實時計算出全部個體對當前環境的適應度(被判黑的程度)
  • 有明確生成外部表象的內顯子算法:例如PHP Token Tree、Regex Tree、Four fundamental operations of arithmeticTree,可以按照某種深度優先遍歷算法,對整個AST Tree進行遍歷,在遍歷的過程當中完成節點變異和個體間交叉
  • 基於內顯子算法生成的外顯子須要具有業務可解釋性:和正則表達式,數學函數方程式這種純數學概念的外顯子不一樣,在安全領域,對生成的文本還有一個「業務可解釋性」的要求。例如說基於cmd ast tree生成了一段cmdline,雖然可能這段cmdline是符合cmdline語法的,可是自己不具有攻擊性,即這個cmdline沒法對被執行對象完成特定的攻擊目的。也許有讀者會說,那很簡單,咱們只要在內顯子變異算法上增長一個約束條件,強制生成的外顯子字符串具有攻擊性不就行了嗎?可是最難的問題就在這裏,一個cmdline是否具有攻擊性,具有多大的攻擊性,是很是難經過數學方式形式化定義的

0x1:基於遺傳編程自動挖掘0day樣本

基於php token生成一個php ast tree,而且在損失函數中找到必定定量評估方法,判斷當前文件的惡意程度。用遺傳編程自動生成一些能夠繞過當前檢測機制的php webshell

  • 隨機化初始化一棵php token tree
  • 基於token tree重構出原始文件
  • 個體適應度判斷:這一步有兩種不一樣的思路,
    • 繞過全部檢測引擎發現0day樣本的優化目標:經過多個檢測引擎對文件進行惡意行爲檢測,並根據命中狀況計算損失函數值,這麼作的目的是區分出種羣中適應度不一樣的個體。
    • 繞過單個機率檢測引擎的優化目標:對於想深度學習sigmoid損失函數來講,倒數最後一層sigmoid函數輸出的是一個置信機率,區間是【0,1】,這就給不一樣的個體賦予了不一樣的適應度,遺傳編程能夠經過優化嘗試下降這個置信機率,使之逃過模型的判黑閾值,這也是一種攻擊深度學習算法模型的方式
  • 篩選出本輪中適應度最高(損失值最低)的個體,按照標準遺傳編程進行種羣繁殖
  • 直到找到一個損失值爲零(徹底繞過現有檢測體系的新文件)

筆者提醒

本質上來講,遺傳編程的優化方向是隨機的,和梯度驅動的SGD優化算法相比,遺傳編程每次的迭代優化並不明確朝某個方向前進,而是被動由環境來進行淘汰和篩選,因此是一種環境選擇壓驅動的優化算法。

遺傳編程的這種的優化特性特別適合像「惡意樣本檢測」這種「階躍損失」的分類問題,由於對於惡意樣原本說,只有兩種狀態,「黑或白」,損失函數的值也只有兩種,「0或者1」,所以,咱們沒法用SGD相似的算法來優化這種階躍損失函數問題,由於階躍點處的梯度要麼不存在(左極限),要麼是無窮大的(右極限)。

可是遺傳編程依然能在每輪篩選出「優勝者」,並按照必定的策略保留優勝者,進行交叉和變異以進入下一輪,同時也會按照必定的機率挑選部分的「失敗者」也進入下一輪進化,這麼作的目的是引入多樣性。

相關文章
相關標籤/搜索