經典卷積神經網絡(LeNet、AlexNet、VGG、GoogleNet、ResNet)的實現(MXNet版本)

  卷積神經網絡(Convolutional Neural Network, CNN)是一種前饋神經網絡,它的人工神經元能夠響應一部分覆蓋範圍內的周圍單元,對於大型圖像處理有出色表現。html

  其中 文章 詳解卷積神經網絡(CNN)已經對卷積神經網絡進行了詳細的描述,這裏爲了學習MXNet的庫,因此對經典的神經網絡進行實現~加深學習印象,而且爲之後的使用打下基礎。其中參考的爲Gluon社區提供的學習資料~編程

 

1.簡單LeNet的實現網絡

  

def LeNet(): """ 較早的卷積神經網絡 :return: """ net = nn.Sequential() with net.name_scope(): net.add( nn.Conv2D(channels=20, kernel_size=5, activation='relu'), nn.MaxPool2D(pool_size=2, strides=2), nn.Conv2D(channels=50, kernel_size=3, activation='relu'), nn.MaxPool2D(pool_size=2, strides=2), nn.Flatten(), nn.Dense(128, activation="relu"), nn.Dense(10) ) return net

 

2. AlexNet:dom

  因爲圖片數據集的擴大和硬件設備的發展,更深層更復雜的神經網絡模型被使用,其中表明爲AlexNet,與相對較小的LeNet相比,AlexNet包含8層變換,其中有五層卷積和兩層全鏈接隱含層,以及一個輸出層。編程語言

def AlexNet(): """ 對leNet的一個擴展,得益於數據集和硬件資源的發展 :return: """ net = nn.Sequential() with net.name_scope(): net.add( # 第一階段
            nn.Conv2D(channels=96, kernel_size=11, strides=4, activation='relu'), nn.MaxPool2D(pool_size=3, strides=2), # 第二階段
            nn.Conv2D(channels=256, kernel_size=5, padding=2, activation='relu'), nn.MaxPool2D(pool_size=3, strides=2), # 第三階段
            nn.Conv2D(channels=384, kernel_size=3, padding=1, activation='relu'), nn.Conv2D(channels=384, kernel_size=3, padding=1, activation='relu'), nn.Conv2D(channels=256, kernel_size=3, padding=1, activation='relu'), nn.MaxPool2D(pool_size=3, strides=2), # 第四階段
 nn.Flatten(), nn.Dense(4096, activation="relu"), nn.Dropout(.5), # 第五階段
            nn.Dense(4096, activation="relu"), nn.Dropout(.5), # 第六階段
            nn.Dense(10) ) return net

 

3. VGGNet:ide

  考慮到當網絡層數很是多時,一層一層堆疊網絡結構,很是麻煩,VGG使用了編程語言自帶的便利,採用了函數和循環的方式,複製了網絡結構裏面的大量重複結構,所以能夠很緊湊來構造這些網絡。而第一個使用這種結構的深度網絡是VGG。函數

 

def VGGNet(architecture): """ 經過引入了函數和循環的方式,能夠快速建立任意層數的神經網絡 :return: """
    def vgg_block(num_convs, channals): """ 定義一個網絡的基本結構,由若干卷積層和一個池化層構成 VGG的一個關鍵是使用不少有着相對小的kernel(3×3)的卷積層而後接上一個池化層,以後再將這個模塊重複屢次。所以先定義一個這樣的塊: :param num_convs: 卷積層的層數 :param channals: 通道數 :return: """ net = nn.Sequential() for _ in range(num_convs): net.add(nn.Conv2D(channels=channals, kernel_size=3, padding=1, activation='relu')) net.add(nn.MaxPool2D(pool_size=2, strides=2)) return net def vgg_stack(architecture): """ 定義全部卷積層的網絡結構,經過參數將定義的網絡結構封裝起來 :param architecture: 指定的網絡結構參數 :return: """ net = nn.Sequential() for (num_convs, channals) in architecture: net.add(vgg_block(num_convs, channals)) return net # 在卷積層以後,採用了兩個全鏈接層,而後使用輸出層輸出結果。
    net = nn.Sequential() with net.name_scope(): net.add( vgg_stack(architecture), nn.Flatten(), nn.Dense(4096, activation='relu'), nn.Dropout(0.5), nn.Dense(4096, activation='relu'), nn.Dropout(0.5), nn.Dense(10) ) return net

 

