```
Initialize all regression coefficients to 1
Repeat R times:
    compute the gradient over the entire data set
    update the regression coefficient vector by alpha * gradient
Return the regression coefficients
```
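Written out, the update this pseudocode describes is the standard logistic-regression gradient-ascent step; a sketch of the usual notation (not spelled out in the code below):

```latex
% Sigmoid applied to the linear combination of the inputs
\sigma(z) = \frac{1}{1 + e^{-z}}, \qquad z = w^{\top} x

% Batch update: X is the m-by-n feature matrix, y the label vector,
% h = \sigma(Xw) the vector of predictions, \alpha the step size
w \leftarrow w + \alpha \, X^{\top} (y - h)
```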
```python
import os

import numpy as np
import matplotlib.pyplot as plt

from path_settings import machine_learning_PATH

data_set_path = os.path.join(machine_learning_PATH, '第五章/data-set')
testSet_path = os.path.join(data_set_path, 'testSet.txt')
horseColicTraining_path = os.path.join(data_set_path, 'horseColicTraining.txt')
horseColicTest_path = os.path.join(data_set_path, 'horseColicTest.txt')


def load_data_set():
    """Load the data set"""
    data_mat = []
    label_mat = []
    # read the .txt file line by line, building feature and label lists
    fr = open(testSet_path)
    for line in fr.readlines():
        line_arr = line.strip().split()
        data_mat.append([1, float(line_arr[0]), float(line_arr[1])])
        label_mat.append(int(line_arr[2]))
    return data_mat, label_mat


def sigmoid(in_x):
    return 1 / (1 + np.exp(-in_x))


def grad_ascent(data_mat_in, class_labels):
    # build the feature matrix
    data_matrix = np.mat(data_mat_in)
    # build the label matrix and transpose it into a column vector
    label_mat = np.mat(class_labels).transpose()
    # rows and columns of data_matrix
    m, n = np.shape(data_matrix)
    # step size of each update
    alpha = 0.001
    # maximum number of iterations
    max_cycles = 500
    # initialize the coefficients as an n x 1 matrix of ones
    weights = np.ones((n, 1))
    # iterate the gradient-ascent update
    for k in range(max_cycles):
        # difference between the true class and the predicted class
        h = sigmoid(data_matrix * weights)
        error = (label_mat - h)
        # adjust the regression coefficients
        weights = weights + alpha * data_matrix.transpose() * error
    return weights


def test_grad_ascent():
    data_mat, label_mat = load_data_set()
    weights = grad_ascent(data_mat, label_mat)
    print(weights)
    """
    [[ 4.12414349]
     [ 0.48007329]
     [-0.6168482 ]]
    """


if __name__ == '__main__':
    test_grad_ascent()
```
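As a quick sanity check, grad_ascent can also be run without testSet.txt on disk. A minimal sketch, assuming grad_ascent from the listing above is in scope; the four inline samples are invented purely for illustration:

```python
# tiny hand-made data set: [bias, x1, x2]; labels are 1 when x1 > 0
toy_data = [[1, -1.0, 2.1], [1, -0.5, 1.7], [1, 1.3, -0.4], [1, 0.8, -1.2]]
toy_labels = [0, 0, 1, 1]

weights = grad_ascent(toy_data, toy_labels)
print(weights)  # a 3x1 matrix of fitted coefficients
```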
```python
def plot_best_fit(wei):
    # getA() == np.asarray(self)
    # check the class name so both the matrix returned by gradient ascent
    # and the ndarray returned by stochastic gradient ascent are accepted
    if wei.__class__.__name__ == 'matrix':
        weights = wei.getA()
    else:
        weights = wei
    data_mat, label_mat = load_data_set()
    # convert the feature list into an array
    data_arr = np.array(data_mat)
    n = np.shape(data_arr)[0]
    # split the samples by class
    xcord1 = []
    ycord1 = []
    xcord2 = []
    ycord2 = []
    for i in range(n):
        if int(label_mat[i]) == 1:
            xcord1.append(data_arr[i, 1])
            ycord1.append(data_arr[i, 2])
        else:
            xcord2.append(data_arr[i, 1])
            ycord2.append(data_arr[i, 2])
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.scatter(xcord1, ycord1, s=30, c='red', marker='s')
    ax.scatter(xcord2, ycord2, s=30, c='green')
    # 0.1 is the step of the grid
    x = np.arange(-3, 3, 0.1)
    # set the sigmoid input to 0 (the decision boundary); with x and y
    # standing in for x1 and x2 above, solve for y
    y = (-weights[0] - weights[1] * x) / weights[2]
    ax.plot(x, y)
    plt.xlabel('X1')
    plt.ylabel('X2')
    plt.show()


def test_plot_best_fit():
    data_mat, label_mat = load_data_set()
    weights = grad_ascent(data_mat, label_mat)
    plot_best_fit(weights)


if __name__ == '__main__':
    # test_grad_ascent()
    test_plot_best_fit()
```
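The line being plotted is the decision boundary, where the sigmoid's input is zero and the predicted probability is exactly 0.5; solving for the second feature gives the formula used for y above:

```latex
0 = w_0 + w_1 x_1 + w_2 x_2
\quad\Longrightarrow\quad
x_2 = \frac{-w_0 - w_1 x_1}{w_2}
```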
```
Initialize all regression coefficients to 1
For each sample in the data set:
    compute the gradient of that single sample
    update the regression coefficients by alpha * gradient
Return the regression coefficients
```
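Per sample, this is the stochastic form of the batch update shown earlier (standard notation, with h_i the scalar prediction for sample x_i):

```latex
h_i = \sigma(w^{\top} x_i), \qquad
w \leftarrow w + \alpha \,(y_i - h_i)\, x_i
```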
```python
def stoc_grad_ascent0(data_matrix, class_labels):
    """Stochastic gradient ascent"""
    m, n = np.shape(data_matrix)
    alpha = 0.01
    weights = np.ones(n)
    for i in range(m):
        # sum reduces the elementwise product to a single scalar,
        # so sigmoid is evaluated only once per sample
        h = sigmoid(sum(data_matrix[i] * weights))
        error = class_labels[i] - h
        weights = weights + alpha * error * data_matrix[i]
    return weights


def test_stoc_grad_ascent0():
    data_arr, label_mat = load_data_set()
    weights = stoc_grad_ascent0(np.array(data_arr), label_mat)
    plot_best_fit(weights)


if __name__ == '__main__':
    # test_grad_ascent()
    # test_plot_best_fit()
    test_stoc_grad_ascent0()
```
```python
def stoc_grad_ascent1(data_matrix, class_labels, num_iter=150):
    """Improved stochastic gradient ascent; 150 passes by default"""
    m, n = np.shape(data_matrix)
    weights = np.ones(n)
    for j in range(num_iter):
        data_index = list(range(m))
        for i in range(m):
            # shrink alpha at every update, but keep it at least 0.01 so new
            # data still has an effect; this damps coefficient oscillation
            alpha = 4 / (1 + j + i) + 0.01
            # pick a random sample not yet used in this pass
            rand_pos = int(np.random.uniform(0, len(data_index)))
            rand_index = data_index[rand_pos]
            h = sigmoid(sum(data_matrix[rand_index] * weights))
            error = class_labels[rand_index] - h
            weights = weights + alpha * error * data_matrix[rand_index]
            # drop the used index so each sample is visited once per pass
            del data_index[rand_pos]
    return weights


def test_stoc_grad_ascent1():
    data_arr, label_mat = load_data_set()
    weights = stoc_grad_ascent1(np.array(data_arr), label_mat)
    plot_best_fit(weights)


if __name__ == '__main__':
    # test_grad_ascent()
    # test_plot_best_fit()
    # test_stoc_grad_ascent0()
    test_stoc_grad_ascent1()
```
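To see how the step size behaves, the schedule alpha = 4 / (1 + j + i) + 0.01 can be printed on its own; a small sketch, with loop bounds chosen only for illustration:

```python
# alpha decays with both the pass number j and the update counter i,
# but the +0.01 term keeps it strictly above 0.01
for j in range(3):
    for i in (0, 25, 50, 75):
        alpha = 4 / (1 + j + i) + 0.01
        print('j={} i={} alpha={:.4f}'.format(j, i, alpha))
```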
```python
def classify_vector(in_x, weights):
    prob = sigmoid(sum(in_x * weights))
    if prob > 0.5:
        return 1
    else:
        return 0


def colic_test():
    """Predict whether a horse with colic will die"""
    fr_train = open(horseColicTraining_path)
    fr_test = open(horseColicTest_path)
    training_set = []
    training_labels = []
    for line in fr_train.readlines():
        # split out all fields and collect the features in line_arr
        curr_line = line.strip().split('\t')  # type:list
        line_arr = []
        for i in range(21):
            line_arr.append(float(curr_line[i]))
        # keep features and labels separate
        training_set.append(line_arr)
        training_labels.append(float(curr_line[21]))
    train_weights = stoc_grad_ascent1(np.array(training_set), training_labels, 500)
    print(train_weights)
    error_count = 0
    num_test_vec = 0
    for line in fr_test.readlines():
        num_test_vec += 1
        curr_line = line.strip().split('\t')  # type:list
        line_arr = []
        for i in range(21):
            line_arr.append(float(curr_line[i]))
        # count a misclassification whenever the label predicted from the
        # weighted features via sigmoid differs from the true label
        if int(classify_vector(np.array(line_arr), train_weights)) != int(curr_line[21]):
            error_count += 1
    error_rate = (float(error_count) / num_test_vec)
    print('error rate on the test set: {}'.format(error_rate))
    # error rate on the test set: 0.373134328358209
    return error_rate


def multi_test():
    num_tests = 10
    error_sum = 0
    for k in range(num_tests):
        error_sum += colic_test()
    print('average error rate over {} runs: {}'.format(num_tests, error_sum / float(num_tests)))
    # average error rate over 10 runs: 0.3656716417910448


if __name__ == '__main__':
    # test_grad_ascent()
    # test_plot_best_fit()
    # test_stoc_grad_ascent0()
    # test_stoc_grad_ascent1()
    multi_test()
```
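One practical caveat: on the 21-feature colic data, sum(in_x * weights) can be strongly negative, and np.exp(-in_x) then overflows with a RuntimeWarning. A hedged sketch of a numerically stable replacement for sigmoid (sigmoid_stable is a name introduced here, not part of the original code):

```python
import numpy as np

def sigmoid_stable(in_x):
    """Sigmoid that avoids np.exp overflow for large |in_x| (assumed helper)."""
    x = np.atleast_1d(np.asarray(in_x, dtype=float))
    out = np.empty_like(x)
    pos = x >= 0
    # for non-negative inputs, 1 / (1 + e^{-x}) is already safe
    out[pos] = 1 / (1 + np.exp(-x[pos]))
    # for negative inputs, rewrite as e^{x} / (1 + e^{x}) so the
    # exponent is never large and positive
    ex = np.exp(x[~pos])
    out[~pos] = ex / (1 + ex)
    return out.item() if out.size == 1 else out
```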
The complete code:

```python
import os

import numpy as np
import matplotlib.pyplot as plt

from path_settings import machine_learning_PATH

data_set_path = os.path.join(machine_learning_PATH, '第五章/data-set')
testSet_path = os.path.join(data_set_path, 'testSet.txt')
horseColicTraining_path = os.path.join(data_set_path, 'horseColicTraining.txt')
horseColicTest_path = os.path.join(data_set_path, 'horseColicTest.txt')


def load_data_set():
    """Load the data set"""
    data_mat = []
    label_mat = []
    # read the .txt file line by line, building feature and label lists
    fr = open(testSet_path)
    for line in fr.readlines():
        line_arr = line.strip().split()
        data_mat.append([1, float(line_arr[0]), float(line_arr[1])])
        label_mat.append(int(line_arr[2]))
    return data_mat, label_mat


def sigmoid(in_x):
    """The sigmoid function"""
    return 1 / (1 + np.exp(-in_x))


def grad_ascent(data_mat_in, class_labels):
    """Batch gradient ascent"""
    # build the feature matrix
    data_matrix = np.mat(data_mat_in)
    # build the label matrix and transpose it into a column vector
    label_mat = np.mat(class_labels).transpose()
    # rows and columns of data_matrix
    m, n = np.shape(data_matrix)
    # step size of each update
    alpha = 0.001
    # maximum number of iterations
    max_cycles = 500
    # initialize the coefficients as an n x 1 matrix of ones
    weights = np.ones((n, 1))
    # iterate the gradient-ascent update
    for k in range(max_cycles):
        # difference between the true class and the predicted class
        h = sigmoid(data_matrix * weights)
        error = (label_mat - h)
        # adjust the regression coefficients
        weights = weights + alpha * data_matrix.transpose() * error
    return weights


def test_grad_ascent():
    data_mat, label_mat = load_data_set()
    weights = grad_ascent(data_mat, label_mat)
    print(weights)
    """
    [[ 4.12414349]
     [ 0.48007329]
     [-0.6168482 ]]
    """


def plot_best_fit(wei):
    """Plot the data set and the fitted decision boundary"""
    # getA() == np.asarray(self)
    # check the class name so both the matrix returned by gradient ascent
    # and the ndarray returned by stochastic gradient ascent are accepted
    if wei.__class__.__name__ == 'matrix':
        weights = wei.getA()
    else:
        weights = wei
    data_mat, label_mat = load_data_set()
    # convert the feature list into an array
    data_arr = np.array(data_mat)
    n = np.shape(data_arr)[0]
    # split the samples by class
    xcord1 = []
    ycord1 = []
    xcord2 = []
    ycord2 = []
    for i in range(n):
        if int(label_mat[i]) == 1:
            xcord1.append(data_arr[i, 1])
            ycord1.append(data_arr[i, 2])
        else:
            xcord2.append(data_arr[i, 1])
            ycord2.append(data_arr[i, 2])
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.scatter(xcord1, ycord1, s=30, c='red', marker='s')
    ax.scatter(xcord2, ycord2, s=30, c='green')
    # 0.1 is the step of the grid
    x = np.arange(-3, 3, 0.1)
    # set the sigmoid input to 0 (the decision boundary); with x and y
    # standing in for x1 and x2 above, solve for y
    y = (-weights[0] - weights[1] * x) / weights[2]
    ax.plot(x, y)
    plt.xlabel('X1')
    plt.ylabel('X2')
    plt.show()


def test_plot_best_fit():
    data_mat, label_mat = load_data_set()
    weights = grad_ascent(data_mat, label_mat)
    plot_best_fit(weights)


def stoc_grad_ascent0(data_matrix, class_labels):
    """Stochastic gradient ascent"""
    m, n = np.shape(data_matrix)
    alpha = 0.01
    weights = np.ones(n)
    for i in range(m):
        # sum reduces the elementwise product to a single scalar,
        # so sigmoid is evaluated only once per sample
        h = sigmoid(sum(data_matrix[i] * weights))
        error = class_labels[i] - h
        weights = weights + alpha * error * data_matrix[i]
    return weights


def test_stoc_grad_ascent0():
    data_arr, label_mat = load_data_set()
    weights = stoc_grad_ascent0(np.array(data_arr), label_mat)
    plot_best_fit(weights)


def stoc_grad_ascent1(data_matrix, class_labels, num_iter=150):
    """Improved stochastic gradient ascent; 150 passes by default"""
    m, n = np.shape(data_matrix)
    weights = np.ones(n)
    for j in range(num_iter):
        data_index = list(range(m))
        for i in range(m):
            # shrink alpha at every update, but keep it at least 0.01 so new
            # data still has an effect; this damps coefficient oscillation
            alpha = 4 / (1 + j + i) + 0.01
            # pick a random sample not yet used in this pass
            rand_pos = int(np.random.uniform(0, len(data_index)))
            rand_index = data_index[rand_pos]
            h = sigmoid(sum(data_matrix[rand_index] * weights))
            error = class_labels[rand_index] - h
            weights = weights + alpha * error * data_matrix[rand_index]
            # drop the used index so each sample is visited once per pass
            del data_index[rand_pos]
    return weights


def test_stoc_grad_ascent1():
    data_arr, label_mat = load_data_set()
    weights = stoc_grad_ascent1(np.array(data_arr), label_mat)
    plot_best_fit(weights)


def classify_vector(in_x, weights):
    prob = sigmoid(sum(in_x * weights))
    if prob > 0.5:
        return 1
    else:
        return 0


def colic_test():
    """Predict whether a horse with colic will die"""
    fr_train = open(horseColicTraining_path)
    fr_test = open(horseColicTest_path)
    training_set = []
    training_labels = []
    for line in fr_train.readlines():
        # split out all fields and collect the features in line_arr
        curr_line = line.strip().split('\t')  # type:list
        line_arr = []
        for i in range(21):
            line_arr.append(float(curr_line[i]))
        # keep features and labels separate
        training_set.append(line_arr)
        training_labels.append(float(curr_line[21]))
    train_weights = stoc_grad_ascent1(np.array(training_set), training_labels, 500)
    print(train_weights)
    error_count = 0
    num_test_vec = 0
    for line in fr_test.readlines():
        num_test_vec += 1
        curr_line = line.strip().split('\t')  # type:list
        line_arr = []
        for i in range(21):
            line_arr.append(float(curr_line[i]))
        # count a misclassification whenever the label predicted from the
        # weighted features via sigmoid differs from the true label
        if int(classify_vector(np.array(line_arr), train_weights)) != int(curr_line[21]):
            error_count += 1
    error_rate = (float(error_count) / num_test_vec)
    print('error rate on the test set: {}'.format(error_rate))
    # error rate on the test set: 0.373134328358209
    return error_rate


def multi_test():
    num_tests = 10
    error_sum = 0
    for k in range(num_tests):
        error_sum += colic_test()
    print('average error rate over {} runs: {}'.format(num_tests, error_sum / float(num_tests)))
    # average error rate over 10 runs: 0.3656716417910448


if __name__ == '__main__':
    # test_grad_ascent()
    # test_plot_best_fit()
    # test_stoc_grad_ascent0()
    # test_stoc_grad_ascent1()
    multi_test()
```