《機器學習實戰》-邏輯(Logistic)迴歸

Logistic 迴歸

本章內容

  • Sigmoid 函數和 Logistic 迴歸分類器
  • 最優化理論初步
  • 梯度降低最優化算法
  • 數據中的缺失項處理

迴歸算法

  • 迴歸算法:假設如今有一些數據點,咱們用一條直線對這些點進行擬合(該線稱爲最佳擬合直線),這個擬合過程就稱做迴歸。與分類算法同樣同屬於監督學習。

Logistic 迴歸的通常過程

  1. 收集數據:採用任意方法收集數據。
  2. 準備數據:因爲須要進行距離計算,所以要求數據類型爲數值型。
  3. 分析數據:採用任意方法對數據進行分析。
  4. 訓練算法:大部分時間講用於訓練,訓練的目的是爲了找到最佳的分類迴歸係數。
  5. 測試算法:一旦訓練步驟完成,分類將會很快。
  6. 使用算法:基於訓練好的迴歸係數對這些數值進行簡單的迴歸計算,斷定他們屬於哪一個類別,在此基礎上作一些其餘分析工做。

Logistic的優缺點

  • 優勢:計算代價不高,易於理解和實現。
  • 缺點:容易欠擬合,分類精度可能不高。
  • 適用數據類型:數值型和標稱型。

基於 Logistic 迴歸和 Sigmoid 函數的分類

Sigmoid 函數

  • 海維賽德階躍函數(單位階躍函數):輸出只有0或1的函數,而且0到1的過程屬於跳躍過程,即非0即1。
  • Sigmoid 函數:x=0時,sigmoid 值爲0.5;隨着 x 的增大,對應值將逼近1;隨着 x 的減少,對應值將逼近0。
  • Sigmoid 函數公式:\(\sigma(z)={\frac{1}{1+e^{-z}}}\)

Logistic 迴歸分類器

  • Logistic 迴歸分類器:咱們在每一個特徵上都乘以一個迴歸係數 以後詳細介紹,而後把全部的結果值相加,將這個總和代入 sigmoid 函數,進而獲得一個範圍在0~1之間的數值。大於0.5的數據被分入1類,小於0.5即被納入0類。

圖5-1 兩種座標尺度下的 Sigmoid 函數圖

  • 經過圖5-1 下面一張圖能夠看出,若是橫座標的尺度足夠大,在 x=0出 sigmoid 函數看起來很像階躍函數。

基於最優化方法的最佳迴歸係數肯定

  • Sigmoid函數的輸入記爲 z,可由該公式得出:\(z=w_0x_0+w_1x_1+w_2x_2+\cdots+w_nx_n\)
  • 上述公式向量寫法:\(z=w^Tx\) 向量 x 是分類器的輸入數據,向量 w 是咱們須要找的最佳參數(係數)

梯度上升法

  • 梯度上升法:沿着函數的梯度方向探尋某函數的最大值。即求函數的最大值。
  • 若是梯度記爲\(\nabla\),則函數\(f(x,y)\)的梯度公式:\(\nabla f(x,y)=\begin{pmatrix} {\frac{\part f(x,y)}{\part x}} \\ {\frac{\part f(x,y)}{\part y}} \\ \end{pmatrix}\)
  • \({\frac{\part f(x,y)}{\part x}}\):沿 x 的方向移動\({\frac{\part f(x,y)}{\part x}}\),函數\(f(x,y)\)必需要在待計算的點上有定義而且可微。
  • \({\frac{\part f(x,y)}{\part y}}\):沿 x 的方向移動\({\frac{\part f(x,y)}{\part y}}\),函數\(f(x,y)\)必需要在待計算的點上有定義而且可微。

圖5-2 梯度上升圖

  • 經過圖5-2 能夠看出梯度上升算法到達每一個點後都會從新估計移動的方向。
  • 梯度上升算法的迭代公式:\(w:=w+\alpha \nabla_wf(w)\),該公式將一直被迭代執行,直至達到某個中止條件爲止。
  • \(\alpha\):移動量的大小,稱爲步長。

