Python3 實現簡易局域網視頻聊天工具

1、課程介紹

1. 來源

課程使用的操做系統爲 Ubuntu 16.04 ,OpenCV 版本爲 opencv-python 3.4.1.15 。python

你能夠在個人Github 上找到 Windows 系統和 Linux 系統對應的源代碼,此教程對應的版本是 v0.2。目前我正在開發的版本是 v0.3,新版本將容許使用不一樣IP協議的主機通訊,而且範圍再也不侷限於局域網內。這個工具最初是爲了經過IPv6節省聊天工具使用的流量而開發的。服務器

2. 內容簡介

  • 本實驗實現簡易的視頻通訊工具
  • 在視頻通訊的基礎上加入語音
  • 用戶能夠選擇通訊的質量,即畫質、停頓等參數
  • 支持IPv6

3. 實驗知識點

本課程項目完成過程當中將學習:網絡

  • Python 基於 OpenCV 對攝像頭信息的捕獲和壓縮
  • Python 關於 線程 和 socket 通訊的一些基礎技巧
  • Python 基於 PyAudio 對語音信息的捕獲和壓縮

其中將重點介紹 socket 傳輸過程當中對數據的壓縮和處理。app

4.實驗環境

  • python 3.5
  • opencv-python 3.4.1.15
  • numpy 1.14.5
  • PyAudio 0.2.11

2、環境搭建

經過如下命令可下載項目源碼,做爲參照對比完成下面詳細步驟的學習。框架

$ cd Code
$ wget https://labfile.oss.aliyuncs.com/courses/672/ichat.zip
$ unzip ichat.zip

如今開始下載環境依賴的包,確保在剛在解壓文件下的目錄裏運行。socket

$ cd ichat
$ sudo pip3 install numpy
$ sudo pip3 install opencv_python

這一步下載了咱們須要的opencv-python和numpy兩個包。ide

剩下的PyAudio,因爲本虛擬環境的部分問題,咱們單獨分開下載。函數

$ sudo apt-get install portaudio19-dev python-all-dev python3-all-dev
$ sudo pip3 install pyaudio==0.2.11

如今,咱們的實驗環境就搭好了。工具

3、實驗原理

實驗實現了簡易的視頻通訊工具,基於 OpenCV 和 PyAudio,使用 TCP 協議通訊,通訊雙方創建雙向 CS 鏈接,雙方均維護一個客戶端和一個服務器端。在捕獲視頻信息後,根據用戶指定的參數對畫面作壓縮並傳輸。學習

4、實驗步驟

接下來咱們分步驟講解本實驗。

4.1 實現雙向 C/S 鏈接

先爲雙方的通訊設計 Server 類和 Client類,兩個類均繼承 threading.Thread ,只須要分別實現 __init__ 、 __del__ 和 run 方法,以後對象調用 .start() 方法便可在獨立線程中執行 run 方法中的內容。首先 Client 類須要存儲遠端的IP地址和端口,而 Server 類須要存儲本地服務器監聽的端口號。用戶還應當能夠指定通訊雙方使用的協議版本,即基於IPv4 仍是IPv6 的TCP鏈接。所以 Server 類的初始化須要傳入兩個參數(端口、版本), Client 類的初始化須要三個參數(遠端IP、端口、版本)。新建文件 vchat.py ,在其中定義基礎的兩個類以下。

from socket import *
import threading
class Video_Server(threading.Thread):
    def __init__(self, port, version) :
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = ('', port)
        if version == 4:
            self.sock = socket(AF_INET ,SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6 ,SOCK_STREAM)
    def __del__(self):
        self.sock.close()
        # TODO
    def run(self):
        print("server starts...")
        self.sock.bind(self.ADDR)
        self.sock.listen(1)
        conn, addr = self.sock.accept()
        print("remote client success connected...")
        # TODO

class Video_Client(threading.Thread):
    def __init__(self ,ip, port, version):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = (ip, port)
        if version == 4:
            self.sock = socket(AF_INET, SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6, SOCK_STREAM)
    def __del__(self) :
        self.sock.close()
        # TODO
    def run(self):
        print("client starts...")
        while True:
            try:
                self.sock.connect(self.ADDR)
                break
            except:
                time.sleep(3)
                continue
        print("client connected...")
        # TODO

4.2 實現攝像頭數據流捕獲

OpenCV 爲 Python 提供的接口很是簡單而且易於理解。捕獲視頻流的任務應當由 Client 類完成,下面完善 Client 的 run 函數。在下面的代碼中,咱們爲類添加了一個成員變量 cap ,它用來捕獲默認攝像頭的輸出。

