使用OpenCV實現道路車輛計數


點擊上方小白學視覺」,選擇加"星標"或「置頂php

重磅乾貨,第一時間送達python



今天,咱們將一塊兒探討如何基於計算機視覺實現道路交通計數。算法

在本教程中,咱們將僅使用Python和OpenCV,並藉助背景減除算法很是簡單地進行運動檢測。swift


咱們將從如下四個方面進行介紹:ruby

1. 用於物體檢測的背景減法算法主要思想。微信

2. OpenCV圖像過濾器。app

3. 利用輪廓檢測物體。框架

4. 創建進一步數據處理的結構。dom



背景扣除算法ide

有許多不一樣的背景扣除算法,可是它們的主要思想都很簡單。

假設有一個房間的視頻,在某些幀上沒有人和寵物,那麼此時的視頻基本爲靜態的,咱們將其稱爲背景(background_layer)。所以要獲取在視頻上移動的對象,咱們只須要:用當前幀減去背景便可。


因爲光照變化,人爲移動物體,或者始終存在移動的人和寵物,咱們將沒法得到靜態幀。在這種狀況下,咱們從視頻中選出一些圖像幀,若是絕大多數圖像幀中都具備某個相同的像素點,則此將像素做爲background_layer中的一部分。


咱們將使用MOG算法進行背景扣除

原始幀


代碼以下所示:

import osimport loggingimport logging.handlersimport random
import numpy as npimport skvideo.ioimport cv2import matplotlib.pyplot as plt
import utils# without this some strange errors happencv2.ocl.setUseOpenCL(False)random.seed(123)
# ============================================================================IMAGE_DIR = "./out"VIDEO_SOURCE = "input.mp4"SHAPE = (720, 1280) # HxW# ============================================================================
def train_bg_subtractor(inst, cap, num=500): ''' BG substractor need process some amount of frames to start giving result ''' print ('Training BG Subtractor...') i = 0 for frame in cap: inst.apply(frame, None, 0.001) i += 1 if i >= num: return cap
def main(): log = logging.getLogger("main")
# creting MOG bg subtractor with 500 frames in cache # and shadow detction bg_subtractor = cv2.createBackgroundSubtractorMOG2( history=500, detectShadows=True)
# Set up image source # You can use also CV2, for some reason it not working for me cap = skvideo.io.vreader(VIDEO_SOURCE)
# skipping 500 frames to train bg subtractor train_bg_subtractor(bg_subtractor, cap, num=500)
frame_number = -1 for frame in cap: if not frame.any(): log.error("Frame capture failed, stopping...") break
frame_number += 1 utils.save_frame(frame, "./out/frame_%04d.png" % frame_number) fg_mask = bg_subtractor.apply(frame, None, 0.001) utils.save_frame(frame, "./out/fg_mask_%04d.png" % frame_number)# ============================================================================
if __name__ == "__main__": log = utils.init_logging()
if not os.path.exists(IMAGE_DIR): log.debug("Creating image directory `%s`...", IMAGE_DIR) os.makedirs(IMAGE_DIR)
main()



處理後獲得下面的前景圖像

去除背景後的前景圖像


咱們能夠看出前景圖像上有一些噪音,能夠經過標準濾波技術能夠將其消除。



濾波


針對咱們如今的狀況,咱們將須要如下濾波函數:ThresholdErodeDilateOpeningClosing


首先,咱們使用「Closing」來移除區域中的間隙,而後使用「Opening」來移除個別獨立的像素點,而後使用「Dilate」進行擴張以使對象變粗。代碼以下:

def filter_mask(img): kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2)) # Fill any small holes closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel) # Remove noise opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel) # Dilate to merge adjacent blobs dilation = cv2.dilate(opening, kernel, iterations=2) # threshold th = dilation[dilation < 240] = 0 return th


處理後的前景以下


利用輪廓進行物體檢測

咱們將使用cv2.findContours函數對輪廓進行檢測。咱們在使用的時候能夠選擇的參數爲:

cv2.CV_RETR_EXTERNAL------僅獲取外部輪廓。

cv2.CV_CHAIN_APPROX_TC89_L1------使用Teh-Chin鏈逼近算法(更快)


代碼以下:

def get_centroid(x, y, w, h): x1 = int(w / 2) y1 = int(h / 2) cx = x + x1 cy = y + y1 return (cx, cy)  def detect_vehicles(fg_mask, min_contour_width=35, min_contour_height=35): matches = [] # finding external contours im, contours, hierarchy = cv2.findContours( fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1) # filtering by with, height for (i, contour) in enumerate(contours): (x, y, w, h) = cv2.boundingRect(contour) contour_valid = (w >= min_contour_width) and ( h >= min_contour_height) if not contour_valid: continue # getting center of the bounding box centroid = get_centroid(x, y, w, h) matches.append(((x, y, w, h), centroid)) return matches



