1、多任務編程html
1. 意義: 充分利用計算機多核資源,提升程序的運行效率。python
2. 實現方案 :多進程 , 多線程linux
""" fork函數演示 若是不是並行/併發執行,則該代碼執行時間在11秒,不然6秒 """ import os from time import sleep pid = os.fork() if pid < 0: print("Create process failed") elif pid == 0: sleep(5) print("Create new process") else: sleep(6) print("The old process") print("Fork test over")
""" 父子進程獨立空間運行驗證 """ import os from time import sleep print("==================") #只會在父進程裏打印一條,不會再子進程打印 a = 1 #變量賦值是開闢新的空間的,子進程時深拷貝父進程內存空間的,所以在子進程裏能夠對a操做 pid = os.fork() if pid < 0: print("Error") elif pid == 0: print('Child process') print("child a = %d"%a) a = 10000 else: sleep(1) print("Parent process") print("parent a = %d"%a) print("global a = %d"%a)
""" 獲取父子進程的pid號 """ import os pid = os.fork() if pid < 0: print("Error") elif pid == 0: print("child PID:",os.getpid()) print("parent PID:",os.getppid()) else: print("parent PID:",os.getpid()) print("child PID:",pid) parent PID: 10693 child PID: 10694 child PID: 10694 parent PID: 10693
""" 兩種進程退出方式 """ import os import sys # os._exit(1) #程序運行至此退出,後面的打印不會執行 # sys.exit() #程序運行至此退出,後面的打印不會執行 sys.exit("進程退出") #程序運行至此退出,後面的打印不會執行 print("Process exit")
""" 父子進程退出方式 """ import os import sys pid = os.fork() if pid < 0: print("Error") elif pid == 0: sys.exit("退出進程") print("Child process") else: sys.exit("退出進程") print("parent process") print("all process")
""" 孤兒進程 """ import os from time import sleep pid = os.fork() if pid < 0: print("Error") elif pid == 0: sleep(2) print("child PID:",os.getpid()) print("get parent PID:",os.getppid()) else: print("parent PID:",os.getpid()) print("child PID:",pid) 在終端能夠驗證,在pycharm不行,結果爲: parent PID: 18506 ----生父 child PID: 18507 child PID: 18507 get parent PID: 2625 ----養父(操做系統建立的進程)
""" 殭屍進程驗證----模擬服務器(通常很長時間不退出) """ import os pid = os.fork() if pid < 0: print("Error") elif pid == 0: print("child process",os.getpid()) else: "不讓父進程退出" while True: pass 此時在終端執行:ps -aux能夠看見子進程變成了殭屍(Z)
""" 殭屍處理方法------os.wait() """ import os pid = os.fork() if pid < 0: print("Error") elif pid == 0: print("child process",os.getpid()) os._exit(2) else: pid,status = os.wait() #等待處理殭屍 print("pid",pid) print("status",status) while True: pass 輸出結果: child process 26843 pid 26843 status 512 ------2*256 此時在終端用ps -aux查詢子進程狀態可見其不是(Z)狀態,即不是殭屍
""" 殭屍處理方法------os.waitpid(pid,option) """ import os pid = os.fork() if pid < 0: print("Error") elif pid == 0: print("child process",os.getpid()) os._exit(2) else: pid,status = os.waitpid(-1,os.WNOHANG) #此時是非阻塞,當打印的pid和status均爲0說明子進程尚未結束,不然能夠回收子進程,徹底看運氣,若是採用隔一段時間循環一次,則相似垃圾回收機制 print("pid",pid) print("status",status) while True: pass
""" 建立二級子進程防止殭屍進程----利用孤兒進程,與父進程一塊兒完成事物,相互獨立,同時運行 """ import os from time import * def f1(): for i in range(4): sleep(2) print("寫代碼.....") def f2(): for i in range(5): sleep(1) print("測試代碼.....") pid = os.fork() if pid < 0: print("Error") elif pid == 0: p = os.fork() #二級子進程 if p == 0: f2() #二級子進程執行 else: os._exit(0) else: os.wait() f1()
""" 信號方法處理殭屍 """ import os import signal #處理子進程退出,即讓父進程忽略全部子進程退出行爲,由操做系統處理殭屍 signal.signal(signal.SIGCHLD,signal.SIG_IGN) pid = os.fork() if pid < 0: print("Error") elif pid == 0: print("child PID:",os.getpid()) else: while True: pass
""" 客戶端 """ from socket import * import os,sys #服務器地址 ADDR = ("176.61.14.181",8888) #發送消息 def send_msg(s,name): while True: try: text = input("發言:") except KeyboardInterrupt: text = "quit" #退出聊天室 if text == 'quit': msg = "Q " + name s.sendto(msg.encode(),ADDR) sys.exit("退出聊天室") msg = "C %s %s" % (name,text) s.sendto(msg.encode(),ADDR) #接收消息 def recv_msg(s): while True: data,addr = s.recvfrom((2048)) #服務端發送EXIT表示讓客戶端退出 if data.decode() == "EXIT": sys.exit("退出聊天室") print(data.decode() + "\n發言:",end="") #建立網絡鏈接 def main(): s = socket(AF_INET,SOCK_DGRAM) while True: name = input("Name:") msg = "L " + name s.sendto(msg.encode(),ADDR) #等待服務端迴應 data,addr = s.recvfrom(1024) if data.decode() == "OK": print("您已進入聊天室") break else: print(data.decode()) #建立新的進程,子進程複製發消息,父進程複製接收消息 pid = os.fork() if pid < 0: sys.exit("Error!") elif pid ==0: send_msg(s,name) else: recv_msg(s) if __name__ == "__main__": main()
""" 服務端 基礎知識:socket fork """ from socket import * import os,sys #服務器地址 ADDR = ("0.0.0.0",8888) #存戶用戶信息 user = {} #進入聊天室 def do_login(s,name,addr): if name in user or "管理員" in name: s.sendto("該用戶已存在".encode(),addr) return s.sendto(b"OK",addr) #通知其餘人 msg = "\n歡迎%s進入聊天室"%name for i in user: s.sendto(msg.encode(),user[i]) #將用戶加入 user[name] = addr #聊天 def do_chat(s,name,text): msg = "\n%s:%s"%(name,text) for i in user: if i != name: s.sendto(msg.encode(),user[i]) #退出聊天室 def do_quit(s,name): msg = "\n%s退出了聊天室"%name for i in user: if i !=name: s.sendto(msg,user[i]) else: s.sendto(b"EXIT",user[i]) #將用戶刪除 del user[name] #處理客戶端請求 def do_request(s): while True: data,addr = s.recvfrom(1024) msg = data.decode().split(" ") #區分請求類型 if msg[0] == "L": do_login(s,msg[1],addr) elif msg[0] == "C": text = ' '.join(msg[2:]) do_chat(s,msg[1],text) elif msg[0] == 'Q': if msg[1] not in user: s.sendto(b"EXIT",addr) continue do_quit(s,msg[1]) #建立網絡鏈接 def main(): #建立套接字 s = socket(AF_INET,SOCK_DGRAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(ADDR) pid = os.fork() if pid < 0: return elif pid == 0: #發送管理員消息 while True: msg = input("管理員消息:") msg = "C 管理員消息 " + msg s.sendto(msg,ADDR) else: #對接收的請求處理 do_request(s) #處理客戶端請求 if __name__ == "__main__": main()
""" multiprocessing建立多進程 """ import multiprocessing from time import sleep import os #子進程函數 def fun(): print("子進程後開始執行了") sleep(3) print("子進程執行完畢") #建立進程函數 p = multiprocessing.Process(target=fun) #啓動進程,此時函數fun做爲進程的獨立部分運行 p.start() #想體現父子進程同時執行,父進程必須寫在start和join之間,若是寫在start前父進程先執行,若寫在join後,則子進程退出後才執行 sleep(2) print("父進程乾點事") #回收進程,以防產生殭屍進程 p.join() # 上述代碼用fork實現以下 pid = os.fork() if pid == 0: fun() os._exit(0) else: sleep(2) print("父進程乾點事") os.wait()
""" multiprocessing建立多進程-------父子進程獨立運行,互不干擾 """ import multiprocessing from time import sleep import os a = 1 #子進程函數 def fun(): print("子進程後開始執行了") global a print("a = ",a) a = 10000 sleep(3) print("子進程執行完畢") #建立進程函數 p = multiprocessing.Process(target=fun) #啓動進程,此時函數fun做爲進程的獨立部分運行 p.start() #想體現父子進程同時執行,父進程必須寫在start和join之間,若是寫在start前父進程先執行,若寫在join後,則子進程退出後才執行 sleep(2) print("父進程乾點事") #回收進程,以防產生殭屍進程 p.join() print("parent a = ",a) 運行結果: 子進程後開始執行了 a = 1 父進程乾點事 子進程執行完畢 parent a = 1
""" 使用multiprocessing建立多個子進程 """ import multiprocessing from time import sleep import os def th1(): sleep(3) print("吃飯") print(os.getppid(),"====",os.getpid()) def th2(): sleep(2) print("睡覺") print(os.getppid(),"====",os.getpid()) def th3(): sleep(4) print("打豆豆") print(os.getppid(),"====",os.getpid()) things = [th1,th2,th3] jobs = [] for th in things: p = multiprocessing.Process(target=th) jobs.append(p) #用列表保存進程對象 p.start() for i in jobs: i.join() 運行結果: 睡覺 124561 ==== 124567 吃飯 124561 ==== 124566 打豆豆 124561 ==== 124568
運行結果: I'm Jame I'm working.... I'm Jame I'm working.... I'm Jame I'm working....
""" Process進程對象屬性 """ from multiprocessing import Process from time import sleep,ctime def tm(): for i in range(3): sleep(2) print(ctime()) p = Process(target=tm,name="haha") #子進程隨父進程一塊兒退出 p.daemon = True #daemon與join選擇其一,歷來防止殭屍進程,也能夠用signal.signal(signal.SIGCHLD,signal.SIG_IGN)處理殭屍 p.start() print("Name:",p.name) print("PID;",p.pid) print("Is_alive:",p.is_alive()) 輸出結果: Name: haha PID; 9537 Is_alive: True
""" 進程池原理示例 """ from multiprocessing import Pool from time import sleep,ctime #進程池事件 def worker(msg): sleep(2) print(msg) #建立進程池 pool = Pool() #向進程池添加事件 for i in range(20): msg = "Hello %d"%i pool.apply_async(func=worker,args=(msg,)) #關閉進程池----當運行close時就不能往進程池例添加事件了 pool.close() #回收進程池----將進程池裏的時間處理完畢,進程池就會被回收 pool.join() 輸出結果(因選擇系統默認建立的進程個數,所以運行時是兩個兩個併發執行輸出,其餘的在隊列等待) Hello 0 Hello 1 Hello 2 Hello 3 Hello 4 Hello 5 Hello 6 Hello 7 Hello 8 Hello 9 Hello 10 Hello 11 Hello 12 Hello 13 Hello 14 Hello 15 Hello 16 Hello 17 Hello 18 Hello 19
""" 管道通訊---雙向管道 """ from multiprocessing import Pipe,Process import os,time #建立管道: fd1,fd2 = Pipe() def fun(name): time.sleep(3) #向管道寫入內容 fd1.send({name:os.getpid()}) jobs = [] #子進程寫管道 for i in range(5): p = Process(target=fun,args=(i,)) jobs.append(p) p.start() #父進程讀管道 for i in range(5): #讀取管道 data = fd2.recv() print(data) for i in jobs: i.join() 輸出結果: {3: 32720} {2: 32719} {1: 32718} {4: 32721} {0: 32717}
""" 管道通訊---單向管道(fd1只能讀,fd2只能寫) """ from multiprocessing import Pipe,Process import os,time #建立管道: fd1,fd2 = Pipe() def fun(name): time.sleep(3) #向管道寫入內容 fd2.send({name:os.getpid()}) jobs = [] #子進程寫管道 for i in range(5): p = Process(target=fun,args=(i,)) jobs.append(p) p.start() #父進程讀管道 for i in range(5): #讀取管道 data = fd1.recv() print(data) for i in jobs: i.join() 輸出結果: {3: 35102} {2: 35101} {0: 35099} {4: 35103} {1: 35100}
""" 消息隊列通訊
一個進程提出需求,一個進程處理需求 """ from multiprocessing import Queue,Process from random import randint from time import sleep #建立消息隊列 q = Queue(3) def request(): for i in range(20): x= randint(0,100) y = randint(0,100) q.put((x,y)) def handle(): while True: sleep(0.5) try: x,y = q.get(timeout=3) except: break else: print("%d + %d = %d"%(x,y,(x + y))) p1 = Process(target=request) p2 = Process(target=handle) p1.start() p2.start() p1.join() p2.join() 輸出結果: 7 + 6 = 13 85 + 53 = 138 39 + 43 = 82 45 + 66 = 111 57 + 10 = 67 43 + 2 = 45 29 + 51 = 80 71 + 42 = 113 2 + 16 = 18 58 + 7 = 65 34 + 6 = 40 3 + 91 = 94 11 + 47 = 58 22 + 39 = 61 64 + 94 = 158 99 + 10 = 109 28 + 0 = 28 100 + 56 = 156 30 + 66 = 96 94 + 68 = 162
""" 共享內存通訊(單個數值)---男的掙錢,女的花錢,月末剩餘多少? """ from multiprocessing import Value,Process import time import random #建立共享內存 money = Value("i",5000) #操做共享內存 def man(): for i in range(30): money.value += random.randint(1,1000) def girl(): for i in range(30): time.sleep(0.15) money.value -= random.randint(100,800) m = Process(target=man) g = Process(target=girl) m.start() g.start() m.join() g.join() #獲取共享內存的值 print("一個月的餘額:",money.value) 輸出結果: 一個月的餘額: 8639
""" 共享內存通訊(多個數據)-----對共享內存修改查看 """ from multiprocessing import Array,Process #建立共享內存,制定共享內存開闢5個整型列表空間 shm = Array("i",[5,6,7,2,9]) def fun(): #共享內存對象---可迭代 for i in shm: print(i) shm[2] = 99 p = Process(target=fun) p.start() p.join() print("-----------------") for i in shm: print(i) 輸出結果: 5 6 7 2 9 ----------------- 5 6 99 2 9
""" 共享內存通訊(多個數據)-----對共享內存修改查看(字節串數據結構) """ from multiprocessing import Array,Process #建立共享內存,制定共享內存開闢5個整型列表空間 shm = Array("c",b"hello") def fun(): #共享內存對象---可迭代 for i in shm: print(i) shm[2] = b'H' p = Process(target=fun) p.start() p.join() print("-----------------") for i in shm: print(i) print("-----------------") print(shm.value) 輸出結果: b'h' b'e' b'l' b'l' b'o' ----------------- b'h' b'e' b'H' b'l' b'o' ----------------- b'heHlo'
""" 接收端 """ from socket import * import os #肯定本地套接字文件,這個也能夠暫時不建立,在綁定的時候,系統會自動建立 sock_file = "./sock" #判斷文件是否存在,存在就刪除 if os.path.exists(sock_file): os.remove(sock_file) #建立本地套接字 sockfd = socket(AF_UNIX,SOCK_STREAM) #綁定本地套接字 sockfd.bind(sock_file) #監聽鏈接 sockfd.listen(3) while True: #這個鏈接僅僅是應用層的通訊鏈接,而不是網絡鏈接 c,addr = sockfd.accept() while True: data = c.recv(1024) if not data: break print(data.decode()) c.close() sockfd.close() """ 發送端 """ from socket import * #確保兩端使用相同的套接字文件 sock_file = "./sock" #建立本地套接字 sockfd = socket(AF_UNIX,SOCK_STREAM) sockfd.connect(sock_file) while True: msg = input(">>").encode() if not msg: break sockfd.send(msg) sockfd.close()
""" 信號量信息傳遞 """ from multiprocessing import Semaphore,Process from time import sleep import os #建立信號量 #服務程序最多容許3個進程同時執行事件 sem = Semaphore(3) def handle(): print("%d 想執行事件"%os.getpid()) #想執行事件必須獲取信號量 sem.acquire() print("%d 開始執行操做"%os.getpid()) sleep(3) print("%d 完成操做"%os.getpid()) sem.release() #增長信號量 jobs = [] #有5個進程請求執行事件 for i in range(5): p = Process(target=handle) jobs.append(p) p.start() for i in jobs: i.join() #打印最終的信號量個數 print(sem.get_value()) 輸出結果: 103160 想執行事件 103160 開始執行操做 103161 想執行事件 103161 開始執行操做 103162 想執行事件 103162 開始執行操做 103163 想執行事件 103164 想執行事件 103160 完成操做 103163 開始執行操做 103161 完成操做 103164 開始執行操做 103162 完成操做 103163 完成操做 103164 完成操做 3
習題1:算法
""" multiprocess建立兩個進程,同時複製一個文件的上下兩半部分,各自複製到一個新的文件裏 """ from multiprocessing import Process import os filename = "./520.jpg" #獲取圖片大小 size = os.path.getsize(filename) #複製上半部分 def get_top(): f = open(filename,'rb') n = size // 2 fw = open("top.jpg",'wb') fw.write(f.read(n)) f.close() fw.close() #下半部分 def get_bot(): f = open("520.jpg",'rb') fw = open("bot.jpg",'wb') f.seek(size//2,0) while True: data = f.read(1024) if not data: break fw.write(data) f.close() fw.close() #建立進程 p1 = Process(target=get_top) p2 = Process(target=get_bot) p1.start() p2.start() p1.join() p2.join()
""" multiprocess建立兩個進程,同時複製一個文件的上下兩半部分,各自複製到一個新的文件裏 ---把要複製的圖片打開代碼放在父進程裏,同時在獲取上半部圖片的子進程函數設置延遲阻塞,能夠看到上半部的圖片大小爲零 ---緣由:程序在執行時,父進程先建立文件對象,當執行到建立子進程時,會將這個對象傳遞給兩個子進程,致使父子進程共用一個文件對象 三者任意一個進程對該文件的操做都會影響其餘進程對該文件的操做,此外,在兩個子進程從新打開文件,則互不影響 """ from multiprocessing import Process import os from time import sleep filename = "./520.jpg" #獲取圖片大小 size = os.path.getsize(filename) f = open(filename,"rb") #複製上半部分 def get_top(): sleep(1) # f = open(filename,'rb') n = size // 2 fw = open("top.jpg",'wb') fw.write(f.read(n)) # f.close() fw.close() #下半部分 def get_bot(): # f = open(filename,'rb') fw = open("bot.jpg",'wb') f.seek(size//2,0) while True: data = f.read(1024) if not data: break fw.write(data) # f.close() fw.close() #建立進程 p1 = Process(target=get_top) p2 = Process(target=get_bot) p1.start() p2.start() p1.join() p2.join() f.close()
注:若是父進程中打開文件,建立進程通訊對象或者建立套接字,子進程會從父進程內存空間獲取這些內容,那麼父子進程對該對象的操做會有必定的屬性關聯(共用同一個對象)shell
白話:上述代碼的本質就是進程間的通訊:父進程建立對象,子進程繼承父進程建立的對象,與消息隊列,管道,共享內存等進程間信息交互一模一樣(父進程建立進程間通訊對象,子進程繼承這個對象)編程
""" 線程建立示例 ---此示例有兩個線程,啓動程序的稱爲主線程,播放音樂的爲分支線程,共同構成一個進程 ---由PID能夠看出,兩個線程同屬一個進程 ---a變量是兩個線程公用的資源,所以在一個線程對a進行操做,另外一個線程使用這個變量時也會受影響,即進程空間信息至關於線程全局變量 """ import threading from time import sleep import os a = 1 #線程函數 def music(): global a print("a = ", a) a = 10000 for i in range(5): sleep(2) print("播放《心如止水》",os.getpid()) #建立線程對象(分支線程) t = threading.Thread(target=music) t.start() #主線程任務 for i in range(3): sleep(3) print("播放《跳舞吧》",os.getpid()) t.join() #回收線程 print("Main thread a = ",a) 輸出結果: a = 1 播放《心如止水》 9022 播放《跳舞吧》 9022 播放《心如止水》 9022 播放《跳舞吧》 9022 播放《心如止水》 9022 播放《心如止水》 9022 播放《跳舞吧》 9022 播放《心如止水》 9022 Main thread a = 10000
""" 線程傳參 """ from threading import Thread from time import sleep #含參數的線程函數 def fun(sec,name): print("線程函數傳參") sleep(sec) print("%s 線程執行完畢"%name) #建立多個線程 jobs = [] for i in range(5): t = Thread(target=fun,args=(2,),kwargs={"name":"T%d"%i}) jobs.append(t) t.start() for i in jobs: i.join() 輸出結果: 線程函數傳參 線程函數傳參 線程函數傳參 線程函數傳參 線程函數傳參 T1 線程執行完畢 T3 線程執行完畢 T2 線程執行完畢 T4 線程執行完畢 T0 線程執行完畢
""" 線程屬性 """ from threading import Thread from time import sleep def fun(): sleep(3) print("線程屬性測試") t = Thread(target=fun,name="Hobby") #主線程退出,分支線程也隨之退出 t.setDaemon(True) t.start() #修改線程名稱 t.setName("Back") #線程名稱 print("Thread name:",t.getName()) #線程生命週期 print("Is alive:",t.is_alive()) 輸出結果: Thread name: Back Is alive: True
拓展:Python線程池,第三方模塊:threadpoolwindows
""" 自定義線程類示例 """ from threading import Thread class ThreadClass(Thread): def __init__(self,attr): super().__init__() self.attr = attr #多個方法配合實現具體功能 def f1(self): print('步驟1',self.attr) def f2(self): print("步驟2",self.attr) def run(self): self.f1() self.f2() t = ThreadClass('****') t.start() #自動運行run方法 t.join() 輸出結果: 步驟1 **** 步驟2 ****
from threading import Thread from time import sleep,ctime class MyThread(Thread): def __init__(self,target=None,args=(),kwargs={},name=None): super().__init__() self.target = target self.args = args self.kwargs = kwargs self.name = name def run(self): self.target(*self.args,**self.kwargs) # ********************************************** # 經過完成上面的Mythread類讓整個程序能夠正常執行 # 當調用start時player做爲一個線程功能函數運行 # 注意:函數的名稱和參數並不肯定,player只是測試函數 # ********************************************** def player(sec,song): for i in range(2): print("Playing %s:%s"%(song,ctime())) sleep(sec) t = MyThread(target=player,args=(3,),kwargs={"song":"涼涼"},name="happy") t.start() t.join() 輸出結果: Playing 涼涼:Tue May 21 12:40:24 2019 Playing 涼涼:Tue May 21 12:40:27 2019
同步互斥後端
線程間通訊方法數組
1. 通訊方法瀏覽器
線程間使用全局變量進行通訊----會存在通訊紊亂(好比:一個進程中有三個線程,兩個線程在通訊時,另外一個線程也使用公共變量,致使信息傳遞有誤),形成這種現象的緣由:共享資源的爭奪
2. 共享資源爭奪
共享資源:多個進程或者線程均可以操做的資源稱爲共享資源。對共享資源的操做代碼段稱爲臨界區。------線程更加明顯,由於其使用全局變量
影響 : 對共享資源的無序操做可能會帶來數據的混亂,或者操做錯誤。此時每每須要同步互斥機制協調操做順序。
3. 同步互斥機制
同步 : 同步是一種協做關係,爲完成操做,多進程或者線程間造成一種協調,按照必要的步驟有序執行操做。
好比:進程通訊方式中的消息隊列,管道等,一個先放,而後另外一個取,就是一種同步
網絡信息的收發機制,也是先發再收,一種同步協做關係
阻塞函數也是同步協做
互斥 : 互斥是一種制約關係,當一個進程或者線程佔有資源時會進行加鎖處理,此時其餘進程線程就沒法操做該資源,直到解鎖後才能操做。
線程同步互斥方法:更準確的說是互斥方法
注:全部的互斥方法必然有阻塞行爲和解除阻塞的行爲
線程Event
from threading import Event
e = Event() 建立線程event對象
e.wait([timeout]) 阻塞等待e被set
e.set() 設置e,使wait結束阻塞
e.clear() 使e回到未被設置狀態
e.is_set() 查看當前e是否被設置
""" Event事件: 必須分支線程對全局變量操做以後,主線程才能對全局變量操做 """ from threading import Thread,Event from time import sleep # 全局變量,用於通訊 s = None #建立事件對象 e = Event() def yangzirong(): print("楊子榮前來拜山頭") global s s = "天王蓋地虎" # 共享資源操做完畢 e.set() t = Thread(target=yangzirong) t.start() print("說對口令就是本身人") # 阻塞等待 e.wait() if s == "天王蓋地虎": print("寶塔鎮河妖") print("確認過眼神,你是對的人") else: print("打死他...") t.join() 輸出結果: 楊子榮前來拜山頭 說對口令就是本身人 寶塔鎮河妖 確認過眼神,你是對的人
線程鎖 Lock
from threading import Lock
lock = Lock() 建立鎖對象
lock.acquire() 上鎖 若是lock已經上鎖再調用會阻塞
lock.release() 解鎖
with lock: 上鎖
......
......
with代碼塊結束自動解鎖
注:誰先運行到上鎖,誰就有執行權,執行完後另一個遇到上鎖就會阻塞,上鎖至關於增長了程序運行的原則性:一個線程上鎖解鎖中間的部分在執行的時候,其餘線程不能對共享資源操做
""" Lock鎖的應用 """ from threading import Lock,Thread a = b = 0 lock = Lock() def value(): while True: lock.acquire() #上鎖 if a != b: print("a = %d,b = %d"%(a,b)) lock.release() #解鎖 t = Thread(target=value) t.start() while True: with lock: a += 1 b += 1 t.join()
死鎖及其處理
1. 定義
死鎖是指兩個或兩個以上的線程在執行過程當中,因爲競爭資源或者因爲彼此通訊而形成的一種阻塞的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖。
示例:倆小孩交換東西
2. 死鎖產生條件
死鎖發生的必要條件(四個同時知足)
* 互斥條件:指線程對所分配到的資源進行排它性使用,即在一段時間內某資源只由一個進程佔用。若是此時還有其它進程請求資源,則請求者只能等待,直至佔有資源的進程用畢釋放。
* 請求和保持條件:指線程已經保持至少一個資源,但又提出了新的資源請求,而該資源已被其它進程佔有,此時請求線程阻塞,但又對本身已得到的其它資源保持不放。
* 不剝奪條件:指線程已得到的資源,在未使用完以前,不能被剝奪,只能在使用完時由本身釋放,一般CPU內存資源是能夠被系統強行調配剝奪的。
* 環路等待條件:指在發生死鎖時,必然存在一個線程——資源的環形鏈,即進程集合{T0,T1,T2,···,Tn}中的T0正在等待一個T1佔用的資源;T1正在等待T2佔用的資源,……,Tn正在等待已被T0佔用的資源。
死鎖的產生緣由
簡單來講形成死鎖的緣由能夠歸納成三句話:
* 當前線程擁有其餘線程須要的資源
* 當前線程等待其餘線程已擁有的資源
* 都不放棄本身擁有的資源
3. 如何避免死鎖
死鎖是咱們很是不肯意看到的一種現象,咱們要儘量避免死鎖的狀況發生。經過設置某些限制條件,去破壞產生死鎖的四個必要條件中的一個或者幾個,來預防發生死鎖。預防死鎖是一種較易實現的方法。
可是因爲所施加的限制條件每每太嚴格,可能會致使系統資源利用率。
* 使用定時鎖---加阻塞函數
* 使用重入鎖RLock(),用法同Lock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其他的線程才能得到資源。
""" 死鎖預防案例----銀行交易系統 ---先讓一個線程先執行,再執行另外一個線程,即加阻塞函數 """ import time import threading #交易類 class Account: def __init__(self,id,balance,lock): self.id = id # 用戶 self.balance = balance # 銀行存款 self.lock = lock # 鎖 # 取錢 def withdraw(self,amount): self.balance -= amount # 存錢 def deposit(self,amount): self.balance += amount # 查看帳戶金額 def get_balance(self): return self.balance # 轉帳函數 def transfer(from_,to,amount): # 上鎖成功返回true if from_.lock.acquire(): # 鎖住本身的帳戶 from_.withdraw(amount) # 本身帳戶金額減小 time.sleep(0.5) if to.lock.acquire(): to.deposit(amount) # 對方帳戶金額增長 to.lock.release() # 對方帳戶解鎖 from_.lock.release() # 本身帳戶解鎖 print("轉帳完成") # 建立兩個帳戶 Abby = Account("Abby",5000,threading.Lock()) Levi = Account("Levi",3000,threading.Lock()) t1 = threading.Thread(target=transfer,args=(Abby,Levi,1500)) t2 = threading.Thread(target=transfer,args=(Levi,Abby,1000)) t1.start() time.sleep(2) # 加阻塞延遲函數,避免死鎖----讓ti先執行,過兩秒後,讓t2再執行 t2.start() t1.join() t2.join() print("Abby:",Abby.get_balance()) print("Levi:",Levi.get_balance()) 運行結果: 轉帳完成 轉帳完成 Abby: 4500 Levi: 3500
""" 死鎖預防案例----fun1重複上鎖致使死鎖 ---重載鎖解決死鎖 ---通常邏輯複雜的狀況容易產生因屢次上鎖致使的死鎖,所以用重載鎖解鎖 """ from threading import Thread,RLock from time import sleep num = 0 # 共享資源(全局變量) lock = RLock() # 重載鎖:容許在一個線程內部容許對鎖進行重複上鎖 class MyThread(Thread): def fun1(self): global num with lock: num -= 1 def fun2(self): global num if lock.acquire(): num += 1 if num > 5: self.fun1() print("Num = ",num) lock.release() def run(self): while True: sleep(2) self.fun2() for i in range(10): t = MyThread() t.start() 輸出結果: Num = 1 Num = 2 Num = 3 Num = 4 Num = 5 Num = 5 Num = 5 ......
python線程GIL----------python的一個bug
線程最大的問題:共享資源的爭奪,這樣涉及上鎖,在應用層對必定資源上鎖外,在解釋器層一樣有共享資源,Python線程建立須要解釋器幫助,所以解釋器也存在共享資源問題,爲了解決這個問題,Python設計者就把解釋器上鎖,
使得解釋器在同一時刻只解釋一個線程就不會產生系統資源衝突,最終致使Python解釋器在同一時刻只能解釋一個線程,多核資源成了擺設(雖然能夠利用計算機多核,可是同一時刻只能利用一個內核),
所以只有在高延遲或者IO阻塞時,Python多線程能夠提升執行效率,對於計算密集型程序則沒有(計算機雖然多核,可是同一時刻只有一個解釋器在佔有一個內核執行程序)並且效率比單線程還低(多線程來回切換消耗時間)
1. python線程的GIL問題 (全局解釋器鎖)
什麼是GIL :因爲python解釋器設計中加入瞭解釋器鎖,致使python解釋器同一時刻只能解釋執行一個線程,大大下降了線程的執行效率。
致使後果: 由於遇到阻塞時線程會主動讓出解釋器,去解釋其餘線程。因此python多線程在執行多阻塞高延遲IO時能夠提高程序效率,其餘狀況並不能對效率有所提高。
GIL問題建議:
* 儘可能使用進程完成無阻塞的併發行爲(等於沒給建議)
* 不使用c做爲解釋器 (Java C#)
2. 結論 : 在無阻塞狀態下,多線程程序和單線程程序執行效率幾乎差很少,甚至還不如單線程效率。可是多進程運行相同內容卻能夠有明顯的效率提高。
""" 單線程執行計算祕籍函數十次,記錄時間,執行io祕籍函數十次,記錄時間 """ import time # 計算密集型函數 x y 傳入1,1 def count(x,y): c = 0 while c < 7000000: c += 1 x += 1 y += 1 # io密集型 def io(): write() read() def write(): f = open('test','w') for i in range(1500000): f.write("hello world\n") f.close() def read(): f = open('test') lines = f.readlines() f.close() st = time.time() for i in range(10): # count(1,1) # Single CPU: 14.62774109840393 io() # print("Single CPU:",time.time()-st) # Single CPU: 14.62774109840393 print("Single IO:",time.time()-st) #Single IO: 8.693575382232666
""" 多線程執行計算祕籍函數十次,記錄時間,執行io祕籍函數十次,記錄時間 """ import time import threading # 計算密集型函數 x y 傳入1,1 def count(x,y): c = 0 while c < 7000000: c += 1 x += 1 y += 1 # io密集型 def io(): write() read() def write(): f = open('test','w') for i in range(1500000): f.write("hello world\n") f.close() def read(): f = open('test') lines = f.readlines() f.close() jobs = [] st = time.time() for i in range(10): # t = threading.Thread(target=count,args=(1,1)) t = threading.Thread(target=io) jobs.append(t) t.start() for i in jobs: i.join() # print("Thread cpu:",time.time()-st) # Thread cpu: 14.862890243530273 print("Thread io:",time.time()-st) # Thread io: 6.805188179016113
""" 多進程執行計算祕籍函數十次,記錄時間,執行io祕籍函數十次,記錄時間 """ import time import multiprocessing # 計算密集型函數 x y 傳入1,1 def count(x,y): c = 0 while c < 7000000: c += 1 x += 1 y += 1 # io密集型 def io(): write() read() def write(): f = open('test','w') for i in range(1500000): f.write("hello world\n") f.close() def read(): f = open('test') lines = f.readlines() f.close() jobs = [] st = time.time() for i in range(10): t = multiprocessing.Process(target=count,args=(1,1)) # t = multiprocessing.Process(target=io) jobs.append(t) t.start() for i in jobs: i.join() print("Process cpu:",time.time()-st) # Process cpu: 6.3905298709869385 # print("Process io:",time.time()-st) # Process io: 3.8089511394500732
注:由上面三個程序對比,若是Python中不存在GIL問題,則Python多線程與多進程執行效率基本相同,甚至更好
進程線程的區別聯繫
區別聯繫:
1. 二者都是多任務編程方式,都能使用計算機多核資源
2. 進程的建立刪除消耗的計算機資源比線程多
3. 進程空間獨立,數據互不干擾,有專門通訊方法;線程使用全局變量通訊
4. 一個進程能夠有多個分支線程,二者有包含關係
5. 多個線程共享進程資源,在共享資源操做時每每須要同步互斥處理
6. 進程線程在系統中都有本身的特有屬性標誌,如ID,代碼段,命令集等。
使用場景
1. 任務場景:若是是相對獨立的任務模塊,可能使用多進程,若是是多個分支共同造成一個總體任務可能用多線程
2. 項目結構:多中編程語言實現不一樣任務模塊,多是多進程,或者先後端分離應該各自爲一個進程。
3. 難易程度:通訊難度,數據處理的複雜度來判斷用進程間通訊仍是同步互斥方法。
進程和線程重點知識:
1. 對進程線程的理解,進程線程的差別
2. 進程間通訊方法,各有什麼特色
3.同步互斥的定義及理解,使用場景,如何用
4. 給一個情形,能夠判斷使用進程仍是線程,闡述緣由
5.殭屍進程的處理,GIL問題,進程狀態
4、併發網絡通訊模型
常見模型分類
1. 循環服務器模型 :循環接收客戶端請求,處理請求。同一時刻只能處理一個請求,處理完畢後再處理下一個。-----TCP和UDP數據傳輸
優勢:實現簡單,佔用資源少
缺點:沒法同時處理多個客戶端請求
適用狀況:處理的任務能夠很快完成,客戶端無需長期佔用服務端程序。udp比tcp更適合循環。
2. IO併發模型:利用IO多路複用,異步IO等技術,同時處理多個客戶端IO請求。
優勢 : 資源消耗少,能同時高效處理多個IO行爲
缺點 : 只能處理併發產生的IO事件,沒法處理cpu計算
適用狀況:HTTP請求,網絡傳輸等都是IO行爲。
3. 多進程/線程網絡併發模型:每當一個客戶端鏈接服務器(TCP),就建立一個新的進程/線程爲該客戶端服務,客戶端退出時再銷燬該進程/線程。
優勢:能同時知足多個客戶端長期佔有服務端需求,能夠處理各類請求。
缺點: 資源消耗較大
適用狀況:客戶端同時鏈接量較少,須要處理行爲較複雜狀況。
基於fork的多進程網絡併發模型
實現步驟:
1. 建立監聽套接字
2. 等待接收客戶端請求
3.服務端鏈接建立新的進程處理客戶端請求
4. 原進程繼續等待其餘客戶端鏈接
5. 若是客戶端退出,則銷燬對應的進程
""" 基於fork的多進程網絡併發---須要處理殭屍進程 """ from socket import * import os,sys import signal def handle(c): print("客戶端:",c.getpeername()) while True: data = c.recv(1024) if not data: break print(data.decode()) c.send(b'ok') c.close() # 建立監聽套接字 HOST = '0.0.0.0' PORT = 8888 ADDR = (HOST,PORT) # 服務端地址 s = socket() # tcp套接字 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # 設置端口地址的當即重用 s.bind(ADDR) s.listen(3) # 殭屍進程的處理 signal.signal(signal.SIGCHLD,signal.SIG_IGN) print("Listen the port 8888...") # 循環等待客戶端鏈接 while True: try: c,addr = s.accept() except KeyboardInterrupt: sys.exit("服務器退出") except Exception as e: print(e) continue # 建立子進程處理客戶端請求 pid = os.fork() if pid == 0: s.close() # 子進程不須要s handle(c) # 具體處理客戶端請求 os._exit(0) # 父進程實際只用來處理客戶端鏈接 else: c.close() # 父進程不須要c
基於threading的多線程網絡併發
實現步驟:
1. 建立監聽套接字
2. 循環接收客戶端鏈接請求
3. 當有新的客戶端鏈接建立線程處理客戶端請求
4. 主線程繼續等待其餘客戶端鏈接
5. 當客戶端退出,則對應分支線程退出----------每一個線程封裝爲一個 函數,函數結束,線程天然結束
""" 基於thread的多線程網絡併發 """ from socket import * from threading import Thread import os,sys # 客戶端處理 def handle(c): print("Connect from",c.getpeername()) while True: data = c.recv(1024) if not data: break print(data.decode()) c.send(b'ok') c.close() # 建立監聽套接字 HOST = '176.61.14.181' PORT = 8888 ADDR = (HOST,PORT) s = socket() s.setsockopt(SOCK_STREAM,SO_REUSEADDR,1) s.bind(ADDR) s.listen(3) # 循環等待客戶端鏈接 while True: try: c,addr = s.accept() except KeyboardInterrupt: sys.exit("服務器退出") # 進程退出 except Exception as e: print(e) continue # 建立新的線程處理客戶端請求 t = Thread(target=handle,args=(c,)) t.setDaemon(True) # 分支線程隨主線程退出 t.start()
""" 基於Process的多進程網絡併發 """ from socket import * from multiprocessing import Process import os,sys import signal # 客戶端處理 def handle(c): print("Connect from",c.getpeername()) while True: data = c.recv(1024) if not data: break print(data.decode()) c.send(b'ok') c.close() # 建立監聽套接字 HOST = '176.61.14.181' PORT = 8888 ADDR = (HOST,PORT) s = socket() s.setsockopt(SOCK_STREAM,SO_REUSEADDR,1) s.bind(ADDR) s.listen(3) # 處理殭屍進程 signal.signal(signal.SIGCHLD,signal.SIG_IGN) # 循環等待客戶端鏈接 while True: try: c,addr = s.accept() except KeyboardInterrupt: sys.exit("服務器退出") # 進程退出 except Exception as e: print(e) continue # 建立子進程處理客戶端請求 p = Process(target=handle,args=(c,)) p.daemon = True # 子進程隨父進程退出 p.start()
@@擴展:集成模塊完成多進程/線程網絡併發
1. 使用方法
import socketserver
經過模塊提供的不一樣的類的組合完成多進程或者多線程,tcp或者udp的網絡併發模型
2. 經常使用類說明
TCPServer 建立tcp服務端套接字
UDPServer 建立udp服務端套接字
StreamRequestHandler 處理tcp客戶端請求
DatagramRequestHandler 處理udp客戶端請求
ForkingMixIn 建立多進程併發
ForkingTCPServer ForkingMixIn + TCPServer
ForkingUDPServer ForkingMixIn + UDPServer
ThreadingMixIn 建立多線程併發
ThreadingTCPServer ThreadingMixIn + TCPServer
ThreadingUDPServer ThreadingMixIn + UDPServer
3. 使用步驟
【1】 建立服務器類,經過選擇繼承的類,決定建立TCP或者UDP,多進程或者多線程的併發服務器模型。
【2】 建立請求處理類,根據服務類型選擇stream處理類仍是Datagram處理類。重寫handle方法,作具體請求處理。
【3】 經過服務器類實例化對象,並綁定請求處理類。
【4】 經過服務器對象,調用serve_forever()啓動服務
ftp 文件服務器
1. 功能
【1】 分爲服務端和客戶端,要求能夠有多個客戶端同時操做。
【2】 客戶端能夠查看服務器文件庫中有什麼文件。
【3】 客戶端能夠從文件庫中下載文件到本地。
【4】 客戶端能夠上傳一個本地文件到文件庫。
【5】 使用print在客戶端打印命令輸入提示,引導操做
2.思路分析:
1.技術點分析:
* 併發模型:多線程併發模式,固然多進程併發也能夠
* 數據傳輸:TCP傳輸
2.結構設計:
* 客戶端發起請求,打印請求提示界面
* 文件傳輸功能封裝爲類【2】【3】【4】
3.功能分析:
* 網絡搭建
* 查看文件庫信息
* 下載文件
* 上傳文件
* 客戶端退出
4.協議:
* L 表示請求文件列表
* Q 表示退出
* G 表示下載文件
* P 表示上傳文件
""" ftp文件服務器---多線程網絡併發 """ from socket import * from threading import Thread import os from time import sleep # 全局變量 HOST = '0.0.0.0' PORT = 8888 ADDR = (HOST,PORT) FTP = '/home/tarena/ftp/' # 文件庫路徑 # 將客戶端請求功能封裝爲類 class FtpServer: def __init__(self,connfd,FTP_PATH): self.connfd = connfd self.path = FTP_PATH def do_list(self): # 獲取文件列表 files = os.listdir(self.path) # 包含了隱藏文件,注意要把其排除 if not files: self.connfd.send("該文件列表爲空".encode()) return else: self.connfd.send(b'ok') sleep(0.1) fs = ' ' for file in files: if file[0] != '.' and os.path.isfile(self.path+file): # 保證不是隱藏文件且是普通文件 fs += file + '\n' self.connfd.send(fs.encode()) def do_get(self,filename): try: fd = open(self.path+filename,'rb') except Exception: self.connfd.send("文件不存在".encode()) return else: self.connfd.send(b'ok') sleep(0.1) # 防止粘包 # 發送文件內容 while True: data = fd.read(1024) if not data: # 文件結束 sleep(0.1) # 防止粘包 self.connfd.send(b'##') break self.connfd.send(data) def do_put(self, filename): if os.path.exists(self.path + filename): self.connfd.send("該文件已存在") return self.connfd.send(b'ok') fd = open(self.path+filename,'wb') # 接收文件 while True: data = self.connfd.recv(1024) if data == b'##': break fd.write(data) fd.close() # 客戶端請求處理函數 def handle(connfd): cls = connfd.recv(1024).decode() FTP_PATH = FTP + cls + "/" ftp = FtpServer(connfd,FTP_PATH) while True: # 接收客戶端請求 data = connfd.recv(1024).decode() print(FTP_PATH,':',data) # 若是客戶端斷開返回data爲空 if not data or data[0] == 'Q': return elif data[0] == 'L': ftp.do_list() elif data[0] == 'G': filename = data.split(' ')[-1] ftp.do_get(filename) elif data[0] == 'P': filename = data.split(' ')[-1] ftp.do_put(filename) # 網絡搭建---經過main函數完成 def main(): # 建立套接字 sockfd = socket() sockfd.setsockopt(SOCK_STREAM,SO_REUSEADDR,1) sockfd.bind(ADDR) sockfd.listen(5) print("Listen the port 8888...") while True: try: connfd,addr = sockfd.accept() except KeyboardInterrupt: print("退出服務程序") return except Exception as e: print(e) continue print("連接的客戶端:",addr) # 建立新的線程處理請求 client = Thread(target=handle,args=(connfd,)) client.setDaemon(True) client.start() if __name__ == "__main__": main() ============================================= """ ftp文件客戶端---多線程網絡併發 """ from socket import * import sys import time # 具體功能 class FtpClient: def __init__(self,sockfd): self.sockfd = sockfd def do_list(self): self.sockfd.send(b"L") # 發送請求 # 等待回覆 data = self.sockfd.recv(128).decode() # ok表示請求成功 if data == "ok": # 接收文件列表 data = self.sockfd.recv(4096) print(data.decode()) else: print(data) def do_quit(self): self.sockfd.send(b'Q') self.sockfd.close() sys.exit("謝謝使用") # 退出進程(本程序只有一個進程),即整個程序退出 def do_get(self,filename): # 發送請求 self.sockfd.send(("G " + filename).encode()) # 等待回覆 data = self.sockfd.recv(128).decode() if data == 'ok': fd = open(filename,'wb') while True: data = self.sockfd.recv(1024) if data == b"##": break fd.write(data) fd.close() else: print(data) def do_put(self,filename): # 判斷本地是否有該文件 try: f = open(filename,'rb') except Exception: print("沒有該文件") return # 發送請求 filename = filename.split('/')[-1] self.sockfd.send(("P " + filename).encode()) # 等待回覆 data = self.sockfd.recv(128).decode() if data == 'ok': while True: data = f.read(1024) if not data: time.sleep(0.1) self.sockfd.send(b'##') break self.sockfd.send(data) f.close() else: print(data) def request(sockfd): while True: ftp = FtpClient(sockfd) print("\n*****************命令選項********************") print("\n*****************list********************") # 查看文件列表 print("\n*****************get file********************") # 下載文件 print("\n*****************put file********************") # 上傳文件 print("\n*****************quit********************") # 退出 print("========================================") cmd = input("輸入命令:") if cmd.strip() == 'list': ftp.do_list() elif cmd == 'quit': ftp.do_quit() elif cmd[:3] == 'get': filename = cmd.strip().split(' ')[-1] ftp.do_get(filename) elif cmd[:3] == 'put': filename = cmd.strip().split(' ')[-1] ftp.do_put(filename) # 網絡連接 def main(): # 服務器地址 ADDR = ("176.61.14.181",8888) sockfd = socket() try: sockfd.connect(ADDR) except Exception as e: print("連接服務器失敗") return else: print("""***************************** Data File Image ***************************** """) cls = input("請輸入文件種類:") if cls not in ['Data','File','Image']: print("Sorry input Error!!!") return else: sockfd.send(cls.encode()) request(sockfd) # 發送具體請求 if __name__ == "__main__": main()
IO併發
IO 分類:阻塞IO ,非阻塞IO,IO多路複用,異步IO等
阻塞IO
1.定義:在執行IO操做時若是執行條件不知足則阻塞。阻塞IO是IO的默認形態。
2.效率:阻塞IO是效率很低的一種IO。可是因爲邏輯簡單因此是默認IO行爲。
3.阻塞狀況:
* 由於某種執行條件沒有知足形成的函數阻塞
e.g. accept input recv
* 處理IO的時間較長產生的阻塞狀態
e.g. 網絡傳輸,大文件讀寫
注:程序分類
1. 計算密集型程序:算法優化
2. IO密集型程序:運行效率低,耗時長
* 阻塞等待
* 網絡傳輸,磁盤交互耗時
非阻塞IO
1. 定義 :經過修改IO屬性行爲,使本來阻塞的IO變爲非阻塞的狀態。----一般只能改變《由於某種執行條件沒有知足形成的函數阻塞》
* 設置套接字爲非阻塞IO
sockfd.setblocking(bool)
功能:設置套接字爲非阻塞IO
參數:默認爲True,表示套接字IO阻塞;設置爲False則套接字IO變爲非阻塞
*超時檢測 :設置一個最長阻塞時間,超過該時間後則再也不阻塞等待。
sockfd.settimeout(sec)
功能:設置套接字的超時時間
參數:設置的時間
""" 套接字非阻塞示例----循環等待客戶端鏈接,在未被鏈接時,循環將日誌寫入文件log裏 """ from socket import * from time import sleep,ctime # 打開日誌文件 f = open('log.txt','a+') sockfd = socket() sockfd.bind(('0.0.0.0',8888)) sockfd.listen(3) # 設置套接字爲非阻塞 # sockfd.setblocking(False) # 設置超市檢測 sockfd.settimeout(3) while True: print("Waiting for connect..") try: connfd,addr = sockfd.accept() # 已被設置非阻塞 except (BlockingIOError,timeout) as e: #每隔2秒寫入一條日誌 sleep(2) f.write("%s: %s\n"%(ctime(),e)) else: data = connfd.recv(1024).decode() print(data)
IO多路複用----屬於IO併發方法,只能監控和處理IO行爲,當有多個計算行爲須要同時處理的時候,仍然須要採用多進程或者多線程,後端的併發程序並不只僅是網絡併發,還有計算併發
1. 定義:同時監控多個IO事件,當哪一個IO事件準備就緒就執行哪一個IO事件。以此造成能夠同時處理多個IO的行爲,避免一個IO阻塞形成其餘IO均沒法執行,提升了IO執行效率。
注:
* 前提是多個IO時間的運行互不影響
* 準備就緒:臨界概念,事件剛剛產生,好比input()函數,輸入內容按回車一剎那就是準備就緒,網絡通訊中accept()等待鏈接,剛好鏈接上的一瞬間
2. 具體方案
select方法 : windows linux unix
poll方法: linux unix
epoll方法: linux
注:以上三個方法都是來自select模塊,思想都是同樣的,都是同時監控多個IO事件,實現方法不一樣而已
select 方法:
rs, ws, xs=select(rlist, wlist, xlist[, timeout])
功能: 監控IO事件,阻塞等待IO發生
參數:rlist 列表 存放關注的等待發生的IO事件-----即IO事件的發生不是由自己決定的,必須等待外部條件帶來這個IO事件的發生,被動等待發生,如accept()
wlist 列表 存放關注的要主動處理的IO事件------即IO事件的發生由本身控制,不須要等待外部條件的發生,主動處理阻塞,此時,select至關於非阻塞
xlist 列表 存放關注的出現異常要處理的IO事件-------即IO事件發生異常
timeout 超時時間
注:前三個參數表明IO事件發生的不一樣類別
經過哪一個對象調用IO函數行爲,這個對象就是IO事件
若是中間參數列表(wlist)存在IO事件的話,則select至關於沒有阻塞(主動處理阻塞)
返回值: rs 列表 rlist中準備就緒的IO-----返回值爲列表,列表元素爲就緒的IO事件對象
ws 列表 wlist中準備就緒的IO-----返回值爲列表,列表元素爲就緒的IO事件對象
xs 列表 xlist中準備就緒的IO-----返回值爲列表,列表元素爲就緒的IO事件對象
""" select函數講解 """ from socket import * from select import select # 作幾個IO用做監控 s = socket() s.bind(('0.0.0.0',8880)) s.listen(3) fd = open("log.txt",'a+') print("開始提交監控的IO") rs,ws,xs = select([s,fd],[fd],[]) print("rs:",rs) print("ws:",ws) print("xs:",xs) 輸出結果: rs: [<_io.TextIOWrapper name='log.txt' mode='a+' encoding='UTF-8'>] ws: [<_io.TextIOWrapper name='log.txt' mode='a+' encoding='UTF-8'>] xs: []
select 實現tcp服務:
【1】 將關注的IO放入對應的監控類別列表
【2】經過select函數進行監控
【3】遍歷select返回值列表,肯定就緒IO事件
【4】處理髮生的IO事件
注意:
* wlist中若是存在IO事件,則select當即返回給ws
* 處理IO過程當中不要出現死循環佔有服務端的狀況
* IO多路複用消耗資源較少,效率較高
""" IO多路複用select實現多客戶端通訊,即對服務端使用IO多路複用技術 """ from socket import * from select import select # 設置套接字爲關注IO s = socket() s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(('0.0.0.0',8888)) s.listen(5) # 設置關注的IO rlist = [s] wlist = [] xlist = [] while True: # 監控IO的發生 rs,ws,xs = select(rlist,wlist,xlist) # 創建三個返回值列表,判斷哪一個IO發生 for r in rs: # 若是是s套接字就緒,則處理連接 if r is s: c,addr = r.accept() print("Connect from",addr) # 加入新的關注IO,目前需求是收消息,若是加wlist則表示發消息,做爲服務端,通常是先接收再發送 # 此時rlist多了一個客戶端套接字c,那麼當再次循環至select時,其返回結果有三種可能:[s],[s,c],[c],此外還可能在等待 rlist.append(c) else: # 此時只考慮兩種狀況:[s],[c],將[s,c]剔除,所以,只用else,不用elif data = r.recv(1024) if not data: rlist.remove(r) r.close() continue print(data) # r.send(b'ok') # 但願咱們主動處理這個IO wlist.append(r) for w in ws: w.send(b'ok,thanks') wlist.remove(w) # 防止不斷的想客戶端發送消息 for r in xs: pass
@@擴展: 位運算
定義 : 將整數轉換爲二進制,按二進制位進行運算
運算符號:
& 按位與
| 按位或
^ 按位異或
<< 左移
> > 右移
e.g. 14 --> 01110
19 --> 10011
14 & 19 = 00010 = 2 一0則0
14 | 19 = 11111 = 31 一1則1
14 ^ 19 = 11101 = 29 相同爲0不一樣爲1
14 << 2 = 111000 = 56 向左移動低位補0
14 >> 2 = 11 = 3 向右移動去掉低位
poll方法:
p = select.poll()--------------這個poll是select模塊下的,是生成對象的
功能 : 建立poll對象
返回值: poll對象
p.register(fd,event)
功能: 註冊關注的IO事件,即添加IO事件
參數:fd 要關注的IO對象
event 要關注的IO事件類型
經常使用類型:POLLIN 讀IO事件(rlist)
POLLOUT 寫IO事件 (wlist)
POLLERR 異常IO (xlist)
POLLHUP 斷開鏈接
e.g. p.register(sockfd,POLLIN|POLLERR)------同是關注多個事件
p.unregister(fd)
功能:取消對IO的關注
參數:IO對象或者IO對象的fileno(文件描述符)
events = p.poll()---------------------這個poll是p對象的屬性函數
功能: 阻塞等待監控的IO事件發生(即監控)
返回值: 返回發生的IO,返回值爲列表,列表元素爲元組,元組表明就緒的IO時間,元組由兩項構成,一是就緒IO的文件描述符,另外一個是就緒IO的就緒時間
events格式 [(fileno,event),()....]
每一個元組爲一個就緒IO(不是對象),元組第一項是該IO的fileno,第二項爲該IO就緒的事件類型,兩項都不是對象,所以要根據fileno回推就緒IO對象,所以要提早搭建查找地圖,每關注一個IO就把它的文件描述符加入其中,
若是取消關注,則將其從查找地圖中刪除,在此建議地圖採用字典形式
poll_server 步驟:
【1】 建立套接字
【2】 將套接字register
【3】 建立查找字典,並維護
【4】 循環監控IO發生
【5】 處理髮生的IO
""" poll實現的IO多路複用 """ from select import * from socket import * # 設置套接字爲關注IO s = socket() s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(('0.0.0.0',8888)) s.listen(5) # 建立poll p = poll() # 創建查找字典{fileno: io_obj} fdmap = {s.fileno():s} # 設置關注IO p.register(s,POLLIN|POLLERR) # 循環監控IO事件的發生 while True: events = p.poll # 阻塞等待IO發生 for fd,event in events: # 遍歷列表處理IO if fd == s.fileno(): c,addr = fdmap[fd].accept() # 保持代碼風格統一 print("Connect from",addr) # 添加新的關注IO事件 p.register(c,POLLIN|POLLHUP) fdmap[c.fileno()] = c # elif event & POLLHUP: # print("客戶端退出") # p.unregister(fd) # 取消關注 # fdmap[fd].close() # del fdmap[fd] # 從字典中刪除 elif event & POLLIN: # 客戶端發消息 data = fdmap[fd].recv(1024) # 斷開發生時POLLIN返回空此時POLLIN也會就緒 if not data: p.unregister(fd) # 取消關注 fdmap[fd].close() del fdmap[fd] continue print(data.decode()) fdmap[fd].send(b'ok')
epoll方法
1. 使用方法 : 基本與poll相同
* 生成對象改成 epoll()
* 將全部事件類型改成EPOLL類型
2. epoll特色:
* epoll 效率比select poll要高:select和poll要來回複製應用層和內核的關注事件且還要在應用層對從內核複製的事件進行遍歷找出知足就緒事件,耗時;epoll則直接在內核開闢空間,須要監控哪一個IO事件,
應用層直接將其放入內核進行監控,待有就緒事件時,內核只需將就緒事件返回給應用層便可,雖然消耗內存,可是提高了來回複製和遍歷消耗的事件
* epoll 監控IO數量比select poll要多:select和poll最多監控1024個,epoll監控更多
* epoll 的觸發方式比poll要多 (EPOLLET邊緣觸發):三者默認狀態都是水平觸發,epoll多了個邊緣觸發
""" epoll實現的IO多路複用 """ from select import * from socket import * # 設置套接字爲關注IO s = socket() s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(('0.0.0.0',8888)) s.listen(5) # 建立epoll ep = epoll() # 創建查找字典{fileno: io_obj} fdmap = {s.fileno():s} # 設置關注IO ep.register(s,EPOLLIN|EPOLLERR) # 循環監控IO事件的發生 while True: events = ep.poll # 阻塞等待IO發生 for fd,event in events: # 遍歷列表處理IO if fd == s.fileno(): c,addr = fdmap[fd].accept() # 保持代碼風格統一 print("Connect from",addr) # 添加新的關注IO事件 ep.register(c,EPOLLIN|EPOLLHUP) fdmap[c.fileno()] = c # elif event & EPOLLHUP: # print("客戶端退出") # ep.unregister(fd) # 取消關注 # fdmap[fd].close() # del fdmap[fd] # 從字典中刪除 elif event & EPOLLIN: # 客戶端發消息 data = fdmap[fd].recv(1024) # 斷開發生時EPOLLIN返回空此時EPOLLIN也會就緒 if not data: ep.unregister(fd) # 取消關注 fdmap[fd].close() del fdmap[fd] continue print(data.decode()) fdmap[fd].send(b'ok')
5、協程技術----實現異步IO的方法
基礎概念
1. 定義:纖程,微線程。是爲非搶佔式(相互之間協調執行)多任務產生子程序的計算機組件(一段封裝的代碼)。協程容許不一樣入口點在不同位置暫停或開始,簡單來講,協程就是能夠暫停執行的函數(如:yield函數)。
2. 協程原理 : 記錄一個函數的上下文棧幀(記錄函數執行的位置),協程調度切換時會將記錄的上下文保存,在切換回來時進行調取,恢復原有的執行內容,以便從上一次執行位置繼續執行。
即在應用層,經過人爲控制函數之間的執行跳轉來來完成事件,所以可稱爲異步IO模式
3. 協程優缺點:
優勢:
1. 協程完成多任務佔用計算資源不多:由於全部操做只涉及整個進程棧的內存操做,不涉及內核操做
2. 因爲協程的多任務切換在應用層完成,所以切換開銷少
3. 協程爲單線程程序,無需進行共享資源同步互斥處理
缺點:
協程的本質是一個單線程,沒法利用計算機多核資源
注:協程和線程區別:*實現原理不一樣:協程是單線程程序,沒法利用計算機的多和資源
擴展延伸@標準庫協程的實現
python3.5之後,使用標準庫asyncio和async/await 語法來編寫併發代碼。asyncio庫經過對異步IO行爲的支持完成python的協成調。
* 同步是指完成事務的邏輯,先執行第一個事務,若是阻塞了,會一直等待,直到這個事務完成,再執行第二個事務,順序執行。
* 異步是和同步相對的,異步是指在處理調用這個事務的以後,不會等待這個事務的處理結果,直接處理第二個事務去了,經過狀態、通知、回調來通知調用者處理結果。
雖然官方說asyncio是將來的開發方向,可是因爲其生態不夠豐富,大量的客戶端不支持awaitable(不支持基於協程的阻塞),須要本身去封裝,因此在使用上存在缺陷。更多時候只能使用已有的異步庫(asyncio等),功能有限
""" 協程小示例 """ import asyncio import time now = lambda : time.time() # 協程函數 async def do_work(x): print("Waiting :",x) # await asyncio.sleep(x) # 協程跳轉,跳出該協程函數,當不阻塞時再回來繼續執行後面的程序 await time.sleep(x) # 這個能夠證實不是全部的阻塞都能跳轉,在平常中,大量客戶端並不支持這種跳轉,即只能使用有限的已有異步庫(asyncio) return "None after %s s"%x start = now() # 生成三個協程對象 cor1 = do_work(1) cor2 = do_work(2) cor3 = do_work(3) # 將協程對象生成一個可輪尋異步io操做的對象列表 tasks = [ asyncio.ensure_future(cor1), asyncio.ensure_future(cor2), asyncio.ensure_future(cor3), ] # 生成輪尋對象,調用run啓動協程執行 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) # 記錄執行時間 print("Time:",now() - start) 用await asyncio.sleep(x)測試結果: Waiting : 1 Waiting : 2 Waiting : 3 Time: 3.0023772716522217 用await time.sleep(x)測試結果: Waiting : 1 Waiting : 2 Waiting : 3 Time: 6.007911682128906
第三方協程模
1. greenlet模塊
* 安裝 : sudo pip3 install greenlet
* 函數
greenlet.greenlet(func)
功能:建立協程對象
參數:協程函數
g.switch()
功能:選擇要執行的協程函數
from greenlet import greenlet def test1(): print("執行test1") gr2.switch() print("結束test1") gr2.switch() def test2(): print("執行test2") gr1.switch() print("結束test2") # 將函數變成協程函數 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() # 執行協程test1
2. gevent模塊-----------依賴於greenlet模塊
* 安裝:sudo pip3 instll gevent
* 函數
gevent.spawn(func,argv)
功能: 生成協程對象
參數:func 協程函數
argv 給協程函數傳參(不定參---按照位置一一傳遞)
返回值: 協程對象
gevent.joinall(list,[timeout])
功能: 阻塞等待協程執行完畢
參數:list 協程對象列表
timeout 超時時間
gevent.sleep(sec)
功能: gevent睡眠阻塞
參數:睡眠時間
*gevent協程只有在遇到gevent指定的阻塞行爲(gevent.sleep)時纔會自動在協程之間進行跳轉如gevent.joinall(),gevent.sleep()帶來的阻塞
import gevent from time import sleep def foo(a,b): print("Running foo...",a,b) gevent.sleep(3) print("Foo again") def bar(): print("Running bar...") gevent.sleep(2) print("Bar again") # 將函數封裝爲協程,遇到gevent阻塞自動執行 f = gevent.spawn(foo,1,"hello") g = gevent.spawn(bar) gevent.joinall([f,g]) # 阻塞等待f,g的結束 gevent.sleep(3)
* monkey腳本
做用:在gevent協程中,協程只有遇到gevent指定類型的阻塞(gevent.sleep)才能跳轉到其餘協程,所以,咱們但願將普通的IO阻塞行爲轉換爲能夠觸發gevent協程跳轉的阻塞,以提升執行效率。
轉換方法:gevent 提供了一個腳本程序monkey,能夠修改底層解釋IO阻塞的行爲,將不少普通阻塞轉換爲gevent阻塞。
使用方法:
【1】 導入monkey
from gevent import monkey
【2】 運行相應的腳本,例如轉換socket中全部阻塞
monkey.patch_socket()
【3】 若是將全部可轉換的IO阻塞所有轉換則運行all
monkey.patch_all()
【4】 注意:腳本運行函數須要在對應模塊導入前執行
""" gevent協程演示 """ from gevent import monkey import gevent monkey.patch_all() # 該句的執行必須在導入socket以前 from socket import * # 處理客戶端請求 def handle(c): while True: data = c.recv(1024) if not data: break print(data.decode()) c.send(b'ok') c.close() # 建立套接字 s = socket() s.bind(('0.0.0.0',8888)) s.listen(5) while True: c,addr = s.accept() print("Connect from",addr) # handle(c) # 循環方案 gevent.spawn(handle,c) # 協程方案 s.close()
HTTPServer v2.0
1. 主要功能 :
【1】 接收客戶端(瀏覽器)請求
【2】 解析客戶端發送的請求
【3】 根據請求組織數據內容
【4】 將數據內容形參http響應格式返回給瀏覽器
2. 升級點 :
【1】 採用IO併發,能夠知足多個客戶端同時發起請求狀況
【2】 作基本的請求解析,根據具體請求返回具體內容,同時知足客戶端簡單的非網頁請求狀況
【3】 經過類接口形式進行功能封裝
""" httpserver 2.0 主要功能 : 【1】 接收客戶端(瀏覽器)請求 【2】 解析客戶端發送的請求 【3】 根據請求組織數據內容 【4】 將數據內容形參http響應格式返回給瀏覽器 【5】 採用IO併發,能夠知足多個客戶端同時發起請求狀況 【6】 作基本的請求解析,根據具體請求返回具體內容,同時知足客戶端簡單的非網頁請求狀況 【7】 經過類接口形式進行功能封裝 技術點: 1.使用tcp通訊 2.select io多路複用 結構: 1.採用類封裝 類的接口設計: * 在用戶使用角度進行工做流程設計 * 儘量提供全面的功能,能爲用戶決定的在類中實現 * 不能替用戶決定的變量能夠經過實例化對象傳入類中 * 不能替用戶決定的複雜功能,能夠經過重寫覆蓋讓用戶本身決定 """ from select import select from socket import * # 將具體http server功能封裝 class HTTPServer: def __init__(self,server_addr,static_dir): # 添加屬性 self.server_address = server_addr self.static_dir = static_dir self.rlist = [] self.wlist = [] self.xlist = [] self.create_socket() self.bind() # 建立套接字 def create_socket(self): self.sockfd = socket() self.sockfd.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # 設置地址當即重用 # 綁定 def bind(self): self.sockfd.bind(self.server_address) self.ip = self.server_address[0] self.port = self.server_address[1] def serve_forever(self): self.sockfd.listen(5) print("Listen the port %d"%self.port) # port由使用者決定,即經過傳參 self.rlist.append(self.sockfd) # 循環監聽客戶端鏈接 while True: rs,ws,xs = select(self.rlist, self.wlist, self.xlist) for r in rs: if r is self.sockfd: c,addr = r.accept() print("Connect from",addr) self.rlist.append(c) else: # 處理瀏覽器(即客戶端)請求---接收請求和發送相應 self.handle(r) # 處理客戶端請求 def handle(self,connfd): # 接收http請求 request = connfd.recv(4096) # 防止瀏覽器斷開--即瀏覽器斷開後,將該鏈接套接字去除 if not request: self.rlist.remove(connfd) connfd.close() return # print(request) # 請求解析 request_line = request.splitlines()[0] info = request_line.decode().split(" ")[1] print(connfd.getpeername(),":",info) # info 分爲方位網頁和其餘 if info == "/" or info[-5:] == '.html': self.get_html(connfd,info) else: self.get_data(connfd,info) self.rlist.remove(connfd) connfd.close() # 處理網頁 def get_html(self,connfd,info): if info == "/": # 網頁文件 filename = self.static_dir + '/index.html' else: filename = self.static_dir + info try: fd = open(filename) except Exception: # 沒有網頁 responseHeaders = 'HTTP/1.1 404 Not Found\r\n' responseHeaders += '\r\n' responseBody = "<h1>Sorry,Not Found the page</h1>" else: responseHeaders = 'HTTP/1.1 200 OK\r\n' responseHeaders += '\r\n' responseBody = fd.read() finally: response = responseHeaders + responseBody connfd.send(response.encode()) # 其餘 def get_data(self,connfd,info): responseHeaders = 'HTTP/1.1 200 OK\r\n' responseHeaders += '\r\n' responseBody = "<h1>Waiting Httpserver 3.0</h1>" response = responseHeaders + responseBody connfd.send(response.encode())