機器學習 | 算法筆記- 線性迴歸(Linear Regression)

前言

本系列爲機器學習算法的總結和概括,目的爲了清晰闡述算法原理,同時附帶上手代碼實例,便於理解。

目錄

   決策樹
   線性迴歸
  組合算法(Ensemble Method)
   K-Means
  機器學習算法總結
 
本章爲線性迴歸,內容包括模型介紹及代碼實現(包括自主實現和sklearn案例)。

1、算法簡介

1.1 什麼是迴歸分析

迴歸分析是一種預測性的建模技術,它研究的是因變量(目標)和自變量(預測器)之間的關係。這種技術一般用於預測分析,時間序列模型以及發現變量之間的因果關係。一般使用曲線/線來擬合數據點,目標是 使曲線到數據點的距離差別最小

1.2 線性迴歸

線性迴歸是迴歸問題中的一種,線性迴歸假設目標值與特徵之間線性相關,即知足一個多元一次方程。經過構建損失函數,來求解損失函數最小時的參數w和b。通長咱們能夠表達成以下公式:
y^爲預測值,自變量x和因變量y是已知的,而咱們想實現的是預測新增一個x,其對應的y是多少。所以,爲了構建這個函數關係,目標是經過已知數據點,求解線性模型中w和b兩個參數。

1.3 目標/損失函數

求解最佳參數,須要一個標準來對結果進行衡量,爲此咱們須要定量化一個目標函數式,使得計算機能夠在求解過程當中不斷地優化。
針對任何模型求解問題,都是最終都是能夠獲得一組預測值y^ ,對比已有的真實值 y ,數據行數爲 n ,能夠將損失函數定義以下:
即預測值與真實值之間的平均的平方距離,統計中通常稱其爲 MAE(mean square error)均方偏差。把以前的函數式代入損失函數,而且將須要求解的參數w和b看作是函數L的自變量,可得
如今的任務是求解最小化L時w和b的值,
即核心目標優化式爲
求解方式有兩種:
1)最小二乘法(least square method)
求解 w 和 b 是使損失函數最小化的過程,在統計中,稱爲線性迴歸模型的最小二乘「參數估計」(parameter estimation)。咱們能夠將 L(w,b) 分別對 w 和 b 求導,獲得
令上述兩式爲0,可獲得 w 和 b 最優解的閉式(closed-form)解:
 
2)梯度降低(gradient descent)
梯度降低核心內容是對自變量進行不斷的更新(針對w和b求偏導),使得目標函數不斷逼近最小值的過程
這裏不作展開講解。

2、代碼實現

2.1 簡單線性迴歸

首先創建linear_regression.py文件,用於實現線性迴歸的類文件,包含了線性迴歸內部的核心函數:
# -*- coding: utf-8 -*-

import numpy as np


class LinerRegression(object):

    def __init__(self, learning_rate=0.01, max_iter=100, seed=None):
        np.random.seed(seed)
        self.lr = learning_rate
        self.max_iter = max_iter
        self.w = np.random.normal(1, 0.1)
        self.b = np.random.normal(1, 0.1)
        self.loss_arr = []

    def fit(self, x, y):
        self.x = x
        self.y = y
        for i in range(self.max_iter):
            self._train_step()
            self.loss_arr.append(self.loss())
            # print('loss: \t{:.3}'.format(self.loss()))
            # print('w: \t{:.3}'.format(self.w))
            # print('b: \t{:.3}'.format(self.b))

    def _f(self, x, w, b):
        return x * w + b

    def predict(self, x=None):
        if x is None:
            x = self.x
        y_pred = self._f(x, self.w, self.b)
        return y_pred

    def loss(self, y_true=None, y_pred=None):
        if y_true is None or y_pred is None:
            y_true = self.y
            y_pred = self.predict(self.x)
        return np.mean((y_true - y_pred)**2)

    def _calc_gradient(self):
        d_w = np.mean((self.x * self.w + self.b - self.y) * self.x)
        d_b = np.mean(self.x * self.w + self.b - self.y)
        return d_w, d_b

    def _train_step(self):
        d_w, d_b = self._calc_gradient()
        self.w = self.w - self.lr * d_w
        self.b = self.b - self.lr * d_b
        return self.w, self.b
