python基礎(30):黏包、socket的其餘方法

1. 黏包

1.1 黏包現象

讓咱們基於tcp先製做一個遠程執行命令的程序(命令ls -l ; lllllll ; pwd)html

同時執行多條命令以後,獲得的結果極可能只有一部分,在執行其餘命令的時候又接收到以前執行的另一部分結果,這種顯現就是黏包。算法

1.1.1 基於TCP協議實現的黏包

server:shell

#_*_coding:utf-8_*_
from socket import *
import subprocess

ip_port=('127.0.0.1',8888)
BUFSIZE=1024

tcp_socket_server=socket(AF_INET,SOCK_STREAM)
tcp_socket_server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(5)

while True:
    conn,addr=tcp_socket_server.accept()
    print('客戶端',addr)

    while True:
        cmd=conn.recv(BUFSIZE)
        if len(cmd) == 0:break

        res=subprocess.Popen(cmd.decode('utf-8'),shell=True,
                         stdout=subprocess.PIPE,
                         stdin=subprocess.PIPE,
                         stderr=subprocess.PIPE)

        stderr=res.stderr.read()
        stdout=res.stdout.read()
        conn.send(stderr)
        conn.send(stdout)

client:json

#_*_coding:utf-8_*_
import socket
BUFSIZE=1024
ip_port=('127.0.0.1',8888)

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(ip_port)

while True:
    msg=input('>>: ').strip()
    if len(msg) == 0:continue
    if msg == 'quit':break

    s.send(msg.encode('utf-8'))
    act_res=s.recv(BUFSIZE)

    print(act_res.decode('utf-8'),end='')

1.1.2 基於UDP協議實現的黏包

server:windows

#_*_coding:utf-8_*_
from socket import *
import subprocess

ip_port=('127.0.0.1',9000)
bufsize=1024

udp_server=socket(AF_INET,SOCK_DGRAM)
udp_server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
udp_server.bind(ip_port)

while True:
    #收消息
    cmd,addr=udp_server.recvfrom(bufsize)
    print('用戶命令----->',cmd)

    #邏輯處理
    res=subprocess.Popen(cmd.decode('utf-8'),shell=True,stderr=subprocess.PIPE,stdin=subprocess.PIPE,stdout=subprocess.PIPE)
    stderr=res.stderr.read()
    stdout=res.stdout.read()

    #發消息
    udp_server.sendto(stderr,addr)
    udp_server.sendto(stdout,addr)
udp_server.close()

client:緩存

from socket import *
ip_port=('127.0.0.1',9000)
bufsize=1024

udp_client=socket(AF_INET,SOCK_DGRAM)


while True:
    msg=input('>>: ').strip()
    udp_client.sendto(msg.encode('utf-8'),ip_port)
    err,addr=udp_client.recvfrom(bufsize)
    out,addr=udp_client.recvfrom(bufsize)
    if err:
        print('error : %s'%err.decode('utf-8'),end='')
    if out:
        print(out.decode('utf-8'), end='')

注意:只有TCP有粘包現象,UDP永遠不會粘包。服務器

1.2 黏包的成因

1.2.1 TCP協議中的數據傳遞

(1) TCP協議的拆包機制

當發送端緩衝區的長度大於網卡的MTU時,tcp會將此次發送的數據拆成幾個數據包發送出去。 MTU是Maximum Transmission Unit的縮寫。意思是網絡上傳送的最大數據包。MTU的單位是字節。 大部分網絡設備的MTU都是1500。若是本機的MTU比網關的MTU大,大的數據包就會被拆開來傳送,這樣會產生不少數據包碎片,增長丟包率,下降網絡速度。網絡

(2) 面向流的通訊特色和Nagle算法

TCP(transport control protocol,傳輸控制協議)是面向鏈接的,面向流的,提供高可靠性服務。
收發兩端(客戶端和服務器端)都要有一一成對的socket,所以,發送端爲了將多個發往接收端的包,更有效的發到對方,使用了優化方法(Nagle算法),將屢次間隔較小且數據量小的數據,合併成一個大的數據塊,而後進行封包。
這樣,接收端,就難於分辨出來了,必須提供科學的拆包機制。 即面向流的通訊是無消息保護邊界的。
對於空消息:tcp是基於數據流的,因而收發的消息不能爲空,這就須要在客戶端和服務端都添加空消息的處理機制,防止程序卡住,而udp是基於數據報的,即使是你輸入的是空內容(直接回車),也能夠被髮送,udp協議會幫你封裝上消息頭髮送過去。
可靠黏包的tcp協議:tcp的協議數據不會丟,沒有收完包,下次接收,會繼續上次繼續接收,己端老是在收到ack時纔會清除緩衝區內容。數據是可靠的,可是會粘包。app

