寫給程序員的機器學習入門 (八) - 卷積神經網絡 (CNN) - 圖片分類和驗證碼識別

這一篇將會介紹卷積神經網絡 (CNN),CNN 模型很是適合用來進行圖片相關的學習,例如圖片分類和驗證碼識別,也能夠配合其餘模型實現 OCR。html

使用 Python 處理圖片

在具體介紹 CNN 以前,咱們先來看看怎樣使用 Python 處理圖片。Python 處理圖片最主要使用的類庫是 Pillow (Python2 PIL 的 fork),使用如下命令便可安裝:python

pip3 install Pillow

一些簡單操做的例子以下,若是你想了解更多能夠參考 Pillow 的文檔git

# 打開圖片
>>> from PIL import Image
>>> img = Image.open("1.png")

# 查看圖片信息
>>> img.size
(175, 230)
>>> img.mode
'RGB'
>>> img
<PIL.PngImagePlugin.PngImageFile image mode=RGB size=175x230 at 0x10B807B50>

# 縮放圖片
>>> img1 = img.resize((20, 30))
>>> img1
<PIL.Image.Image image mode=RGB size=20x30 at 0x106426FD0>

# 裁剪圖片
>>> img2 = img.crop((0, 0, 16, 16))
>>> img2
<PIL.Image.Image image mode=RGB size=16x16 at 0x105E0EFD0>

# 保存圖片
>>> img1.save("11.png")
>>> img2.save("12.png")

使用 pytorch 處理圖片時要首先獲取圖片的數據,即各個像素對應的顏色值,例如大小爲 175 * 230,模式是 RGB 的圖片會擁有 175 * 230 * 3 的數據,3 分別表明紅綠藍的值,範圍是 0 ~ 255,把圖片轉換爲 pytorch 的 tensor 對象須要通過 numpy 中轉,如下是轉換的例子:github

>>> import numpy
>>> import torch
>>> v = numpy.asarray(img)
>>> t = torch.tensor(v)
>>> t
tensor([[[255, 253, 254],
         [255, 253, 254],
         [255, 253, 254],
         ...,
         [255, 253, 254],
         [255, 253, 254],
         [255, 253, 254]],

        [[255, 253, 254],
         [255, 253, 254],
         [255, 253, 254],
         ...,
         [255, 253, 254],
         [255, 253, 254],
         [255, 253, 254]],

        [[255, 253, 254],
         [255, 253, 254],
         [255, 253, 254],
         ...,
         [255, 253, 254],
         [255, 253, 254],
         [255, 253, 254]],

        ...,

        [[255, 253, 254],
         [255, 253, 254],
         [255, 253, 254],
         ...,
         [255, 253, 254],
         [255, 253, 254],
         [255, 253, 254]],

        [[255, 253, 254],
         [255, 253, 254],
         [255, 253, 254],
         ...,
         [255, 253, 254],
         [255, 253, 254],
         [255, 253, 254]],

        [[255, 253, 254],
         [255, 253, 254],
         [255, 253, 254],
         ...,
         [255, 253, 254],
         [255, 253, 254],
         [255, 253, 254]]], dtype=torch.uint8)
>>> t.shape
torch.Size([230, 175, 3])

能夠看到 tensor 的維度是 高度 x 寬度 x 通道數 (RGB 圖片爲 3,黑白圖片爲 1),但是 pytorch 的 CNN 模型會要求維度爲 通道數 x 寬度 x 高度,而且數值應該正規化到 0 ~ 1 的範圍內,使用如下代碼能夠實現:web

# 交換維度 0 (高度) 和 維度 2 (通道數)
>>> t1 = t.transpose(0, 2)
>>> t1.shape
torch.Size([3, 175, 230])

>>> t2 = t1 / 255.0
>>> t2
tensor([[[1.0000, 1.0000, 1.0000,  ..., 1.0000, 1.0000, 1.0000],
         [1.0000, 1.0000, 1.0000,  ..., 1.0000, 1.0000, 1.0000],
         [1.0000, 1.0000, 1.0000,  ..., 1.0000, 1.0000, 1.0000],
         ...,
         [1.0000, 1.0000, 1.0000,  ..., 1.0000, 1.0000, 1.0000],
         [1.0000, 1.0000, 1.0000,  ..., 1.0000, 1.0000, 1.0000],
         [1.0000, 1.0000, 1.0000,  ..., 1.0000, 1.0000, 1.0000]],

        [[0.9922, 0.9922, 0.9922,  ..., 0.9922, 0.9922, 0.9922],
         [0.9922, 0.9922, 0.9922,  ..., 0.9922, 0.9922, 0.9922],
         [0.9922, 0.9922, 0.9922,  ..., 0.9922, 0.9922, 0.9922],
         ...,
         [0.9922, 0.9922, 0.9922,  ..., 0.9922, 0.9922, 0.9922],
         [0.9922, 0.9922, 0.9922,  ..., 0.9922, 0.9922, 0.9922],
         [0.9922, 0.9922, 0.9922,  ..., 0.9922, 0.9922, 0.9922]],

        [[0.9961, 0.9961, 0.9961,  ..., 0.9961, 0.9961, 0.9961],
         [0.9961, 0.9961, 0.9961,  ..., 0.9961, 0.9961, 0.9961],
         [0.9961, 0.9961, 0.9961,  ..., 0.9961, 0.9961, 0.9961],
         ...,
         [0.9961, 0.9961, 0.9961,  ..., 0.9961, 0.9961, 0.9961],
         [0.9961, 0.9961, 0.9961,  ..., 0.9961, 0.9961, 0.9961],
         [0.9961, 0.9961, 0.9961,  ..., 0.9961, 0.9961, 0.9961]]])

以後就能夠圍繞相似上面例子中 t2 這樣的 tensor 對象作文章了🥳。json

卷積神經網絡 (CNN)

卷積神經網絡 (CNN) 會從圖片的各個部分提取特徵,而後再從一級特徵提取二級特徵,若有必要再提取三級特徵 (以此類推),提取結束之後扁平化到最終特徵,而後使用多層或單層線性模型來實現分類識別。提取各級特徵會使用卷積層 (Convolution Layer) 和池化層 (Pooling Layer),提取特徵時能夠選擇添加通道數量以增長各個部分的信息量,分類識別最終特徵使用的線性模型又稱全鏈接層 (Fully Connected Layer),下圖是流程示例:c#

以前的文章介紹線性模型和遞歸模型的時候我使用了數學公式,但只用數學公式說明 CNN 將會很是難以理解,因此接下來我會伴隨例子逐步講解各個層具體作了怎樣的運算。瀏覽器

卷積層 (Convolution Layer)

卷積層會對圖片的各個部分作矩陣乘法操做,而後把結果做爲一個新的矩陣,每一個卷積層有兩個主要的參數,一個是內核大小 (kernel_size),一個是處理間隔 (stride),下圖是一個很是簡單的計算流程例子:網絡

若是增長處理間隔會怎樣呢?下圖展現了不一樣處理間隔的計算部分和輸出結果維度的區別:數據結構

咱們能夠看處處理間隔決定了每次向右或者向下移動的距離,輸出長度可使用公式 (長度 - 內核大小) / 處理間隔 + 1 計算,輸出寬度可使用公式 (長度 - 內核大小) / 處理間隔 + 1 計算。

如今再來看看 pytorch 中怎樣使用卷積層,建立卷積層可使用 torch.nn.Conv2d

# 建立卷積層,入通道 = 1,出通道 = 1,內核大小 = 2,處理間隔 = 1
>>> conv2d = torch.nn.Conv2d(in_channels = 1, out_channels = 1, kernel_size = 2, stride = 1)

# 查看卷積層內部的參數,第一個是內核對應的權重矩陣,第二個是偏移值
>>> p = list(conv2d.parameters())
>>> p
[Parameter containing:
tensor([[[[-0.0650, -0.0575],
          [-0.0313, -0.3539]]]], requires_grad=True), Parameter containing:
tensor([0.1482], requires_grad=True)]

# 如今生成一個 5 x 5,單通道的圖片數據,爲了方便理解這裏使用了 1 ~ 25,實際應該使用 0 ~ 1 之間的值
>>> x = torch.tensor(list(range(1, 26)), dtype=torch.float).reshape(1, 1, 5, 5)
>>> x
tensor([[[[ 1.,  2.,  3.,  4.,  5.],
          [ 6.,  7.,  8.,  9., 10.],
          [11., 12., 13., 14., 15.],
          [16., 17., 18., 19., 20.],
          [21., 22., 23., 24., 25.]]]])