View Code

創建 train.py 文件,用於生成模擬數據,並調用 liner_regression.py 中的類,完成線性迴歸任務:html

# -*- coding: utf-8 -*-

import numpy as np
import matplotlib.pyplot as plt
from liner_regression import *


def show_data(x, y, w=None, b=None):
    plt.scatter(x, y, marker='.')
    if w is not None and b is not None:
        plt.plot(x, w*x+b, c='red')
    plt.show()


# data generation
np.random.seed(272)
data_size = 100
x = np.random.uniform(low=1.0, high=10.0, size=data_size)
y = x * 20 + 10 + np.random.normal(loc=0.0, scale=10.0, size=data_size)

# plt.scatter(x, y, marker='.')
# plt.show()

# train / test split
shuffled_index = np.random.permutation(data_size)
x = x[shuffled_index]
y = y[shuffled_index]
split_index = int(data_size * 0.7)
x_train = x[:split_index]
y_train = y[:split_index]
x_test = x[split_index:]
y_test = y[split_index:]

# visualize data
# plt.scatter(x_train, y_train, marker='.')
# plt.show()
# plt.scatter(x_test, y_test, marker='.')
# plt.show()

# train the liner regression model
regr = LinerRegression(learning_rate=0.01, max_iter=10, seed=314)
regr.fit(x_train, y_train)
print('cost: \t{:.3}'.format(regr.loss()))
print('w: \t{:.3}'.format(regr.w))
print('b: \t{:.3}'.format(regr.b))
show_data(x, y, regr.w, regr.b)

# plot the evolution of cost
plt.scatter(np.arange(len(regr.loss_arr)), regr.loss_arr, marker='o', c='green')
plt.show()
View Code

2.2 sklearn實現

sklearn.linear_model提供了不少線性模型,包括嶺迴歸、貝葉斯迴歸、Lasso等。本文主要嘗試使用嶺迴歸Ridge,該函數一共有8個參數,詳見
嶺迴歸是縮減法的一種,至關於對迴歸係數的大小施加了限制。另外一種很好的縮減法是lasso。lasso難以求解,但可使用計算簡便的逐步線性迴歸方法求的近似解。
# -*-coding:utf-8 -*-
import numpy as np
from bs4 import BeautifulSoup
import random

def scrapePage(retX, retY, inFile, yr, numPce, origPrc):
    """
    函數說明:從頁面讀取數據,生成retX和retY列表
    Parameters:
        retX - 數據X
        retY - 數據Y
        inFile - HTML文件
        yr - 年份
        numPce - 樂高部件數目
        origPrc - 原價
    Returns:
        無
    """
    # 打開並讀取HTML文件
    with open(inFile, encoding='utf-8') as f:
        html = f.read()
    soup = BeautifulSoup(html)

    i = 1
    # 根據HTML頁面結構進行解析
    currentRow = soup.find_all('table', r = "%d" % i)

    while(len(currentRow) != 0):
        currentRow = soup.find_all('table', r = "%d" % i)
        title = currentRow[0].find_all('a')[1].text
        lwrTitle = title.lower()
        # 查找是否有全新標籤
        if (lwrTitle.find('new') > -1) or (lwrTitle.find('nisb') > -1):
            newFlag = 1.0
        else:
            newFlag = 0.0

        # 查找是否已經標誌出售,咱們只收集已出售的數據
        soldUnicde = currentRow[0].find_all('td')[3].find_all('span')
        if len(soldUnicde) == 0:
            print("商品 #%d 沒有出售" % i)
        else:
            # 解析頁面獲取當前價格
            soldPrice = currentRow[0].find_all('td')[4]
            priceStr = soldPrice.text
            priceStr = priceStr.replace('$','') 
            priceStr = priceStr.replace(',','')
            if len(soldPrice) > 1:
                priceStr = priceStr.replace('Free shipping', '')
            sellingPrice = float(priceStr)

            # 去掉不完整的套裝價格
            if  sellingPrice > origPrc * 0.5:
                print("%d\t%d\t%d\t%f\t%f" % (yr, numPce, newFlag, origPrc, sellingPrice))
                retX.append([yr, numPce, newFlag, origPrc])
                retY.append(sellingPrice)
        i += 1
        currentRow = soup.find_all('table', r = "%d" % i)