class Video_Client(threading.Thread):
    def __init__(self ,ip, port, version):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = (ip, port)
        if version == 4:
            self.sock = socket(AF_INET, SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6, SOCK_STREAM)
        self.cap = cv2.VideoCapture(0)
    def __del__(self) :
        self.sock.close()
        self.cap.release()
    def run(self):
        print("client starts...")
        while True:
            try:
                self.sock.connect(self.ADDR)
                break
            except:
                time.sleep(3)
                continue
        print("client connected...")
        while self.cap.isOpened():
            ret, frame = self.cap.read()
        # TODO

4.3 發送捕獲到的數據到服務器

已經捕獲到數據,接下來要發送字節流。首先咱們繼續編寫 Client ,爲其添加發送數據功能的實現。這裏只改動了 run 方法。在捕獲到幀後,咱們使用 pickle.dumps 方法對其打包,並用 sock.sendall 方法發送。注意發送過程當中咱們用 struct.pack 方法爲每批數據加了一個頭,用於接收方確認接受數據的長度。

def run(self):
        while True:
            try:
                self.sock.connect(self.ADDR)
                break
            except:
                time.sleep(3)
                continue
        print("client connected...")
        while self.cap.isOpened():
            ret, frame = self.cap.read()
            data = pickle.dumps(frame)
            try:
                self.sock.sendall(struct.pack("L", len(data)) + data)
            except:
                break

下面編寫 Server ,在服務器端鏈接成功後,應當建立一個窗口用於顯示接收到的視頻。由於鏈接不必定建立成功,所以 cv.destroyAllWindows() 被放在一個 try..catch 塊中防止出現錯誤。在接收數據過程當中,咱們使用 payload_size 記錄當前從緩衝區讀入的數據長度,這個長度經過 struct.calcsize('L') 來讀取。使用該變量的意義在於緩衝區中讀出的數據可能不足一個幀,也可能由多個幀構成。爲了準確提取每一幀,咱們用 payload_size 區分幀的邊界。在從緩衝區讀出的數據流長度超過 payload_size 時,剩餘部分和下一次讀出的數據流合併,不足 payload_size 時將合併下一次讀取的數據流到當前幀中。在接收完完整的一幀後,顯示在建立的窗口中。同時咱們爲窗口建立一個鍵盤響應,當按下 Esc 或 q 鍵時退出程序。

class Video_Server(threading.Thread):
    def __init__(self, port, version) :
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = ('', port)
        if version == 4:
            self.sock = socket(AF_INET ,SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6 ,SOCK_STREAM)
    def __del__(self):
        self.sock.close()
        try:
            cv2.destroyAllWindows()
        except:
            pass
    def run(self):
        print("server starts...")
        self.sock.bind(self.ADDR)
        self.sock.listen(1)
        conn, addr = self.sock.accept()
        print("remote client success connected...")
        data = "".encode("utf-8")
        payload_size = struct.calcsize("L")
        cv2.namedWindow('Remote', cv2.WINDOW_NORMAL)
        while True:
            while len(data) < payload_size:
                data += conn.recv(81920)
            packed_size = data[:payload_size]
            data = data[payload_size:]
            msg_size = struct.unpack("L", packed_size)[0]
            while len(data) < msg_size:
                data += conn.recv(81920)
            zframe_data = data[:msg_size]
            data = data[msg_size:]
            frame_data = zlib.decompress(zframe_data)
            frame = pickle.loads(frame_data)
            cv2.imshow('Remote', frame)
            if cv2.waitKey(1) & 0xFF == 27:
                break

4.4 視頻縮放和數據壓縮

如今的服務器和客戶端已經能夠運行,你能夠在代碼中建立一個 Client 類實例和一個 Server 類實例,並將IP地址設爲 127.0.0.1 ,端口設爲任意合法的(0-65535)且不衝突的值,版本設爲IPv4。執行代碼等同於本身和本身通訊。若是網絡情況很差,你也許會發現本身和本身的通訊也有卡頓現象。爲了使畫面質量、延遲可以和現實網絡情況相匹配,咱們須要容許用戶指定通訊中畫面的質量,同時咱們的代碼應當自己具備壓縮數據的能力,以儘量利用帶寬。

當用戶指定使用低畫質通訊,咱們應當對原始數據作變換,最簡單的方式即將捕獲的每一幀按比例縮放,同時下降傳輸的幀速,在代碼中體現爲 resize ,該函數的第二個參數爲縮放中心,後兩個參數爲縮放比例,而且根據用戶指定的等級,再也不傳輸捕獲的每一幀,而是間隔幾幀傳輸一幀。爲了防止用戶指定的畫質過差,代碼中限制了最壞狀況下的縮放比例爲0.3,最大幀間隔爲3。此外,咱們在發送每一幀的數據前使用 zlib.compress 對其壓縮,儘可能下降帶寬負擔。