# 使用卷積層計算輸出
>>> y = conv2d(x)
>>> y
tensor([[[[ -2.6966,  -3.2043,  -3.7119,  -4.2196],
          [ -5.2349,  -5.7426,  -6.2502,  -6.7579],
          [ -7.7732,  -8.2809,  -8.7885,  -9.2962],
          [-10.3115, -10.8192, -11.3268, -11.8345]]]],
       grad_fn=<MkldnnConvolutionBackward>)

# 咱們能夠模擬一下處理單個部分的計算,看看和上面的輸出是否一致

# 第 1 部分
>>> x[0,0,0:2,0:2]
tensor([[1., 2.],
        [6., 7.]])
>>> (p[0][0,0,:,:] * x[0,0,0:2,0:2]).sum() + p[1]
tensor([-2.6966], grad_fn=<AddBackward0>)

# 第 2 部分
>>> x[0,0,0:2,1:3]
tensor([[2., 3.],
        [7., 8.]])
>>> (p[0][0,0,:,:] * x[0,0,0:2,1:3]).sum() + p[1]
tensor([-3.2043], grad_fn=<AddBackward0>)

# 第 3 部分
>>> (p[0][0,0,:,:] * x[0,0,0:2,2:4]).sum() + p[1]
tensor([-3.7119], grad_fn=<AddBackward0>)

# 一致吧🥳

到這裏你應該瞭解單通道的卷積層是怎樣計算的,那麼多通道呢?若是有多個入通道,那麼卷積層的權重矩陣會相應有多份,若是有多個出通道,那麼卷積層的權重矩陣數量也會乘以出通道的倍數,例若有 3 個入通道,2 個出通道時,卷積層的權重矩陣會有 6 個 (3 * 2),偏移值會有 2 個,計算規則以下:

部分輸出[出通道1] = 部分輸入[入通道1] * 權重矩陣[0][0] + 部分輸入[入通道2] * 權重矩陣[0][1] + 部分輸入[入通道3] * 權重矩陣[0][2] + 偏移值1
部分輸出[出通道2] = 部分輸入[入通道1] * 權重矩陣[1][0] + 部分輸入[入通道2] * 權重矩陣[1][1] + 部分輸入[入通道3] * 權重矩陣[1][2] + 偏移值2

從計算規則能夠看出,出通道越多每一個部分可提取的特徵數量 (信息量) 也就越多,但計算量也會相應增大。

最後看看卷積層的數學公式 (基本和 pytorch 文檔的公式相同),如今應該能夠理解了吧🤢?

池化層 (Pooling Layer)

池化層的處理比較好理解,它會對每一個圖片每一個區域進行求最大值或者求平均值等運算,以下圖所示:

如今再來看看 pytorch 中怎樣使用卷積層,建立求最大值的池化層可使用 torch.nn.MaxPool2d,建立求平均值的池化層可使用 torch.nn.AvgPool2d

# 建立池化層,內核大小 = 2,處理間隔 = 2
>>> maxPool = torch.nn.MaxPool2d(2, stride=2)

# 生成一個 6 x 6,單通道的圖片數據
>>> x = torch.tensor(range(1, 37), dtype=float).reshape(1, 1, 6, 6)
>>> x
tensor([[[[ 1.,  2.,  3.,  4.,  5.,  6.],
          [ 7.,  8.,  9., 10., 11., 12.],
          [13., 14., 15., 16., 17., 18.],
          [19., 20., 21., 22., 23., 24.],
          [25., 26., 27., 28., 29., 30.],
          [31., 32., 33., 34., 35., 36.]]]], dtype=torch.float64)

# 使用池化層計算輸出
>>> maxPool(x)
tensor([[[[ 8., 10., 12.],
          [20., 22., 24.],
          [32., 34., 36.]]]], dtype=torch.float64)

# 很好理解吧🥳

# 建立和使用求平均值的池化層也很簡單
>>> avgPool = torch.nn.AvgPool2d(2, stride=2)
>>> avgPool(x)
tensor([[[[ 4.5000,  6.5000,  8.5000],
          [16.5000, 18.5000, 20.5000],
          [28.5000, 30.5000, 32.5000]]]], dtype=torch.float64)

全鏈接層 (Fully Connected Layer)

全鏈接層實際上就是多層或單層線性模型,但把特徵傳到全鏈接層以前還須要進行扁平化 (Flatten),例子以下所示:

# 模擬建立一個批次數量爲 2,通道數爲 3,長寬各爲 2 的特徵
>>> x = torch.rand((2, 3, 2, 2))
>>> x
tensor([[[[0.6395, 0.6240],
          [0.4194, 0.6054]],

         [[0.4798, 0.4690],
          [0.2647, 0.6087]],

         [[0.5727, 0.7567],
          [0.8287, 0.1382]]],


        [[[0.7903, 0.8635],
          [0.0053, 0.6417]],

         [[0.7093, 0.7740],
          [0.3115, 0.7587]],

         [[0.5875, 0.8268],
          [0.2923, 0.6016]]]])

# 對它進行扁平化,維度會變爲 批次數量, 通道數*長*寬
>>> x_flatten = x.view(x.shape[0], -1)
>>> x_flatten
tensor([[0.6395, 0.6240, 0.4194, 0.6054, 0.4798, 0.4690, 0.2647, 0.6087, 0.5727,
         0.7567, 0.8287, 0.1382],
        [0.7903, 0.8635, 0.0053, 0.6417, 0.7093, 0.7740, 0.3115, 0.7587, 0.5875,
         0.8268, 0.2923, 0.6016]])

# 以後再傳給線性模型便可
>>> linear = torch.nn.Linear(in_features=12, out_features=2)
>>> linear(x_flatten)
tensor([[-0.3067, -0.5534],
        [-0.1876, -0.6523]], grad_fn=<AddmmBackward>)

填充處理

在看前面提到的卷積層操做的時候,你可能會發現若是處理間隔 (stride) 小於內核大小 (kernel_size),那麼圖片邊緣的像素參與運算的次數會比圖片中間的像素要少,也就是說圖片邊緣對運算結果的影響會更小,若是圖片邊緣的信息一樣比較重要,那麼就會影響預測輸出的精度。爲了解決這個問題發明的就是填充處理,填充處理簡單的來講就是在卷積層初期前給圖片的周邊添加 0,若是填充量等於 1,那麼長寬會各增長 2,以下圖所示:

在 pytorch 中添加填充處理能夠在建立 Conv2d 的時候指定 padding 參數:

# 建立卷積層,入通道 = 1,出通道 = 1,內核大小 = 2,處理間隔 = 1, 填充量 = 1
>>> conv2d = torch.nn.Conv2d(in_channels = 1, out_channels = 1, kernel_size = 2, stride = 1, padding = 1)



使用 CNN 實現圖片分類 (LeNet)

接下來咱們試試使用 CNN 實現圖片分類,也就是給出一張圖片讓程序識別裏面的是什麼東西,使用的數據集是 cifar-10,這是一個很經典的數據集,包含了 60000 張 32x32 的小圖片,圖片有十個分類 (飛機,汽車,鳥,貓,鹿,狗,青蛙,馬,船,貨車),官方下載地址在這裏

須要注意的是,官方下載地址只包含二進制數據,一般不少文章或者教程都會讓咱們使用 torchvision.datasets.CIFAR10 等現成的加載器來加載這個數據集,但我不推薦使用這種方法,由於若是咱們須要訓練實際業務上的數據,那麼確定不會有現成的加載器能夠用,仍是得一張張圖片的加載和轉換。因此這裏我使用了 cifar-10 的原始圖片庫,而後演示怎樣從代碼加載圖片和標籤,而後轉換到訓練使用的 tensor 對象。

如下的代碼使用了 LeNet 模型,這是 30 年前就已經被提出的模型,結構和本文第一個圖片介紹的同樣。此外還有一些須要注意的地方:

  • cifar-10 官方默認劃分了 50000 張圖片做爲訓練集,10000 張圖片做爲驗證集;而個人代碼劃分了 48000 張圖片做爲訓練集,6000 張圖片做爲驗證集,6000 張圖片做爲測試集,因此正確率等數據會和其餘文章或者論文不一致
  • 訓練時的損失計算器使用了 CrossEntropyLoss, 這個計算器的特徵是要求預測輸出是 onehot,實際輸出是索引值 (只有一個分類是正確輸出),例如圖片分類爲 時,預測輸出應該爲 [0, 0, 1, 0, 0, 0, 0, 0, 0, 0] 實際輸出應該爲 2
  • 轉換各個分類的數值到機率使用了 Softmax 函數, 這個函數必須放在模型以外,若是放在模型內部會致使訓練效果變差,由於 CrossEntropyLoss 損失計算器會盡可能讓正確輸出的數值更高,錯誤輸出的數值更低,而不是分別接近 1 和 0,使用 softmax 會干擾損失的計算
import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import json
from PIL import Image
from torch import nn
from matplotlib import pyplot

