寫給程序員的機器學習入門 (十一) - 對象識別 YOLO - 識別人臉位置與是否戴口罩

這篇將會介紹目前最流行的對象識別模型 YOLO,YOLO 的特徵是快,識別速度很是快🤗,然而精度相對 Faster-RCNN 只差一點點 (YOLOv3 以後)。閱讀這篇須要先了解對象識別的原理,若是你沒看過這個系列的前幾篇文章 (介紹 RCNN, Fast-RCNN, Faster-RCNN 的文章),請先閱讀它們。python

YOLO 模型概覽

YOLO 的縮寫是 You only look once,翻譯成中文是寶貝你只須要看一次喔😘。YOLO 模型能夠直接根據圖片輸出包含對象的區域與區域對應的分類,一步到位,不像 RCNN 系列的模型須要先計算包含對象的區域,再根據區域判斷對應的分類,YOLO 模型的速度比 RCNN 系列的模型要快不少。算法

YOLO 模型的結構以下:json

是否是以爲有點熟悉?看上去就像 Faster-RCNN 的區域生成網絡 (RPN) 啊。的確,YOLO 模型原理上就是尋找區域的同時判斷區域包含的對象分類,YOLO 模型與區域生成網絡有如下的不一樣:網絡

  • YOLO 模型會輸出各個區域是否包含對象中心,而不是包含對象的一部分
  • YOLO 模型會同時輸出對象分類
  • YOLO 模型輸出的區域偏移會根據對象中心點計算,具體算法在下面說明

YOLO 模型與 Faster-RCNN 的區域生成網絡最大的不一樣是會判斷各個區域是否包含對象中心,以下圖中狗臉覆蓋了四個區域,但只有左下角的區域包含了狗臉的中心,YOLO 模型應該只判斷這個區域包含對象。app

固然,若是對象中心很是接近區域的邊界,那麼判斷起來將會很困難,YOLO 模型在訓練的時候會忽略對象重疊率高於必定水平的區域,具體能夠參考後面給出的代碼。框架

YOLO 模型會針對各個區域輸出如下的結果,這裏假設有三個分類:dom

  • 是否包含對象中心 (是爲 1, 否爲 0)
  • 區域偏移 x
  • 區域偏移 y
  • 區域偏移 w
  • 區域偏移 h
  • 分類 1 的可能性 (0 ~ 1)
  • 分類 2 的可能性 (0 ~ 1)
  • 分類 3 的可能性 (0 ~ 1)

輸出結果的維度是 批次大小, 區域數量, 5 + 分類數量ide

區域偏移用於調整輸出的區域範圍,例如上圖中狗臉的中心點大約在區域的右上角,若是把區域左上角看做 (0, 0),右下角看做 (1, 1),那麼狗臉中心點應該在 (0.95, 0.1) 的位置,而狗臉大小相對於區域長寬大概是 (1.3, 1.5) 倍,生成訓練數據的時候會根據這 4 個值計算區域偏移,具體計算代碼在下面給出。函數

看到這裏你可能會想,YOLO 模型看起來很簡單啊,我能夠丟掉操蛋的 Faster-RCNN 模型了🤢。不,沒那麼簡單,以上介紹的只是 YOLOv1 模型,YOLOv1 模型的精度很是低,後面爲了改進識別精度還發展出 YOLOv2, YOLOv3, YOLOv4, YOLOv5 模型😮,接下來將會介紹 YOLOv2, YOLOv3 模型主要改進了什麼部分,再給出 YOLOv3 模型的實現。YOLOv4 和 YOLOv5 模型主要改進了提取特徵用的 CNN 模型 (也稱骨幹網絡 Backbone Network),原始的 YOLO 模型使用了 C 語言編寫的 Darknet 做爲骨幹網絡,而這篇使用 Resnet 做爲骨幹網絡,因此只介紹到 YOLOv3。工具

YOLOv2

YOLOv2 最主要的改進點是引入了錨點 (Anchor),若是你已經看完前幾篇文章那麼應該很瞭解錨點是什麼,錨點會從每一個區域的中心點衍生出不一樣形狀的多個錨點區域:

Faster-RCNN 使用錨點主要爲了提高區域重疊率以免漏掉部分對象 (Faster-RCNN 訓練時會根據重疊率判斷區域是否包含對象,若是對象很長或者很寬但形狀只有正方形,那麼重疊率就會比較低致使該對象被漏掉),然而 YOLO 使用對象中心點,並不會存在因重疊率不足而漏掉對象的問題,YOLO 使用錨點是爲了支持識別中心位於同一個區域的多個對象,以下圖所示:

若是對象中心落在某個區域,YOLO 會計算該區域對應的各個形狀的重疊率,並使用重疊率最高的形狀,這樣若是多個對象中心落在同一個區域但它們的形狀不一樣,就會分別判斷出不一樣的分類。YOLOv2 的輸出以下圖所示:

輸出結果的維度是 批次大小, 區域數量 * 形狀數量, 5 + 分類數量

YOLOv2 還有一些針對骨幹網絡和訓練方法的改進點,但這篇文章都沒用到因此就不介紹了,若是你有興趣能夠參考後面給出的論文連接。

你可能會注意到 YOLO 劃分的區域是固定的,而且判斷區域是否存在對象和對象的分類時只會使用該區域中的數據,這樣會致使如下的問題:

  • 若是對象相對區域過大,則模型很難肯定哪一個區域包含中心點
    • Faster-RCNN 按錨點區域的重疊率而不是中心點判斷是否包含對象,因此不會有這個問題
  • 若是對象相對區域過大,則每一個區域都只包含對象的一小部分,很難依據這一小部分來判斷對象分類 (例如區域只包含鼻子的時候模型須要只根據鼻子判斷是否人臉)
    • Faster-RCNN 分兩步走,標籤分類網絡會根據區域生成網絡給出的區域截取特徵再判斷分類,因此不會有這個問題
  • 若是對象相對區域太小,則多個對象有可能處於同一個區域中
    • 由於 Faster-RCNN 不會有以上兩個問題,因此能夠用更小的區域

所以,YOLOv2 只適合對象大小和區域大小比較接近的場景。

YOLOv3

爲了更好的支持不一樣大小的對象,YOLOv3 引入了多尺度檢測機制 (Multi-Scale Detection),這個機制能夠說是 YOLO 模型的精華,引入這個機制以前 YOLO 模型的精度很不理想,而引入以後 YOLO 模型達到了接近 Faster-RCNN 的精度,而且速度仍是比 Faster-RCNN 要快。

多尺度檢測機制簡單的來講就是按不一樣的尺度劃分區域,而後再檢測這些不一樣大小的區域是否包含對象,檢測的時候大區域的特徵會混合到小區域中,使得小區域判斷時擁有必定程度的上下文信息。

實現多尺度檢測機制首先要讓 CNN 模型輸出不一樣尺度的特徵,咱們以前已經看過 CNN 模型中的卷積層能夠輸出比原有大小更小的特徵 (參考第 8 篇),例如指定內核大小 (kernel_size) 爲 3,處理間隔 (stride) 爲 2,填充大小 (padding) 爲 1 的時候,輸出大小恰好是輸入大小的一半,把這樣的卷積層放到 CNN 模型的末尾,而後保留各個卷積層的輸出,就能夠得出不一樣尺度的特徵。例如指定 3 個尺度的時候,可能會獲得如下大小的 3 個特徵:

  • 批次大小, 通道數量, 8, 8
  • 批次大小, 通道數量, 4, 4
  • 批次大小, 通道數量, 2, 2

以後再反向處理這三個特徵,首先把 批次大小, 通道數量, 2, 2 交給進一步處理特徵的 CNN 模型,這個模型會讓輸出長寬等於輸入長寬,因此輸出大小和原有大小相同,再擴大特徵到 批次大小, 通道數量, 4, 4,例如:

a b
c d

擴大之後會變爲

a a b b
a a b b
c c d d
c c d d

以後再合併這個特徵到大小爲 批次大小, 通道數量, 4, 4 的特徵,得出 批次大小, 通道數量 * 2, 4, 4 的特徵,把這個特徵交給進一步處理特徵的 CNN 模型,以後的流程就如上圖所示了,最終會得出如下大小的 3 個結果:

  • 批次大小, 形狀數量 * (5 + 分類數量), 8, 8
  • 批次大小, 形狀數量 * (5 + 分類數量), 4, 4
  • 批次大小, 形狀數量 * (5 + 分類數量), 2, 2

變形之後得出:

  • 批次大小, 8 * 8 * 形狀數量, 5 + 分類數量
  • 批次大小, 4 * 4 * 形狀數量, 5 + 分類數量
  • 批次大小, 2 * 2 * 形狀數量, 5 + 分類數量

總結起來,YOLOv3 模型的結構以下圖所示:

YOLO 模型的實現

接下來咱們來看看 YOLO 模型的實現細節,後面會給出完整代碼。注意這篇的實現與官方實現不徹底同樣🤕,這篇會用 Resnet 做爲骨幹網絡,而且會以識別人臉位置爲目標調整參數。

定義錨點 (Anchor)

首先是生成錨點範圍列表,代碼看起來和 Faster-RCNN 使用的差很少:

IMAGE_SIZE = (256, 256) # 縮放圖片的大小

