《機器學習_05_線性模型_最大熵模型》

import numpy as np
import os
os.chdir('../')
import matplotlib.pyplot as plt
%matplotlib inline

一.最大熵原理

最大熵的思想很樸素,即將已知事實之外的未知部分看作「等可能」的,而熵是描述「等可能」大小很合適的量化指標,熵的公式以下:python

\[H(p)=-\sum_{i}p_i log p_i \]

這裏分佈\(p\)的取值有\(i\)種狀況,每種狀況的機率爲\(p_i\),下圖繪製了二值隨機變量的熵:git

p=np.linspace(0.1,0.9,90)
def entropy(p):
    return -np.log(p)*p-np.log(1-p)*(1-p)
plt.plot(p,entropy(p))
[<matplotlib.lines.Line2D at 0x245a3d6d278>]

png

當二者機率均爲0.5時,熵取得最大值,經過最大化熵,可使得分佈更「等可能」;另外,熵還有優秀的性質,它是一個凹函數,因此最大化熵實際上是一個凸問題。app

對於「已知事實」,能夠用約束條件來描述,好比4個值的隨機變量分佈,其中已知\(p_1+p_2=0.4\),它的求解能夠表述以下:dom

\[\max_{p} -\sum_{i=1}^4 p_ilogp_i \\ s.t. p_1+p_2=0.4\\ p_i\geq 0,i=1,2,3,4\\ \sum_i p_i=1 \]

顯然,最優解爲:\(p_1=0.2,p_2=0.2,p_3=0.3,p_4=0.3\)函數

二.最大熵模型

最大熵模型是最大熵原理在分類問題上的應用,它假設分類模型是一個條件機率分佈\(P(Y|X)\),即對於給定的輸入\(X\),以機率\(P(Y|X)\)輸出\(Y\),這時最大熵模型的目標函數定義爲條件熵:學習

\[H(P)=-\sum_{x,y}\tilde{P}(x)P(y|x)logP(y|x) \]

這裏,\(\tilde{P}(x)\)表示邊緣分佈\(P(X)\)的經驗分佈,\(\tilde{P}(x)=\frac{v(X=x)}{N}\)\(v(X=x)\)表示訓練樣本中輸入\(x\)出現的次數,\(N\)表示訓練樣本的總數。測試

而最大熵模型的「已知事實」能夠經過以下等式來約束:優化

\[\sum_{x,y}\tilde{P}(x)P(y|x)f(x,y)=\sum_{x,y}\tilde{P}(x,y)f(x,y) \]

爲了方便,左邊式子記着\(E_P(f)\),右邊式子記着\(E_{\tilde{P}}(f)\),等式描述的是某函數\(f(x,y)\)關於模型\(P(Y|X)\)與經驗分佈\(\tilde{P}(X)\)的指望與函數\(f(x,y)\)關於經驗分佈\(\tilde{P}(X,Y)\)的指望相同。(這裏\(\tilde{P}(x,y)=\frac{v(X=x,Y=y)}{N}\))
因此重要的約束信息將由\(f(x,y)\)來表示,它的定義以下:ui