# 分析目標的圖片大小,所有圖片都會先縮放到這個大小
IMAGE_SIZE = (32, 32)
# 分析目標的圖片所在的文件夾
IMAGE_DIR = "./cifar"
# 包含全部圖片標籤的文本文件
IMAGE_LABELS_PATH = "./cifar/labels.txt"

class MyModel(nn.Module):
    """圖片分類 (LeNet)"""
    def __init__(self, num_labels):
        super().__init__()
        # 卷積層和池化層
        self.cnn_model = nn.Sequential(
            nn.Conv2d(3, 6, kernel_size=5), # 維度: B,3,32,32 => B,6,28,28
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2), # 維度: B,6,14,14
            nn.Conv2d(6, 16, kernel_size=5), # 維度: B,16,10,10
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2) # 維度: B,16,5,5
        )
        # 全鏈接層
        self.fc_model = nn.Sequential(
            nn.Linear(16 * 5 * 5, 120), # 維度: B,120
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(120, 60), # 維度: B,60
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(60, num_labels), # 維度: B,num_labels
        )

    def forward(self, x):
        # 應用卷積層和池化層
        cnn_features = self.cnn_model(x)
        # 扁平化輸出的特徵
        cnn_features_flatten = cnn_features.view(cnn_features.shape[0], -1)
        # 應用全鏈接層
        y = self.fc_model(cnn_features_flatten)
        return y

def save_tensor(tensor, path):
    """保存 tensor 對象到文件"""
    torch.save(tensor, gzip.GzipFile(path, "wb"))

def load_tensor(path):
    """從文件讀取 tensor 對象"""
    return torch.load(gzip.GzipFile(path, "rb"))

def image_to_tensor(img):
    """轉換圖片對象到 tensor 對象"""
    in_img = img.resize(IMAGE_SIZE)
    arr = numpy.asarray(in_img)
    t = torch.from_numpy(arr)
    t = t.transpose(0, 2) # 轉換維度 H,W,C 到 C,W,H
    t = t / 255.0 # 正規化數值使得範圍在 0 ~ 1
    return t

def load_image_labels():
    """讀取圖片分類列表"""
    return list(filter(None, open(IMAGE_LABELS_PATH).read().split()))

def prepare_save_batch(batch, tensor_in, tensor_out):
    """準備訓練 - 保存單個批次的數據"""
    # 切分訓練集 (80%),驗證集 (10%) 和測試集 (10%)
    random_indices = torch.randperm(tensor_in.shape[0])
    training_indices = random_indices[:int(len(random_indices)*0.8)]
    validating_indices = random_indices[int(len(random_indices)*0.8):int(len(random_indices)*0.9):]
    testing_indices = random_indices[int(len(random_indices)*0.9):]
    training_set = (tensor_in[training_indices], tensor_out[training_indices])
    validating_set = (tensor_in[validating_indices], tensor_out[validating_indices])
    testing_set = (tensor_in[testing_indices], tensor_out[testing_indices])

    # 保存到硬盤
    save_tensor(training_set, f"data/training_set.{batch}.pt")
    save_tensor(validating_set, f"data/validating_set.{batch}.pt")
    save_tensor(testing_set, f"data/testing_set.{batch}.pt")
    print(f"batch {batch} saved")

def prepare():
    """準備訓練"""
    # 數據集轉換到 tensor 之後會保存在 data 文件夾下
    if not os.path.isdir("data"):
        os.makedirs("data")

    # 準備圖片分類到序號的索引
    labels_to_index = { label: index for index, label in enumerate(load_image_labels()) }

    # 查找全部圖片
    image_paths = []
    for root, dirs, files in os.walk(IMAGE_DIR):
        for filename in files:
            path = os.path.join(root, filename)
            if not path.endswith(".png"):
                continue
            # 分類名稱在文件名中,例如
            # 2598_cat.png => cat
            label = filename.split(".")[0].split("_")[1]
            label_index = labels_to_index.get(label)
            if label_index is None:
                continue
            image_paths.append((path, label_index))

    # 打亂圖片順序
    random.shuffle(image_paths)

    # 分批讀取和保存圖片
    batch_size = 1000
    for batch in range(0, len(image_paths) // batch_size):
        image_tensors = []
        image_labels = []
        for path, label_index in image_paths[batch*batch_size:(batch+1)*batch_size]:
            with Image.open(path) as img:
                t = image_to_tensor(img)
                image_tensors.append(t)
            image_labels.append(label_index)
        tensor_in = torch.stack(image_tensors) # 維度: B,C,W,H
        tensor_out = torch.tensor(image_labels) # 維度: B
        prepare_save_batch(batch, tensor_in, tensor_out)

def train():
    """開始訓練"""
    # 建立模型實例
    num_labels = len(load_image_labels())
    model = MyModel(num_labels)

    # 建立損失計算器
    # 計算單分類輸出最好使用 CrossEntropyLoss, 多分類輸出最好使用 BCELoss
    # 使用 CrossEntropyLoss 時實際輸出應該爲標籤索引值,不須要轉換爲 onehot
    loss_function = torch.nn.CrossEntropyLoss()

    # 建立參數調整器
    optimizer = torch.optim.Adam(model.parameters())

    # 記錄訓練集和驗證集的正確率變化
    training_accuracy_history = []
    validating_accuracy_history = []

    # 記錄最高的驗證集正確率
    validating_accuracy_highest = -1
    validating_accuracy_highest_epoch = 0

    # 讀取批次的工具函數
    def read_batches(base_path):
        for batch in itertools.count():
            path = f"{base_path}.{batch}.pt"
            if not os.path.isfile(path):
                break
            yield load_tensor(path)

    # 計算正確率的工具函數
    def calc_accuracy(actual, predicted):
        # 把最大的值看成正確分類,而後比對有多少個分類相等
        predicted_labels = predicted.argmax(dim=1)
        acc = (actual == predicted_labels).sum().item() / actual.shape[0]
        return acc

    # 劃分輸入和輸出的工具函數
    def split_batch_xy(batch, begin=None, end=None):
        # shape = batch_size, channels, width, height
        batch_x = batch[0][begin:end]
        # shape = batch_size
        batch_y = batch[1][begin:end]
        return batch_x, batch_y

    # 開始訓練過程
    for epoch in range(1, 10000):
        print(f"epoch: {epoch}")

        # 根據訓練集訓練並修改參數
        # 切換模型到訓練模式,將會啓用自動微分,批次正規化 (BatchNorm) 與 Dropout
        model.train()
        training_accuracy_list = []
        for batch_index, batch in enumerate(read_batches("data/training_set")):
            # 切分小批次,有助於泛化模型
            training_batch_accuracy_list = []
            for index in range(0, batch[0].shape[0], 100):
                # 劃分輸入和輸出
                batch_x, batch_y = split_batch_xy(batch, index, index+100)
                # 計算預測值
                predicted = model(batch_x)
                # 計算損失
                loss = loss_function(predicted, batch_y)
                # 從損失自動微分求導函數值
                loss.backward()
                # 使用參數調整器調整參數
                optimizer.step()
                # 清空導函數值
                optimizer.zero_grad()
                # 記錄這一個批次的正確率,torch.no_grad 表明臨時禁用自動微分功能
                with torch.no_grad():
                    training_batch_accuracy_list.append(calc_accuracy(batch_y, predicted))
            # 輸出批次正確率
            training_batch_accuracy = sum(training_batch_accuracy_list) / len(training_batch_accuracy_list)
            training_accuracy_list.append(training_batch_accuracy)
            print(f"epoch: {epoch}, batch: {batch_index}: batch accuracy: {training_batch_accuracy}")
        training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list)
        training_accuracy_history.append(training_accuracy)
        print(f"training accuracy: {training_accuracy}")

        # 檢查驗證集
        # 切換模型到驗證模式,將會禁用自動微分,批次正規化 (BatchNorm) 與 Dropout
        model.eval()
        validating_accuracy_list = []
        for batch in read_batches("data/validating_set"):
            batch_x, batch_y = split_batch_xy(batch)
            predicted = model(batch_x)
            validating_accuracy_list.append(calc_accuracy(batch_y, predicted))
        validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list)
        validating_accuracy_history.append(validating_accuracy)
        print(f"validating accuracy: {validating_accuracy}")

        # 記錄最高的驗證集正確率與當時的模型狀態,判斷是否在 20 次訓練後仍然沒有刷新記錄
        if validating_accuracy > validating_accuracy_highest:
            validating_accuracy_highest = validating_accuracy
            validating_accuracy_highest_epoch = epoch
            save_tensor(model.state_dict(), "model.pt")
            print("highest validating accuracy updated")
        elif epoch - validating_accuracy_highest_epoch > 20:
            # 在 20 次訓練後仍然沒有刷新記錄,結束訓練
            print("stop training because highest validating accuracy not updated in 20 epoches")
            break

    # 使用達到最高正確率時的模型狀態
    print(f"highest validating accuracy: {validating_accuracy_highest}",
        f"from epoch {validating_accuracy_highest_epoch}")
    model.load_state_dict(load_tensor("model.pt"))

    # 檢查測試集
    testing_accuracy_list = []
    for batch in read_batches("data/testing_set"):
        batch_x, batch_y = split_batch_xy(batch)
        predicted = model(batch_x)
        testing_accuracy_list.append(calc_accuracy(batch_y, predicted))
    testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list)
    print(f"testing accuracy: {testing_accuracy}")

    # 顯示訓練集和驗證集的正確率變化
    pyplot.plot(training_accuracy_history, label="training")
    pyplot.plot(validating_accuracy_history, label="validing")
    pyplot.ylim(0, 1)
    pyplot.legend()
    pyplot.show()

