36 - 網絡編程-TCP編程

1 概述

        自從互聯網誕生以來,如今基本上全部的程序都是網絡程序,不多有單機版的程序了。
        計算機網絡就是把各個計算機鏈接到一塊兒,讓網絡中的計算機能夠互相通訊。網絡編程就是如何在程序中實現兩臺計算機的通訊。
        舉個例子,當你使用瀏覽器訪問新浪網時,你的計算機就和新浪的某臺服務器經過互聯網鏈接起來了,而後,新浪的服務器把網頁內容做爲數據經過互聯網傳輸到你的電腦上。
        因爲你的電腦上可能不止瀏覽器,還有QQ、微信、郵件客戶端等,不一樣的程序鏈接的別的計算機也會不一樣,因此,更確切地說,網絡通訊是兩臺計算機上的兩個進程之間的通訊。好比,瀏覽器進程和新浪服務器上的某個Web服務進程在通訊,而QQ進程是和騰訊的某個服務器上的某個進程在通訊。
        網絡編程對全部開發語言都是同樣的,Python也不例外。用Python進行網絡編程,就是在Python程序自己這個進程內,鏈接別的服務器進程的通訊端口進行通訊。shell

2 TCP/IP協議基礎

        計算機爲了聯網,就必須規定通訊協議,早期的計算機網絡,都是由各廠商本身規定一套協議,IBM、Apple和Microsoft都有各自的網絡協議,互不兼容,這就比如一羣人有的說英語,有的說中文,有的說德語,說同一種語言的人能夠交流,不一樣的語言之間就不行了。
        後來爲了打破這個局面,出現了一套全球通用協議族,叫作互聯網協議,互聯網協議包含了上百種協議標準,可是最重要的兩個協議是TCP和IP協議,因此,你們把互聯網的協議簡稱TCP/IP協議。
        通訊的時候,雙方必須知道對方的標識,比如發郵件必須知道對方的郵件地址。互聯網上每一個計算機的惟一標識就是IP地址,是由4個點分十進制數組成(例如:12.21.21.41)。
        下面是TCP/IP協議分層:
tcp/ip
        TCP/UDP協議則是創建在IP協議之上的。TCP協議負責在兩臺計算機之間創建可靠鏈接,保證數據包按順序到達。TCP協議會經過握手創建鏈接,而後,對每一個IP包編號,確保對方按順序收到,若是包丟掉了,就自動重發。相對於TCP(面向鏈接)來講,UDP則是面向無鏈接的協議,使用UDP協議時,不須要創建鏈接,只須要知道對方的IP地址和端口號,就能夠直接發數據包。可是,能不能到達就不知道了。雖然用UDP傳輸數據不可靠,但它的優勢是和TCP比,速度快,對於不要求可靠到達的數據,就可使用UDP協議。
        許多經常使用的更高級的協議都是創建在TCP協議基礎上的,好比用於瀏覽器的HTTP協議、發送郵件的SMTP協議等。
        一個IP包除了包含要傳輸的數據外,還包含源IP地址和目標IP地址,源端口和目標端口。那麼端口有什麼做用呢?在兩臺計算機通訊時,只發IP地址是不夠的,由於同一臺計算機上跑着多個網絡程序。一個IP包來了以後,究竟是交給瀏覽器仍是QQ,就須要端口號來區分。每一個網絡程序都向操做系統申請惟一的端口號,這樣,兩個進程在兩臺計算機之間創建網絡鏈接就須要各自的IP地址和各自的端口號。編程

3 TCP編程

        Socket稱爲安全套接字,是網絡編程的一個抽象概念。一般咱們用一個Socket表示'打開了一個網絡連接',而打開一個Socket須要知道目標計算機的IP地址和端口號,再指定協議類型便可。
        大多數鏈接都是可靠的TCP鏈接。建立TCP鏈接時,主動發起鏈接的叫客戶端,被動響應鏈接的叫服務器。
        socket庫是一個底層的用於網絡通訊的庫,使用它咱們能夠便捷的進行網絡交互的開發,下面以socket庫爲例,想要使用須要先引入import socketjson

