即C/S架構,包括html
一、硬件C/S架構(打印機)python
二、軟件B/S架構(web服務)linux
C/S架構與Socket的關係:程序員
咱們學習Socket就是爲了完成C/S的開發web
引子: 算法
計算機組成原理:硬件、操做系統、應用軟件三者組成。shell
具有以上條件後,計算機就能夠工做,若是你要和別人一塊兒玩,那你就須要上網了。互聯網的核心就是由一堆協議組成,協議就是標準。數據庫
爲何學習Socket以前要先了解互聯網協議?編程
一、C/S架構的軟件(應用軟件屬於應用層)是基於網絡進行通訊的json
二、網絡的核心即一堆協議,協議即標準,想開發一款基於網絡通訊的軟件,就必須遵循這些標準
Socket是應用層與TCP/IP協議族通訊的中間軟件抽象層,它是一組接口,在設計模式中,Socket其實就是一個門面模式,它把負責的TCP/IP協議族隱藏在Socket接口後面,對用戶來講,一組簡單的接口就是所有,讓Socket去組織數據,以符合指定的協議。
因此,咱們無需深刻學習理解TCP/UDP協議,Socket已經爲咱們封裝好了,咱們只須要遵循Socket的規定去編程,寫出的程序天然就是遵循TCP/UDP標準的。
套接字起源於20世紀70年代加利福尼亞大學伯克利分校版本的Unix,即人們所說的BSD Unix。所以,有時人們也把套接字成爲「伯克利套接字」或「BSD套接字」。一開始,套接字被設計用在一臺主機上多個應用程序之間的通訊,這也被稱做進程間通許或IPC。套接字有兩種(或者稱爲兩個種族),分別是基於文件型和就網絡型。
基於文件類型的套接字家族
套接字家族的名字:AF_UNIX
UNIX一切皆文件,基於文件的套接字調用的就是底層的文件系統來取數據,兩個套接字進程運行在同一機器上,能夠經過訪問同一文件系統間接完成通訊。
基於網絡類型的套接字家族
套接字家族的名字:AF_INET
還有AF_INET6被用於ipv6,還有一些其餘的地址家族,不過,他們要麼是隻用於某個平臺,要麼就是已經被廢棄,或者是不多被使用,或者是根本沒有實現,全部地址家族中,AF_INET是使用最普遍的一個,Python支持不少地址家族,可是因爲咱們只關心網絡編程,因此大部分時候咱們只使用AF_INET(AF:Address Family;INET:Internet)
生活中,你要打電話給一個朋友,先撥號,朋友聽到電話鈴聲響後接打電話,這時你和你的朋友就創建起了鏈接,就能夠講話了,等交流結束,掛斷電話結束這次通話。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socket 4 5 phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 買手機;socket.AF_INET:基於網絡協議;socket.SOCK_STREAM:基於流的TCP協議 6 phone.bind(('127.0.0.1', 8080)) # 綁定手機卡;元祖形式,ip地址+端口 7 # 注:服務器的ip地址寫本機的ip地址 8 phone.listen(5) # 開機 9 conn, addr = phone.accept() # 等電話 10 msg = conn.recv(1024) # 收消息 11 print('客戶端發來的消息是:', msg) 12 conn.send(msg.upper()) # 發消息 13 conn.close() 14 phone.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socket 4 phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 買手機 5 phone.connect(('127.0.0.1', 8080)) # 撥電話 6 # 注:客戶端的ip地址,寫服務器端的ip地址 7 phone.send('hello'.encode('utf-8')) # 發消息 8 data = phone.recv(1024) # 收消息 9 print('收到服務端發來的消息', data)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # import socket 4 from socket import * 5 import time 6 ip_port = ('127.0.0.1', 8080) 7 back_log = 5 8 buffer_size = 1024 9 tcp_server = socket(AF_INET, SOCK_STREAM) 10 tcp_server.bind(ip_port) 11 tcp_server.listen(back_log) 12 print('服務端開始運行') 13 conn, addr = tcp_server.accept() # 服務器阻塞 14 print('雙向連接', conn) 15 print('客戶端地址', addr) 16 while True: 17 time.sleep(1) 18 print('[%s]' % time.time()) 19 data = conn.recv(buffer_size) 20 print('客戶端發來的消息是', data.decode('utf-8')) 21 conn.send(data.upper()) 22 conn.close() 23 tcp_server.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 import time 5 ip_port = ('127.0.0.1', 8080) 6 back_log = 5 7 buffer_size = 1024 8 tcp_client = socket(AF_INET, SOCK_STREAM) 9 tcp_client.connect(ip_port) 10 while True: 11 time.sleep(2) 12 print('[%s]' % time.time()) 13 msg = input('>>:').strip() 14 tcp_client.send(msg.encode('utf-8')) 15 print('客戶端已經發送消息') 16 data = tcp_client.recv(buffer_size) 17 print('收到服務端發來消息', data.decode('utf-8')) 18 tcp_client.close()
若重啓服務端時,可能會遇到:Address already in use;這個是因爲服務端扔然存在四次揮手的time_wait狀態佔用地址
解決方案:
1 # 加入一條socket配置,重用ip和端口 2 tcp_server = socket(AF_INET, SOCK_STREAM) 3 tcp_server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) # <---就是這條,在bind前加 4 tcp_server.bind(ip_port)
1 發現系統存在大量TIME_WAIT狀態的鏈接,經過調整linux內核參數解決, 2 vi /etc/sysctl.conf 3 4 編輯文件,加入如下內容: 5 net.ipv4.tcp_syncookies = 1 6 net.ipv4.tcp_tw_reuse = 1 7 net.ipv4.tcp_tw_recycle = 1 8 net.ipv4.tcp_fin_timeout = 30 9 10 而後執行 /sbin/sysctl -p 讓參數生效。 11 12 net.ipv4.tcp_syncookies = 1 表示開啓SYN Cookies。當出現SYN等待隊列溢出時,啓用cookies來處理,可防範少許SYN攻擊,默認爲0,表示關閉; 13 14 net.ipv4.tcp_tw_reuse = 1 表示開啓重用。容許將TIME-WAIT sockets從新用於新的TCP鏈接,默認爲0,表示關閉; 15 16 net.ipv4.tcp_tw_recycle = 1 表示開啓TCP鏈接中TIME-WAIT sockets的快速回收,默認爲0,表示關閉。 17 18 net.ipv4.tcp_fin_timeout 修改系統默認的 TIMEOUT 時間
1 ss = socket() # 建立一個服務器的套接字 2 ss.bind() # 綁定服務器套接字 3 while True: # 服務器無限循環 4 cs = ss.recvfrom()/ss.sendto() # 對話(接收與發送) 5 ss.close() # 關閉服務器套接字
1 cs = socket() # 建立客戶套接字 2 while True: 3 cs.sendto()/cs.recvfrom() # 對話(發送/接收) 4 cs.close() # 關閉客戶套接字
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 ip_port = ('127.0.0.1', 8080) 5 buffer_size = 1024 6 udp_server = socket(AF_INET, SOCK_DGRAM) # SOCK_DGRAM:數據報式套接字 7 udp_server.bind(ip_port) 8 while True: 9 # data = udp_server.recvfrom(buffer_size) 10 # print(data) # (b'hello', ('127.0.0.1', 65047)) 11 data, addr = udp_server.recvfrom(buffer_size) 12 print(data) 13 print(addr) 14 udp_server.sendto(data.upper(), addr)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 ip_port = ('127.0.0.1', 8080) 5 buffer_size = 1024 6 udp_client = socket(AF_INET, SOCK_DGRAM) # SOCK_DGRAM:數據報式套接字 7 while True: 8 msg = input('>>').strip() 9 udp_client.sendto(msg.encode('utf-8'), ip_port) 10 data, addr = udp_client.recvfrom(buffer_size) 11 print(data.decode('utf-8')) 12 print(addr)
注:只有TCP有粘包現象,UDP永遠不會粘包
一個socket收發消息的原理圖:
發送端能夠是一K一K地發送數據,而接收端的應用程序能夠兩K兩K地提走數據,固然也有可能一次提走3K或6K數據,或者一次只提走幾個字節的數據,也就是說,應用程序所看到的數據是一個總體,或說是一個流(stream),一條消息有多少字節對應用程序是不可見的,所以TCP協議是面向流的協議,這也是容易出現粘包問題的緣由。而UDP是面向消息的協議,每一個UDP段都是一條消息,應用程序必須以消息爲單位提取數據,不能一次提取任意字節的數據,這一點和TCP是很不一樣的。怎樣定義消息呢?能夠認爲對方一次性write/send的數據爲一個消息,須要明白的是當對方send一條信息的時候,不管底層怎樣分段分片,TCP協議層會把構成整條消息的數據段排序完成後才呈如今內核緩衝區。
例如基於tcp的套接字客戶端往服務端上傳文件,發送時文件內容是按照一段一段的字節流發送的,在接收方看了,根本不知道該文件的字節流從何處開始,在何處結束
所謂粘包問題主要仍是由於接收方不知道消息之間的界限,不知道一次性提取多少字節的數據所形成的。
此外,發送方引發的粘包是由TCP協議自己形成的,TCP爲提升傳輸效率,發送方每每要收集到足夠多的數據後才發送一個TCP段。若連續幾回須要send的數據都不多,一般TCP會根據優化算法把這些數據合成一個TCP段後一次發送出去,這樣接收方就收到了粘包數據。
udp的recvfrom是阻塞的,一個recvfrom(x)必須對惟一一個sendinto(y),收完了x個字節的數據就算完成,如果y>x數據就丟失,這意味着udp根本不會粘包,可是會丟數據,不可靠
tcp的協議數據不會丟,沒有收完包,下次接收,會繼續上次繼續接收,己端老是在收到ack時纔會清除緩衝區內容。數據是可靠的,可是會粘包。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 import subprocess 5 6 ip_port = ('127.0.0.1', 8080) 7 back_log = 5 8 buffer_size = 1024 9 tcp_server = socket(AF_INET, SOCK_STREAM) 10 tcp_server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 11 tcp_server.bind(ip_port) 12 tcp_server.listen(back_log) 13 while True: 14 conn, addr = tcp_server.accept() 15 while True: 16 try: 17 cmd = conn.recv(buffer_size) 18 # if not cmd:break 19 # 執行命令,獲得命令的運行結果cmd_res 20 res = subprocess.Popen(cmd.decode('utf-8'), shell=True, 21 stderr=subprocess.PIPE, 22 stdout=subprocess.PIPE, 23 stdin=subprocess.PIPE 24 ) 25 err = res.stderr.read() 26 if err: 27 cmd_res = err 28 else: 29 cmd_res = res.stdout.read() 30 if not cmd_res: 31 cmd_res = 'excute success'.encode('utf-8') 32 length = len(cmd_res) 33 conn.send(str(length).encode('utf-8')) 34 client_ready = conn.recv(buffer_size) 35 if client_ready == b'ready': 36 conn.send(cmd_res) 37 38 except Exception as EX: 39 print(EX) 40 break 41 conn.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 5 ip_port = ('127.0.0.1', 8080) 6 back_log = 5 7 buffer_size = 1024 8 tcp_client = socket(AF_INET, SOCK_STREAM) 9 tcp_client.connect(ip_port) 10 while True: 11 cmd = input('>>>').strip() 12 if not cmd:continue 13 if cmd == 'quit':break 14 tcp_client.send(cmd.encode('utf-8')) 15 length = tcp_client.recv(buffer_size) 16 tcp_client.send(b'ready') 17 length = int(length.decode('utf-8')) 18 recv_size = 0 19 recv_msg = b'' 20 while recv_size < length: 21 # 第一種寫法 22 # r_m = tcp_client.recv(buffer_size) 23 # recv_msg += r_m 24 # recv_size += len(r_m) 25 26 # 第二種寫法 27 recv_msg += tcp_client.recv(buffer_size) 28 recv_size = len(recv_msg) 29 print(recv_msg.decode('gbk')) 30 tcp_client.close()
方法2、
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 import subprocess 5 import struct 6 7 ip_port = ('127.0.0.1', 8080) 8 back_log = 5 9 buffer_size = 1024 10 tcp_server = socket(AF_INET, SOCK_STREAM) 11 tcp_server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 12 tcp_server.bind(ip_port) 13 tcp_server.listen(back_log) 14 while True: 15 conn, addr = tcp_server.accept() 16 while True: 17 try: 18 cmd = conn.recv(buffer_size) 19 # if not cmd:break 20 # 執行命令,獲得命令的運行結果cmd_res 21 res = subprocess.Popen(cmd.decode('utf-8'), shell=True, 22 stderr=subprocess.PIPE, 23 stdout=subprocess.PIPE, 24 stdin=subprocess.PIPE 25 ) 26 err = res.stderr.read() 27 if err: 28 cmd_res = err 29 else: 30 cmd_res = res.stdout.read() 31 if not cmd_res: 32 cmd_res = 'excute success'.encode('utf-8') 33 length = len(cmd_res) 34 data_length = struct.pack('i', length) 35 conn.send(data_length) 36 conn.send(cmd_res) 37 38 except Exception as EX: 39 print(EX) 40 break 41 conn.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 import struct 5 6 ip_port = ('127.0.0.1', 8080) 7 back_log = 5 8 buffer_size = 1024 9 tcp_client = socket(AF_INET, SOCK_STREAM) 10 tcp_client.connect(ip_port) 11 while True: 12 cmd = input('>>>').strip() 13 if not cmd:continue 14 if cmd == 'quit':break 15 tcp_client.send(cmd.encode('utf-8')) 16 length_data = tcp_client.recv(4) 17 length = struct.unpack('i', length_data)[0] 18 recv_size = 0 19 recv_msg = b'' 20 while recv_size < length: 21 # 第一種寫法 22 # r_m = tcp_client.recv(buffer_size) 23 # recv_msg += r_m 24 # recv_size += len(r_m) 25 26 # 第二種寫法 27 recv_msg += tcp_client.recv(buffer_size) 28 recv_size = len(recv_msg) 29 print(recv_msg.decode('gbk')) 30 tcp_client.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socketserver 4 5 6 class MyServer(socketserver.BaseRequestHandler): 7 def handle(self): 8 print('conn is:', self.request) # <==>conn 9 print('addr is:', self.client_address) # <==> addr 10 while True: 11 try: 12 # 收消息 13 data = self.request.recv(1024) 14 if not data:break 15 print('收到客戶端的消息是', data) 16 # 發消息 17 self.request.sendall(data.upper()) 18 except Exception as EX: 19 print('錯誤提示:',EX) 20 break 21 22 23 if __name__ == '__main__': 24 s = socketserver.ThreadingTCPServer(('127.0.0.1', 8080), MyServer) # 多線程;第一個參數,地址+端口;第二個參數,類
25 # s = socketserver.ForkingTCPServer(('127.0.0.1', 8080), MyServer) # 多進程;多進程的開銷大於多線程
26 s.serve_forever()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 ip_port = ('127.0.0.1', 8080) 5 back_log = 5 6 buffer_size = 1024 7 tcp_client = socket(AF_INET, SOCK_STREAM) 8 tcp_client.connect(ip_port) 9 while True: 10 msg = input('>>').strip() 11 if not msg:continue 12 if msg == 'quit':break 13 tcp_client.send(msg.encode('utf-8')) 14 data = tcp_client.recv(buffer_size) 15 print('收到服務端發來的消息:', data.decode('utf-8')) 16 tcp_client.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socketserver 4 5 6 class MyServer(socketserver.BaseRequestHandler): 7 def handle(self): 8 print(self.request) 9 print('收到客戶端的消息是:', self.request[0].upper()) 10 self.request[1].sendto(self.request[0].upper(), self.client_address) 11 12 13 if __name__ == '__main__': 14 s = socketserver.ThreadingUDPServer(('127.0.0.1', 8080), MyServer) # 第一個參數,地址+端口;第二個參數,類 15 s.serve_forever()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from socket import * 4 ip_port = ('127.0.0.1', 8080) 5 buffer_size = 1024 6 udp_client = socket(AF_INET, SOCK_DGRAM) # SOCK_DGRAM:數據報式套接字 7 while True: 8 msg = input('>>').strip() 9 udp_client.sendto(msg.encode('utf-8'), ip_port) 10 data, addr = udp_client.recvfrom(buffer_size) 11 print(data.decode('utf-8')) 12 print(addr)
若是想在分佈式系統中實現一個簡單的客戶端連接認證功能,又不像SSL那麼複雜,那麼能夠利用hmac+加鹽的方法來實現。
1 # _*_coding:utf-8_*_ 2 __author__ = 'Linhaifeng' 3 from socket import * 4 import hmac, os 5 6 secret_key = b'alex bang bang bang' 7 8 9 def conn_auth(conn): 10 ''' 11 認證客戶端連接 12 :param conn: 13 :return: 14 ''' 15 print('開始驗證新連接的合法性') 16 msg = os.urandom(32) # 隨機生成的;b'\xa3\x9d\xaa\x94\x9e\x89\xe9\xc9\xc3r\xf9E\xe0w\x82=\xaac-\x04\xd8:\xea\x07\xad\x1dx\x1er\xe0\x7f\x02' 17 conn.sendall(msg) 18 h = hmac.new(secret_key, msg) # <hmac.HMAC object at 0x000000D5DA4824E0> 19 digest = h.digest() # 隨機生成的;b'\x17!*\xae6\x81\xfe|)\x138\xfa2o%\x1a' 20 respone = conn.recv(len(digest)) 21 return hmac.compare_digest(respone, digest) # 比較第一個參數和第二個參數;相同,返回True,反之也成立 22 23 24 def data_handler(conn, bufsize=1024): 25 if not conn_auth(conn): 26 print('該連接不合法,關閉') 27 conn.close() 28 return 29 print('連接合法,開始通訊') 30 while True: 31 data = conn.recv(bufsize) 32 if not data: break 33 conn.sendall(data.upper()) 34 35 36 def server_handler(ip_port, bufsize, backlog=5): 37 ''' 38 只處理連接 39 :param ip_port: 40 :return: 41 ''' 42 tcp_socket_server = socket(AF_INET, SOCK_STREAM) 43 tcp_socket_server.bind(ip_port) 44 tcp_socket_server.listen(backlog) 45 while True: 46 conn, addr = tcp_socket_server.accept() 47 print('新鏈接[%s:%s]' % (addr[0], addr[1])) 48 data_handler(conn, bufsize) 49 50 51 if __name__ == '__main__': 52 ip_port = ('127.0.0.1', 9999) 53 bufsize = 1024 54 server_handler(ip_port, bufsize)
1 # _*_coding:utf-8_*_ 2 __author__ = 'Linhaifeng' 3 from socket import * 4 import hmac, os 5 6 secret_key = b'alex bang bang bang' 7 8 9 def conn_auth(conn): 10 ''' 11 認證客戶端連接 12 :param conn: 13 :return: 14 ''' 15 print('開始驗證新連接的合法性') 16 msg = os.urandom(32) 17 conn.sendall(msg) 18 h = hmac.new(secret_key, msg) 19 digest = h.digest() 20 respone = conn.recv(len(digest)) 21 return hmac.compare_digest(respone, digest) 22 23 24 def data_handler(conn, bufsize=1024): 25 if not conn_auth(conn): 26 print('該連接不合法,關閉') 27 conn.close() 28 return 29 print('連接合法,開始通訊') 30 while True: 31 data = conn.recv(bufsize) 32 if not data: break 33 conn.sendall(data.upper()) 34 35 36 def server_handler(ip_port, bufsize, backlog=5): 37 ''' 38 只處理連接 39 :param ip_port: 40 :return: 41 ''' 42 tcp_socket_server = socket(AF_INET, SOCK_STREAM) 43 tcp_socket_server.bind(ip_port) 44 tcp_socket_server.listen(backlog) 45 while True: 46 conn, addr = tcp_socket_server.accept() 47 print('新鏈接[%s:%s]' % (addr[0], addr[1])) 48 data_handler(conn, bufsize) 49 50 51 if __name__ == '__main__': 52 ip_port = ('127.0.0.1', 9999) 53 bufsize = 1024 54 server_handler(ip_port, bufsize)
實現以下功能:
一、用戶加密認證
二、每一個用戶都有本身的家目錄,且只能訪問本身的家目錄
三、容許用戶在ftp server上隨意切換目錄(cd)
四、容許用戶查看當前目錄下的全部文件(ls)
五、容許上傳和下載文件
六、文件傳輸過程當中顯示進度條
七、支持文件的斷點續傳
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import os,sys 4 PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 5 sys.path.append(PATH) 6 7 8 from core import main 9 10 11 if __name__ == '__main__': 12 main.ArgvHandler()
1 [DEFAULT] 2 3 [alex] 4 Password =123 5 Quotation = 100 6 7 [root] 8 Password = root 9 Quotation = 100
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import os 4 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 5 6 ip = '127.0.0.1' 7 port = 8080 8 9 ACCOUNT_PATH = os.path.join(BASE_DIR, 'conf', 'accounts.cfg')
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socketserver 4 import optparse 5 import socketserver 6 from conf import settings 7 from core import server 8 9 10 class ArgvHandler(): 11 def __init__(self): 12 self.op = optparse.OptionParser() 13 # self.op.add_option('-s', '--server', dest='server') 14 # self.op.add_option('-P', '--port', dest='port') 15 options, args = self.op.parse_args() 16 # print(options) # {'server': '127.0.0.1', 'port': '8080'} 17 # print(type(options)) # <class 'optparse.Values'> 18 # print(options.server) # 127.0.0.1 19 # print(args) 20 21 options, args = self.op.parse_args() 22 self.verify_args(options, args) 23 24 def verify_args(self, options, args): 25 cmd = args[0] 26 # 第一種方法 27 # if cmd == 'start': 28 # pass 29 # else: 30 # pass 31 # 第二種方法 32 if hasattr(self, cmd): 33 func = getattr(self, cmd) 34 func() 35 36 def start(self): 37 print('ths server is working...') 38 s = socketserver.ThreadingTCPServer((settings.ip, settings.port), server.ServerHandler) 39 s.serve_forever() 40 41 def help(self): 42 pass
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socketserver 4 import json 5 import configparser 6 from conf import settings 7 import os 8 9 STATUS_CODE = { 10 250: "Invalid cmd format,e.g:{'action':'get','filename':'test.py','size':344}", 11 251: "Invalid cmd", 12 252: "Invalid auth data", 13 253: "Wrong username or password", 14 254: "Passed authentication", 15 255: "Filename doesn't provided", 16 256: "File doesn't exist on server", 17 257: "ready to send file", 18 258: "md5 verification", 19 800: "the file exist,but not enough,is continue?", 20 801: "the file exist!", 21 802: "ready to receive datas", 22 900: "md5 valdate success" 23 } 24 25 26 class ServerHandler(socketserver.BaseRequestHandler): 27 def handle(self): 28 while True: 29 data = self.request.recv(1024).strip() # self.request=conn 30 data = json.loads(data.decode('utf8')) 31 if data.get('action'): 32 if hasattr(self, data.get('action')): 33 func = getattr(self, data.get('action')) 34 func(**data) 35 else: 36 print('Invalid cmd') 37 else: 38 print('Invalid cmd') 39 40 def send_response(self, status_code): 41 response = {'status_code': status_code} 42 self.request.sendall(json.dumps(response).encode('utf8')) 43 44 def auth(self, **data): 45 username = data['username'] 46 password = data['password'] 47 username = self.authenticate(username, password) 48 if username: 49 self.send_response(254) 50 else: 51 self.send_response(253) 52 53 def authenticate(self, username, password): 54 cfg = configparser.ConfigParser() 55 cfg.read(settings.ACCOUNT_PATH) 56 if username in cfg.sections(): 57 if cfg[username]['Password'] == password: 58 self.username = username 59 self.mainPath = os.path.join(settings.BASE_DIR, 'home', self.username) 60 print('passed authenticate') 61 return username 62 63 def put(self, **data): 64 print('data', data) 65 file_name = data.get('file_name') 66 file_size = data.get('file_size') 67 target_path = data.get('target_path') 68 abs_path = os.path.join(self.mainPath, target_path, file_name) 69 has_received = 0 70 if os.path.exists(abs_path): 71 file_has_size = os.stat(abs_path).st_size 72 if file_has_size < file_size: 73 # 斷點續傳 74 self.request.sendall('800'.encode('utf8')) 75 choice = self.request.recv(1024).decode('utf8') 76 if choice == 'Y': 77 self.request.sendall(str(file_has_size).encode('utf8')) 78 has_received = file_has_size 79 f = open(abs_path, 'ab') 80 else: 81 f = open(abs_path, 'wb') 82 else: 83 # 文件徹底存在 84 self.request.sendall('801'.encode('utf8')) 85 return 86 else: 87 self.request.sendall('802'.encode('utf8')) 88 f = open(abs_path, 'wb') 89 90 while has_received < file_size: 91 try: 92 data = self.request.recv(1024) 93 except Exception as EX: 94 break 95 f.write(data) 96 has_received += len(data) 97 f.close() 98 99 def ls(self, **data): 100 file_list = os.listdir(self.mainPath) 101 file_str = '\n'.join(file_list) 102 if not len(file_list): 103 file_str = '<empty dir>' 104 self.request.sendall(file_str.encode('utf8')) 105 106 def cd(self, **data): 107 dirname = data.get('dirname') 108 if dirname == '..': 109 self.mainPath = os.path.dirname(self.mainPath) 110 else: 111 self.mainPath = os.path.join(self.mainPath, dirname) 112 self.request.sendall(self.mainPath.encode('utf8')) 113 114 def mkdir(self, **data): 115 dirname = data.get('dirname') 116 path = os.path.join(self.mainPath, dirname) 117 if not os.path.exists(path): 118 if '/' in path: 119 os.makedirs(path) 120 else: 121 os.mkdir(path) 122 self.request.sendall('create success'.encode('utf8')) 123 else: 124 self.request.sendall('dirname exist'.encode('utf8'))
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # import socket 4 # 5 # sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 6 # sk.connect(('127.0.0.1', 8080)) 7 import optparse 8 import socket 9 import json 10 import os, sys 11 STATUS_CODE = { 12 250: "Invalid cmd format,e.g:{'action':'get','filename':'test.py','size':344}", 13 251: "Invalid cmd", 14 252: "Invalid auth data", 15 253: "Wrong username or password", 16 254: "Passed authentication", 17 255: "Filename doesn't provided", 18 256: "File doesn't exist on server", 19 257: "ready to send file", 20 258: "md5 verification", 21 800: "the file exist,but not enough,is continue?", 22 801: "the file exist!", 23 802: "ready to receive datas", 24 900: "md5 valdate success" 25 } 26 27 28 class ClientHandler(object): 29 def __init__(self): 30 self.op = optparse.OptionParser() 31 self.op.add_option('-s', '--server', dest='server') 32 self.op.add_option('-P', '--port', dest='port') 33 self.op.add_option('-u', '--username', dest='username') 34 self.op.add_option('-p', '--password', dest='password') 35 36 self.options, self.args = self.op.parse_args() 37 38 self.verify_args(self.options, self.args) 39 self.make_connection() 40 self.mainPath = os.path.dirname(os.path.abspath(__file__)) 41 self.last = 0 42 43 def verify_args(self, options, args): 44 server = options.server 45 port = options.port 46 username = options.username 47 password = options.password 48 if int(port) > 0 and int(port) < 65535: 49 return True 50 else: 51 exit('the port is in 0-65535') 52 53 def make_connection(self): 54 self.sock = socket.socket() 55 self.sock.connect((self.options.server, int(self.options.port))) 56 57 def interactive(self): 58 print('begin to interactive...') 59 # self.authenticate() 60 if self.authenticate(): 61 while 1: 62 cmd_info = input('[%s]' % self.current_dir).strip() 63 cmd_list = cmd_info.split() 64 if hasattr(self, cmd_list[0]): 65 func = getattr(self, cmd_list[0]) 66 func(*cmd_list) 67 68 def put(self, *cmd_list): 69 # put 12.png images 70 actions, local_path, target_path = cmd_list 71 local_path = os.path.join(self.mainPath, local_path) 72 file_name = os.path.basename(local_path) 73 file_size = os.stat(local_path).st_size 74 data = { 75 'action': 'put', 76 'file_name': file_name, 77 'file_size': file_size, 78 'target_path': target_path 79 } 80 self.sock.send(json.dumps(data).encode('utf8')) 81 is_exist = self.sock.recv(1024).decode('utf8') 82 has_send = 0 83 if is_exist == '800': 84 # 文件不完整 85 choice = input('the file exist,but not enough,is continue?[Y/N]').strip() 86 if choice.upper() == 'Y': 87 self.sock.sendall('Y'.encode('utf8')) 88 continue_position = self.sock.recv(1024).decode('utf8') 89 has_send += int(continue_position) 90 else: 91 self.sock.sendall('N'.encode('utf8')) 92 elif is_exist == '801': 93 # 文件徹底存在 94 print('the file exist') 95 return 96 else: 97 pass 98 f = open(local_path, 'rb') 99 f.seek(has_send) 100 while has_send < file_size: 101 data = f.read(1024) 102 self.sock.sendall(data) 103 has_send += len(data) 104 self.show_progress(has_send, file_size) 105 f.close() 106 print('successfully upload!') 107 108 def show_progress(self, has, total): 109 rate = float(has)/float(total) 110 rate_num = int(rate*100) 111 if self.last != rate_num: 112 sys.stdout.write('%s%% %s\r' % (rate_num, '#'*rate_num)) 113 self.last = rate_num 114 115 def ls(self, *cmd_list): 116 data = { 117 'action': 'ls' 118 } 119 self.sock.sendall(json.dumps(data).encode('utf8')) 120 data = self.sock.recv(1024).decode('utf8') 121 print(data) 122 123 def cd(self, *cmd_list): 124 data = { 125 'action': 'cd', 126 'dirname': cmd_list[1] 127 } 128 self.sock.sendall(json.dumps(data).encode('utf8')) 129 data = self.sock.recv(1024).decode('utf8') 130 self.current_dir = os.path.basename(data) 131 132 def mkdir(self, *cmd_list): 133 data = { 134 'action': 'mkdir', 135 'dirname': cmd_list[1] 136 } 137 self.sock.sendall(json.dumps(data).encode('utf8')) 138 data = self.sock.recv(1024).decode('utf8') 139 140 def authenticate(self): 141 if self.options.username is None or self.options.password is None: 142 username = input('username:') 143 password = input('password:') 144 return self.get_auth_result(username, password) 145 return self.get_auth_result(self.options.username, self.options.password) 146 147 def response(self): 148 data = self.sock.recv(1024).decode('utf8') 149 data = json.loads(data) 150 return data 151 152 def get_auth_result(self, username, password): 153 data = { 154 'action': "auth", 155 'username': username, 156 'password': password 157 } 158 self.sock.send(json.dumps(data).encode('utf8')) 159 response = self.response() 160 print('response:', response['status_code']) 161 if response['status_code'] == 254: 162 self.username = username 163 self.current_dir = username 164 print(STATUS_CODE[response['status_code']]) 165 return True 166 else: 167 print(STATUS_CODE[response['status_code']]) 168 169 170 ch = ClientHandler() 171 ch.interactive()
現代計算機系統是由一個或者多個處理器、內存、硬盤、打印機、鍵盤、鼠標和顯示器等組成的。網絡接口以及各類其餘輸入/輸出設備組成的複雜系統,每位程序員不可能掌握全部系統實現的細節,而且管理優化這些部件是一件具備挑戰性極強的工做。因此,咱們須要爲計算機安裝一層軟件,成爲操做系統,任務就是用戶程序性提供一個簡單清晰的計算機模型,並管理以上設備。
定義:操做系統是一個用來協調、管理和控制計算機硬件和軟件資源的系統程序,它位於硬件和應用程序之間。程序是運行在系統上的具備某種功能的軟件,好比:瀏覽器,音樂播放器等。
操做系統內部的定義:操做系統的內核是一個管理和控制程序,負責管理計算機的全部物理資源,其中包括:文件系統、內存管理、設備管理、進程管理。
假若有兩個程序A和B,程序A在執行到一半的過程當中,須要讀取大量的數據輸入(I/O操做),而此時CPU只能靜靜地等待任務A讀取完數據才能繼續執行,這樣就白白浪費了CPU資源。是否是在程序A讀取數據的過程當中,讓程序B去執行,當程序A讀取完數據以後,讓程序B暫停,而後讓程序A繼續執行?固然沒問題,但這裏有一個關鍵詞:切換;既然是切換,那麼這就涉及到了狀態的保存,狀態的恢復,加上程序A與程序B所須要的系統資源(內存,硬盤,鍵盤等等)是不同的。天然而然的就須要有一個東西去記錄程序A和程序B分別須要什麼資源,怎樣去識別程序A和程序B等等,因此就有了一個叫進程的抽象。
進程就是一個程序在一個數據集上的一次動態執行過程。進程通常由程序、數據庫、進程控制塊三部分組成。咱們編寫的程序用來描述進程要完成哪些功能以及如何完成。數據集則是程序在執行過程當中所須要使用的資源。進程控制塊用來記錄的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系統感知進程存在的惟一標誌。
本質上就是一段程序的運行過程(抽象的概念)
線程的出現是爲了下降上下文切換的消耗,提升系統的併發性,並突破一個進程只能幹同樣事的缺陷,讓進程內併發成爲可能。
一、一個程序至少有一個進程,一個進程至少有一個線程(進程能夠理解成線程的容器)
二、進程在執行過程當中擁有獨立的內存單元,而多個線程共享內存,從而極大地提升了程序的運行效率
三、線程在執行過程當中與進程仍是有區別的,每一個獨立的線程有一個程序運行的入口,順序執行序列和程序的出口。可是線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。
四、進程是具備必定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位;線程是進程的一個實體,是CPU調度和分源的基本單位,它是比進程更小的能獨立運行的基本單位,線程本身基本上不擁有系統資源,只擁有一點運行中必不可少的資源(如程序計數器,一組寄存器和錢),可是它可與同屬一個進程的其餘的線程共享進程所擁有的所有資源。一個進程能夠建立和撤銷另外一個線程;同一個進程中的多個線程之間能夠併發執行。
五、線程:最小的執行單元(實例);進程:最小的資源單位
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
上面的核心意思:不管你啓多少個線程,你有多少個CPU,Python在執行的時候會淡定的在同一時刻只容許一個線程運行。
threading 模塊創建在thread 模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊經過對thread進行二次封裝,提供了更方便的api來處理線程。
方式1、
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 7 def music(): 8 print('begin to listen % s' % time.ctime()) 9 time.sleep(3) 10 print('stop to listen %s' % time.ctime()) 11 12 13 def game(): 14 print('begin to play game % s \r\n' % time.ctime()) 15 time.sleep(5) 16 print('stop to play game %s \r\n' % time.ctime()) 17 18 19 threads = [] 20 t1 = threading.Thread(target=music) 21 t2 = threading.Thread(target=game) 22 threads.append(t1) 23 threads.append(t2) 24 if __name__ == '__main__': 25 # join()功能:在子線程完成運行以前,這個子線程的父線程講一直被阻塞 26 # t1 = threading.Thread(target=music) 27 # t2 = threading.Thread(target=game) 28 # t1.start() 29 # t2.start() 30 # 31 # t1.join() 32 # t2.join() 33 # print('end') 34 35 # setDaemon():將線程生命爲守護線程 36 t2.setDaemon(True) # 注:必定要在start以前設置 37 for t in threads: 38 t.start() 39 print('end')
方式2、
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 7 class MyThread(threading.Thread): 8 def __init__(self, num): 9 threading.Thread.__init__(self) 10 self.num = num 11 12 def run(self): # 定義每一個線程要運行的函數 13 14 print("running on number:%s" % self.num) 15 16 time.sleep(3) 17 18 19 if __name__ == '__main__': 20 t1 = MyThread(1) 21 t2 = MyThread(2) 22 t1.start() 23 t2.start() 24 25 print("ending......")
join():在子線程完成運行以前,這個子線程的父線程將一直被阻塞。
setDaemon(True):
將線程生命爲守護線程,必須在start()方法調用以前設置,若是不設置爲守護線程,程序會被無限掛起。這個方法基本和join是相反的。當咱們在程序運行中,執行一個主線程,若是主線程又建立一個子線程,主線程和子線程就分兵兩路,分別運行,那麼當主線程完成想退出時,會驗證子線程是否完成。若是子線程未完成,則主線程會等待子線程完成後再退出。可是有時候咱們須要的是,只要主線程完成了,無論子線程是否完成,都要和主線程一塊兒退出,這時就能夠用setDaemon方法了。
1 # run():用於表示線程活動的方法 2 # start():啓動線程活動 3 # isAlive():返回線程是否活動的,返回布爾值,True/False 4 # getName():返回線程名字 5 # setName():設置線程名字 6 7 threading模塊提供的一些方法: 8 # threading.currentThread():返回當前的線程變量 9 # threading.enumerate():返回一個包含正在運行的線程的list。正在運行指線程啓動後-結束前,不包括啓動前和終止後的線程 10 # threading.activeCount():返回正在運行的線程數量,與len(threading.enumerate())有相同的結果
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 7 def music(): 8 print('begin to listen % s' % time.ctime()) 9 time.sleep(3) 10 print('stop to listen %s' % time.ctime()) 11 12 13 def game(): 14 print('begin to play game % s \r\n' % time.ctime()) 15 time.sleep(5) 16 print('stop to play game %s \r\n' % time.ctime()) 17 18 19 threads = [] 20 t1 = threading.Thread(target=music) 21 t2 = threading.Thread(target=game) 22 threads.append(t1) 23 threads.append(t2) 24 if __name__ == '__main__': 25 # join()功能:在子線程完成運行以前,這個子線程的父線程講一直被阻塞 26 # t1 = threading.Thread(target=music) 27 # t2 = threading.Thread(target=game) 28 # t1.start() 29 # t2.start() 30 # 31 # t1.join() 32 # t2.join() 33 # print('end') 34 35 # setDaemon():將線程生命爲守護線程 36 t2.setDaemon(True) # 注:必定要在start以前設置 37 for t in threads: 38 t.start() 39 print(t.getName()) 40 print('count:', threading.activeCount()) 41 while threading.activeCount() == 3: 42 print('end')
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 num = 100 6 7 8 def sub(): 9 global num 10 print('ok') 11 lock.acquire() # 加鎖 12 temp = num 13 time.sleep(0.001) 14 num = temp-1 15 lock.release() # 釋放鎖 16 17 18 li = [] 19 lock = threading.Lock() 20 for i in range(100): 21 t1 = threading.Thread(target=sub) 22 t1.start() 23 li.append(t1) 24 for l in li: 25 l.join() 26 print(num)
注:多個線程都在同時操做同一個共享資源,因此形成了資源破壞(join會形成串行,失去線程的意義),能夠經過同步鎖來解決這種問題。
在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,就會形成死鎖,由於系統判斷這部分資源都在使用,全部這兩個線程在無外力做用下將一直等待下去。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 7 class MyThread(threading.Thread): 8 9 def actionA(self): 10 r_lock.acquire() 11 print(self.name, 'gotA', time.ctime()) # 重寫線程後的self.name --->線程的名字 12 time.sleep(2) 13 r_lock.acquire() 14 print(self.name, 'gotB', time.ctime()) 15 time.sleep(1) 16 r_lock.release() 17 r_lock.release() 18 19 def actionB(self): 20 r_lock.acquire() 21 print(self.name, 'gotB', time.ctime()) # 重寫線程後的self.name --->線程的名字 22 time.sleep(2) 23 r_lock.acquire() 24 print(self.name, 'gotA', time.ctime()) 25 time.sleep(1) 26 r_lock.release() 27 r_lock.release() 28 29 def run(self): 30 self.actionA() 31 self.actionB() 32 33 34 if __name__ == '__main__': 35 r_lock = threading.RLock() 36 li = [] 37 for t in range(3): 38 t = MyThread() 39 t.start() 40 li.append(t) 41 42 for i in li: 43 i.join() 44 45 print('end')
爲了支持在同一線程中屢次請求同一資源,Python提供了「可重入鎖」:threading.Rlock。Rlock內部維護着一個Lock和counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。
An event is a simple synchronization object;the event represents an internal flag,
and threads can wait for the flag to be set, or set or clear the flag themselves.
event = threading.Event()
# a client thread can wait for the flag to be set
event.wait()
# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 7 class Boss(threading.Thread): 8 def run(self): 9 print('Boss:今天加班到22:00!\r\n') 10 print(event.isSet()) # False 11 event.set() 12 time.sleep(6) 13 print('Boss:能夠下班了,明天放假!\r\n') 14 print(event.isSet()) 15 event.set() 16 17 18 class Worker(threading.Thread): 19 def run(self): 20 event.wait() # 一旦event被設定,等同於pass 21 print('Worker:唉···命真苦!\r\n') 22 time.sleep(1) 23 event.clear() 24 event.wait() 25 print('Worker:OhYeah!\r\n') 26 27 28 if __name__ == '__main__': 29 event = threading.Event() 30 threads = [] 31 for i in range(5): 32 threads.append(Worker()) 33 threads.append(Boss()) 34 for t in threads: 35 t.start() 36 for t in threads: 37 t.join() 38 print('end')
信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數器,每當調用acquire()時-1,調用release()時+1。計數器不能小於0,當計數器爲0時,acquire()將阻塞線程至同步鎖狀態,直到其餘線程調用release()。(相似於停車位的概念)BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 7 class MyThread(threading.Thread): 8 def run(self): 9 if semaphore.acquire(): 10 print(self.name, '\r') 11 time.sleep(5) 12 semaphore.release() 13 14 15 if __name__ == '__main__': 16 semaphore = threading.Semaphore(5) 17 threads = [] 18 for i in range(100): 19 threads.append(MyThread()) 20 for t in threads: 21 t.start()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import time 5 6 li = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 7 8 9 def Foo(): 10 while li: 11 try: 12 last_li = li[-1] 13 print(last_li) 14 li.remove(last_li) 15 time.sleep(1) 16 except Exception as EX: 17 print('錯誤提示:', last_li, EX) 18 19 20 t1 = threading.Thread(target=Foo) 21 t1.start() 22 t2 = threading.Thread(target=Foo) 23 t2.start()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import queue # 線程隊列 4 q = queue.Queue() # 建立q對象,同步實現的。隊列長度可爲無限或者有限。可經過Queque的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限 5 q.put(12) # 將一個值放到隊列,調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲1.若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空處一個數據單元。若是block爲0,put方法將引起Full異常 6 q.put('alex') 7 q.put({"age": 15}) 8 print(q.get()) # 將一個值從隊列中取出,調用隊列對象的get()方法,從對頭刪除並返回一個實例。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常 9 print(q.qsize()) # 返回隊列的大小 10 print(q.empty()) # 判斷隊列是否爲空,返回布爾值,True/False 11 print(q.full()) # 判斷隊列是否已經滿了,返回布爾值,True/False 12 q.join() # 實際上意味着等到隊列爲空,再執行別的操做 13 14 ''' 15 Queue模塊的三種隊列及構造函數 16 一、Python Queue模塊的FIFO隊列,先進先出 class queue.Queue(maxsize) 17 二、LIFO 相似於堆,即先進後處。 class queue.LifoQueue(maxsize) 18 三、優先級隊列,級別越低月先出來。 class queue.PriorityQueue(maxsize) 19 '''
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
生產者和消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import time 4 import random 5 import queue 6 import threading 7 q = queue.Queue() 8 9 10 def Producer(name): 11 count = 0 12 while count < 10: 13 print('making..') 14 time.sleep(random.randrange(3)) 15 q.put(count) 16 print('Producer [%s] has produced %s meat bun。\r' % (name, count)) 17 count += 1 18 print('ok\r') 19 20 21 def Consumer(name): 22 count = 0 23 while count < 10: 24 time.sleep(random.randrange(4)) 25 if not q.empty(): 26 data = q.get() 27 print('lm Consumer [%s] has eat %s meat bun。\r' % (name, data)) 28 else: 29 print('---no meat bun anymore----\r') 30 count += 1 31 32 33 p1 = threading.Thread(target=Producer, args=('alex',)) 34 c1 = threading.Thread(target=Consumer, args=('B',)) 35 p1.start() 36 c1.start()
併發:指系統具備處理多個任務(動做)的能力
並行:指系統具備同時處理多個任務(動做)的能力
同步:當進程執行到一個IO(等待外部數據)的時候你等
異步:當進程執行到一個IO(等待外部數據)的時候你不等;一直等到數據接收完成,在回來處理
IO密集型:Python的多線程是有意義的
計算密集型:Python的多線程就不推薦,能夠採用多進程+協程
M
is a package that supports spawning processes using an API similar to the threading module. The ultiprocessing
package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing
module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.multiprocessing
因爲GIL的存在,Python中的多線程其實並非真正的多線程,若是想充分地使用多核CPU的資源,在Python中大部分狀況下須要使用多進程。multiprocessing包是Python中的多進程管理包。與threading.Thread相似,它能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法,也有start(),run(),join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類(這些對象能夠像多線程那樣,經過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。因此,multiprocessing的很大一部分與threading使用同一套API,只不過換到了多進程的情景。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process 4 import time 5 6 7 def Foo(name): 8 time.sleep(1) 9 print('hello', name, time.ctime()) 10 11 12 if __name__ == '__main__': 13 p_list = [] 14 for i in range(200): 15 p = Process(target=Foo, args=('alex',)) 16 p_list.append(p) 17 p.start() 18 for i in p_list: 19 p.join() 20 print('end')
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process 4 import time 5 6 7 class MyProcess(Process): 8 def __init__(self): 9 super(MyProcess, self).__init__() 10 11 def run(self): 12 time.sleep(1) 13 print('hello', self.name, time.ctime()) 14 15 16 if __name__ == '__main__': 17 p_list = [] 18 for i in range(3): 19 p = MyProcess() 20 p.start() 21 p_list.append(p) 22 for p in p_list: 23 p.join() 24 print('end')
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group:線程組,目前尚未實現,庫引用中提示必須是None
target:要執行的方法
name:進程名
args/kwargs:要傳入方法的參數
實例方法:
is_alive():返回進程是否在運行
join([timeout]):阻塞當前上下文環境的進程,直到調用此方法的進程終止或到達指定的timeout(可選參數)
start():進程準備就緒,等待CPU調度
run():start()調用run方法,若是實例進程時未指定傳入target,這start執行t默認run()方法
terminate():無論任務是否完成,當即中止工做進程
屬性:
daemon:和線程的setDeamon功能同樣
name:進程名字
pid:進程號
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import queue 4 import multiprocessing 5 6 7 def Foo(q): 8 q.put(123) 9 q.put(456) 10 11 12 if __name__ == '__main__': 13 q = multiprocessing.Queue() # 注意:此處需用進程隊列,不能用線程隊列,即q=queue.Queue() 14 p = multiprocessing.Process(target=Foo, args=(q,)) 15 p.start() 16 print(q.get()) 17 print(q.get())
管道
The Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process, Pipe 4 5 6 def Foo(child_conn): 7 child_conn.send([12, {'name': 'alice'}, 'hello']) 8 response = child_conn.recv() 9 print('response:', response) 10 child_conn.close() 11 print('q_id2:', id(child_conn)) 12 13 14 if __name__ == '__main__': 15 parent_conn, child_conn = Pipe() 16 print('q_ID1', id(child_conn)) 17 p = Process(target=Foo, args=(child_conn,)) 18 p.start() 19 print(parent_conn.recv()) 20 parent_conn.send('早上好!') 21 p.join()
Managers
Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另外一個進程的數據。
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
. For example:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process, Manager 4 5 6 def Foo(dic, li, i): 7 dic[i] = '1' 8 li.append(i) 9 10 11 if __name__ == '__main__': 12 with Manager() as manager: 13 dic = manager.dict() 14 li = manager.list(range(5)) 15 p_list = [] 16 for i in range(10): 17 p = Process(target=Foo, args=(dic, li, i)) 18 p.start() 19 p_list.append(p) 20 for p in p_list: 21 p.join() 22 print(dic) 23 print(li)
進程同步
Without using the lock output from the different processes is liable to get all mixed up.
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process, Lock 4 5 6 def Foo(lk, i): 7 with lk: # 默認狀況下,已經lk.acquire() 8 print('hello world %s' % i) 9 10 11 if __name__ == '__main__': 12 lock = Lock() 13 for num in range(10): 14 Process(target=Foo, args=(lock, num)).start()
進程池
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進程,那麼程序就會等待,直到進程池中有可進程爲止。
進程池中兩個方法:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from multiprocessing import Process, Pool 4 import time, os 5 6 7 def Foo(i): 8 time.sleep(1) 9 print('i = \r', i) 10 11 12 def Bar(arg): # 此處arg=Foo()函數的返回值 13 print('pgid-->%s\r' % os.getpid()) 14 print('ppid-->%s\r' % os.getppid()) 15 print('logger:%s\r' % arg) 16 17 18 if __name__ == '__main__': 19 pool = Pool(5) 20 Bar(1) 21 print('------------\r') 22 for i in range(10): 23 # pool.apply(func=Foo, args=(i,)) # 同步接口 24 # pool.apply_async(func=Foo, args=(i,)) 25 pool.apply_async(func=Foo, args=(i,), callback=Bar) # callback-->回調函數:就是某個動做或者函數執行成功後再去執行的函數
26 pool.close() 27 pool.join() # join和close位置不能反 28 print('end\r')
協程:又稱微線程,英文名:Coroutine,本質上是一個線程
優勢1:協程具備極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制。所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。
優勢2:不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。
由於協程是一個線程執行,那怎麼利用多核CPU呢?最簡單的方法就是多進程+協程,即充分利用多核,又充分發揮協程的高效率,可得到極高的性能。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import time 4 5 6 def consumer(name): 7 print('---->ready to eat meat bun') 8 while True: 9 new_meat_bun = yield 10 print('[%s] is eating meat bun %s' % (name, new_meat_bun)) 11 12 13 def producer(): 14 con1.__next__() 15 con2.__next__() 16 n = 0 17 while 1: 18 time.sleep(1) 19 print('\033[32;1m[producer]\033[0m is making meat bun %s and %s' % (n, n + 1)) 20 con1.send(n) 21 con2.send(n + 1) 22 n += 2 23 24 25 if __name__ == '__main__': 26 con1 = consumer('alex') 27 con2 = consumer('alice') 28 producer()
Greenlet
greenlet是一個用C實現的協程模塊,相比與Python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator。(注:須要用pip安裝包;pip install gevent)
1 from greenlet import greenlet 2 import time 3 4 5 def A(): 6 while 1: 7 print('-------A-------') 8 time.sleep(0.5) 9 g2.switch() 10 11 12 def B(): 13 while 1: 14 print('-------B-------') 15 time.sleep(0.5) 16 g1.switch() 17 18 19 g1 = greenlet(A) # 建立協程g1 20 g2 = greenlet(B) 21 22 g1.switch() # 跳轉至協程g1
gevent
1 import gevent 2 import requests, time # 此處requests需安裝模塊;pip --timeout=100 install requests 3 4 start = time.time() 5 6 7 def f(url): 8 print('GET: %s' % url) 9 resp = requests.get(url) # 爬網頁的標籤 10 data = resp.text 11 print('%d bytes received from %s.' % (len(data), url)) 12 13 14 gevent.joinall([ 15 gevent.spawn(f, 'https://www.qq.com/'), 16 gevent.spawn(f, 'https://www.baidu.com/'), 17 gevent.spawn(f, 'https://www.taobao.com/'), 18 ]) 19 20 print("cost time:", time.time() - start)
緩存I/O又被稱做標準I/O,大多數文件系統的默認I/O操做都是緩存I/O。在Linux的緩存I/O機制中,操做系統會將I/O的數據緩存在文件系統的頁緩存(page cache)中,也就是說,數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內的緩衝區拷貝到應用程序的地址空間。用戶空間無法直接訪問內核空間的,內核態到用戶態的數據拷貝。
緩存I/O的缺點:數據在傳輸過程當中須要在應用程序地址空間和內核進行屢次數據拷貝操做,這些數據拷貝操做所帶來的CPU以及內存開銷是很是大的。
對於一個network IO(以read舉例),他會涉及到兩個系統對象,一個是調用這個IO的process(or thread),另外一個就是系統內核(kernel)。當一個read操做發生時,它會經歷兩個階段:
一、等待數據準備(Waiting for the data to be read)
二、將數據從內核拷貝到進程中(Copying the data from the kernel to the process)
注:這兩點很重要,由於這些IO Mode的區別就是在這兩個階段上各有不一樣的狀況。
blocking IO(阻塞IO,Linux下)
在Linux中,默認狀況下全部的socket都是blocking,一個典型的讀操做大概流程圖:
當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一階段:準備數據。對於network IO來講,不少時候數據在一開始還沒到達(如:還沒收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就將數據從kernel中拷貝到用戶內存,而後kernel。因此,blocking IO的特色就是在IO執行的兩個階段都被block了。
non-blocking IO(非阻塞IO,Linux下)
在Linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行時大概的流程:
從上圖能夠看出,當用戶進程發出read時,若是kernel中的數據還沒準備好,那麼它並不會block用戶進程,而是當即返回一個error。從用戶進程角度講來說,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了。因此,用戶進程實際上是須要不斷的主動詢問kernel數據好了沒有。
1 import time 2 import socket 3 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4 sk.bind(('127.0.0.1',8080)) 5 sk.listen(5) 6 sk.setblocking(False) # 設置是否阻塞,默認爲True,非阻塞 7 while True: 8 try: 9 print('waiting client connection...') 10 conn,addr = sk.accept() # 進程主動輪詢 11 print('+++',addr) 12 data = sk.recv(1024) 13 print(str(data,'utf8')) 14 conn.close() 15 except Exception as EX: 16 print('錯誤提示:',EX) 17 time.sleep(2)
1 import time 2 import socket 3 sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 4 while True: 5 sk.connect(('127.0.0.1',8080)) 6 print('hello') 7 sk.sendall(bytes('hello','utf8')) 8 time.sleep(1) 9 break
IO multiplexing(IO多路複用)
有些地方也稱爲這種IO方式爲event driven IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程,大概流程圖:
當用戶進程調用了select,那麼真個進程會被block。而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這時用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。(若是處理的鏈接數不是不少的話,使用select/epoll的web server不必定比使用multi-threading+blocking IO的web server性能更好,可能延遲更大;select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接)
注:
一、select函數返回結果中若是有文件可讀了,那麼進程就能夠同故宮調用accept()或recv()來讓kernel將位於內核中準備到的數據copy到用戶區。
二、select的優點在於能夠處理多個鏈接,不適用於單個鏈接。
1 import socket 2 import select 3 4 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 5 sk.bind(('127.0.0.1', 8080)) 6 sk.listen(5) 7 inputs = [sk, ] 8 while True: 9 r, w, e = select.select(inputs, [], [], 3) 10 for obj in r: 11 print('obj:', obj) 12 if obj == sk: 13 conn, addr = obj.accept() 14 print('已鏈接:', conn) 15 inputs.append(conn) 16 else: 17 data_byte = obj.recv(1024) 18 print(str(data_byte, 'utf8')) 19 inp = input('回答[%s]號客戶端>>:' % inputs.index(obj)) 20 obj.sendall(bytes(inp, 'utf8')) 21 print('>>>', r)
1 import socket 2 3 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4 sk.connect(('127.0.0.1', 8080)) 5 6 while True: 7 inp = input(">>>>") 8 sk.sendall(bytes(inp, "utf8")) 9 data = sk.recv(1024) 10 print(str(data, 'utf8'))
Asynchronous I/O(異步IO)
流程圖:
從圖中能夠看出,用戶進程發起read操做以後,馬上就開始去作其它的事。另外一方面,從kernel的角度,當他受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。
selectors模塊:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import selectors 4 import socket 5 6 sel = selectors.DefaultSelector() # 根據系統,默認選擇最優IO多用戶模式 7 8 9 def accept(sock, mask): 10 conn, addr = sock.accept() 11 print('accepted', conn, 'from', addr) 12 conn.setblocking(False) 13 sel.register(conn, selectors.EVENT_READ, read) 14 15 16 def read(conn, mask): 17 try: 18 data = conn.recv(1000) 19 if not data: 20 raise Exception 21 print('收到:', data.decode('utf8')) 22 conn.send(data.upper()) # Hope it won't block 23 except Exception as EX: 24 print('closing:', conn) 25 sel.unregister(conn) # 解除綁定 26 conn.close() 27 28 29 sock = socket.socket() 30 sock.bind(('localhost', 8080)) 31 sock.listen(100) 32 sock.setblocking(False) 33 sel.register(sock, selectors.EVENT_READ, accept) # sock與accept綁定 34 while True: 35 events = sel.select() # 監聽[sock,conn1,conn2....] 36 for key, mask in events: 37 callback = key.data 38 print('>>callback:', callback) 39 callback(key.fileobj, mask)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socket 4 sk = socket.socket() 5 sk.connect(('127.0.0.1', 8080)) 6 while True: 7 inp = input('>>>') 8 sk.send(inp.encode('utf8')) 9 data = sk.recv(1024) 10 print(data.decode('utf8'))