030_黏包問題的解決

# 爲何會出現黏包現象?
    # 首先只有在TCP協議中才會出現黏包現象,
    # 是由於TCP協議是面向流的協議
    # 在發送的數據傳輸的過程當中還有緩存機制來避免數據丟失
    # 所以 在連續發送小數據的時候 以及接收大小不符的時候都容易出現黏包現象
    # 本質仍是由於咱們在接收數據的時候不知道發送的數據的長短
# 解決黏包問題
    # 在傳輸大量數據以前先告訴接收端要發送的數據大小
    # 若是想更漂亮的解決問題,能夠經過struct模塊來定製協議
html

1,解決方案一python

  問題的根源在於,接收端不知道發送端將要傳送的字節流的長度,因此解決粘包的方法就是圍繞,如何讓發送端在發送數據前,把本身將要發送的字節流總大小讓接收端知曉,而後接收端來一個死循環接收完全部數據。算法

#_*_coding:utf-8_*_
import socket,subprocess ip_port=('127.0.0.1',8080) s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(ip_port) s.listen(5) while True: conn,addr=s.accept() print('客戶端',addr) while True: msg=conn.recv(1024) if not msg:break res=subprocess.Popen(msg.decode('utf-8'),shell=True,\ stdin=subprocess.PIPE,\ stderr=subprocess.PIPE,\ stdout=subprocess.PIPE) err=res.stderr.read() if err: ret=err else: ret=res.stdout.read() data_length=len(ret) conn.send(str(data_length).encode('utf-8')) data=conn.recv(1024).decode('utf-8') if data == 'recv_ready': conn.sendall(ret) conn.close()
服務端
#_*_coding:utf-8_*_
import socket,time s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) res=s.connect_ex(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if len(msg) == 0:continue
    if msg == 'quit':break s.send(msg.encode('utf-8')) length=int(s.recv(1024).decode('utf-8')) s.send('recv_ready'.encode('utf-8')) send_size=0 recv_size=0 data=b''
    while recv_size < length: data+=s.recv(1024) recv_size+=len(data)
客戶端

  存在的問題
    程序的運行速度遠快於網絡傳輸速度,因此在發送一段字節前,先用send去發送該字節流長度,這種方式會放大網絡延遲帶來的性能損耗shell

2,解決方案進階json

  咱們能夠藉助一個模塊,這個模塊能夠把要發送的數據長度轉換成固定長度的字節。這樣客戶端每次接收消息以前只要先接受這個固定長度字節的內容看一看接下來要接收的信息大小,那麼最終接受的數據只要達到這個值就中止,就能恰好很少很多的接收完整的數據了。windows

  2.1,struct模塊緩存

    該模塊能夠把一個類型,如數字,轉成固定長度的bytes服務器

>>> struct.pack('i',1111111111111)

struct.error: 'i' format requires -2147483648 <= number <= 2147483647 #這個是範圍

  

import json,struct
#假設經過客戶端上傳1T:1073741824000的文件a.txt

#爲避免粘包,必須自定製報頭
header={'file_size':1073741824000,'file_name':'/a/b/c/d/e/a.txt','md5':'8f6fbf8347faa4924a76856701edb0f3'} #1T數據,文件路徑和md5值

#爲了該報頭能傳送,須要序列化而且轉爲bytes
head_bytes=bytes(json.dumps(header),encoding='utf-8') #序列化並轉成bytes,用於傳輸

#爲了讓客戶端知道報頭的長度,用struck將報頭長度這個數字轉成固定長度:4個字節
head_len_bytes=struct.pack('i',len(head_bytes)) #這4個字節裏只包含了一個數字,該數字是報頭的長度

#客戶端開始發送
conn.send(head_len_bytes) #先發報頭的長度,4個bytes
conn.send(head_bytes) #再發報頭的字節格式
conn.sendall(文件內容) #而後發真實內容的字節格式

#服務端開始接收
head_len_bytes=s.recv(4) #先收報頭4個bytes,獲得報頭長度的字節格式
x=struct.unpack('i',head_len_bytes)[0] #提取報頭的長度

head_bytes=s.recv(x) #按照報頭長度x,收取報頭的bytes格式
header=json.loads(json.dumps(header)) #提取報頭

#最後根據報頭的內容提取真實的數據,好比
real_data_len=s.recv(header['file_size'])
s.recv(real_data_len)

  

#_*_coding:utf-8_*_ #http://www.cnblogs.com/coser/archive/2011/12/17/2291160.html
__author__ = 'Linhaifeng'
import struct import binascii import ctypes values1 = (1, 'abc'.encode('utf-8'), 2.7) values2 = ('defg'.encode('utf-8'),101) s1 = struct.Struct('I3sf') s2 = struct.Struct('4sI') print(s1.size,s2.size) prebuffer=ctypes.create_string_buffer(s1.size+s2.size) print('Before : ',binascii.hexlify(prebuffer)) # t=binascii.hexlify('asdfaf'.encode('utf-8')) # print(t)
 s1.pack_into(prebuffer,0,*values1) s2.pack_into(prebuffer,s1.size,*values2) print('After pack',binascii.hexlify(prebuffer)) print(s1.unpack_from(prebuffer,0)) print(s2.unpack_from(prebuffer,s1.size)) s3=struct.Struct('ii') s3.pack_into(prebuffer,0,123,123) print('After pack',binascii.hexlify(prebuffer)) print(s3.unpack_from(prebuffer,0))
關於struct的詳細用法

  2.2,使用struct解決黏包 網絡

    藉助struct模塊,咱們知道長度數字能夠被轉換成一個標準大小的4字節數字。所以能夠利用這個特色來預先發送數據長度。app

發送時 接收時
先發送struct轉換好的數據長度4字節 先接受4個字節使用struct轉換成數字來獲取要接收的數據長度
再發送數據 再按照長度接收數據
import socket,struct,json import subprocess phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加
 phone.bind(('127.0.0.1',8080)) phone.listen(5) while True: conn,addr=phone.accept() while True: cmd=conn.recv(1024) if not cmd:break
        print('cmd: %s' %cmd) res=subprocess.Popen(cmd.decode('utf-8'), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) err=res.stderr.read() print(err) if err: back_msg=err else: back_msg=res.stdout.read() conn.send(struct.pack('i',len(back_msg))) #先發back_msg的長度
        conn.sendall(back_msg) #在發真實的內容
 conn.close()
服務端(自定製報頭
#_*_coding:utf-8_*_
import socket,time,struct s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) res=s.connect_ex(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if len(msg) == 0:continue
    if msg == 'quit':break s.send(msg.encode('utf-8')) l=s.recv(4) x=struct.unpack('i',l)[0] print(type(x),x) # print(struct.unpack('I',l))
    r_s=0 data=b''
    while r_s < x: r_d=s.recv(1024) data+=r_d r_s+=len(r_d) # print(data.decode('utf-8'))
    print(data.decode('gbk')) #windows默認gbk編碼
客戶端(自定製報頭)

 

  2.3,咱們還能夠把報頭作成字典,字典裏包含將要發送的真實數據的詳細信息,而後json序列化,而後用struck將序列化後的數據長度打包成4個字節(4個本身足夠用了)
發送時 接收時

先發報頭長度

先收報頭長度,用struct取出來
再編碼報頭內容而後發送 根據取出的長度收取報頭內容,而後解碼,反序列化
最後發真實內容 從反序列化的結果中取出待取數據的詳細信息,而後去取真實的數據內容

 

import socket,struct,json import subprocess phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加
 phone.bind(('127.0.0.1',8080)) phone.listen(5) while True: conn,addr=phone.accept() while True: cmd=conn.recv(1024) if not cmd:break
        print('cmd: %s' %cmd) res=subprocess.Popen(cmd.decode('utf-8'), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) err=res.stderr.read() print(err) if err: back_msg=err else: back_msg=res.stdout.read() headers={'data_size':len(back_msg)} head_json=json.dumps(headers) head_json_bytes=bytes(head_json,encoding='utf-8') conn.send(struct.pack('i',len(head_json_bytes))) #先發報頭的長度
        conn.send(head_json_bytes) #再發報頭
        conn.sendall(back_msg) #在發真實的內容
 conn.close()
服務端:定製稍微複雜一點的報頭
from socket import *
import struct,json ip_port=('127.0.0.1',8080) client=socket(AF_INET,SOCK_STREAM) client.connect(ip_port) while True: cmd=input('>>: ') if not cmd:continue client.send(bytes(cmd,encoding='utf-8')) head=client.recv(4) head_json_len=struct.unpack('i',head)[0] head_json=json.loads(client.recv(head_json_len).decode('utf-8')) data_len=head_json['data_size'] recv_size=0 recv_data=b''
    while recv_size < data_len: recv_data+=client.recv(1024) recv_size+=len(recv_data) print(recv_data.decode('utf-8')) #print(recv_data.decode('gbk')) #windows默認gbk編碼
客戶端

   FTP做業:上傳下載文件

import socket import struct import json import subprocess import os class MYTCPServer: address_family = socket.AF_INET socket_type = socket.SOCK_STREAM allow_reuse_address = False max_packet_size = 8192 coding='utf-8' request_queue_size = 5 server_dir='file_upload'

    def __init__(self, server_address, bind_and_activate=True): """Constructor. May be extended, do not override.""" self.server_address=server_address self.socket = socket.socket(self.address_family, self.socket_type) if bind_and_activate: try: self.server_bind() self.server_activate() except: self.server_close() raise

    def server_bind(self): """Called by constructor to bind the socket. """
        if self.allow_reuse_address: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) self.server_address = self.socket.getsockname() def server_activate(self): """Called by constructor to activate the server. """ self.socket.listen(self.request_queue_size) def server_close(self): """Called to clean-up the server. """ self.socket.close() def get_request(self): """Get the request and client address from the socket. """
        return self.socket.accept() def close_request(self, request): """Called to clean up an individual request.""" request.close() def run(self): while True: self.conn,self.client_addr=self.get_request() print('from client ',self.client_addr) while True: try: head_struct = self.conn.recv(4) if not head_struct:break head_len = struct.unpack('i', head_struct)[0] head_json = self.conn.recv(head_len).decode(self.coding) head_dic = json.loads(head_json) print(head_dic) #head_dic={'cmd':'put','filename':'a.txt','filesize':123123}
                    cmd=head_dic['cmd'] if hasattr(self,cmd): func=getattr(self,cmd) func(head_dic) except Exception: break

    def put(self,args): file_path=os.path.normpath(os.path.join( self.server_dir, args['filename'] )) filesize=args['filesize'] recv_size=0 print('----->',file_path) with open(file_path,'wb') as f: while recv_size < filesize: recv_data=self.conn.recv(self.max_packet_size) f.write(recv_data) recv_size+=len(recv_data) print('recvsize:%s filesize:%s' %(recv_size,filesize)) tcpserver1=MYTCPServer(('127.0.0.1',8080)) tcpserver1.run() #下列代碼與本題無關
class MYUDPServer: """UDP server class.""" address_family = socket.AF_INET socket_type = socket.SOCK_DGRAM allow_reuse_address = False max_packet_size = 8192 coding='utf-8'

    def get_request(self): data, client_addr = self.socket.recvfrom(self.max_packet_size) return (data, self.socket), client_addr def server_activate(self): # No need to call listen() for UDP.
        pass

    def shutdown_request(self, request): # No need to shutdown anything.
 self.close_request(request) def close_request(self, request): # No need to close anything.
        pass
服務端
import socket import struct import json import os class MYTCPClient: address_family = socket.AF_INET socket_type = socket.SOCK_STREAM allow_reuse_address = False max_packet_size = 8192 coding='utf-8' request_queue_size = 5

    def __init__(self, server_address, connect=True): self.server_address=server_address self.socket = socket.socket(self.address_family, self.socket_type) if connect: try: self.client_connect() except: self.client_close() raise

    def client_connect(self): self.socket.connect(self.server_address) def client_close(self): self.socket.close() def run(self): while True: inp=input(">>: ").strip() if not inp:continue l=inp.split() cmd=l[0] if hasattr(self,cmd): func=getattr(self,cmd) func(l) def put(self,args): cmd=args[0] filename=args[1] if not os.path.isfile(filename): print('file:%s is not exists' %filename) return
        else: filesize=os.path.getsize(filename) head_dic={'cmd':cmd,'filename':os.path.basename(filename),'filesize':filesize} print(head_dic) head_json=json.dumps(head_dic) head_json_bytes=bytes(head_json,encoding=self.coding) head_struct=struct.pack('i',len(head_json_bytes)) self.socket.send(head_struct) self.socket.send(head_json_bytes) send_size=0 with open(filename,'rb') as f: for line in f: self.socket.send(line) send_size+=len(line) print(send_size) else: print('upload successful') client=MYTCPClient(('127.0.0.1',8080)) client.run()
客戶端

 3,socket的更多方法介紹

服務端套接字函數
s.bind()    綁定(主機,端口號)到套接字
s.listen()  開始TCP監聽
s.accept()  被動接受TCP客戶的鏈接,(阻塞式)等待鏈接的到來

客戶端套接字函數
s.connect()     主動初始化TCP服務器鏈接
s.connect_ex()  connect()函數的擴展版本,出錯時返回出錯碼,而不是拋出異常

公共用途的套接字函數
s.recv()            接收TCP數據
s.send()            發送TCP數據
s.sendall()         發送TCP數據
s.recvfrom()        接收UDP數據
s.sendto()          發送UDP數據
s.getpeername()     鏈接到當前套接字的遠端的地址
s.getsockname()     當前套接字的地址
s.getsockopt()      返回指定套接字的參數
s.setsockopt()      設置指定套接字的參數
s.close()           關閉套接字

面向鎖的套接字方法
s.setblocking()     設置套接字的阻塞與非阻塞模式
s.settimeout()      設置阻塞套接字操做的超時時間
s.gettimeout()      獲得阻塞套接字操做的超時時間

面向文件的套接字的函數
s.fileno()          套接字的文件描述符
s.makefile()        建立一個與該套接字相關的文件
更多方法
官方文檔對socket模塊下的socket.send()和socket.sendall()解釋以下: socket.send(string[, flags]) Send data to the socket. The socket must be connected to a remote socket. The optional flags argument has the same meaning as for recv() above. Returns the number of bytes sent. Applications are responsible for checking that all data has been sent; if only some of the data was transmitted, the application needs to attempt delivery of the remaining data. send()的返回值是發送的字節數量,這個數量值可能小於要發送的string的字節數,也就是說可能沒法發送string中全部的數據。若是有錯誤則會拋出異常。 – socket.sendall(string[, flags]) Send data to the socket. The socket must be connected to a remote socket. The optional flags argument has the same meaning as for recv() above. Unlike send(), this method continues to send data from string until either all data has been sent or an error occurs. None is returned on success. On error, an exception is raised, and there is no way to determine how much data, if any, was successfully sent. 嘗試發送string的全部數據,成功則返回None,失敗則拋出異常。 故,下面兩段代碼是等價的: #sock.sendall('Hello world\n')

#buffer = 'Hello world\n' #while buffer: # bytes = sock.send(buffer) # buffer = buffer[bytes:]
send和sendall方法

4,驗證客戶端連接的合法性 

# sever端
import os
import hmac
import socket
secret_key = b'egg'    # 密鑰
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
def check_conn(conn):
    msg = os.urandom(32)     # 用os模塊的urandom隨機生成一個32位的字節
    conn.send(msg)                 # 將要加密內容發送給客戶端
    h = hmac.new(secret_key,msg)  # 建立一個對象  # 傳入的密鑰,要加密的內容
    digest = h.digest()                      # 加密以後的內容
    client_digest = conn.recv(1024)  # 接收客戶端加密後的驗證內容
    return hmac.compare_digest(digest,client_digest)  # 對比兩個加密內容是否同樣
conn,addr = sk.accept()
res = check_conn(conn)
if res:
    print('合法的客戶端')
    conn.close()
else:
    print('不合法的客戶端')
    conn.close()
sk.close()

  

# client端
import hmac
import socket
secret_key = b'egg'
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
msg = sk.recv(1024)     # 接收服務端驗證合法性時要加密的內容
h = hmac.new(secret_key,msg)
digest = h.digest()
sk.send(digest)
sk.close()

 

  若是你想在分佈式系統中實現一個簡單的客戶端連接認證功能,又不像SSL那麼複雜,那麼利用hmac+加鹽的方式來實現

#_*_coding:utf-8_*_
from socket import *
import hmac,os secret_key=b'linhaifeng bang bang bang'
def conn_auth(conn): ''' 認證客戶端連接 :param conn: :return: '''
    print('開始驗證新連接的合法性') msg=os.urandom(32) conn.sendall(msg) h=hmac.new(secret_key,msg) digest=h.digest() respone=conn.recv(len(digest)) return hmac.compare_digest(respone,digest) def data_handler(conn,bufsize=1024): if not conn_auth(conn): print('該連接不合法,關閉') conn.close() return
    print('連接合法,開始通訊') while True: data=conn.recv(bufsize) if not data:break conn.sendall(data.upper()) def server_handler(ip_port,bufsize,backlog=5): ''' 只處理連接 :param ip_port: :return: ''' tcp_socket_server=socket(AF_INET,SOCK_STREAM) tcp_socket_server.bind(ip_port) tcp_socket_server.listen(backlog) while True: conn,addr=tcp_socket_server.accept() print('新鏈接[%s:%s]' %(addr[0],addr[1])) data_handler(conn,bufsize) if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 server_handler(ip_port,bufsize)
服務端
#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'
from socket import *
import hmac,os secret_key=b'linhaifeng bang bang bang'
def conn_auth(conn): ''' 驗證客戶端到服務器的連接 :param conn: :return: ''' msg=conn.recv(32) h=hmac.new(secret_key,msg) digest=h.digest() conn.sendall(digest) def client_handler(ip_port,bufsize=1024): tcp_socket_client=socket(AF_INET,SOCK_STREAM) tcp_socket_client.connect(ip_port) conn_auth(tcp_socket_client) while True: data=input('>>: ').strip() if not data:continue
        if data == 'quit':break tcp_socket_client.sendall(data.encode('utf-8')) respone=tcp_socket_client.recv(bufsize) print(respone.decode('utf-8')) tcp_socket_client.close() if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 client_handler(ip_port,bufsize)
客戶端(合法)
#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'
from socket import *

def client_handler(ip_port,bufsize=1024): tcp_socket_client=socket(AF_INET,SOCK_STREAM) tcp_socket_client.connect(ip_port) while True: data=input('>>: ').strip() if not data:continue
        if data == 'quit':break tcp_socket_client.sendall(data.encode('utf-8')) respone=tcp_socket_client.recv(bufsize) print(respone.decode('utf-8')) tcp_socket_client.close() if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 client_handler(ip_port,bufsize)
客戶端(非法:不知道加密方式)
#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'
from socket import *
import hmac,os secret_key=b'linhaifeng bang bang bang1111'
def conn_auth(conn): ''' 驗證客戶端到服務器的連接 :param conn: :return: ''' msg=conn.recv(32) h=hmac.new(secret_key,msg) digest=h.digest() conn.sendall(digest) def client_handler(ip_port,bufsize=1024): tcp_socket_client=socket(AF_INET,SOCK_STREAM) tcp_socket_client.connect(ip_port) conn_auth(tcp_socket_client) while True: data=input('>>: ').strip() if not data:continue
        if data == 'quit':break tcp_socket_client.sendall(data.encode('utf-8')) respone=tcp_socket_client.recv(bufsize) print(respone.decode('utf-8')) tcp_socket_client.close() if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 client_handler(ip_port,bufsize)
客戶端(非法:不知道secret_key)

5,socketserver

解讀python中SocketServer源碼

import socketserver class Myserver(socketserver.BaseRequestHandler): def handle(self): self.data = self.request.recv(1024).strip() print("{} wrote:".format(self.client_address[0])) print(self.data) self.request.sendall(self.data.upper()) if __name__ == "__main__": HOST, PORT = "127.0.0.1", 9999

    # 設置allow_reuse_address容許服務器重用地址
    socketserver.TCPServer.allow_reuse_address = True # 建立一個server, 將服務地址綁定到127.0.0.1:9999
    server = socketserver.TCPServer((HOST, PORT),Myserver) # 讓server永遠運行下去,除非強制中止程序
    server.serve_forever()
server端
import socket HOST, PORT = "127.0.0.1", 9999 data = "hello"

# 建立一個socket連接,SOCK_STREAM表明使用TCP協議
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.connect((HOST, PORT)) # 連接到客戶端
    sock.sendall(bytes(data + "\n", "utf-8")) # 向服務端發送數據
    received = str(sock.recv(1024), "utf-8")# 從服務端接收數據

print("Sent: {}".format(data)) print("Received: {}".format(received))
client

# socket  創建一個tcp,在同一時間只能鏈接一個客戶端
# socketserver 創建一個tcp,在同一時間能鏈接多個客戶端

import socketserver class MyServer(socketserver.BaseRequestHandler): def handle(self):   # self.request 就至關於一個conn,就是拿到了conn(地址)
        while True: print(self.client_address) msg = self.request.recv(1024).decode('utf-8') if msg == 'q':break
            print(msg) info = input('%s>>>'%msg[:2]) self.request.send(info.encode('utf-8')) if __name__ == '__main__': server = socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyServer) # thread 線程
    server.allow_reuse_address = True server.serve_forever()