def ridgeRegres(xMat, yMat, lam = 0.2):
    """
    函數說明:嶺迴歸
    Parameters:
        xMat - x數據集
        yMat - y數據集
        lam - 縮減係數
    Returns:
        ws - 迴歸係數
    """
    xTx = xMat.T * xMat
    denom = xTx + np.eye(np.shape(xMat)[1]) * lam
    if np.linalg.det(denom) == 0.0:
        print("矩陣爲奇異矩陣,不能求逆")
        return
    ws = denom.I * (xMat.T * yMat)
    return ws

def setDataCollect(retX, retY):
    """
    函數說明:依次讀取六種樂高套裝的數據,並生成數據矩陣
    Parameters:
        無
    Returns:
        無

    """
    scrapePage(retX, retY, './lego/lego8288.html', 2006, 800, 49.99)                #2006年的樂高8288,部件數目800,原價49.99
    scrapePage(retX, retY, './lego/lego10030.html', 2002, 3096, 269.99)                #2002年的樂高10030,部件數目3096,原價269.99
    scrapePage(retX, retY, './lego/lego10179.html', 2007, 5195, 499.99)                #2007年的樂高10179,部件數目5195,原價499.99
    scrapePage(retX, retY, './lego/lego10181.html', 2007, 3428, 199.99)                #2007年的樂高10181,部件數目3428,原價199.99
    scrapePage(retX, retY, './lego/lego10189.html', 2008, 5922, 299.99)                #2008年的樂高10189,部件數目5922,原價299.99
    scrapePage(retX, retY, './lego/lego10196.html', 2009, 3263, 249.99)                #2009年的樂高10196,部件數目3263,原價249.99

def regularize(xMat, yMat):
    """
    函數說明:數據標準化
    Parameters:
        xMat - x數據集
        yMat - y數據集
    Returns:
        inxMat - 標準化後的x數據集
        inyMat - 標準化後的y數據集

    """    
    inxMat = xMat.copy()                                                        #數據拷貝
    inyMat = yMat.copy()
    yMean = np.mean(yMat, 0)                                                    #行與行操做,求均值
    inyMat = yMat - yMean                                                        #數據減去均值
    inMeans = np.mean(inxMat, 0)                                                   #行與行操做,求均值
    inVar = np.var(inxMat, 0)                                                     #行與行操做,求方差
    # print(inxMat)
    print(inMeans)
    # print(inVar)
    inxMat = (inxMat - inMeans) / inVar                                            #數據減去均值除以方差實現標準化
    return inxMat, inyMat

def rssError(yArr,yHatArr):
    """
    函數說明:計算平方偏差
    Parameters:
        yArr - 預測值
        yHatArr - 真實值
    Returns:
        

    """
    return ((yArr-yHatArr)**2).sum()

def standRegres(xArr,yArr):
    """
    函數說明:計算迴歸係數w
    Parameters:
        xArr - x數據集
        yArr - y數據集
    Returns:
        ws - 迴歸係數

    """
    xMat = np.mat(xArr); yMat = np.mat(yArr).T
    xTx = xMat.T * xMat                            #根據文中推導的公示計算迴歸係數
    if np.linalg.det(xTx) == 0.0:
        print("矩陣爲奇異矩陣,不能求逆")
        return
    ws = xTx.I * (xMat.T*yMat)
    return ws