def eval_model():
    """使用訓練好的模型"""
    # 建立模型實例,加載訓練好的狀態,而後切換到驗證模式
    labels = load_image_labels()
    num_labels = len(labels)
    model = MyModel(num_labels)
    model.load_state_dict(load_tensor("model.pt"))
    model.eval()

    # 詢問圖片路徑,並顯示可能的分類一覽
    while True:
        try:
            # 構建輸入
            image_path = input("Image path: ")
            if not image_path:
                continue
            with Image.open(image_path) as img:
                tensor_in = image_to_tensor(img).unsqueeze(0) # 維度 C,W,H => 1,C,W,H
            # 預測輸出
            tensor_out = model(tensor_in)
            # 轉換到各個分類對應的機率
            tensor_out = nn.functional.softmax(tensor_out, dim=1)
            # 顯示按機率排序後的分類一覽
            rates = (t.item() for t in tensor_out[0])
            label_with_rates = list(zip(labels, rates))
            label_with_rates.sort(key=lambda p:-p[1])
            for label, rate in label_with_rates[:5]:
                rate = rate * 100
                print(f"{label}: {rate:0.2f}%")
            print()
        except Exception as e:
            print("error:", e)

def main():
    """主函數"""
    if len(sys.argv) < 2:
        print(f"Please run: {sys.argv[0]} prepare|train|eval")
        exit()

    # 給隨機數生成器分配一個初始值,使得每次運行均可以生成相同的隨機數
    # 這是爲了讓過程可重現,你也能夠選擇不這樣作
    random.seed(0)
    torch.random.manual_seed(0)

    # 根據命令行參數選擇操做
    operation = sys.argv[1]
    if operation == "prepare":
        prepare()
    elif operation == "train":
        train()
    elif operation == "eval":
        eval_model()
    else:
        raise ValueError(f"Unsupported operation: {operation}")

if __name__ == "__main__":
    main()

準備訓練使用的數據和開始訓練須要分別執行如下命令:

python3 example.py prepare
python3 example.py train

最終輸出結果以下,能夠看到訓練集正確率達到了 71%,驗證集和測試集正確率達到了 61%,這個正確率表明能夠精準說出圖片所屬的分類,也稱 top 1 正確率;此外計算正確分類在機率排前三的分類之中的比率稱爲 top 3 正確率,若是是電商上傳圖片之後給出三個可能的商品分類讓商家選擇,那麼計算 top 3 正確率就有意義了。

training accuracy: 0.7162083333333331
validating accuracy: 0.6134999999999998
stop training because highest validating accuracy not updated in 20 epoches
highest validating accuracy: 0.6183333333333333 from epoch 40
testing accuracy: 0.6168333333333332

訓練集與驗證集正確率變化以下圖所示:

實際使用模型的例子以下,輸出表明預測圖片有 79.23% 的機率是飛機,你也能夠試試在互聯網上隨便找一張圖片讓這個模型識別:

$ python3 example.py eval
Image path: ./cifar/test/2257_airplane.png
airplane: 79.23%
deer: 6.06%
automobile: 4.04%
cat: 2.89%
frog: 2.11%

使用 CNN 實現圖片分類 (ResNet)

上述的模型 top 1 正確率只達到了 61%, 畢竟是 30 年前的老模型了🧔,這裏我再介紹一個相對比較新的模型,ResNet 是在 2015 年中提出的模型,論文地址在這裏,特徵是會把輸入和輸出結合在一塊,例如原來計算 y = f(x) 會變爲 y = f(x) + x,從而抵消層數變多帶來的梯度消失問題 (參考我以前寫的訓練過程當中經常使用的技巧)。

下圖是 ResNet-18 模型的結構,內部能夠分爲 4 組,每一個組都包括 2 個基礎塊和 4 個卷積層,而且每一個基礎塊會把輸入和輸出結合在一塊兒,層數合計一共有 16,加上最開始轉換輸入的層和全鏈接層一共有 18 層,因此稱爲 ResNet-18,除此以外還有 ResNet-34,ResNet-50 等等變種,若是有興趣能夠參考本節末尾給出的 torchvision 的實現代碼。

從圖中能夠看到,從第二組開始會把長寬變爲一半,同時通道數增長一倍,而後維持通道數和長寬不變,全部組結束後使用一個 AvgPool2d 來讓長寬強制變爲 1x1,最後交給全鏈接層。計算卷積層輸出長寬的公式是 (長度 - 內核大小 + 填充量*2) / 處理間隔 + 1,讓長寬變爲一半會使用內核大小 3,填充量 1,處理間隔 2 ,例如長度爲 32 能夠計算得出 (32 - 3 + 2) / 2 + 1 == 16;而維持長寬的則會使用內核大小 3,填充量 1,處理間隔 1,例如長度爲 32 能夠計算得出 (32 - 3 + 2) / 1 + 1 == 32

如下是使用 ResNet-18 進行訓練的代碼:

import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import json
from PIL import Image
from torch import nn
from matplotlib import pyplot

# 分析目標的圖片大小,所有圖片都會先縮放到這個大小
IMAGE_SIZE = (32, 32)
# 分析目標的圖片所在的文件夾
IMAGE_DIR = "./cifar"
# 包含全部圖片標籤的文本文件
IMAGE_LABELS_PATH = "./cifar/labels.txt"