創建數據處理框架

咱們都知道在ML和CV中,沒有一個算法能夠處理全部問題。即便存在這種算法,咱們也不會使用它,由於它很難大規模有效。例如幾年前Netflix公司用300萬美圓的獎金懸賞最佳電影推薦算法。有一個團隊完成這個任務,可是他們的推薦算法沒法大規模運行,所以其實對公司毫無用處。可是,Netflix公司仍獎勵了他們100萬美圓。


接下來咱們來創建解決當前問題的框架,這樣可使數據的處理更加方便

class PipelineRunner(object): ''' Very simple pipline. Just run passed processors in order with passing context from one to  another. You can also set log level for processors. ''' def __init__(self, pipeline=None, log_level=logging.DEBUG): self.pipeline = pipeline or [] self.context = {} self.log = logging.getLogger(self.__class__.__name__) self.log.setLevel(log_level) self.log_level = log_level self.set_log_level() def set_context(self, data): self.context = data def add(self, processor): if not isinstance(processor, PipelineProcessor): raise Exception( 'Processor should be an isinstance of PipelineProcessor.') processor.log.setLevel(self.log_level) self.pipeline.append(processor)  def remove(self, name): for i, p in enumerate(self.pipeline): if p.__class__.__name__ == name: del self.pipeline[i] return True return False  def set_log_level(self): for p in self.pipeline: p.log.setLevel(self.log_level)  def run(self): for p in self.pipeline: self.context = p(self.context)  self.log.debug("Frame #%d processed.", self.context['frame_number']) return self.context  class PipelineProcessor(object): ''' Base class for processors. ''' def __init__(self): self.log = logging.getLogger(self.__class__.__name__)


首先咱們獲取一張處理器運行順序的列表,讓每一個處理器完成一部分工做,在案順序完成執行以得到最終結果。


咱們首先建立輪廓檢測處理器。輪廓檢測處理器只需將前面的背景扣除,濾波和輪廓檢測部分合並在一塊兒便可,代碼以下所示:

class ContourDetection(PipelineProcessor): ''' Detecting moving objects. Purpose of this processor is to subtrac background, get moving objects and detect them with a cv2.findContours method, and then filter off-by width and height.  bg_subtractor - background subtractor isinstance. min_contour_width - min bounding rectangle width. min_contour_height - min bounding rectangle height. save_image - if True will save detected objects mask to file. image_dir - where to save images(must exist).  '''  def __init__(self, bg_subtractor, min_contour_width=35, min_contour_height=35, save_image=False, image_dir='images'): super(ContourDetection, self).__init__() self.bg_subtractor = bg_subtractor self.min_contour_width = min_contour_width self.min_contour_height = min_contour_height self.save_image = save_image self.image_dir = image_dir  def filter_mask(self, img, a=None): ''' This filters are hand-picked just based on visual tests ''' kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2)) # Fill any small holes closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel) # Remove noise opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel) # Dilate to merge adjacent blobs dilation = cv2.dilate(opening, kernel, iterations=2) return dilation  def detect_vehicles(self, fg_mask, context): matches = [] # finding external contours im2, contours, hierarchy = cv2.findContours( fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1) for (i, contour) in enumerate(contours): (x, y, w, h) = cv2.boundingRect(contour) contour_valid = (w >= self.min_contour_width) and ( h >= self.min_contour_height) if not contour_valid: continue centroid = utils.get_centroid(x, y, w, h) matches.append(((x, y, w, h), centroid)) return matches  def __call__(self, context): frame = context['frame'].copy() frame_number = context['frame_number'] fg_mask = self.bg_subtractor.apply(frame, None, 0.001) # just thresholding values fg_mask[fg_mask < 240] = 0 fg_mask = self.filter_mask(fg_mask, frame_number) if self.save_image: utils.save_frame(fg_mask, self.image_dir + "/mask_%04d.png" % frame_number, flip=False) context['objects'] = self.detect_vehicles(fg_mask, context) context['fg_mask'] = fg_mask return contex