Anchors = None # 錨點列表,包含 錨點數量 * 形狀數量 的範圍
AnchorSpans = (16, 32, 64) # 尺度列表,值爲錨點之間的距離
AnchorAspects = ((1, 1), (1, 2), (2, 1)) # 錨點對應區域的長寬比例列表

def generate_anchors():
    """根據錨點和形狀生成錨點範圍列表"""
    w, h = IMAGE_SIZE
    anchors = []
    for span in AnchorSpans:
        for x in range(0, w, span):
            for y in range(0, h, span):
                xcenter, ycenter = x + span / 2, y + span / 2
                for ratio in AnchorAspects:
                    ww = span * ratio[0]
                    hh = span * ratio[1]
                    xx = xcenter - ww / 2
                    yy = ycenter - hh / 2
                    xx = max(int(xx), 0)
                    yy = max(int(yy), 0)
                    ww = min(int(ww), w - xx)
                    hh = min(int(hh), h - yy)
                    anchors.append((xx, yy, ww, hh))
    return anchors

Anchors = generate_anchors()

但 YOLO 須要分別處理每一個尺度,因此生成的錨點範圍列表會首先按尺度排序,生成出來的結構以下:

[
    尺度1區域1形狀1的範圍,
    尺度1區域1形狀2的範圍,
    尺度1區域1形狀3的範圍,
    尺度1區域2形狀1的範圍,
    尺度1區域2形狀2的範圍,
    尺度1區域2形狀3的範圍,
    ...
    尺度2區域1形狀1的範圍,
    尺度2區域1形狀2的範圍,
    尺度2區域1形狀3的範圍,
    ...
    尺度3區域1形狀1的範圍,
    尺度3區域1形狀2的範圍,
    尺度3區域1形狀3的範圍,
    ...
]

最終會包含 (256/16)^2*3 + (256/32)^2*3 + (256/64)^2*3 = 768 + 192 + 48 = 1008 個錨點範圍。

這篇文章會用 YOLO 模型實現識別人臉位置與是否帶口罩,而人臉的形狀一般接近 1:1,因此下面的代碼會使用如下的參數生成錨點範圍列表:

AnchorSpans = (16, 32, 64) # 尺度列表,值爲錨點之間的距離
AnchorAspects = ((1, 1), (1.5, 1.5)) # 錨點對應區域的長寬比例列表

若是你想用來檢測其餘物體,能夠修改參數使得錨點範圍的形狀更匹配物體形狀,以提高檢測率。

調整區域範圍的算法

在有了錨點範圍以後,咱們還須要決定一個把錨點範圍調整到物體範圍的算法,一共須要四個參數,計算規則以下:

  • 區域偏移 x: 物體的中心點在錨點範圍中的 x 軸位置,0~1 之間
  • 區域偏移 y: 物體的中心點在錨點範圍中的 y 軸位置,0~1 之間
  • 區域偏移 w: log(物體的長度與錨點範圍長度的比例)
  • 區域偏移 h: log(物體的高度與錨點範圍高度的比例)

看起來比較簡單吧😎,須要注意的是這樣調整出來的物體範圍中心點必定會在錨點範圍中,這點跟 Faster-RCNN 使用的算法不同。

如下是計算使用的代碼,註釋中的 "實際區域" 表明物體範圍,"候選區域" 表明錨點範圍。