(3) 基於tcp協議特色的黏包現象成因 

 

 

socket數據傳輸過程當中的用戶態與內核態說明:socket

發送端能夠是一K一K地發送數據,而接收端的應用程序能夠兩K兩K地提走數據,固然也有可能一次提走3K或6K數據,或者一次只提走幾個字節的數據。
也就是說,應用程序所看到的數據是一個總體,或說是一個流(stream),一條消息有多少字節對應用程序是不可見的,所以TCP協議是面向流的協議,這也是容易出現粘包問題的緣由。
而UDP是面向消息的協議,每一個UDP段都是一條消息,應用程序必須以消息爲單位提取數據,不能一次提取任意字節的數據,這一點和TCP是很不一樣的。
怎樣定義消息呢?能夠認爲對方一次性write/send的數據爲一個消息,須要明白的是當對方send一條信息的時候,不管底層怎樣分段分片,TCP協議層會把構成整條消息的數據段排序完成後才呈如今內核緩衝區。

例如基於tcp的套接字客戶端往服務端上傳文件,發送時文件內容是按照一段一段的字節流發送的,在接收方看了,根本不知道該文件的字節流從何處開始,在何處結束

此外,發送方引發的粘包是由TCP協議自己形成的,TCP爲提升傳輸效率,發送方每每要收集到足夠多的數據後才發送一個TCP段。若連續幾回須要send的數據都不多,一般TCP會根據優化算法把這些數據合成一個TCP段後一次發送出去,這樣接收方就收到了粘包數據。

1.2.2 UDP不會發生黏包

UDP(user datagram protocol,用戶數據報協議)是無鏈接的,面向消息的,提供高效率服務。
不會使用塊的合併優化算法,, 因爲UDP支持的是一對多的模式,因此接收端的skbuff(套接字緩衝區)採用了鏈式結構來記錄每個到達的UDP包,在每一個UDP包中就有了消息頭(消息來源地址,端口等信息),這樣,對於接收端來講,就容易進行區分處理了。 即面向消息的通訊是有消息保護邊界的。
對於空消息:tcp是基於數據流的,因而收發的消息不能爲空,這就須要在客戶端和服務端都添加空消息的處理機制,防止程序卡住,而udp是基於數據報的,即使是你輸入的是空內容(直接回車),也能夠被髮送,udp協議會幫你封裝上消息頭髮送過去。
不可靠不黏包的udp協議:udp的recvfrom是阻塞的,一個recvfrom(x)必須對惟一一個sendinto(y),收完了x個字節的數據就算完成,如果y;x數據就丟失,這意味着udp根本不會粘包,可是會丟數據,不可靠。

用UDP協議發送時,用sendto函數最大能發送數據的長度爲:65535- IP頭(20) – UDP頭(8)=65507字節。用sendto函數發送數據時,若是發送數據長度大於該值,則函數會返回錯誤。(丟棄這個包,不進行發送)。

用TCP協議發送時,因爲TCP是數據流協議,所以不存在包大小的限制(暫不考慮緩衝區的大小),這是指在用send函數時,數據長度參數不受限制。而實際上,所指定的這段數據並不必定會一次性發送出去,若是這段數據比較長,會被分段發送,若是比較短,可能會等待和下一次數據一塊兒發送。

1.2.3 會發生黏包的兩種狀況 

 (1) 狀況一:發送方的緩存機制

發送端須要等緩衝區滿才發送出去,形成粘包(發送數據時間間隔很短,數據了很小,會合到一塊兒,產生粘包)。

server:

#_*_coding:utf-8_*_
from socket import *
ip_port=('127.0.0.1',8080)

tcp_socket_server=socket(AF_INET,SOCK_STREAM)
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(5)

conn,addr=tcp_socket_server.accept()

data1=conn.recv(10)
data2=conn.recv(10)

print('----->',data1.decode('utf-8'))
print('----->',data2.decode('utf-8'))

conn.close()

client:

#_*_coding:utf-8_*_
import socket
BUFSIZE=1024
ip_port=('127.0.0.1',8080)

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(ip_port)

s.send('hello'.encode('utf-8'))
s.send('egg'.encode('utf-8'))

(2) 狀況二:接收方的緩存機制

接收方不及時接收緩衝區的包,形成多個包接收(客戶端發送了一段數據,服務端只收了一小部分,服務端下次再收的時候仍是從緩衝區拿上次遺留的數據,產生粘包)。

server:

#_*_coding:utf-8_*_
from socket import *
ip_port=('127.0.0.1',8080)

tcp_socket_server=socket(AF_INET,SOCK_STREAM)
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(5)


conn,addr=tcp_socket_server.accept()


data1=conn.recv(2) #一次沒有收完整
data2=conn.recv(10)#下次收的時候,會先取舊的數據,而後取新的

print('----->',data1.decode('utf-8'))
print('----->',data2.decode('utf-8'))

conn.close()

client:

#_*_coding:utf-8_*_
import socket
BUFSIZE=1024
ip_port=('127.0.0.1',8080)

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(ip_port)


s.send('hello egg'.encode('utf-8'))

1.2.4 黏包成因總結

黏包現象只發生在tcp協議中:

1.從表面上看,黏包問題主要是由於發送方和接收方的緩存機制、tcp協議面向流通訊的特色。

2.實際上,主要仍是由於接收方不知道消息之間的界限,不知道一次性提取多少字節的數據所形成的

1.3 黏包的解決方案

1.3.1 解決方案一

問題的根源在於,接收端不知道發送端將要傳送的字節流的長度,因此解決粘包的方法就是圍繞,如何讓發送端在發送數據前,把本身將要發送的字節流總大小讓接收端知曉,而後接收端來一個死循環接收完全部數據。

 

 server:

#_*_coding:utf-8_*_
import socket,subprocess
ip_port=('127.0.0.1',8080)
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

s.bind(ip_port)
s.listen(5)

while True:
    conn,addr=s.accept()
    print('客戶端',addr)
    while True:
        msg=conn.recv(1024)
        if not msg:break
        res=subprocess.Popen(msg.decode('utf-8'),shell=True,\
                            stdin=subprocess.PIPE,\
                         stderr=subprocess.PIPE,\
                         stdout=subprocess.PIPE)
        err=res.stderr.read()
        if err:
            ret=err
        else:
            ret=res.stdout.read()
        data_length=len(ret)
        conn.send(str(data_length).encode('utf-8'))
        data=conn.recv(1024).decode('utf-8')
        if data == 'recv_ready':
            conn.sendall(ret)
    conn.close()

client:

#_*_coding:utf-8_*_
import socket,time
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if len(msg) == 0:continue
    if msg == 'quit':break

    s.send(msg.encode('utf-8'))
    length=int(s.recv(1024).decode('utf-8'))
    s.send('recv_ready'.encode('utf-8'))
    send_size=0
    recv_size=0
    data=b''
    while recv_size < length:
        data+=s.recv(1024)
        recv_size+=len(data)

    print(data.decode('utf-8'))

存在的問題:

程序的運行速度遠快於網絡傳輸速度,因此在發送一段字節前,先用send去發送該字節流長度,這種方式會放大網絡延遲帶來的性能損耗。

1.3.2 方案二

剛剛的方法,問題在於咱們在發送。

咱們能夠藉助一個模塊,這個模塊能夠把要發送的數據長度轉換成固定長度的字節。這樣客戶端每次接收消息以前只要先接受這個固定長度字節的內容看一看接下來要接收的信息大小,那麼最終接受的數據只要達到這個值就中止,就能恰好很少很多的接收完整的數據了。

(1)  struct模塊

該模塊能夠把一個類型,如數字,轉成固定長度的bytes。

>>> struct.pack('i',1111111111111)

struct.error: 'i' format requires -2147483648 <= number <= 2147483647 #這個是範圍

import json,struct
#假設經過客戶端上傳1T:1073741824000的文件a.txt

#爲避免粘包,必須自定製報頭
header={'file_size':1073741824000,'file_name':'/a/b/c/d/e/a.txt','md5':'8f6fbf8347faa4924a76856701edb0f3'} #1T數據,文件路徑和md5值

#爲了該報頭能傳送,須要序列化而且轉爲bytes
head_bytes=bytes(json.dumps(header),encoding='utf-8') #序列化並轉成bytes,用於傳輸

#爲了讓客戶端知道報頭的長度,用struck將報頭長度這個數字轉成固定長度:4個字節
head_len_bytes=struct.pack('i',len(head_bytes)) #這4個字節裏只包含了一個數字,該數字是報頭的長度

#客戶端開始發送
conn.send(head_len_bytes) #先發報頭的長度,4個bytes
conn.send(head_bytes) #再發報頭的字節格式
conn.sendall(文件內容) #而後發真實內容的字節格式

#服務端開始接收
head_len_bytes=s.recv(4) #先收報頭4個bytes,獲得報頭長度的字節格式
x=struct.unpack('i',head_len_bytes)[0] #提取報頭的長度

head_bytes=s.recv(x) #按照報頭長度x,收取報頭的bytes格式
header=json.loads(json.dumps(header)) #提取報頭

#最後根據報頭的內容提取真實的數據,好比
real_data_len=s.recv(header['file_size'])
s.recv(real_data_len)

struct的詳細用法:

#_*_coding:utf-8_*_
#http://www.cnblogs.com/coser/archive/2011/12/17/2291160.html
__author__ = 'Linhaifeng'
import struct
import binascii
import ctypes

values1 = (1, 'abc'.encode('utf-8'), 2.7)
values2 = ('defg'.encode('utf-8'),101)
s1 = struct.Struct('I3sf')
s2 = struct.Struct('4sI')

print(s1.size,s2.size)
prebuffer=ctypes.create_string_buffer(s1.size+s2.size)
print('Before : ',binascii.hexlify(prebuffer))
# t=binascii.hexlify('asdfaf'.encode('utf-8'))
# print(t)

s1.pack_into(prebuffer,0,*values1)
s2.pack_into(prebuffer,s1.size,*values2)

print('After pack',binascii.hexlify(prebuffer))
print(s1.unpack_from(prebuffer,0))
print(s2.unpack_from(prebuffer,s1.size))

s3=struct.Struct('ii')
s3.pack_into(prebuffer,0,123,123)
print('After pack',binascii.hexlify(prebuffer))
print(s3.unpack_from(prebuffer,0))

(2) 使用struct解決黏包

藉助struct模塊,咱們知道長度數字能夠被轉換成一個標準大小的4字節數字。所以能夠利用這個特色來預先發送數據長度。

 

 server:

import socket,struct,json
import subprocess
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加

phone.bind(('127.0.0.1',8080))

phone.listen(5)

