因爲近期學業繁重QAQ,因此我就不說廢話了,直接上代碼~算法
from numpy import * import matplotlib.pyplot as plt #使用梯度上升法找到最佳參數 #使用梯度上升法找到最佳迴歸係數, #也就是擬合Logistic迴歸模型的最佳參數 #Logistic歸回梯度上升優化算法 #加載文件 def loadDataSet(): dataMat=[];labelMat=[] #打開文本文件 fr=open('testSet.txt') #逐行讀取 for line in fr.readlines(): lineArr=line.strip().split() #爲了方便計算,將x0設爲1,後面的x1,x2是文本中每行的前兩個值 dataMat.append([1.0, float(lineArr[0]), float(lineArr[1])]) #文本中每行的第三個值爲數據對應的標籤 labelMat.append(int(lineArr[2])) return dataMat,labelMat #sigmiod函數 def sigmoid(inX): return 1.0/(1+exp(-inX)) #梯度上升算法 #第一個參數是一個2維的Numpy數組 #每列表示不一樣的特徵 #每行表示每一個訓練樣本 #咱們採用100個樣本,包含兩個x1,x2特徵 #再加上第0維特徵x0,因此dataMatIn是一個100X3的矩陣 #第二個參數是類別標籤,是一個1X100的行向量 def gradAscent(dataMatIn,classLabels): dataMatrix=mat(dataMatIn) #將行向量轉置爲列向量 labelMat=mat(classLabels).transpose() #獲得矩陣的大小 m,n=shape(dataMatrix) #向目標移動的步長 alpha=0.001 #迭代次數 maxCycles=500 weights=ones((n,1)) #在for循環結束以後,將返回訓練好的迴歸係數 for k in range(maxCycles): #注:此處是矩陣相乘 #h是一個列向量,元素的個數等於樣本的個數 h=sigmoid(dataMatrix*weights) #真實類別與預測類別的差異 error=(labelMat-h) #按照該差異的方向調整迴歸係數 weights=weights+alpha*dataMatrix.transpose()*error #返回迴歸係數——肯定了不一樣類別數據之間的分割線 return weights #畫出決策邊界 #畫出數據集和Logistic迴歸最佳擬合直線的函數 #X1表示一個特徵,X2表示另外一個特徵 def plotBestFit(weights): #獲得數據集與標籤 dataMat,labelMat=loadDataSet() dataArr = array(dataMat) n = shape(dataArr)[0] xcord1 = []; ycord1 = [] xcord2 = []; ycord2 = [] #對數據集進行分類 for i in range(n): if int(labelMat[i])== 1: xcord1.append(dataArr[i,1]); ycord1.append(dataArr[i,2]) else: xcord2.append(dataArr[i,1]); ycord2.append(dataArr[i,2]) fig = plt.figure() ax = fig.add_subplot(111) ax.scatter(xcord1, ycord1, s=30, c='red', marker='s') ax.scatter(xcord2, ycord2, s=30, c='green') x = arange(-3.0, 3.0, 0.1) #根據gradAscent獲得的迴歸係數繪製分割線 y = (-weights[0]-weights[1]*x)/weights[2] #print(x) #print(y) ax.plot(x, y) plt.xlabel('X1'); plt.ylabel('X2'); plt.show() #梯度上升方法在每次更新迴歸係數時都須要遍歷整個數據集 #改進方法:一次僅使用一個樣本點來更新迴歸係數——隨機梯度上升算法 #因爲能夠在新樣本到來時對分類器進行增量式更新,所以隨機梯度上升 #算法是一個在線學習算法 #與「在線學習」相對應,一次數裏全部數據被稱做「批處理」 #隨機梯度上升算法 def stocGradAscent0(dataMatrix,classLabels): #獲得矩陣的大小 m,n=shape(dataMatrix) #向目標移動的步長 alpha=0.01 weights=ones(n) for i in range(m): #h爲向量 h=sigmoid(sum(dataMatrix[i]*weights)) #error爲向量 error=classLabels[i]-h weights=weights+alpha*error*dataMatrix[i] return weights #因爲通過測試,屢次迭代後,X0,X1收斂速度較小 #且存在一些小的週期性的波動,所以及逆行改進 #改進隨機梯度上升算法 #第三個參數爲迭代次數 def stocGradAscent1(dataMatrix,classLabels,numIter=150): m,n=shape(dataMatrix) weights=ones(n) for j in range(numIter): dataIndex=list(range(m)) for i in range(m): #改進1:alpha[向目標移動的步長]會隨着迭代的次數不斷減少 #能夠緩解數據波動或高頻波動, alpha=4/(1.0+j+i)+0.01 #經過隨機選取樣原本更新迴歸係數 #能夠減小週期性的波動 randIndex=int(random.uniform(0,len(dataIndex))) h=sigmoid(sum(dataMatrix[randIndex]*weights)) error=classLabels[randIndex]-h weights=weights+alpha*error*dataMatrix[randIndex] del(dataIndex[randIndex]) return weights #Logistic迴歸預測病馬的死亡率 #對於缺失數據,咱們選擇用0來替換 #由於這樣不會影響係數weights的值 #對於標籤已經丟失的,咱們將這條數據丟棄 #使用Logistic迴歸進行分類的主要思路: #把測試集上每一個特徵向量乘最優方法獲得的迴歸係數 #再將該乘積結果求和,最後輸入Sigmoid函數中便可, #若對應的sigmoid值>0.5預測類別標籤爲1,不然爲0 #Logistic迴歸分類函數 def classifyVector(inX,weights): #以迴歸係數和特徵向量做爲輸入來計算對應的Sigmoid值 prob=sigmoid(sum(inX*weights)) if prob>0.5:return 1.0 else:return 0.0 #打開測試集和訓練集,並對數據進行格式化處理 def colicTest(): frTrain=open('horseColicTraining.txt') frTest=open('horseColicTest.txt') trainingSet=[] trainingLabels=[] #遍歷每一行 for line in frTrain.readlines(): currLine=line.strip().split('\t') lineArr=[] #遍歷每一列 for i in range(21): lineArr.append(float(currLine[i])) trainingSet.append(lineArr) #最後一列爲類別標籤 trainingLabels.append(float(currLine[21])) #計算迴歸係數向量 trainWeights=stocGradAscent1(array(trainingSet),trainingLabels,500) errorCount=0 numTestVec=0.0 for line in frTest.readlines(): numTestVec+=1.0 currLine=line.strip().split('\t') lineArr=[] for i in range(21): lineArr.append(float(currLine[i])) #對測試集進行分類,並查看結果是否正確 if int(classifyVector(array(lineArr),trainWeights))!=int(currLine[21]): errorCount+=1 #計算錯誤率 errorRate=(float(errorCount)/numTestVec) print("the error rate of this test is: %f"%errorRate) return errorRate #調用colicTest函數10次,而且結果的平均值 def multiTest(): numTests=10 errorSum=0.0 for k in range(numTests): errorSum+=colicTest() print("after %d iterations the average error rate is: %f"%(numTests,errorSum/float(numTests))) def main(): #dataArr,labelMat=loadDataSet() #weights=gradAscent(dataArr,labelMat) #print(weights) #plotBestFit(weights.getA()) #weights=stocGradAscent0(array(dataArr),labelMat) #weights=stocGradAscent1(array(dataArr),labelMat) #plotBestFit(weights) multiTest() if __name__=='__main__': main()