class BasicBlock(nn.Module):
    """ResNet 使用的基礎塊"""
    expansion = 1 # 定義這個塊的實際出通道是 channels_out 的幾倍,這裏的實現固定是一倍
    def __init__(self, channels_in, channels_out, stride):
        super().__init__()
        # 生成 3x3 的卷積層
        # 處理間隔 stride = 1 時,輸出的長寬會等於輸入的長寬,例如 (32-3+2)//1+1 == 32
        # 處理間隔 stride = 2 時,輸出的長寬會等於輸入的長寬的一半,例如 (32-3+2)//2+1 == 16
        # 此外 resnet 的 3x3 卷積層不使用偏移值 bias
        self.conv1 = nn.Sequential(
            nn.Conv2d(channels_in, channels_out, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 再定義一個讓輸出和輸入維度相同的 3x3 卷積層
        self.conv2 = nn.Sequential(
            nn.Conv2d(channels_out, channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 讓原始輸入和輸出相加的時候,須要維度一致,若是維度不一致則須要整合
        self.identity = nn.Sequential()
        if stride != 1 or channels_in != channels_out * self.expansion:
            self.identity = nn.Sequential(
                nn.Conv2d(channels_in, channels_out * self.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channels_out * self.expansion))

    def forward(self, x):
        # x => conv1 => relu => conv2 => + => relu
        # |                              ^
        # |==============================|
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp)
        tmp = self.conv2(tmp)
        tmp += self.identity(x)
        y = nn.functional.relu(tmp)
        return y

class MyModel(nn.Module):
    """圖片分類 (ResNet-18)"""
    def __init__(self, num_labels, block_type = BasicBlock):
        super().__init__()
        # 記錄上一層的出通道數量
        self.previous_channels_out = 64
        # 把 3 通道轉換到 64 通道,長寬不變
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, self.previous_channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(self.previous_channels_out))
        # ResNet 使用的各個層
        self.layer1 = self._make_layer(block_type, channels_out=64, num_blocks=2, stride=1)
        self.layer2 = self._make_layer(block_type, channels_out=128, num_blocks=2, stride=2)
        self.layer3 = self._make_layer(block_type, channels_out=256, num_blocks=2, stride=2)
        self.layer4 = self._make_layer(block_type, channels_out=512, num_blocks=2, stride=2)
        # 把最後一層的長寬轉換爲 1x1 的池化層,Adaptive 表示會自動檢測原有長寬
        # 例如 B,512,4,4 的矩陣會轉換爲 B,512,1,1,每一個通道的單個值會是原有 16 個值的平均
        self.avgPool = nn.AdaptiveAvgPool2d((1, 1))
        # 全鏈接層,只使用單層線性模型
        self.fc_model = nn.Linear(512 * block_type.expansion, num_labels)

    def _make_layer(self, block_type, channels_out, num_blocks, stride):
        blocks = []
        # 添加第一個塊
        blocks.append(block_type(self.previous_channels_out, channels_out, stride))
        self.previous_channels_out = channels_out * block_type.expansion
        # 添加剩餘的塊,剩餘的塊固定處理間隔爲 1,不會改變長寬
        for _ in range(num_blocks-1):
            blocks.append(block_type(self.previous_channels_out, self.previous_channels_out, 1))
            self.previous_channels_out *= block_type.expansion
        return nn.Sequential(*blocks)

    def forward(self, x):
        # 轉換出通道到 64
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp)
        # 應用 ResNet 的各個層
        tmp = self.layer1(tmp)
        tmp = self.layer2(tmp)
        tmp = self.layer3(tmp)
        tmp = self.layer4(tmp)
        # 轉換長寬到 1x1
        tmp = self.avgPool(tmp)
        # 扁平化,維度會變爲 B,512
        tmp = tmp.view(tmp.shape[0], -1)
        # 應用全鏈接層
        y = self.fc_model(tmp)
        return y

def save_tensor(tensor, path):
    """保存 tensor 對象到文件"""
    torch.save(tensor, gzip.GzipFile(path, "wb"))

def load_tensor(path):
    """從文件讀取 tensor 對象"""
    return torch.load(gzip.GzipFile(path, "rb"))

def image_to_tensor(img):
    """轉換圖片對象到 tensor 對象"""
    in_img = img.resize(IMAGE_SIZE)
    arr = numpy.asarray(in_img)
    t = torch.from_numpy(arr)
    t = t.transpose(0, 2) # 轉換維度 H,W,C 到 C,W,H
    t = t / 255.0 # 正規化數值使得範圍在 0 ~ 1
    return t

def load_image_labels():
    """讀取圖片分類列表"""
    return list(filter(None, open(IMAGE_LABELS_PATH).read().split()))

def prepare_save_batch(batch, tensor_in, tensor_out):
    """準備訓練 - 保存單個批次的數據"""
    # 切分訓練集 (80%),驗證集 (10%) 和測試集 (10%)
    random_indices = torch.randperm(tensor_in.shape[0])
    training_indices = random_indices[:int(len(random_indices)*0.8)]
    validating_indices = random_indices[int(len(random_indices)*0.8):int(len(random_indices)*0.9):]
    testing_indices = random_indices[int(len(random_indices)*0.9):]
    training_set = (tensor_in[training_indices], tensor_out[training_indices])
    validating_set = (tensor_in[validating_indices], tensor_out[validating_indices])
    testing_set = (tensor_in[testing_indices], tensor_out[testing_indices])

    # 保存到硬盤
    save_tensor(training_set, f"data/training_set.{batch}.pt")
    save_tensor(validating_set, f"data/validating_set.{batch}.pt")
    save_tensor(testing_set, f"data/testing_set.{batch}.pt")
    print(f"batch {batch} saved")

def prepare():
    """準備訓練"""
    # 數據集轉換到 tensor 之後會保存在 data 文件夾下
    if not os.path.isdir("data"):
        os.makedirs("data")

    # 準備圖片分類到序號的索引
    labels_to_index = { label: index for index, label in enumerate(load_image_labels()) }

    # 查找全部圖片
    image_paths = []
    for root, dirs, files in os.walk(IMAGE_DIR):
        for filename in files:
            path = os.path.join(root, filename)
            if not path.endswith(".png"):
                continue
            # 分類名稱在文件名中,例如
            # 2598_cat.png => cat
            label = filename.split(".")[0].split("_")[1]
            label_index = labels_to_index.get(label)
            if label_index is None:
                continue
            image_paths.append((path, label_index))

    # 打亂圖片順序
    random.shuffle(image_paths)

    # 分批讀取和保存圖片
    batch_size = 1000
    for batch in range(0, len(image_paths) // batch_size):
        image_tensors = []
        image_labels = []
        for path, label_index in image_paths[batch*batch_size:(batch+1)*batch_size]:
            with Image.open(path) as img:
                t = image_to_tensor(img)
                image_tensors.append(t)
            image_labels.append(label_index)
        tensor_in = torch.stack(image_tensors) # 維度: B,C,W,H
        tensor_out = torch.tensor(image_labels) # 維度: B
        prepare_save_batch(batch, tensor_in, tensor_out)

def train():
    """開始訓練"""
    # 建立模型實例
    num_labels = len(load_image_labels())
    model = MyModel(num_labels)

    # 建立損失計算器
    # 計算單分類輸出最好使用 CrossEntropyLoss, 多分類輸出最好使用 BCELoss
    # 使用 CrossEntropyLoss 時實際輸出應該爲標籤索引值,不須要轉換爲 onehot
    loss_function = torch.nn.CrossEntropyLoss()

    # 建立參數調整器
    optimizer = torch.optim.Adam(model.parameters())

    # 記錄訓練集和驗證集的正確率變化
    training_accuracy_history = []
    validating_accuracy_history = []

    # 記錄最高的驗證集正確率
    validating_accuracy_highest = -1
    validating_accuracy_highest_epoch = 0

    # 讀取批次的工具函數
    def read_batches(base_path):
        for batch in itertools.count():
            path = f"{base_path}.{batch}.pt"
            if not os.path.isfile(path):
                break
            yield load_tensor(path)

    # 計算正確率的工具函數
    def calc_accuracy(actual, predicted):
        # 把最大的值看成正確分類,而後比對有多少個分類相等
        predicted_labels = predicted.argmax(dim=1)
        acc = (actual == predicted_labels).sum().item() / actual.shape[0]
        return acc

    # 劃分輸入和輸出的工具函數
    def split_batch_xy(batch, begin=None, end=None):
        # shape = batch_size, channels, width, height
        batch_x = batch[0][begin:end]
        # shape = batch_size
        batch_y = batch[1][begin:end]
        return batch_x, batch_y

    # 開始訓練過程
    for epoch in range(1, 10000):
        print(f"epoch: {epoch}")

        # 根據訓練集訓練並修改參數
        # 切換模型到訓練模式,將會啓用自動微分,批次正規化 (BatchNorm) 與 Dropout
        model.train()
        training_accuracy_list = []
        for batch_index, batch in enumerate(read_batches("data/training_set")):
            # 切分小批次,有助於泛化模型
            training_batch_accuracy_list = []
            for index in range(0, batch[0].shape[0], 100):
                # 劃分輸入和輸出
                batch_x, batch_y = split_batch_xy(batch, index, index+100)
                # 計算預測值
                predicted = model(batch_x)
                # 計算損失
                loss = loss_function(predicted, batch_y)
                # 從損失自動微分求導函數值
                loss.backward()
                # 使用參數調整器調整參數
                optimizer.step()
                # 清空導函數值
                optimizer.zero_grad()
                # 記錄這一個批次的正確率,torch.no_grad 表明臨時禁用自動微分功能
                with torch.no_grad():
                    training_batch_accuracy_list.append(calc_accuracy(batch_y, predicted))
            # 輸出批次正確率
            training_batch_accuracy = sum(training_batch_accuracy_list) / len(training_batch_accuracy_list)
            training_accuracy_list.append(training_batch_accuracy)
            print(f"epoch: {epoch}, batch: {batch_index}: batch accuracy: {training_batch_accuracy}")
        training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list)
        training_accuracy_history.append(training_accuracy)
        print(f"training accuracy: {training_accuracy}")

        # 檢查驗證集
        # 切換模型到驗證模式,將會禁用自動微分,批次正規化 (BatchNorm) 與 Dropout
        model.eval()
        validating_accuracy_list = []
        for batch in read_batches("data/validating_set"):
            batch_x, batch_y = split_batch_xy(batch)
            predicted = model(batch_x)
            validating_accuracy_list.append(calc_accuracy(batch_y, predicted))
        validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list)
        validating_accuracy_history.append(validating_accuracy)
        print(f"validating accuracy: {validating_accuracy}")

        # 記錄最高的驗證集正確率與當時的模型狀態,判斷是否在 20 次訓練後仍然沒有刷新記錄
        if validating_accuracy > validating_accuracy_highest:
            validating_accuracy_highest = validating_accuracy
            validating_accuracy_highest_epoch = epoch
            save_tensor(model.state_dict(), "model.pt")
            print("highest validating accuracy updated")
        elif epoch - validating_accuracy_highest_epoch > 20:
            # 在 20 次訓練後仍然沒有刷新記錄,結束訓練
            print("stop training because highest validating accuracy not updated in 20 epoches")
            break

    # 使用達到最高正確率時的模型狀態
    print(f"highest validating accuracy: {validating_accuracy_highest}",
        f"from epoch {validating_accuracy_highest_epoch}")
    model.load_state_dict(load_tensor("model.pt"))

    # 檢查測試集
    testing_accuracy_list = []
    for batch in read_batches("data/testing_set"):
        batch_x, batch_y = split_batch_xy(batch)
        predicted = model(batch_x)
        testing_accuracy_list.append(calc_accuracy(batch_y, predicted))
    testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list)
    print(f"testing accuracy: {testing_accuracy}")

    # 顯示訓練集和驗證集的正確率變化
    pyplot.plot(training_accuracy_history, label="training")
    pyplot.plot(validating_accuracy_history, label="validing")
    pyplot.ylim(0, 1)
    pyplot.legend()
    pyplot.show()

def eval_model():
    """使用訓練好的模型"""
    # 建立模型實例,加載訓練好的狀態,而後切換到驗證模式
    labels = load_image_labels()
    num_labels = len(labels)
    model = MyModel(num_labels)
    model.load_state_dict(load_tensor("model.pt"))
    model.eval()

    # 詢問圖片路徑,並顯示可能的分類一覽
    while True:
        try:
            # 構建輸入
            image_path = input("Image path: ")
            if not image_path:
                continue
            with Image.open(image_path) as img:
                tensor_in = image_to_tensor(img).unsqueeze(0) # 維度 C,W,H => 1,C,W,H
            # 預測輸出
            tensor_out = model(tensor_in)
            # 轉換到各個分類對應的機率
            tensor_out = nn.functional.softmax(tensor_out, dim=1)
            # 顯示按機率排序後的分類一覽
            rates = (t.item() for t in tensor_out[0])
            label_with_rates = list(zip(labels, rates))
            label_with_rates.sort(key=lambda p:-p[1])
            for label, rate in label_with_rates[:5]:
                rate = rate * 100
                print(f"{label}: {rate:0.2f}%")
            print()
        except Exception as e:
            print("error:", e)

def main():
    """主函數"""
    if len(sys.argv) < 2:
        print(f"Please run: {sys.argv[0]} prepare|train|eval")
        exit()

    # 給隨機數生成器分配一個初始值,使得每次運行均可以生成相同的隨機數
    # 這是爲了讓過程可重現,你也能夠選擇不這樣作
    random.seed(0)
    torch.random.manual_seed(0)

    # 根據命令行參數選擇操做
    operation = sys.argv[1]
    if operation == "prepare":
        prepare()
    elif operation == "train":
        train()
    elif operation == "eval":
        eval_model()
    else:
        raise ValueError(f"Unsupported operation: {operation}")

if __name__ == "__main__":
    main()

最終輸出結果以下,能夠看到訓練集正確率達到了 99%,驗證集正確率達到了 85%,測試集正確率達到了 84%,比起上面的 LeNet 模型改進了不少吧🤗。

training accuracy: 0.9972708333333337
validating accuracy: 0.8373333333333337
stop training because highest validating accuracy not updated in 20 epoches
highest validating accuracy: 0.8521666666666667 from epoch 38
testing accuracy: 0.8464999999999996

隨便在網上找的貓狗圖片:

輸出結果以下,不錯吧:

Image path: BlogArchive/ml-08/cat.jpg
cat: 100.00%
dog: 0.00%
frog: 0.00%
deer: 0.00%
horse: 0.00%

Image path: BlogArchive/ml-08/dog.jpg
dog: 100.00%
bird: 0.00%
deer: 0.00%
frog: 0.00%
horse: 0.00%

pytorch 有專門用於處理視覺信息的 torchvision,其中包含了 ResNet 的實現,也就是說其實咱們不用本身去寫🤒,若是你有興趣能夠參考裏面的實現代碼,再試試 ResNet-50 等層數更多的模型是否能夠帶來更好的效果。

AI 鑑黃

相信不少人都看過 AI 鑑黃的新聞🥴🤭🥺,若是你想本身實現一個,能夠從 nsfw_data_scraper 下載圖片資源而後使用上面介紹的方法訓練,識別起來會比 cifar 簡單不少。由於實際只須要兩個標籤(1 黃色圖片,0 正常圖片),因此也可使用單個值表明結果,而後用 sigmoid 代替 softmax。此外你也能夠在 github 上搜索 nsfw 找到現成的模型。

使用 CNN 實現驗證碼識別 (ResNet-18)

最後再給出一個實用的例子。不少網站爲了防機器人操做會使用驗證碼機制,傳統的驗證碼會顯示一張包含數字字母的圖片,而後讓用戶填寫裏面的內容再對比是否正確,來判斷用戶是普通人仍是機器人,這樣的驗證碼能夠用本篇介紹的 CNN 模型識別出來😈。

首先咱們來選一個生成驗證碼的類庫,github 上搜索 captcha c# 裏面難度相對比較高的是 Hei.Captcha,這篇就使用 CNN 模型識別這個類庫生成的驗證碼。(個人 zkweb 裏面也有生成驗證碼的模塊,但難度比較低因此就不用了)

如下步驟和代碼會生成十萬張用於訓練和測試使用的驗證碼圖片:

mkdir generate-captcha
cd generate-captcha
dotnet new console
dotnet add package Hei.Captcha
mkdir output
mkdir fonts
cd fonts
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/Candara.ttf?raw=true
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/STCAIYUN.ttf?raw=true
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/impact.ttf?raw=true
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/monbaiti.ttf?raw=true
cd ..
# 添加程序代碼
dotnet run -c Release
using System;
using System.IO;
using Hei.Captcha;

namespace generate_captcha
{
    class Program
    {
        static void Main(string[] args)
        {
            var helper = new SecurityCodeHelper();
            var iterations = 100000;
            for (var x = 0; x < iterations; ++x)
            {
                var code = helper.GetRandomEnDigitalText(4);
                var bytes = helper.GetEnDigitalCodeByte(code);
                File.WriteAllBytes($"output/{x:D5}-{code}.png", bytes);
                if (x % 100 == 0)
                    Console.WriteLine($"{x}/{iterations}");
            }
        }
    }
}

如下是生成的驗證碼圖片例子,變形旋轉幹擾線動態背景色該有的都有😠:

接下來咱們想一想應該用什麼數據結構來表達驗證碼。在圖片識別的例子中有十個分類,咱們用了 onehot 編碼,即便用長度爲 10 的 tensor 對象來表示結果,正確的分類爲 1,不正確的分類爲 0。換成驗證碼之後,能夠用長度爲 36 的 tensor 對象來表示 1 位驗證碼 (26 個英文數字 + 10 個字母,假設驗證碼不分大小寫),若是有多位則能夠 36 * 位數的 tensor 對象來表達多位驗證碼。如下函數能夠把驗證碼轉換爲對應的 tensor 對象:

# 字母數字列表
ALPHA_NUMS = "abcdefghijklmnopqrstuvwxyz0123456789"
ALPHA_NUMS_MAP = { c: index for index, c in enumerate(ALPHA_NUMS) }
# 驗證碼位數
DIGITS = 4
# 標籤數量,字母數字混合*位數
NUM_LABELS = len(ALPHA_NUMS)*DIGITS

def code_to_tensor(code):
    """轉換驗證碼到 tensor 對象,使用 onehot 編碼"""
    t = torch.zeros((NUM_LABELS,))
    code = code.lower() # 驗證碼不分大小寫
    for index, c in enumerate(code):
        p = ALPHA_NUMS_MAP[c]
        t[index*len(ALPHA_NUMS)+p] = 1
    return t

轉換例子以下:

>>> code_to_tensor("abcd")
tensor([1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
>>> code_to_tensor("a123")
tensor([1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.])

反過來也同樣,咱們能夠把 tensor 的長度按 36 分爲多組,而後求每一組最大的值所在的索引,再根據該索引找到對應的字母或者數字,就能夠把 tensor 對象轉換回驗證碼:

def tensor_to_code(tensor):
    """轉換 tensor 對象到驗證碼"""
    tensor = tensor.reshape(DIGITS, len(ALPHA_NUMS))
    indices = tensor.max(dim=1).indices
    code = "".join(ALPHA_NUMS[index] for index in indices)
    return code

接下來就能夠用前面介紹過的 ResNet-18 模型進行訓練了😎,相比前面的圖片分類,這份代碼有如下幾點不一樣:

  • 由於是多分類,損失計算器應該使用 BCELoss 代替 CrossEntropyLoss
  • BCELoss 要求模型輸出值範圍在 0 ~ 1 之間,因此須要在模型內部添加控制函數 (CrossEntropyLoss 這麼作會影響訓練效果,但 BCELoss 不會)
  • 由於每一組都只有一個值是正確的,用 softmax 效果會比 sigmoid 要好 (普通的多分類問題會使用 sigmoid)
import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import json
from PIL import Image
from torch import nn
from matplotlib import pyplot

# 分析目標的圖片大小,所有圖片都會先縮放到這個大小
# 驗證碼原圖是 120x50
IMAGE_SIZE = (56, 24)
# 分析目標的圖片所在的文件夾
IMAGE_DIR = "./generate-captcha/output/"
# 字母數字列表
ALPHA_NUMS = "abcdefghijklmnopqrstuvwxyz0123456789"
ALPHA_NUMS_MAP = { c: index for index, c in enumerate(ALPHA_NUMS) }
# 驗證碼位數
DIGITS = 4
# 標籤數量,字母數字混合*位數
NUM_LABELS = len(ALPHA_NUMS)*DIGITS

class BasicBlock(nn.Module):
    """ResNet 使用的基礎塊"""
    expansion = 1 # 定義這個塊的實際出通道是 channels_out 的幾倍,這裏的實現固定是一倍
    def __init__(self, channels_in, channels_out, stride):
        super().__init__()
        # 生成 3x3 的卷積層
        # 處理間隔 stride = 1 時,輸出的長寬會等於輸入的長寬,例如 (32-3+2)//1+1 == 32
        # 處理間隔 stride = 2 時,輸出的長寬會等於輸入的長寬的一半,例如 (32-3+2)//2+1 == 16
        # 此外 resnet 的 3x3 卷積層不使用偏移值 bias
        self.conv1 = nn.Sequential(
            nn.Conv2d(channels_in, channels_out, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 再定義一個讓輸出和輸入維度相同的 3x3 卷積層
        self.conv2 = nn.Sequential(
            nn.Conv2d(channels_out, channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 讓原始輸入和輸出相加的時候,須要維度一致,若是維度不一致則須要整合
        self.identity = nn.Sequential()
        if stride != 1 or channels_in != channels_out * self.expansion:
            self.identity = nn.Sequential(
                nn.Conv2d(channels_in, channels_out * self.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channels_out * self.expansion))

    def forward(self, x):
        # x => conv1 => relu => conv2 => + => relu
        # |                              ^
        # |==============================|
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp)
        tmp = self.conv2(tmp)
        tmp += self.identity(x)
        y = nn.functional.relu(tmp)
        return y

class MyModel(nn.Module):
    """識別驗證碼 (ResNet-18)"""
    def __init__(self, block_type = BasicBlock):
        super().__init__()
        # 記錄上一層的出通道數量
        self.previous_channels_out = 64
        # 把 3 通道轉換到 64 通道,長寬不變
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, self.previous_channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(self.previous_channels_out))
        # ResNet 使用的各個層
        self.layer1 = self._make_layer(block_type, channels_out=64, num_blocks=2, stride=1)
        self.layer2 = self._make_layer(block_type, channels_out=128, num_blocks=2, stride=2)
        self.layer3 = self._make_layer(block_type, channels_out=256, num_blocks=2, stride=2)
        self.layer4 = self._make_layer(block_type, channels_out=512, num_blocks=2, stride=2)
        # 把最後一層的長寬轉換爲 1x1 的池化層,Adaptive 表示會自動檢測原有長寬
        # 例如 B,512,4,4 的矩陣會轉換爲 B,512,1,1,每一個通道的單個值會是原有 16 個值的平均
        self.avgPool = nn.AdaptiveAvgPool2d((1, 1))
        # 全鏈接層,只使用單層線性模型
        self.fc_model = nn.Linear(512 * block_type.expansion, NUM_LABELS)
        # 控制輸出在 0 ~ 1 之間,BCELoss 須要
        # 由於每組只應該有一個值爲真,使用 softmax 效果會比 sigmoid 好
        self.softmax = nn.Softmax(dim=2)

    def _make_layer(self, block_type, channels_out, num_blocks, stride):
        blocks = []
        # 添加第一個塊
        blocks.append(block_type(self.previous_channels_out, channels_out, stride))
        self.previous_channels_out = channels_out * block_type.expansion
        # 添加剩餘的塊,剩餘的塊固定處理間隔爲 1,不會改變長寬
        for _ in range(num_blocks-1):
            blocks.append(block_type(self.previous_channels_out, self.previous_channels_out, 1))
            self.previous_channels_out *= block_type.expansion
        return nn.Sequential(*blocks)

    def forward(self, x):
        # 轉換出通道到 64
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp)
        # 應用 ResNet 的各個層
        tmp = self.layer1(tmp)
        tmp = self.layer2(tmp)
        tmp = self.layer3(tmp)
        tmp = self.layer4(tmp)
        # 轉換長寬到 1x1
        tmp = self.avgPool(tmp)
        # 扁平化,維度會變爲 B,512
        tmp = tmp.view(tmp.shape[0], -1)
        # 應用全鏈接層
        tmp = self.fc_model(tmp)
        # 劃分每一個字符對應的組,以後維度爲 batch_size, digits, alpha_nums
        tmp = tmp.reshape(tmp.shape[0], DIGITS, len(ALPHA_NUMS))
        # 應用 softmax 到每一組
        tmp = self.softmax(tmp)
        # 從新扁平化,以後維度爲 batch_size, num_labels
        y = tmp.reshape(tmp.shape[0], NUM_LABELS)
        return y

def save_tensor(tensor, path):
    """保存 tensor 對象到文件"""
    torch.save(tensor, gzip.GzipFile(path, "wb"))

def load_tensor(path):
    """從文件讀取 tensor 對象"""
    return torch.load(gzip.GzipFile(path, "rb"))

def image_to_tensor(img):
    """轉換圖片對象到 tensor 對象"""
    in_img = img.resize(IMAGE_SIZE)
    in_img = in_img.convert("RGB") # 轉換圖片模式到 RGB
    arr = numpy.asarray(in_img)
    t = torch.from_numpy(arr)
    t = t.transpose(0, 2) # 轉換維度 H,W,C 到 C,W,H
    t = t / 255.0 # 正規化數值使得範圍在 0 ~ 1
    return t

def code_to_tensor(code):
    """轉換驗證碼到 tensor 對象,使用 onehot 編碼"""
    t = torch.zeros((NUM_LABELS,))
    code = code.lower() # 驗證碼不分大小寫
    for index, c in enumerate(code):
        p = ALPHA_NUMS_MAP[c]
        t[index*len(ALPHA_NUMS)+p] = 1
    return t

def tensor_to_code(tensor):
    """轉換 tensor 對象到驗證碼"""
    tensor = tensor.reshape(DIGITS, len(ALPHA_NUMS))
    indices = tensor.max(dim=1).indices
    code = "".join(ALPHA_NUMS[index] for index in indices)
    return code

def prepare_save_batch(batch, tensor_in, tensor_out):
    """準備訓練 - 保存單個批次的數據"""
    # 切分訓練集 (80%),驗證集 (10%) 和測試集 (10%)
    random_indices = torch.randperm(tensor_in.shape[0])
    training_indices = random_indices[:int(len(random_indices)*0.8)]
    validating_indices = random_indices[int(len(random_indices)*0.8):int(len(random_indices)*0.9):]
    testing_indices = random_indices[int(len(random_indices)*0.9):]
    training_set = (tensor_in[training_indices], tensor_out[training_indices])
    validating_set = (tensor_in[validating_indices], tensor_out[validating_indices])
    testing_set = (tensor_in[testing_indices], tensor_out[testing_indices])

    # 保存到硬盤
    save_tensor(training_set, f"data/training_set.{batch}.pt")
    save_tensor(validating_set, f"data/validating_set.{batch}.pt")
    save_tensor(testing_set, f"data/testing_set.{batch}.pt")
    print(f"batch {batch} saved")

def prepare():
    """準備訓練"""
    # 數據集轉換到 tensor 之後會保存在 data 文件夾下
    if not os.path.isdir("data"):
        os.makedirs("data")

    # 查找全部圖片
    image_paths = []
    for root, dirs, files in os.walk(IMAGE_DIR):
        for filename in files:
            path = os.path.join(root, filename)
            if not path.endswith(".png"):
                continue
            # 驗證碼在文件名中,例如
            # 00000-R865.png => R865
            code = filename.split(".")[0].split("-")[1]
            image_paths.append((path, code))

    # 打亂圖片順序
    random.shuffle(image_paths)

    # 分批讀取和保存圖片
    batch_size = 1000
    for batch in range(0, len(image_paths) // batch_size):
        image_tensors = []
        image_labels = []
        for path, code in image_paths[batch*batch_size:(batch+1)*batch_size]:
            with Image.open(path) as img:
                image_tensors.append(image_to_tensor(img))
            image_labels.append(code_to_tensor(code))
        tensor_in = torch.stack(image_tensors) # 維度: B,C,W,H
        tensor_out = torch.stack(image_labels) # 維度: B,N
        prepare_save_batch(batch, tensor_in, tensor_out)

def train():
    """開始訓練"""
    # 建立模型實例
    model = MyModel()

    # 建立損失計算器
    # 計算多分類輸出最好使用 BCELoss
    loss_function = torch.nn.BCELoss()

    # 建立參數調整器
    optimizer = torch.optim.Adam(model.parameters())

    # 記錄訓練集和驗證集的正確率變化
    training_accuracy_history = []
    validating_accuracy_history = []

    # 記錄最高的驗證集正確率
    validating_accuracy_highest = -1
    validating_accuracy_highest_epoch = 0

    # 讀取批次的工具函數
    def read_batches(base_path):
        for batch in itertools.count():
            path = f"{base_path}.{batch}.pt"
            if not os.path.isfile(path):
                break
            yield load_tensor(path)

    # 計算正確率的工具函數
    def calc_accuracy(actual, predicted):
        # 把每一位的最大值看成正確字符,而後比對有多少個字符相等
        actual_indices = actual.reshape(actual.shape[0], DIGITS, len(ALPHA_NUMS)).max(dim=2).indices
        predicted_indices = predicted.reshape(predicted.shape[0], DIGITS, len(ALPHA_NUMS)).max(dim=2).indices
        matched = (actual_indices - predicted_indices).abs().sum(dim=1) == 0
        acc = matched.sum().item() / actual.shape[0]
        return acc
 
    # 劃分輸入和輸出的工具函數
    def split_batch_xy(batch, begin=None, end=None):
        # shape = batch_size, channels, width, height
        batch_x = batch[0][begin:end]
        # shape = batch_size, num_labels
        batch_y = batch[1][begin:end]
        return batch_x, batch_y

    # 開始訓練過程
    for epoch in range(1, 10000):
        print(f"epoch: {epoch}")

        # 根據訓練集訓練並修改參數
        # 切換模型到訓練模式,將會啓用自動微分,批次正規化 (BatchNorm) 與 Dropout
        model.train()
        training_accuracy_list = []
        for batch_index, batch in enumerate(read_batches("data/training_set")):
            # 切分小批次,有助於泛化模型
            training_batch_accuracy_list = []
            for index in range(0, batch[0].shape[0], 100):
                # 劃分輸入和輸出
                batch_x, batch_y = split_batch_xy(batch, index, index+100)
                # 計算預測值
                predicted = model(batch_x)
                # 計算損失
                loss = loss_function(predicted, batch_y)
                # 從損失自動微分求導函數值
                loss.backward()
                # 使用參數調整器調整參數
                optimizer.step()
                # 清空導函數值
                optimizer.zero_grad()
                # 記錄這一個批次的正確率,torch.no_grad 表明臨時禁用自動微分功能
                with torch.no_grad():
                    training_batch_accuracy_list.append(calc_accuracy(batch_y, predicted))
            # 輸出批次正確率
            training_batch_accuracy = sum(training_batch_accuracy_list) / len(training_batch_accuracy_list)
            training_accuracy_list.append(training_batch_accuracy)
            print(f"epoch: {epoch}, batch: {batch_index}: batch accuracy: {training_batch_accuracy}")
        training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list)
        training_accuracy_history.append(training_accuracy)
        print(f"training accuracy: {training_accuracy}")

        # 檢查驗證集
        # 切換模型到驗證模式,將會禁用自動微分,批次正規化 (BatchNorm) 與 Dropout
        model.eval()
        validating_accuracy_list = []
        for batch in read_batches("data/validating_set"):
            batch_x, batch_y = split_batch_xy(batch)
            predicted = model(batch_x)
            validating_accuracy_list.append(calc_accuracy(batch_y, predicted))
        validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list)
        validating_accuracy_history.append(validating_accuracy)
        print(f"validating accuracy: {validating_accuracy}")

        # 記錄最高的驗證集正確率與當時的模型狀態,判斷是否在 20 次訓練後仍然沒有刷新記錄
        if validating_accuracy > validating_accuracy_highest:
            validating_accuracy_highest = validating_accuracy
            validating_accuracy_highest_epoch = epoch
            save_tensor(model.state_dict(), "model.pt")
            print("highest validating accuracy updated")
        elif epoch - validating_accuracy_highest_epoch > 20:
            # 在 20 次訓練後仍然沒有刷新記錄,結束訓練
            print("stop training because highest validating accuracy not updated in 20 epoches")
            break

    # 使用達到最高正確率時的模型狀態
    print(f"highest validating accuracy: {validating_accuracy_highest}",
        f"from epoch {validating_accuracy_highest_epoch}")
    model.load_state_dict(load_tensor("model.pt"))

    # 檢查測試集
    testing_accuracy_list = []
    for batch in read_batches("data/testing_set"):
        batch_x, batch_y = split_batch_xy(batch)
        predicted = model(batch_x)
        testing_accuracy_list.append(calc_accuracy(batch_y, predicted))
    testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list)
    print(f"testing accuracy: {testing_accuracy}")

    # 顯示訓練集和驗證集的正確率變化
    pyplot.plot(training_accuracy_history, label="training")
    pyplot.plot(validating_accuracy_history, label="validing")
    pyplot.ylim(0, 1)
    pyplot.legend()
    pyplot.show()

def eval_model():
    """使用訓練好的模型"""
    # 建立模型實例,加載訓練好的狀態,而後切換到驗證模式
    model = MyModel()
    model.load_state_dict(load_tensor("model.pt"))
    model.eval()

    # 詢問圖片路徑,並顯示可能的分類一覽
    while True:
        try:
            # 構建輸入
            image_path = input("Image path: ")
            if not image_path:
                continue
            with Image.open(image_path) as img:
                tensor_in = image_to_tensor(img).unsqueeze(0) # 維度 C,W,H => 1,C,W,H
            # 預測輸出
            tensor_out = model(tensor_in)
            # 轉換到驗證碼
            code = tensor_to_code(tensor_out[0])
            print(f"code: {code}")
            print()
        except Exception as e:
            print("error:", e)

def main():
    """主函數"""
    if len(sys.argv) < 2:
        print(f"Please run: {sys.argv[0]} prepare|train|eval")
        exit()

    # 給隨機數生成器分配一個初始值,使得每次運行均可以生成相同的隨機數
    # 這是爲了讓過程可重現,你也能夠選擇不這樣作
    random.seed(0)
    torch.random.manual_seed(0)

    # 根據命令行參數選擇操做
    operation = sys.argv[1]
    if operation == "prepare":
        prepare()
    elif operation == "train":
        train()
    elif operation == "eval":
        eval_model()
    else:
        raise ValueError(f"Unsupported operation: {operation}")

if __name__ == "__main__":
    main()

由於訓練須要大量時間而我機器只有 CPU 能夠用,因此此次我就只訓練到 epoch 23 🤢,訓練結果以下。能夠看到訓練集正確率達到了 98%,驗證集正確率達到了 91%,已是實用的級別了。

epoch: 23, batch: 98: batch accuracy: 0.99125
epoch: 23, batch: 99: batch accuracy: 0.9862500000000001
training accuracy: 0.9849874999999997
validating accuracy: 0.9103000000000003
highest validating accuracy updated

使用訓練好的模型識別驗證碼,你能夠對比上面的圖片看看是否是識別對了 (第二張的 P 看起來很像 D 🤒):

$ python3 example.py eval
Image path: BlogArchive/ml-08/captcha-1.png
code: 8ca6

Image path: BlogArchive/ml-08/captcha-2.png
code: tp8s

Image path: BlogArchive/ml-08/captcha-3.png
code: k225

注意這裏介紹出來的模型只能識別這一種驗證碼,其餘不一樣種類的驗證碼須要分別訓練和生成模型,作打碼平臺的話會先識別驗證碼種類再使用該種類對應的模型識別驗證碼內容。若是你的目標只是單種驗證碼,那麼用這篇文章介紹的方法應該能夠幫你節省調打碼平臺的錢 🤠。若是你機器有好顯卡,也能夠試試用更高級的模型提高正確率。

此外,有不少人問我如今流行的滑動驗證碼如何破解,其實破解這種驗證碼只須要作簡單的圖片分析,例如這裏這裏都沒有使用機器學習。但滑動驗證碼通常會配合瀏覽器指紋和鼠標軌跡採集一塊兒使用,後臺會根據大量數據分析用戶是普通人仍是機器人,因此破解幾回很簡單,但一直破解下去則會有很大概率被檢測出來。

寫在最後

這個系列中預約要寫的內容已經所有寫出來了,接下來要寫什麼還不肯定,有時間可能會從新維護那些放了半年以上的項目,也可能會想辦法搞好飯店的生意,最近生意實在很差啊🤒。

相關文章
相關標籤/搜索