4. NiNNet:學習

  注意到卷積神經網絡通常分紅兩塊,一塊主要由卷積層構成,另外一塊主要是全鏈接層。在Alexnet裏咱們看到如何把卷積層塊和全鏈接層分別加深加寬從而獲得深度網絡。另一個天然的想法是,咱們能夠串聯數個卷積層塊和全鏈接層塊來構建深度網絡。測試

  不過這裏的一個難題是,卷積的輸入輸出是4D矩陣,然而全鏈接是2D。同時在卷積神經網絡裏咱們提到若是把4D矩陣轉成2D作全鏈接,這個會致使全鏈接層有過多的參數。NiN提出只對通道層作全鏈接而且像素之間共享權重來解決上述兩個問題。就是說,咱們使用kernel大小是1×1的卷積。this

 

def NiNNet(): """ 經過串聯多個卷積層和全鏈接層 :return: """

    def mlpconv(channels, kernel_size, padding, strides=1, max_pooling=True): """ 經過構造一個正常的卷積層,和兩個kernel=1的卷積層(功能至關於全鏈接層)構造 :param channels: :param kernel_size: :param padding: :param strides: :param max_pooling: :return: """ net = nn.Sequential() net.add( nn.Conv2D(channels=channels, kernel_size=kernel_size, strides=strides, padding=padding, activation='relu'), nn.Conv2D(channels=channels, kernel_size=1, padding=0, strides=1, activation='relu'), nn.Conv2D(channels=channels, kernel_size=1, padding=0, strides=1, activation='relu')) if max_pooling: net.add(nn.MaxPool2D(pool_size=3, strides=2)) return net """ 除了使用了1×1卷積外,NiN在最後不是使用全鏈接,而是使用通道數爲輸出類別個數的mlpconv,外接一個平均池化層來將每一個通道里的數值平均成一個標量。 """ net = nn.Sequential() with net.name_scope(): net.add( mlpconv(96, 11, 0, strides=4), mlpconv(256, 5, 2), mlpconv(384, 3, 1), nn.Dropout(0.5), # 目標類爲10類
            mlpconv(10, 3, 1, max_pooling=False), # 輸入爲 batch_size x 10 x 5 x 5, 經過AvgPool2D轉成 batch_size x 10 x 1 x 1。
            # 使用全局池化能夠避免估算pool_size大小
 nn.GlobalAvgPool2D(), # 轉成 batch_size x 10
 nn.Flatten() ) return net

 

5. GoogLeNet:

  2014年,Google使用了更加複雜的網絡模型,其中包括了網絡的串聯和並聯,以下圖

 

 

 

  能夠看到其中有多個四個並行卷積層的塊。這個塊通常叫作Inception,其基於Network in network的思想作了很大的改進。GoogleNet加入了更加結構化的Inception塊來使得咱們可使用更大的通道,更多的層,同時控制計算量和模型大小在合理範圍內。

