閱讀目錄
網絡編程
一.楔子
你如今已經學會了寫python代碼,假如你寫了兩個python文件a.py和b.py,分別去運行,你就會發現,這兩個python的文件分別運行的很好。可是若是這兩個程序之間想要傳遞一個數據,你要怎麼作呢?html
這個問題以你如今的知識就能夠解決了,咱們能夠建立一個文件,把a.py想要傳遞的內容寫到文件中,而後b.py從這個文件中讀取內容就能夠了。python
可是當你的a.py和b.py分別在不一樣電腦上的時候,你要怎麼辦呢?git
相似的機制有計算機網盤,qq等等。咱們能夠在咱們的電腦上和別人聊天,能夠在本身的電腦上向網盤中上傳、下載內容。這些都是兩個程序在通訊。程序員
二.軟件開發的架構
咱們瞭解的涉及到兩個程序之間通信的應用大體能夠分爲兩種:github
第一種是應用類:qq、微信、網盤、優酷這一類是屬於須要安裝的桌面應用web
第二種是web類:好比百度、知乎、博客園等使用瀏覽器訪問就能夠直接使用的應用算法
這些應用的本質其實都是兩個程序之間的通信。而這兩個分類又對應了兩個軟件開發的架構~shell
1.C/S架構
C/S即:Client與Server ,中文意思:客戶端與服務器端架構,這種架構也是從用戶層面(也能夠是物理層面)來劃分的。編程
這裏的客戶端通常泛指客戶端應用程序EXE,程序須要先安裝後,才能運行在用戶的電腦上,對用戶的電腦操做系統環境依賴較大。json
2.B/S架構
B/S即:Browser與Server,中文意思:瀏覽器端與服務器端架構,這種架構是從用戶層面來劃分的。
Browser瀏覽器,其實也是一種Client客戶端,只是這個客戶端不須要你們去安裝什麼應用程序,只需在瀏覽器上經過HTTP請求服務器端相關的資源(網頁資源),客戶端Browser瀏覽器就能進行增刪改查。
三.網絡基礎
1.一個程序如何在網絡上找到另外一個程序?
首先,程序必需要啓動,其次,必須有這臺機器的地址,咱們都知道咱們人的地址大概就是國家\省\市\區\街道\樓\門牌號這樣字。那麼每一臺聯網的機器在網絡上也有本身的地址,它的地址是怎麼表示的呢?
就是使用一串數字來表示的,例如:100.4.5.6
IP地址是指互聯網協議地址(英語:Internet Protocol Address,又譯爲網際協議地址),是IP Address的縮寫。IP地址是IP協議提供的一種統一的地址格式,它爲互聯網上的每個網絡和每一臺主機分配一個邏輯地址,以此來屏蔽物理地址的差別。
IP地址是一個32位的二進制數,一般被分割爲4個「8位二進制數」(也就是4個字節)。IP地址一般用「點分十進制」表示成(a.b.c.d)的形式,其中,a,b,c,d都是0~255之間的十進制整數。例:點分十進IP地址(100.4.5.6),其實是32位二進制數(01100100.00000100.00000101.00000110)。
"端口"是英文port的意譯,能夠認爲是設備與外界通信交流的出口。
netstat -aon|findstr "49157"
所以ip地址精確到具體的一臺電腦,而端口精確到具體的程序。
2.理解socket
Socket是應用層與TCP/IP協議族通訊的中間軟件抽象層,它是一組接口。在設計模式中,Socket其實就是一個門面模式,它把複雜的TCP/IP協議族隱藏在Socket接口後面,對用戶來講,一組簡單的接口就是所有,讓Socket去組織數據,以符合指定的協議。
其實站在你的角度上看,socket就是一個模塊。咱們經過調用模塊中已經實現的方法創建兩個進程之間的鏈接和通訊。 也有人將socket說成ip+port,由於ip是用來標識互聯網中的一臺主機的位置,而port是用來標識這臺機器上的一個應用程序。 因此咱們只要確立了ip和port就能找到一個應用程序,而且使用socket模塊來與之通訊。
3.套接字(socket)的發展史
套接字起源於 20 世紀 70 年代加利福尼亞大學伯克利分校版本的 Unix,即人們所說的 BSD Unix。 所以,有時人們也把套接字稱爲「伯克利套接字」或「BSD 套接字」。一開始,套接字被設計用在同 一臺主機上多個應用程序之間的通信。這也被稱進程間通信,或 IPC。套接字有兩種(或者稱爲有兩個種族),分別是基於文件型的和基於網絡型的。
基於文件類型的套接字家族
套接字家族的名字:AF_UNIX
unix一切皆文件,基於文件的套接字調用的就是底層的文件系統來取數據,兩個套接字進程運行在同一機器,能夠經過訪問同一個文件系統間接完成通訊
基於網絡類型的套接字家族
套接字家族的名字:AF_INET
(還有AF_INET6被用於ipv6,還有一些其餘的地址家族,不過,他們要麼是隻用於某個平臺,要麼就是已經被廢棄,或者是不多被使用,或者是根本沒有實現,全部地址家族中,AF_INET是使用最普遍的一個,python支持不少種地址家族,可是因爲咱們只關心網絡編程,因此大部分時候我麼只使用AF_INET)
4.tcp協議和udp協議
TCP(Transmission Control Protocol)可靠的、面向鏈接的協議(eg:打電話)、傳輸效率低全雙工通訊(發送緩存&接收緩存)、面向字節流。使用TCP的應用:Web瀏覽器;電子郵件、文件傳輸程序。
UDP(User Datagram Protocol)不可靠的、無鏈接的服務,傳輸效率高(發送前時延小),一對1、一對多、多對1、多對多、面向報文,盡最大努力服務,無擁塞控制。使用UDP的應用:域名系統 (DNS);視頻流;IP語音(VoIP)。
我知道說這些大家也不懂,直接上圖。
四.套接字(socket)初使用
基於TCP協議的socket
tcp是基於連接的,必須先啓動服務端,而後再啓動客戶端去連接服務端
server端
import socket sk = socket.socket() sk.bind(('127.0.0.1',8898)) #把地址綁定到套接字 sk.listen() #監聽連接 conn,addr = sk.accept() #接受客戶端連接 ret = conn.recv(1024) #接收客戶端信息 print(ret) #打印客戶端信息 conn.send(b'hi') #向客戶端發送信息 conn.close() #關閉客戶端套接字 sk.close() #關閉服務器套接字(可選)
client端
import socket sk = socket.socket() # 建立客戶套接字 sk.connect(('127.0.0.1',8898)) # 嘗試鏈接服務器 sk.send(b'hello!') ret = sk.recv(1024) # 對話(發送/接收) print(ret) sk.close() # 關閉客戶套接字
問題:有的同窗在重啓服務端時可能會遇到
解決方法:
#加入一條socket配置,重用ip和端口 import socket from socket import SOL_SOCKET,SO_REUSEADDR sk = socket.socket() sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加 sk.bind(('127.0.0.1',8898)) #把地址綁定到套接字 sk.listen() #監聽連接 conn,addr = sk.accept() #接受客戶端連接 ret = conn.recv(1024) #接收客戶端信息 print(ret) #打印客戶端信息 conn.send(b'hi') #向客戶端發送信息 conn.close() #關閉客戶端套接字 sk.close() #關閉服務器套接字(可選)
基於UDP協議的socket
udp是無連接的,啓動服務以後能夠直接接受消息,不須要提早創建連接
簡單使用
server端
import socket udp_sk = socket.socket(type=socket.SOCK_DGRAM) #建立一個服務器的套接字 udp_sk.bind(('127.0.0.1',9000)) #綁定服務器套接字 msg,addr = udp_sk.recvfrom(1024) print(msg) udp_sk.sendto(b'hi',addr) # 對話(接收與發送) udp_sk.close() # 關閉服務器套接字
client端
import socket ip_port=('127.0.0.1',9000) udp_sk=socket.socket(type=socket.SOCK_DGRAM) udp_sk.sendto(b'hello',ip_port) back_msg,addr=udp_sk.recvfrom(1024) print(back_msg.decode('utf-8'),addr)
qq聊天
#_*_coding:utf-8_*_ import socket ip_port=('127.0.0.1',8081) udp_server_sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) udp_server_sock.bind(ip_port) while True: qq_msg,addr=udp_server_sock.recvfrom(1024) print('來自[%s:%s]的一條消息:\033[1;44m%s\033[0m' %(addr[0],addr[1],qq_msg.decode('utf-8'))) back_msg=input('回覆消息: ').strip() udp_server_sock.sendto(back_msg.encode('utf-8'),addr)
#_*_coding:utf-8_*_ import socket BUFSIZE=1024 udp_client_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) qq_name_dic={ '金老闆':('127.0.0.1',8081), '哪吒':('127.0.0.1',8081), 'egg':('127.0.0.1',8081), 'yuan':('127.0.0.1',8081), } while True: qq_name=input('請選擇聊天對象: ').strip() while True: msg=input('請輸入消息,回車發送,輸入q結束和他的聊天: ').strip() if msg == 'q':break if not msg or not qq_name or qq_name not in qq_name_dic:continue udp_client_socket.sendto(msg.encode('utf-8'),qq_name_dic[qq_name]) back_msg,addr=udp_client_socket.recvfrom(BUFSIZE) print('來自[%s:%s]的一條消息:\033[1;44m%s\033[0m' %(addr[0],addr[1],back_msg.decode('utf-8'))) udp_client_socket.close()
時間服務器
# _*_coding:utf-8_*_ from socket import * from time import strftime ip_port = ('127.0.0.1', 9000) bufsize = 1024 tcp_server = socket(AF_INET, SOCK_DGRAM) tcp_server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) tcp_server.bind(ip_port) while True: msg, addr = tcp_server.recvfrom(bufsize) print('===>', msg) if not msg: time_fmt = '%Y-%m-%d %X' else: time_fmt = msg.decode('utf-8') back_msg = strftime(time_fmt) tcp_server.sendto(back_msg.encode('utf-8'), addr) tcp_server.close()
#_*_coding:utf-8_*_ from socket import * ip_port=('127.0.0.1',9000) bufsize=1024 tcp_client=socket(AF_INET,SOCK_DGRAM) while True: msg=input('請輸入時間格式(例%Y %m %d)>>: ').strip() tcp_client.sendto(msg.encode('utf-8'),ip_port) data=tcp_client.recv(bufsize)
飛秋通訊
from socket import * updsocket = socket(type = SOCK_DGRAM) addr = ("192.168.0.168",2425) msg = input('>>>') updsocket.sendto(("1:111:eva:eva:32:%s"%msg).encode('gbk'),addr)
飛秋運行時,會監聽2425端口,因此咱們首先要和本地創建UDP鏈接 1:111:eva:eva:32:要發送的內容 1表示版本號,111標識包號,eva表示用戶名,第二個eva表示主機名,32表示發送消息,後面的表示要發送的消息內容。
socket參數的詳解
socket.socket(family=AF_INET,type=SOCK_STREAM,proto=0,fileno=None)
建立socket對象的參數說明:
family | 地址系列應爲AF_INET(默認值),AF_INET6,AF_UNIX,AF_CAN或AF_RDS。 (AF_UNIX 域其實是使用本地 socket 文件來通訊) |
type | 套接字類型應爲SOCK_STREAM(默認值),SOCK_DGRAM,SOCK_RAW或其餘SOCK_常量之一。 SOCK_STREAM 是基於TCP的,有保障的(即能保證數據正確傳送到對方)面向鏈接的SOCKET,多用於資料傳送。 SOCK_DGRAM 是基於UDP的,無保障的面向消息的socket,多用於在網絡上發廣播信息。 |
proto | 協議號一般爲零,能夠省略,或者在地址族爲AF_CAN的狀況下,協議應爲CAN_RAW或CAN_BCM之一。 |
fileno | 若是指定了fileno,則其餘參數將被忽略,致使帶有指定文件描述符的套接字返回。 與socket.fromfd()不一樣,fileno將返回相同的套接字,而不是重複的。 這可能有助於使用socket.close()關閉一個獨立的插座。 |
五.黏包
黏包現象
讓咱們基於tcp先製做一個遠程執行命令的程序(命令ls -l ; lllllll ; pwd)
res=subprocess.Popen(cmd.decode('utf-8'), shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE) 的結果的編碼是以當前所在的系統爲準的,若是是windows,那麼res.stdout.read()讀出的就是GBK編碼的,在接收端須要用GBK解碼 且只能從管道里讀一次結果
同時執行多條命令以後,獲得的結果極可能只有一部分,在執行其餘命令的時候又接收到以前執行的另一部分結果,這種顯現就是黏包。
基於tcp協議實現的黏包
#_*_coding:utf-8_*_ from socket import * import subprocess ip_port=('127.0.0.1',8888) BUFSIZE=1024 tcp_socket_server=socket(AF_INET,SOCK_STREAM) tcp_socket_server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) tcp_socket_server.bind(ip_port) tcp_socket_server.listen(5) while True: conn,addr=tcp_socket_server.accept() print('客戶端',addr) while True: cmd=conn.recv(BUFSIZE) if len(cmd) == 0:break res=subprocess.Popen(cmd.decode('utf-8'),shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) stderr=res.stderr.read() stdout=res.stdout.read() conn.send(stderr) conn.send(stdout)
#_*_coding:utf-8_*_ import socket BUFSIZE=1024 ip_port=('127.0.0.1',8888) s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) res=s.connect_ex(ip_port) while True: msg=input('>>: ').strip() if len(msg) == 0:continue if msg == 'quit':break s.send(msg.encode('utf-8')) act_res=s.recv(BUFSIZE) print(act_res.decode('utf-8'),end='')
基於udp協議實現的黏包
#_*_coding:utf-8_*_ from socket import * import subprocess ip_port=('127.0.0.1',9000) bufsize=1024 udp_server=socket(AF_INET,SOCK_DGRAM) udp_server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) udp_server.bind(ip_port) while True: #收消息 cmd,addr=udp_server.recvfrom(bufsize) print('用戶命令----->',cmd) #邏輯處理 res=subprocess.Popen(cmd.decode('utf-8'),shell=True,stderr=subprocess.PIPE,stdin=subprocess.PIPE,stdout=subprocess.PIPE) stderr=res.stderr.read() stdout=res.stdout.read() #發消息 udp_server.sendto(stderr,addr) udp_server.sendto(stdout,addr) udp_server.close()
from socket import * ip_port=('127.0.0.1',9000) bufsize=1024 udp_client=socket(AF_INET,SOCK_DGRAM) while True: msg=input('>>: ').strip() udp_client.sendto(msg.encode('utf-8'),ip_port) err,addr=udp_client.recvfrom(bufsize) out,addr=udp_client.recvfrom(bufsize) if err: print('error : %s'%err.decode('utf-8'),end='') if out: print(out.decode('utf-8'), end='')
注意:只有TCP有粘包現象,UDP永遠不會粘包
黏包成因
TCP協議中的數據傳遞
tcp協議的拆包機制
當發送端緩衝區的長度大於網卡的MTU時,tcp會將此次發送的數據拆成幾個數據包發送出去。 MTU是Maximum Transmission Unit的縮寫。意思是網絡上傳送的最大數據包。MTU的單位是字節。 大部分網絡設備的MTU都是1500。若是本機的MTU比網關的MTU大,大的數據包就會被拆開來傳送,這樣會產生不少數據包碎片,增長丟包率,下降網絡速度。
面向流的通訊特色和Nagle算法
TCP(transport control protocol,傳輸控制協議)是面向鏈接的,面向流的,提供高可靠性服務。 收發兩端(客戶端和服務器端)都要有一一成對的socket,所以,發送端爲了將多個發往接收端的包,更有效的發到對方,使用了優化方法(Nagle算法),將屢次間隔較小且數據量小的數據,合併成一個大的數據塊,而後進行封包。 這樣,接收端,就難於分辨出來了,必須提供科學的拆包機制。 即面向流的通訊是無消息保護邊界的。 對於空消息:tcp是基於數據流的,因而收發的消息不能爲空,這就須要在客戶端和服務端都添加空消息的處理機制,防止程序卡住,而udp是基於數據報的,即使是你輸入的是空內容(直接回車),也能夠被髮送,udp協議會幫你封裝上消息頭髮送過去。 可靠黏包的tcp協議:tcp的協議數據不會丟,沒有收完包,下次接收,會繼續上次繼續接收,己端老是在收到ack時纔會清除緩衝區內容。數據是可靠的,可是會粘包。
基於tcp協議特色的黏包現象成因
發送端能夠是一K一K地發送數據,而接收端的應用程序能夠兩K兩K地提走數據,固然也有可能一次提走3K或6K數據,或者一次只提走幾個字節的數據。
也就是說,應用程序所看到的數據是一個總體,或說是一個流(stream),一條消息有多少字節對應用程序是不可見的,所以TCP協議是面向流的協議,這也是容易出現粘包問題的緣由。
而UDP是面向消息的協議,每一個UDP段都是一條消息,應用程序必須以消息爲單位提取數據,不能一次提取任意字節的數據,這一點和TCP是很不一樣的。
怎樣定義消息呢?能夠認爲對方一次性write/send的數據爲一個消息,須要明白的是當對方send一條信息的時候,不管底層怎樣分段分片,TCP協議層會把構成整條消息的數據段排序完成後才呈如今內核緩衝區。
例如基於tcp的套接字客戶端往服務端上傳文件,發送時文件內容是按照一段一段的字節流發送的,在接收方看了,根本不知道該文件的字節流從何處開始,在何處結束
此外,發送方引發的粘包是由TCP協議自己形成的,TCP爲提升傳輸效率,發送方每每要收集到足夠多的數據後才發送一個TCP段。若連續幾回須要send的數據都不多,一般TCP會根據優化算法把這些數據合成一個TCP段後一次發送出去,這樣接收方就收到了粘包數據。
UDP不會發生黏包
UDP(user datagram protocol,用戶數據報協議)是無鏈接的,面向消息的,提供高效率服務。 不會使用塊的合併優化算法,, 因爲UDP支持的是一對多的模式,因此接收端的skbuff(套接字緩衝區)採用了鏈式結構來記錄每個到達的UDP包,在每一個UDP包中就有了消息頭(消息來源地址,端口等信息),這樣,對於接收端來講,就容易進行區分處理了。 即面向消息的通訊是有消息保護邊界的。 對於空消息:tcp是基於數據流的,因而收發的消息不能爲空,這就須要在客戶端和服務端都添加空消息的處理機制,防止程序卡住,而udp是基於數據報的,即使是你輸入的是空內容(直接回車),也能夠被髮送,udp協議會幫你封裝上消息頭髮送過去。 不可靠不黏包的udp協議:udp的recvfrom是阻塞的,一個recvfrom(x)必須對惟一一個sendinto(y),收完了x個字節的數據就算完成,如果y;x數據就丟失,這意味着udp根本不會粘包,可是會丟數據,不可靠。
補充說明:
用UDP協議發送時,用sendto函數最大能發送數據的長度爲:65535- IP頭(20) – UDP頭(8)=65507字節。用sendto函數發送數據時,若是發送數據長度大於該值,則函數會返回錯誤。(丟棄這個包,不進行發送)
用TCP協議發送時,因爲TCP是數據流協議,所以不存在包大小的限制(暫不考慮緩衝區的大小),這是指在用send函數時,數據長度參數不受限制。而實際上,所指定的這段數據並不必定會一次性發送出去,若是這段數據比較長,會被分段發送,若是比較短,可能會等待和下一次數據一塊兒發送。
會發生黏包的兩種狀況
狀況一 發送方的緩存機制
發送端須要等緩衝區滿才發送出去,形成粘包(發送數據時間間隔很短,數據了很小,會合到一塊兒,產生粘包)
#_*_coding:utf-8_*_ from socket import * ip_port=('127.0.0.1',8080) tcp_socket_server=socket(AF_INET,SOCK_STREAM) tcp_socket_server.bind(ip_port) tcp_socket_server.listen(5) conn,addr=tcp_socket_server.accept() data1=conn.recv(10) data2=conn.recv(10) print('----->',data1.decode('utf-8')) print('----->',data2.decode('utf-8')) conn.close()
#_*_coding:utf-8_*_ import socket BUFSIZE=1024 ip_port=('127.0.0.1',8080) s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) res=s.connect_ex(ip_port) s.send('hello'.encode('utf-8')) s.send('egg'.encode('utf-8'))
狀況二 接收方的緩存機制
接收方不及時接收緩衝區的包,形成多個包接收(客戶端發送了一段數據,服務端只收了一小部分,服務端下次再收的時候仍是從緩衝區拿上次遺留的數據,產生粘包)
#_*_coding:utf-8_*_ from socket import * ip_port=('127.0.0.1',8080) tcp_socket_server=socket(AF_INET,SOCK_STREAM) tcp_socket_server.bind(ip_port) tcp_socket_server.listen(5) conn,addr=tcp_socket_server.accept() data1=conn.recv(2) #一次沒有收完整 data2=conn.recv(10)#下次收的時候,會先取舊的數據,而後取新的 print('----->',data1.decode('utf-8')) print('----->',data2.decode('utf-8')) conn.close()
#_*_coding:utf-8_*_ import socket BUFSIZE=1024 ip_port=('127.0.0.1',8080) s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) res=s.connect_ex(ip_port) s.send('hello egg'.encode('utf-8'))
總結
黏包現象只發生在tcp協議中:
1.從表面上看,黏包問題主要是由於發送方和接收方的緩存機制、tcp協議面向流通訊的特色。
2.實際上,主要仍是由於接收方不知道消息之間的界限,不知道一次性提取多少字節的數據所形成的
黏包的解決方案
解決方案一
問題的根源在於,接收端不知道發送端將要傳送的字節流的長度,因此解決粘包的方法就是圍繞,如何讓發送端在發送數據前,把本身將要發送的字節流總大小讓接收端知曉,而後接收端來一個死循環接收完全部數據。
#_*_coding:utf-8_*_ import socket,subprocess ip_port=('127.0.0.1',8080) s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(ip_port) s.listen(5) while True: conn,addr=s.accept() print('客戶端',addr) while True: msg=conn.recv(1024) if not msg:break res=subprocess.Popen(msg.decode('utf-8'),shell=True,\ stdin=subprocess.PIPE,\ stderr=subprocess.PIPE,\ stdout=subprocess.PIPE) err=res.stderr.read() if err: ret=err else: ret=res.stdout.read() data_length=len(ret) conn.send(str(data_length).encode('utf-8')) data=conn.recv(1024).decode('utf-8') if data == 'recv_ready': conn.sendall(ret) conn.close()
#_*_coding:utf-8_*_ import socket,time s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) res=s.connect_ex(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if len(msg) == 0:continue if msg == 'quit':break s.send(msg.encode('utf-8')) length=int(s.recv(1024).decode('utf-8')) s.send('recv_ready'.encode('utf-8')) send_size=0 recv_size=0 data=b'' while recv_size < length: data+=s.recv(1024) recv_size+=len(data) print(data.decode('utf-8'))
存在的問題:
程序的運行速度遠快於網絡傳輸速度,因此在發送一段字節前,先用send去發送該字節流長度,這種方式會放大網絡延遲帶來的性能損耗
解決方案進階
剛剛的方法,問題在於咱們咱們在發送
咱們能夠藉助一個模塊,這個模塊能夠把要發送的數據長度轉換成固定長度的字節。這樣客戶端每次接收消息以前只要先接受這個固定長度字節的內容看一看接下來要接收的信息大小,那麼最終接受的數據只要達到這個值就中止,就能恰好很少很多的接收完整的數據了。
struct模塊
該模塊能夠把一個類型,如數字,轉成固定長度的bytes
>>> struct.pack('i',1111111111111) struct.error: 'i' format requires -2147483648 <= number <= 2147483647 #這個是範圍
import json,struct #假設經過客戶端上傳1T:1073741824000的文件a.txt #爲避免粘包,必須自定製報頭 header={'file_size':1073741824000,'file_name':'/a/b/c/d/e/a.txt','md5':'8f6fbf8347faa4924a76856701edb0f3'} #1T數據,文件路徑和md5值 #爲了該報頭能傳送,須要序列化而且轉爲bytes head_bytes=bytes(json.dumps(header),encoding='utf-8') #序列化並轉成bytes,用於傳輸 #爲了讓客戶端知道報頭的長度,用struck將報頭長度這個數字轉成固定長度:4個字節 head_len_bytes=struct.pack('i',len(head_bytes)) #這4個字節裏只包含了一個數字,該數字是報頭的長度 #客戶端開始發送 conn.send(head_len_bytes) #先發報頭的長度,4個bytes conn.send(head_bytes) #再發報頭的字節格式 conn.sendall(文件內容) #而後發真實內容的字節格式 #服務端開始接收 head_len_bytes=s.recv(4) #先收報頭4個bytes,獲得報頭長度的字節格式 x=struct.unpack('i',head_len_bytes)[0] #提取報頭的長度 head_bytes=s.recv(x) #按照報頭長度x,收取報頭的bytes格式 header=json.loads(json.dumps(header)) #提取報頭 #最後根據報頭的內容提取真實的數據,好比 real_data_len=s.recv(header['file_size']) s.recv(real_data_len)
#_*_coding:utf-8_*_ #http://www.cnblogs.com/coser/archive/2011/12/17/2291160.html __author__ = 'Linhaifeng' import struct import binascii import ctypes values1 = (1, 'abc'.encode('utf-8'), 2.7) values2 = ('defg'.encode('utf-8'),101) s1 = struct.Struct('I3sf') s2 = struct.Struct('4sI') print(s1.size,s2.size) prebuffer=ctypes.create_string_buffer(s1.size+s2.size) print('Before : ',binascii.hexlify(prebuffer)) # t=binascii.hexlify('asdfaf'.encode('utf-8')) # print(t) s1.pack_into(prebuffer,0,*values1) s2.pack_into(prebuffer,s1.size,*values2) print('After pack',binascii.hexlify(prebuffer)) print(s1.unpack_from(prebuffer,0)) print(s2.unpack_from(prebuffer,s1.size)) s3=struct.Struct('ii') s3.pack_into(prebuffer,0,123,123) print('After pack',binascii.hexlify(prebuffer)) print(s3.unpack_from(prebuffer,0))
使用struct解決黏包
藉助struct模塊,咱們知道長度數字能夠被轉換成一個標準大小的4字節數字。所以能夠利用這個特色來預先發送數據長度。
發送時 | 接收時 |
先發送struct轉換好的數據長度4字節 | 先接受4個字節使用struct轉換成數字來獲取要接收的數據長度 |
再發送數據 | 再按照長度接收數據 |
import socket,struct,json import subprocess phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加 phone.bind(('127.0.0.1',8080)) phone.listen(5) while True: conn,addr=phone.accept() while True: cmd=conn.recv(1024) if not cmd:break print('cmd: %s' %cmd) res=subprocess.Popen(cmd.decode('utf-8'), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) err=res.stderr.read() print(err) if err: back_msg=err else: back_msg=res.stdout.read() conn.send(struct.pack('i',len(back_msg))) #先發back_msg的長度 conn.sendall(back_msg) #在發真實的內容 conn.close()
#_*_coding:utf-8_*_ import socket,time,struct s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) res=s.connect_ex(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if len(msg) == 0:continue if msg == 'quit':break s.send(msg.encode('utf-8')) l=s.recv(4) x=struct.unpack('i',l)[0] print(type(x),x) # print(struct.unpack('I',l)) r_s=0 data=b'' while r_s < x: r_d=s.recv(1024) data+=r_d r_s+=len(r_d) # print(data.decode('utf-8')) print(data.decode('gbk')) #windows默認gbk編碼
咱們還能夠把報頭作成字典,字典裏包含將要發送的真實數據的詳細信息,而後json序列化,而後用struck將序列化後的數據長度打包成4個字節(4個本身足夠用了)
發送時 | 接收時 |
先發報頭長度 |
先收報頭長度,用struct取出來 |
再編碼報頭內容而後發送 | 根據取出的長度收取報頭內容,而後解碼,反序列化 |
最後發真實內容 | 從反序列化的結果中取出待取數據的詳細信息,而後去取真實的數據內容 |
import socket,struct,json import subprocess phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加 phone.bind(('127.0.0.1',8080)) phone.listen(5) while True: conn,addr=phone.accept() while True: cmd=conn.recv(1024) if not cmd:break print('cmd: %s' %cmd) res=subprocess.Popen(cmd.decode('utf-8'), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) err=res.stderr.read() print(err) if err: back_msg=err else: back_msg=res.stdout.read() headers={'data_size':len(back_msg)} head_json=json.dumps(headers) head_json_bytes=bytes(head_json,encoding='utf-8') conn.send(struct.pack('i',len(head_json_bytes))) #先發報頭的長度 conn.send(head_json_bytes) #再發報頭 conn.sendall(back_msg) #在發真實的內容 conn.close()
from socket import * import struct,json ip_port=('127.0.0.1',8080) client=socket(AF_INET,SOCK_STREAM) client.connect(ip_port) while True: cmd=input('>>: ') if not cmd:continue client.send(bytes(cmd,encoding='utf-8')) head=client.recv(4) head_json_len=struct.unpack('i',head)[0] head_json=json.loads(client.recv(head_json_len).decode('utf-8')) data_len=head_json['data_size'] recv_size=0 recv_data=b'' while recv_size < data_len: recv_data+=client.recv(1024) recv_size+=len(recv_data) print(recv_data.decode('utf-8')) #print(recv_data.decode('gbk')) #windows默認gbk編碼
FTP做業:上傳下載文件
import socket import struct import json import subprocess import os class MYTCPServer: address_family = socket.AF_INET socket_type = socket.SOCK_STREAM allow_reuse_address = False max_packet_size = 8192 coding='utf-8' request_queue_size = 5 server_dir='file_upload' def __init__(self, server_address, bind_and_activate=True): """Constructor. May be extended, do not override.""" self.server_address=server_address self.socket = socket.socket(self.address_family, self.socket_type) if bind_and_activate: try: self.server_bind() self.server_activate() except: self.server_close() raise def server_bind(self): """Called by constructor to bind the socket. """ if self.allow_reuse_address: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) self.server_address = self.socket.getsockname() def server_activate(self): """Called by constructor to activate the server. """ self.socket.listen(self.request_queue_size) def server_close(self): """Called to clean-up the server. """ self.socket.close() def get_request(self): """Get the request and client address from the socket. """ return self.socket.accept() def close_request(self, request): """Called to clean up an individual request.""" request.close() def run(self): while True: self.conn,self.client_addr=self.get_request() print('from client ',self.client_addr) while True: try: head_struct = self.conn.recv(4) if not head_struct:break head_len = struct.unpack('i', head_struct)[0] head_json = self.conn.recv(head_len).decode(self.coding) head_dic = json.loads(head_json) print(head_dic) #head_dic={'cmd':'put','filename':'a.txt','filesize':123123} cmd=head_dic['cmd'] if hasattr(self,cmd): func=getattr(self,cmd) func(head_dic) except Exception: break def put(self,args): file_path=os.path.normpath(os.path.join( self.server_dir, args['filename'] )) filesize=args['filesize'] recv_size=0 print('----->',file_path) with open(file_path,'wb') as f: while recv_size < filesize: recv_data=self.conn.recv(self.max_packet_size) f.write(recv_data) recv_size+=len(recv_data) print('recvsize:%s filesize:%s' %(recv_size,filesize)) tcpserver1=MYTCPServer(('127.0.0.1',8080)) tcpserver1.run() #下列代碼與本題無關 class MYUDPServer: """UDP server class.""" address_family = socket.AF_INET socket_type = socket.SOCK_DGRAM allow_reuse_address = False max_packet_size = 8192 coding='utf-8' def get_request(self): data, client_addr = self.socket.recvfrom(self.max_packet_size) return (data, self.socket), client_addr def server_activate(self): # No need to call listen() for UDP. pass def shutdown_request(self, request): # No need to shutdown anything. self.close_request(request) def close_request(self, request): # No need to close anything. pass
import socket import struct import json import os class MYTCPClient: address_family = socket.AF_INET socket_type = socket.SOCK_STREAM allow_reuse_address = False max_packet_size = 8192 coding='utf-8' request_queue_size = 5 def __init__(self, server_address, connect=True): self.server_address=server_address self.socket = socket.socket(self.address_family, self.socket_type) if connect: try: self.client_connect() except: self.client_close() raise def client_connect(self): self.socket.connect(self.server_address) def client_close(self): self.socket.close() def run(self): while True: inp=input(">>: ").strip() if not inp:continue l=inp.split() cmd=l[0] if hasattr(self,cmd): func=getattr(self,cmd) func(l) def put(self,args): cmd=args[0] filename=args[1] if not os.path.isfile(filename): print('file:%s is not exists' %filename) return else: filesize=os.path.getsize(filename) head_dic={'cmd':cmd,'filename':os.path.basename(filename),'filesize':filesize} print(head_dic) head_json=json.dumps(head_dic) head_json_bytes=bytes(head_json,encoding=self.coding) head_struct=struct.pack('i',len(head_json_bytes)) self.socket.send(head_struct) self.socket.send(head_json_bytes) send_size=0 with open(filename,'rb') as f: for line in f: self.socket.send(line) send_size+=len(line) print(send_size) else: print('upload successful') client=MYTCPClient(('127.0.0.1',8080)) client.run()
六.socketserver
解讀socketserver源碼 —— https://www.cnblogs.com/l-hf/p/11532727.html
import socketserver class Myserver(socketserver.BaseRequestHandler): def handle(self): self.data = self.request.recv(1024).strip() print("{} wrote:".format(self.client_address[0])) print(self.data) self.request.sendall(self.data.upper()) if __name__ == "__main__": HOST, PORT = "127.0.0.1", 9999 # 設置allow_reuse_address容許服務器重用地址 socketserver.TCPServer.allow_reuse_address = True # 建立一個server, 將服務地址綁定到127.0.0.1:9999 server = socketserver.TCPServer((HOST, PORT),Myserver) # 讓server永遠運行下去,除非強制中止程序 server.serve_forever()
import socket HOST, PORT = "127.0.0.1", 9999 data = "hello" # 建立一個socket連接,SOCK_STREAM表明使用TCP協議 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.connect((HOST, PORT)) # 連接到客戶端 sock.sendall(bytes(data + "\n", "utf-8")) # 向服務端發送數據 received = str(sock.recv(1024), "utf-8")# 從服務端接收數據 print("Sent: {}".format(data)) print("Received: {}".format(received))
併發編程
進程
使用process模塊建立進程
在一個python進程中開啓子進程,start方法和併發效果。
import time from multiprocessing import Process def f(name): print('hello', name) print('我是子進程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() time.sleep(1) print('執行主進程的內容了')
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) print('我是子進程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() #p.join() print('我是父進程')
import os from multiprocessing import Process def f(x): print('子進程id :',os.getpid(),'父進程id :',os.getppid()) return x*x if __name__ == '__main__': print('主進程id :', os.getpid()) p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start()
進階,多個進程同時運行(注意,子進程的執行順序不是根據啓動順序決定的)
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p)
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p) p.join() # [p.join() for p in p_lst] print('父進程在執行')
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p) # [p.join() for p in p_lst] print('父進程在執行')
除了上面這些開啓進程的方法,還有一種以繼承Process類的形式開啓進程的方式
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print(os.getpid()) print('%s 正在和女主播聊天' %self.name) p1=MyProcess('wupeiqi') p2=MyProcess('yuanhao') p3=MyProcess('nezha') p1.start() #start會自動調用run p2.start() # p2.run() p3.start() p1.join() p2.join() p3.join() print('主線程')
進程之間的數據隔離問題
from multiprocessing import Process def work(): global n n=0 print('子進程內: ',n) if __name__ == '__main__': n = 100 p=Process(target=work) p.start() print('主進程內: ',n)
線程
理論知識
全局解釋器鎖GIL
Python代碼的執行由Python虛擬機(也叫解釋器主循環)來控制。Python在設計之初就考慮到要在主循環中,同時只有一個線程在執行。雖然 Python 解釋器中能夠「運行」多個線程,但在任意時刻只有一個線程在解釋器中運行。
對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。
在多線程環境中,Python 虛擬機按如下方式執行:
a、設置 GIL;
b、切換到一個線程去運行;
c、運行指定數量的字節碼指令或者線程主動讓出控制(能夠調用 time.sleep(0));
d、把線程設置爲睡眠狀態;
e、解鎖 GIL;
d、再次重複以上全部步驟。
在調用外部代碼(如 C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束爲止(因爲在這期間沒有Python的字節碼被運行,因此不會作線程切換)編寫擴展的程序員能夠主動解鎖GIL。
python線程模塊的選擇
Python提供了幾個用於多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊容許程序員建立和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊容許用戶建立一個能夠用於多個線程之間共享數據的隊列數據結構。
避免使用thread模塊,由於更高級別的threading模塊更爲先進,對線程的支持更爲完善,並且使用thread模塊裏的屬性有可能會與threading出現衝突;其次低級別的thread模塊的同步原語不多(實際上只有一個),而threading模塊則有不少;再者,thread模塊中當主線程結束時,全部的線程都會被強制結束掉,沒有警告也不會有正常的清除工做,至少threading模塊能確保重要的子線程退出後進程才退出。
thread模塊不支持守護線程,當主線程退出時,全部的子線程不論它們是否還在工做,都會被強行退出。而threading模塊支持守護線程,守護線程通常是一個等待客戶請求的服務器,若是沒有客戶提出請求它就在那等着,若是設定一個線程爲守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。
threading模塊
multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹(官方連接)
線程的建立Threading.Thread類
線程的建立
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主線程')
from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('egon') t.start() print('主線程')
多線程與多進程
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': #part1:在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主線程/主進程pid',os.getpid()) #part2:開多個進程,每一個進程都有不一樣的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主線程/主進程pid',os.getpid())
from threading import Thread from multiprocessing import Process import os def work(): print('hello') if __name__ == '__main__': #在主進程下開啓線程 t=Thread(target=work) t.start() print('主線程/主進程') ''' 打印結果: hello 主線程/主進程 ''' #在主進程下開啓子進程 t=Process(target=work) t.start() print('主線程/主進程') ''' 打印結果: 主線程/主進程 hello '''
from threading import Thread from multiprocessing import Process import os def work(): global n n=0 if __name__ == '__main__': # n=100 # p=Process(target=work) # p.start() # p.join() # print('主',n) #毫無疑問子進程p已經將本身的全局的n改爲了0,但改的僅僅是它本身的,查看父進程的n仍然爲100 n=1 t=Thread(target=work) t.start() t.join() print('主',n) #查看結果爲0,由於同一進程內的線程之間共享進程內的數據 同一進程內的線程共享該進程的數據?
練習 :多線程實現socket
#_*_coding:utf-8_*_ #!/usr/bin/env python import multiprocessing import threading import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) def action(conn): while True: data=conn.recv(1024) print(data) conn.send(data.upper()) if __name__ == '__main__': while True: conn,addr=s.accept() p=threading.Thread(target=action,args=(conn,)) p.start()
#_*_coding:utf-8_*_ #!/usr/bin/env python import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue s.send(msg.encode('utf-8')) data=s.recv(1024) print(data)
Thread類的其餘方法
Thread實例對象的方法 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread import threading from multiprocessing import Process import os def work(): import time time.sleep(3) print(threading.current_thread().getName()) if __name__ == '__main__': #在主進程下開啓線程 t=Thread(target=work) t.start() print(threading.current_thread().getName()) print(threading.current_thread()) #主線程 print(threading.enumerate()) #連同主線程在內有兩個運行的線程 print(threading.active_count()) print('主線程/主進程') ''' 打印結果: MainThread <_MainThread(MainThread, started 140735268892672)> [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 主線程/主進程 Thread-1 '''
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() t.join() print('主線程') print(t.is_alive()) ''' egon say hello 主線程 False '''
鎖
同步鎖
from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果可能爲99
import threading R=threading.Lock() R.acquire() ''' 對公共數據的操做 ''' R.release()
from threading import Thread,Lock import os,time def work(): global n lock.acquire() temp=n time.sleep(0.1) n=temp-1 lock.release() if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果確定爲0,由原來的併發執行變成串行,犧牲了執行效率保證了數據安全
#不加鎖:併發執行,速度快,數據不安全 from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 ''' #不加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全 from threading import current_thread,Thread,Lock import os,time def task(): #未加鎖的代碼併發運行 time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加鎖的代碼串行運行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 ''' #有的同窗可能有疑問:既然加鎖會讓運行變成串行,那麼我在start以後當即使用join,就不用加鎖了啊,也是串行的效果啊 #沒錯:在start以後馬上使用jion,確定會將100個任務的執行變成串行,毫無疑問,最終n的結果也確定是0,是安全的,但問題是 #start後當即join:任務內的全部代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的 #單從保證數據安全方面,兩者均可以實現,但很明顯是加鎖的效率更高. from threading import current_thread,Thread,Lock import os,time def task(): time.sleep(3) print('%s start to run' %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.6937336921692 n:0 #耗時是多麼的恐怖 ''' )
死鎖與遞歸鎖
進程也有死鎖與遞歸鎖,在進程那裏忘記說了,放到這裏一切說了額
所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖
from threading import Lock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:
from threading import RLock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
典型問題:科學家吃麪
import time from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麪條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃麪'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麪條' % name) print('%s 吃麪' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','egon','yuan']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麪條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃麪'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麪條' % name) print('%s 吃麪' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','egon','yuan']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
線程隊列
queue隊列 :使用import queue,用法與進程Queue同樣
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
-
class
queue.
Queue
(maxsize=0) #先進先出
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''
class queue.
LifoQueue
(maxsize=0) #last in fisrt out
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''
class queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先級的隊列
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite. The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data). exception queue.Empty Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty. exception queue.Full Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full. Queue.qsize() Queue.empty() #return True if empty Queue.full() # return True if full Queue.put(item, block=True, timeout=None) Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case). Queue.put_nowait(item) Equivalent to put(item, False). Queue.get(block=True, timeout=None) Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case). Queue.get_nowait() Equivalent to get(False). Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads. Queue.task_done() Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue). Raises a ValueError if called more times than there were items placed in the queue. Queue.join() block直到queue被消費完畢
Python標準模塊--concurrent.futures
https://docs.python.org/dev/library/concurrent.futures.html
#1 介紹 concurrent.futures模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor:線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 異步提交任務 #map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操做 #shutdown(wait=True) 至關於進程池的pool.close()+pool.join()操做 wait=True,等待池內全部任務執行完畢回收完資源後才繼續 wait=False,當即返回,並不會等待池內的任務執行完畢 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢 submit和map必須在shutdown以前 #result(timeout=None) 取得結果 #add_done_callback(fn) 回調函數
# done()
判斷某一個線程是否完成
# cancle()
取消某個任務
#介紹 The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned. class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None) An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised. #用法 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result())
#介紹 ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously. class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='') An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously. Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor. New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging. #用法 與ProcessPoolExecutor相同
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3) # for url in urls: # p.apply_async(get_page,args=(url,),callback=pasrse_page) # p.close() # p.join() p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,須要用obj.result()拿到結果
協程
協程介紹
協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是協程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。、
須要強調的是:
#1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行) #2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關)
對比操做系統控制線程的切換,用戶在單線程內控制協程的切換
優勢以下:
#1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級 #2. 單線程內就能夠實現併發的效果,最大限度地利用cpu
缺點以下:
#1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程 #2. 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程
總結協程特色:
- 必須在只有一個單線程裏實現併發
- 修改共享數據不需加鎖
- 用戶程序裏本身保存多個控制流的上下文棧
- 附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))
Gevent模塊
安裝:pip3 install gevent
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的 g2=gevent.spawn(func2) g1.join() #等待g1結束 g2.join() #等待g2結束 #或者上述兩步合做一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值
import gevent def eat(name): print('%s eat 1' %name) gevent.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') g2=gevent.spawn(play,name='egon') g1.join() g2.join() #或者gevent.joinall([g1,g2]) print('主')
上例gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,而time.sleep(2)或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了
from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前
或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()放到文件的開頭
from gevent import monkey;monkey.patch_all() import gevent import time def eat(): print('eat food 1') time.sleep(2) print('eat food 2') def play(): print('play 1') time.sleep(1) print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2]) print('主')
咱們能夠用threading.current_thread().getName()來查看每一個g1和g2,查看的結果爲DummyThread-n,即假線程
from gevent import monkey;monkey.patch_all() import threading import gevent import time def eat(): print(threading.current_thread().getName()) print('eat food 1') time.sleep(2) print('eat food 2') def play(): print(threading.current_thread().getName()) print('play 1') time.sleep(1) print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2]) print('主')
Gevent之同步與異步
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous(): # 同步 for i in range(10): task(i) def asynchronous(): # 異步 g_l=[spawn(task,i) for i in range(10)] joinall(g_l) print('DONE') if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous() # 上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 # 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數, # 後者阻塞當前流程,並執行全部給定的greenlet任務。執行流程只會在 全部greenlet執行完後纔會繼續向下走。
Gevent之應用舉例
經過gevent實現單線程下的socket併發
注意 :from gevent import monkey;monkey.patch_all()必定要放到導入socket模塊以前,不然gevent沒法識別socket的阻塞
from gevent import monkey;monkey.patch_all() from socket import * import gevent #若是不想用money.patch_all()打補丁,能夠用gevent自帶的socket # from gevent import socket # s=socket.socket() def server(server_ip,port): s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind((server_ip,port)) s.listen(5) while True: conn,addr=s.accept() gevent.spawn(talk,conn,addr) def talk(conn,addr): try: while True: res=conn.recv(1024) print('client %s:%s msg: %s' %(addr[0],addr[1],res)) conn.send(res.upper()) except Exception as e: print(e) finally: conn.close() if __name__ == '__main__': server('127.0.0.1',8080)
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
from threading import Thread from socket import * import threading def client(server_ip,port): c=socket(AF_INET,SOCK_STREAM) #套接字對象必定要加到函數內,即局部名稱空間內,放在函數外則被全部線程共享,則你們公用一個套接字對象,那麼客戶端端口永遠同樣了 c.connect((server_ip,port)) count=0 while True: c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8')) msg=c.recv(1024) print(msg.decode('utf-8')) count+=1 if __name__ == '__main__': for i in range(500): t=Thread(target=client,args=('127.0.0.1',8080)) t.start()