本文將介紹機器學習算法中的Logistic迴歸分類算法並使用Python進行實現。會接觸到最優化算法的相關學習。python
什麼是迴歸?git
簡單來講,迴歸就是用一條線對N多個數據點進行擬合或者按照必定的規則來劃分數據集,這個擬合的過程和劃分的過程就叫作迴歸。github
Logistic 迴歸分類算法就是對數據集創建迴歸模型,依照這個模型來進行分類。算法
最優化算法在此的做用:尋找最佳迴歸係數數據庫
基本形式是用每個特徵乘以一個迴歸係數,而後把全部的結果進行相加。數組
這樣算出的結果不少是連續的,不利於分類,因此能夠將結果再代入Sigmoid函數中獲得一些比較離散的結果。bash
Sigmod函數的表達式爲:app
Sigmoid函數的形狀以下:dom
這樣計算的結果將會是0-1的值,將中間值0.5進行分類點,大於等於0.5的爲一類,小於0.5的又爲一類機器學習
在這個過程當中,工做的重點在於,如何尋找最優的迴歸係數。
Sigmoid 函數的輸入記做z
,由下面公式得出
採用向量的寫法能夠寫成
表示將兩個數值向量對應的元素所有相乘在進行相加獲得z
值。其中x
是分類器輸入的數據,向量即爲咱們要找的最佳迴歸係數,爲了尋找最佳迴歸係數,咱們須要用到最優化理論的一些知識。
這裏採用梯度上升算法(求最大值),求最小值使用梯度降低。咱們將學習到如何使用該方法求得數據集的最佳參數。接下來,展現如何繪製梯度上升法產生的決策變截圖,該圖能將梯度上升法的分類效果可視化地呈現出來。最後,咱們將學習隨機梯度上升算法,以及如何對其進行修改以得到更好的效果。
梯度上升算法基於的思想是:要找到某函數的最大值,最好的辦法就是沿着該函數的梯度方向探尋,若是梯度記爲, 則函數
的梯度由下式表示:
這是機器學習中最容易形成混淆的一個地方,但在數學上並不難,須要作的只是牢記這些符號的含義。這個梯度意味着要沿x
的方向移動, 沿
y
的方向移動。其中,函數
必需要在待計算的點上有定義而且可微。一個具體的函數例子見下圖。
圖中的梯度上升算法沿梯度方向移動了一步。樂意看到,梯度算子老是指向函數值增加最快的方向。這裏說到移動方向,而未說起移動量的大小。該量值稱爲步長,記做α。用向量來表示的話,梯度上升算法的迭代公式以下:
該公式將一直被迭代執行,直到達到某個中止條件爲止,好比設定的迭代次數或者達到某個容許的偏差範圍。
吳恩達的machine learning第三週的課程中使用的是梯度降低算法,它的公式爲:
咱們能夠看到,兩個算法實際上是同樣的,只是公式中的加減法不一樣而已。梯度上升算法用來求函數的最大值,而梯度降低算法用來求函數的最小值。
梯度上升的僞代碼
每一個迴歸係數初始化爲1
重複R次:
計算整個數據集的梯度
使用alpha下的gradient更新迴歸係數的向量
返回迴歸係數
複製代碼
Python實現
#! /usr/bin/env python
# -*- coding: utf-8 -*-
""" 實現logistic迴歸分類算法, 數據集爲: dataset.csv """
import numpy as np
import matplotlib.pyplot as plt
def loadDataSet():
""" 加載數據集 return: 數據列表, 標籤列表 """
dataMat = []
labelMat = []
#打開數據集
fr = open('dataset.csv')
#遍歷每一行
for line in fr.readlines():
#刪除空白符以後進行切分
lineArr = line.strip().split(',')
#數據加入數據列表
dataMat.append([1.0, float(lineArr[0]), float(lineArr[1])])
#標籤加入數據列表
labelMat.append(int(lineArr[2]))
#返回數據列表和標籤列表
return dataMat, labelMat
def sigmoid(inX):
""" 計算sigmoid函數 @: param intX: 矩陣計算的結果(100x1) @: return: 計算結果 """
return 1.0 / (1 + np.exp(-inX))
def gradAscent(dataMat, labelMat):
""" 梯度上升函數 @: param dataMat: 數據集 @: param labelMat: 標籤集 @: return: 權重參數矩陣(最佳迴歸係數) """
# 將數據轉爲numpy的數組
dataMatrix = np.mat(dataMat)
labelMat = np.mat(labelMat).transpose()
# 獲取矩陣的行列數
m, n = np.shape(dataMatrix)
# 初始化參數
alpha = 0.001
# 初始化迭代次數
maxCyc = 500
# 初始化矩陣的權重參數矩陣, 均爲1
weights = np.ones((n, 1))
# 開始迭代計算
for k in range(maxCyc):
h = sigmoid(dataMatrix * weights)
# 計算偏差
error = labelMat-h
# 更新迭代參數
weights = weights + alpha * dataMatrix.transpose() * error
return weights
def plotBestFit(weights):
#導入數據
dataMat, labelMat = loadDataSet()
#建立數組
dataArr = np.array(dataMat)
#獲取數組行數
n = np.shape(dataArr)[0]
#初始化座標
xcord1 = []; ycord1 = []
xcord2 = []; ycord2 = []
#遍歷每一行數據
for i in range(n):
#若是對應的類別標籤對應數值1,就添加到xcord1,ycord1中
if int(labelMat[i]) == 1:
xcord1.append(dataArr[i,1]); ycord1.append(dataArr[i,2])
#若是對應的類別標籤對應數值0,就添加到xcord2,ycord2中
else:
xcord2.append(dataArr[i,1]); ycord2.append(dataArr[i,2])
#建立空圖
fig = plt.figure()
#添加subplot,三種數據都畫在一張圖上
ax = fig.add_subplot(111)
#1類用紅色標識,marker='s'形狀爲正方形
ax.scatter(xcord1, ycord1, s=30, c='red', marker='s')
#0類用綠色標識,弄認marker='o'爲圓形
ax.scatter(xcord2, ycord2, s=30, c='green')
#設置x取值,arange支持浮點型
x = np.arange(-3.0, 3.0, 0.1)
#配計算y的值
y = (-weights[0]-weights[1]*x)/weights[2]
#畫擬合直線
ax.plot(x, y)
#貼座標表頭
plt.xlabel('X1'); plt.ylabel('X2')
#顯示結果
plt.show()
if __name__ == '__main__':
dataArr, labelMat = loadDataSet()
weights = gradAscent(dataArr, labelMat)
print (weights)
plotBestFit(weights.getA())
複製代碼
獲得圖像:
這個分類效果至關不錯,從圖上看之分錯了兩到四個點。可是,儘管例子簡單而且數據集很小,這個方法卻很須要大量的計算(300次乘積)。下面咱們將對該算法進行改進,從而使它能夠用到真實數據上。
梯度上升算法在每次更新迴歸係數時都須要遍歷整個數據集,計算複雜度過高了。一種改進方法就是一次僅用一個樣本點來更新迴歸係數,該方法稱爲隨機梯度上升算法。因爲能夠在新樣本到來時對分類器進行增量式更新,於是隨機梯度上升算法是一個在線學習方法。與「在線學習」相對應的,一次處理全部數據被稱爲是「批處理」
僞代碼:
全部迴歸係數初始化爲1
對數據集中每一個樣本
計算該樣本的梯度
使用alpha * gradient 更新迴歸係數值
返回迴歸係數值
複製代碼
Python實現
def randomGradAscent(dataMat, labelMat):
""" 隨機梯度上升函數 @: param dataMat: 數據集 @: param labelMat: 標籤集 @: return: 權重參數矩陣(最佳迴歸係數) """
dataMatrix = np.array(dataMat)
m, n = np.shape(dataMatrix)
# 設置步長
alpha = 0.01
#初始化參數
weights = np.ones(n)
for i in range(m):
h = sigmoid(sum(dataMatrix[i]*weights))
# 計算偏差
error = labelMat[i]-h
# 更新權重矩陣
weights = weights + alpha * error * dataMatrix[i]
return weights
複製代碼
獲得下圖:
改進:
Python實現
def randomGradAscent2(dataMat, labelMat):
""" 改進的隨機梯度上升算法 """
dataMatrix = np.array(dataMat)
m, n = np.shape(dataMatrix)
# 初始化參數
weights = np.ones(n)
# 迭代次數
numIter = 500
for i in range(numIter):
# 初始化index列表,這裏要注意將range輸出轉換成list
dataIndex = list(range(m))
# 遍歷每一行數據,這裏要注意將range輸出轉換成list
for j in list(range(m)):
# 更新alpha值,緩解數據高頻波動
alpha = 4/(1.0+i+j)+0.0001
# 隨機生成序列號,從而減小隨機性的波動
randIndex = int(np.random.uniform(0, len(dataIndex)))
# 序列號對應的元素與權重矩陣相乘,求和後再求sigmoid
h = sigmoid(sum(dataMatrix[randIndex]*weights))
# 求偏差,和以前同樣的操做
error = labelMat[randIndex] - h
# 更新權重矩陣
weights = weights + alpha * error * dataMatrix[randIndex]
# 刪除此次計算的數據
del(dataIndex[randIndex])
return weights
複製代碼
獲得下圖:
該實例使用Logistic迴歸來預測患有疝病的馬的存活問題。這裏的數據來自2010年1月11日的UCI機器學習數據庫,其中包含368個樣本和28個特徵。這裏的數據集是有30%的數據缺失的
咱們有如下方法處理缺失數據:
因爲這裏缺失數據佔到30%, 咱們採用用特殊值來填補缺失值,特殊值爲0
對於標籤丟失的數據,咱們選擇捨去
基於上文,咱們使用改進的隨機梯度上升算法來進行測試
Python實現
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import logistic
def classifyVector(inX, weights):
""" 分類 """
prob = logistic.sigmoid(sum(inX * weights))
if prob > 0.5:
return 1.0
else:
return 0.0
def colicTest():
""" 訓練和測試模型 """
frTrain = open('horse_colic_train.txt')
frTest = open('horse_colic_test.txt')
trainingSet = []
trainingLabels = []
# --------------訓練------------------
for line in frTrain.readlines():
currLine = line.strip().split('\t')
lineArr =[]
for i in range(21):
lineArr.append(float(currLine[i]))
trainingSet.append(lineArr)
trainingLabels.append(float(currLine[21]))
trainWeights = logistic.randomGradAscent2(np.array(trainingSet), trainingLabels)
# --------------測試------------------
numTestVec = 0.0
errorCount = 0
for line in frTest.readlines():
numTestVec += 1.0
currLine = line.strip().split('\t')
lineArr =[]
for i in range(21):
lineArr.append(float(currLine[i]))
if int(classifyVector(np.array(lineArr), trainWeights))!= int(currLine[21]):
errorCount += 1
errorRate = (float(errorCount)/numTestVec)
print ("測試的錯誤率爲: %f" % errorRate)
return errorRate
def multiTest():
""" 屢次測試 """
numTests = 10
errorSum = 0.0
for k in range(numTests):
errorSum += colicTest()
print ("第 %d 次迭代後,錯誤率爲: %f" % (numTests, errorSum / float(numTests)))
if __name__ == '__main__':
multiTest()
複製代碼
最終結果:
測試的錯誤率爲: 0.417910
測試的錯誤率爲: 0.328358
測試的錯誤率爲: 0.417910
測試的錯誤率爲: 0.268657
測試的錯誤率爲: 0.313433
測試的錯誤率爲: 0.388060
測試的錯誤率爲: 0.417910
測試的錯誤率爲: 0.358209
測試的錯誤率爲: 0.343284
測試的錯誤率爲: 0.298507
第 10 次迭代後,錯誤率爲: 0.355224
複製代碼