3.1 通訊流程

咱們先來了解一下,python的socket的通信流程:
tcp_socketwindows

服務端:數組

  1. 建立Socket對象
  2. 綁定IP地址Address和端口Port,使用bind方法,IPv4地址爲一個二元組('IP',Port),一個TCP端口只能被綁定一次
  3. 開始監聽,將在指定的IP的端口上監聽。listen方法
  4. 獲取用於傳輸數據的Socket對象,accept方法。
  5. 接受數據,recv方法,使用緩衝區接受數據
  6. 發送數據,send方法,類型爲bytes
  7. 關閉鏈接瀏覽器

    客戶端關閉鏈接時,服務端只須要關閉與之相連的socket便可,服務端不用關閉,由於還有其餘客戶端會鏈接緩存

客戶端:安全

  1. 建立Socket對象
  2. 鏈接服務端。connect方法
  3. 發送數據,send方法,類型爲bytes
  4. 接受數據,recv方法,使用緩衝區接受數據
  5. 關閉鏈接

3.2 構建服務端

        服務端想要提供服務,首先須要綁定IP地址,而後啓動服務,監聽端口等待客戶端的鏈接,一旦有客戶端鏈接訪問,那麼接下來就能夠接受客戶端發送的數據了。根據上圖,以及創建服務端的流程,我門來捋一下服務端的邏輯到代碼的步驟:服務器

  1. 建立服務端
socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# socke.AF_INET 指的是使用 IPv4
# socket.SOCK_STREAM 指定使用面向流的TCP協議
  1. 綁定IP地址和端口。
socket.bind(('127.0.0.1',999))  
# 小於1024的端口只有管理員才能夠指定
  1. 開始監聽端口
socket.listen()
  1. 客戶端鏈接(阻塞)
sock, client_addr = socket.accept() 
# 返回二元組,socket鏈接和客戶端的IP及Port元祖
  1. 接受數據(阻塞)
data = sock.recv(1024) 
# 接收1024個字節的數據,通常是2的倍數,bytes格式
  1. 發送數據
sock.send('data'.encode()) # bytes格式
  1. 關閉鏈接
sock.close()

完成的代碼:

import socket
socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
socket.bind(('127.0.0.1',999))
socket.listen()
sock, client_info = socket.accept()
data = sock.recv(1024)
sock.send(data)
sock.close()    # 關閉客戶端socket鏈接
socket.close()  # 關閉服務器

3.3 構建客戶端

        根據TCP創建鏈接的三次握手機制,咱們知道客戶端想要鏈接服務端,首先建立TCP鏈接,使用某個端口號,鏈接服務端,而後發送/接受數據,而後關閉鏈接。根據上圖,以及創建客戶端的流程,咱們來捋一下客戶端的邏輯到代碼的步驟:

  1. 建立客戶端
socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 
# 默認就是socket.AF_INET,socket.SOCK_STREAM,因此TCP時,能夠直接socket.socket()
  1. 鏈接服務的
socket.connect('127.0.0.1',999)
  1. 發送數據
socket.send('data'.encode())
  1. 接受數據
data = socket.recv(1024)
  1. 關閉鏈接
socket.close()

完整的代碼:

import socket

socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('127.0.0.1', 999))
socket.send(b'data')
data = socket.recv(1024)
print(data)
socket.close()   # 關閉客戶端socket鏈接

3.4 經常使用方法

在初始化時,socket方法提供了不一樣的參數,用於指定不一樣的連接類型以及不一樣的IP地址類型。
IP協議相關:

  1. AF_INET:IPV4
  2. AF_INET: IPV6
  3. AF_UNIX: Unix Domain Socket(windows沒有)

