課程使用的操做系統爲 Ubuntu 16.04
,OpenCV 版本爲 opencv-python 3.4.1.15
。python
你能夠在個人Github 上找到 Windows 系統和 Linux 系統對應的源代碼,此教程對應的版本是 v0.2。目前我正在開發的版本是 v0.3,新版本將容許使用不一樣IP協議的主機通訊,而且範圍再也不侷限於局域網內。這個工具最初是爲了經過IPv6節省聊天工具使用的流量而開發的。服務器
本課程項目完成過程當中將學習:網絡
其中將重點介紹 socket 傳輸過程當中對數據的壓縮和處理。app
經過如下命令可下載項目源碼,做爲參照對比完成下面詳細步驟的學習。框架
$ cd Code $ wget https://labfile.oss.aliyuncs.com/courses/672/ichat.zip $ unzip ichat.zip
如今開始下載環境依賴的包,確保在剛在解壓文件下的目錄裏運行。socket
$ cd ichat $ sudo pip3 install numpy $ sudo pip3 install opencv_python
這一步下載了咱們須要的opencv-python和numpy兩個包。ide
剩下的PyAudio,因爲本虛擬環境的部分問題,咱們單獨分開下載。函數
$ sudo apt-get install portaudio19-dev python-all-dev python3-all-dev $ sudo pip3 install pyaudio==0.2.11
如今,咱們的實驗環境就搭好了。工具
實驗實現了簡易的視頻通訊工具,基於 OpenCV 和 PyAudio,使用 TCP 協議通訊,通訊雙方創建雙向 CS 鏈接,雙方均維護一個客戶端和一個服務器端。在捕獲視頻信息後,根據用戶指定的參數對畫面作壓縮並傳輸。學習
接下來咱們分步驟講解本實驗。
先爲雙方的通訊設計 Server 類和 Client類,兩個類均繼承 threading.Thread
,只須要分別實現 __init__
、 __del__
和 run
方法,以後對象調用 .start()
方法便可在獨立線程中執行 run
方法中的內容。首先 Client
類須要存儲遠端的IP地址和端口,而 Server
類須要存儲本地服務器監聽的端口號。用戶還應當能夠指定通訊雙方使用的協議版本,即基於IPv4 仍是IPv6 的TCP鏈接。所以 Server
類的初始化須要傳入兩個參數(端口、版本), Client
類的初始化須要三個參數(遠端IP、端口、版本)。新建文件 vchat.py
,在其中定義基礎的兩個類以下。
from socket import * import threading class Video_Server(threading.Thread): def __init__(self, port, version) : threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = ('', port) if version == 4: self.sock = socket(AF_INET ,SOCK_STREAM) else: self.sock = socket(AF_INET6 ,SOCK_STREAM) def __del__(self): self.sock.close() # TODO def run(self): print("server starts...") self.sock.bind(self.ADDR) self.sock.listen(1) conn, addr = self.sock.accept() print("remote client success connected...") # TODO class Video_Client(threading.Thread): def __init__(self ,ip, port, version): threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = (ip, port) if version == 4: self.sock = socket(AF_INET, SOCK_STREAM) else: self.sock = socket(AF_INET6, SOCK_STREAM) def __del__(self) : self.sock.close() # TODO def run(self): print("client starts...") while True: try: self.sock.connect(self.ADDR) break except: time.sleep(3) continue print("client connected...") # TODO
OpenCV 爲 Python 提供的接口很是簡單而且易於理解。捕獲視頻流的任務應當由 Client
類完成,下面完善 Client
的 run
函數。在下面的代碼中,咱們爲類添加了一個成員變量 cap
,它用來捕獲默認攝像頭的輸出。
class Video_Client(threading.Thread): def __init__(self ,ip, port, version): threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = (ip, port) if version == 4: self.sock = socket(AF_INET, SOCK_STREAM) else: self.sock = socket(AF_INET6, SOCK_STREAM) self.cap = cv2.VideoCapture(0) def __del__(self) : self.sock.close() self.cap.release() def run(self): print("client starts...") while True: try: self.sock.connect(self.ADDR) break except: time.sleep(3) continue print("client connected...") while self.cap.isOpened(): ret, frame = self.cap.read() # TODO
已經捕獲到數據,接下來要發送字節流。首先咱們繼續編寫 Client
,爲其添加發送數據功能的實現。這裏只改動了 run
方法。在捕獲到幀後,咱們使用 pickle.dumps
方法對其打包,並用 sock.sendall
方法發送。注意發送過程當中咱們用 struct.pack
方法爲每批數據加了一個頭,用於接收方確認接受數據的長度。
def run(self): while True: try: self.sock.connect(self.ADDR) break except: time.sleep(3) continue print("client connected...") while self.cap.isOpened(): ret, frame = self.cap.read() data = pickle.dumps(frame) try: self.sock.sendall(struct.pack("L", len(data)) + data) except: break
下面編寫 Server
,在服務器端鏈接成功後,應當建立一個窗口用於顯示接收到的視頻。由於鏈接不必定建立成功,所以 cv.destroyAllWindows()
被放在一個 try..catch
塊中防止出現錯誤。在接收數據過程當中,咱們使用 payload_size
記錄當前從緩衝區讀入的數據長度,這個長度經過 struct.calcsize('L')
來讀取。使用該變量的意義在於緩衝區中讀出的數據可能不足一個幀,也可能由多個幀構成。爲了準確提取每一幀,咱們用 payload_size
區分幀的邊界。在從緩衝區讀出的數據流長度超過 payload_size
時,剩餘部分和下一次讀出的數據流合併,不足 payload_size
時將合併下一次讀取的數據流到當前幀中。在接收完完整的一幀後,顯示在建立的窗口中。同時咱們爲窗口建立一個鍵盤響應,當按下 Esc
或 q
鍵時退出程序。
class Video_Server(threading.Thread): def __init__(self, port, version) : threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = ('', port) if version == 4: self.sock = socket(AF_INET ,SOCK_STREAM) else: self.sock = socket(AF_INET6 ,SOCK_STREAM) def __del__(self): self.sock.close() try: cv2.destroyAllWindows() except: pass def run(self): print("server starts...") self.sock.bind(self.ADDR) self.sock.listen(1) conn, addr = self.sock.accept() print("remote client success connected...") data = "".encode("utf-8") payload_size = struct.calcsize("L") cv2.namedWindow('Remote', cv2.WINDOW_NORMAL) while True: while len(data) < payload_size: data += conn.recv(81920) packed_size = data[:payload_size] data = data[payload_size:] msg_size = struct.unpack("L", packed_size)[0] while len(data) < msg_size: data += conn.recv(81920) zframe_data = data[:msg_size] data = data[msg_size:] frame_data = zlib.decompress(zframe_data) frame = pickle.loads(frame_data) cv2.imshow('Remote', frame) if cv2.waitKey(1) & 0xFF == 27: break
如今的服務器和客戶端已經能夠運行,你能夠在代碼中建立一個 Client
類實例和一個 Server
類實例,並將IP地址設爲 127.0.0.1
,端口設爲任意合法的(0-65535)且不衝突的值,版本設爲IPv4。執行代碼等同於本身和本身通訊。若是網絡情況很差,你也許會發現本身和本身的通訊也有卡頓現象。爲了使畫面質量、延遲可以和現實網絡情況相匹配,咱們須要容許用戶指定通訊中畫面的質量,同時咱們的代碼應當自己具備壓縮數據的能力,以儘量利用帶寬。
當用戶指定使用低畫質通訊,咱們應當對原始數據作變換,最簡單的方式即將捕獲的每一幀按比例縮放,同時下降傳輸的幀速,在代碼中體現爲 resize
,該函數的第二個參數爲縮放中心,後兩個參數爲縮放比例,而且根據用戶指定的等級,再也不傳輸捕獲的每一幀,而是間隔幾幀傳輸一幀。爲了防止用戶指定的畫質過差,代碼中限制了最壞狀況下的縮放比例爲0.3,最大幀間隔爲3。此外,咱們在發送每一幀的數據前使用 zlib.compress
對其壓縮,儘可能下降帶寬負擔。
class Video_Client(threading.Thread): def __init__(self ,ip, port, level, version): threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = (ip, port) if level <= 3: self.interval = level else: self.interval = 3 self.fx = 1 / (self.interval + 1) if self.fx < 0.3: self.fx = 0.3 if version == 4: self.sock = socket(AF_INET, SOCK_STREAM) else: self.sock = socket(AF_INET6, SOCK_STREAM) self.cap = cv2.VideoCapture(0) def __del__(self) : self.sock.close() self.cap.release() def run(self): print("VEDIO client starts...") while True: try: self.sock.connect(self.ADDR) break except: time.sleep(3) continue print("VEDIO client connected...") while self.cap.isOpened(): ret, frame = self.cap.read() sframe = cv2.resize(frame, (0,0), fx=self.fx, fy=self.fx) data = pickle.dumps(sframe) zdata = zlib.compress(data, zlib.Z_BEST_COMPRESSION) try: self.sock.sendall(struct.pack("L", len(zdata)) + zdata) except: break for i in range(self.interval): self.cap.read()
服務器端最終代碼以下,增長了對接收到數據的解壓縮處理。
class Video_Server(threading.Thread): def __init__(self, port, version) : threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = ('', port) if version == 4: self.sock = socket(AF_INET ,SOCK_STREAM) else: self.sock = socket(AF_INET6 ,SOCK_STREAM) def __del__(self): self.sock.close() try: cv2.destroyAllWindows() except: pass def run(self): print("VEDIO server starts...") self.sock.bind(self.ADDR) self.sock.listen(1) conn, addr = self.sock.accept() print("remote VEDIO client success connected...") data = "".encode("utf-8") payload_size = struct.calcsize("L") cv2.namedWindow('Remote', cv2.WINDOW_NORMAL) while True: while len(data) < payload_size: data += conn.recv(81920) packed_size = data[:payload_size] data = data[payload_size:] msg_size = struct.unpack("L", packed_size)[0] while len(data) < msg_size: data += conn.recv(81920) zframe_data = data[:msg_size] data = data[msg_size:] frame_data = zlib.decompress(zframe_data) frame = pickle.loads(frame_data) cv2.imshow('Remote', frame) if cv2.waitKey(1) & 0xFF == 27: break
在完成視頻通訊的基礎上,總體框架對於音頻通訊能夠直接挪用,只須要修改其中捕獲視頻/音頻的代碼和服務器解碼播放的部分。這裏咱們使用 PyAudio 庫處理音頻,在 Linux 下你也能夠選擇 sounddevice
。關於 sounddevice
這裏不作過多介紹,你能夠在這裏看到它最新版本的文檔。將 vchat.py
複製一份,重命名爲 achat.py
,簡單修改幾處,最終音頻捕獲、傳輸的完整代碼以下。我將上面代碼中的 Server
和 Client
分別加上 Video
和 Audio
前綴以區分,同時顯示給用戶的 print
輸出語句也作了必定修改,對於視頻加上 VIDEO
前綴,音頻加上 AUDIO
前綴。若是你對代碼中使用到的 PyAudio 提供的庫函數有所疑問,能夠在這裏找到相關的入門文檔及示例。
class Audio_Server(threading.Thread): def __init__(self, port, version) : threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = ('', port) if version == 4: self.sock = socket(AF_INET ,SOCK_STREAM) else: self.sock = socket(AF_INET6 ,SOCK_STREAM) self.p = pyaudio.PyAudio() self.stream = None def __del__(self): self.sock.close() if self.stream is not None: self.stream.stop_stream() self.stream.close() self.p.terminate() def run(self): print("AUDIO server starts...") self.sock.bind(self.ADDR) self.sock.listen(1) conn, addr = self.sock.accept() print("remote AUDIO client success connected...") data = "".encode("utf-8") payload_size = struct.calcsize("L") self.stream = self.p.open(format=FORMAT, channels=CHANNELS, rate=RATE, output=True, frames_per_buffer = CHUNK ) while True: while len(data) < payload_size: data += conn.recv(81920) packed_size = data[:payload_size] data = data[payload_size:] msg_size = struct.unpack("L", packed_size)[0] while len(data) < msg_size: data += conn.recv(81920) frame_data = data[:msg_size] data = data[msg_size:] frames = pickle.loads(frame_data) for frame in frames: self.stream.write(frame, CHUNK) class Audio_Client(threading.Thread): def __init__(self ,ip, port, version): threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = (ip, port) if version == 4: self.sock = socket(AF_INET, SOCK_STREAM) else: self.sock = socket(AF_INET6, SOCK_STREAM) self.p = pyaudio.PyAudio() self.stream = None def __del__(self) : self.sock.close() if self.stream is not None: self.stream.stop_stream() self.stream.close() self.p.terminate() def run(self): print("AUDIO client starts...") while True: try: self.sock.connect(self.ADDR) break except: time.sleep(3) continue print("AUDIO client connected...") self.stream = self.p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) while self.stream.is_active(): frames = [] for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)): data = self.stream.read(CHUNK) frames.append(data) senddata = pickle.dumps(frames) try: self.sock.sendall(struct.pack("L", len(senddata)) + senddata) except: break
至此咱們完成了 vchat.py 的編寫。
main.py
爲了提供用戶參數解析,代碼使用了 argparse
。你可能對此前幾個類中初始化方法的 self.setDaemon(True)
有疑惑。這個方法的調用使每一個線程在主線程結束以後自動退出,保證程序不會出現崩潰且沒法銷燬的狀況。在 main.py
中,咱們經過每隔1s作一次線程的保活檢查,若是視頻/音頻中出現阻塞/故障,主線程會終止。
import sys import time import argparse from vchat import Video_Server, Video_Client from achat import Audio_Server, Audio_Client parser = argparse.ArgumentParser() parser.add_argument('--host', type=str, default='127.0.0.1') parser.add_argument('--port', type=int, default=10087) parser.add_argument('--level', type=int, default=1) parser.add_argument('-v', '--version', type=int, default=4) args = parser.parse_args() IP = args.host PORT = args.port VERSION = args.version LEVEL = args.level if __name__ == '__main__': vclient = Video_Client(IP, PORT, LEVEL, VERSION) vserver = Video_Server(PORT, VERSION) aclient = Audio_Client(IP, PORT+1, VERSION) aserver = Audio_Server(PORT+1, VERSION) vclient.start() aclient.start() time.sleep(1) # make delay to start server vserver.start() aserver.start() while True: time.sleep(1) if not vserver.isAlive() or not vclient.isAlive(): print("Video connection lost...") sys.exit(0) if not aserver.isAlive() or not aclient.isAlive(): print("Audio connection lost...") sys.exit(0)
由於實驗樓的環境沒有提供攝像頭,所以咱們須要修改一下代碼,讓程序從一個本地視頻文件讀取,模擬攝像頭的訪問。將 Video_Client
中 self.cap = cv2.VideoCapture(0)
改成 self.cap = cv2.VideoCapture('test.mp4')
,即從本地視頻 test.mp4
中讀取。在修改完你的代碼後,你能夠經過如下命令下載 test.mp4
(該視頻文件是周杰倫《浪漫手機》的MV),並檢驗代碼。(請確保在ichat文件夾下!)
$ wget http://labfile.oss.aliyuncs.com/courses/671/test.mp4 $ python3 main.py
和上面命令同樣,在本機能夠經過 python3 main.py
來實驗本機和本機的視頻聊天,若是你有條件在同一局域網內的兩臺機器上實驗,則能夠將程序部署在兩臺機器上,並相互鏈接觀察效果。下面兩張圖爲本機上實驗截圖,有些狀況下 PyAudio 可能會提示一些警告,你能夠忽視它的提示。用戶也能夠指定 level
參數, level
越高,畫質越差, level
爲 0 爲原始畫面,在咱們的 main.py
中默認 level
爲 1。