先看一個例子,某銀行是否給用戶放貸的判斷規則集以下:html
if 年齡==青年: if 有工做==是: if 信貸狀況==很是好: 放 else: 不放 else: if 有本身的房子==是: if 信貸狀況==通常: 不放 else: 放 else: if 信貸狀況==很是好 or 信貸狀況==好: 放 else: if 有工做==是: 放 else: 不放 elif 年齡==中年: if 有本身的房子==是: 放 else: if 信貸狀況==很是好 or 信貸狀況==好: 放 else: if 有工做==是: 放 else: 不放 elif 年齡==老年: if 有本身的房子==是: if 信貸狀況==很是好 or 信貸狀況==好: 放 else: 不放 else: if 信貸狀況==很是好 or 信貸狀況==好: if 有工做==是: 放 else: 不放 else: 不放 if 有本身的房子==是: 放 else: if 有工做==是: 放 else: 不放
眼力好的同窗立馬會發現這代碼寫的有問題,好比只要信貸狀況==很是好
的用戶都有放款,何須嵌到裏面去?並且不少規則有冗餘,爲何不重構一下呀?但現實狀況是你可能真不敢隨意亂動!由於指不定哪天項目經理又要新增長規則了,因此寧肯讓代碼愈來愈冗餘,愈來愈複雜,也不敢隨意亂動以前的規則,亂動兩條,可能會帶來意想不到的災難。簡單總結一下這種複雜嵌套的if else
規則可能存在的痛點:node
(1)規則可能不完備,存在某些匹配不上的狀況;python
(2)規則之間存在冗餘,多個if else
狀況實際上是判斷的一樣的條件;算法
(3)嚴重時,可能會出現矛盾的狀況,即相同的條件,即有放,又有不放;app
(4)判斷規則的優先級混亂,好比信貸狀況
因子能夠優先考慮,由於只要它是很是好
就能夠放款,而沒必要先判斷其它條件dom
而決策樹算法就能解決以上痛點,它能保證全部的規則互斥且完備,即用戶的任意一種狀況必定能匹配上一條規則,且該規則惟一,這樣就能解決上面的痛點1~3,且規則判斷的優先級也很不錯,下面介紹決策樹學習算法。函數
決策樹算法能夠從已標記的數據中自動學習出if else
規則集,以下圖(圖片來源>>>),左邊是收集的一系列判斷是否打球的案例,包括4個特徵outlook,temperature,Humidity,Wind,以及y標籤是否打球,經過決策樹學習後獲得右邊的決策樹,決策樹的結構如圖所示,它由節點和有向邊組成,而節點又分爲兩種:葉子節點和非葉子節點,非葉子節點主要用於對某一特徵作判斷,而它下面所連接的有向邊表示該特徵所知足的某條件,最終的葉子節點即表示實例的預測值(分類/迴歸)學習
決策樹學習主要分爲兩個階段,決策樹生成和決策樹剪枝,決策樹生成階段最重要即是特徵選擇,下面對相關概念作介紹:測試
特徵選擇用於選擇對分類有用的特徵,ID3和C4.5一般選擇的準則是信息增益和信息增益比,下面對其做介紹並實現ui
首先介紹兩個隨機變量之間的互信息公式:
這裏\(H(X)\)表示\(X\)的熵,在最大熵模型那一節已作過介紹:
條件熵\(H(Y|X)\)表示在已知隨機變量\(X\)的條件下,隨機變量\(Y\)的不肯定性:
而信息增益就是\(Y\)取分類標籤,\(X\)取某一特徵時的互信息,它表示若是選擇特徵\(X\)對數據進行分割,可使得分割後\(Y\)分佈的熵下降多少,若下降的越多,說明分割每一個子集的\(Y\)的分佈越集中,則\(X\)對分類標籤\(Y\)越有用,下面進行python實現:
""" 定義計算熵的函數,封裝到ml_models.utils """ import numpy as np from collections import Counter import math def entropy(x,sample_weight=None): x=np.asarray(x) #x中元素個數 x_num=len(x) #若是sample_weight爲None設均設置同樣 if sample_weight is None: sample_weight=np.asarray([1.0]*x_num) x_counter={} weight_counter={} # 統計各x取值出現的次數以及其對應的sample_weight列表 for index in range(0,x_num): x_value=x[index] if x_counter.get(x_value) is None: x_counter[x_value]=0 weight_counter[x_value]=[] x_counter[x_value]+=1 weight_counter[x_value].append(sample_weight[index]) #計算熵 ent=.0 for key,value in x_counter.items(): p_i=1.0*value*np.mean(weight_counter.get(key))/x_num ent+=-p_i*math.log(p_i) return ent
#測試 entropy([1,2])
0.6931471805599453
def cond_entropy(x, y,sample_weight=None): """ 計算條件熵:H(y|x) """ x=np.asarray(x) y=np.asarray(y) # x中元素個數 x_num = len(x) #若是sample_weight爲None設均設置同樣 if sample_weight is None: sample_weight=np.asarray([1.0]*x_num) # 計算 ent = .0 for x_value in set(x): x_index=np.where(x==x_value) new_x=x[x_index] new_y=y[x_index] new_sample_weight=sample_weight[x_index] p_i=1.0*len(new_x)/x_num ent += p_i * entropy(new_y,new_sample_weight) return ent
#測試 cond_entropy([1,2],[1,2])
0.0
def muti_info(x, y,sample_weight=None): """ 互信息/信息增益:H(y)-H(y|x) """ x_num=len(x) if sample_weight is None: sample_weight=np.asarray([1.0]*x_num) return entropy(y,sample_weight) - cond_entropy(x, y,sample_weight)
接下來,作一個測試,看特徵的取值的個數對信息增益的影響
import random import numpy as np import matplotlib.pyplot as plt %matplotlib inline #做epochs次測試 epochs=100 #x的取值的個數:2->class_num_x class_num_x=100 #y標籤類別數 class_num_y=2 #樣本數量 num_samples=500 info_gains=[] for _ in range(0,epochs): info_gain=[] for class_x in range(2,class_num_x): x=[] y=[] for _ in range(0,num_samples): x.append(random.randint(1,class_x)) y.append(random.randint(1,class_num_y)) info_gain.append(muti_info(x,y)) info_gains.append(info_gain) plt.plot(np.asarray(info_gains).mean(axis=0))
[<matplotlib.lines.Line2D at 0x21ed2625ba8>]
能夠發現一個頗有意思的現象,若是特徵的取值的個數越多,越容易被選中,這比較好理解,假設一個極端狀況,若對每個實例特徵\(x\)的取值都不一樣,則其\(H(Y|X)\)項爲0,則\(MI(X,Y)=H(Y)-H(Y|X)\)將會取得最大值(\(H(Y)\)與\(X\)無關),這即是ID3算法的一個痛點,爲了矯正這一問題,C4.5算法利用信息增益比做特徵選擇
信息增益比其實就是對信息增益除以了一個\(x\)的熵:
def info_gain_rate(x, y,sample_weight=None): """ 信息增益比 """ x_num=len(x) if sample_weight is None: sample_weight=np.asarray([1.0]*x_num) return 1.0 * muti_info(x, y,sample_weight) / (1e-12 + entropy(x,sample_weight))
接下來再做一次相同的測試:
#做epochs次測試 epochs=100 #x的取值的個數:2->class_num_x class_num_x=100 #y標籤類別數 class_num_y=2 #樣本數量 num_samples=500 info_gain_rates=[] for _ in range(0,epochs): info_gain_rate_=[] for class_x in range(2,class_num_x): x=[] y=[] for _ in range(0,num_samples): x.append(random.randint(1,class_x)) y.append(random.randint(1,class_num_y)) info_gain_rate_.append(info_gain_rate(x,y)) info_gain_rates.append(info_gain_rate_) plt.plot(np.asarray(info_gain_rates).mean(axis=0))
[<matplotlib.lines.Line2D at 0x21ed26da978>]
雖然總體仍是上升的趨勢,當相比於信息增益已經緩解了不少,將它們畫一塊兒直觀感覺一下:
plt.plot(np.asarray(info_gains).mean(axis=0),'r') plt.plot(np.asarray(info_gain_rates).mean(axis=0),'y')
[<matplotlib.lines.Line2D at 0x21ed267e860>]
決策樹的生成就是一個遞歸地調用特徵選擇的過程,首先從根節點開始,利用信息增益/信息增益比選擇最佳的特徵做爲節點特徵,由該特徵的不一樣取值創建子節點,而後再對子節點調用以上方法,直到全部特徵的信息增益/信息增益比均很小或者沒有特徵能夠選擇時中止,最後獲得一顆決策樹。接下來直接進行代碼實現:
import os os.chdir('../') from ml_models import utils from ml_models.wrapper_models import DataBinWrapper """ ID3和C4.5決策樹分類器的實現,放到ml_models.tree模塊 """ class DecisionTreeClassifier(object): class Node(object): """ 樹節點,用於存儲節點信息以及關聯子節點 """ def __init__(self, feature_index: int = None, target_distribute: dict = None, weight_distribute: dict = None, children_nodes: dict = None, num_sample: int = None): """ :param feature_index: 特徵id :param target_distribute: 目標分佈 :param weight_distribute:權重分佈 :param children_nodes: 孩子節點 :param num_sample:樣本量 """ self.feature_index = feature_index self.target_distribute = target_distribute self.weight_distribute = weight_distribute self.children_nodes = children_nodes self.num_sample = num_sample def __init__(self, criterion='c4.5', max_depth=None, min_samples_split=2, min_samples_leaf=1, min_impurity_decrease=0, max_bins=10): """ :param criterion:劃分標準,包括id3,c4.5,默認爲c4.5 :param max_depth:樹的最大深度 :param min_samples_split:當對一個內部結點劃分時,要求該結點上的最小樣本數,默認爲2 :param min_samples_leaf:設置葉子結點上的最小樣本數,默認爲1 :param min_impurity_decrease:打算劃分一個內部結點時,只有當劃分後不純度(能夠用criterion參數指定的度量來描述)減小值不小於該參數指定的值,纔會對該結點進行劃分,默認值爲0 """ self.criterion = criterion if criterion == 'c4.5': self.criterion_func = utils.info_gain_rate else: self.criterion_func = utils.muti_info self.max_depth = max_depth self.min_samples_split = min_samples_split self.min_samples_leaf = min_samples_leaf self.min_impurity_decrease = min_impurity_decrease self.root_node: self.Node = None self.sample_weight = None self.dbw = DataBinWrapper(max_bins=max_bins) def _build_tree(self, current_depth, current_node: Node, x, y, sample_weight): """ 遞歸進行特徵選擇,構建樹 :param x: :param y: :param sample_weight: :return: """ rows, cols = x.shape # 計算y分佈以及其權重分佈 target_distribute = {} weight_distribute = {} for index, tmp_value in enumerate(y): if tmp_value not in target_distribute: target_distribute[tmp_value] = 0.0 weight_distribute[tmp_value] = [] target_distribute[tmp_value] += 1.0 weight_distribute[tmp_value].append(sample_weight[index]) for key, value in target_distribute.items(): target_distribute[key] = value / rows weight_distribute[key] = np.mean(weight_distribute[key]) current_node.target_distribute = target_distribute current_node.weight_distribute = weight_distribute current_node.num_sample = rows # 判斷中止切分的條件 if len(target_distribute) <= 1: return if rows < self.min_samples_split: return if self.max_depth is not None and current_depth > self.max_depth: return # 尋找最佳的特徵 best_index = None best_criterion_value = 0 for index in range(0, cols): criterion_value = self.criterion_func(x[:, index], y) if criterion_value > best_criterion_value: best_criterion_value = criterion_value best_index = index # 若是criterion_value減小不夠則中止 if best_index is None: return if best_criterion_value <= self.min_impurity_decrease: return # 切分 current_node.feature_index = best_index children_nodes = {} current_node.children_nodes = children_nodes selected_x = x[:, best_index] for item in set(selected_x): selected_index = np.where(selected_x == item) # 若是切分後的點太少,以致於都不能作葉子節點,則中止分割 if len(selected_index[0]) < self.min_samples_leaf: continue child_node = self.Node() children_nodes[item] = child_node self._build_tree(current_depth + 1, child_node, x[selected_index], y[selected_index], sample_weight[selected_index]) def fit(self, x, y, sample_weight=None): # check sample_weight n_sample = x.shape[0] if sample_weight is None: self.sample_weight = np.asarray([1.0] * n_sample) else: self.sample_weight = sample_weight # check sample_weight if len(self.sample_weight) != n_sample: raise Exception('sample_weight size error:', len(self.sample_weight)) # 構建空的根節點 self.root_node = self.Node() # 對x分箱 self.dbw.fit(x) # 遞歸構建樹 self._build_tree(1, self.root_node, self.dbw.transform(x), y, self.sample_weight) # 檢索葉子節點的結果 def _search_node(self, current_node: Node, x, class_num): if current_node.feature_index is None or current_node.children_nodes is None or len( current_node.children_nodes) == 0 or current_node.children_nodes.get( x[current_node.feature_index]) is None: result = [] total_value = 0.0 for index in range(0, class_num): value = current_node.target_distribute.get(index, 0) * current_node.weight_distribute.get(index, 1.0) result.append(value) total_value += value # 歸一化 for index in range(0, class_num): result[index] = result[index] / total_value return result else: return self._search_node(current_node.children_nodes.get(x[current_node.feature_index]), x, class_num) def predict_proba(self, x): # 計算結果機率分佈 x = self.dbw.transform(x) rows = x.shape[0] results = [] class_num = len(self.root_node.target_distribute) for row in range(0, rows): results.append(self._search_node(self.root_node, x[row], class_num)) return np.asarray(results) def predict(self, x): return np.argmax(self.predict_proba(x), axis=1)
#造僞數據 from sklearn.datasets import make_classification data, target = make_classification(n_samples=100, n_features=2, n_classes=2, n_informative=1, n_redundant=0, n_repeated=0, n_clusters_per_class=1, class_sep=.5,random_state=21)
#訓練查看效果 tree = DecisionTreeClassifier(max_bins=15) tree.fit(data, target) utils.plot_decision_function(data, target, tree)
能夠發現,若是不對決策樹施加一些限制,它會嘗試創造很細碎的規則去使全部的訓練樣本正確分類,這無疑會使得模型過擬合,因此接下來須要對其進行減枝操做,避免其過擬合
顧名思義,剪掉一些沒必要要的葉子節點,那麼如何肯定那些葉子節點須要去掉,哪些不須要去掉呢?這能夠經過構建損失函數來量化,若是剪掉某一葉子結點後損失函數能減小,則進行剪枝操做,若是不能減小則不剪枝。一種簡單的量化損失函數能夠定義以下:
這裏\(\mid T \mid\)表示樹\(T\)的葉結點個數,\(t\)是樹\(\mid T \mid\)的葉結點,該葉節點有\(N_t\)個樣本點,其中\(k\)類樣本點有\(N_{tk}\)個,\(k=1,2,3,...,K\),\(H_t(T)\)爲葉結點\(t\)上的經驗熵,\(\alpha\geq 0\)爲超參數,其中:
該損失函數能夠分爲兩部分,第一部分\(\sum_{t=1}^{\mid T\mid}N_tH_t(T)\)爲經驗損失,第二部分\(\mid T \mid\)爲結構損失,\(\alpha\)爲調節其平衡度的係數,若是\(\alpha\)越大則模型結構越簡單,越不容易過擬合,接下來進行剪枝的代碼實現:
def _prune_node(self, current_node: Node, alpha): # 若是有子結點,先對子結點部分剪枝 if current_node.children_nodes is not None and len(current_node.children_nodes) != 0: for child_node in current_node.children_nodes.values(): self._prune_node(child_node, alpha) # 再嘗試對當前結點剪枝 if current_node.children_nodes is not None and len(current_node.children_nodes) != 0: # 避免跳層剪枝 for child_node in current_node.children_nodes.values(): # 當前剪枝的層必須是葉子結點的層 if child_node.children_nodes is not None and len(child_node.children_nodes) > 0: return # 計算剪枝前的損失值 pre_prune_value = alpha * len(current_node.children_nodes) for child_node in current_node.children_nodes.values(): for key, value in child_node.target_distribute.items(): pre_prune_value += -1 * child_node.num_sample * value * np.log( value) * child_node.weight_distribute.get(key, 1.0) # 計算剪枝後的損失值 after_prune_value = alpha for key, value in current_node.target_distribute.items(): after_prune_value += -1 * current_node.num_sample * value * np.log( value) * current_node.weight_distribute.get(key, 1.0) if after_prune_value <= pre_prune_value: # 剪枝操做 current_node.children_nodes = None current_node.feature_index = None def prune(self, alpha=0.01): """ 決策樹剪枝 C(T)+alpha*|T| :param alpha: :return: """ # 遞歸剪枝 self._prune_node(self.root_node, alpha)
from ml_models.tree import DecisionTreeClassifier #訓練查看效果 tree = DecisionTreeClassifier(max_bins=15) tree.fit(data, target) tree.prune(alpha=1.5) utils.plot_decision_function(data, target, tree)
經過探索\(\alpha\),咱們能夠獲得一個比較使人滿意的剪枝結果,這樣的剪枝方式一般又被稱爲後剪枝,即從一顆完整生成後的樹開始剪枝,與其對應的還有預剪枝,即在訓練過程當中就對其進行剪枝操做,這一般須要另外構建一份驗證集作支持,這裏就不實現了,另外比較一般的作法是,經過一些參數來控制模型的複雜度,好比max_depth
控制樹的最大深度,min_samples_leaf
控制葉子結點的最小樣本數,min_impurity_decrease
控制特徵劃分後的最小不純度,min_samples_split
控制結點劃分的最小樣本數,經過調節這些參數,一樣能夠達到剪枝的效果,好比下面經過控制葉結點的最小數量達到了和上面剪枝同樣的效果:
tree = DecisionTreeClassifier(max_bins=15,min_samples_leaf=3) tree.fit(data, target) utils.plot_decision_function(data, target, tree)
決策樹還能夠看做是給定特徵條件下類的條件機率分佈:
(1)訓練時,決策樹會將特徵空間劃分爲大大小小互不相交的區域,而每一個區域對應了一個類的機率分佈;
(2)預測時,落到某區域的樣本點的類標籤便是該區域對應機率最大的那個類