def calc_box_offset(candidate_box, true_box):
    """計算候選區域與實際區域的偏移值,要求實際區域的中心點必須在候選區域中"""
    # 計算實際區域的中心點在候選區域中的位置,範圍會在 0 ~ 1 之間
    x1, y1, w1, h1 = candidate_box
    x2, y2, w2, h2 = true_box
    x_offset = ((x2 + w2 // 2) - x1) / w1
    y_offset = ((y2 + h2 // 2) - y1) / h1
    # 計算實際區域長寬相對於候選區域長寬的比例,使用 log 減小過大的值
    w_offset = math.log(w2 / w1)
    h_offset = math.log(h2 / h1)
    return (x_offset, y_offset, w_offset, h_offset)

def adjust_box_by_offset(candidate_box, offset):
    """根據偏移值調整候選區域"""
    x1, y1, w1, h1 = candidate_box
    x_offset, y_offset, w_offset, h_offset = offset
    w2 = math.exp(w_offset) * w1
    h2 = math.exp(h_offset) * h1
    x2 = x1 + w1 * x_offset - w2 // 2
    y2 = y1 + h1 * y_offset - h2 // 2
    x2 = min(IMAGE_SIZE[0]-1,  x2)
    y2 = min(IMAGE_SIZE[1]-1,  y2)
    w2 = min(IMAGE_SIZE[0]-x2, w2)
    h2 = min(IMAGE_SIZE[1]-y2, h2)
    return (x2, y2, w2, h2)

生成用於訓練的實際輸出

決定了錨點與調整區域範圍的算法之後,咱們能夠根據訓練使用的數據集生成實際的輸出結果,訓練使用的數據集須要包含:

  • 圖片
  • 包含的對象,能夠有多個
    • 對象的範圍
    • 對象的分類

數據集準備好之後,咱們比對錨點範圍列表與數據集中對象的範圍,而後針對每張圖片的每一個錨點範圍生成如下數據:

  • 是否對象
  • 區域偏移 x
  • 區域偏移 y
  • 區域偏移 w
  • 區域偏移 h
  • 分類 1 的可能性
  • 分類 2 的可能性
  • 分類 3 的可能性

是否對象只有 0 或 1 兩個值,若是錨點範圍包含對象中心而且錨點範圍與對象範圍的重疊率 (IOU) 大於閾值 (例如 30%),則爲 1,不然爲 0。注意若是是否對象爲 0,那麼後面的區域偏移和各個分類的可能性不須要計算 (例如設置爲 0),計算損失的時候也會除掉它們。

四個區域偏移會根據錨點範圍與對象範圍計算,算法參考上面的說明。

各個分類的可能性按對象的分類計算,若是對象的分類爲 "人",而三個分類分別爲 "人 貓 狗" 那麼分類 1 的可能性爲 1,分類 2 與分類 3 的可能性爲 0。此外 YOLO 還支持多分類 (要求計算損失的時候用 BinaryCrossEntropy),若是分類爲 "人 男人 女人 豬 公豬 母豬" 而且對象是 "母豬" 時,那麼各個分類的可能性就是 "0 0 0 1 0 1"。須要注意這裏計算出來的值是供模型學習的,模型學習完之後可能會輸出 "0.9 0.2 0.0" 這樣的浮點數,須要判斷最大的值找出最可能的分類,而且根據值的大小判斷模型對結果有多少把握。

若是你記得前一篇介紹 Faster-RCNN 模型的文章,應該會想到有一個表示 "非對象" 的分類,Faster-RCNN 的區域生成網絡首先會判斷一次是否對象,以後的標籤分類網絡會再次去掉歸爲非對象分類的結果,這樣的作法讓識別的精度提高了不少。然而 YOLO 模型只有單步,原則上是不須要非對象分類的,即便加上非對象分類也不會提高判斷 "是否對象" 的精度。但若是數據量不足,添加非對象分類能夠幫助更好的識別分類。舉個例子,例如圖片中有棕色的貓和紅色的豬,模型可能會判斷棕色的都是貓,紅色的都是豬,但添加非對象分類之後,若是圖片還包含棕色的凳子和紅色的電飯鍋,那麼模型就不會只根據顏色來判斷。所以,下面識別人臉位置的例子會添加非對象分類。

具體的代碼參考後面的 prepare 函數吧🤒。

計算特徵

原始的 YOLO 模型計算特徵使用的是叫作 Darknet 的網絡,這個網絡是 YOLO 做者用 C 語言實現的,算是 YOLO 做者對本身寫的框架的宣傳吧😤。不過只要理解 YOLO 模型的原理,用其餘網絡也能夠實現差很少的效果 (雖然做者爲了刷分作出了不少調整,只是套用其餘網絡的話正確度追不上),這裏我用了目前用的最普遍的 Resnet 模型,代碼以下:

self.previous_channels_out = 4
self.resnet_models = nn.ModuleList([
    nn.Sequential(
        nn.Conv2d(3, self.previous_channels_out, kernel_size=3, stride=1, padding=1, bias=False),
        nn.BatchNorm2d(self.previous_channels_out),
        nn.ReLU(inplace=True),
        self._make_layer(BasicBlock, channels_out=16, num_blocks=2, stride=1),
        self._make_layer(BasicBlock, channels_out=32, num_blocks=2, stride=2),
        self._make_layer(BasicBlock, channels_out=64, num_blocks=2, stride=2),
        self._make_layer(BasicBlock, channels_out=128, num_blocks=2, stride=2),
        self._make_layer(BasicBlock, channels_out=256, num_blocks=2, stride=2)),
    self._make_layer(BasicBlock, channels_out=256, num_blocks=2, stride=2),
    self._make_layer(BasicBlock, channels_out=256, num_blocks=2, stride=2)
])

_make_layerBasicBlock 的代碼和以前文章給出的同樣,你也能夠參考下面的完整代碼。

這裏定義的 resnet_models 包含了三個子模型,第一個模型會輸出維度爲 批次大小,256,圖片寬度/16,圖片高度/16 的結果,第二個模型會接收第一個模型的結果真後輸出維度爲 批次大小,256,圖片寬度/32,圖片高度/32 的結果,第三個模型會接收第二個模型的結果真後輸出維度爲 批次大小,256,圖片寬度/64,圖片高度/64 的結果。這三個結果分別表明把圖片分割爲 16x1632x3264x64 個區域之後,各個區域對應的特徵。

輸出三個特徵的使用的代碼以下:

def forward(self, x):
    features_list = []
    resnet_input = x
    for m in self.resnet_models:
        resnet_input = m(resnet_input)
        features_list.append(resnet_input)

根據特徵預測輸出

上一步咱們得出了三個特徵,接下來就能夠根據這三個特徵預測三個尺度中的各個區域是否包含對象與對象的分類了。流程和上面介紹的同樣,須要分紅三步:

  • 進一步處理特徵 (長寬不變)
  • 擴大特徵長寬,而且合併到下一個尺度 (更細的尺度) 的特徵
  • 判斷是否對象中心與標籤分類

模型代碼:

self.yolo_detectors = nn.ModuleList([
    # 進一步處理特徵
    nn.ModuleList([nn.Sequential(
        nn.Conv2d(256 if index == 0 else 512, 256, kernel_size=1, stride=1, padding=0, bias=True),
        nn.ReLU(inplace=True),
        nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1, bias=True),
        nn.ReLU(inplace=True),
        nn.Conv2d(512, 256, kernel_size=1, stride=1, padding=0, bias=True),
        nn.ReLU(inplace=True)),
    # 擴大特徵長寬
    nn.Upsample(scale_factor=2, mode="nearest"),
    # 判斷是否對象中心與標籤分類
    nn.Sequential(
        nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1, bias=True),
        nn.ReLU(inplace=True),
        nn.Conv2d(512, 256, kernel_size=3, stride=1, padding=1, bias=True),
        nn.ReLU(inplace=True),
        nn.Conv2d(256, MyModel.AnchorTotalOutputs, kernel_size=1, stride=1, padding=0, bias=True))])
    for index in range(len(self.resnet_models))
])

"判斷是否對象中心與標籤分類" 的部分能夠用 CNN 模型也能夠用線性模型,多個不改變長寬的卷積層組合起來能夠作到與多層線性模型接近的效果。若是用 CNN 模型能夠把維度是 (B, C, W, H) 的輸入轉換到維度是 (B, O, W, H) 的結果,若是用線性模型則須要先把輸入變形到 (B*W*H, C) 而後再經過線性模型轉換到維度是 (B*W*H, O) 的結果,再變形到 (B, O, W, H)。前一篇文章介紹的 Faster-RCNN 實現用了線性模型,而這篇使用 CNN 模型,原則上用哪一種均可以🤒。

處理特徵的代碼:

previous_upsampled_feature = None
outputs = []
for index, feature in enumerate(reversed(features_list)):
    if previous_upsampled_feature is not None:
        # 合併大的錨點距離抽取的特徵到小的錨點距離抽取的特徵
        feature = torch.cat((feature, previous_upsampled_feature), dim=1)
    # 計算用於合併的特徵
    hidden = self.yolo_detectors[index][0](feature)
    # 放大特徵 (用於下一次處理時合併)
    upsampled = self.yolo_detectors[index][1](hidden)
    # 計算最終的預測輸出
    output = self.yolo_detectors[index][2](hidden)
    previous_upsampled_feature = upsampled
    outputs.append(output)

以後 outputs 會包含三個結果,維度是 (批次大小, (5+分類數量)*形狀數量, 尺度對應的寬度, 尺度對應的高度),把這三個結果連起來數量會恰好等於以前生成的錨點數量。鏈接三個結果的代碼以下,注意順序須要與生成錨點時使用的順序同樣,這樣鏈接後的結果和錨點範圍就能夠有一對一的關係。

outputs_flatten = []
# 前面處理特徵的時候用了 reversed,這裏須要再次用 reversed 把順序調換回來
# 調換之後的三個結果順序應該與 AnchorSpans 一致
for output in reversed(outputs):
    # 變形到 (批次大小, 尺度對應的寬度, 尺度對應的高度, (5+分類數量)*形狀數量)
    output = output.permute(0, 2, 3, 1)
    # 變形到 (批次大小, 寬度*高度*形狀數量, 5+分類數量)
    # 生成錨點時使用的順序是 寬度 => 高度 => 形狀
    output = output.reshape(output.shape[0], -1, MyModel.AnchorOutputs)
    outputs_flatten.append(output)
# 鏈接之後維度是 (批次大小, 尺度數量*寬度*高度*形狀數量, 5+分類數量)
# 即 (批次大小, 錨點數量, 5+分類數量)
outputs_all = torch.cat(outputs_flatten, dim=1)

在返回 outputs_all 以前,還須要用 sigmoid 來讓是否對象中心與各個分類的可能性對應的值落在 0 ~ 1 之間。注意部分 YOLO 的實現會用 sigmoid 來處理區域偏移 x 和區域偏移 y,由於這兩個值也應該落在 0 ~ 1 之間,但我我的認爲 sigmoid 只適合處理預期結果是二進制 (0 或 1) 的值,而區域偏移預期結果平均分佈在 0 ~ 1 之間,不能起到歸併的做用,效果會跟 hardtanh 差很少。

# 是否對象中心應該在 0 ~ 1 之間,使用 sigmoid 處理
outputs_all[:,:,:1] = self.sigmoid(outputs_all[:,:,:1])
# 分類應該在 0 ~ 1 之間,使用 sigmoid 處理
outputs_all[:,:,5:] = self.sigmoid(outputs_all[:,:,5:])

處理完之後,outputs_all 就是 YOLO 模型返回的結果了,它在訓練的時候會用於計算損失並調整參數,在實際預測的時候會配合以前生成的錨點列表得出包含對象的區域與對象分類,並標記到圖片或者視頻上。

計算損失

又到計算損失的時間了😩,YOLO 的預測輸出和實際輸出維度是同樣的,但咱們不能只用一個損失函數來計算它們,YOLO 一樣須要計算多個損失併合並它們。

首先咱們須要區分正樣本 (包含對象中心的區域) 和負樣本 (不包含對象中心的區域),方法在前面也提到過了:

  • 正樣本:包含對象中心而且重疊率大於某個閾值
  • 負樣本:不包含對象中心而且與任意對象的重疊率均小於某個閾值

負樣本要求重疊率低於閾值是爲了照顧對象中心很是接近區域邊緣的對象,這時模型很難判斷對象中心具體在哪一個區域,把這些樣本從負樣本中排除掉能夠幫助模型更容易的學習,最終模型能夠判斷對象中心在相鄰的兩個區域但不會被調整。

YOLO 模型會計算與合併如下的損失:

  • 正樣本的是否對象中心,使用 MSELoss
  • 負樣本的是否對象中心 * 0.5,使用 MSELoss
    • 由於大部分區域不包含對象中心,這裏乘以 0.5 以減小負樣本的損失對調整參數的影響
  • 正樣本的區域偏移,使用 MSELoss
    • 非正樣本的區域偏移會被忽略,計算起來沒意義
  • 正樣本的標籤分類損失,使用 BCELoss
    • BinaryCrossEntropy 損失函數支持多分類,雖然本篇的例子只有單分類
  • 若是有非對象分類,則計算負樣本的標籤分類損失,使用 BCELoss
    • 若是不使用非對象分類,則不須要計算

具體計算代碼以下:

def loss_function(predicted, actual):
    """YOLO 使用的多任務損失計算器"""
    result_tensor, result_isobject_masks, result_nonobject_masks = actual
    objectness_losses = []
    offsets_losses = []
    labels_losses = []
    for x in range(result_tensor.shape[0]):
        mask_positive = result_isobject_masks[x]
        mask_negative = result_nonobject_masks[x]
        # 計算是否對象中心的損失,分別針對正負樣本計算
        # 由於大部分區域不包含對象中心,這裏減小負樣本的損失對調整參數的影響
        objectness_loss_positive = nn.functional.mse_loss(
            predicted[x,mask_positive,0], result_tensor[x,mask_positive,0])
        objectness_loss_negative = nn.functional.mse_loss(
            predicted[x,mask_negative,0], result_tensor[x,mask_negative,0]) * 0.5
        objectness_losses.append(objectness_loss_positive)
        objectness_losses.append(objectness_loss_negative)
        # 計算區域偏移的損失,只針對正樣本計算
        offsets_loss = nn.functional.mse_loss(
            predicted[x,mask_positive,1:5], result_tensor[x,mask_positive,1:5])
        offsets_losses.append(offsets_loss)
        # 計算標籤分類的損失,分別針對正負樣本計算
        labels_loss_positive = nn.functional.binary_cross_entropy(
            predicted[x,mask_positive,5:], result_tensor[x,mask_positive,5:])
        labels_loss_negative = nn.functional.binary_cross_entropy(
            predicted[x,mask_negative,5:], result_tensor[x,mask_negative,5:]) * 0.5
        labels_losses.append(labels_loss_positive)
        labels_losses.append(labels_loss_negative)
    loss = (
        torch.mean(torch.stack(objectness_losses)) +
        torch.mean(torch.stack(offsets_losses)) +
        torch.mean(torch.stack(labels_losses)))
    return loss

合併結果區域

最後就是把 YOLO 模型返回的預測結果轉換到具體的區域列表了,算法是前幾篇介紹過的 NMS 算法,代碼以下:

ObjScoreThreshold = 0.9 # 認爲是對象中心所須要的最小分數
IOUMergeThreshold = 0.3 # 判斷是否應該合併重疊區域的重疊率閾值

def convert_predicted_result(predicted):
    """轉換預測結果到 (標籤, 區域, 對象中心分數, 標籤識別分數) 的列表,重疊區域使用 NMS 算法合併"""
    # 記錄重疊的結果區域, 結果是 [ [(標籤, 區域, RPN 分數, 標籤識別分數)], ... ]
    final_result = []
    for anchor, tensor in zip(Anchors, predicted):
        obj_score = tensor[0].item()
        if obj_score <= ObjScoreThreshold:
            # 要求對象中心分數超過必定值
            continue
        offset = tensor[1:5].tolist()
        offset[0] = max(min(offset[0], 1), 0) # 中心點 x 的偏移應該在 0 ~ 1 之間
        offset[1] = max(min(offset[1], 1), 0) # 中心點 y 的偏移應該在 0 ~ 1 之間
        box = adjust_box_by_offset(anchor, offset)
        label_max = tensor[5:].max(dim=0)
        cls_score = label_max.values.item()
        label = label_max.indices.item()
        if label == 0:
            # 跳過非對象分類
            continue
        for index in range(len(final_result)):
            exists_results = final_result[index]
            if any(calc_iou(box, r[1]) > IOUMergeThreshold for r in exists_results):
                exists_results.append((label, box, obj_score, cls_score))
                break
        else:
            final_result.append([(label, box, obj_score, cls_score)])
    # 合併重疊的結果區域 (使用 對象中心分數 * 標籤識別分數 最高的區域爲結果區域)
    for index in range(len(final_result)):
        exists_results = final_result[index]
        exists_results.sort(key=lambda r: r[2]*r[3])
        final_result[index] = exists_results[-1]
    return final_result

這篇的例子用了非對象分類,因此會跳過非對象分類的區域,若是不使用則不須要這樣處理。

YOLO 模型的論文

若是你想看原始的 YOLO 論文能夠點下面的連接🤒,很難喔:

使用 YOLO 模型識別人臉位置與是否戴口罩

接下來咱們用 YOLO 模型把沒帶口罩的傢伙抓出來吧🤗,和上一篇同樣會用兩個數據集。

https://www.kaggle.com/andrewmvd/face-mask-detection

這個數據集包含了 853 張圖片 (部分圖片沒有使用),其中各個分類的數量以下:

  • 戴口罩的區域 (with_mask): 3232 個
  • 不戴口罩的區域 (without_mask): 717 個
  • 帶了口罩但姿式不正確的區域 (mask_weared_incorrect): 123 個

由於帶了口罩但姿式不正確的樣本數量不多,因此都歸到戴口罩裏面去😠。

https://www.kaggle.com/vin1234/count-the-number-of-faces-present-in-an-image

這個數據集一共有 24533 個區域,都是不戴口罩的。

加起來數量以下:

  • 戴口罩的區域 (with_mask): 3232+123=3355 個
  • 不戴口罩的區域 (without_mask): 717+24533 = 25250 個

使用這個數據集訓練,而且訓練成功之後使用模型識別圖片或視頻的完整代碼以下:

import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import math
import pandas
import json
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
from torch import nn
from matplotlib import pyplot
from collections import defaultdict
from collections import deque
import xml.etree.cElementTree as ET

# 縮放圖片的大小
IMAGE_SIZE = (256, 192)
# 訓練使用的數據集路徑
DATASET_1_IMAGE_DIR = "./archive/images"
DATASET_1_ANNOTATION_DIR = "./archive/annotations"
DATASET_2_IMAGE_DIR = "./784145_1347673_bundle_archive/train/image_data"
DATASET_2_BOX_CSV_PATH = "./784145_1347673_bundle_archive/train/bbox_train.csv"
# 分類列表
# YOLO 原則上不須要 other 分類,但實測中添加這個分類有助於提高標籤分類的精確度
CLASSES = [ "other", "with_mask", "without_mask" ]
CLASSES_MAPPING = { c: index for index, c in enumerate(CLASSES) }
# 判斷是否存在對象使用的區域重疊率的閾值 (另外要求對象中心在區域內)
IOU_POSITIVE_THRESHOLD = 0.30
IOU_NEGATIVE_THRESHOLD = 0.30

# 用於啓用 GPU 支持
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class BasicBlock(nn.Module):
    """ResNet 使用的基礎塊"""
    expansion = 1 # 定義這個塊的實際出通道是 channels_out 的幾倍,這裏的實現固定是一倍
    def __init__(self, channels_in, channels_out, stride):
        super().__init__()
        # 生成 3x3 的卷積層
        # 處理間隔 stride = 1 時,輸出的長寬會等於輸入的長寬,例如 (32-3+2)//1+1 == 32
        # 處理間隔 stride = 2 時,輸出的長寬會等於輸入的長寬的一半,例如 (32-3+2)//2+1 == 16
        # 此外 resnet 的 3x3 卷積層不使用偏移值 bias
        self.conv1 = nn.Sequential(
            nn.Conv2d(channels_in, channels_out, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 再定義一個讓輸出和輸入維度相同的 3x3 卷積層
        self.conv2 = nn.Sequential(
            nn.Conv2d(channels_out, channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 讓原始輸入和輸出相加的時候,須要維度一致,若是維度不一致則須要整合
        self.identity = nn.Sequential()
        if stride != 1 or channels_in != channels_out * self.expansion:
            self.identity = nn.Sequential(
                nn.Conv2d(channels_in, channels_out * self.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channels_out * self.expansion))

    def forward(self, x):
        # x => conv1 => relu => conv2 => + => relu
        # |                              ^
        # |==============================|
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp, inplace=True)
        tmp = self.conv2(tmp)
        tmp += self.identity(x)
        y = nn.functional.relu(tmp, inplace=True)
        return y

class MyModel(nn.Module):
    """YOLO (基於 ResNet 的變種)"""
    Anchors = None # 錨點列表,包含 錨點數量 * 形狀數量 的範圍
    AnchorSpans = (16, 32, 64) # 尺度列表,值爲錨點之間的距離
    AnchorAspects = ((1, 1), (1.5, 1.5)) # 錨點對應區域的長寬比例列表
    AnchorOutputs = 1 + 4 + len(CLASSES) # 每一個錨點範圍對應的輸出數量,是否對象中心 (1) + 區域偏移 (4) + 分類數量
    AnchorTotalOutputs = AnchorOutputs * len(AnchorAspects) # 每一個錨點對應的輸出數量
    ObjScoreThreshold = 0.9 # 認爲是對象中心所須要的最小分數
    IOUMergeThreshold = 0.3 # 判斷是否應該合併重疊區域的重疊率閾值

    def __init__(self):
        super().__init__()
        # 抽取圖片特徵的 ResNet
        # 由於錨點距離有三個,這裏最後會輸出各個錨點距離對應的特徵
        self.previous_channels_out = 4
        self.resnet_models = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(3, self.previous_channels_out, kernel_size=3, stride=1, padding=1, bias=False),
                nn.BatchNorm2d(self.previous_channels_out),
                nn.ReLU(inplace=True),
                self._make_layer(BasicBlock, channels_out=16, num_blocks=2, stride=1),
                self._make_layer(BasicBlock, channels_out=32, num_blocks=2, stride=2),
                self._make_layer(BasicBlock, channels_out=64, num_blocks=2, stride=2),
                self._make_layer(BasicBlock, channels_out=128, num_blocks=2, stride=2),
                self._make_layer(BasicBlock, channels_out=256, num_blocks=2, stride=2)),
            self._make_layer(BasicBlock, channels_out=256, num_blocks=2, stride=2),
            self._make_layer(BasicBlock, channels_out=256, num_blocks=2, stride=2)
        ])
        # 根據各個錨點距離對應的特徵預測輸出的卷積層
        # 大的錨點距離抽取的特徵會合併到小的錨點距離抽取的特徵
        # 這裏的三個子模型意義分別是:
        # - 計算用於合併的特徵
        # - 放大特徵
        # - 計算最終的預測輸出
        self.yolo_detectors = nn.ModuleList([
            nn.ModuleList([nn.Sequential(
                nn.Conv2d(256 if index == 0 else 512, 256, kernel_size=1, stride=1, padding=0, bias=True),
                nn.ReLU(inplace=True),
                nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1, bias=True),
                nn.ReLU(inplace=True),
                nn.Conv2d(512, 256, kernel_size=1, stride=1, padding=0, bias=True),
                nn.ReLU(inplace=True)),
            nn.Upsample(scale_factor=2, mode="nearest"),
            nn.Sequential(
                nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1, bias=True),
                nn.ReLU(inplace=True),
                nn.Conv2d(512, 256, kernel_size=3, stride=1, padding=1, bias=True),
                nn.ReLU(inplace=True),
                nn.Conv2d(256, MyModel.AnchorTotalOutputs, kernel_size=1, stride=1, padding=0, bias=True))])
            for index in range(len(self.resnet_models))
        ])
        # 處理結果範圍的函數
        self.sigmoid = nn.Sigmoid()

    def _make_layer(self, block_type, channels_out, num_blocks, stride):
        """建立 resnet 使用的層"""
        blocks = []
        # 添加第一個塊
        blocks.append(block_type(self.previous_channels_out, channels_out, stride))
        self.previous_channels_out = channels_out * block_type.expansion
        # 添加剩餘的塊,剩餘的塊固定處理間隔爲 1,不會改變長寬
        for _ in range(num_blocks-1):
            blocks.append(block_type(self.previous_channels_out, self.previous_channels_out, 1))
            self.previous_channels_out *= block_type.expansion
        return nn.Sequential(*blocks)

    @staticmethod
    def _generate_anchors():
        """根據錨點和形狀生成錨點範圍列表"""
        w, h = IMAGE_SIZE
        anchors = []
        for span in MyModel.AnchorSpans:
            for x in range(0, w, span):
                for y in range(0, h, span):
                    xcenter, ycenter = x + span / 2, y + span / 2
                    for ratio in MyModel.AnchorAspects:
                        ww = span * ratio[0]
                        hh = span * ratio[1]
                        xx = xcenter - ww / 2
                        yy = ycenter - hh / 2
                        xx = max(int(xx), 0)
                        yy = max(int(yy), 0)
                        ww = min(int(ww), w - xx)
                        hh = min(int(hh), h - yy)
                        anchors.append((xx, yy, ww, hh))
        return anchors

    def forward(self, x):
        # 抽取各個錨點距離對應的特徵
        # 維度分別是:
        # torch.Size([16, 256, 16, 12])
        # torch.Size([16, 256, 8, 6])
        # torch.Size([16, 256, 4, 3])
        features_list = []
        resnet_input = x
        for m in self.resnet_models:
            resnet_input = m(resnet_input)
            features_list.append(resnet_input)
        # 根據特徵預測輸出
        # 維度分別是:
        # torch.Size([16, 16, 4, 3])
        # torch.Size([16, 16, 8, 6])
        # torch.Size([16, 16, 16, 12])
        # 16 是 (5 + 分類3) * 形狀2
        previous_upsampled_feature = None
        outputs = []
        for index, feature in enumerate(reversed(features_list)):
            if previous_upsampled_feature is not None:
                # 合併大的錨點距離抽取的特徵到小的錨點距離抽取的特徵
                feature = torch.cat((feature, previous_upsampled_feature), dim=1)
            # 計算用於合併的特徵
            hidden = self.yolo_detectors[index][0](feature)
            # 放大特徵 (用於下一次處理時合併)
            upsampled = self.yolo_detectors[index][1](hidden)
            # 計算最終的預測輸出
            output = self.yolo_detectors[index][2](hidden)
            previous_upsampled_feature = upsampled
            outputs.append(output)
        # 鏈接全部輸出
        # 注意順序須要與 Anchors 一致
        outputs_flatten = []
        for output in reversed(outputs):
            output = output.permute(0, 2, 3, 1)
            output = output.reshape(output.shape[0], -1, MyModel.AnchorOutputs)
            outputs_flatten.append(output)
        outputs_all = torch.cat(outputs_flatten, dim=1)
        # 是否對象中心應該在 0 ~ 1 之間,使用 sigmoid 處理
        outputs_all[:,:,:1] = self.sigmoid(outputs_all[:,:,:1])
        # 分類應該在 0 ~ 1 之間,使用 sigmoid 處理
        outputs_all[:,:,5:] = self.sigmoid(outputs_all[:,:,5:])
        return outputs_all

    @staticmethod
    def loss_function(predicted, actual):
        """YOLO 使用的多任務損失計算器"""
        result_tensor, result_isobject_masks, result_nonobject_masks = actual
        objectness_losses = []
        offsets_losses = []
        labels_losses = []
        for x in range(result_tensor.shape[0]):
            mask_positive = result_isobject_masks[x]
            mask_negative = result_nonobject_masks[x]
            # 計算是否對象中心的損失,分別針對正負樣本計算
            # 由於大部分區域不包含對象中心,這裏減小負樣本的損失對調整參數的影響
            objectness_loss_positive = nn.functional.mse_loss(
                predicted[x,mask_positive,0], result_tensor[x,mask_positive,0])
            objectness_loss_negative = nn.functional.mse_loss(
                predicted[x,mask_negative,0], result_tensor[x,mask_negative,0]) * 0.5
            objectness_losses.append(objectness_loss_positive)
            objectness_losses.append(objectness_loss_negative)
            # 計算區域偏移的損失,只針對正樣本計算
            offsets_loss = nn.functional.mse_loss(
                predicted[x,mask_positive,1:5], result_tensor[x,mask_positive,1:5])
            offsets_losses.append(offsets_loss)
            # 計算標籤分類的損失,分別針對正負樣本計算
            labels_loss_positive = nn.functional.binary_cross_entropy(
                predicted[x,mask_positive,5:], result_tensor[x,mask_positive,5:])
            labels_loss_negative = nn.functional.binary_cross_entropy(
                predicted[x,mask_negative,5:], result_tensor[x,mask_negative,5:]) * 0.5
            labels_losses.append(labels_loss_positive)
            labels_losses.append(labels_loss_negative)
        loss = (
            torch.mean(torch.stack(objectness_losses)) +
            torch.mean(torch.stack(offsets_losses)) +
            torch.mean(torch.stack(labels_losses)))
        return loss

    @staticmethod
    def calc_accuracy(actual, predicted):
        """YOLO 使用的正確率計算器,這裏只計算是否對象中心與標籤分類的正確率,區域偏移不計算"""
        result_tensor, result_isobject_masks, result_nonobject_masks = actual
        # 計算是否對象中心的正確率,正樣本和負樣本的正確率分別計算再平均
        a = result_tensor[:,:,0]
        p = predicted[:,:,0] > MyModel.ObjScoreThreshold
        obj_acc_positive = ((a == 1) & (p == 1)).sum().item() / ((a == 1).sum().item() + 0.00001)
        obj_acc_negative = ((a == 0) & (p == 0)).sum().item() / ((a == 0).sum().item() + 0.00001)
        obj_acc = (obj_acc_positive + obj_acc_negative) / 2
        # 計算標籤分類的正確率
        cls_total = 0
        cls_correct = 0
        for x in range(result_tensor.shape[0]):
            mask = list(sorted(result_isobject_masks[x] + result_nonobject_masks[x]))
            actual_classes = result_tensor[x,mask,5:].max(dim=1).indices
            predicted_classes = predicted[x,mask,5:].max(dim=1).indices
            cls_total += len(mask)
            cls_correct += (actual_classes == predicted_classes).sum().item()
        cls_acc = cls_correct / cls_total
        return obj_acc, cls_acc

    @staticmethod
    def convert_predicted_result(predicted):
        """轉換預測結果到 (標籤, 區域, 對象中心分數, 標籤識別分數) 的列表,重疊區域使用 NMS 算法合併"""
        # 記錄重疊的結果區域, 結果是 [ [(標籤, 區域, RPN 分數, 標籤識別分數)], ... ]
        final_result = []
        for anchor, tensor in zip(MyModel.Anchors, predicted):
            obj_score = tensor[0].item()
            if obj_score <= MyModel.ObjScoreThreshold:
                # 要求對象中心分數超過必定值
                continue
            offset = tensor[1:5].tolist()
            offset[0] = max(min(offset[0], 1), 0) # 中心點 x 的偏移應該在 0 ~ 1 之間
            offset[1] = max(min(offset[1], 1), 0) # 中心點 y 的偏移應該在 0 ~ 1 之間
            box = adjust_box_by_offset(anchor, offset)
            label_max = tensor[5:].max(dim=0)
            cls_score = label_max.values.item()
            label = label_max.indices.item()
            if label == 0:
                # 跳過非對象分類
                continue
            for index in range(len(final_result)):
                exists_results = final_result[index]
                if any(calc_iou(box, r[1]) > MyModel.IOUMergeThreshold for r in exists_results):
                    exists_results.append((label, box, obj_score, cls_score))
                    break
            else:
                final_result.append([(label, box, obj_score, cls_score)])
        # 合併重疊的結果區域 (使用 對象中心分數 * 標籤識別分數 最高的區域爲結果區域)
        for index in range(len(final_result)):
            exists_results = final_result[index]
            exists_results.sort(key=lambda r: r[2]*r[3])
            final_result[index] = exists_results[-1]
        return final_result

    @staticmethod
    def fix_predicted_result_from_history(cls_result, history_results):
        """根據歷史結果減小預測結果中的誤判,適用於視頻識別,history_results 應爲指定了 maxlen 的 deque"""
        # 要求歷史結果中 50% 以上存在相似區域,而且選取歷史結果中最多的分類
        history_results.append(cls_result)
        final_result = []
        if len(history_results) < history_results.maxlen:
            # 歷史結果不足,不返回任何識別結果
            return final_result
        for label, box, rpn_score, cls_score in cls_result:
            # 查找歷史中的近似區域
            similar_results = []
            for history_result in history_results:
                history_result = [(calc_iou(r[1], box), r) for r in history_result]
                history_result.sort(key = lambda r: r[0])
                if history_result and history_result[-1][0] > MyModel.IOUMergeThreshold:
                    similar_results.append(history_result[-1][1])
            # 判斷近似區域數量是否過半
            if len(similar_results) < history_results.maxlen // 2:
                continue
            # 選取歷史結果中最多的分類
            cls_groups = defaultdict(lambda: [])
            for r in similar_results:
                cls_groups[r[0]].append(r)
            most_common = sorted(cls_groups.values(), key=len)[-1]
            # 添加最多的分類中的最新的結果
            final_result.append(most_common[-1])
        return final_result