def GoogLeNet(num_class): """ GoogLeNet加入了更加結構化的Inception塊來使得咱們可使用更大的通道,更多的層,同時控制計算量和模型大小在合理範圍內。 :return: """

    class GoogleNet(nn.Block): """ 經過串聯Inception來構造深層網絡結構 """
        def __init__(self, num_classes, verbose=False, **kwargs): super(GoogleNet, self).__init__(**kwargs) self.verbose = verbose # add name_scope on the outer most Sequential
 with self.name_scope(): # block 1
                b1 = nn.Sequential() b1.add( nn.Conv2D(64, kernel_size=7, strides=2, padding=3, activation='relu'), nn.MaxPool2D(pool_size=3, strides=2) ) # block 2
                b2 = nn.Sequential() b2.add( nn.Conv2D(64, kernel_size=1), nn.Conv2D(192, kernel_size=3, padding=1), nn.MaxPool2D(pool_size=3, strides=2) ) # block 3
                b3 = nn.Sequential() b3.add( Inception(64, 96, 128, 16, 32, 32), Inception(128, 128, 192, 32, 96, 64), nn.MaxPool2D(pool_size=3, strides=2) ) # block 4
                b4 = nn.Sequential() b4.add( Inception(192, 96, 208, 16, 48, 64), Inception(160, 112, 224, 24, 64, 64), Inception(128, 128, 256, 24, 64, 64), Inception(112, 144, 288, 32, 64, 64), Inception(256, 160, 320, 32, 128, 128), nn.MaxPool2D(pool_size=3, strides=2) ) # block 5
                b5 = nn.Sequential() b5.add( Inception(256, 160, 320, 32, 128, 128), Inception(384, 192, 384, 48, 128, 128), nn.AvgPool2D(pool_size=2) ) # block 6
                b6 = nn.Sequential() b6.add( nn.Flatten(), nn.Dense(num_classes) ) # chain blocks together
                self.net = nn.Sequential() self.net.add(b1, b2, b3, b4, b5, b6) def forward(self, x): out = x for i, b in enumerate(self.net): out = b(out) if self.verbose: print('Block %d output: %s' % (i + 1, out.shape)) return out class Inception(nn.Block): """ 網絡結構的並聯單元 """
        def __init__(self, n1_1, n2_1, n2_3, n3_1, n3_5, n4_1, **kwargs): super(Inception, self).__init__(**kwargs) # path1
            self.p1_convs_1 = nn.Conv2D(n1_1, kernel_size=1, activation='relu') # path2
            self.p2_convs_1 = nn.Conv2D(n2_1, kernel_size=1, activation='relu') # path2
            self.p2_convs_3 = nn.Conv2D(n2_3, kernel_size=1, activation='relu') # path3
            self.p3_convs_1 = nn.Conv2D(n3_1, kernel_size=1, activation='relu') self.p3_convs_5 = nn.Conv2D(n3_5, kernel_size=1, activation='relu') # path4
            self.p4_pool_3 = nn.MaxPool2D(pool_size=3, padding=1, strides=1) self.p4_convs_1 = nn.Conv2D(n4_1, kernel_size=1, activation='relu') def forward(self, x): p1 = self.p1_convs_1(x) p2 = self.p2_convs_3(self.p2_convs_1(x)) p3 = self.p3_convs_5(self.p3_convs_1(x)) p4 = self.p4_convs_1(self.p4_pool_3(x)) return nd.concat(p1, p2, p3, p4, dim=1) net = GoogleNet(num_class) return net

 

6. ResNet:

  ResNet有效的解決了深度卷積神經網絡難訓練的問題。這是由於在偏差反傳的過程當中,梯度一般變得愈來愈小,從而權重的更新量也變小。這個致使遠離損失函數的層訓練緩慢,隨着層數的增長這個現象更加明顯。以前有兩種經常使用方案來嘗試解決這個問題:

  1. 按層訓練。先訓練靠近數據的層,而後慢慢的增長後面的層。但效果不是特別好,並且比較麻煩。
  2. 使用更寬的層(增長輸出通道)而不是更深來增長模型複雜度。但更寬的模型常常不如更深的效果好。

  ResNet經過增長跨層的鏈接來解決梯度逐層回傳時變小的問題。雖然這個想法以前就提出過了,但ResNet真正的把效果作好了。

 

  最底下那層的輸入不只僅是輸出給了中間層,並且其與中間層結果相加進入最上層。這樣在梯度反傳時,最上層梯度能夠直接跳過中間層傳到最下層,從而避免最下層梯度太小狀況。

  爲何叫作殘差網絡呢?咱們能夠將上面示意圖裏的結構拆成兩個網絡的和,一個一層,一個兩層,最下面層是共享的。

  在訓練過程當中,左邊的網絡由於更簡單因此更容易訓練。這個小網絡沒有擬合到的部分,或者說殘差,則被右邊的網絡抓取住。因此直觀上來講,即便加深網絡,跨層鏈接仍然可使得底層網絡能夠充分的訓練,從而不會讓訓練更難。 

  ResNet沿用了VGG的那種全用3×33×3卷積,但在卷積和池化層之間加入了批量歸一層來加速訓練。每次跨層鏈接跨過兩層卷積。這裏咱們定義一個這樣的殘差塊(Residual塊)。注意到若是輸入的通道數和輸出不同時(same_shape=False),咱們使用一個額外的1×1卷積來作通道變化,同時使用strides=2來把長寬減半。

 

def ResNet(num_classes): """ 深度殘差網絡,經過增長跨層的鏈接來解決梯度逐層回傳時變小的問題。雖然這個想法以前就提出過了,但ResNet真正的把效果作好了。 :return: """

    class Residual(nn.Block): """ 構造擴層鏈接,ResNet沿用了VGG的那種全用3×3卷積,但在卷積和池化層之間加入了批量歸一層來加速訓練。 每次跨層鏈接跨過兩層卷積。這裏咱們定義一個這樣的殘差塊。注意到若是輸入的通道數和輸出不同時(same_shape=False), 咱們使用一個額外的1×1卷積來作通道變化,同時使用strides=2來把長寬減半。 """
        def __init__(self, channels, same_shape=True, **kwargs): super(Residual, self).__init__(**kwargs) self.same_shape = same_shape strides = 1 if same_shape else 2 self.conv1 = nn.Conv2D(channels, kernel_size=3, padding=1, strides=strides) self.bn1 = nn.BatchNorm() self.conv2 = nn.Conv2D(channels, kernel_size=3, padding=1) self.bn2 = nn.BatchNorm() if not same_shape: self.conv3 = nn.Conv2D(channels, kernel_size=1, strides=strides) def forward(self, x): out = nd.relu(self.bn1(self.conv1(x))) out = self.bn2(self.conv2(out)) if not self.same_shape: x = self.conv3(x) return nd.relu(out + x) class ResNet(nn.Block): """ 相似GoogLeNet主體是由Inception塊串聯而成,ResNet的主體部分串聯多個Residual塊。 另外注意到一點是,這裏咱們沒用池化層來減少數據長寬,而是經過有通道變化的Residual塊裏面的使用strides=2的卷積層。 """
        def __init__(self, num_classes, verbose=False, **kwargs): super(ResNet, self).__init__(**kwargs) self.verbose = verbose # add name_scope on the outermost Sequential
 with self.name_scope(): # block 1
                b1 = nn.Conv2D(64, kernel_size=7, strides=2) # block 2
                b2 = nn.Sequential() b2.add( nn.MaxPool2D(pool_size=3, strides=2), Residual(64), Residual(64) ) # block 3
                b3 = nn.Sequential() b3.add( Residual(128, same_shape=False), Residual(128) ) # block 4
                b4 = nn.Sequential() b4.add( Residual(256, same_shape=False), Residual(256) ) # block 5
                b5 = nn.Sequential() b5.add( Residual(512, same_shape=False), Residual(512) ) # block 6
                b6 = nn.Sequential() b6.add( nn.AvgPool2D(pool_size=3), nn.Dense(num_classes) ) # chain all blocks together
                self.net = nn.Sequential() self.net.add(b1, b2, b3, b4, b5, b6) def forward(self, x): out = x for i, b in enumerate(self.net): out = b(out) if self.verbose: print('Block %d output: %s' % (i + 1, out.shape)) return out net = ResNet(num_classes) return net

 

咱們使用測試用例來對經典卷積神經網絡進行測試

def do_exp(): # 初始化
    ctx = utils.try_gpu() # 獲取數據
    # batch_size = 256
    train_data, test_data = utils.load_data_fashion_mnist(batch_size=64, resize=224) # net = LeNet()
    # net = AlexNet()

    # architecture = ((2, 64), (2, 128), (2, 256), (2, 512), (2, 512))
    # net = VGGNet(architecture)

    # net = NiNNet()
    # net = GoogLeNet(10)
    net = ResNet(10) net.initialize(ctx=ctx, init=init.Xavier()) print('initialize weight on', ctx) # 訓練
    loss = gluon.loss.SoftmaxCrossEntropyLoss() trainer = gluon.Trainer(net.collect_params(), 'sgd', {'learning_rate': 0.01}) utils.train(train_data, test_data, net, loss, trainer, ctx, num_epochs=1) if __name__ == '__main__': do_exp()

 