while True:
    conn,addr=phone.accept()
    while True:
        cmd=conn.recv(1024)
        if not cmd:break
        print('cmd: %s' %cmd)

        res=subprocess.Popen(cmd.decode('utf-8'),
                             shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        err=res.stderr.read()
        print(err)
        if err:
            back_msg=err
        else:
            back_msg=res.stdout.read()

        conn.send(struct.pack('i',len(back_msg))) #先發back_msg的長度
        conn.sendall(back_msg) #在發真實的內容

    conn.close()

client:

#_*_coding:utf-8_*_
import socket,time,struct

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if len(msg) == 0:continue
    if msg == 'quit':break

    s.send(msg.encode('utf-8'))

    l=s.recv(4)
    x=struct.unpack('i',l)[0]
    print(type(x),x)
    # print(struct.unpack('I',l))
    r_s=0
    data=b''
    while r_s < x:
        r_d=s.recv(1024)
        data+=r_d
        r_s+=len(r_d)

    # print(data.decode('utf-8'))
    print(data.decode('gbk')) #windows默認gbk編碼

咱們還能夠把報頭作成字典,字典裏包含將要發送的真實數據的詳細信息,而後json序列化,而後用struck將序列化後的數據長度打包成4個字節(4個本身足夠用了)。

 

 server:

import socket
import struct
import json
import subprocess
import os

class MYTCPServer:
    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    allow_reuse_address = False

    max_packet_size = 8192

    coding='utf-8'

    request_queue_size = 5

    server_dir='file_upload'

    def __init__(self, server_address, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        self.server_address=server_address
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.
        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.
        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.
        """
        self.socket.close()

    def get_request(self):
        """Get the request and client address from the socket.
        """
        return self.socket.accept()

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()

    def run(self):
        while True:
            self.conn,self.client_addr=self.get_request()
            print('from client ',self.client_addr)
            while True:
                try:
                    head_struct = self.conn.recv(4)
                    if not head_struct:break

                    head_len = struct.unpack('i', head_struct)[0]
                    head_json = self.conn.recv(head_len).decode(self.coding)
                    head_dic = json.loads(head_json)

                    print(head_dic)
                    #head_dic={'cmd':'put','filename':'a.txt','filesize':123123}
                    cmd=head_dic['cmd']
                    if hasattr(self,cmd):
                        func=getattr(self,cmd)
                        func(head_dic)
                except Exception:
                    break

    def put(self,args):
        file_path=os.path.normpath(os.path.join(
            self.server_dir,
            args['filename']
        ))

        filesize=args['filesize']
        recv_size=0
        print('----->',file_path)
        with open(file_path,'wb') as f:
            while recv_size < filesize:
                recv_data=self.conn.recv(self.max_packet_size)
                f.write(recv_data)
                recv_size+=len(recv_data)
                print('recvsize:%s filesize:%s' %(recv_size,filesize))

tcpserver1=MYTCPServer(('127.0.0.1',8080))

tcpserver1.run()

#下列代碼與本題無關
class MYUDPServer:

    """UDP server class."""
    address_family = socket.AF_INET

    socket_type = socket.SOCK_DGRAM

    allow_reuse_address = False

    max_packet_size = 8192

    coding='utf-8'

    def get_request(self):
        data, client_addr = self.socket.recvfrom(self.max_packet_size)
        return (data, self.socket), client_addr

    def server_activate(self):
        # No need to call listen() for UDP.
        pass

    def shutdown_request(self, request):
        # No need to shutdown anything.
        self.close_request(request)

    def close_request(self, request):
        # No need to close anything.
        pass

client:

import socket
import struct
import json
import os

class MYTCPClient:
    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    allow_reuse_address = False

    max_packet_size = 8192

    coding='utf-8'

    request_queue_size = 5

    def __init__(self, server_address, connect=True):
        self.server_address=server_address
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if connect:
            try:
                self.client_connect()
            except:
                self.client_close()
                raise

    def client_connect(self):
        self.socket.connect(self.server_address)

    def client_close(self):
        self.socket.close()

    def run(self):
        while True:
            inp=input(">>: ").strip()
            if not inp:continue
            l=inp.split()
            cmd=l[0]
            if hasattr(self,cmd):
                func=getattr(self,cmd)
                func(l)

    def put(self,args):
        cmd=args[0]
        filename=args[1]
        if not os.path.isfile(filename):
            print('file:%s is not exists' %filename)
            return
        else:
            filesize=os.path.getsize(filename)

        head_dic={'cmd':cmd,'filename':os.path.basename(filename),'filesize':filesize}
        print(head_dic)
        head_json=json.dumps(head_dic)
        head_json_bytes=bytes(head_json,encoding=self.coding)

        head_struct=struct.pack('i',len(head_json_bytes))
        self.socket.send(head_struct)
        self.socket.send(head_json_bytes)
        send_size=0
        with open(filename,'rb') as f:
            for line in f:
                self.socket.send(line)
                send_size+=len(line)
                print(send_size)
            else:
                print('upload successful')


client=MYTCPClient(('127.0.0.1',8080))

client.run()

2. socket其餘方法介紹

服務端套接字函數
s.bind()    綁定(主機,端口號)到套接字
s.listen()  開始TCP監聽
s.accept()  被動接受TCP客戶的鏈接,(阻塞式)等待鏈接的到來

客戶端套接字函數
s.connect()     主動初始化TCP服務器鏈接
s.connect_ex()  connect()函數的擴展版本,出錯時返回出錯碼,而不是拋出異常

公共用途的套接字函數
s.recv()            接收TCP數據
s.send()            發送TCP數據
s.sendall()         發送TCP數據
s.recvfrom()        接收UDP數據
s.sendto()          發送UDP數據
s.getpeername()     鏈接到當前套接字的遠端的地址
s.getsockname()     當前套接字的地址
s.getsockopt()      返回指定套接字的參數
s.setsockopt()      設置指定套接字的參數
s.close()           關閉套接字

面向鎖的套接字方法
s.setblocking()     設置套接字的阻塞與非阻塞模式
s.settimeout()      設置阻塞套接字操做的超時時間
s.gettimeout()      獲得阻塞套接字操做的超時時間

面向文件的套接字的函數
s.fileno()          套接字的文件描述符
s.makefile()        建立一個與該套接字相關的文件

send和sendall方法:

官方文檔對socket模塊下的socket.send()和socket.sendall()解釋以下:

socket.send(string[, flags])
Send data to the socket. The socket must be connected to a remote socket. The optional flags argument has the same meaning as for recv() above. Returns the number of bytes sent. Applications are responsible for checking that all data has been sent; if only some of the data was transmitted, the application needs to attempt delivery of the remaining data.

send()的返回值是發送的字節數量,這個數量值可能小於要發送的string的字節數,也就是說可能沒法發送string中全部的數據。若是有錯誤則會拋出異常。

–

socket.sendall(string[, flags])
Send data to the socket. The socket must be connected to a remote socket. The optional flags argument has the same meaning as for recv() above. Unlike send(), this method continues to send data from string until either all data has been sent or an error occurs. None is returned on success. On error, an exception is raised, and there is no way to determine how much data, if any, was successfully sent.

嘗試發送string的全部數據,成功則返回None,失敗則拋出異常。

故,下面兩段代碼是等價的:

#sock.sendall('Hello world\n')

#buffer = 'Hello world\n'
#while buffer:
#    bytes = sock.send(buffer)
#    buffer = buffer[bytes:]
相關文章
相關標籤/搜索