\[f(x,y)=\left\{\begin{matrix} 1 & x與y知足某一事實\\ 0 & 不然 \end{matrix}\right. \]

故最大熵模型能夠理解爲,模型在某些事實發生的指望和訓練集相同的條件下,使得條件熵最大化。因此,對於有\(n\)個約束條件的最大熵模型能夠表示爲:spa

\[\max_P -\sum_{x,y}\tilde{P}(x)P(y|x)logP(y|x) \\ s.t. E_P(f_i)=E_{\tilde{P}}(f_i),i=1,2,...,n\\ \sum_y P(y|x)=1 \]

按照優化問題的習慣,能夠改寫爲以下:

\[\min_P \sum_{x,y}\tilde{P}(x)P(y|x)logP(y|x) \\ s.t. E_P(f_i)-E_{\tilde{P}}(f_i)=0,i=1,2,...,n\\ \sum_y P(y|x)-1=0 \]

因爲目標函數爲凸函數,約束條件爲仿射,因此咱們能夠經過求解對偶問題,獲得原始問題的最優解,首先引入拉格朗日乘子\(w_0,w_1,...,w_n\),定義拉格朗日函數\(L(P,w)\)

\[L(P,w)=-H(P)+w_0(1-\sum_yP(y|x)+\sum_{i=1}^nw_i(E_{\tilde{P}}(f_i))-E_P(f_i)) \]

因此原問題等價於:

\[\min_P\max_w L(P,w) \]

它的對偶問題:

\[\max_w\min_P L(P,w) \]

首先,解裏面的 \(\min_P L(P,w)\),其實對於\(\forall w\)\(L(P,w)\)都是關於\(P\)的凸函數,由於\(-H(P)\)是關於\(P\)的凸函數,然後面的\(w_0(1-\sum_yP(y|x)+\sum_{i=1}^nw_i(E_{\tilde{P}}(f_i))-E_P(f_i))\)是關於\(P(y|x)\)的仿射函數,因此求\(L(P,w)\)\(P\)的偏導數,並令其等於0,便可解得最優的\(P(y|x)\),記爲\(P_w(y|x)\),即:

\[\frac{\partial L(P,w)}{\partial P(y|x)}=\sum_{x,y}\tilde{P}(x)(logP(y|x)+1)-\sum_yw_0+\sum_{i=1}^n\sum_{x,y}\tilde{P}(x)f_i(x,y)w_i\\ =\sum_{x,y}\tilde{P}(x)(logP(y|x)+1-w_0-\sum_{i=1}^nw_if_i(x,y))\\ =0 \]

在訓練集中對任意樣本\(\forall x,y\),都有\(\tilde{P}(x)(logP(y|x)+1-w_0-\sum_{i=1}^nw_if_i(x,y))=0\),顯然\(\tilde{P}(x)>0\)(\(x\)原本就是訓練集中的一個樣本,天然機率大於0),因此\(logP(y|x)+1-w_0-\sum_{i=1}^nw_if_i(x,y)=0\),因此:

\[P_w(y|x)=exp(\sum_{i=1}^nw_if_i(x,y)+w_0-1)\\ =\frac{exp(\sum_{i=1}^nw_if_i(x,y))}{exp(1-w_0)}\\ =\frac{exp(\sum_{i=1}^nw_if_i(x,y))}{\sum_y exp(\sum_{i=1}^nw_if_i(x,y))} \]

這就是最大熵模型的表達式(最後一步變換用到了\(\sum_y P(y|x)=1\)),這裏\(w\)便是模型的參數,聰明的童鞋其實已經發現,最大熵模型其實就是一個線性函數外面套了一個softmax函數,它大概就是以下圖所示的這麼回事:

接下來,將\(L(P_w,w)\)帶入外層的\(max\)函數,便可求解最優的參數\(w^*\)

\[w^*=arg\max_w L(P_w,w) \]

推導一下模型的梯度更新公式:

\[L(P_w,w)=\sum_{x,y}\tilde{P}(x)P_w(y|x)logP_w(y|x)+\sum_{i=1}^nw_i\sum_{x,y}(\tilde{P}(x,y)f_i(x,y)-\tilde{P}(x)P_w(y|x)f_i(x,y))\\ =\sum_{x,y}\tilde{P}(x,y)\sum_{i=1}^nw_if_i(x,y)+\sum_{x,y}\tilde{P}(x)P_w(y|x)(logP_w(y|x)-\sum_{i=1}^nw_if_i(x,y))\\ =\sum_{x,y}\tilde{P}(x,y)\sum_{i=1}^nw_if_i(x,y)-\sum_{x,y}\tilde{P}(x)P_w(y|x)log(\sum_{y^{'}}exp(\sum_{i=1}^nw_if_i(x,y^{'})))\\ =\sum_{x,y}\tilde{P}(x,y)\sum_{i=1}^nw_if_i(x,y)-\sum_{x}\tilde{P}(x)log(\sum_{y^{'}}exp(\sum_{i=1}^nw_if_i(x,y^{'})))\\ =\sum_{x,y}\tilde{P}(x,y)w^Tf(x,y)-\sum_{x}\tilde{P}(x)log(\sum_{y^{'}}exp(w^Tf(x,y^{'}))) \]