Socket類型:

  1. SOCK_STREM: 面向鏈接的套接字。TCP協議
  2. SOCK_DGRAM: 無鏈接的數據報文套接字。UDP協議

默認狀況下 socket.socket()的參數爲AF_INET,SOCK_STREM,因此若是須要的是IPv4的TCP鏈接,能夠直接實例化便可

服務器端套接字:

函數 描述
s.bind() 綁定地址(host,port)到套接字, 在AF_INET下,以元組(host,port)的形式表示地址。
s.listen() 開始TCP監聽。backlog指定在拒絕鏈接以前,操做系統能夠掛起的最大鏈接數量。該值至少爲1,大部分應用程序設爲5就能夠了。
s.accept() 被動接受TCP客戶端鏈接,(阻塞式)等待鏈接的到來

客戶端套接字:

函數 描述
s.connect() 主動初始化TCP服務器鏈接,。通常address的格式爲元組(hostname,port),若是鏈接出錯,返回socket.error錯誤。
s.connect_ex() connect()函數的擴展版本,出錯時返回出錯碼,而不是拋出異常

公共用途的套接字函數:

函數 描述
s.recv() 接收TCP數據,數據以字符串形式返回,bufsize指定要接收的最大數據量。flag提供有關消息的其餘信息,一般能夠忽略。
s.send() 發送TCP數據,將string中的數據發送到鏈接的套接字。返回值是要發送的字節數量,該數量可能小於string的字節大小。
s.sendall() 完整發送TCP數據,完整發送TCP數據。將string中的數據發送到鏈接的套接字,但在返回以前會嘗試發送全部數據。成功返回None,失敗則拋出異常。
s.recvfrom() 接收UDP數據,與recv()相似,但返回值是(data,address)。其中data是包含接收數據的字符串,address是發送數據的套接字地址。
s.sendto() 發送UDP數據,將數據發送到套接字,address是形式爲(ipaddr,port)的元組,指定遠程地址。返回值是發送的字節數。
s.close() 關閉套接字
s.getpeername() 返回鏈接套接字的遠程地址。返回值一般是元組(ipaddr,port)。
s.getsockname() 返回套接字本身的地址。一般是一個元組(ipaddr,port)
s.setsockopt(level,optname,value) 設置給定套接字選項的值。
s.getsockopt(level,optname[.buflen]) 返回套接字選項的值。
s.settimeout(timeout) 設置套接字操做的超時期,timeout是一個浮點數,單位是秒。值爲None表示沒有超時期。通常,超時期應該在剛建立套接字時設置,由於它們可能用於鏈接的操做(如connect())
s.gettimeout() 返回當前超時期的值,單位是秒,若是沒有設置超時期,則返回None。
s.fileno() 返回套接字的文件描述符。
s.setblocking(flag) 若是flag爲0,則將套接字設爲非阻塞模式,不然將套接字設爲阻塞模式(默認值)。非阻塞模式下,若是調用recv()沒有發現任何數據,或send()調用沒法當即發送數據,那麼將引發socket.error異常。
s.makefile() 建立一個與該套接字相關連的文件

3.4.1 makefile方法

這裏單獨把makefile方法抽出來,是由於它可讓咱們用操做文件的方式來操做socket。makefile的用法以下:

makefile(self, mode="r", buffering=None, *,encoding=None, errors=None, newline=None):

        看這些參數是否是很眼熟?沒錯,和open函數的參數差很少是相同的,默認狀況下模式爲r,若是是socket的話,咱們知道能夠接受數據,也能夠發送數據,對應的文件上的話,就是能夠讀取也能夠寫入,因此模式應該爲'rw'

makefile的mode模式,只有'rw',沒有'r+',這點和文件打開方式不一樣。

