講過上面幾篇文章,實現了Fater RCNN中的全部模塊,此次來具體看下訓練和測試過程網絡
from keras_faster_rcnn import config, data_generators, data_augment, losses
from keras_faster_rcnn import net_model, roi_helper, RoiPoolingConv, voc_data_parser
from keras.optimizers import Adam, SGD, RMSprop
from keras.utils import generic_utils
from keras.layers import Input
from keras.models import Model
from keras import backend as K
import numpy as np
import time
import pprint
import pickle
all_imgs, classes_count, class_mapping = voc_data_parser.get_data("data")
if 'bg' not in classes_count:
classes_count['bg'] = 0
class_mapping['bg'] = len(class_mapping)
print('類別數 (包含背景) = {}'.format(len(classes_count)))
num_imgs = len(all_imgs)
train_imgs = [s for s in all_imgs if s['imageset'] == 'train'] #訓練集
val_imgs = [s for s in all_imgs if s['imageset'] == 'val'] #驗證集
test_imgs = [s for s in all_imgs if s['imageset'] == 'test'] #測試集
print('訓練樣本個數 {}'.format(len(train_imgs)))
print('驗證樣本個數 {}'.format(len(val_imgs)))
print('測試樣本個數 {}'.format(len(test_imgs)))
C = config.Config() #相關配置信息
C.class_mapping = class_mapping
config_output_filename = "config/config.pickle"
with open(config_output_filename, "wb") as config_f:
pickle.dump(C, config_f)
print('Config has been written to {}, and can be loaded when testing to ensure correct results'.format(
data_gen_train = data_generators.get_anchor_data_gt(train_imgs, classes_count, C, mode='train')
data_gen_val = data_generators.get_anchor_data_gt(val_imgs, classes_count, C, mode='val')
data_gen_test = data_generators.get_anchor_data_gt(test_imgs, classes_count, C, mode='val')
img_input = Input(shape=(None, None, 3)) #網絡模型最開始的輸入
roi_input = Input(shape=(None, 4)) #roi模塊的輸入
''' model_rpn : 輸入:圖片數據; 輸出:對應RPN網絡中分類層和迴歸層的兩個輸出 model_classifier: 輸入: 圖片數據和選取出來的ROI數據; 輸出: 最終分類層輸出和迴歸層輸出 '''
# 用來進行特徵提取的基礎網絡 VGG16
shared_layers = net_model.base_net_vgg(img_input)
# RPN網絡
num_anchors = len(C.anchor_box_scales) * len(C.anchor_box_ratios)
rpn = net_model.rpn_net(shared_layers, num_anchors)
# 最後的檢測網絡(包含ROI池化層 和 全鏈接層)
classifier = net_model.roi_classifier(shared_layers, roi_input, C.num_rois, nb_classes=len(classes_count))
model_rpn = Model(img_input, rpn[:2])
model_classifier = Model([img_input, roi_input], classifier)
model_all = Model([img_input, roi_input], rpn[:2] + classifier)
print('loading weights from {}'.format(C.model_path))
model_rpn.load_weights(C.model_path, by_name=True)
model_classifier.load_weights(C.model_path, by_name=True)
print('loading weights from {}'.format(C.base_net_weights))
model_rpn.load_weights(C.base_net_weights, by_name=True)
model_classifier.load_weights(C.base_net_weights, by_name=True)
optimizer = Adam(lr=1e-5)
optimizer_classifier = Adam(lr=1e-5)
model_rpn.compile(optimizer=optimizer, loss=[losses.rpn_cls_loss(num_anchors), losses.rpn_regr_loss(num_anchors)])
model_classifier.compile(optimizer=optimizer_classifier, loss=[losses.final_cls_loss, losses.final_regr_loss(len(classes_count)-1)], metrics={'dense_class_{}'.format(len(classes_count)): 'accuracy'})
model_all.compile(optimizer='sgd', loss='mae')
epoch_length = 1000 #每1000輪訓練,記錄一次平均loss
num_epochs = 2000
iter_num = 0
train_step = 0 #記錄訓練次數
losses = np.zeros((epoch_length, 5)) #用來存儲1000輪訓練中,沒一輪的損失
# rpn_accuracy_rpn_monitor = []
# rpn_accuracy_for_epoch = []
start_time = time.time()
best_loss = np.Inf
print('Starting training')
for epoch_num in range(num_epochs):
progbar = generic_utils.Progbar(epoch_length)
print('Epoch {}/{}'.format(epoch_num + 1, num_epochs))
while True:
# if len(rpn_accuracy_rpn_monitor) == epoch_length:
# mean_overlapping_bboxes = float(sum(rpn_accuracy_rpn_monitor)) / len(rpn_accuracy_rpn_monitor)
# rpn_accuracy_rpn_monitor = []
# print('Average number of overlapping bounding boxes from RPN = {} for {} previous iterations'.format(
# mean_overlapping_bboxes, epoch_length))
# if mean_overlapping_bboxes == 0:
# print(
# 'RPN is not producing bounding boxes that overlap the ground truth boxes. Check RPN settings or keep training.')
X, Y, img_data = next(data_gen_train) #經過構造的迭代器,得到一條數據
# print(X.shape)
# print(Y[0].shape, Y[1].shape)
loss_rpn = model_rpn.train_on_batch(X, Y) #訓練basenet 與 RPN網絡
P_rpn = model_rpn.predict_on_batch(X) #得到RPN網絡的輸出
R = roi_helper.rpn_to_roi(P_rpn[0], P_rpn[1], C, use_regr=True, overlap_thresh=0.7,
X2, Y1, Y2, IouS = roi_helper.calc_roi(R, img_data, C, class_mapping)
if X2 is None:
# print("model_classifier.train_on_batch--X.shape={},X2.shape={}".format(X.shape, X2.shape))
loss_class = model_classifier.train_on_batch([X, X2], [Y1, Y2])
train_step += 1
losses[iter_num, 0] = loss_rpn[1] #rpn_cls_loss
losses[iter_num, 1] = loss_rpn[2] #rpn_regr_loss
losses[iter_num, 2] = loss_class[1] #final_cls_loss
losses[iter_num, 3] = loss_class[2] #final_regr_loss
losses[iter_num, 4] = loss_class[3] #final_acc
iter_num += 1
[('rpn_cls', np.mean(losses[:iter_num, 0])),
('rpn_regr', np.mean(losses[:iter_num, 1])),
('detector_cls', np.mean(losses[:iter_num, 2])),
('detector_regr', np.mean(losses[:iter_num, 3]))])
if iter_num == epoch_length: #每1000輪訓練,統計一次
loss_rpn_cls = np.mean(losses[:, 0])
loss_rpn_regr = np.mean(losses[:, 1])
loss_class_cls = np.mean(losses[:, 2])
loss_class_regr = np.mean(losses[:, 3])
class_acc = np.mean(losses[:, 4])
# mean_overlapping_bboxes = float(sum(rpn_accuracy_for_epoch)) / len(rpn_accuracy_for_epoch)
# rpn_accuracy_for_epoch = []
if C.verbose:
# print('Mean number of bounding boxes from RPN overlapping ground truth boxes: {}'.format(mean_overlapping_bboxes))
print('Classifier accuracy for bounding boxes from RPN: {}'.format(class_acc))
print('Loss RPN classifier: {}'.format(loss_rpn_cls))
print('Loss RPN regression: {}'.format(loss_rpn_regr))
print('Loss Detector classifier: {}'.format(loss_class_cls))
print('Loss Detector regression: {}'.format(loss_class_regr))
print('Elapsed time: {}'.format(time.time() - start_time))
curr_loss = loss_rpn_cls + loss_rpn_regr + loss_class_cls + loss_class_regr
iter_num = 0
start_time = time.time()
if curr_loss < best_loss:
if C.verbose:
print('Total loss decreased from {} to {}, saving weights'.format(best_loss,curr_loss))
best_loss = curr_loss
import os
import cv2
import numpy as np
import sys
import pickle
import time
from keras_faster_rcnn import config, roi_helper, net_model
from keras import backend as K
from keras.layers import Input
from keras.models import Model
config_output_filename = "config/config.pickle"
with open(config_output_filename, "rb") as config_f:
C = pickle.load(config_f)
C.use_horizontal_flips = False
C.use_vertical_flips = False
C.rot_90 = False
test_img_path = "test"
class_mapping = C.class_mapping
if "bg" not in class_mapping:
class_mapping["bg"] = len(class_mapping)
class_mapping = {v:k for k,v in class_mapping.items()} #key與value調換位置
#class_to_color 定義對應類別多對應的顏色
class_to_color = {class_mapping[v]: np.random.randint(0, 255, 3) for v in class_mapping}
img_input = Input(shape=(None, None, 3))
roi_input = Input(shape=(C.num_rois, 4))
feature_map_input = Input(shape=(None, None, 512))
shared_layers = net_model.base_net_vgg(img_input)
num_anchors = len(C.anchor_box_scales) * len(C.anchor_box_ratios)
rpn_layer_out = net_model.rpn_net(shared_layers, num_anchors)
#roi pooling層以及最後網絡的輸出
final_classifer_reg = net_model.roi_classifier(feature_map_input, roi_input, C.num_rois, nb_classes=len(class_mapping))
model_rpn = Model(img_input, rpn_layer_out)
model_final_classifer_reg_only = Model([feature_map_input, roi_input], final_classifer_reg)
model_final_classifer_reg = Model([feature_map_input, roi_input], final_classifer_reg)
print("Loading weights from {}".format(C.model_path))
model_rpn.load_weights(C.model_path, by_name=True)
model_final_classifer_reg.load_weights(C.model_path, by_name=True)
model_rpn.compile(optimizer="sgd", loss="mse")
model_final_classifer_reg.compile(optimizer="sgd", loss="mse")
all_imgs = []
classes = {}
bbox_threshold = 0.8
visualise = True
def image_Preprocessing(img, C):
''' 圖片預處理 :param img: :param C: :return: '''
height, width, _ = img.shape
if width < height:
ratio = float(C.im_size) / width
new_width = C.im_size
new_height = int(height * ratio)
ratio = float(C.im_size) / height
new_height = C.im_size
new_width = int(width * ratio)
img = cv2.resize(img, (new_width, new_height), interpolation=cv2.INTER_CUBIC)
x_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
x_img = x_img.astype(np.float32)
x_img[:, :, 0] -= C.img_channel_mean[0]
x_img[:, :, 1] -= C.img_channel_mean[1]
x_img[:, :, 2] -= C.img_channel_mean[2]
x_img /= C.img_scaling_factor
x_img = np.expand_dims(x_img, axis=0)
return x_img, ratio
for idx, img_name in enumerate(sorted(os.listdir(test_img_path))): #遍歷全部測試文件
if not img_name.lower().endswith(('.bmp', '.jpeg', '.jpg', '.png', '.tif', '.tiff')):
print("test image name:{}".format(img_name))
st = time.time()
filepath = os.path.join(test_img_path, img_name)
img = cv2.imread(filepath) #讀取對應圖片
X, ratio = image_Preprocessing(img, C)
[Y1, Y2, feature_map] = model_rpn.predict(X)
Rois = roi_helper.rpn_to_roi(Y1, Y2, C, overlap_thresh=0.7)
#(x1,y1,x2,y2) to (x,y,w,h)
Rois[:, 2] -= Rois[:, 0]
Rois[:, 3] -= Rois[:, 1]
bboxes = {}
probs = {}
for jk in range(Rois.shape[0] // C.num_rois +1): #一次處理32個roi
print("jk==",jk, "Rois.shape[0] // C.num_rois=",Rois.shape[0] // C.num_rois)
if jk == Rois.shape[0] // C.num_rois:
rois = np.expand_dims(Rois[C.num_rois * jk:, :], axis=0)
if rois.shape[1] == 0:
rois_zero = np.zeros((rois.shape[0], C.num_rois, rois.shape[2]))
print(rois_zero[:, rois.shape[1]:, :].shape)
rois_zero[:, :rois.shape[1], :] = rois
rois_zero[:, rois.shape[1]:, :] = rois[0, 0, :]
rois = rois_zero
rois = np.expand_dims(Rois[C.num_rois * jk: C.num_rois * (jk + 1), :], axis=0)
if rois.shape[1] == 0:
[P_cls, P_regr] = model_final_classifer_reg_only.predict([feature_map, rois])
for ii in range(P_cls.shape[1]): #遍歷每個roi對應的預測類別
#過濾調那些分類機率值不高 以及 負樣本
if np.max(P_cls[0, ii, :]) < bbox_threshold or np.argmax(P_cls[0, ii, :]) == (P_cls.shape[2]-1):
cls_num =np.argmax(P_cls[0,ii, :])
cls_name = class_mapping[cls_num]
if cls_name not in bboxes:
bboxes[cls_name] = []
probs[cls_name] = []
(x, y, w, h) = rois[0, ii, :]
tx, ty, tw, th = P_regr[0, ii, 4*cls_num: 4*(cls_num+1)]
tx /= C.classifier_regr_std[0]
ty /= C.classifier_regr_std[1]
tw /= C.classifier_regr_std[2]
th /= C.classifier_regr_std[3]
x, y, w, h = roi_helper.apply_regr(x, y, w, h, tx, ty, tw, th)
bbox_for_img = [C.rpn_stride*x, C.rpn_stride*y, C.rpn_stride*(x+w), C.rpn_stride*(y+h)]
all_dets = []
for key in bboxes:
bbox = np.array(bboxes[key])
new_boxes, new_probs = roi_helper.non_max_suppression_fast(bbox, np.array(probs[key]), overlap_thresh=0.5)
print("new_boxes.shape", new_boxes.shape)
for jk in range(new_boxes.shape[0]):
(x1, y1, x2, y2) = new_boxes[jk, :]
real_x1 = int(round(x1 // ratio))
real_y1 = int(round(y1 // ratio))
real_x2 = int(round(x2 // ratio))
real_y2 = int(round(y2 // ratio))
cv2.rectangle(img, (real_x1, real_y1), (real_x2, real_y2),
(int(class_to_color[key][0]), int(class_to_color[key][1]), int(class_to_color[key][2])), 2)
textLabel = "{}:{}".format(key, int(100 * new_probs[jk]))
all_dets.append((key, 100 * new_probs[jk]))
retval, baseLine = cv2.getTextSize(textLabel, cv2.FONT_HERSHEY_COMPLEX, 1, 1)
textOrg = (real_x1, real_y1-0)
cv2.rectangle(img, (textOrg[0] - 5, textOrg[1] + baseLine - 5),
(textOrg[0] + retval[0] + 5, textOrg[1] - retval[1] - 5), (0, 0, 0), 2)
cv2.rectangle(img, (textOrg[0] - 5, textOrg[1] + baseLine - 5),
(textOrg[0] + retval[0] + 5, textOrg[1] - retval[1] - 5), (255, 255, 255), -1)
cv2.putText(img, textLabel, textOrg, cv2.FONT_HERSHEY_DUPLEX, 1, (0, 0, 0), 1)
print('Elapsed time = {}'.format(time.time() - st))
cv2.imshow('img', img)
cv2.imwrite('./results_imgs/{}.png'.format(idx), img)