如今,讓咱們建立一個處理器,該處理器將找出不一樣的幀上檢測到的相同對象,建立路徑,並對到達出口區域的車輛進行計數。代碼以下所示:

 ''' Counting vehicles that entered in exit zone.
Purpose of this class based on detected object and local cache create objects pathes and count that entered in exit zone defined by exit masks.
exit_masks - list of the exit masks. path_size - max number of points in a path. max_dst - max distance between two points. '''
def __init__(self, exit_masks=[], path_size=10, max_dst=30, x_weight=1.0, y_weight=1.0): super(VehicleCounter, self).__init__()
self.exit_masks = exit_masks
self.vehicle_count = 0 self.path_size = path_size self.pathes = [] self.max_dst = max_dst self.x_weight = x_weight self.y_weight = y_weight
def check_exit(self, point): for exit_mask in self.exit_masks: try: if exit_mask[point[1]][point[0]] == 255: return True except: return True return False
def __call__(self, context): objects = context['objects'] context['exit_masks'] = self.exit_masks context['pathes'] = self.pathes context['vehicle_count'] = self.vehicle_count if not objects: return context
points = np.array(objects)[:, 0:2] points = points.tolist()
# add new points if pathes is empty if not self.pathes: for match in points: self.pathes.append([match])
else: # link new points with old pathes based on minimum distance between # points new_pathes = []
for path in self.pathes: _min = 999999 _match = None for p in points: if len(path) == 1: # distance from last point to current d = utils.distance(p[0], path[-1][0]) else: # based on 2 prev points predict next point and calculate # distance from predicted next point to current xn = 2 * path[-1][0][0] - path[-2][0][0] yn = 2 * path[-1][0][1] - path[-2][0][1] d = utils.distance( p[0], (xn, yn), x_weight=self.x_weight, y_weight=self.y_weight )
if d < _min: _min = d _match = p
if _match and _min <= self.max_dst: points.remove(_match) path.append(_match) new_pathes.append(path)
# do not drop path if current frame has no matches if _match is None: new_pathes.append(path)
self.pathes = new_pathes
# add new pathes if len(points): for p in points: # do not add points that already should be counted if self.check_exit(p[1]): continue self.pathes.append([p])
# save only last N points in path for i, _ in enumerate(self.pathes): self.pathes[i] = self.pathes[i][self.path_size * -1:]
# count vehicles and drop counted pathes: new_pathes = [] for i, path in enumerate(self.pathes): d = path[-2:]
if ( # need at list two points to count len(d) >= 2 and # prev point not in exit zone not self.check_exit(d[0][1]) and # current point in exit zone self.check_exit(d[1][1]) and # path len is bigger then min self.path_size <= len(path) ): self.vehicle_count += 1 else: # prevent linking with path that already in exit zone add = True for p in path: if self.check_exit(p[1]): add = False break if add: new_pathes.append(path)
self.pathes = new_pathes
context['pathes'] = self.pathes context['objects'] = objects context['vehicle_count'] = self.vehicle_count
self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)
return context

上面的代碼有點複雜,所以讓咱們一個部分一個部分的介紹一下。


上面的圖像中綠色的部分是出口區域。咱們在這裏對車輛進行計數,只有當車輛移動的長度超過3個點咱們才進行計算


咱們使用掩碼來解決這個問題,由於它比使用矢量算法有效且簡單得多。只需使用「二進制和」便可選出車輛區域中點。設置方式以下:

    

EXIT_PTS = np.array([ [[732, 720], [732, 590], [1280, 500], [1280, 720]], [[0, 400], [645, 400], [645, 0], [0, 0]] ])  base = np.zeros(SHAPE + (3,), dtype='uint8') exit_mask = cv2.fillPoly(base, EXIT_PTS, (255, 255, 255))[:, :, 0]

    

如今咱們將檢測到的點連接起來。


對於第一幀圖像,咱們將全部點均添加爲新路徑。

接下來,若是len(path)== 1,咱們在新檢測到的對象中找到與每條路徑最後一點距離最近的對象。

若是len(path)> 1,則使用路徑中的最後兩個點,即在同一條線上預測新點,並找到該點與當前點之間的最小距離。

具備最小距離的點將添加到當前路徑的末端並從列表中刪除。若是在此以後還剩下一些點,咱們會將其添加爲新路徑。這個過程當中咱們還會限制路徑中的點數。

new_pathes = [] for path in self.pathes: _min = 999999 _match = None for p in points: if len(path) == 1: # distance from last point to current d = utils.distance(p[0], path[-1][0]) else: # based on 2 prev points predict next point and calculate # distance from predicted next point to current xn = 2 * path[-1][0][0] - path[-2][0][0] yn = 2 * path[-1][0][1] - path[-2][0][1] d = utils.distance( p[0], (xn, yn), x_weight=self.x_weight, y_weight=self.y_weight )  if d < _min: _min = d _match = p  if _match and _min <= self.max_dst: points.remove(_match) path.append(_match) new_pathes.append(path)  # do not drop path if current frame has no matches if _match is None: new_pathes.append(path)  self.pathes = new_pathes  # add new pathes if len(points): for p in points: # do not add points that already should be counted if self.check_exit(p[1]): continue self.pathes.append([p])  # save only last N points in path for i, _ in enumerate(self.pathes): self.pathes[i] = self.pathes[i][self.path_size * -1:]


如今,咱們將嘗試計算進入出口區域的車輛。爲此,咱們需獲取路徑中的最後2個點,並檢查len(path)是否應大於限制。

