Optimization Algorithms: SGD

SGD: An Introduction to the Stochastic Gradient Descent Algorithm

The cost function of a machine learning algorithm can usually be decomposed into a sum of per-sample cost functions. The negative conditional log-likelihood of the training data can be written as

\[J(\theta)=\mathbb{E}_{x,y \sim \hat p_{data}}L(x, y, \theta)=\frac {1} {m} \sum_{i=1}^m L(x^{(i)},y^{(i)},\theta) \]

where \(L\) is the per-sample loss, \(L(x, y, \theta) = -\log p(y \mid x;\theta)\).

For such additive cost functions, gradient descent needs to compute

\[\nabla_{\theta} J(\theta) = \frac {1} {m} \sum_{i=1}^m \nabla_{\theta} L(x^{(i)},y^{(i)},\theta) \]

Instead, we can sample a minibatch \(\mathbb{B}=\{x^{(1)},...,x^{(m')}\}\). The gradient estimate can then be written as

\[\hat{g}=\frac {1} {m'} \nabla_{\theta} \sum_{i=1}^{m'} L(x^{(i)}, y^{(i)}, \theta) \]

using the samples drawn from the minibatch \(\mathbb{B}\). The stochastic gradient descent algorithm then performs the following update:

\[\theta \leftarrow \theta - \epsilon \cdot \hat{g} \]

where \(\epsilon\) is the learning rate.
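
As a minimal, self-contained sketch of this update loop (the names sgd and grad_fn are illustrative and not part of the implementation further below; grad_fn is assumed to return the minibatch-averaged gradient, and samples are stored along the last axis, matching the code below):

import numpy as np

def sgd(theta, grad_fn, x, y, lr=0.01, batch_size=32, epochs=10):
    # theta: parameter vector; x, y: data with samples along the last axis
    m = x.shape[-1]
    for _ in range(epochs):
        idx = np.random.permutation(m)                 # reshuffle each epoch
        for start in range(0, m, batch_size):
            batch = idx[start:start + batch_size]
            g_hat = grad_fn(theta, x[..., batch], y[..., batch])
            theta = theta - lr * g_hat                 # theta <- theta - epsilon * g_hat
    return theta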

Practical application:

Example 1: Logistic regression

Logistic regression: \(Logistic(x)=\frac {1} {1+e^{w \cdot x+b}}\) (note the exponent is \(w \cdot x + b\) rather than \(-(w \cdot x + b)\); the code below follows the same convention).

Loss function (cross-entropy): \(BCELoss(\hat y,y)=-\sum_i {y_i \cdot \log(\hat y_i)}\). For a single sample with a binary label this becomes

\(Loss(\hat y, y)=-(y \cdot \log(\hat y) + (1-y) \cdot \log(1-\hat y))\).

Differentiating the loss with respect to \(\hat y\), and \(\hat y\) with respect to \(w\) and \(b\) (using \(\frac {d\hat y} {dz} = -\hat y(1-\hat y)\) for \(\hat y = \frac {1} {1+e^{z}}\), \(z = w \cdot x + b\)):

\[\begin{split} & \frac {dL} {d \hat y} = \frac {1 - y} {1 - \hat y} - \frac {y} {\hat y} \\ & \frac {d\hat y} {dw} = -x \cdot {\hat y} \cdot (1-\hat y) \\ & \frac {d\hat y} {db} = - {\hat y} \cdot (1-\hat y) \end{split} \]

The partial derivatives of the loss \(Loss(\hat y, y)\) with respect to \(w\) and \(b\) then follow from the chain rule:

\[\begin{split} & \frac {dL} {dw} = \frac {dL} {d \hat y} \cdot \frac {d\hat y} {dw} = (y - \hat y)\cdot x \\ & \frac {dL} {db} = \frac {dL} {d \hat y} \cdot \frac {d\hat y} {db} = y - \hat y \end{split} \]

The update \(\theta \leftarrow \theta - \epsilon \cdot \hat{g}\) applied to \(w\) and \(b\) is then:

\[\begin{split} & w \leftarrow w - \epsilon \cdot \frac {dL} {dw} = w - \epsilon \cdot (y - \hat y)\cdot x \\ & b \leftarrow b - \epsilon \cdot \frac {dL} {db} = b - \epsilon \cdot (y - \hat y) \end{split} \]
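
As a worked, single-sample illustration of these two updates (logistic_sgd_step is a hypothetical name used only for this sketch; the full implementation below spreads the same computation across Module, Logistic, and SGD):

import numpy as np

def logistic_sgd_step(w, b, x, y, lr=0.01):
    # Forward pass: y_hat = 1 / (1 + exp(w.x + b))
    y_hat = 1. / (1. + np.exp(np.dot(w, x) + b))
    # Gradients of the binary cross-entropy loss derived above
    dw = (y - y_hat) * x
    db = y - y_hat
    # SGD update for this sample
    return w - lr * dw, b - lr * db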

The implementation below uses only numpy:

The example consists of four files: base.py, optimizers.py, nn.py, and test.py.

"""
file name: base.py
"""
import numpy as np


class Optimizer(object):
    def __init__(self, lr=0.01, delta=1e-6):
        self.lr = lr
        self.delta = delta


class OptimizerWeights(object):
    def __init__(self, lr=0.01, delta=1e-6):
        self.lr = lr
        self.delta = delta
        self.hyp_t = 1

    def __call__(self, *args, **kwargs):
        return None

    def init_parameters(self, inputs_shape):
        pass


# Module base class
class Module(object):
    def __init__(self, weight=None, bias=None):
        self.weight = weight
        self.bias = bias
        self.train = True
        self.y_pred = None
        self.y_true = None
        self.loss_diff = np.zeros((1, 1))

        self._loss_pro = 0.
        self._loss_now = 0.
        self._weight_diff = 1.
        self._bias_diff = np.zeros((1, 1))
        self.optimizer_weights_update = None

    def __call__(self, *args, **kwargs):
        # Collect the shapes of all positional and keyword inputs.
        inputs_shape = []
        for arg in args:
            inputs_shape.append(arg.shape)
        for arg in kwargs.values():
            inputs_shape.append(arg.shape)

        self.args = args
        self.kwargs = kwargs

        if len(inputs_shape) == 0:
            self.build(inputs_shape)
        elif len(inputs_shape) == 1:
            self.build(inputs_shape[0])
        else:
            self.build(inputs_shape)

        if self.optimizer_weights_update:
            self.optimizer_weights_update.init_parameters(inputs_shape[0])

        if hasattr(self, 'forward'):
            forward = getattr(self, 'forward')
            self.y_pred = forward(*args, **kwargs)
            self.diff_parameters(*args, **kwargs)
        return self.y_pred

    def loss(self, *args, **kwargs):
        return 0.

    def build(self, inputs_shape):
        # Lazily create weight and bias from the input shape on the first call.
        if len(inputs_shape) == 0:
            pass
        else:
            if self.weight is None:
                self.weight = np.zeros(*inputs_shape[:-1])[:, np.newaxis]
            if self.bias is None:
                self.bias = np.zeros((1, 1))

    def diff_parameters(self, *args, **kwargs) -> None:
        pass

    def backprop(self):
        # Chain rule: combine dL/d(y_pred) (loss_diff) with the cached parameter
        # derivatives, let the optimizer scale the step, then apply it.
        wb_diff = [np.matmul(self._weight_diff, self.loss_diff), self._bias_diff * self.loss_diff]
        wb_diff = self.optimizer_weights_update(wb_diff)
        self.weight -= wb_diff[0]
        self.bias -= wb_diff[1]
        return True

    def set_optimizer_weights_update(self, weights_update):
        self.optimizer_weights_update = weights_update

    def set_hyp_t(self, hyp_t):
        if self.optimizer_weights_update:
            self.optimizer_weights_update.hyp_t = hyp_t
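
Before moving on to the optimizer, here is a rough sketch of how a Module subclass is driven during training (this mirrors what SGD.__call__ in optimizers.py does for each sample):

# Rough usage sketch of the Module protocol (one training step on one sample):
#   model = Logistic()                                    # subclass defined in nn.py
#   model.set_optimizer_weights_update(SGDWeights(0.01))  # from optimizers.py
#   y_pred = model(x_sample)            # __call__: build -> forward -> diff_parameters
#   loss = model.loss(y_pred, y_true)   # also caches dL/d(y_pred) in self.loss_diff
#   model.backprop()                    # combines loss_diff with the cached parameter
#                                       # derivatives and applies the scaled update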

The contents of optimizers.py are as follows:

"""
file name: optimizers.py
"""
import numpy as np

from base import OptimizerWeights, Optimizer


class SGDWeights(OptimizerWeights):
    def __init__(self, lr=0.01, delta=1e-6):
        super(SGDWeights, self).__init__(lr=lr, delta=delta)

    def __call__(self, wb_diff):
        return [wb * self.lr for wb in wb_diff]


class SGD(Optimizer):
    def __init__(self, lr=0.01, delta=1e-6):
        super(SGD, self).__init__(lr=lr, delta=delta)

    def __call__(self, x, y, model, batch_size=1, epochs=10, threshold=0.001):
        model.set_optimizer_weights_update(SGDWeights(self.lr, self.delta))
        inputs_shape = x.shape
        # Clamp the batch size to the number of available samples.
        if inputs_shape[-1] < batch_size:
            batch_size = inputs_shape[-1]
        bool_break = False
        num_record = x.shape[-1]
        for i in range(epochs):
            loss_mean = 0.
            for j in range(num_record):
                y_pred = model(x[..., j:j + 1])
                y_true = y[..., j]
                sgd_loss = model.loss(y_pred, y_true)
                # Running mean of the loss over the current batch.
                loss_mean = (loss_mean * (j % batch_size) + sgd_loss) / (j % batch_size + 1)
                model.backprop()
                # At each batch boundary, stop early on convergence or divergence.
                if (j + 1) % batch_size == 0:
                    if np.abs(loss_mean) < threshold or np.isnan(loss_mean):
                        bool_break = True
                        break
                    loss_mean = 0.

            if bool_break:
                break
        return model


# Adam optimizer ...

The model file nn.py contains two models: Logistic and Linear.

"""
file name: nn.py
"""
import numpy as np

from base import Module


# Logistic
class Logistic(Module):
    def __init__(self, w=None, b=None):
        super(Logistic, self).__init__(w, b)

    def forward(self, x):
        return 1. / (1. + np.exp(np.matmul(self.weight.T, x) + self.bias))

    def loss(self, y_pred, y_true, delta=1e-16):
        self._loss_pro = self._loss_now
        y_pred = np.minimum(np.maximum(y_pred, delta), 1. - delta)
        self._loss_now = -(y_true * np.log(y_pred) +
                           (1. - y_true) * np.log(1. - y_pred))
        self.loss_diff = -(y_true / y_pred - (1. - y_true) / (1. - y_pred))
        return self._loss_now

    def diff_parameters(self, x):
        # d(y_hat)/dz = -y_hat * (1 - y_hat) for y_hat = 1 / (1 + e^z), z = w.x + b
        g_param_diff = -self.y_pred * (1. - self.y_pred)
        self._weight_diff = g_param_diff * x
        self._bias_diff = g_param_diff


# Linear
class Linear(Module):
    def __init__(self, w=None, b=None):
        super(Linear, self).__init__(w, b)

    def forward(self, x):
        return np.matmul(self.weight.T, x) + self.bias

    def loss(self, y_pred, y_true):
        self._loss_pro = self._loss_now
        self._loss_now = np.sum((y_pred - y_true) ** 2)
        self.loss_diff = 2. * (y_pred - y_true)
        return self._loss_now

    def diff_parameters(self, x):
        # For y_hat = w.x + b: d(y_hat)/dw = x, d(y_hat)/db = 1
        self._weight_diff = x
        self._bias_diff = 1.
"""
file name: test.py
"""
import numpy as np
import nn
from optimizers import SGD, Adam


def Sigmoid(x, w, b):
    # Same convention as Logistic.forward: 1 / (1 + exp(w.x + b))
    return 1. / (1. + np.exp(np.matmul(w.T, x) + b))


def Linear(x, w, b):
    return np.matmul(w.T, x) + b


def test_Optimizer_Logistic(x, w, b, Optimizer):
    y_true = Sigmoid(x, w, b)
    rand_y = np.random.randn(len(y_true))
    rand_y = 0.01 * rand_y / np.max(np.abs(rand_y))
    # Binarize the noisy sigmoid outputs to obtain classification labels.
    y_true = Sigmoid(x, w, b) + rand_y > 0.5

    model = nn.Logistic()
    sgd_model = Optimizer(x, y_true, model, batch_size=256,
                          epochs=10000, threshold=.5)
    y_pred = np.float32(Sigmoid(x, sgd_model.weight, sgd_model.bias) > 0.5)

    print('error_rate: ', np.sum(np.abs(y_pred - y_true)) / len(y_true))


def test_Optimizer_Linear(x, w, b, Optimizer):
    y_true = Linear(x, w, b)
    rand_y = np.random.randn(len(y_true))
    y_true += 0.01 * rand_y / np.max(np.abs(rand_y))

    model = nn.Linear()
    sgd_model = Optimizer(x, y_true, model, batch_size=256,
                          epochs=10000, threshold=.005)
    y_pred = Linear(x, sgd_model.weight, sgd_model.bias)

    print('MSE: ', np.sum((y_pred - y_true) ** 2) / len(y_true))


def create_optimizer(optimizer='sgd', lr=0.01, delta=1e-6, **kwargs):
    if optimizer == 'adam':
        opt = Adam(lr=lr, delta=delta)
    else:
        opt = SGD(lr=lr, delta=delta)
    return opt


def test_Optimizer(model='logistic', optimizer='sgd'):
    """
    Args:
        model: 'logistic', 'linear'
        optimizer: 'sgd', 'adam'
    """
    w = np.array([1.8, -2.5, 3.1, -2.3, .6, 2.1, -1.1])
    b = 0.1
    # Data
    x = np.random.randn(len(w), 1024)

    if model == 'logistic':
        opt_logistic = create_optimizer(optimizer, lr=0.0001)
        test_Optimizer_Logistic(x, w, b, opt_logistic)
    elif model == 'linear':
        opt_linear = create_optimizer(optimizer, lr=0.1)
        test_Optimizer_Linear(x, w, b, opt_linear)


if __name__ == '__main__':
    # model: 'logistic' or 'linear'; optimizer: 'sgd' or 'adam'
    test_Optimizer('logistic')