import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 8080))
server.listen(5)
while True:  # 連接循環
    print('wait for connect')
    conn, addr = server.accept()
    print('client connect', addr)
    f = conn.makefile(mode='rw')   # 建立一個 類file對象
    while True:
        try:  # Windows下捕捉客戶端異常關閉鏈接
            print('~~~~~~~~~')
            client_msg = f.readline()   # 從緩衝區中讀取數據
            print(client_msg)
            if not client_msg: break  # Linux下處理客戶端異常退出問題
            print('client msg :', client_msg)
            f.write(client_msg.upper())  # 向緩衝區寫入數據
            f.flush()
        except (ConnectionResetError, Exception):  # except能夠同時指定多個異常
            print('1')
            break
    conn.close()
server.close()

直接用不太好用,使用read方法時,因爲沒法知道要讀取多少字節,因此會有各類問題,能夠引用封裝,將要發送的數據總大小,按照固定4個字節發到服務端,告訴服務端後面的數據有多少,而後服務端動態指定read的字節數便可。

3.5 socket交互

上面寫的代碼只能通信一次,就結束鏈接了。正經的socket交互是那種有來有往的,並非這樣這種,因此咱們須要進行修改。

3.4.1 通信循環及客戶端發空消息時的問題

拋出問題:

  1. 通信不該該是單次的,應該至少是屢次的
  2. 若是咱們發送的消息爲空的時候,就會卡住,服務端沒法接受,客戶端沒法繼續發送

針對問題作以下改進:

服務端:增長循環,完成通訊循環,而且把客戶端發來的消息轉換成大寫的並返回。

import socket

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)
print('wait for connect')
conn,addr = server.accept()
print('client connect',addr)
while True:    #循環的接受消息
    client_msg = conn.recv(1024)
    print('client msg :', client_msg)
    conn.send(client_msg.upper())

conn.close()
server.close()

客戶端:增長循環,完成通訊循環,而且發送的消息由用戶來輸入,當輸入爲空的時候,繼續循環。

import socket

client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:    # 通訊循環
    msg = input('>>:').strip()
    if not msg:continue        # 當用戶輸入爲空的時候,繼續循環
    client.send(msg.encode('utf-8'))
    server_msg = client.recv(1024)
    print(server_msg.decode('utf-8'))
client.close()

3.4.2 連接循環及客戶端強制退出時的問題

拋出問題:

  1. 當客戶端異常關閉一個連接的時候,服務端也會產生異常
    • windows下會異常退出(因爲tcp是雙向連接的,客戶端異常退出,那麼服務端就不能繼續循環的收發消息了)
    • Linux下會進入死循環(收到了空消息)
  2. 當一個客戶端鏈接斷開,服務端應該能夠繼續接受其它客戶端發來的消息

因爲問題集中在服務端,因此對服務端作以下改進:

  1. 添加連接循環,當一個鏈關閉時,能夠繼續接受其餘連接。
  2. 添加異常處理,當客戶端異常關閉時,主動的關閉服務端的連接。
import socket

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)
while True:     #連接循環
    print('wait for connect')
    conn,addr = server.accept()
    print('client connect',addr)
    while True:
        try:       #Windows下捕捉客戶端異常關閉鏈接
            client_msg = conn.recv(1024)
            if not client_msg:break     #Linux下處理客戶端異常退出問題
            print('client msg :', client_msg)
            conn.send(client_msg.upper())
        except (ConnectionResetError,Exception):   #except能夠同時指定多個異常
            break
    conn.close()
server.close()

客戶端異常關閉時,服務端的異常爲:ConnectionResetError,咱們能夠經過捕捉其,來控制服務端的推出,也可使用 Exception(通用)異常來捕捉。

3.4.3 模擬遠程執行命令

利用socket,遠程執行命令,並返回,模擬ssh的效果

  1. 執行命令使用subprocess模塊的Popen和PIPE
  2. 注意subprocess的Popen模塊執行結果就是bytes格式的str,因此不用轉換便可直接發送

以上需求都針對服務端,那麼對服務端作以下修改