class Video_Client(threading.Thread):
    def __init__(self ,ip, port, level, version):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = (ip, port)
        if level <= 3:
            self.interval = level
        else:
            self.interval = 3
        self.fx = 1 / (self.interval + 1)
        if self.fx < 0.3:
            self.fx = 0.3
        if version == 4:
            self.sock = socket(AF_INET, SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6, SOCK_STREAM)
        self.cap = cv2.VideoCapture(0)
    def __del__(self) :
        self.sock.close()
        self.cap.release()
    def run(self):
        print("VEDIO client starts...")
        while True:
            try:
                self.sock.connect(self.ADDR)
                break
            except:
                time.sleep(3)
                continue
        print("VEDIO client connected...")
        while self.cap.isOpened():
            ret, frame = self.cap.read()
            sframe = cv2.resize(frame, (0,0), fx=self.fx, fy=self.fx)
            data = pickle.dumps(sframe)
            zdata = zlib.compress(data, zlib.Z_BEST_COMPRESSION)
            try:
                self.sock.sendall(struct.pack("L", len(zdata)) + zdata)
            except:
                break
            for i in range(self.interval):
                self.cap.read()

服務器端最終代碼以下,增長了對接收到數據的解壓縮處理。

class Video_Server(threading.Thread):
    def __init__(self, port, version) :
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = ('', port)
        if version == 4:
            self.sock = socket(AF_INET ,SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6 ,SOCK_STREAM)
    def __del__(self):
        self.sock.close()
        try:
            cv2.destroyAllWindows()
        except:
            pass
    def run(self):
        print("VEDIO server starts...")
        self.sock.bind(self.ADDR)
        self.sock.listen(1)
        conn, addr = self.sock.accept()
        print("remote VEDIO client success connected...")
        data = "".encode("utf-8")
        payload_size = struct.calcsize("L")
        cv2.namedWindow('Remote', cv2.WINDOW_NORMAL)
        while True:
            while len(data) < payload_size:
                data += conn.recv(81920)
            packed_size = data[:payload_size]
            data = data[payload_size:]
            msg_size = struct.unpack("L", packed_size)[0]
            while len(data) < msg_size:
                data += conn.recv(81920)
            zframe_data = data[:msg_size]
            data = data[msg_size:]
            frame_data = zlib.decompress(zframe_data)
            frame = pickle.loads(frame_data)
            cv2.imshow('Remote', frame)
            if cv2.waitKey(1) & 0xFF == 27:
                break

4.5 加入音頻的捕獲和傳輸

在完成視頻通訊的基礎上,總體框架對於音頻通訊能夠直接挪用,只須要修改其中捕獲視頻/音頻的代碼和服務器解碼播放的部分。這裏咱們使用 PyAudio 庫處理音頻,在 Linux 下你也能夠選擇 sounddevice。關於 sounddevice 這裏不作過多介紹,你能夠在這裏看到它最新版本的文檔。將 vchat.py 複製一份,重命名爲 achat.py ,簡單修改幾處,最終音頻捕獲、傳輸的完整代碼以下。我將上面代碼中的 Server 和 Client 分別加上 Video 和 Audio 前綴以區分,同時顯示給用戶的 print 輸出語句也作了必定修改,對於視頻加上 VIDEO 前綴,音頻加上 AUDIO 前綴。若是你對代碼中使用到的 PyAudio 提供的庫函數有所疑問,能夠在這裏找到相關的入門文檔及示例。

class Audio_Server(threading.Thread):
    def __init__(self, port, version) :
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = ('', port)
        if version == 4:
            self.sock = socket(AF_INET ,SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6 ,SOCK_STREAM)
        self.p = pyaudio.PyAudio()
        self.stream = None
    def __del__(self):
        self.sock.close()
        if self.stream is not None:
            self.stream.stop_stream()
            self.stream.close()
        self.p.terminate()
    def run(self):
        print("AUDIO server starts...")
        self.sock.bind(self.ADDR)
        self.sock.listen(1)
        conn, addr = self.sock.accept()
        print("remote AUDIO client success connected...")
        data = "".encode("utf-8")
        payload_size = struct.calcsize("L")
        self.stream = self.p.open(format=FORMAT,
                                  channels=CHANNELS,
                                  rate=RATE,
                                  output=True,
                                  frames_per_buffer = CHUNK
                                  )
        while True:
            while len(data) < payload_size:
                data += conn.recv(81920)
            packed_size = data[:payload_size]
            data = data[payload_size:]
            msg_size = struct.unpack("L", packed_size)[0]
            while len(data) < msg_size:
                data += conn.recv(81920)
            frame_data = data[:msg_size]
            data = data[msg_size:]
            frames = pickle.loads(frame_data)
            for frame in frames:
                self.stream.write(frame, CHUNK)