sever 端

 

import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) while True: msg = input('>>>') if msg == 'q': sk.send(b'q') break sk.send(('美團 :'+msg).encode('utf-8')) ret = sk.recv(1024).decode('utf-8') print(ret) sk.close()
client
import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) while True: msg = input('>>>') if msg == 'q': sk.send(b'q') break sk.send(('大衆點評 :'+msg).encode('utf-8')) ret = sk.recv(1024).decode('utf-8') print(ret) sk.close()
client2

6,setblocking()  將套接字設爲非阻塞

import socket
sk = socket.socket()
sk.setblocking(False)    #  設置套接字對象,
		     #設置在接收時若是沒有接收到內容就直接過去,不會在等。
sk.bind(('127.0.0.1',8080))
sk.listen()
try:
    conn,addr = sk.accept()    # 通過設置後,當執行到這沒有接收到內容時,
		              #不會像之前寫的程序(沒有設置的程序)同樣,
		               #程序會停在此處等待接收內容,而是直接過去,
		               #可是兩個變量沒有被賦值會報錯。
except BlockingIOError:
    pass
print('+++++++')

 

7,

# 多個send  小的數據連在一塊兒,會發生黏包現象,是tcp協議內部的優化算法形成的# 多個接收  只有一個接收到,其餘的沒有接收到,當斷開連接時, windows 高版本會發送一個空消息 ,低版本會直接報錯

相關文章
相關標籤/搜索