Feature selection for regression analysis (including a stepwise algorithm), implemented in Python

# -*- coding: utf-8 -*-
"""
Created on Sat Aug 18 16:23:17 2018

@author: acadsoc
"""
import scipy
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import cross_val_predict, cross_val_score, train_test_split
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import accuracy_score, roc_auc_score, r2_score
from sklearn.linear_model import Lasso, LassoCV, ElasticNet
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import statsmodels.formula.api as smf
import sys
import os

plt.style.use('ggplot')  # use the ggplot2 plotting style
# Set the Chinese font path depending on the platform
if sys.platform == 'linux':
    zh_font = matplotlib.font_manager.FontProperties(
        fname='/path/anaconda3/lib/python3.6/site-packages/matplotlib/mpl-data/fonts/ttf/STZHONGS.TTF')
else:
    zh_font = matplotlib.font_manager.FontProperties(fname=r'C:\Windows\Fonts\STZHONGS.ttf')  # Chinese font

# Set the working directory depending on the platform
if sys.platform == 'linux':
    os.chdir('path/jupyternb/ml/acadsoc/rollingRegression')  # Linux path
else:
    os.chdir('D:/Python/rollingRegression')  # Windows path

class featureSelection():
    '''
    Feature selection class for multiple linear regression.

    Parameters
    ----------
    random_state : int, default None
        Random seed.

    Attributes
    ----------
    elasticnet_rs_best : model
        Best ElasticNet model found by randomized search.
    elasticnet_rs_feat_selected_ : dataframe
        Variables with coefficients greater than 0 in the best randomized-search ElasticNet model.
    elasticnet_rs_R2_ : float
        R-squared of the best randomized-search ElasticNet model.
    eln : model
        ElasticNet model.
    elasticnet_coef_ : dataframe
        ElasticNet coefficients.
    elasticnet_feat_selected_ : list
        Variables whose ElasticNet coefficients are greater than 0.
    elasticnet_R2_ : float
        ElasticNet R-squared.
    rf : model
        Random forest model.
    rf_rs_best : model
        Best random forest model found by randomized search.
    rf_rs_feat_impo_ : dataframe
        Variable importances of the best randomized-search random forest, sorted.
    rf_rs_feat_selected_ : list
        Variables whose cumulative importance (randomized search) reaches impo_cum_threshold.
    rf_rs_R2_ : float
        R-squared of the best randomized-search random forest.
    rf_feat_impo_ : dataframe
        Random forest variable importances, sorted.
    rf_feat_selected_ : list
        Variables whose cumulative importance reaches impo_cum_threshold.
    rf_R2_ : float
        Random forest R-squared.
    stepwise_model : model
        Stepwise regression model.
    stepwise_feat_selected_ : list
        Variables selected by the stepwise model.
    '''
    def __init__(self, random_state=None):
        self.random_state = random_state  # random seed

    def elasticNetRandomSearch(self, df, cv=10, n_iter=1000, n_jobs=-1, intercept=True,
                               normalize=True):
        '''
        Randomized search for the best ElasticNet model.

        Parameters
        ----------
        df : dataframe
            Data frame for the analysis; the response must be the first column.
        cv : int, default 10
            Number of cross-validation folds.
        n_iter : int, default 1000
            Number of parameter settings sampled by the randomized search.
        n_jobs : int, default -1
            Number of CPU threads to use; -1 means all threads.
        intercept : bool, default True
            Whether the model has an intercept term.
        normalize : bool, default True
            Whether to standardize the data.
        '''
        if normalize:  # standardize the data if requested
            df_std = StandardScaler().fit_transform(df)
            df = pd.DataFrame(df_std, columns=df.columns, index=df.index)

        X = df.iloc[:, 1:]
        y = df.iloc[:, 0]
        eln = ElasticNet(fit_intercept=intercept)
        param_rs = {'alpha': scipy.stats.expon(loc=0, scale=1),  # parameter distributions to sample
                    'l1_ratio': scipy.stats.uniform(loc=0, scale=1)}

        elasticnet_rs = RandomizedSearchCV(eln,  # set up the randomized search
                                           param_distributions=param_rs,
                                           scoring='r2',
                                           cv=cv,
                                           n_iter=n_iter,
                                           n_jobs=n_jobs)
        elasticnet_rs.fit(X, y)  # fit the search
        # Refit the best model and use it to select variables and coefficients
        self.elasticnet_rs_best = ElasticNet(alpha=elasticnet_rs.best_params_['alpha'],
                                             l1_ratio=elasticnet_rs.best_params_['l1_ratio'])
        self.elasticnet_rs_best.fit(X, y)
        coef = pd.DataFrame(self.elasticnet_rs_best.coef_, index=df.columns[1:],
                            columns=['coef']).sort_values(by='coef', ascending=False)
        self.elasticnet_rs_feat_selected_ = coef[coef > 0].dropna(axis=0).index
        self.elasticnet_rs_R2_ = 1 - np.mean((y.values.reshape(-1, 1) -
                                              self.elasticnet_rs_best.predict(X).reshape(-1, 1)) ** 2) / np.var(y)
        return self
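
    # Usage sketch (illustrative addition, not from the original post): assuming
    # `df` is a DataFrame whose first column is the response, a run might look like
    #
    #     fs = featureSelection(random_state=123)
    #     fs.elasticNetRandomSearch(df, cv=5, n_iter=200)
    #     fs.elasticnet_rs_feat_selected_   # variables with coefficients > 0
    #     fs.elasticnet_rs_R2_              # in-sample R-squared of the best model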
    
    def elasticNetFeatureSelectPlot(self, df, l1_ratio=.7, normalize=True, intercept=True,
                                    plot_width=12, plot_height=5, xlim_exp=[-5, 1], ylim=[-1, 1]):
        '''
        Plot the ElasticNet regularization paths (coefficients vs. alpha).

        Parameters
        ----------
        df : dataframe
            Data frame for the analysis; the response must be the first column.
        l1_ratio : float, default 0.7
            L1 regularization ratio.
        normalize : bool, default True
            Whether to standardize the data.
        intercept : bool, default True
            Whether the regression has a constant term.
        plot_width : int, default 12
            Figure width.
        plot_height : int, default 5
            Figure height.
        xlim_exp : list, default [-5, 1]
            Exponent range displayed on the x axis.
        ylim : list, default [-1, 1]
            Value range displayed on the y axis.
        '''
        if normalize:  # standardize the data if requested
            df_std = StandardScaler().fit_transform(df)
            df = pd.DataFrame(df_std, columns=df.columns, index=df.index)

        X = df.iloc[:, 1:]
        y = df.iloc[:, 0]

        plt.figure(figsize=(plot_width, plot_height))
        ax = plt.subplot(111)
        colors = ['blue', 'green', 'red', 'cyan', 'magenta', 'yellow', 'black', 'pink', 'lightgreen',
                  'lightblue', 'gray', 'indigo', 'orange', 'seagreen', 'gold', 'purple']
        weights, params = [], []
        for alpha in np.arange(-5, 1, 0.1, dtype=float):  # fit one model per alpha on a log grid
            eln = ElasticNet(alpha=10 ** alpha, l1_ratio=l1_ratio, random_state=123,
                             fit_intercept=intercept)
            eln.fit(X, y)
            weights.append(eln.coef_)
            params.append(10 ** alpha)

        weights = np.array(weights)
        # One path per feature; only as many features as colors (16) are drawn
        for column, color in zip(range(weights.shape[1]), colors):
            plt.plot(params, weights[:, column], label=df.columns[column + 1], color=color)

        plt.axhline(0, color='black', linestyle='--', linewidth=3)
        plt.xlim(10 ** xlim_exp[0], 10 ** xlim_exp[1])
        plt.ylim(ylim)
        plt.title('ElasticNet variable selection plot')
        plt.ylabel('Coefficient weight')
        plt.xlabel(r'$\alpha$')
        plt.xscale('log')
        plt.xticks(10 ** np.arange(xlim_exp[0], xlim_exp[1], dtype=float),
                   10 ** np.arange(xlim_exp[0], xlim_exp[1], dtype=float))
        ax.legend(loc='best', prop=zh_font)
        plt.show()
        return self
    
    def elasticNet(self, df, feat_selected=None, alpha=1, l1_ratio=.7, intercept=True, normalize=False):
        '''
        ElasticNet regression analysis.

        Parameters
        ----------
        df : dataframe
            Data frame for the analysis; the response must be the first column.
        feat_selected : list, default None
            Pre-selected features to use instead of all columns.
        alpha : float, default 1
            Regularization strength alpha.
        l1_ratio : float, default 0.7
            L1 regularization ratio.
        intercept : bool, default True
            Whether the model has an intercept term.
        normalize : bool, default False
            Whether to standardize the data.
        '''
        if normalize:  # standardize the data if requested
            df_std = StandardScaler().fit_transform(df)
            df = pd.DataFrame(df_std, columns=df.columns, index=df.index)

        if feat_selected is not None:  # use the pre-selected variables if given
            X = df[feat_selected]
        else:
            X = df.iloc[:, 1:]
        y = df.iloc[:, 0]

        self.eln = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, fit_intercept=intercept)
        self.eln.fit(X, y)  # fit the model

        # Variables, coefficients and R-squared
        self.elasticnet_coef_ = pd.DataFrame(self.eln.coef_, index=X.columns,
                                             columns=['coef']).sort_values(by='coef', ascending=False)
        self.elasticnet_feat_selected_ = self.elasticnet_coef_[self.elasticnet_coef_ > 0].dropna(axis=0).index
        self.elasticnet_R2_ = 1 - np.mean((y.values.reshape(-1, 1) -
                                           self.eln.predict(X).reshape(-1, 1)) ** 2) / np.var(y)
        return self
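
    # Usage sketch (illustrative addition): refit with a fixed alpha/l1_ratio on
    # previously selected features, then plot the coefficients with
    # featureBarhPlot (defined below); `df` and `feats` are assumed inputs.
    #
    #     fs = featureSelection().elasticNet(df, feat_selected=feats, alpha=.1)
    #     fs.featureBarhPlot(fs.elasticnet_coef_)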
    
    def featureBarhPlot(self, df_coef, figsize=(12, 6)):
        '''
        Horizontal bar plot of feature coefficients (or importances).

        Parameters
        ----------
        df_coef : dataframe
            Data frame of feature coefficients (or importances).
        figsize : tuple, default (12, 6)
            Figure width and height.
        '''
        coef = df_coef.sort_values(by=df_coef.columns[0], ascending=True)
        plt.figure(figsize=figsize)
        y_label = np.arange(len(coef))
        plt.barh(y_label, coef.iloc[:, 0])
        plt.yticks(y_label, coef.index, fontproperties=zh_font)

        # Annotate each bar with its value, offset slightly from the bar end
        for i in np.arange(len(coef)):
            if coef.iloc[i, 0] >= 0:
                dist = 0.003 * coef.iloc[:, 0].max()
            else:
                dist = -0.02 * coef.iloc[:, 0].max()
            plt.text(coef.iloc[i, 0] + dist, i - 0.2, '%.3f' % coef.iloc[i, 0])

        plt.ylabel('Feature')
        plt.xlabel('Coefficient')
        plt.title('Feature coefficient bar plot')
        plt.show()
    
    def randomForestRandomSearch(self, df, cv=10, n_iter=100, n_jobs=-1, impo_cum_threshold=.85,
                                 normalize=True):
        '''
        Randomized search for the best random forest model.

        Parameters
        ----------
        df : dataframe
            Data frame for the analysis; the response must be the first column.
        cv : int, default 10
            Number of cross-validation folds.
        n_iter : int, default 100
            Number of parameter settings sampled by the randomized search.
        n_jobs : int, default -1
            Number of CPU threads to use; -1 means all threads.
        impo_cum_threshold : float, default 0.85
            Cumulative-importance threshold for selecting variables.
        normalize : bool, default True
            Whether to standardize the data.
        '''
        if normalize:  # standardize the data if requested
            df_std = StandardScaler().fit_transform(df)
            df = pd.DataFrame(df_std, columns=df.columns, index=df.index)

        X = df.iloc[:, 1:]
        y = df.iloc[:, 0]
        rf = RandomForestRegressor()
        param_rs = {'n_estimators': np.arange(1, 500),  # parameter distributions to sample
                    'max_features': np.arange(1, X.shape[1] + 1)}

        rf_rs = RandomizedSearchCV(rf,  # set up the randomized search
                                   param_distributions=param_rs,
                                   scoring='r2',
                                   cv=cv,
                                   n_iter=n_iter,
                                   n_jobs=n_jobs)
        rf_rs.fit(X, y)  # fit the search
        # Refit the best model and use it to rank and select variables
        self.rf_rs_best = RandomForestRegressor(n_estimators=rf_rs.best_params_['n_estimators'],
                                                max_features=rf_rs.best_params_['max_features'])
        self.rf_rs_best.fit(X, y)
        self.rf_rs_feat_impo_ = pd.DataFrame(self.rf_rs_best.feature_importances_, index=df.columns[1:],
                                             columns=['importance']).sort_values(by='importance', ascending=False)

        # Keep the top variables whose cumulative importance reaches the threshold
        n = 0
        for i, v in enumerate(self.rf_rs_feat_impo_.values.cumsum()):
            if v >= impo_cum_threshold:
                n = i
                break

        self.rf_rs_feat_selected_ = self.rf_rs_feat_impo_.index[:n + 1]
        self.rf_rs_R2_ = 1 - np.mean((y.values.reshape(-1, 1) -
                                      self.rf_rs_best.predict(X).reshape(-1, 1)) ** 2) / np.var(y)
        return self
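
    # Usage sketch (illustrative addition): the search samples integer values of
    # n_estimators and max_features, so even a small n_iter can take a while.
    #
    #     fs = featureSelection().randomForestRandomSearch(df, cv=5, n_iter=20)
    #     fs.rf_rs_feat_impo_      # sorted importance table
    #     fs.rf_rs_feat_selected_  # variables up to the cumulative threshold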
    
    def randomForest(self, df, feat_selected=None, impo_cum_threshold=.85,
                     n_estimators=100, max_features=1.0, normalize=False):
        '''
        Random forest regression analysis.

        Parameters
        ----------
        df : dataframe
            Data frame for the analysis; the response must be the first column.
        feat_selected : list, default None
            Pre-selected features to use instead of all columns.
        impo_cum_threshold : float, default 0.85
            Cumulative-importance threshold for selecting variables.
        n_estimators : int, default 100
            Number of trees in the forest.
        max_features : int or float, default 1.0
            Maximum number (or fraction) of features considered per split;
            1.0 uses all features, matching the old 'auto' default.
        normalize : bool, default False
            Whether to standardize the data.
        '''
        if normalize:  # standardize the data if requested
            df_std = StandardScaler().fit_transform(df)
            df = pd.DataFrame(df_std, columns=df.columns, index=df.index)

        if feat_selected is not None:  # use the pre-selected variables if given
            X = df[feat_selected]
        else:
            X = df.iloc[:, 1:]
        y = df.iloc[:, 0]

        self.rf = RandomForestRegressor(n_estimators=n_estimators, max_features=max_features)
        self.rf.fit(X, y)  # fit the model

        # Variable importances and R-squared
        self.rf_feat_impo_ = pd.DataFrame(self.rf.feature_importances_, index=X.columns,
                                          columns=['importance']).sort_values(by='importance', ascending=False)

        # Keep the top variables whose cumulative importance reaches the threshold
        n = 0
        for i, v in enumerate(self.rf_feat_impo_.values.cumsum()):
            if v >= impo_cum_threshold:
                n = i
                break

        self.rf_feat_selected_ = self.rf_feat_impo_.index[:n + 1]
        self.rf_R2_ = 1 - np.mean((y.values.reshape(-1, 1) - self.rf.predict(X).reshape(-1, 1)) ** 2) / np.var(y)
        return self
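
    # Usage sketch (illustrative addition): a direct fit without the randomized
    # search; the importance plot reuses featureBarhPlot.
    #
    #     fs = featureSelection().randomForest(df, n_estimators=200, impo_cum_threshold=.9)
    #     fs.featureBarhPlot(fs.rf_feat_impo_)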
    
    def stepwise(self, df, response, intercept=True, normalize=False, criterion='bic',
                 f_pvalue_enter=.05, p_value_enter=.05, direction='backward', show_step=True,
                 criterion_enter=None, criterion_remove=None, max_iter=200, **kw):
        '''
        Stepwise regression.

        Parameters
        ----------
        df : dataframe
            Data frame for the analysis; the response must be the first column.
        response : str
            Name of the response variable.
        intercept : bool, default True
            Whether the model has an intercept term.
        normalize : bool, default False
            Whether to standardize the data (forces the intercept to 0).
        criterion : str, default 'bic'
            Criterion optimized by the stepwise search.
        f_pvalue_enter : float, default .05
            F-test p-value threshold for adding or removing a variable when criterion='ssr'.
        p_value_enter : float, default .05
            p-value threshold for removing a variable when direction='both'.
        direction : str, default 'backward'
            Direction of the stepwise search.
        show_step : bool, default True
            Whether to print the stepwise process.
        criterion_enter : float, default None
            Criterion threshold for adding a variable when direction='both' or 'forward'.
        criterion_remove : float, default None
            Criterion threshold for removing a variable when direction='backward'.
        max_iter : int, default 200
            Maximum number of iterations.
        '''
        criterion_list = ['bic', 'aic', 'ssr', 'rsquared', 'rsquared_adj']
        if criterion not in criterion_list:
            raise ValueError('Invalid criterion; it must be one of: %s' % criterion_list)

        direction_list = ['backward', 'forward', 'both']
        if direction not in direction_list:
            raise ValueError('Invalid direction; it must be one of: %s' % direction_list)

        # Default enter thresholds per criterion
        p_enter = {'bic': 0.0, 'aic': 0.0, 'ssr': 0.05, 'rsquared': 0.05, 'rsquared_adj': -0.05}
        if criterion_enter:  # override the enter threshold for the chosen criterion
            p_enter[criterion] = criterion_enter

        # Default remove thresholds per criterion
        p_remove = {'bic': 0.01, 'aic': 0.01, 'ssr': 0.1, 'rsquared': 0.05, 'rsquared_adj': -0.05}
        if criterion_remove:  # override the remove threshold for the chosen criterion
            p_remove[criterion] = criterion_remove

        if normalize:  # standardize the data if requested
            intercept = False  # force the intercept to 0
            df_std = StandardScaler().fit_transform(df)
            df = pd.DataFrame(df_std, columns=df.columns, index=df.index)
                
        ''' forward '''
        if direction == 'forward':
            remaining = list(df.columns)  # candidate predictors
            remaining.remove(response)
            selected = []  # variables already entered into the model
            # Initialize the current score and the best new score
            if intercept:  # with intercept
                formula = "{} ~ {} + 1".format(response, remaining[0])
            else:
                formula = "{} ~ {} - 1".format(response, remaining[0])

            result = smf.ols(formula, df).fit()  # fit by ordinary least squares
            current_score = getattr(result, criterion)
            best_new_score = getattr(result, criterion)

            if show_step:
                print('\nstepwise starting:\n')
            iter_times = 0
            # Loop while candidates remain and the current score keeps updating
            while remaining and (current_score == best_new_score) and (iter_times < max_iter):
                scores_with_candidates = []  # (score, candidate) records for this round
                for candidate in remaining:  # try entering each remaining variable in turn
                    if intercept:  # with intercept
                        formula = "{} ~ {} + 1".format(response, ' + '.join(selected + [candidate]))
                    else:
                        formula = "{} ~ {} - 1".format(response, ' + '.join(selected + [candidate]))

                    result = smf.ols(formula, df).fit()  # fit by ordinary least squares
                    fvalue = result.fvalue
                    f_pvalue = result.f_pvalue
                    score = getattr(result, criterion)
                    scores_with_candidates.append((score, candidate, fvalue, f_pvalue))  # record this round

                if criterion == 'ssr':  # this criterion is minimized
                    scores_with_candidates.sort(reverse=True)  # sort descending
                    best_new_score, best_candidate, best_new_fvalue, best_new_f_pvalue = scores_with_candidates.pop()  # smallest score and its variable
                    if ((current_score - best_new_score) > p_enter[criterion]) and (best_new_f_pvalue < f_pvalue_enter):  # improvement beyond the threshold
                        remaining.remove(best_candidate)  # drop the best candidate from the pool
                        selected.append(best_candidate)  # enter it into the model
                        current_score = best_new_score  # update the current score
                        if show_step:  # print the stepwise process
                            print('Adding %s, SSR = %.3f, Fstat = %.3f, FpValue = %.3e' %
                                  (best_candidate, best_new_score, best_new_fvalue, best_new_f_pvalue))
                    elif (current_score - best_new_score) >= 0 and (best_new_f_pvalue < f_pvalue_enter) and iter_times == 0:  # any improvement on the first iteration
                        remaining.remove(best_candidate)
                        selected.append(best_candidate)
                        current_score = best_new_score
                        if show_step:
                            print('Adding %s, %s = %.3f' % (best_candidate, criterion, best_new_score))
                    elif (best_new_f_pvalue < f_pvalue_enter) and iter_times == 0:  # otherwise seed with the first candidate on the first iteration
                        first = remaining.pop(0)
                        selected.append(first)
                        if show_step:
                            print('Adding %s, %s = %.3f' % (first, criterion, best_new_score))
                elif criterion in ['bic', 'aic']:  # these criteria are minimized
                    scores_with_candidates.sort(reverse=True)  # sort descending
                    best_new_score, best_candidate, best_new_fvalue, best_new_f_pvalue = scores_with_candidates.pop()  # smallest score and its variable
                    if (current_score - best_new_score) > p_enter[criterion]:  # improvement beyond the threshold
                        remaining.remove(best_candidate)
                        selected.append(best_candidate)
                        current_score = best_new_score
                        if show_step:
                            print('Adding %s, %s = %.3f' % (best_candidate, criterion, best_new_score))
                    elif (current_score - best_new_score) >= 0 and iter_times == 0:  # any improvement on the first iteration
                        remaining.remove(best_candidate)
                        selected.append(best_candidate)
                        current_score = best_new_score
                        if show_step:
                            print('Adding %s, %s = %.3f' % (best_candidate, criterion, best_new_score))
                    elif iter_times == 0:  # otherwise seed with the first candidate on the first iteration
                        first = remaining.pop(0)
                        selected.append(first)
                        if show_step:
                            print('Adding %s, %s = %.3f' % (first, criterion, best_new_score))
                else:  # rsquared / rsquared_adj are maximized
                    scores_with_candidates.sort()  # sort ascending
                    best_new_score, best_candidate, best_new_fvalue, best_new_f_pvalue = scores_with_candidates.pop()  # largest score and its variable
                    if (best_new_score - current_score) > p_enter[criterion]:
                        remaining.remove(best_candidate)
                        selected.append(best_candidate)
                        current_score = best_new_score
                        if show_step:
                            print('Adding %s, %s = %.3f' % (best_candidate, criterion, best_new_score))
                    elif (best_new_score - current_score) >= 0 and iter_times == 0:  # any improvement on the first iteration
                        remaining.remove(best_candidate)
                        selected.append(best_candidate)
                        current_score = best_new_score
                        if show_step:
                            print('Adding %s, %s = %.3f' % (best_candidate, criterion, best_new_score))
                    elif iter_times == 0:  # otherwise seed with the first candidate on the first iteration
                        first = remaining.pop(0)
                        selected.append(first)
                        if show_step:
                            print('Adding %s, %s = %.3f' % (first, criterion, best_new_score))
                iter_times += 1

            if intercept:  # with intercept
                formula = "{} ~ {} + 1".format(response, ' + '.join(selected))
            else:
                formula = "{} ~ {} - 1".format(response, ' + '.join(selected))

            self.stepwise_model = smf.ols(formula, df).fit()  # fit the final model

            if show_step:
                print('\nLinear regression model:', '\n  ', self.stepwise_model.model.formula)
                print('\n', self.stepwise_model.summary())
        
        ''' backward '''
        if direction == 'backward':
            remaining, selected = set(df.columns), set(df.columns)  # candidate pool and current model variables
            remaining.remove(response)
            selected.remove(response)
            # Initialize the current score and the worst new score
            if intercept:  # with intercept
                formula = "{} ~ {} + 1".format(response, ' + '.join(selected))
            else:
                formula = "{} ~ {} - 1".format(response, ' + '.join(selected))

            result = smf.ols(formula, df).fit()  # fit by ordinary least squares
            current_score = getattr(result, criterion)
            worst_new_score = getattr(result, criterion)

            if show_step:
                print('\nstepwise starting:\n')
            iter_times = 0
            # Loop while variables remain and the current score keeps updating
            while remaining and (current_score == worst_new_score) and (iter_times < max_iter):
                scores_with_eliminations = []  # (score, elimination) records for this round
                for elimination in remaining:  # try removing each remaining variable in turn
                    if intercept:  # with intercept
                        formula = "{} ~ {} + 1".format(response, ' + '.join(selected - {elimination}))
                    else:
                        formula = "{} ~ {} - 1".format(response, ' + '.join(selected - {elimination}))

                    result = smf.ols(formula, df).fit()  # fit by ordinary least squares
                    fvalue = result.fvalue
                    f_pvalue = result.f_pvalue
                    score = getattr(result, criterion)
                    scores_with_eliminations.append((score, elimination, fvalue, f_pvalue))  # record this round

                if criterion == 'ssr':  # this criterion is minimized
                    scores_with_eliminations.sort(reverse=True)  # sort descending
                    worst_new_score, worst_elimination, worst_new_fvalue, worst_new_f_pvalue = scores_with_eliminations.pop()  # smallest score and its variable
                    if ((worst_new_score - current_score) < p_remove[criterion]) and (worst_new_f_pvalue < f_pvalue_enter):  # removal costs less than the threshold
                        remaining.remove(worst_elimination)  # drop the variable from the pool
                        selected.remove(worst_elimination)  # remove it from the model
                        current_score = worst_new_score  # update the current score
                        if show_step:  # print the stepwise process
                            print('Removing %s, SSR = %.3f, Fstat = %.3f, FpValue = %.3e' %
                                  (worst_elimination, worst_new_score, worst_new_fvalue, worst_new_f_pvalue))
                elif criterion in ['bic', 'aic']:  # these criteria are minimized
                    scores_with_eliminations.sort(reverse=True)  # sort descending
                    worst_new_score, worst_elimination, worst_new_fvalue, worst_new_f_pvalue = scores_with_eliminations.pop()  # smallest score and its variable
                    if (worst_new_score - current_score) < p_remove[criterion]:  # the change is insignificant
                        remaining.remove(worst_elimination)
                        selected.remove(worst_elimination)
                        current_score = worst_new_score
                        if show_step:
                            print('Removing %s, %s = %.3f' % (worst_elimination, criterion, worst_new_score))
                else:  # rsquared / rsquared_adj are maximized
                    scores_with_eliminations.sort()  # sort ascending
                    worst_new_score, worst_elimination, worst_new_fvalue, worst_new_f_pvalue = scores_with_eliminations.pop()  # largest score and its variable
                    if (current_score - worst_new_score) < p_remove[criterion]:
                        remaining.remove(worst_elimination)
                        selected.remove(worst_elimination)
                        current_score = worst_new_score
                        if show_step:
                            print('Removing %s, %s = %.3f' % (worst_elimination, criterion, worst_new_score))
                iter_times += 1

            if intercept:  # with intercept
                formula = "{} ~ {} + 1".format(response, ' + '.join(selected))
            else:
                formula = "{} ~ {} - 1".format(response, ' + '.join(selected))

            self.stepwise_model = smf.ols(formula, df).fit()  # fit the final model

            if show_step:
                print('\nLinear regression model:', '\n  ', self.stepwise_model.model.formula)
                print('\n', self.stepwise_model.summary())
        
        ''' both '''
        if direction == 'both':
            remaining = list(df.columns)  # candidate predictors
            remaining.remove(response)
            selected = []  # variables already entered into the model
            # Initialize the current score and the best new score
            if intercept:  # with intercept
                formula = "{} ~ {} + 1".format(response, remaining[0])
            else:
                formula = "{} ~ {} - 1".format(response, remaining[0])

            result = smf.ols(formula, df).fit()  # fit by ordinary least squares
            current_score = getattr(result, criterion)
            best_new_score = getattr(result, criterion)

            if show_step:
                print('\nstepwise starting:\n')
            # Loop while candidates remain and the current score keeps updating
            iter_times = 0
            while remaining and (current_score == best_new_score) and (iter_times < max_iter):
                scores_with_candidates = []  # (score, candidate) records for this round
                for candidate in remaining:  # try entering each remaining variable in turn
                    if intercept:  # with intercept
                        formula = "{} ~ {} + 1".format(response, ' + '.join(selected + [candidate]))
                    else:
                        formula = "{} ~ {} - 1".format(response, ' + '.join(selected + [candidate]))

                    result = smf.ols(formula, df).fit()  # fit by ordinary least squares
                    fvalue = result.fvalue
                    f_pvalue = result.f_pvalue
                    score = getattr(result, criterion)
                    scores_with_candidates.append((score, candidate, fvalue, f_pvalue))  # record this round

                if criterion == 'ssr':  # this criterion is minimized
                    scores_with_candidates.sort(reverse=True)  # sort descending
                    best_new_score, best_candidate, best_new_fvalue, best_new_f_pvalue = scores_with_candidates.pop()  # smallest score and its variable
                    if ((current_score - best_new_score) > p_enter[criterion]) and (best_new_f_pvalue < f_pvalue_enter):  # improvement beyond the threshold
                        remaining.remove(best_candidate)
                        selected.append(best_candidate)
                        current_score = best_new_score
                        if show_step:
                            print('Adding %s, SSR = %.3f, Fstat = %.3f, FpValue = %.3e' %
                                  (best_candidate, best_new_score, best_new_fvalue, best_new_f_pvalue))
                    elif (current_score - best_new_score) >= 0 and (best_new_f_pvalue < f_pvalue_enter) and iter_times == 0:  # any improvement on the first iteration
                        remaining.remove(best_candidate)
                        selected.append(best_candidate)
                        current_score = best_new_score
                        if show_step:
                            print('Adding %s, %s = %.3f' % (best_candidate, criterion, best_new_score))
                    elif (best_new_f_pvalue < f_pvalue_enter) and iter_times == 0:  # otherwise seed with the first candidate on the first iteration
                        first = remaining.pop(0)
                        selected.append(first)
                        if show_step:
                            print('Adding %s, %s = %.3f' % (first, criterion, best_new_score))
                elif criterion in ['bic', 'aic']:  # these criteria are minimized
                    scores_with_candidates.sort(reverse=True)  # sort descending
                    best_new_score, best_candidate, best_new_fvalue, best_new_f_pvalue = scores_with_candidates.pop()  # smallest score and its variable
                    if (current_score - best_new_score) > p_enter[criterion]:  # improvement beyond the threshold
                        remaining.remove(best_candidate)
                        selected.append(best_candidate)
                        current_score = best_new_score
                        if show_step:
                            print('Adding %s, %s = %.3f' % (best_candidate, criterion, best_new_score))
                    elif (current_score - best_new_score) >= 0 and iter_times == 0:  # any improvement on the first iteration
                        remaining.remove(best_candidate)
                        selected.append(best_candidate)
                        current_score = best_new_score
                        if show_step:
                            print('Adding %s, %s = %.3f' % (best_candidate, criterion, best_new_score))
                    elif iter_times == 0:  # otherwise seed with the first candidate on the first iteration
                        first = remaining.pop(0)
                        selected.append(first)
                        if show_step:
                            print('Adding %s, %s = %.3f' % (first, criterion, best_new_score))
                else:  # rsquared / rsquared_adj are maximized
                    scores_with_candidates.sort()  # sort ascending
                    best_new_score, best_candidate, best_new_fvalue, best_new_f_pvalue = scores_with_candidates.pop()  # largest score and its variable
                    if (best_new_score - current_score) > p_enter[criterion]:  # improvement beyond the threshold
                        remaining.remove(best_candidate)
                        selected.append(best_candidate)
                        current_score = best_new_score
                        if show_step:
                            print('Adding %s, %s = %.3f' % (best_candidate, criterion, best_new_score))
                    elif (best_new_score - current_score) >= 0 and iter_times == 0:  # any improvement on the first iteration
                        remaining.remove(best_candidate)
                        selected.append(best_candidate)
                        current_score = best_new_score
                        if show_step:
                            print('Adding %s, %s = %.3f' % (best_candidate, criterion, best_new_score))
                    elif iter_times == 0:  # otherwise seed with the first candidate on the first iteration
                        first = remaining.pop(0)
                        selected.append(first)
                        if show_step:
                            print('Adding %s, %s = %.3f' % (first, criterion, best_new_score))

                if intercept:  # with intercept
                    formula = "{} ~ {} + 1".format(response, ' + '.join(selected))
                else:
                    formula = "{} ~ {} - 1".format(response, ' + '.join(selected))

                result = smf.ols(formula, df).fit()  # refit with the current selection
                if iter_times >= 1:  # from the second iteration on, check the p-values of the selected variables
                    pvalues = result.pvalues.drop('Intercept', errors='ignore')  # never drop the intercept itself
                    if pvalues.max() > p_value_enter:
                        var_removed = pvalues.idxmax()
                        p_value_removed = pvalues.max()
                        selected.remove(var_removed)
                        if show_step:
                            print('Removing %s, Pvalue = %.3f' % (var_removed, p_value_removed))
                iter_times += 1

            if intercept:  # with intercept
                formula = "{} ~ {} + 1".format(response, ' + '.join(selected))
            else:
                formula = "{} ~ {} - 1".format(response, ' + '.join(selected))

            self.stepwise_model = smf.ols(formula, df).fit()  # fit the final model
            if show_step:
                print('\nLinear regression model:', '\n  ', self.stepwise_model.model.formula)
                print('\n', self.stepwise_model.summary())
        # Variables selected by the final model
        if intercept:
            self.stepwise_feat_selected_ = list(self.stepwise_model.params.index[1:])
        else:
            self.stepwise_feat_selected_ = list(self.stepwise_model.params.index)
        return self
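
To see the class in action, here is a minimal, self-contained demo (an illustrative addition, not part of the original post): it builds a synthetic dataset whose first column is the response, with made-up column names x1..x5 and coefficients, then runs the stepwise, ElasticNet and random forest selectors.

if __name__ == '__main__':
    rng = np.random.RandomState(123)
    n = 200
    X_demo = pd.DataFrame(rng.normal(size=(n, 5)),
                          columns=['x1', 'x2', 'x3', 'x4', 'x5'])
    # Only x1 and x2 truly drive the response; the rest is noise.
    y_demo = 3 * X_demo['x1'] + 2 * X_demo['x2'] + rng.normal(size=n)
    df_demo = pd.concat([y_demo.rename('y'), X_demo], axis=1)  # response first

    fs = featureSelection(random_state=123)

    # Forward stepwise regression on BIC; it should recover x1 and x2.
    fs.stepwise(df_demo, response='y', direction='forward', criterion='bic')
    print(fs.stepwise_feat_selected_)

    # ElasticNet randomized search (a small n_iter keeps the demo fast).
    fs.elasticNetRandomSearch(df_demo, cv=5, n_iter=20)
    print(fs.elasticnet_rs_feat_selected_, fs.elasticnet_rs_R2_)

    # Random forest with importance-based selection.
    fs.randomForest(df_demo, n_estimators=100, impo_cum_threshold=.85)
    print(fs.rf_feat_selected_, fs.rf_R2_)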