# count vehicles and drop counted pathes: new_pathes = [] for i, path in enumerate(self.pathes): d = path[-2:] if ( # need at list two points to count len(d) >= 2 and # prev point not in exit zone not self.check_exit(d[0][1]) and # current point in exit zone self.check_exit(d[1][1]) and # path len is bigger then min self.path_size <= len(path) ): self.vehicle_count += 1 else: # prevent linking with path that already in exit zone add = True for p in path: if self.check_exit(p[1]): add = False break if add: new_pathes.append(path) self.pathes = new_pathes  context['pathes'] = self.pathes context['objects'] = objects context['vehicle_count'] = self.vehicle_count  self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count) return context


最後兩個處理器是CSV編寫器,用於建立報告CSV文件,以及用於調試和精美圖片的可視化。

class CsvWriter(PipelineProcessor): def __init__(self, path, name, start_time=0, fps=15): super(CsvWriter, self).__init__() self.fp = open(os.path.join(path, name), 'w') self.writer = csv.DictWriter(self.fp, fieldnames=['time', 'vehicles']) self.writer.writeheader() self.start_time = start_time self.fps = fps self.path = path self.name = name self.prev = None def __call__(self, context): frame_number = context['frame_number'] count = _count = context['vehicle_count'] if self.prev: _count = count - self.prev time = ((self.start_time + int(frame_number / self.fps)) * 100 + int(100.0 / self.fps) * (frame_number % self.fps)) self.writer.writerow({'time': time, 'vehicles': _count}) self.prev = count return context class Visualizer(PipelineProcessor): def __init__(self, save_image=True, image_dir='images'): super(Visualizer, self).__init__() self.save_image = save_image self.image_dir = image_dir def check_exit(self, point, exit_masks=[]): for exit_mask in exit_masks: if exit_mask[point[1]][point[0]] == 255: return True return False def draw_pathes(self, img, pathes): if not img.any(): return for i, path in enumerate(pathes): path = np.array(path)[:, 1].tolist() for point in path: cv2.circle(img, point, 2, CAR_COLOURS[0], -1) cv2.polylines(img, [np.int32(path)], False, CAR_COLOURS[0], 1) return img def draw_boxes(self, img, pathes, exit_masks=[]): for (i, match) in enumerate(pathes): contour, centroid = match[-1][:2] if self.check_exit(centroid, exit_masks): continue x, y, w, h = contour cv2.rectangle(img, (x, y), (x + w - 1, y + h - 1), BOUNDING_BOX_COLOUR, 1) cv2.circle(img, centroid, 2, CENTROID_COLOUR, -1) return img def draw_ui(self, img, vehicle_count, exit_masks=[]): # this just add green mask with opacity to the image for exit_mask in exit_masks: _img = np.zeros(img.shape, img.dtype) _img[:, :] = EXIT_COLOR mask = cv2.bitwise_and(_img, _img, mask=exit_mask) cv2.addWeighted(mask, 1, img, 1, 0, img) # drawing top block with counts cv2.rectangle(img, (0, 0), (img.shape[1], 50), (0, 0, 0), cv2.FILLED) cv2.putText(img, ("Vehicles passed: {total} ".format(total=vehicle_count)), (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1) return img def __call__(self, context): frame = context['frame'].copy() frame_number = context['frame_number'] pathes = context['pathes'] exit_masks = context['exit_masks'] vehicle_count = context['vehicle_count'] frame = self.draw_ui(frame, vehicle_count, exit_masks) frame = self.draw_pathes(frame, pathes) frame = self.draw_boxes(frame, pathes, exit_masks) utils.save_frame(frame, self.image_dir + "/processed_%04d.png" % frame_number) return context



結論


正如咱們看到的那樣,它並不像許多人想象的那麼難。可是,若是小夥伴運行腳本,小夥伴會發現此解決方案並不理想,存在前景對象存在重疊的問題,而且它也沒有按類型對車輛進行分類。可是,當相機有較好位置,例如位於道路正上方時,該算法具備很好的準確性。


若是本文對小夥伴有幫助,但願能夠在文末來個「一鍵三連」。


交流羣

歡迎加入公衆號讀者羣一塊兒和同行交流,目前有SLAM、三維視覺、傳感器自動駕駛、計算攝影、檢測、分割、識別、醫學影像、GAN算法競賽等微信羣(之後會逐漸細分),請掃描下面微信號加羣,備註:」暱稱+學校/公司+研究方向「,例如:」張三 + 上海交大 + 視覺SLAM「。請按照格式備註,不然不予經過。添加成功後會根據研究方向邀請進入相關微信羣。請勿在羣內發送廣告,不然會請出羣,謝謝理解~


本文分享自微信公衆號 - 小白學視覺(NoobCV)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索