梯度降低算法

  • 梯度降低算法:沿着函數的梯度方向探尋某函數的最小值。即求函數的最小值。
  • 梯度降低算法的迭代公式:\(w:=w-\alpha \nabla_wf(w)\)

訓練算法:使用梯度上升找到最佳參數

圖5-3 數據集圖

  • 圖5-3中有100個樣本點,每一個點包含兩個數值型特徵 X1和X2。

梯度上升算法的僞代碼

每一個迴歸係數初始化爲1
重複 R 次:
    計算整個數據集的梯度
    使用 alpha*gradient 更新迴歸係數的向量
    返回迴歸係數

程序5-1 Logistic 迴歸梯度上升優化算法

import os
import numpy as np
import matplotlib.pyplot as plt
from path_settings import machine_learning_PATH

data_set_path = os.path.join(machine_learning_PATH, '第五章/data-set')
testSet_path = os.path.join(data_set_path, 'testSet.txt')
horseColicTraining_path = os.path.join(data_set_path, 'horseColicTraining.txt')
horseColicTest_path = os.path.join(data_set_path, 'horseColicTest.txt')


def load_data_set():
    """導入數據集"""
    data_mat = []
    label_mat = []

    # 循環導入.txt文本數據構形成列表
    fr = open(testSet_path)
    for line in fr.readlines():
        line_arr = line.strip().split()
        data_mat.append([1, float(line_arr[0]), float(line_arr[1])])
        label_mat.append(int(line_arr[2]))

    return data_mat, label_mat


def sigmoid(in_x):
    return 1 / (1 + np.exp(-in_x))


def grad_ascent(data_mat_in, class_labels):
    # 生成特徵矩陣
    data_matrix = np.mat(data_mat_in)
    # 生成標記矩陣並反置
    label_mat = np.mat(class_labels).transpose()

    # 計算data_matrix的行列
    m, n = np.shape(data_matrix)

    # 設置移動的步長爲0.001
    alpha = 0.001
    # 設置最大遞歸次數500次
    max_cycles = 500

    # 初始化係數爲1*3的元素全爲1的矩陣
    weights = np.ones((n, 1))

    # 循環迭代梯度上升算法
    for k in range(max_cycles):
        # 計算真實類別與預測類別的差值
        h = sigmoid(data_matrix * weights)
        error = (label_mat - h)
        
        # 調整迴歸係數
        weights = weights + alpha * data_matrix.transpose() * error

    return weights


def test_grad_ascent():
    data_mat, label_mat = load_data_set()
    weights = grad_ascent(data_mat, label_mat)
    print(weights)
    """
    [[ 4.12414349]
     [ 0.48007329]
     [-0.6168482 ]]
    """


if __name__ == '__main__':
    test_grad_ascent()

分析數據:畫出決策邊界

  • 該節將經過代碼畫出決策邊界

程序5-2 畫出數據集和 Logistic 迴歸最佳擬合直線的函數

def plot_best_fit(wei):
    # getA==np.asarrayz(self)
    # 使用__class__.__name__爲了判斷是梯度上升和隨機梯度上升
    if wei.__class__.__name__ == 'matrix':
        weights = wei.getA()
    elif wei.__class__.__name__ == 'ndarray':
        weights = wei
    else:
        weights = wei

    data_mat, label_mat = load_data_set()

    # 把特徵集轉換成數組
    data_arr = np.array(data_mat)
    n = np.shape(data_arr)[0]

    # 循環數據集分類
    xcord1 = []
    ycord1 = []
    xcord2 = []
    ycord2 = []
    for i in range(n):
        if int(label_mat[i]) == 1:
            xcord1.append(data_arr[i, 1])
            ycord1.append(data_arr[i, 2])
        else:
            xcord2.append(data_arr[i, 1])
            ycord2.append(data_arr[i, 2])

    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.scatter(xcord1, ycord1, s=30, c='red', marker='s')
    ax.scatter(xcord2, ycord2, s=30, c='green')

    # 0.1是步長
    x = np.arange(-3, 3, 0.1)
    # 假設 sigmoid 函數爲0,而且這裏的 x,y 至關於上述的 x1和x2便可得出 y 的公式
    y = (-weights[0] - weights[1] * x) / weights[2]

    ax.plot(x, y)
    plt.xlabel('X1')
    plt.ylabel('X2')
    plt.show()