import socket
from subprocess import Popen,PIPE

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# server.bind(('192.168.56.200',8080))
server.bind(('127.0.0.1',8080))
server.listen(5)
while True:
    print('wait for connect')
    conn,addr = server.accept()
    print('client connect',addr)
    while True:
        try:
            cmd = conn.recv(1024).strip()
            if not cmd:break
            p = Popen(cmd.decode('utf-8'),shell=True,stdout=PIPE,stderr=PIPE)
            stdout,stderr = p.communicate()   #執行的結果就是bytes格式的string
            if stderr:
                conn.send(stderr)
            else:
                conn.send(stdout)
        except (ConnectionResetError,Exception):
            break
    conn.close()
server.close()

3.6 粘包問題

        因爲咱們在接受和發送數據的時候,都指定了每次接收1024個字節的數據,而發送的數據咱們是不可估量的,若是發送的時候超過1024字節,那麼在接收端就沒法一次收取完畢,這些數據會存放在操做系統緩存中,那麼下次再接收1024字節的數據的時候,會從緩存中繼續讀取,那麼就會發生粘包現象。
        所謂粘包問題主要仍是由於接收方不知道消息之間的界限,不知道一次性提取多少字節的數據所形成的。
只有TCP有粘包現象,UDP永遠不會粘包

  1. UDP是面向報文的,發送方的UDP對應用層交下來的報文,不合並,不拆分,只是在其上面加上首部後就交給了下面的網絡層,也就是說不管應用層交給UDP多長的報文,它通通發送,一次發送一個。而對接收方,接到後直接去除首部,交給上面的應用層就完成任務了。所以,它須要應用層控制報文的大小
  2. TCP是面向字節流的,它把上面應用層交下來的數據當作無結構的字節流來發送,能夠想象成流水形式的,發送方TCP會將數據放入「蓄水池」(緩存區),等到能夠發送的時候就發送,不能發送就等着,TCP會根據當前網絡的擁塞狀態來肯定每一個報文段的大小。

不是server端直接發送,client端直接接收

  1. 服務端
    • 應用程序是運行在用戶態的,發送數據的時候,須要去調用物理網卡,而這個操做是不準容許的,必須預先將運行狀態切換爲內核態才能夠操做網卡發送數據,因此引入操做系統緩存的概念。
    • 應用程序把須要進行系統調用(用戶態-->內核態的切換)的指令放入操做系統緩存,而後由操做系通通一去執行。
  2. 客戶端
    • 操做系統把從網卡接收到的數據存入操做系統緩存中去,供應用程序讀取
    • 應用程序直接從操做系統緩存中將數據讀出,而後進行處理

整個過程如圖:
nianbao

發生黏包的本質問題是對端不知道咱們發送數據的總長度,若是可否讓對方提早知道,那麼就不會發生粘包現象。根據TCP報文的格式獲得啓發:

  1. 發送真正的數據前,須要預先發送本次傳送的報文大小(增長報頭)
  2. 報頭的長度必須是固定的

3.6.1 struct模塊

        若是咱們要預先傳遞數據的大小(int型),那麼就須要把它看成數據傳輸,當服務端收到之後,就知道後續的數據大小了,那麼一次傳輸的數據到底佔多少字節呢,Python的struct模塊能夠幫助咱們實現這個過程,當傳遞諸如int、char之類的基本數據的時候,struct提供了一種機制將這些特定的結構體類型打包成二進制流的字符串而後再網絡傳輸,接收端也應該能夠經過struct模塊進行解包還原出原始的結構體數據。

  • struct.pack() 打包
struct.pack('i',int)
# i表示把數字用4個字節進行表示,這樣的話就能夠表示2的32次方的數字,已經知足需求
# 後面的int表示要打包的數字(要發送的報文長度)
# 經過struct.pack 會獲得bytes格式的數據,能夠直接進行發送
  • struct.unpack() 解包
struct.unpack('i',obj)
# obj表示收取到數據
# 會返回一個元組,元組的第一個元素爲對方傳過來的報文長度
# 能夠複製給一個變量來指定接收的報文長度