class Audio_Client(threading.Thread):
    def __init__(self ,ip, port, version):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.ADDR = (ip, port)
        if version == 4:
            self.sock = socket(AF_INET, SOCK_STREAM)
        else:
            self.sock = socket(AF_INET6, SOCK_STREAM)
        self.p = pyaudio.PyAudio()
        self.stream = None
    def __del__(self) :
        self.sock.close()
        if self.stream is not None:
            self.stream.stop_stream()
            self.stream.close()
        self.p.terminate()
    def run(self):
        print("AUDIO client starts...")
        while True:
            try:
                self.sock.connect(self.ADDR)
                break
            except:
                time.sleep(3)
                continue
        print("AUDIO client connected...")
        self.stream = self.p.open(format=FORMAT, 
                             channels=CHANNELS,
                             rate=RATE,
                             input=True,
                             frames_per_buffer=CHUNK)
        while self.stream.is_active():
            frames = []
            for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
                data = self.stream.read(CHUNK)
                frames.append(data)
            senddata = pickle.dumps(frames)
            try:
                self.sock.sendall(struct.pack("L", len(senddata)) + senddata)
            except:
                break

至此咱們完成了 vchat.py 的編寫。

4.6 編寫程序入口 main.py

爲了提供用戶參數解析,代碼使用了 argparse 。你可能對此前幾個類中初始化方法的 self.setDaemon(True) 有疑惑。這個方法的調用使每一個線程在主線程結束以後自動退出,保證程序不會出現崩潰且沒法銷燬的狀況。在 main.py 中,咱們經過每隔1s作一次線程的保活檢查,若是視頻/音頻中出現阻塞/故障,主線程會終止。

import sys
import time
import argparse
from vchat import Video_Server, Video_Client
from achat import Audio_Server, Audio_Client

parser = argparse.ArgumentParser()

parser.add_argument('--host', type=str, default='127.0.0.1')
parser.add_argument('--port', type=int, default=10087)
parser.add_argument('--level', type=int, default=1)
parser.add_argument('-v', '--version', type=int, default=4)

args = parser.parse_args()

IP = args.host
PORT = args.port
VERSION = args.version
LEVEL = args.level

if __name__ == '__main__':
    vclient = Video_Client(IP, PORT, LEVEL, VERSION)
    vserver = Video_Server(PORT, VERSION)
    aclient = Audio_Client(IP, PORT+1, VERSION)
    aserver = Audio_Server(PORT+1, VERSION)
    vclient.start()
    aclient.start()
    time.sleep(1)    # make delay to start server
    vserver.start()
    aserver.start()
    while True:
        time.sleep(1)
        if not vserver.isAlive() or not vclient.isAlive():
            print("Video connection lost...")
            sys.exit(0)
        if not aserver.isAlive() or not aclient.isAlive():
            print("Audio connection lost...")
            sys.exit(0)

4.7 運行狀況

由於實驗樓的環境沒有提供攝像頭,所以咱們須要修改一下代碼,讓程序從一個本地視頻文件讀取,模擬攝像頭的訪問。將 Video_Client 中 self.cap = cv2.VideoCapture(0) 改成 self.cap = cv2.VideoCapture('test.mp4') ,即從本地視頻 test.mp4 中讀取。在修改完你的代碼後,你能夠經過如下命令下載 test.mp4 (該視頻文件是周杰倫《浪漫手機》的MV),並檢驗代碼。(請確保在ichat文件夾下!)

$ wget http://labfile.oss.aliyuncs.com/courses/671/test.mp4
$ python3 main.py

和上面命令同樣,在本機能夠經過 python3 main.py 來實驗本機和本機的視頻聊天,若是你有條件在同一局域網內的兩臺機器上實驗,則能夠將程序部署在兩臺機器上,並相互鏈接觀察效果。下面兩張圖爲本機上實驗截圖,有些狀況下 PyAudio 可能會提示一些警告,你能夠忽視它的提示。用戶也能夠指定 level 參數, level 越高,畫質越差, level 爲 0 爲原始畫面,在咱們的 main.py 中默認 level 爲 1。

相關文章
相關標籤/搜索