def test_plot_best_fit():
    data_mat, label_mat = load_data_set()
    weights = grad_ascent(data_mat, label_mat)
    plot_best_fit(weights)


if __name__ == '__main__':
    # test_grad_ascent()
    test_plot_best_fit()

圖5-4 梯度上升算法500次迭代後的結果

  • 經過圖5-4 能夠看出咱們只分錯了2-4個點。

訓練算法:隨機梯度上升

  • 梯度上升法每次更新迴歸係數時都須要遍歷整個數據集,若是樣本或者特徵數過多就應該考慮使用隨機梯度上升算法。
  • 隨機梯度上升:一次僅用一個樣本點來更新迴歸係數,不須要從新讀取整個數據集。

隨機梯度上升算法僞代碼

全部迴歸係數初始化爲1
對數據集中每一個樣本
    計算該樣本的梯度
    使用 alpha*gradient 更新迴歸係數值
返回迴歸係數值

程序5-3 隨機梯度上升算法

def stoc_grad_ascent0(data_matrix, class_labels):
    """隨機梯度上升算法"""
    m, n = np.shape(data_matrix)

    alpha = 0.01
    weights = np.ones(n)
    for i in range(m):
        # 使用 sum 函數得出一個值,只用計算一次
        h = sigmoid(sum(data_matrix[i] * weights))
        error = class_labels[i] - h
        weights = weights + alpha * error * data_matrix[i]

    return weights


def test_stoc_grad_ascent0():
    data_arr, label_mat = load_data_set()
    weights = stoc_grad_ascent0(np.array(data_arr), label_mat)
    plot_best_fit(weights)


if __name__ == '__main__':
    # test_grad_ascent()
    # test_plot_best_fit()
    test_stoc_grad_ascent0()
  • 梯度上升和隨機梯度上升:從代碼中咱們能夠看到前者變量 h 和偏差 error 都是向量,然後者全是數值;前者是矩陣轉換,後者則是 numpy 數組。

圖5-5 隨機梯度上升算法圖

  • 圖5-5能夠看出隨機梯度上升算法的最佳擬合直線並不是最佳分類線

程序5-4 改進的隨機梯度上升算法

def stoc_grad_ascent1(data_matrix, class_labels, num_iter=150):
    """改進隨機梯度上升算法,默認迭代150次"""
    m, n = np.shape(data_matrix)
    weights = np.ones(n)
    for j in range(num_iter):
        data_index = list(range(m))
        for i in range(m):
            # 每次迭代減少 alpha 的值,但最小爲0.01,確保新數據依然有影響。緩解係數波動的狀況
            alpha = 4 / (1 + j + i) + 0.01

            # 隨機選取值進行更新
            rand_index = int(np.random.uniform(0, len(data_index)))

            h = sigmoid(sum(data_matrix[rand_index] * weights))
            error = class_labels[rand_index] - h
            weights = weights + alpha * error * data_matrix[rand_index]

            # 刪除更新後的值
            del (data_index[rand_index])

    return weights


def test_stoc_grad_ascent1():
    data_arr, label_mat = load_data_set()
    weights = stoc_grad_ascent1(np.array(data_arr), label_mat)
    plot_best_fit(weights)


if __name__ == '__main__':
    # test_grad_ascent()
    # test_plot_best_fit()
    # test_stoc_grad_ascent0()
    test_stoc_grad_ascent1()

圖5-6 改進隨機梯度上升算法圖

  • 圖5-6能夠看出150次的跌打就能獲得一條很好的分類線,而梯度上升算法須要迭代500次。

示例:從疝氣病預測病馬的死亡率

  • 疝氣病:描述馬胃腸痛的術語
  • 數據集中包含368個樣本和28個特徵,而且有30%的值是缺失的

示例:使用 Logistic 迴歸估計馬疝病的死亡率

  1. 收集數據:給定數據文件
  2. 準備數據:用 Python 解析文本文件並填充缺失值
  3. 分析數據:可視化並觀察數據
  4. 訓練算法:使用優化算法,找到最佳的係數
  5. 測試算法:觀察錯誤率,根據錯誤率決定是否會退到訓練階段;改變迭代的次數和步長等參數來獲得更好的迴歸係數
  6. 使用算法:實現一個簡單的程序來手機馬的症狀並輸出預測結果

準備數據:處理數據中的缺失值

  • 數據的獲取是至關昂貴的,扔掉和從新獲取都是不可取的
  • 如下幾種方法能夠解決數據的缺失的問題
  1. 使用可用特徵的均值來填補缺失值
  2. 使用特殊值來填補缺失值
  3. 忽略有缺失值的樣本
  4. 使用類似樣本的均值填補缺失值
  5. 使用另外的機器學習算法預測缺失值
  • 預處理第一件事:用0替代全部的缺失值,由於缺失值爲0時迴歸係數的更新公式不會更新而且 sigmoid(0)=0.5,他對結果的預測不具備任何傾向性
  • 預處理第二件事:對於數據標記缺失的數據捨棄,由於標記很難肯定採用某個合適的值來替換。
  • 預處理後的文件:對於原始數據文件能夠去 http://archive.ics.uci.edu/ml/datasets/Horse+Colic 獲取,此處只提供預處理以後的文件

測試算法:用 Logistic 迴歸進行分類

def classify_vector(in_x, weights):
    prob = sigmoid(sum(in_x * weights))
    if prob > 0.5:
        return 1
    else:
        return 0


def colic_test():
    """馬疝病形成馬死亡機率預測"""
    fr_train = open(horseColicTraining_path)
    fr_test = open(horseColicTest_path)

    training_set = []
    training_labels = []
    for line in fr_train.readlines():
        # 切分全部特徵並把特徵加入 line_arr 列表中
        curr_line = line.strip().split('\t')  # type:list
        line_arr = []
        for i in range(21):
            line_arr.append(float(curr_line[i]))
        # 分開處理特徵和標記
        training_set.append(line_arr)
        training_labels.append(float(curr_line[21]))

    train_weights = stoc_grad_ascent1(np.array(training_set), training_labels, 500)
    print(train_weights)

    error_count = 0
    num_test_vec = 0
    for line in fr_test.readlines():
        num_test_vec += 1
        curr_line = line.strip().split('\t')  # type:list
        line_arr = []
        for i in range(21):
            line_arr.append(float(curr_line[i]))

        # 經過比較樣本標記與輸入係數與特徵相乘值 sigmoid 函數獲得的標記判斷是否預測失誤
        if int(classify_vector(np.array(line_arr), train_weights)) != int(curr_line[21]):
            error_count += 1

    error_rate = (float(error_count) / num_test_vec)
    print('測試集的錯誤率: {}'.format(error_rate))
    # 測試集的錯誤率: 0.373134328358209

    return error_rate


def multi_test():
    num_tests = 10
    error_sum = 0
    for k in range(num_tests):
        error_sum += colic_test()
    print('迭代 {} 次後平均錯誤率爲: {}'.format(num_tests, error_sum / float(num_tests)))
    # 迭代 10 次後平均錯誤率爲: 0.3656716417910448


if __name__ == '__main__':
    # test_grad_ascent()
    # test_plot_best_fit()
    # test_stoc_grad_ascent0()
    # test_stoc_grad_ascent1()
    multi_test()