更多的用法需自行查找struct模塊的官方文檔。

3.6.2 經過struct傳遞包頭解決粘包問題

# 服務端 
#!/usr/bin/env python
# Author:Lee Sir 
#_*_ coding:utf-8 _*_ 

import socket 
from subprocess import Popen,PIPE 
import struct 

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 
server.bind(('127.0.0.1',8000)) 
server.listen(5) 

while True: 
    print('等待鏈接......') 
    conn,addr = server.accept() 
    print('客戶端地址爲:',addr) 
        while True: 
            try: 
                cmd_bytes = conn.recv(1024) 
                if not cmd_bytes:continue 
                cmd_str = cmd_bytes.decode('utf-8') 
                print('執行的命令是:',cmd_str) 

                #執行命令 
                p = Popen(cmd_str,shell=True,stdout=PIPE,stderr=PIPE) 
                stdout,stderr = p.communicate() 

                #返回的數據 
                if stderr: 
                    send_data = stderr 
                else: 
                    send_data = stdout 

                #構建報頭併發送報頭 
                conn.send(struct.pack('i',len(send_data))) 

                #發送數據 
                conn.send(send_data) 
            except Exception: 
                break

客戶端

#!/usr/bin/env python 
# Author:Lee Sir 
#_*_ coding:utf-8 _*_ 

import socket 
import struct 

client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(('127.0.0.1',8000)) 

while True: 
    msg = input('Please input msg: ') 
    if not msg:continue 
    client.send(msg.encode('utf-8')) 

    #接收報頭,服務端使用i模式,因此固定是4個字節 
    server_data_head = client.recv(4) 
    server_data_len = struct.unpack('i',server_data_head)[0] 

    #根據傳遞的報頭長度接收報文 
    server_data = client.recv(server_data_len) 
    print(server_data.decode('gbk'))

3.6.3 大併發時的問題

當數據量比較大以及須要額外其餘數據的場合下,以上的解決方案就有問題

  1. 數據量很是大,上百T,在打包的時候有可能struct.pack的i模式沒法知足需求,由於只能打長度爲2的32次方的數據,雖然可使用Q模式,支持2的64次方,可是也不能準確的預測是否知足數據的最大長度,另外客戶端直接接受那麼大的數據就顯得很是笨拙,也很吃力
  2. 在下載的場景下,咱們可能須要的數據還有文件名、以及hash值

針對上面的問題有如下解決方案:

  1. 客戶端接收的時候分段接收
  2. 定義字典記錄報文的長度,以及其餘需求:好比filename,hash值等其餘信息

服務端

#!/usr/bin/env python
# Author:Lee Sir
#_*_ coding:utf-8 _*_

import socket
from subprocess import Popen,PIPE
import struct
import json

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

while True:
    print('等待鏈接......')
    conn,addr = server.accept()
    print('客戶端地址爲:',addr)
    while True:
        try:
            cmd_bytes = conn.recv(1024)
            if not cmd_bytes:continue
            cmd_str = cmd_bytes.decode('utf-8')
            print('執行的命令是:',cmd_str)

            #執行命令
            p = Popen(cmd_str,shell=True,stdout=PIPE,stderr=PIPE)
            stdout,stderr = p.communicate()

            #返回的數據
            if stderr:
                send_data = stderr
            else:
                send_data = stdout

            #建立報頭內容及獲取包頭長度
            file_dict = {'filename':None,'hash':None,'size':len(send_data)}
            file_json = json.dumps(file_dict).encode('utf-8')
            file_json_len = len(file_json)

            #構建報頭
            file_head = struct.pack('i',file_json_len)

            #發送報頭長度
            conn.send(file_head)

            #發送報頭
            conn.send(file_json)

            #發送數據
            conn.send(send_data)

        except Exception:
            break

客戶端:

#!/usr/bin/env python
# Author:Lee Sir