MyModel.Anchors = MyModel._generate_anchors()

def save_tensor(tensor, path):
    """保存 tensor 對象到文件"""
    torch.save(tensor, gzip.GzipFile(path, "wb"))

def load_tensor(path):
    """從文件讀取 tensor 對象"""
    return torch.load(gzip.GzipFile(path, "rb"))

def calc_resize_parameters(sw, sh):
    """計算縮放圖片的參數"""
    sw_new, sh_new = sw, sh
    dw, dh = IMAGE_SIZE
    pad_w, pad_h = 0, 0
    if sw / sh < dw / dh:
        sw_new = int(dw / dh * sh)
        pad_w = (sw_new - sw) // 2 # 填充左右
    else:
        sh_new = int(dh / dw * sw)
        pad_h = (sh_new - sh) // 2 # 填充上下
    return sw_new, sh_new, pad_w, pad_h

def resize_image(img):
    """縮放圖片,比例不一致時填充"""
    sw, sh = img.size
    sw_new, sh_new, pad_w, pad_h = calc_resize_parameters(sw, sh)
    img_new = Image.new("RGB", (sw_new, sh_new))
    img_new.paste(img, (pad_w, pad_h))
    img_new = img_new.resize(IMAGE_SIZE)
    return img_new

def image_to_tensor(img):
    """轉換圖片對象到 tensor 對象"""
    arr = numpy.asarray(img)
    t = torch.from_numpy(arr)
    t = t.transpose(0, 2) # 轉換維度 H,W,C 到 C,W,H
    t = t / 255.0 # 正規化數值使得範圍在 0 ~ 1
    return t