完整代碼logRegres.py

import os
import numpy as np
import matplotlib.pyplot as plt
from path_settings import machine_learning_PATH

data_set_path = os.path.join(machine_learning_PATH, '第五章/data-set')
testSet_path = os.path.join(data_set_path, 'testSet.txt')
horseColicTraining_path = os.path.join(data_set_path, 'horseColicTraining.txt')
horseColicTest_path = os.path.join(data_set_path, 'horseColicTest.txt')


def load_data_set():
    """導入數據集"""
    data_mat = []
    label_mat = []

    # 循環導入.txt文本數據構形成列表
    fr = open(testSet_path)
    for line in fr.readlines():
        line_arr = line.strip().split()
        data_mat.append([1, float(line_arr[0]), float(line_arr[1])])
        label_mat.append(int(line_arr[2]))

    return data_mat, label_mat


def sigmoid(in_x):
    """構造 sigmoid 函數"""
    return 1 / (1 + np.exp(-in_x))


def grad_ascent(data_mat_in, class_labels):
    """梯度上升算法"""
    # 生成特徵矩陣
    data_matrix = np.mat(data_mat_in)
    # 生成標記矩陣並反置
    label_mat = np.mat(class_labels).transpose()

    # 計算data_matrix的行列
    m, n = np.shape(data_matrix)

    # 設置移動的步長爲0.001
    alpha = 0.001
    # 設置最大遞歸次數500次
    max_cycles = 500

    # 初始化係數爲1*3的元素全爲1的矩陣
    weights = np.ones((n, 1))

    # 循環迭代梯度上升算法
    for k in range(max_cycles):
        # 計算真實類別與預測類別的差值
        h = sigmoid(data_matrix * weights)
        error = (label_mat - h)

        # 調整迴歸係數
        weights = weights + alpha * data_matrix.transpose() * error

    return weights


def test_grad_ascent():
    data_mat, label_mat = load_data_set()
    weights = grad_ascent(data_mat, label_mat)
    print(weights)
    """
    [[ 4.12414349]
     [ 0.48007329]
     [-0.6168482 ]]
    """


def plot_best_fit(wei):
    """畫出被分割的數據集"""
    # getA==np.asarrayz(self)
    # 使用__class__.__name__爲了判斷是梯度上升和隨機梯度上升
    if wei.__class__.__name__ == 'matrix':
        weights = wei.getA()
    elif wei.__class__.__name__ == 'ndarray':
        weights = wei
    else:
        weights = wei

    data_mat, label_mat = load_data_set()

    # 把特徵集轉換成數組
    data_arr = np.array(data_mat)
    n = np.shape(data_arr)[0]

    # 循環數據集分類
    xcord1 = []
    ycord1 = []
    xcord2 = []
    ycord2 = []
    for i in range(n):
        if int(label_mat[i]) == 1:
            xcord1.append(data_arr[i, 1])
            ycord1.append(data_arr[i, 2])
        else:
            xcord2.append(data_arr[i, 1])
            ycord2.append(data_arr[i, 2])

    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.scatter(xcord1, ycord1, s=30, c='red', marker='s')
    ax.scatter(xcord2, ycord2, s=30, c='green')

    # 0.1是步長
    x = np.arange(-3, 3, 0.1)
    # 假設 sigmoid 函數爲0,而且這裏的 x,y 至關於上述的 x1和x2便可得出 y 的公式
    y = (-weights[0] - weights[1] * x) / weights[2]

    ax.plot(x, y)
    plt.xlabel('X1')
    plt.ylabel('X2')
    plt.show()


def test_plot_best_fit():
    data_mat, label_mat = load_data_set()
    weights = grad_ascent(data_mat, label_mat)
    plot_best_fit(weights)