def crossValidation(xArr, yArr, numVal = 10):
    """
    函數說明:交叉驗證嶺迴歸
    Parameters:
        xArr - x數據集
        yArr - y數據集
        numVal - 交叉驗證次數
    Returns:
        wMat - 迴歸係數矩陣

    """
    m = len(yArr)                                                                        #統計樣本個數                       
    indexList = list(range(m))                                                            #生成索引值列表
    errorMat = np.zeros((numVal,30))                                                    #create error mat 30columns numVal rows
    for i in range(numVal):                                                                #交叉驗證numVal次
        trainX = []; trainY = []                                                        #訓練集
        testX = []; testY = []                                                            #測試集
        random.shuffle(indexList)                                                        #打亂次序
        for j in range(m):                                                                #劃分數據集:90%訓練集,10%測試集
            if j < m * 0.9: 
                trainX.append(xArr[indexList[j]])
                trainY.append(yArr[indexList[j]])
            else:
                testX.append(xArr[indexList[j]])
                testY.append(yArr[indexList[j]])
        wMat = ridgeTest(trainX, trainY)                                                #得到30個不一樣lambda下的嶺迴歸係數
        for k in range(30):                                                                #遍歷全部的嶺迴歸係數
            matTestX = np.mat(testX); matTrainX = np.mat(trainX)                        #測試集
            meanTrain = np.mean(matTrainX,0)                                            #測試集均值
            varTrain = np.var(matTrainX,0)                                                #測試集方差
            matTestX = (matTestX - meanTrain) / varTrain                                 #測試集標準化
            yEst = matTestX * np.mat(wMat[k,:]).T + np.mean(trainY)                        #根據ws預測y值
            errorMat[i, k] = rssError(yEst.T.A, np.array(testY))                            #統計偏差
    meanErrors = np.mean(errorMat,0)                                                    #計算每次交叉驗證的平均偏差
    minMean = float(min(meanErrors))                                                    #找到最小偏差
    bestWeights = wMat[np.nonzero(meanErrors == minMean)]                                #找到最佳迴歸係數

    xMat = np.mat(xArr); yMat = np.mat(yArr).T
    meanX = np.mean(xMat,0); varX = np.var(xMat,0)
    unReg = bestWeights / varX                                                            #數據通過標準化,所以須要還原
    print('%f%+f*年份%+f*部件數量%+f*是否爲全新%+f*原價' % ((-1 * np.sum(np.multiply(meanX,unReg)) + np.mean(yMat)), unReg[0,0], unReg[0,1], unReg[0,2], unReg[0,3]))    

def ridgeTest(xArr, yArr):
    """
    函數說明:嶺迴歸測試
    Parameters:
        xMat - x數據集
        yMat - y數據集
    Returns:
        wMat - 迴歸係數矩陣

    """
    xMat = np.mat(xArr); yMat = np.mat(yArr).T
    #數據標準化
    yMean = np.mean(yMat, axis = 0)                        #行與行操做,求均值
    yMat = yMat - yMean                                    #數據減去均值
    xMeans = np.mean(xMat, axis = 0)                    #行與行操做,求均值
    xVar = np.var(xMat, axis = 0)                        #行與行操做,求方差
    xMat = (xMat - xMeans) / xVar                        #數據減去均值除以方差實現標準化
    numTestPts = 30                                        #30個不一樣的lambda測試
    wMat = np.zeros((numTestPts, np.shape(xMat)[1]))    #初始迴歸係數矩陣
    for i in range(numTestPts):                            #改變lambda計算迴歸係數
        ws = ridgeRegres(xMat, yMat, np.exp(i - 10))    #lambda以e的指數變化,最初是一個很是小的數,
        wMat[i, :] = ws.T                                 #計算迴歸係數矩陣
    return wMat

def useStandRegres():
    """
    函數說明:使用簡單的線性迴歸
    Parameters:
        無
    Returns:
        無

    """
    lgX = []
    lgY = []
    setDataCollect(lgX, lgY)
    data_num, features_num = np.shape(lgX)
    lgX1 = np.mat(np.ones((data_num, features_num + 1)))
    lgX1[:, 1:5] = np.mat(lgX)
    ws = standRegres(lgX1, lgY)
    print('%f%+f*年份%+f*部件數量%+f*是否爲全新%+f*原價' % (ws[0],ws[1],ws[2],ws[3],ws[4]))    

def usesklearn():
    """
    函數說明:使用sklearn
    Parameters:
        無
    Returns:
        無

    """
    from sklearn import linear_model
    reg = linear_model.Ridge(alpha = .5)
    lgX = []
    lgY = []
    setDataCollect(lgX, lgY)
    reg.fit(lgX, lgY) 
    print('%f%+f*年份%+f*部件數量%+f*是否爲全新%+f*原價' % (reg.intercept_, reg.coef_[0], reg.coef_[1], reg.coef_[2], reg.coef_[3]))    

if __name__ == '__main__':
    usesklearn()
View Code
相關文章
相關標籤/搜索