import socket
import struct
import json

client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(('127.0.0.1',8080))

while True:
    msg = input('Please input msg: ')
    if not msg:continue
    client.send(msg.encode('utf-8'))

    #接收報頭,服務端使用i模式,因此固定是4個字節
    server_file_head = client.recv(4)
    server_file_len = struct.unpack('i',server_file_head)[0]

    #接收報文頭部信息
    server_head_file = client.recv(server_file_len)

    #報文頭部信息
    server_head =  json.loads(server_head_file.decode('gbk'))

    #獲取報文的頭部信息
    server_file_name = server_head['filename']
    server_file_hash = server_head['hash']
    server_file_size = server_head['size']

    #根據傳遞的報頭長度分段接收報文
    recv_len = 0
    server_data = b''
    while recv_len < server_file_size:
        recv_data = client.recv(1024)
        server_data += recv_data
        recv_len += len(recv_data)

    print(server_data.decode('gbk'))

3.6 聊天室

下面咱們來寫一個小項目,聊天室,客戶端發送的消息須要轉發給全部已在線的客戶端,下面是實現方法:

3.6.1 聊天室之函數實現

服務端代碼:

import socket
import threading


def recv(s: socket.socket, clients, lock):
    addr = s.getpeername()   # 獲取對端IP地址

    # 通訊循環
    while True:
        try:
            data = s.recv(1024)
            print(data)
            if not data: break

            # 羣發消息,須要加鎖,防止在遍歷的同時,客戶端斷開鏈接時,觸發字典修改操做
            with lock:
                for conn in clients.values():    
                    conn.send('{}:{} {}'.format(*addr,data.decode()).encode()) 
        except (ConnectionResetError, OSError):   # 當客戶端斷開鏈接時
            s.close()

            # 在已鏈接列表中刪除關閉的鏈接
            with lock:
                clients.pop(addr)  
            break

def accept(server: socket.socket, clients, lock):
     # 鏈接循環等待客戶端鏈接
    while True:
        conn, addr = server.accept() 
        print('{} is comming'.format(addr))
        with lock:
            clients[addr] = conn
        threading.Thread(target=recv, name='recv', args=(conn, clients, lock)).start()  # 啓動鏈接線程


if __name__ == '__main__':

    # 存放全部client列表,用於消息羣發
    clients = {}  

    # 建立鎖文件,在修改clients時加鎖
    lock = threading.Lock()   

    server = socket.socket()
    server.bind(('127.0.0.1', 9999))
    server.listen()

    print('start Server!!!')
    
    # 啓動accept線程
    threading.Thread(target=accept, name='accept', args=(server, clients, lock), daemon=True).start()   
    while True:
        cmd = input('>>>>').strip().lower()
        if cmd == 'quit':
            break
        else:
            print(threading.enumerate())

    server.close()

客戶端代碼:

import socket
import threading

def recvdata(s: socket.socket, event: threading.Event):
    while True:
        try:
            data = s.recv(1024)
            print(data.decode())
       except (ConnectionResetError, OSError):
            event.set()   # 若是服務端斷開鏈接,觸發事件
            break

if __name__ == '__main__':
    event = threading.Event()
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('127.0.0.1', 9999))

    # 啓動接受線程
    threading.Thread(target=recvdata, name='recv', args=(client, event)).start()

    # 通信循環,當服務端斷開鏈接時,結束
    while not event.is_set():  

        msg = input('>>:').strip()
        if not msg: continue 
        if msg.upper() == 'quit':
            break
        client.send(msg.encode('utf-8'))
    print('服務端斷開')
    client.close()

3.6.2 聊天室之類實現

服務端:

import socket
import threading
import datetime
import logging

FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


