深度學習 (二十四):卷積神經網絡之GoogleNet

引入

  GoogleNet吸取了NIN網絡串聯網絡的思想,並在此基礎上作了改進。
  本文介紹其第一個版本$^{[1]}$。


【1】李沐、Aston Zhang等老師的這本《動手學深度學習》一書。


1 Inception塊

  GoogleNet中的基礎卷積塊叫作Inception塊,得名於電影《盜夢空間》。
  網絡結構如下:

import time
import torch
import torch.nn.functional as f
from torch import nn, optim
from util.SimpleTool import load_data_fashion_mnist, train, FlattenLayer, GlobalAvgPool2d


class Inception(nn.Module):
    """Inception block: four parallel branches joined along the channel axis.

    Every branch keeps the spatial size of its input (the 3x3 and 5x5
    convolutions and the 3x3 max pooling are padded and use stride 1), so the
    output has ``c1 + c2[1] + c3[1] + c4`` channels.

    :param in_c: Number of input channels.
    :param c1: Output channels of branch 1 (a single 1x1 convolution).
    :param c2: Indexable pair for branch 2: ``c2[0]`` channels after the 1x1
        convolution, ``c2[1]`` channels after the 3x3 convolution.
    :param c3: Indexable pair for branch 3: ``c3[0]`` channels after the 1x1
        convolution, ``c3[1]`` channels after the 5x5 convolution.
    :param c4: Output channels of branch 4 (3x3 max pooling, then a 1x1
        convolution).
    """

    def __init__(self, in_c, c1, c2, c3, c4):
        super().__init__()
        # Branch 1: 1x1 convolution.
        self.p1_1 = nn.Conv2d(in_c, c1, kernel_size=1)
        # Branch 2: 1x1 convolution followed by a padded 3x3 convolution.
        self.p2_1 = nn.Conv2d(in_c, c2[0], kernel_size=1)
        self.p2_2 = nn.Conv2d(c2[0], c2[1], kernel_size=3, padding=1)
        # Branch 3: 1x1 convolution followed by a padded 5x5 convolution.
        self.p3_1 = nn.Conv2d(in_c, c3[0], kernel_size=1)
        self.p3_2 = nn.Conv2d(c3[0], c3[1], kernel_size=5, padding=2)
        # Branch 4: padded 3x3 max pooling followed by a 1x1 convolution.
        self.p4_1 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        self.p4_2 = nn.Conv2d(in_c, c4, kernel_size=1)

    def forward(self, x):
        """Run the four branches on ``x`` and concatenate them on dim 1."""
        branch_1 = f.relu(self.p1_1(x))

        reduced_for_3x3 = f.relu(self.p2_1(x))
        branch_2 = f.relu(self.p2_2(reduced_for_3x3))

        reduced_for_5x5 = f.relu(self.p3_1(x))
        branch_3 = f.relu(self.p3_2(reduced_for_5x5))

        pooled = self.p4_1(x)
        branch_4 = f.relu(self.p4_2(pooled))

        branches = (branch_1, branch_2, branch_3, branch_4)
        return torch.cat(branches, dim=1)

2 GoogleNet模型

  GoogleNet與VGG模型一樣,在主體卷積部分中使用**$5$個模塊**,每個模塊之間使用步幅爲$2$的$3 \times 3$最大池化層來減少輸出高寬:

def get_net(in_channels=1, num_classes=10):
    """Build the GoogLeNet-style network used in this article.

    The model is five sequential stages followed by a flatten layer and a
    linear classifier. Stages 1-4 each end with a stride-2 3x3 max pooling;
    stage 5 ends with global average pooling.

    :param in_channels: Channels of the input images (default 1, i.e.
        single-channel images as in the original code).
    :param num_classes: Size of the final linear layer's output (default 10).
    :return: The assembled ``nn.Sequential`` model.
    """
    b1 = nn.Sequential(nn.Conv2d(in_channels, 64, kernel_size=7, stride=2, padding=3),
                       nn.ReLU(),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    # Each convolution is followed by a ReLU. Without them the 1x1 and 3x3
    # convolutions would be stacked with no non-linearity in between, which
    # adds parameters but no representational power.
    # NOTE: this shifts the child indices inside b2 compared with the earlier
    # ReLU-free version, so state_dicts saved from that version do not match.
    b2 = nn.Sequential(nn.Conv2d(64, 64, kernel_size=1),
                       nn.ReLU(),
                       nn.Conv2d(64, 192, kernel_size=3, padding=1),
                       nn.ReLU(),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    # For every Inception block the input channel count equals the sum of the
    # four branch outputs of the block before it, e.g. 64 + 128 + 32 + 32 = 256.
    b3 = nn.Sequential(Inception(192, 64, (96, 128), (16, 32), 32),
                       Inception(256, 128, (128, 192), (32, 96), 64),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    b4 = nn.Sequential(Inception(480, 192, (96, 208), (16, 48), 64),
                       Inception(512, 160, (112, 224), (24, 64), 64),
                       Inception(512, 128, (128, 256), (24, 64), 64),
                       Inception(512, 112, (144, 288), (32, 64), 64),
                       Inception(528, 256, (160, 320), (32, 128), 128),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    b5 = nn.Sequential(Inception(832, 256, (160, 320), (32, 128), 128),
                       Inception(832, 384, (192, 384), (48, 128), 128),
                       GlobalAvgPool2d())

    # The last Inception block emits 384 + 384 + 128 + 128 = 1024 channels.
    return nn.Sequential(b1, b2, b3, b4, b5,
                         FlattenLayer(), nn.Linear(1024, num_classes))


def test1():
    """Print the output shape of every top-level stage of the network.

    A single random 1x96x96 input is pushed through the children of the model
    one after another.
    """
    net = get_net()
    activations = torch.rand(1, 1, 96, 96)
    for stage in net.children():
        activations = stage(activations)
        print("Output shape:", activations.shape)


if __name__ == "__main__":
    test1()

  輸出如下:

Output shape: torch.Size([1, 64, 24, 24])
Output shape: torch.Size([1, 192, 12, 12])
Output shape: torch.Size([1, 480, 6, 6])
Output shape: torch.Size([1, 832, 3, 3])
Output shape: torch.Size([1, 1024, 1, 1])
Output shape: torch.Size([1, 1024])
Output shape: torch.Size([1, 10])

3 模型訓練

def test2():
    """Train the network on Fashion-MNIST resized to 96x96.

    Uses a batch size of 128, the Adam optimizer with learning rate 0.001 and
    5 epochs.
    """
    batch_size = 128
    train_iter, test_iter = load_data_fashion_mnist(batch_size, resize=96)
    net = get_net()
    optimizer = optim.Adam(net.parameters(), lr=0.001)
    train(net, train_iter, test_iter, batch_size, optimizer, num_epochs=5)


if __name__ == "__main__":
    test2()

  輸出如下 (慢得扣。。。):

完整代碼

""" @author: Inki @contact: inki.yinji@qq.com @version: Created in 2020 1222, last modified in 2020 1222. """

import time
import torch
import torch.nn.functional as f
from torch import nn, optim
from util.SimpleTool import load_data_fashion_mnist, train, FlattenLayer, GlobalAvgPool2d


class Inception(nn.Module):
    """Inception block: four parallel branches joined along the channel axis.

    Every branch keeps the spatial size of its input (the 3x3 and 5x5
    convolutions and the 3x3 max pooling are padded and use stride 1), so the
    output has ``c1 + c2[1] + c3[1] + c4`` channels.

    :param in_c: Number of input channels.
    :param c1: Output channels of branch 1 (a single 1x1 convolution).
    :param c2: Indexable pair for branch 2: ``c2[0]`` channels after the 1x1
        convolution, ``c2[1]`` channels after the 3x3 convolution.
    :param c3: Indexable pair for branch 3: ``c3[0]`` channels after the 1x1
        convolution, ``c3[1]`` channels after the 5x5 convolution.
    :param c4: Output channels of branch 4 (3x3 max pooling, then a 1x1
        convolution).
    """

    def __init__(self, in_c, c1, c2, c3, c4):
        super().__init__()
        # Branch 1: 1x1 convolution.
        self.p1_1 = nn.Conv2d(in_c, c1, kernel_size=1)
        # Branch 2: 1x1 convolution followed by a padded 3x3 convolution.
        self.p2_1 = nn.Conv2d(in_c, c2[0], kernel_size=1)
        self.p2_2 = nn.Conv2d(c2[0], c2[1], kernel_size=3, padding=1)
        # Branch 3: 1x1 convolution followed by a padded 5x5 convolution.
        self.p3_1 = nn.Conv2d(in_c, c3[0], kernel_size=1)
        self.p3_2 = nn.Conv2d(c3[0], c3[1], kernel_size=5, padding=2)
        # Branch 4: padded 3x3 max pooling followed by a 1x1 convolution.
        self.p4_1 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        self.p4_2 = nn.Conv2d(in_c, c4, kernel_size=1)

    def forward(self, x):
        """Run the four branches on ``x`` and concatenate them on dim 1."""
        branch_1 = f.relu(self.p1_1(x))

        reduced_for_3x3 = f.relu(self.p2_1(x))
        branch_2 = f.relu(self.p2_2(reduced_for_3x3))

        reduced_for_5x5 = f.relu(self.p3_1(x))
        branch_3 = f.relu(self.p3_2(reduced_for_5x5))

        pooled = self.p4_1(x)
        branch_4 = f.relu(self.p4_2(pooled))

        branches = (branch_1, branch_2, branch_3, branch_4)
        return torch.cat(branches, dim=1)


def get_net(in_channels=1, num_classes=10):
    """Build the GoogLeNet-style network used in this article.

    The model is five sequential stages followed by a flatten layer and a
    linear classifier. Stages 1-4 each end with a stride-2 3x3 max pooling;
    stage 5 ends with global average pooling.

    :param in_channels: Channels of the input images (default 1, i.e.
        single-channel images as in the original code).
    :param num_classes: Size of the final linear layer's output (default 10).
    :return: The assembled ``nn.Sequential`` model.
    """
    b1 = nn.Sequential(nn.Conv2d(in_channels, 64, kernel_size=7, stride=2, padding=3),
                       nn.ReLU(),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    # Each convolution is followed by a ReLU. Without them the 1x1 and 3x3
    # convolutions would be stacked with no non-linearity in between, which
    # adds parameters but no representational power.
    # NOTE: this shifts the child indices inside b2 compared with the earlier
    # ReLU-free version, so state_dicts saved from that version do not match.
    b2 = nn.Sequential(nn.Conv2d(64, 64, kernel_size=1),
                       nn.ReLU(),
                       nn.Conv2d(64, 192, kernel_size=3, padding=1),
                       nn.ReLU(),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    # For every Inception block the input channel count equals the sum of the
    # four branch outputs of the block before it, e.g. 64 + 128 + 32 + 32 = 256.
    b3 = nn.Sequential(Inception(192, 64, (96, 128), (16, 32), 32),
                       Inception(256, 128, (128, 192), (32, 96), 64),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    b4 = nn.Sequential(Inception(480, 192, (96, 208), (16, 48), 64),
                       Inception(512, 160, (112, 224), (24, 64), 64),
                       Inception(512, 128, (128, 256), (24, 64), 64),
                       Inception(512, 112, (144, 288), (32, 64), 64),
                       Inception(528, 256, (160, 320), (32, 128), 128),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
    b5 = nn.Sequential(Inception(832, 256, (160, 320), (32, 128), 128),
                       Inception(832, 384, (192, 384), (48, 128), 128),
                       GlobalAvgPool2d())

    # The last Inception block emits 384 + 384 + 128 + 128 = 1024 channels.
    return nn.Sequential(b1, b2, b3, b4, b5,
                         FlattenLayer(), nn.Linear(1024, num_classes))


def test1():
    """Print the output shape of every top-level stage of the network.

    A single random 1x96x96 input is pushed through the children of the model
    one after another.
    """
    net = get_net()
    activations = torch.rand(1, 1, 96, 96)
    for stage in net.children():
        activations = stage(activations)
        print("Output shape:", activations.shape)


def test2():
    """Train the network on Fashion-MNIST resized to 96x96.

    Uses a batch size of 128, the Adam optimizer with learning rate 0.001 and
    5 epochs.
    """
    batch_size = 128
    train_iter, test_iter = load_data_fashion_mnist(batch_size, resize=96)
    net = get_net()
    optimizer = optim.Adam(net.parameters(), lr=0.001)
    train(net, train_iter, test_iter, batch_size, optimizer, num_epochs=5)


if __name__ == "__main__":
    test2()

util.SimpleTool

""" @author: Inki @contact: inki.yinji@qq.com @version: Created in 2020 0903, last modified in 2020 1222. @note: Some common function, and all given vector data's type must be numpy.array. """

import time
import numpy as np
import sys
import scipy.io as scio
import torch
import torchvision.transforms as transforms
import torchvision
from torch import nn
from torch.nn import functional
from multiprocessing import cpu_count


def get_iter(tr, tr_lab, te, te_lab):
    """Yield the four given objects once, packed as a single tuple.

    :param tr: The training set.
    :param tr_lab: The training set's labels.
    :param te: The test set.
    :param te_lab: The test set's labels.
    :return: A generator producing exactly one ``(tr, tr_lab, te, te_lab)``.
    """
    bundle = (tr, tr_lab, te, te_lab)
    yield bundle


def is_print(para_str, para_is_print=True):
    """Print ``para_str`` only when ``para_is_print`` is truthy.

    :param para_str: The object to print.
    :param para_is_print: Print when truthy, stay silent otherwise.
    """
    if not para_is_print:
        return
    print(para_str)


def load_file(para_path):
    """Load the content of a data file.

    When the text after the last ``.`` of the path is ``mat``, the file is
    read with ``scipy.io.loadmat`` and its ``'data'`` entry is returned.
    Any other path is opened as text and returned as a list of lines.

    :param para_path: The path of the given file.
    :return: The ``'data'`` array of a .mat file, or the list of text lines.
    """
    suffix = para_path.rsplit('.', 1)[-1]

    if suffix != 'mat':
        with open(para_path) as text_file:
            return text_file.readlines()

    return scio.loadmat(para_path)['data']


def load_data_fashion_mnist(batch_size=10, root='D:/Data/Datasets/FashionMNIST', resize=None):
    """Download Fashion-MNIST if needed and wrap both splits in data loaders.

    :param batch_size: Batch size used by both loaders.
    :param root: Directory where the dataset is stored or downloaded to.
    :param resize: When truthy, images are resized to this size before being
        converted to tensors.
    :return: ``(train_iter, test_iter)``; only the training loader shuffles.
    """
    steps = [transforms.Resize(size=resize)] if resize else []
    steps.append(transforms.ToTensor())
    transform = transforms.Compose(steps)

    dataset_kwargs = {'root': root, 'download': True, 'transform': transform}
    mnist_train = torchvision.datasets.FashionMNIST(train=True, **dataset_kwargs)
    mnist_test = torchvision.datasets.FashionMNIST(train=False, **dataset_kwargs)

    # No worker processes on Windows; one per CPU everywhere else.
    num_workers = 0 if sys.platform.startswith('win') else cpu_count()

    loader_kwargs = {'batch_size': batch_size, 'num_workers': num_workers}
    train_iter = torch.utils.data.DataLoader(mnist_train, shuffle=True, **loader_kwargs)
    test_iter = torch.utils.data.DataLoader(mnist_test, shuffle=False, **loader_kwargs)

    return train_iter, test_iter


def owa_weight(para_num, para_type='linear_decrease'):
    """Generate ordered weighted averaging (OWA) weights.

    OWA operators can replace the maximum or minimum operators. Reference:
    R. R. Yager, J. Kacprzyk, The ordered weighted averaging operators:
    Theory and applications, Springer Science & Business Media, 2012.

    :param para_num: The length of the weight list.
    :param para_type: ``'linear_decrease'`` (default) or
        ``'inverse_additive'``; any other value is treated as
        ``'linear_decrease'``.
    :return: A numpy array holding the OWA weights.
    """
    if para_num == 1:
        return np.array([1])

    if para_type == 'inverse_additive':
        # Weights proportional to 1, 1/2, ..., 1/n, normalised by their sum.
        reciprocals = [1 / rank for rank in range(1, para_num + 1)]
        total = np.sum(reciprocals)
        return np.array([value / total for value in reciprocals])

    # 'linear_decrease', which is also the fallback for unrecognised types:
    # weights proportional to n, n - 1, ..., 1.
    unit = 2 / para_num / (para_num + 1)
    return np.array([(para_num - offset) * unit for offset in range(para_num)])


def print_go_round(para_idx, para_str='Program processing'):
    """Redraw a one-character spinner on the current terminal line.

    :param para_idx: The current index; it selects the spinner frame modulo 4.
    :param para_str: The text shown in front of the spinner.
    """
    frames = ("\\", "|", "/", "-")
    line = ''.join(('\r', para_str, ': ', frames[para_idx % 4]))
    print(line, end="")


def print_progress_bar(para_idx, para_len):
    """Redraw a progress bar followed by a percentage on the current line.

    The bar is built from ``para_idx`` scaled to at most 50 cells; the
    percentage is the rounded-up share of ``para_idx + 1`` in ``para_len``.

    :param para_idx: The current index.
    :param para_len: The loop length.
    """
    filled_cells = int(para_idx // (para_len / 50))
    percent = np.ceil((para_idx + 1) * 100 / para_len)
    line = ''.join(['\r', '▇' * filled_cells, str(percent), '%'])
    print(line, end='')


def train(net, tr_iter, te_iter, batch_size, optimizer,
          loss=nn.CrossEntropyLoss(),
          device=torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
          num_epochs=100):
    """Train ``net`` and print loss, training accuracy and test accuracy per epoch.

    :param net: The network to train; it is moved to ``device``.
    :param tr_iter: Iterable of ``(x, y)`` training batches.
    :param te_iter: Iterable of ``(x, y)`` test batches, evaluated after every
        epoch.
    :param batch_size: Not used by this function; kept so existing callers
        keep working.
    :param optimizer: Optimizer already bound to the parameters of ``net``.
    :param loss: Callable taking ``(predictions, labels)`` and returning the
        batch loss.
    :param device: Device on which training and evaluation run.
    :param num_epochs: Number of passes over ``tr_iter``.
    """
    net = net.to(device)
    print("Training on", device)
    for epoch in range(num_epochs):
        # All running statistics are per epoch. The batch counter has to
        # restart together with the loss sum; otherwise the reported mean
        # loss is divided by the batches of all previous epochs as well.
        temp_tr_loss_sum, temp_tr_acc_sum, temp_num, temp_batch_count = 0., 0., 0, 0
        temp_start_time = time.time()
        for x, y in tr_iter:
            x = x.to(device)
            y = y.to(device)
            temp_y_pred = net(x)
            temp_loss = loss(temp_y_pred, y)
            optimizer.zero_grad()
            temp_loss.backward()
            optimizer.step()
            temp_tr_loss_sum += temp_loss.cpu().item()
            temp_tr_acc_sum += (temp_y_pred.argmax(dim=1) == y).sum().cpu().item()
            temp_num += y.shape[0]
            temp_batch_count += 1
        # Evaluate on the device the network was moved to, not on the
        # default device of evaluate_accuracy.
        test_acc = evaluate_accuracy(te_iter, net, device=device)
        print("Epoch %d, loss %.4f, training acc %.3f, test acc %.3f, time %.1f s" %
              (epoch + 1, temp_tr_loss_sum / temp_batch_count, temp_tr_acc_sum / temp_num, test_acc,
               time.time() - temp_start_time))


def evaluate_accuracy(data_iter, net, device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')):
    """Compute the classification accuracy of ``net`` over ``data_iter``.

    The network is switched to evaluation mode once for the whole pass and
    is put back into the mode it was in before the call, even if evaluation
    raises.

    :param data_iter: Iterable of ``(x, y)`` batches.
    :param net: The network to evaluate; the caller must already have placed
        it on ``device``.
    :param device: Device the batches are moved to.
    :return: Number of correct predictions divided by the number of samples.
    """
    ret_acc, temp_num = 0., 0
    temp_was_training = net.training
    net.eval()  # The evaluate mode, and the dropout is closed.
    try:
        with torch.no_grad():
            for x, y in data_iter:
                temp_y_pred = net(x.to(device)).argmax(dim=1)
                ret_acc += (temp_y_pred == y.to(device)).float().sum().cpu().item()
                temp_num += y.shape[0]
    finally:
        # Restore the caller's mode instead of forcing training mode.
        net.train(temp_was_training)

    return ret_acc / temp_num


class Count(dict):
    """A dict whose missing keys read as zero.

    Looking up an absent key returns 0 without storing the key.
    """

    def __missing__(self, __key):
        # Invoked by dict.__getitem__ when the key is absent.
        default_count = 0
        return default_count


class FlattenLayer(torch.nn.Module):
    """Layer that flattens every dimension after the first into one."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return a view of ``x`` with shape ``(x.shape[0], -1)``."""
        leading_size = x.size(0)
        return x.view(leading_size, -1)


class GlobalAvgPool2d(nn.Module):
    """Average pooling whose window covers every dimension after the first two."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Average-pool ``x`` with a kernel equal to ``x.shape[2:]``."""
        window = x.shape[2:]
        return functional.avg_pool2d(x, kernel_size=window)


# Running this module as a script loads Fashion-MNIST with the default settings.
if __name__ == "__main__":
    load_data_fashion_mnist()

本文同步分享在 博客「因吉」(CSDN)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索