def stoc_grad_ascent0(data_matrix, class_labels):
    """隨機梯度上升算法"""
    m, n = np.shape(data_matrix)

    alpha = 0.01
    weights = np.ones(n)
    for i in range(m):
        # 使用 sum 函數得出一個值,只用計算一次
        h = sigmoid(sum(data_matrix[i] * weights))
        error = class_labels[i] - h
        weights = weights + alpha * error * data_matrix[i]

    return weights


def test_stoc_grad_ascent0():
    data_arr, label_mat = load_data_set()
    weights = stoc_grad_ascent0(np.array(data_arr), label_mat)
    plot_best_fit(weights)


def stoc_grad_ascent1(data_matrix, class_labels, num_iter=150):
    """改進隨機梯度上升算法,默認迭代150次"""
    m, n = np.shape(data_matrix)
    weights = np.ones(n)
    for j in range(num_iter):
        data_index = list(range(m))
        for i in range(m):
            # 每次迭代減少 alpha 的值,但最小爲0.01,確保新數據依然有影響。緩解係數波動的狀況
            alpha = 4 / (1 + j + i) + 0.01

            # 隨機選取值進行更新
            rand_index = int(np.random.uniform(0, len(data_index)))

            h = sigmoid(sum(data_matrix[rand_index] * weights))
            error = class_labels[rand_index] - h
            weights = weights + alpha * error * data_matrix[rand_index]

            # 刪除更新後的值
            del (data_index[rand_index])

    return weights


def test_stoc_grad_ascent1():
    data_arr, label_mat = load_data_set()
    weights = stoc_grad_ascent1(np.array(data_arr), label_mat)
    plot_best_fit(weights)


def classify_vector(in_x, weights):
    prob = sigmoid(sum(in_x * weights))
    if prob > 0.5:
        return 1
    else:
        return 0


def colic_test():
    """馬疝病形成馬死亡機率預測"""
    fr_train = open(horseColicTraining_path)
    fr_test = open(horseColicTest_path)

    training_set = []
    training_labels = []
    for line in fr_train.readlines():
        # 切分全部特徵並把特徵加入 line_arr 列表中
        curr_line = line.strip().split('\t')  # type:list
        line_arr = []
        for i in range(21):
            line_arr.append(float(curr_line[i]))
        # 分開處理特徵和標記
        training_set.append(line_arr)
        training_labels.append(float(curr_line[21]))

    train_weights = stoc_grad_ascent1(np.array(training_set), training_labels, 500)
    print(train_weights)

    error_count = 0
    num_test_vec = 0
    for line in fr_test.readlines():
        num_test_vec += 1
        curr_line = line.strip().split('\t')  # type:list
        line_arr = []
        for i in range(21):
            line_arr.append(float(curr_line[i]))

        # 經過比較樣本標記與輸入係數與特徵相乘值 sigmoid 函數獲得的標記判斷是否預測失誤
        if int(classify_vector(np.array(line_arr), train_weights)) != int(curr_line[21]):
            error_count += 1

    error_rate = (float(error_count) / num_test_vec)
    print('測試集的錯誤率: {}'.format(error_rate))
    # 測試集的錯誤率: 0.373134328358209

    return error_rate


def multi_test():
    num_tests = 10
    error_sum = 0
    for k in range(num_tests):
        error_sum += colic_test()
    print('迭代 {} 次後平均錯誤率爲: {}'.format(num_tests, error_sum / float(num_tests)))
    # 迭代 10 次後平均錯誤率爲: 0.3656716417910448


if __name__ == '__main__':
    # test_grad_ascent()
    # test_plot_best_fit()
    # test_stoc_grad_ascent0()
    # test_stoc_grad_ascent1()
    multi_test()

總結

  • Logistic 迴歸:尋找一個非線性函數 Sigmoid 的最佳擬合參數。
  • 求解過程:經過最優化算法(經常使用的梯度上升算法),經過簡化梯度上升算法獲得隨機梯度上升算法
  • 對缺失數據的處理:機器學習中最後只能更要的問題之一,主要仍是取決於實際應用中的需求。
相關文章
相關標籤/搜索