這裏,倒數第三步到倒數第二步用到了\(\sum_yP(y|x)=1\),最後一步中\(w=[w_1,w_2,...,w_n]^T,f(x,y)=[f_1(x,y),f_2(x,y),...,f_n(x,y)]^T\),因此:

\[\frac{\partial L(P_w,w)}{\partial w}=\sum_{x,y}\tilde{P}(x,y)f(x,y)-\sum_x\tilde{P}(x)\frac{exp(w^Tf(x,y))f(x,y)}{\sum_{y^{'}}exp(w^Tf(x,y^{'}))} \]

因此,天然\(w\)的更新公式:

\[w=w+\eta\frac{\partial L(P_w,w)}{\partial w} \]

這裏,\(\eta\)是學習率

三.對特徵函數的進一步理解

上面推導出了最大熵模型的梯度更新公式,想必你們對\(f(x,y)\)仍是有點疑惑,「知足某一事實」這句話該如何理解?這其實與咱們的學習目的相關,學習目的決定了咱們的「事實」,好比有這樣一個任務,判斷「打」這個詞是量詞仍是動詞,咱們收集了以下的語料:

句子/\(x\) 目標/\(y\)
\(x_1:\)一打火柴 \(y_1:\)量詞
\(x_2:\)三打啤酒 \(y_2:\)量詞
\(x_3:\)打電話 \(y_3:\) 動詞
\(x_4:\)打籃球 \(y_4:\) 動詞

經過觀察,咱們能夠設計以下的兩個特徵函數來分別識別"量詞"和"動詞"任務:

\[f_1(x,y)=\left\{\begin{matrix} 1 & "打"前是數字\\ 0 & 不然 \end{matrix}\right. \]

\[f_2(x,y)=\left\{\begin{matrix} 1 & "打"後是名詞,且前面無數字\\ 0 & 不然 \end{matrix}\right. \]

固然,你也能夠設計這樣的特徵函數來作識別「量詞」的任務:

\[f(x,y)=\left\{\begin{matrix} 1 & "打"前是"一","打"後是"火柴"\\ 0 & 不然 \end{matrix}\right. \]

\[f(x,y)=\left\{\begin{matrix} 1 & "打"前是"三","打"後是"啤酒"\\ 0 & 不然 \end{matrix}\right. \]

只是,這樣的特徵函數設計會使得模型學習能力變弱,好比遇到「三打火柴」,採用後面的特徵函數設計就識別不出「打」是量詞,而採用第一種特徵函數設計就能很好的識別出來,因此要使模型具備更好的泛化能力,就須要設計更好的特徵函數,而這每每依賴於人工經驗,對於天然語言處理這類任務(好比上面的例子),咱們能夠較容易的概括總結出一些有用的經驗知識,可是對於其餘狀況,人工每每難以總結出通常性的規律,因此對於這些問題,咱們須要設計更「通常」的特徵函數。

一種簡單的特徵函數設計

咱們能夠簡單考慮\(x\)的某個特徵取某個值和\(y\)取某個類的組合作特徵函數(對於連續型特徵,能夠採用分箱操做),因此咱們能夠設計這樣兩類特徵函數:

(1)離散型:

\[f(x,y)=\left\{\begin{matrix} 1 & x_i=某值,y=某類\\ 0 & 不然 \end{matrix}\right. \]

(2)連續型:

\[f(x,y)=\left\{\begin{matrix} 1 & 某值1\leq x_i< 某值2,y=某類\\ 0 & 不然 \end{matrix}\right. \]

四.代碼實現

爲了方便演示,首先構建訓練數據和測試數據

# 測試
from sklearn import datasets
from sklearn import model_selection
from sklearn.metrics import f1_score

iris = datasets.load_iris()
data = iris['data']
target = iris['target']
X_train, X_test, y_train, y_test = model_selection.train_test_split(data, target, test_size=0.2,random_state=0)

爲了方便對數據進行分箱操做,封裝一個DataBinWrapper類,並對X_train和X_test進行轉換(該類放到ml_models.wrapper_models中)

class DataBinWrapper(object):
    def __init__(self, max_bins=10):
        # 分段數
        self.max_bins = max_bins
        # 記錄x各個特徵的分段區間
        self.XrangeMap = None

    def fit(self, x):
        n_sample, n_feature = x.shape
        # 構建分段數據
        self.XrangeMap = [[] for _ in range(0, n_feature)]
        for index in range(0, n_feature):
            tmp = x[:, index]
            for percent in range(1, self.max_bins):
                percent_value = np.percentile(tmp, (1.0 * percent / self.max_bins) * 100.0 // 1)
                self.XrangeMap[index].append(percent_value)

    def transform(self, x):
        """
        抽取x_bin_index
        :param x:
        :return:
        """
        if x.ndim == 1:
            return np.asarray([np.digitize(x[i], self.XrangeMap[i]) for i in range(0, x.size)])
        else:
            return np.asarray([np.digitize(x[:, i], self.XrangeMap[i]) for i in range(0, x.shape[1])]).T
data_bin_wrapper=DataBinWrapper(max_bins=10)
data_bin_wrapper.fit(X_train)
X_train=data_bin_wrapper.transform(X_train)
X_test=data_bin_wrapper.transform(X_test)
X_train[:5,:]
array([[7, 6, 8, 7],
       [3, 5, 5, 6],
       [2, 8, 2, 2],
       [6, 5, 6, 7],
       [7, 2, 8, 8]], dtype=int64)
X_test[:5,:]
array([[5, 2, 7, 9],
       [5, 0, 4, 3],
       [3, 9, 1, 2],
       [9, 3, 9, 7],
       [1, 8, 2, 2]], dtype=int64)

因爲特徵函數能夠有不一樣的形式,這裏咱們將特徵函數解耦出來,構造一個SimpleFeatureFunction類(後續構造其餘複雜的特徵函數,須要定義和該類相同的函數名,該類放置到ml_models.linear_model中)

class SimpleFeatureFunction(object):
    def __init__(self):
        """
        記錄特徵函數
        {
            (x_index,x_value,y_index)
        }
        """
        self.feature_funcs = set()

    # 構建特徵函數
    def build_feature_funcs(self, X, y):
        n_sample, _ = X.shape
        for index in range(0, n_sample):
            x = X[index, :].tolist()
            for feature_index in range(0, len(x)):
                self.feature_funcs.add(tuple([feature_index, x[feature_index], y[index]]))

    # 獲取特徵函數總數
    def get_feature_funcs_num(self):
        return len(self.feature_funcs)

    # 分別命中了那幾個特徵函數
    def match_feature_funcs_indices(self, x, y):
        match_indices = []
        index = 0
        for feature_index, feature_value, feature_y in self.feature_funcs:
            if feature_y == y and x[feature_index] == feature_value:
                match_indices.append(index)
            index += 1
        return match_indices

接下來對MaxEnt類進行實現,首先實現一個softmax函數的功能(ml_models.utils)

def softmax(x):
    if x.ndim == 1:
        return np.exp(x) / np.exp(x).sum()
    else:
        return np.exp(x) / np.exp(x).sum(axis=1, keepdims=True)

進行MaxEnt類的具體實現(ml_models.linear_model)

from ml_models import utils
class MaxEnt(object):
    def __init__(self, feature_func, epochs=5, eta=0.01):
        self.feature_func = feature_func
        self.epochs = epochs
        self.eta = eta

        self.class_num = None
        """
        記錄聯合機率分佈:
        {
            (x_0,x_1,...,x_p,y_index):p
        }
        """
        self.Pxy = {}
        """
        記錄邊緣機率分佈:
        {
            (x_0,x_1,...,x_p):p
        }
        """
        self.Px = {}

        """
        w[i]-->feature_func[i]
        """
        self.w = None

    def init_params(self, X, y):
        """
        初始化相應的數據
        :return:
        """
        n_sample, n_feature = X.shape
        self.class_num = np.max(y) + 1

        # 初始化聯合機率分佈、邊緣機率分佈、特徵函數
        for index in range(0, n_sample):
            range_indices = X[index, :].tolist()

            if self.Px.get(tuple(range_indices)) is None:
                self.Px[tuple(range_indices)] = 1
            else:
                self.Px[tuple(range_indices)] += 1

            if self.Pxy.get(tuple(range_indices + [y[index]])) is None:
                self.Pxy[tuple(range_indices + [y[index]])] = 1
            else:
                self.Pxy[tuple(range_indices + [y[index]])] = 1

        for key, value in self.Pxy.items():
            self.Pxy[key] = 1.0 * self.Pxy[key] / n_sample
        for key, value in self.Px.items():
            self.Px[key] = 1.0 * self.Px[key] / n_sample

        # 初始化參數權重
        self.w = np.zeros(self.feature_func.get_feature_funcs_num())

    def _sum_exp_w_on_all_y(self, x):
        """
        sum_y exp(self._sum_w_on_feature_funcs(x))
        :param x:
        :return:
        """
        sum_w = 0
        for y in range(0, self.class_num):
            tmp_w = self._sum_exp_w_on_y(x, y)
            sum_w += np.exp(tmp_w)
        return sum_w

    def _sum_exp_w_on_y(self, x, y):
        tmp_w = 0
        match_feature_func_indices = self.feature_func.match_feature_funcs_indices(x, y)
        for match_feature_func_index in match_feature_func_indices:
            tmp_w += self.w[match_feature_func_index]
        return tmp_w

    def fit(self, X, y):
        self.eta = max(1.0 / np.sqrt(X.shape[0]), self.eta)
        self.init_params(X, y)
        x_y = np.c_[X, y]
        for epoch in range(self.epochs):
            count = 0
            np.random.shuffle(x_y)
            for index in range(x_y.shape[0]):
                count += 1
                x_point = x_y[index, :-1]
                y_point = x_y[index, -1:][0]
                # 獲取聯合機率分佈
                p_xy = self.Pxy.get(tuple(x_point.tolist() + [y_point]))
                # 獲取邊緣機率分佈
                p_x = self.Px.get(tuple(x_point))
                # 更新w
                dw = np.zeros(shape=self.w.shape)
                match_feature_func_indices = self.feature_func.match_feature_funcs_indices(x_point, y_point)
                if len(match_feature_func_indices) == 0:
                    continue
                if p_xy is not None:
                    for match_feature_func_index in match_feature_func_indices:
                        dw[match_feature_func_index] = p_xy
                if p_x is not None:
                    sum_w = self._sum_exp_w_on_all_y(x_point)
                    for match_feature_func_index in match_feature_func_indices:
                        dw[match_feature_func_index] -= p_x * np.exp(self._sum_exp_w_on_y(x_point, y_point)) / (
                            1e-7 + sum_w)
                # 更新
                self.w += self.eta * dw
                # 打印訓練進度
                if count % (X.shape[0] // 4) == 0:
                    print("processing:\tepoch:" + str(epoch + 1) + "/" + str(self.epochs) + ",percent:" + str(
                        count) + "/" + str(X.shape[0]))

    def predict_proba(self, x):
        """
        預測爲y的機率分佈
        :param x:
        :return:
        """
        y = []
        for x_point in x:
            y_tmp = []
            for y_index in range(0, self.class_num):
                match_feature_func_indices = self.feature_func.match_feature_funcs_indices(x_point, y_index)
                tmp = 0
                for match_feature_func_index in match_feature_func_indices:
                    tmp += self.w[match_feature_func_index]
                y_tmp.append(tmp)
            y.append(y_tmp)
        return utils.softmax(np.asarray(y))

    def predict(self, x):
        return np.argmax(self.predict_proba(x), axis=1)
# 構建特徵函數類
feature_func=SimpleFeatureFunction()
feature_func.build_feature_funcs(X_train,y_train)

maxEnt = MaxEnt(feature_func=feature_func)
maxEnt.fit(X_train, y_train)
y = maxEnt.predict(X_test)

print('f1:', f1_score(y_test, y, average='macro'))
processing:	epoch:1/5,percent:30/120
processing:	epoch:1/5,percent:60/120
processing:	epoch:1/5,percent:90/120
processing:	epoch:1/5,percent:120/120
processing:	epoch:2/5,percent:30/120
processing:	epoch:2/5,percent:60/120
processing:	epoch:2/5,percent:90/120
processing:	epoch:2/5,percent:120/120
processing:	epoch:3/5,percent:30/120
processing:	epoch:3/5,percent:60/120
processing:	epoch:3/5,percent:90/120
processing:	epoch:3/5,percent:120/120
processing:	epoch:4/5,percent:30/120
processing:	epoch:4/5,percent:60/120
processing:	epoch:4/5,percent:90/120
processing:	epoch:4/5,percent:120/120
processing:	epoch:5/5,percent:30/120
processing:	epoch:5/5,percent:60/120
processing:	epoch:5/5,percent:90/120
processing:	epoch:5/5,percent:120/120
f1: 0.9295631904327557

經過前面的分析,咱們知道特徵函數的複雜程度決定了模型的複雜度,下面咱們添加更復雜的特徵函數來加強MaxEnt的效果,上面的特徵函數僅考慮了單個特徵與目標的關係,咱們進一步考慮二個特徵與目標的關係,即:

\[f(x,y)=\left\{\begin{matrix} 1 & x_i=某值,x_j=某值,y=某類\\ 0 & 不然 \end{matrix}\right. \]

如此,咱們能夠定義一個新的UserDefineFeatureFunction類(注意:類中的方法名稱要和SimpleFeatureFunction同樣

class UserDefineFeatureFunction(object):
    def __init__(self):
        """
        記錄特徵函數
        {
            (x_index1,x_value1,x_index2,x_value2,y_index)
        }
        """
        self.feature_funcs = set()

    # 構建特徵函數
    def build_feature_funcs(self, X, y):
        n_sample, _ = X.shape
        for index in range(0, n_sample):
            x = X[index, :].tolist()
            for feature_index in range(0, len(x)):
                self.feature_funcs.add(tuple([feature_index, x[feature_index], y[index]]))
                for new_feature_index in range(0,len(x)):
                    if feature_index!=new_feature_index:
                        self.feature_funcs.add(tuple([feature_index, x[feature_index],new_feature_index,x[new_feature_index],y[index]]))

    # 獲取特徵函數總數
    def get_feature_funcs_num(self):
        return len(self.feature_funcs)

    # 分別命中了那幾個特徵函數
    def match_feature_funcs_indices(self, x, y):
        match_indices = []
        index = 0
        for item in self.feature_funcs:
            if len(item)==5:
                feature_index1, feature_value1,feature_index2,feature_value2, feature_y=item
                if feature_y == y and x[feature_index1] == feature_value1 and x[feature_index2]==feature_value2:
                    match_indices.append(index)
            else:
                feature_index1, feature_value1, feature_y=item
                if feature_y == y and x[feature_index1] == feature_value1:
                    match_indices.append(index)
            index += 1
        return match_indices
# 檢驗
feature_func=UserDefineFeatureFunction()
feature_func.build_feature_funcs(X_train,y_train)

maxEnt = MaxEnt(feature_func=feature_func)
maxEnt.fit(X_train, y_train)
y = maxEnt.predict(X_test)

print('f1:', f1_score(y_test, y, average='macro'))
processing:	epoch:1/5,percent:30/120
processing:	epoch:1/5,percent:60/120
processing:	epoch:1/5,percent:90/120
processing:	epoch:1/5,percent:120/120
processing:	epoch:2/5,percent:30/120
processing:	epoch:2/5,percent:60/120
processing:	epoch:2/5,percent:90/120
processing:	epoch:2/5,percent:120/120
processing:	epoch:3/5,percent:30/120
processing:	epoch:3/5,percent:60/120
processing:	epoch:3/5,percent:90/120
processing:	epoch:3/5,percent:120/120
processing:	epoch:4/5,percent:30/120
processing:	epoch:4/5,percent:60/120
processing:	epoch:4/5,percent:90/120
processing:	epoch:4/5,percent:120/120
processing:	epoch:5/5,percent:30/120
processing:	epoch:5/5,percent:60/120
processing:	epoch:5/5,percent:90/120
processing:	epoch:5/5,percent:120/120
f1: 0.957351290684624

咱們能夠根據本身對數據的認識,不斷爲模型添加一些新特徵函數去加強模型的效果,只須要修改build_feature_funcsmatch_feature_funcs_indices這兩個函數便可(但注意控制函數的數量規模) 簡單總結一下MaxEnt的優缺點,優勢很明顯:咱們能夠diy任意複雜的特徵函數進去,缺點也很明顯:訓練很耗時,並且特徵函數的設計好壞須要先驗知識,對於某些任務很難直觀獲取

相關文章
相關標籤/搜索