使用到的其餘函數還有:

class DataLoader(object): """similiar to gluon.data.DataLoader, but might be faster. The main difference this data loader tries to read more exmaples each time. But the limits are 1) all examples in dataset have the same shape, 2) data transfomer needs to process multiple examples at each time """
    def __init__(self, dataset, batch_size, shuffle, transform=None): self.dataset = dataset self.batch_size = batch_size self.shuffle = shuffle self.transform = transform def __iter__(self): data = self.dataset[:] X = data[0] y = nd.array(data[1]) n = X.shape[0] # 順序打亂
        if self.shuffle: idx = np.arange(n) np.random.shuffle(idx) X = nd.array(X.asnumpy()[idx]) y = nd.array(y.asnumpy()[idx]) for i in range(n//self.batch_size): if self.transform is not None: yield self.transform(X[i*self.batch_size:(i+1)*self.batch_size], y[i*self.batch_size:(i+1)*self.batch_size]) else: yield (X[i*self.batch_size:(i+1)*self.batch_size], y[i*self.batch_size:(i+1)*self.batch_size]) def __len__(self): return len(self.dataset)//self.batch_size def load_data_fashion_mnist(batch_size, resize=None, root="~/.mxnet/datasets/fashion-mnist"): """download the fashion mnist dataest and then load into memory"""
    def transform_mnist(data, label): # Transform a batch of examples.
        if resize: n = data.shape[0] new_data = nd.zeros((n, resize, resize, data.shape[3])) for i in range(n): new_data[i] = image.imresize(data[i], resize, resize) data = new_data # change data from batch x height x width x channel to batch x channel x height x width
        return nd.transpose(data.astype('float32'), (0,3,1,2))/255, label.astype('float32') mnist_train = gluon.data.vision.FashionMNIST(root=root, train=True, transform=None) mnist_test = gluon.data.vision.FashionMNIST(root=root, train=False, transform=None) # Transform later to avoid memory explosion. 
    train_data = DataLoader(mnist_train, batch_size, shuffle=True, transform=transform_mnist) test_data = DataLoader(mnist_test, batch_size, shuffle=False, transform=transform_mnist) return train_data, test_data def try_gpu(): """If GPU is available, return mx.gpu(0); else return mx.cpu()"""
    try: ctx = mx.gpu() _ = nd.array([0], ctx=ctx) except: ctx = mx.cpu() return ctx def _get_batch(batch, ctx): """return data and label on ctx"""
    if isinstance(batch, mx.io.DataBatch): data = batch.data[0] label = batch.label[0] else: data, label = batch return (gluon.utils.split_and_load(data, ctx), gluon.utils.split_and_load(label, ctx), data.shape[0]) def train(train_data, test_data, net, loss, trainer, ctx, num_epochs, print_batches=None): """Train a network"""
    print("Start training on ", ctx) if isinstance(ctx, mx.Context): ctx = [ctx] for epoch in range(num_epochs): train_loss, train_acc, n, m = 0.0, 0.0, 0.0, 0.0
        if isinstance(train_data, mx.io.MXDataIter): train_data.reset() start = time() for i, batch in enumerate(train_data): data, label, batch_size = _get_batch(batch, ctx) losses = [] with autograd.record(): outputs = [net(X) for X in data] losses = [loss(yhat, y) for yhat, y in zip(outputs, label)] for l in losses: l.backward() train_acc += sum([(yhat.argmax(axis=1)==y).sum().asscalar() for yhat, y in zip(outputs, label)]) train_loss += sum([l.sum().asscalar() for l in losses]) trainer.step(batch_size) n += batch_size m += sum([y.size for y in label]) if print_batches and (i+1) % print_batches == 0: print("Batch %d. Loss: %f, Train acc %f" % ( n, train_loss/n, train_acc/m )) test_acc = evaluate_accuracy(test_data, net, ctx) print("Epoch %d. Loss: %.3f, Train acc %.2f, Test acc %.2f, Time %.1f sec" % ( epoch, train_loss/n, train_acc/m, test_acc, time() - start ))
View Code
相關文章
相關標籤/搜索