class ChatTcpServer:

    """
    self.ip: 服務端地址
    self.port:服務端端口
    self.socket:建立一個socket對象,用於socket通訊
    self.event:建立一個事件對象,用於控制連接循環
    self.clients:記錄當前已鏈接的客戶端
    self.lock:用於多線程添加修改clients對象時的鎖
    """

    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.socket = socket.socket()
        self.event = threading.Event()
        self.clients = {}
        self.lock = threading.Lock()

    def start(self):
        self.socket.bind((self.ip, self.port))
        self.socket.listen()
        threading.Thread(target=self.accept, name='accept', daemon=True).start()
        logging.info('ChatServer Starting!!!')

    def accept(self):
        while not self.event.is_set():
            conn, client_addr = self.socket.accept()

            # 把鏈接的客戶端保存,用於廣播消息
            with self.lock:
                self.clients[client_addr] = conn
            logging.info('{}:{} is comming'.format(*client_addr))
            threading.Thread(target=self.recv, name='recv', args=(conn, client_addr), daemon=True).start()

    def recv(self, sock, client_addr):
        while True:
            try:
                data = sock.recv(1024)

            # windows 代碼客戶端主動關閉時,不會發送b'',服務端會直接異常。這裏添加異常捕捉,當客戶端強制關閉時,刪除socket
            except (ConnectionResetError, OSError):
                with self.lock:
                    self.clients.pop(client_addr)
                logging.info('{}:{} is down'.format(*client_addr, ))
                break

            # 某些客戶端在強制關閉時會發送b'',這裏添加相關判斷
            if data == b'quit' or data == b'':
                with self.lock:
                    self.clients.pop(client_addr)
                logging.info('{}:{} is down'.format(*client_addr, ))
                break

            # 日誌及消息信息
            logging.info('{}:{} {}'.format(*client_addr, data.decode()))
            msg = '{} {}:{} {}'.format(datetime.datetime.now(), *client_addr, data.decode()).encode()

            # 廣播發送消息
            with self.lock:
                for client in self.clients.values():
                    client.send(msg)

    def stop(self):
        self.event.set()

        # 關閉全部還在存活的client鏈接
        with self.lock:
            for client in self.clients.values():
                client.close()
        self.socket.close()


def main():
    cts = ChatTcpServer('127.0.0.1', 9999)
    cts.start()

    while True:
        cmd = input('>>>').strip()
        if cmd.lower() == 'quit':
            cts.stop()
            break
        else:
            print(threading.enumerate())


if __name__ == '__main__':
    main()

客戶端:

import socket
import threading
import datetime
import logging

FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


class ChatTCPClient:

    """
    self.ip: 服務端地址
    self.port:服務端端口
    self.socket:建立一個socket對象,用於socket通訊
    self.event:建立一個事件對象,用於控制連接循環
    """

    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.socket = socket.socket()
        self.event = threading.Event()

    def connect(self):
        self.socket.connect((self.ip, self.port))
        threading.Thread(target=self.recv, name='recv',daemon=True).start()

    def recv(self):
        while not self.event.is_set():

            # 某些服務端強制關閉時,會出b'',這裏進行判斷
            try:
                data = self.socket.recv(1024)
                if data == b'':
                    self.event.set()
                    logging.info('{}:{} is down'.format(self.ip, self.port))
                    break
                logging.info(data.decode())

            # 有些服務端在關閉時不會觸發b'',這裏會直接提示異常,這裏進行捕捉
            except (ConnectionResetError,OSError):
                self.event.set()
                logging.info('{}:{} is down'.format(self.ip, self.port))

    def send(self, msg):
        self.socket.send(msg.encode())

    def stop(self):
        self.socket.close()


if __name__ == '__main__':
    ctc = ChatTCPClient('127.0.0.1', 9999)
    ctc.connect()

    while True:
        info = input('>>>>:').strip()
        if not info: continue
        if info.lower() == 'quit':
            logging.info('bye bye')
            ctc.stop()
            break
        if not ctc.event.is_set():
            ctc.send(info)
        else:
            logging.info('Server is down...')
            break
相關文章
相關標籤/搜索