樹莓派因爲硬件配置低,在運行復雜計算的時候會比較吃力。爲了解決這種瓶頸,能夠考慮雲計算。這篇文章分享下如何經過樹莓派發送視頻流到遠程服務器作條形碼識別。html
下載OpenCV
, scipy
和 pillow
:python
$ sudo apt-get install libopencv-dev python-opencv python-scipy $ python -m pip install pillow
下載條形碼SDK,經過源碼(https://github.com/dynamsoft-dbr/python)編譯出Python條碼識別模塊。git
下載OpenCV
:github
pip install opencv-python
有人實現了 NumpySocket,能夠直接拿來使用,主要解決了NumPy Array
(經過OpenCV獲取的視頻幀)的傳輸。json
經過Socket
發送接收JSON:服務器
import json def sendJSON(self, data): out = json.dumps(data) try: self.connection.sendall(out) except Exception: exit() def receiveJSON(self): try: chunk = self.socket.recv(1024) except Exception: exit() return json.loads(chunk)
建立用於Windows的pc.py
文件:socket
from numpysocket import NumpySocket import cv2 import time import json import dbr # Get the license of Dynamsoft Barcode Reader from https://www.dynamsoft.com/CustomerPortal/Portal/Triallicense.aspx dbr.initLicense('LICENSE KEY') npSocket = NumpySocket() npSocket.startServer(9999) # Receive frames for barcode detection while(True): try: frame = npSocket.recieveNumpy() # cv2.imshow('PC Reader', frame) results = dbr.decodeBuffer(frame, 0x3FF | 0x2000000 | 0x4000000 | 0x8000000 | 0x10000000) out = {} out['results'] = results # Send barcode results to Raspberry Pi npSocket.sendJSON({'results': results}) except: break # Press ESC to exit key = cv2.waitKey(20) if key == 27 or key == ord('q'): break npSocket.endServer() print('Closed')
這段代碼主要是經過循環,不斷接收視頻幀作條形碼識別。而後把結果用JSON格式打包發送。ide
在樹莓派上建立一個rpi.py
文件。經過OpenCV的接口能夠不斷獲取視頻幀。使用queue
來保存:雲計算
def read_barcode(): vc = cv2.VideoCapture(0) vc.set(3, 640) #set width vc.set(4, 480) #set height if not vc.isOpened(): print('Camera is not ready.') return host_ip = '192.168.8.84' npSocket = NumpySocket() npSocket.startClient(host_ip, 9999) socket_thread = SocketThread('SocketThread', npSocket) socket_thread.start() while vc.isOpened(): ret, f = vc.read() cv2.imshow("RPi Reader", f) frame = imresize(f, .5) key = cv2.waitKey(20) if key == 27 or key == ord('q'): socket_thread.isRunning = False socket_thread.join() break if not socket_thread.is_alive(): break try: frame_queue.put_nowait(frame) except: # Clear unused frames try: while True: frame_queue.get_nowait() except: pass frame_queue.close() frame_queue.join_thread() vc.release()
建立了一個線程用於收發視頻幀數據和結果:.net
class SocketThread (threading.Thread): def __init__(self, name, npSocket): threading.Thread.__init__(self) self.name = name self.npSocket = npSocket self.isRunning = True def run(self): while self.isRunning: frame = frame_queue.get(timeout=100) try: start_time = time.time() self.npSocket.sendNumpy(frame) obj = self.npSocket.receiveJSON() print("--- %.2f ms seconds ---" % ((time.time() - start_time) * 1000)) data = obj['results'] if (len(data) > 0): for result in data: print("Type: " + result[0]) print("Value: " + result[1] + "\n") else: print('No barcode detected.') except: break self.npSocket.endClient()
在Windows和樹莓派上分別運行對應的Python程序: