The cost function of a machine learning algorithm can usually be decomposed into a sum of per-example cost functions. For instance, the negative conditional log-likelihood of the training data can be written as

\[ J(\theta) = \mathbb{E}_{x, y \sim \hat{p}_{\text{data}}} L(x, y, \theta) = \frac{1}{m} \sum_{i=1}^{m} L(x^{(i)}, y^{(i)}, \theta), \]
where \(L\) is the per-example loss \(L(x, y, \theta) = -\log p(y \mid x; \theta)\).
For such additive cost functions, gradient descent needs to compute

\[ \nabla_{\theta} J(\theta) = \frac{1}{m} \sum_{i=1}^{m} \nabla_{\theta} L(x^{(i)}, y^{(i)}, \theta). \]
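To make the minibatch estimate concrete, here is a minimal NumPy sketch that approximates this gradient from a random minibatch for a squared-error linear model; the function name `minibatch_gradient` and its `batch_size` parameter are illustrative and not part of the implementation later in this post:

```python
import numpy as np

def minibatch_gradient(w, b, x, y, batch_size=32):
    """Estimate the gradient of the mean squared error on a random minibatch.

    x: (n_features, n_samples), y: (n_samples,), w: (n_features,), b: scalar.
    """
    idx = np.random.choice(x.shape[1], batch_size, replace=False)
    xb, yb = x[:, idx], y[idx]
    err = w @ xb + b - yb                  # per-example residuals
    grad_w = 2.0 * xb @ err / batch_size   # average of per-example gradients
    grad_b = 2.0 * err.mean()
    return grad_w, grad_b
```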
The Adam algorithm maintains a momentum variable \(v_t\) and, as in RMSProp, an exponentially weighted moving average \(s_t\) of the element-wise squared minibatch stochastic gradient, with every element of both initialized to \(0\) at time step \(0\). Given a hyperparameter \(0 \le \beta_1 < 1\) (the algorithm's authors suggest \(0.9\)), the momentum variable \(v_t\) at time step \(t\) is the exponentially weighted moving average of the minibatch stochastic gradient \(g_t\):

\[ v_t \leftarrow \beta_1 v_{t-1} + (1 - \beta_1)\, g_t. \]
As in RMSProp, given a hyperparameter \(0 \le \beta_2 < 1\) (the authors suggest \(0.999\)), the element-wise square of the minibatch stochastic gradient, \(g_t \odot g_t\), is smoothed with an exponentially weighted moving average to obtain \(s_t\):

\[ s_t \leftarrow \beta_2 s_{t-1} + (1 - \beta_2)\, g_t \odot g_t. \]
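A minimal NumPy sketch of these two moving-average updates for a single time step (the gradient values are made up for illustration):

```python
import numpy as np

beta1, beta2 = 0.9, 0.999
v = np.zeros(3)                        # first-moment estimate, v_0 = 0
s = np.zeros(3)                        # second-moment estimate, s_0 = 0

g = np.array([0.2, -0.5, 1.0])         # a minibatch stochastic gradient g_1
v = beta1 * v + (1 - beta1) * g        # v_t <- beta1*v_{t-1} + (1-beta1)*g_t
s = beta2 * s + (1 - beta2) * g * g    # s_t <- beta2*s_{t-1} + (1-beta2)*g_t(.)g_t
print(v)                               # [ 0.02 -0.05  0.1 ], i.e. only 0.1 * g_1
```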
Because every element of \(v_0\) and \(s_0\) is initialized to \(0\), at time step \(t\) we have \(v_t = (1 - \beta_1) \sum^t_{i=1} \beta_1^{t-i} g_i\). Summing the weights on the past minibatch stochastic gradients gives \((1 - \beta_1) \sum^t_{i=1} \beta_1^{t-i} = 1 - \beta_1^t\). Note that when \(t\) is small, this sum of weights is small; for example, when \(\beta_1 = 0.9\), \(v_1 = 0.1 g_1\). To remove this effect, for any time step \(t\) we can divide \(v_t\) by \(1 - \beta_1^t\), so that the weights on the past minibatch stochastic gradients sum to \(1\). This is called bias correction. Adam applies bias correction to both \(v_t\) and \(s_t\):

\[ \hat{v}_t \leftarrow \frac{v_t}{1 - \beta_1^t}, \qquad \hat{s}_t \leftarrow \frac{s_t}{1 - \beta_2^t}. \]
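Continuing the numbers from the previous sketch, dividing by \(1 - \beta_1^t\) at \(t = 1\) restores the full scale of \(g_1\):

```python
import numpy as np

beta1, beta2, t = 0.9, 0.999, 1
g = np.array([0.2, -0.5, 1.0])
v = (1 - beta1) * g                # v_1 = 0.1 * g_1      (since v_0 = 0)
s = (1 - beta2) * g * g            # s_1 = 0.001 * g_1^2  (since s_0 = 0)

v_hat = v / (1 - beta1 ** t)       # bias-corrected: v_hat_1 = g_1
s_hat = s / (1 - beta2 ** t)       # bias-corrected: s_hat_1 = g_1^2
print(v_hat)                       # [ 0.2 -0.5  1. ], same scale as g_1
```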
Next, Adam uses the bias-corrected variables \(\hat{v}_t\) and \(\hat{s}_t\) to rescale, element by element, the learning rate of each model parameter:

\[ g_t' \leftarrow \frac{\epsilon\, \hat{v}_t}{\sqrt{\hat{s}_t} + \delta}, \]
where \(\epsilon\) is the learning rate and \(\delta\) is a constant added for numerical stability, such as \(10^{-8}\). As with AdaGrad, RMSProp, and AdaDelta, each element of the objective function's independent variable gets its own learning rate. Finally, \(g_t'\) is used to update the independent variable:

\[ x_t \leftarrow x_{t-1} - g_t'. \]
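Putting the steps together, one complete Adam update might look like the sketch below, where `lr` plays the role of \(\epsilon\) and `delta` the role of \(\delta\); the function name and signature are illustrative and not taken from the implementation that follows:

```python
import numpy as np

def adam_step(theta, grad, v, s, t, lr=0.001, beta1=0.9, beta2=0.999, delta=1e-8):
    """One Adam update; returns the new parameters and the updated state (v, s)."""
    v = beta1 * v + (1 - beta1) * grad               # first-moment moving average
    s = beta2 * s + (1 - beta2) * grad * grad        # second-moment moving average
    v_hat = v / (1 - beta1 ** t)                     # bias-corrected v_t
    s_hat = s / (1 - beta2 ** t)                     # bias-corrected s_t
    g_prime = lr * v_hat / (np.sqrt(s_hat) + delta)  # element-wise rescaled step g'_t
    return theta - g_prime, v, s                     # x_t = x_{t-1} - g'_t

# toy usage: three parameters, a few synthetic gradient steps
theta, v, s = np.zeros(3), np.zeros(3), np.zeros(3)
for t in range(1, 6):
    grad = np.array([0.2, -0.5, 1.0])
    theta, v, s = adam_step(theta, grad, v, s, t)
```

Because each element is divided by \(\sqrt{\hat{s}_t}\), every parameter effectively has its own step size, which is the trait Adam shares with AdaGrad, RMSProp, and AdaDelta.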
We still use the Logistic and Linear models and will not repeat them here; see the SGD post.
The example mainly consists of three files, optimizers.py, nn.py, and test.py, together with the shared base.py shown first.
""" file name: base.py """ import numpy as np class Optimizer(object): def __init__(self, lr=0.01, delta=1e-6): self.lr = lr self.delta = delta class OptimizerWeights(object): def __init__(self, lr=0.01, delta=1e-6): self.lr = lr self.delta = delta self.hyp_t = 1 def __call__(self, *args, **kwargs): return None def init_parameters(self, inputs_shape): pass # Module 基類 class Module(object): def __init__(self, weight=None, bias=None): self.weight = weight self.bias = bias self.train = True self.y_pred = None self.y_true = None self.loss_diff = np.zeros((1, 1)) self._loss_pro = 0. self._loss_now = 0. self._weight_diff = 1. self._bias_diff = np.zeros((1, 1)) self.optimizer_weights_update = None def __call__(self, *args, **kwargs): inputs_shape = [] for arg in args: inputs_shape.append(arg.shape) for _, arg in kwargs: inputs_shape.append(arg.shape) self.args = args self.kwargs = kwargs if len(inputs_shape) == 0: self.build(inputs_shape) elif len(inputs_shape) == 1: self.build(inputs_shape[0]) else: self.build(inputs_shape) if self.optimizer_weights_update: self.optimizer_weights_update.init_parameters(inputs_shape[0]) if hasattr(self, 'forward'): forward = getattr(self, 'forward') self.y_pred = forward(*args, **kwargs) self.diff_parameters(*args, **kwargs) return self.y_pred def loss(self, *args, **kwargs): return 0. def build(self, inputs_shape): if len(inputs_shape) == 0: pass else: if self.weight is None: self.weight = np.zeros(*inputs_shape[:-1])[:, np.newaxis] if self.bias is None: self.bias = np.zeros((1, 1)) def diff_parameters(self, *args, **kwargs) -> None: pass def backprop(self): wb_diff = [np.matmul(self._weight_diff, self.loss_diff), self._bias_diff * self.loss_diff] wb_diff = self.optimizer_weights_update(wb_diff) self.weight -= wb_diff[0] self.bias -= wb_diff[1] return True def set_optimizer_weights_update(self, weights_update): self.optimizer_weights_update = weights_update def set_hyp_t(self, hyp_t): if self.optimizer_weights_update: self.optimizer_weights_update.hyp_t = hyp_t
The contents of optimizers.py are as follows:
""" file name: optimizers.py """ import .base import OptimizerWeights, Optimizer # SGD ... class AdamWeights(OptimizerWeights): def __init__(self, lr=0.01, delta=1e-6, beta1=0.9, beta2=0.999): super(AdamWeights, self).__init__(lr=lr, delta=delta) self.beta1 = beta1 self.beta2 = beta2 self.vector = None self.steepest = None def __call__(self, wb_diff, hyp_t=None): if not hyp_t: hyp_t = self.hyp_t v_bias_corr, s_bias_corr = self._update_parameters(wb_diff, hyp_t) g_adam_diff = [self.lr * v_corr / (np.sqrt(s_corr) + self.delta) for v_corr, s_corr in zip(v_bias_corr, s_bias_corr)] return g_adam_diff def init_parameters(self, inputs_shape): self.vector = [np.zeros((inputs_shape[0], 1)), np.zeros((1, 1))] self.steepest = [np.zeros((inputs_shape[0], 1)), np.zeros((1, 1))] pass def _update_parameters(self, wb_diff, hyp_t): self.vector = [self.beta1 * v + (1 - self.beta1) * wb_diff[i] for i, v in enumerate(self.vector)] self.steepest = [self.beta1 * s + (1 - self.beta1) * wb_diff[i] ** 2 for i, s in enumerate(self.steepest)] v_bias_corr = [v / (1 - self.beta1 ** hyp_t) for v in self.vector] s_bias_corr = [s / (1 - self.beta2 ** hyp_t) for s in self.steepest] return v_bias_corr, s_bias_corr class Adam(Optimizer): def __init__(self, lr=0.01, delta=1e-6, beta1=0.9, beta2=0.999): super(Adam, self).__init__(lr=lr, delta=delta) self.beta1 = beta1 self.beta2 = beta2 def __call__(self, x, y, model, batch_size=1, epochs=10, threshold=0.01): model.set_optimizer_weights_update(AdamWeights(self.lr, self.delta, self.beta1, self.beta2)) num_record = x.shape[-1] if num_record < batch_size: batch_size = num_record bool_break = False for i in range(epochs): loss_mean = 0. model.set_hyp_t(i + 1) for j in range(num_record): y_pred = model(x[..., j:j + 1]) y_true = y[..., j] sgd_loss = model.loss(y_pred, y_true) if (j + 1) % batch_size == 0: if np.abs(loss_mean) < threshold or loss_mean == np.NAN: bool_break = True break loss_mean = 0. loss_mean = (loss_mean * j + sgd_loss) / (j + 1) model.backprop() if bool_break: break return model
""" file name: nn.py """ from .base import Module # Logistic class Logistic(Module): def __init__(self, w=None, b=None): super(Logistic, self).__init__(w, b) def forward(self, x): return 1. / (1. + np.exp(np.matmul(self.weight.T, x) + self.bias)) def loss(self, y_pred, y_true, delta=1e-16): self._loss_pro = self._loss_now y_pred = np.minimum(np.maximum(y_pred, delta), 1. - delta) self._loss_now = -(y_true * np.log(y_pred) + (1. - y_true) * np.log(1. - y_pred)) self.loss_diff = -(y_true / y_pred - (1. - y_true) / (1. - y_pred)) return self._loss_now def diff_parameters(self, x): g_param_diff = -2. * self.y_pred * (1. - self.y_pred) self._weight_diff = g_param_diff * x self._bias_diff = g_param_diff pass # Linear class Linear(Module): def __init__(self, w=None, b=None): super(Linear, self).__init__(w, b) def forward(self, x): return np.matmul(self.weight.T, x) + self.bias def loss(self, y_pred, y_true): self._loss_pro = self._loss_now self._loss_now = np.sum((y_pred - y_true) ** 2) self.loss_diff = 2. * (y_pred - y_true) return self._loss_now def diff_parameters(self, x): self._weight_diff = x self._bias_diff = 1. pass
The test file:
""" file name: test.py """ import numpy as np import .nn from .optimizers import SGD, Adam def Sigmod(x, w, b): return 1. / (1. + np.exp(np.matmul(w.T, x) + b)) def Linear(x, w, b): return np.matmul(w.T, x) + b def test_Optimizer_Logistic(x, w, b, Optimizer): y_true = Sigmod(x, w, b) rand_y = np.random.randn(len(y_true)) rand_y = 0.01 * rand_y / np.max(np.abs(rand_y)) y_true = Sigmod(x, w, b) + rand_y > 0.5 model = nn.Logistic() sgd_model = Optimizer(x, y_true, model, batch_size=256, epochs=10000, threshold=.5) y_pred = np.float32(Sigmod(x, sgd_model.weight, sgd_model.bias) > 0.5) print('error_rate: ', np.sum(np.abs(y_pred - y_true)) / len(y_true)) def test_Optimizer_Linear(x, w, b, Optimizer): y_true = Linear(x, w, b) rand_y = np.random.randn(len(y_true)) y_true += 0.01 * rand_y / np.max(np.abs(rand_y)) model = nn.Linear() sgd_model = Optimizer(x, y_true, model, batch_size=256, epochs=10000, threshold=.005) y_pred = Linear(x, sgd_model.weight, sgd_model.bias) print('MSE: ', np.sum((y_pred - y_true) ** 2) / len(y_true)) def create_optimizer(optimizer='sgd', lr=0.01, delta=1e-6, **kwargs): if optimizer == 'adam': opt = Adam(lr=lr, delta=delta) else: opt = SGD(lr=lr, delta=delta) return opt def test_Optimizer(model='logistic', optimizer='sgd'): """ Args: model: 'logistic', 'linear' optimizer: 'sgd', 'adam' """ w = np.array([1.8, -2.5, 3.1, -2.3, .6, 2.1, -1.1]) b = 0.1 # Data x = np.random.randn(len(w), 1024) if model == 'logistic': opt_logistic = create_optimizer('adam', lr=0.0001) test_Optimizer_Logistic(x, w, b, opt_logistic) elif model == 'linear': opt_linear = create_optimizer('adam', lr=0.001) test_Optimizer_Linear(x, w, b, opt_linear) if __name__ == '__main__': # fun: logistic linear # optimizer: sdg adam test_Optimizer('logistic')