def map_box_to_resized_image(box, sw, sh):
    """把原始區域轉換到縮放後的圖片對應的區域"""
    x, y, w, h = box
    sw_new, sh_new, pad_w, pad_h = calc_resize_parameters(sw, sh)
    scale = IMAGE_SIZE[0] / sw_new
    x = int((x + pad_w) * scale)
    y = int((y + pad_h) * scale)
    w = int(w * scale)
    h = int(h * scale)
    if x + w > IMAGE_SIZE[0] or y + h > IMAGE_SIZE[1] or w == 0 or h == 0:
        return 0, 0, 0, 0
    return x, y, w, h

def map_box_to_original_image(box, sw, sh):
    """把縮放後圖片對應的區域轉換到縮放前的原始區域"""
    x, y, w, h = box
    sw_new, sh_new, pad_w, pad_h = calc_resize_parameters(sw, sh)
    scale = IMAGE_SIZE[0] / sw_new
    x = int(x / scale - pad_w)
    y = int(y / scale - pad_h)
    w = int(w / scale)
    h = int(h / scale)
    if x + w > sw or y + h > sh or x < 0 or y < 0 or w == 0 or h == 0:
        return 0, 0, 0, 0
    return x, y, w, h

def calc_iou(rect1, rect2):
    """計算兩個區域重疊部分 / 合併部分的比率 (intersection over union)"""
    x1, y1, w1, h1 = rect1
    x2, y2, w2, h2 = rect2
    xi = max(x1, x2)
    yi = max(y1, y2)
    wi = min(x1+w1, x2+w2) - xi
    hi = min(y1+h1, y2+h2) - yi
    if wi > 0 and hi > 0: # 有重疊部分
        area_overlap = wi*hi
        area_all = w1*h1 + w2*h2 - area_overlap
        iou = area_overlap / area_all
    else: # 沒有重疊部分
        iou = 0
    return iou

def calc_box_offset(candidate_box, true_box):
    """計算候選區域與實際區域的偏移值,要求實際區域的中心點必須在候選區域中"""
    # 計算實際區域的中心點在候選區域中的位置,範圍會在 0 ~ 1 之間
    x1, y1, w1, h1 = candidate_box
    x2, y2, w2, h2 = true_box
    x_offset = ((x2 + w2 // 2) - x1) / w1
    y_offset = ((y2 + h2 // 2) - y1) / h1
    # 計算實際區域長寬相對於候選區域長寬的比例,使用 log 減小過大的值
    w_offset = math.log(w2 / w1)
    h_offset = math.log(h2 / h1)
    return (x_offset, y_offset, w_offset, h_offset)

def adjust_box_by_offset(candidate_box, offset):
    """根據偏移值調整候選區域"""
    x1, y1, w1, h1 = candidate_box
    x_offset, y_offset, w_offset, h_offset = offset
    w2 = math.exp(w_offset) * w1
    h2 = math.exp(h_offset) * h1
    x2 = x1 + w1 * x_offset - w2 // 2
    y2 = y1 + h1 * y_offset - h2 // 2
    x2 = min(IMAGE_SIZE[0]-1,  x2)
    y2 = min(IMAGE_SIZE[1]-1,  y2)
    w2 = min(IMAGE_SIZE[0]-x2, w2)
    h2 = min(IMAGE_SIZE[1]-y2, h2)
    return (x2, y2, w2, h2)

def prepare_save_batch(batch, image_tensors, result_tensors, result_isobject_masks, result_nonobject_masks):
    """準備訓練 - 保存單個批次的數據"""
    # 按索引值列表生成輸入和輸出 tensor 對象的函數
    def split_dataset(indices):
        indices_list = indices.tolist()
        image_tensors_splited = torch.stack([image_tensors[x] for x in indices_list])
        result_tensors_splited = torch.stack([result_tensors[x] for x in indices_list])
        result_isobject_masks_splited = [result_isobject_masks[x] for x in indices_list]
        result_nonobject_masks_splited = [result_nonobject_masks[x] for x in indices_list]
        return image_tensors_splited, (
            result_tensors_splited, result_isobject_masks_splited, result_nonobject_masks_splited)

    # 切分訓練集 (80%),驗證集 (10%) 和測試集 (10%)
    random_indices = torch.randperm(len(image_tensors))
    training_indices = random_indices[:int(len(random_indices)*0.8)]
    validating_indices = random_indices[int(len(random_indices)*0.8):int(len(random_indices)*0.9):]
    testing_indices = random_indices[int(len(random_indices)*0.9):]
    training_set = split_dataset(training_indices)
    validating_set = split_dataset(validating_indices)
    testing_set = split_dataset(testing_indices)

    # 保存到硬盤
    save_tensor(training_set, f"data/training_set.{batch}.pt")
    save_tensor(validating_set, f"data/validating_set.{batch}.pt")
    save_tensor(testing_set, f"data/testing_set.{batch}.pt")
    print(f"batch {batch} saved")

def prepare():
    """準備訓練"""
    # 數據集轉換到 tensor 之後會保存在 data 文件夾下
    if not os.path.isdir("data"):
        os.makedirs("data")

    # 加載圖片和圖片對應的區域與分類列表
    # { (路徑, 是否左右翻轉): [ 區域與分類, 區域與分類, .. ] }
    # 同一張圖片左右翻轉能夠生成一個新的數據,讓數據量翻倍
    box_map = defaultdict(lambda: [])
    for filename in os.listdir(DATASET_1_IMAGE_DIR):
        # 從第一個數據集加載
        xml_path = os.path.join(DATASET_1_ANNOTATION_DIR, filename.split(".")[0] + ".xml")
        if not os.path.isfile(xml_path):
            continue
        tree = ET.ElementTree(file=xml_path)
        objects = tree.findall("object")
        path = os.path.join(DATASET_1_IMAGE_DIR, filename)
        for obj in objects:
            class_name = obj.find("name").text
            x1 = int(obj.find("bndbox/xmin").text)
            x2 = int(obj.find("bndbox/xmax").text)
            y1 = int(obj.find("bndbox/ymin").text)
            y2 = int(obj.find("bndbox/ymax").text)
            if class_name == "mask_weared_incorrect":
                # 佩戴口罩不正確的樣本數量太少 (只有 123),模型沒法學習,這裏全合併到戴口罩的樣本
                class_name = "with_mask"
            box_map[(path, False)].append((x1, y1, x2-x1, y2-y1, CLASSES_MAPPING[class_name]))
            box_map[(path, True)].append((x1, y1, x2-x1, y2-y1, CLASSES_MAPPING[class_name]))
    df = pandas.read_csv(DATASET_2_BOX_CSV_PATH)
    for row in df.values:
        # 從第二個數據集加載,這個數據集只包含沒有帶口罩的圖片
        filename, width, height, x1, y1, x2, y2 = row[:7]
        path = os.path.join(DATASET_2_IMAGE_DIR, filename)
        box_map[(path, False)].append((x1, y1, x2-x1, y2-y1, CLASSES_MAPPING["without_mask"]))
        box_map[(path, True)].append((x1, y1, x2-x1, y2-y1, CLASSES_MAPPING["without_mask"]))
    # 打亂數據集 (由於第二個數據集只有不戴口罩的圖片)
    box_list = list(box_map.items())
    random.shuffle(box_list)
    print(f"found {len(box_list)} images")

    # 保存圖片和圖片對應的分類與區域列表
    batch_size = 20
    batch = 0
    image_tensors = [] # 圖片列表
    result_tensors = [] # 圖片對應的輸出結果列表,包含 [ 是否對象中心, 區域偏移, 各個分類的可能性 ]
    result_isobject_masks = [] # 各個圖片的包含對象的區域在 Anchors 中的索引
    result_nonobject_masks = [] # 各個圖片不包含對象的區域在 Anchors 中的索引 (重疊率低於閾值的區域)
    for (image_path, flip), original_boxes_labels in box_list:
        with Image.open(image_path) as img_original: # 加載原始圖片
            sw, sh = img_original.size # 原始圖片大小
            if flip:
                img = resize_image(img_original.transpose(Image.FLIP_LEFT_RIGHT)) # 翻轉而後縮放圖片
            else:
                img = resize_image(img_original) # 縮放圖片
            image_tensors.append(image_to_tensor(img)) # 添加圖片到列表
        # 生成輸出結果的 tensor
        result_tensor = torch.zeros((len(MyModel.Anchors), MyModel.AnchorOutputs), dtype=torch.float)
        result_tensor[:,5] = 1 # 默認分類爲 other
        result_tensors.append(result_tensor)
        # 包含對象的區域在 Anchors 中的索引
        result_isobject_mask = []
        result_isobject_masks.append(result_isobject_mask)
        # 不包含對象的區域在 Anchors 中的索引
        result_nonobject_mask = []
        result_nonobject_masks.append(result_nonobject_mask)
        # 根據真實區域定位所屬的錨點,而後設置輸出結果
        negative_mapping = [1] * len(MyModel.Anchors)
        for box_label in original_boxes_labels:
            x, y, w, h, label = box_label
            if flip: # 翻轉座標
                x = sw - x - w
            x, y, w, h = map_box_to_resized_image((x, y, w, h), sw, sh) # 縮放實際區域
            if w < 20 or h < 20:
                continue # 縮放後區域太小
            # 檢查計算是否有問題
            # child_img = img.copy().crop((x, y, x+w, y+h))
            # child_img.save(f"{os.path.basename(image_path)}_{x}_{y}_{w}_{h}_{label}.png")
            # 定位所屬的錨點
            # 要求:
            # - 中心點落在錨點對應的區域中
            # - 重疊率超過必定值
            x_center = x + w // 2
            y_center = y + h // 2
            matched_anchors = []
            for index, anchor in enumerate(MyModel.Anchors):
                ax, ay, aw, ah = anchor
                is_center = (x_center >= ax and x_center < ax + aw and
                    y_center >= ay and y_center < ay + ah)
                iou = calc_iou(anchor, (x, y, w, h))
                if is_center and iou > IOU_POSITIVE_THRESHOLD:
                    matched_anchors.append((index, anchor)) # 區域包含對象中心而且重疊率超過必定值
                    negative_mapping[index] = 0
                elif iou > IOU_NEGATIVE_THRESHOLD:
                    negative_mapping[index] = 0 # 區域與某個對象重疊率超過必定值,不該該看成負樣本
            for matched_index, matched_box in matched_anchors:
                # 計算區域偏移
                offset = calc_box_offset(matched_box, (x, y, w, h))
                # 修改輸出結果的 tensor
                result_tensor[matched_index] = torch.tensor((
                    1, # 是否對象中心
                    *offset, # 區域偏移
                    *[int(c == label) for c in range(len(CLASSES))] # 對應分類
                ), dtype=torch.float)
                # 添加索引值
                # 注意若是兩個對象同時定位到相同的錨點,那麼只有一個對象能夠被識別,這裏後面的對象會覆蓋前面的對象
                if matched_index not in result_isobject_mask:
                    result_isobject_mask.append(matched_index)
        # 沒有找到可識別的對象時跳過圖片
        if not result_isobject_mask:
            image_tensors.pop()
            result_tensors.pop()
            result_isobject_masks.pop()
            result_nonobject_masks.pop()
            continue
        # 添加不包含對象的區域在 Anchors 中的索引
        for index, value in enumerate(negative_mapping):
            if value:
                result_nonobject_mask.append(index)
        # 排序索引列表
        result_isobject_mask.sort()
        # 保存批次
        if len(image_tensors) >= batch_size:
            prepare_save_batch(batch, image_tensors, result_tensors,
                result_isobject_masks, result_nonobject_masks)
            image_tensors.clear()
            result_tensors.clear()
            result_isobject_masks.clear()
            result_nonobject_masks.clear()
            batch += 1
    # 保存剩餘的批次
    if len(image_tensors) > 10:
        prepare_save_batch(batch, image_tensors, result_tensors,
            result_isobject_masks, result_nonobject_masks)

def train():
    """開始訓練"""
    # 建立模型實例
    model = MyModel().to(device)

    # 建立多任務損失計算器
    loss_function = MyModel.loss_function

    # 建立參數調整器
    optimizer = torch.optim.Adam(model.parameters())

    # 記錄訓練集和驗證集的正確率變化
    training_obj_accuracy_history = []
    training_cls_accuracy_history = []
    validating_obj_accuracy_history = []
    validating_cls_accuracy_history = []

    # 記錄最高的驗證集正確率
    validating_obj_accuracy_highest = -1
    validating_cls_accuracy_highest = -1
    validating_accuracy_highest = -1
    validating_accuracy_highest_epoch = 0

    # 讀取批次的工具函數
    def read_batches(base_path):
        for batch in itertools.count():
            path = f"{base_path}.{batch}.pt"
            if not os.path.isfile(path):
                break
            x, (y, mask1, mask2) = load_tensor(path)
            yield x.to(device), (y.to(device), mask1, mask2)

    # 計算正確率的工具函數
    calc_accuracy = MyModel.calc_accuracy

    # 開始訓練過程
    for epoch in range(1, 10000):
        print(f"epoch: {epoch}")

        # 根據訓練集訓練並修改參數
        # 切換模型到訓練模式,將會啓用自動微分,批次正規化 (BatchNorm) 與 Dropout
        model.train()
        training_obj_accuracy_list = []
        training_cls_accuracy_list = []
        for batch_index, batch in enumerate(read_batches("data/training_set")):
            # 劃分輸入和輸出
            batch_x, batch_y = batch
            # 計算預測值
            predicted = model(batch_x)
            # 計算損失
            loss = loss_function(predicted, batch_y)
            # 從損失自動微分求導函數值
            loss.backward()
            # 使用參數調整器調整參數
            optimizer.step()
            # 清空導函數值
            optimizer.zero_grad()
            # 記錄這一個批次的正確率,torch.no_grad 表明臨時禁用自動微分功能
            with torch.no_grad():
                training_batch_obj_accuracy, training_batch_cls_accuracy = calc_accuracy(batch_y, predicted)
            # 輸出批次正確率
            training_obj_accuracy_list.append(training_batch_obj_accuracy)
            training_cls_accuracy_list.append(training_batch_cls_accuracy)
            print(f"epoch: {epoch}, batch: {batch_index}: " +
                f"batch obj accuracy: {training_batch_obj_accuracy}, cls accuracy: {training_batch_cls_accuracy}")
        training_obj_accuracy = sum(training_obj_accuracy_list) / len(training_obj_accuracy_list)
        training_cls_accuracy = sum(training_cls_accuracy_list) / len(training_cls_accuracy_list)
        training_obj_accuracy_history.append(training_obj_accuracy)
        training_cls_accuracy_history.append(training_cls_accuracy)
        print(f"training obj accuracy: {training_obj_accuracy}, cls accuracy: {training_cls_accuracy}")

        # 檢查驗證集
        # 切換模型到驗證模式,將會禁用自動微分,批次正規化 (BatchNorm) 與 Dropout
        model.eval()
        validating_obj_accuracy_list = []
        validating_cls_accuracy_list = []
        for batch in read_batches("data/validating_set"):
            batch_x, batch_y = batch
            predicted = model(batch_x)
            validating_batch_obj_accuracy, validating_batch_cls_accuracy = calc_accuracy(batch_y, predicted)
            validating_obj_accuracy_list.append(validating_batch_obj_accuracy)
            validating_cls_accuracy_list.append(validating_batch_cls_accuracy)
            # 釋放 predicted 佔用的顯存避免顯存不足的錯誤
            predicted = None
        validating_obj_accuracy = sum(validating_obj_accuracy_list) / len(validating_obj_accuracy_list)
        validating_cls_accuracy = sum(validating_cls_accuracy_list) / len(validating_cls_accuracy_list)
        validating_obj_accuracy_history.append(validating_obj_accuracy)
        validating_cls_accuracy_history.append(validating_cls_accuracy)
        print(f"validating obj accuracy: {validating_obj_accuracy}, cls accuracy: {validating_cls_accuracy}")

        # 記錄最高的驗證集正確率與當時的模型狀態,判斷是否在 20 次訓練後仍然沒有刷新記錄
        validating_accuracy = validating_obj_accuracy * validating_cls_accuracy
        if validating_accuracy > validating_accuracy_highest:
            validating_obj_accuracy_highest = validating_obj_accuracy
            validating_cls_accuracy_highest = validating_cls_accuracy
            validating_accuracy_highest = validating_accuracy
            validating_accuracy_highest_epoch = epoch
            save_tensor(model.state_dict(), "model.pt")
            print("highest validating accuracy updated")
        elif epoch - validating_accuracy_highest_epoch > 20:
            # 在 20 次訓練後仍然沒有刷新記錄,結束訓練
            print("stop training because highest validating accuracy not updated in 20 epoches")
            break

    # 使用達到最高正確率時的模型狀態
    print(f"highest obj validating accuracy: {validating_obj_accuracy_highest}",
        f"from epoch {validating_accuracy_highest_epoch}")
    print(f"highest cls validating accuracy: {validating_cls_accuracy_highest}",
        f"from epoch {validating_accuracy_highest_epoch}")
    model.load_state_dict(load_tensor("model.pt"))

    # 檢查測試集
    testing_obj_accuracy_list = []
    testing_cls_accuracy_list = []
    for batch in read_batches("data/testing_set"):
        batch_x, batch_y = batch
        predicted = model(batch_x)
        testing_batch_obj_accuracy, testing_batch_cls_accuracy = calc_accuracy(batch_y, predicted)
        testing_obj_accuracy_list.append(testing_batch_obj_accuracy)
        testing_cls_accuracy_list.append(testing_batch_cls_accuracy)
    testing_obj_accuracy = sum(testing_obj_accuracy_list) / len(testing_obj_accuracy_list)
    testing_cls_accuracy = sum(testing_cls_accuracy_list) / len(testing_cls_accuracy_list)
    print(f"testing obj accuracy: {testing_obj_accuracy}, cls accuracy: {testing_cls_accuracy}")

    # 顯示訓練集和驗證集的正確率變化
    pyplot.plot(training_obj_accuracy_history, label="training_obj_accuracy")
    pyplot.plot(training_cls_accuracy_history, label="training_cls_accuracy")
    pyplot.plot(validating_obj_accuracy_history, label="validating_obj_accuracy")
    pyplot.plot(validating_cls_accuracy_history, label="validating_cls_accuracy")
    pyplot.ylim(0, 1)
    pyplot.legend()
    pyplot.show()

def eval_model():
    """使用訓練好的模型識別圖片"""
    # 建立模型實例,加載訓練好的狀態,而後切換到驗證模式
    model = MyModel().to(device)
    model.load_state_dict(load_tensor("model.pt"))
    model.eval()

    # 詢問圖片路徑,並顯示全部多是人臉的區域
    while True:
        try:
            image_path = input("Image path: ")
            if not image_path:
                continue
            # 構建輸入
            with Image.open(image_path) as img_original: # 加載原始圖片
                sw, sh = img_original.size # 原始圖片大小
                img = resize_image(img_original) # 縮放圖片
                img_output = img_original.copy() # 複製圖片,用於後面添加標記
                tensor_in = image_to_tensor(img)
            # 預測輸出
            predicted = model(tensor_in.unsqueeze(0).to(device))[0]
            final_result = MyModel.convert_predicted_result(predicted)
            # 標記在圖片上
            draw = ImageDraw.Draw(img_output)
            for label, box, obj_score, cls_score in final_result:
                x, y, w, h = map_box_to_original_image(box, sw, sh)
                score = obj_score * cls_score
                color = "#00FF00" if CLASSES[label] == "with_mask" else "#FF0000"
                draw.rectangle((x, y, x+w, y+h), outline=color)
                draw.text((x, y-10), CLASSES[label], fill=color)
                draw.text((x, y+h), f"{score:.2f}", fill=color)
                print((x, y, w, h), CLASSES[label], obj_score, cls_score)
            img_output.save("img_output.png")
            print("saved to img_output.png")
            print()
        except Exception as e:
            print("error:", e)

def eval_video():
    """使用訓練好的模型識別視頻"""
    # 建立模型實例,加載訓練好的狀態,而後切換到驗證模式
    model = MyModel().to(device)
    model.load_state_dict(load_tensor("model.pt"))
    model.eval()

    # 詢問視頻路徑,給多是人臉的區域添加標記並保存新視頻
    import cv2
    font = ImageFont.truetype("FreeMonoBold.ttf", 20)
    while True:
        try:
            video_path = input("Video path: ")
            if not video_path:
                continue
            # 讀取輸入視頻
            video = cv2.VideoCapture(video_path)
            # 獲取每秒的幀數
            fps = int(video.get(cv2.CAP_PROP_FPS))
            # 獲取視頻長寬
            size = (int(video.get(cv2.CAP_PROP_FRAME_WIDTH)), int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)))
            # 建立輸出視頻
            video_output_path = os.path.join(
                os.path.dirname(video_path),
                os.path.splitext(os.path.basename(video_path))[0] + ".output.avi")
            result = cv2.VideoWriter(video_output_path, cv2.VideoWriter_fourcc(*"XVID"), fps, size)
            # 用於減小誤判的歷史結果
            history_results = deque(maxlen = fps // 2)
            # 逐幀處理
            count = 0
            while(True):
                ret, frame = video.read()
                if not ret:
                    break
                # opencv 使用的是 BGR, Pillow 使用的是 RGB, 須要轉換通道順序
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # 構建輸入
                img_original = Image.fromarray(frame_rgb) # 加載原始圖片
                sw, sh = img_original.size # 原始圖片大小
                img = resize_image(img_original) # 縮放圖片
                img_output = img_original.copy() # 複製圖片,用於後面添加標記
                tensor_in = image_to_tensor(img)
                # 預測輸出
                predicted = model(tensor_in.unsqueeze(0).to(device))[0]
                cls_result = MyModel.convert_predicted_result(predicted)
                # 根據歷史結果減小誤判
                final_result = MyModel.fix_predicted_result_from_history(cls_result, history_results)
                # 標記在圖片上
                draw = ImageDraw.Draw(img_output)
                for label, box, obj_score, cls_score in final_result:
                    x, y, w, h = map_box_to_original_image(box, sw, sh)
                    score = obj_score * cls_score
                    color = "#00FF00" if CLASSES[label] == "with_mask" else "#FF0000"
                    draw.rectangle((x, y, x+w, y+h), outline=color, width=3)
                    draw.text((x, y-20), CLASSES[label], fill=color, font=font)
                    draw.text((x, y+h), f"{score:.2f}", fill=color, font=font)
                # 寫入幀到輸出視頻
                frame_rgb_annotated = numpy.asarray(img_output)
                frame_bgr_annotated = cv2.cvtColor(frame_rgb_annotated, cv2.COLOR_RGB2BGR)
                result.write(frame_bgr_annotated)
                count += 1
                if count % fps == 0:
                    print(f"handled {count//fps}s")
            video.release()
            result.release()
            cv2.destroyAllWindows()
            print(f"saved to {video_output_path}")
            print()
        except Exception as e:
            raise
            print("error:", e)

def main():
    """主函數"""
    if len(sys.argv) < 2:
        print(f"Please run: {sys.argv[0]} prepare|train|eval")
        exit()

    # 給隨機數生成器分配一個初始值,使得每次運行均可以生成相同的隨機數
    # 這是爲了讓過程可重現,你也能夠選擇不這樣作
    random.seed(0)
    torch.random.manual_seed(0)

    # 根據命令行參數選擇操做
    operation = sys.argv[1]
    if operation == "prepare":
        prepare()
    elif operation == "train":
        train()
    elif operation == "eval":
        eval_model()
    elif operation == "eval-video":
        eval_video()
    else:
        raise ValueError(f"Unsupported operation: {operation}")

if __name__ == "__main__":
    main()

預處理數據集而且執行訓練的命令:

python3 example.py prepare
python3 example.py train

訓練結果:

epoch: 42, batch: 555: batch obj accuracy: 0.9909388836542586, cls accuracy: 0.983006698089804
epoch: 42, batch: 556: batch obj accuracy: 0.9814650010596331, cls accuracy: 0.9774137503102507
epoch: 42, batch: 557: batch obj accuracy: 0.9878546962973783, cls accuracy: 0.9791485664639444
epoch: 42, batch: 558: batch obj accuracy: 0.9804549878809472, cls accuracy: 0.9869710882243454
epoch: 42, batch: 559: batch obj accuracy: 0.9874521037216837, cls accuracy: 0.9825083736509118
epoch: 42, batch: 560: batch obj accuracy: 0.9686452380905726, cls accuracy: 0.9792752544055597
epoch: 42, batch: 561: batch obj accuracy: 0.9850456887221628, cls accuracy: 0.981502172563625
epoch: 42, batch: 562: batch obj accuracy: 0.9667773027084426, cls accuracy: 0.979282967373775
epoch: 42, batch: 563: batch obj accuracy: 0.9744239536970148, cls accuracy: 0.9843711237906226
training obj accuracy: 0.9823339177948931, cls accuracy: 0.9797140932720472
validating obj accuracy: 0.9166056052234632, cls accuracy: 0.9772082398493264
stop training because highest validating accuracy not updated in 20 epoches
highest obj validating accuracy: 0.94078897076641 from epoch 21
highest cls validating accuracy: 0.9635325289895568 from epoch 21
testing obj accuracy: 0.9438541768431002, cls accuracy: 0.9637055484080282

看起來正確率不錯,但由於 YOLO 只有單步,實際上對是否包含對象的誤判率比 Faster-RCNN 要高一些🙁。

使用訓練好的模型識別圖片的命令,輸入圖片路徑並回車便可生成標記過的圖片:

python3 example.py eval

使用訓練好的模型識別視頻的命令,輸入視頻路徑並回車便可生成標記過的視頻:

python3 example.py eval-video

標記後的例子以下,能用,但和前一篇相比效果差一點🤒。

寫在最後

媽蛋,寫了好幾篇識別人臉位置的文章,這篇是最後一篇了。下一篇將會介紹根據人臉找出是哪個人的模型,能夠用來實現打卡,也能夠用來抓逃犯😡。

最後祝你們牛年加工資,中國股市牛年牛逼🐮重回 6000 點。

相關文章
相關標籤/搜索