如何用Python實現樹莓派遠程視頻流掃碼

樹莓派因爲硬件配置低,在運行復雜計算的時候會比較吃力。爲了解決這種瓶頸,能夠考慮雲計算。這篇文章分享下如何經過樹莓派發送視頻流到遠程服務器作條形碼識別。html

環境配置

安裝

樹莓派

下載OpenCV, scipypillow:python

$ sudo apt-get install libopencv-dev python-opencv python-scipy
$ python -m pip install pillow

Windows 10

下載條形碼SDK,經過源碼(https://github.com/dynamsoft-dbr/python)編譯出Python條碼識別模塊。git

下載OpenCV:github

pip install opencv-python

使用Python Socket傳輸視頻

有人實現了 NumpySocket,能夠直接拿來使用,主要解決了NumPy Array(經過OpenCV獲取的視頻幀)的傳輸。json

識別條形碼,用JSON返回結果

經過Socket發送接收JSON:服務器

import json
def sendJSON(self, data):
    out = json.dumps(data)
 
    try:
        self.connection.sendall(out)
    except Exception:
        exit()
 
def receiveJSON(self):
    try:
        chunk = self.socket.recv(1024)    
    except Exception:
        exit()
             
    return json.loads(chunk)

建立用於Windows的pc.py文件:socket

from numpysocket import NumpySocket
import cv2
import time
import json
import dbr
 
# Get the license of Dynamsoft Barcode Reader from https://www.dynamsoft.com/CustomerPortal/Portal/Triallicense.aspx
dbr.initLicense('LICENSE KEY')
 
npSocket = NumpySocket()
npSocket.startServer(9999)
 
# Receive frames for barcode detection
while(True):
    try:
        frame = npSocket.recieveNumpy()
        # cv2.imshow('PC Reader', frame)
        results = dbr.decodeBuffer(frame, 0x3FF | 0x2000000 | 0x4000000 | 0x8000000 | 0x10000000)
        out = {}
        out['results'] = results
 
        # Send barcode results to Raspberry Pi
        npSocket.sendJSON({'results': results})
    except:
        break
     
    # Press ESC to exit
    key = cv2.waitKey(20)
    if key == 27 or key == ord('q'):
        break
 
npSocket.endServer()
print('Closed')

這段代碼主要是經過循環,不斷接收視頻幀作條形碼識別。而後把結果用JSON格式打包發送。ide

在樹莓派上建立一個rpi.py文件。經過OpenCV的接口能夠不斷獲取視頻幀。使用queue來保存:雲計算

def read_barcode():
    vc = cv2.VideoCapture(0)
    vc.set(3, 640) #set width
    vc.set(4, 480) #set height
 
    if not vc.isOpened():
        print('Camera is not ready.')
        return
 
    host_ip = '192.168.8.84'
    npSocket = NumpySocket()
    npSocket.startClient(host_ip, 9999)
 
    socket_thread = SocketThread('SocketThread', npSocket)
    socket_thread.start() 
 
    while vc.isOpened():
         
        ret, f = vc.read()
        cv2.imshow("RPi Reader", f)
        frame = imresize(f, .5)
 
        key = cv2.waitKey(20)
        if key == 27 or key == ord('q'):   
            socket_thread.isRunning = False
            socket_thread.join() 
            break
 
        if not socket_thread.is_alive():
            break
         
        try:
            frame_queue.put_nowait(frame)
        except:
            # Clear unused frames
            try:
                while True:
                    frame_queue.get_nowait()
            except:
                pass
 
    frame_queue.close()
    frame_queue.join_thread()
    vc.release()

建立了一個線程用於收發視頻幀數據和結果:.net

class SocketThread (threading.Thread):
    def __init__(self, name, npSocket):
        threading.Thread.__init__(self)
        self.name = name
        self.npSocket = npSocket
        self.isRunning = True
 
    def run(self):      
        while self.isRunning:
            frame = frame_queue.get(timeout=100)
 
            try:
                start_time = time.time()
                self.npSocket.sendNumpy(frame)
                obj = self.npSocket.receiveJSON()
                print("--- %.2f ms seconds ---" % ((time.time() - start_time) * 1000))
                data = obj['results']
 
                if (len(data) > 0):
                    for result in data:
                        print("Type: " + result[0])
                        print("Value: " + result[1] + "\n")
                else:
                    print('No barcode detected.')
            except:
                break
             
        self.npSocket.endClient()

在Windows和樹莓派上分別運行對應的Python程序:

參考

源碼

https://github.com/yushulx/python-socket-rpi-barcode

相關文章
相關標籤/搜索