# 進程 : 進行中的程序就是一個進程 佔用資源 須要操做系統調度 pid : 可以惟一標識一個進程 計算機中最小的資源分配單位 # 併發 多個程序同時執行 : 只有一個cpu,多個程序輪流在一個cpu上執行 宏觀上 : 多個程序在同時執行 微觀上 : 多個程序輪流在一個cpu上執行 本質上仍是串行 # 並行 多個程序同時執行,而且同時在多個cpu上執行 # 同步 在作A事的時候發起B件事,必須等待B件事結束以後才能繼續作A事件 # 異步 在作A事的時候發起B時間,不須要等待B事件結束就能夠繼續A事件 # 阻塞 若是CPU不工做 input accept recv recvfrom sleep connect # 非阻塞 CPU在工做 # 線程 線程是進程中的一個單位,不能脫離進程存在 線程是計算機中可以被CPU調度的最小單位
進程 線程 正常的開發語言 多線程能夠利用多核 cpython解釋器下的多個線程不能利用多核 : 規避了全部io操做的單線程 協程 是操做系統不可見的 協程本質就是一條線程 多個任務在一條線程上來回切換 利用協程這個概念實現的內容 : 來規避IO操做,就達到了咱們將一條線程中的io操做降到最低的目的 # 進程 數據隔離 數據不安全 操做系統級別 開銷很是大 能利用多核 # 線程 數據共享 數據不安全 操做系統級別 開銷小 不能利用多核 一些和文件操做相關的io只有操做系統能感知到 # 協程 數據共享 數據安全 用戶級別 更小 不能利用多核 協程的全部的切換都基於用戶,只有在用戶級別可以感知到的io纔會用協程模塊作切換來規避(socket,請求網頁的)
I/O操做 相對內存來講 輸入Input輸出Output 輸入是怎麼輸入 :鍵盤\input\read\recv 輸出是怎麼輸出 :顯示器 打印機 播放音樂\print\write\send 文件操做 :read write 網絡操做 :send recv recvfrom 函數 :print input
計算機的工做分爲兩個狀態
CPU工做 : 作計算(對內存中的數據進行操做)的時候工做
CPU不工做 : IO操做的時候
CPU的工做效率 500000條指令/ms
多道操做系統 :一個程序遇到IO就把CPU讓給別人
順序的一個一個執行的思路變成
共同存在在一臺計算機中,其中一個程序執行讓出cpu以後,另外一個程序能繼續使用cpu
來提升cpu的利用率
單純的切換會不會佔用時間 : 會
可是多道操做系統的原理總體上仍是節省了時間,提升了CPU的利用率
時空複用的概念
單cpu 分時操做系統 : 把時間分紅很小很小的段,每個時間都是一個時間片, 每個程序輪流執行一個時間片的時間,本身的時間片到了就輪到下一個程序執行 -- 時間片的輪轉 老教授 24h全是計算 沒有io 先來先服務 FCFS 研究生 5min全是計算 沒有io 短做業優先 研究生2 5min全是計算 沒有io 沒有提升CPU的利用率 \ 提升了用戶體驗
html
併發:單個cpu,同時執行多個進程(來回切換的),看起來像是同時運行.python
並行:多個cpu,真正的同時運行多個進程.linux
阻塞:遇到IO才叫阻塞.git
一個cpu運行兩個進程,其中一個進程徹底沒有阻塞,github
非阻塞: 沒有IO.web
什麼是開啓多個進程:socket:server,client兩個進程編程
python中,若是一次想開啓多個進程,必須是一個主進程,開啓多個子進程json
linux,windows:有主進程開啓子進程windows
相同點:主進程開啓子進程,兩個進程都有相互隔離的獨立空間,互不影響安全
不一樣點:
linux:子進程空間的初始數據徹底是從主(父)進程copy一份
windows:子進程空間初始數據徹底是從主(父)進程copy一份,可是有所不一樣
就緒 運行 阻塞
# 就緒 -操做系統調度->運行 -遇到io操做-> 阻塞 -阻塞狀態結束-> 就緒
-時間片到了-> 就緒
multiple 多元化的
processing 進程
multiprocessing 多元的處理進程的模塊
python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。
multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。
須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。
建立進程的類
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) 強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹:
group參數未使用,值始終爲None target表示調用對象,即子進程要執行的任務 args表示調用對象的位置參數元組,args=(1,2,'egon',) kwargs表示調用對象的字典,kwargs{'name':'egon','age':18} name爲子進程的名稱
p.start():啓動進程,並調用該子進程中的p.run()
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
屬性介紹:
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
注意:在windows中Process()必須放到# if __name__ == '__main__':下
函數方法
from multiprocessing import Process import time def task(name): print('%s is runing' %(name)) time.sleep(3) print('%s is done' % (name)) if __name__ == '__main__': p = Process(target=task,args=('壯壯',)) # p = Process(target=task,kwargs={'name':'壯壯'}) 兩種傳參方式 p.start() print('====主')
類方法
from multiprocessing import Process import time # 方式二: class MyProcess(Process): def __init__(self,name): self.name = name super().__init__() def run(self): # 必須定義一個run方法 print('%s is runing' % (self.name)) time.sleep(3) print('%s is done' % (self.name)) if __name__ == '__main__': p = MyProcess('小明') p.start() print('===主')
多進程之間的數據隔離 from multiprocessing import Process n = 0 def func(): global n n += 1 if __name__ == '__main__': p_l = [] for i in range(100): p = Process(target=func) p.start() p_l.append(p) for p in p_l:p.join() print(n)
join 主進程等待子進程結束以後,在執行
join開啓一個進程:
from multiprocessing import Process import time def task(name): time.sleep(1) print(f"{name}is running") if __name__ == '__main__': p = Process(target=task,args=("海洋",)) p.start() p.join() #告知主進程,p進程結束以後,主進程在結束,join有些阻塞的意思 print("___主進程") # p1.start() # p2.start() #p1,p2,p3三個子進程前後運行順序不定,start只是通知一下操做系統 # p3.start() #操做系統調用cpu先運行誰,誰先執行
join串行:
from multiprocessing import Process import time def task(name,sec): time.sleep(sec) print(f"{name}is running") if __name__ == '__main__': p1 = Process(target=task, args=("小明",1)) p2 = Process(target=task, args=("明明",2)) p3 = Process(target=task ,args=("大明",3)) start_time = time.time() p1.start() p1.join() p2.start() p2.join() p3.start() p3.join() print(f"主進程{time.time() - start_time}")
join併發:
from multiprocessing import Process import time def task(sec): time.sleep(sec) print(f"is running") if __name__ == '__main__': start_time = time.time() list = [] for i in range(1,4): p = Process(target=task, args=(i,)) p.start() list.append(p) for i in list: i.join() print(f"主進程{time.time() - start_time}")
屬性:
from multiprocessing import Process import time def task(name): print(f"{name}is running") time.sleep(3) print(f"{name}is done") if __name__ == '__main__': p = Process(target=task,args=("小明",),name="大明") #name給進程對象設置name屬性 p.start() # print(p.pid) #獲取到進程號 time.sleep(1) #睡一秒,子進程已經執行完成 p.terminate() #強制結束子進程,強制執行也會有執行時間 #terminate跟start同樣工做原理,都要通知操做系統開啓子進程 #內存終止或者開啓都要須要時間的 time.sleep(1) #睡一秒,讓terminate殺死 print(p.is_alive()) #判斷子進程是否存活,只是查看內存中p子進程是否還運行 print("主進程")
server
import socket from multiprocessing import Process def talk(conn): while True: msg = conn.recv(1024).decode('utf-8') ret = msg.upper().encode('utf-8') conn.send(ret) conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',9001)) sk.listen() while True: conn, addr = sk.accept() Process(target = talk,args=(conn,)).start() sk.close()
client
import time import socket sk = socket.socket() sk.connect(('127.0.0.1',9001)) while True: sk.send(b'hello') msg =sk.recv(1024).decode('utf-8') print(msg) time.sleep(0.5) sk.close()
init是全部進程的父進程: 殭屍進程,殭屍是什麼,死而沒有消失 主進程建立多個短暫週期的子進程,當子進程退出,是須要等待父進程處理,而父進程沒有及時對子進程回收,那麼子進程的進程符仍然保存在系統中,這種進程就是僵死進程 什麼進程描述符:每個進程都有描述符,io請求,數據指針 from multiprocessing import Process import time import os def task(name): print(f"{name}is running") print(f"子進程開始了:{os.getpid()}") time.sleep(50) if __name__ == '__main__': for i in range(100): p = Process(target=task, args=("海洋",)) p.start() print(f"___主進程:{os.getpid()}")
孤兒進程:孤兒進程是由於主進程的退出,他下面的全部子進程都變成孤兒進程了,init會對孤兒進行回收,釋 放掉佔用系統的資源,這種回收也是爲了節省內存。
孤兒進程無害,若是殭屍進程掛了,init會對孤兒進程回收,init是全部進程的祖進程,linux中爲1,0系統
將一個子進程設置成守護進程,當父進程結束,子進程必定會結束,避免孤兒進程產生,應爲回收機制
父進程不能建立子進程
函數方法:
#守護進程會在主進程代碼執行結束後終止,守護進程內沒法在開啓子進程 from multiprocessing import Process import time import os def task(name): print(f"{name}is running") print(f"子進程開始了:{os.getpid()}") time.sleep(50) if __name__ == '__main__': p = Process(target=task,args=("海洋",)) p.daemon = True #將p子進程設置成守護進程,守護子進程,只要主進程結束 #子進程不管執行與否都立刻結束,daemon,開啓在start上面 p.start() print(f"___主進程:{os.getpid()}")
from multiprocessing import Process 開啓進程的另外一種方式 class 類名(Process): def __init__(self,參數): self.屬性名 = 參數 super().__init__() def run(self): print('子進程要執行的代碼') p = 類名() p.start() # 守護進程 : 會等待主進程代碼結束以後就當即結束 p = 類名() p.daemon = True # 設置守護進程 p.start() # 通常狀況下,多個進程的執行順序,多是: # 主進程代碼結束--> 守護進程結束-->子進程結束-->主進程結束 # 子進程結束 -->主進程代碼結束-->守護進程結束-->主進程結束
第一種:基於文件+鎖的形式:效率低,麻煩
第二種:基於隊列,推薦的使用形式
第三種:基於管道,管道本身加鎖,底層可能會出現數據丟失損壞,隊列和管道都是將數據存放於內存中
互斥鎖保證了每次只有一個線程進行寫入操做,只有當這個線程解鎖,在運行其餘資源,上鎖和解鎖都須要本身添加
兩種方式
from multiprocessing import Lock
第一種: lock = Lock() lock.acquire() print(1) lock.release() 第二種 with lock: buy_ticket(i)
#上鎖: #必定要是同一把鎖:只能按照這個規律,上鎖一次,解鎖一次 #互斥鎖與join區別: #共同點:都是完成了進程之間的串行 #區別:join認爲控制進程的串行,互斥鎖是解決搶佔的資源,保證公平性 from multiprocessing import Process from multiprocessing import Lock import time import os import random def task1(lock): print("test1") #驗證CPU遇到IO切換 lock.acquire() print("task1 開始打印") time.sleep(random.randint(1,3)) print("task1 打印完成") lock.release() def task2(lock): print("test2") lock.acquire() #上鎖 print("task2 開始打印") time.sleep(random.randint(1,3))#阻塞,cpu切換任務,別的任務都在鎖,回來繼續執行這個程序 print("task2 打印完成") lock.release() #解鎖 def task3(lock): print("test2") lock.acquire() # lock.acquire() #死鎖錯誤示例 print("task3 開始打印") time.sleep(random.randint(1,3)) print("task3 打印完成") lock.release() if __name__ == '__main__': lock = Lock() #一把鎖 p1 = Process(target=task1,args=(lock,)) #三個進程哪一個先到先執行 p2 = Process(target=task2,args=(lock,)) p3 = Process(target=task3,args=(lock,)) p1.start() p2.start() p3.start()
互斥鎖買票示例:
import json import time from multiprocessing import Process,Lock def search(i): with open('ticket',encoding='utf-8') as f: ticket = json.load(f) print('%s :當前的餘票是%s張'%(i,ticket['count'])) def buy_ticket(i): with open('ticket',encoding='utf-8') as f: ticket = json.load(f) if ticket['count']>0: ticket['count'] -= 1 print('%s買到票了'%i) time.sleep(0.1) with open('ticket', mode='w',encoding='utf-8') as f: json.dump(ticket,f) def get_ticket(i,lock): search(i) with lock: # 代替acquire和release 而且在此基礎上作一些異常處理,保證即使一個進程的代碼出錯退出了,也會歸還鑰匙 buy_ticket(i) if __name__ == '__main__': lock = Lock() # 互斥鎖 for i in range(10): Process(target=get_ticket,args=(i,lock)).start()
1. 進程之間的通訊最好的方式是基於隊列
2. 隊列是實現進程之間通訊的工具,存在內存中的一個容器,最大的特色是符合先進先出的原則
from multiprocessing import Queue,Process def pro(q): for i in range(10): print(q.get()) def son(q): for i in range(10): q.put('hello%s'%i) if __name__ == '__main__': q = Queue() p = Process(target=son,args=(q,)) p.start() p = Process(target=pro, args=(q,)) p.start()
多個進程搶佔一個資源:串行,有序以及數據安全,買票
多個進程實現併發的效果:生產者消費模型
import time import random from multiprocessing import Queue,Process def consumer(q,name): # 消費者:一般取到數據以後還要進行某些操做 while True: food = q.get() if food: print('%s吃了%s'%(name,food)) else:break def producer(q,name,food): # 生產者:一般在放數據以前須要先經過某些代碼來獲取數據 for i in range(10): foodi = '%s%s'%(food,i) print('%s生產了%s'%(name,foodi)) time.sleep(random.random()) q.put(foodi) if __name__ == '__main__': q = Queue() c1 = Process(target=consumer,args=(q,'alex')) c2 = Process(target=consumer,args=(q,'alex')) p1 = Process(target=producer,args=(q,'大壯','泔水')) p2 = Process(target=producer,args=(q,'b哥','香蕉')) c1.start() c2.start() p1.start() p2.start() p1.join() p2.join() q.put(None) q.put(None)
import requests from multiprocessing import Process,Queue url_dic = { 'cnblogs':'https://www.cnblogs.com/Eva-J/articles/8253549.html', 'douban':'https://www.douban.com/doulist/1596699/', 'baidu':'https://www.baidu.com', 'gitee':'https://gitee.com/old_boy_python_stack__22/teaching_plan/issues/IXSRZ', } def producer(name,url,q): ret = requests.get(url) q.put((name,ret.text)) def consumer(q): while True: tup = q.get() if tup is None:break with open('%s.html'%tup[0],encoding='utf-8',mode='w') as f: f.write(tup[1]) if __name__ == '__main__': q = Queue() pl = [] for key in url_dic: p = Process(target=producer,args=(key,url_dic[key],q)) p.start() pl.append(p) Process(target=consumer,args=(q,)).start() for p in pl:p.join() q.put(None) # join n 個進程 n個進程必須都執行完才繼續 # for i in range(4): # print(q.get())
# Manager dict list 只要是共享的數據都存在數據不安全的現象 # 須要咱們本身加鎖來解決數據安全問題 from multiprocessing import Process,Manager,Lock def change_dic(dic,lock): with lock: dic['count'] -= 1 if __name__ == '__main__': # m = Manager() with Manager() as m: lock = Lock() dic = m.dict({'count': 100}) # dic = {'count': 100} p_l = [] for i in range(100): p = Process(target=change_dic,args=(dic,lock)) p.start() p_l.append(p) for p in p_l : p.join() print(dic)
進程:進程是分配資源的基本單位,內存中開闢空間,爲線程提供資源,一個程序能夠開啓多個進程
線程:CPU調度的最小單位,執行單位,線程也被稱做爲輕量級的進程,動態的
開啓QQ:開啓一個進程,在內存中開闢空間加載數據,啓動一個線程執行代碼
線程依賴進程的一個進程能夠包含多個線程,可是必定有一個主線程,線程纔是CPU執行的最小單元
1,開啓多進程開銷很是大,10-100倍,而開啓線程開銷很是小
2.開啓多進程速度慢,開啓多線程速度快
3.進程之間數據不共享,線程共享數據
在cpython解釋器下 :GIL鎖(全局解釋器鎖) 致使了同一個進程中的多個線程不能利用多核
併發:一個CPU能夠來回切換(線程之間切換),多進程併發,多線程的併發
多進程併發:開啓多個進程,併發的執行
多線程併發:開啓線程,併發的執行
若是遇到併發:多線程居多
線程絕對要比進程開啓速度快
#先打印小明,線程要比進程速度快,若是是進程先打印主線程 from threading import Thread def task(name): print(f'{name} is running') if __name__ == '__main__': t = Thread(target=task,args=("小明",)) t.start() print("主線程") #子進程睡眠3秒,先運行主進程 from threading import Thread import time x = 1000 def task(): time.sleep(3) print('子線程....') def main(): print('111') print('222') print('333') if __name__ == '__main__': t = Thread(target=task) t.start() main() # 結果是111 222 333 子線程....
from threading import Thread class MyThread(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): print(f'{self.name} is running') if __name__ == '__main__': t = MyThread("小明") t.start() print("主線程")
from threading import Thread x = 1000 def task(): global x x = 0 if __name__ == '__main__': t = Thread(target=task, ) t.start() t.join() # 告知主線程,等待子線程運行完畢在執行 print(f'主線程:{x}')
from threading import Thread import threading import time def task(name): time.sleep(1) print(f'{name} is running') if __name__ == '__main__': for i in range(5): t = Thread(target=task,args=("海洋",)) t.start() #線程對象的方法 # print(t.is_alive()) #判斷線程是否存活 #threading模塊的方法 print(threading.current_thread().name) #返回線程對象.name print(threading.enumerate()) #返回列表,返回的是全部線程對象 print(threading.active_count()) #獲取活躍的線程數量(包括主線程) print("主線程")
守護線程必須等待主線程結束才結束,主線程必須等待全部的非守護線程結束才能結束,由於主線程的結束意味着進程的結束,這就是一個守護機制
多線程是同一個空間,同一個進程,進程表明,空間,資源,靜態的:
import time from threading import Thread def son(): while True: print('in son') time.sleep(1) def son2(): for i in range(3): print('in son2 ****') time.sleep(1) # flag a 0s t = Thread(target=son) t.daemon = True t.start() Thread(target=son2).start() # flag b # 主線程會等待子線程結束以後才結束 # 主線程結束進程就會結束 # 守護線程隨着主線程的結束而結束 # 守護線程會在主線程的代碼結束以後繼續守護其餘子線程
守護進程須要主進程來回收資源 守護線程是隨着進程的結束才結束的 其餘子線程-->主線程結束-->主進程結束-->整個進程中全部的資源都被回收-->守護線程也會被回收 進程是資源分配單位 子進程都須要它的父進程來回收資源 線程是進程中的資源 全部的線程都會隨着進程的結束而被回收的
多個線程同時操做全局變量/靜態變量 會產生數據不安全現象 互斥鎖 += -= 說明了線程之間數據的不安全 a = a.strip() 帶返回值的都是先計算後賦值,數據不安全 a = a+1 /a+=1 數據不安全 if\while 數據不安全 append pop 說明了在線程中操做列表中的方法是數據安全的
from threading import Thread,Lock import time n = [] def append(): for i in range(500000): n.append(1) def pop(lock): for i in range(500000): with lock: if not n: time.sleep(0.0000001) # 強制CPU輪轉 n.pop() t_l = [] lock = Lock() for i in range(20): t1 = Thread(target=append) t1.start() t2 = Thread(target=pop,args=(lock,)) t2.start() t_l.append(t1) t_l.append(t2) for t in t_l: t.join() print(n) # 不要操做全局變量,不要在類裏操做靜態變量 # += -= *= /= if while 數據不安全 # queue logging 數據安全的
import time class A: from threading import Lock __instance = None lock = Lock() def __new__(cls, *args, **kwargs): with cls.lock: if not cls.__instance: time.sleep(0.000001) # cpu輪轉 cls.__instance = super().__new__(cls) return cls.__instance
# Lock 互斥鎖 效率高 # RLock 遞歸(recursion)鎖 效率相對低 l = Lock() l.acquire() print('但願被鎖住的代碼') l.release() rl = RLock() # 在同一個線程中能夠被acquire屢次 rl.acquire() print('但願被鎖住的代碼') rl.release()
from threading import Thread,RLock as Lock def func(i,lock): lock.acquire() lock.acquire() print(i,': start') lock.release() lock.release() print(i, ': end') lock = Lock() for i in range(5): Thread(target=func,args=(i,lock)).start()
死鎖現象是怎麼產生的? 多把(互斥/遞歸)鎖 而且在多個線程中 交叉使用 fork_lock.acquire() noodle_lock.acquire() fork_lock.release() noodle_lock.release() 若是是互斥鎖,出現了死鎖現象,最快速的解決方案把全部的互斥鎖都改爲一把遞歸鎖 程序的效率會下降的 遞歸鎖 效率低 可是解決死鎖現象有奇效 互斥鎖 效率高 可是多把鎖容易出現死鎖現象 一把互斥鎖就夠了
線程之間數據安全的容器隊列
from queue import Empty # 不是內置的錯誤類型,而是queue模塊中的錯誤 q = queue.Queue(4) # fifo 先進先出的隊列 q.get() q.put(1) q.put(2) q.put(3) q.put(4) print('4 done') q.put_nowait(5) print('5 done') try: q.get_nowait() except Empty:pass print('隊列爲空,繼續其餘內容') # put_nowait: 不會等待隊列有空閒位置再放入數據,若是數據放入不成功就直接崩潰 # get_nowait: 隊列爲空,取值的時候不等待,可是取不到值那麼直接崩潰了
from queue import LifoQueue # last in first out 後進先出 棧 lq = LifoQueue() lq.put(1) lq.put(2) lq.put(3) print(lq.get()) print(lq.get()) print(lq.get())
from queue import PriorityQueue # 優先級隊列 priq = PriorityQueue() priq.put((2,'alex')) priq.put((1,'wusir')) priq.put((0,'太白')) print(priq.get()) print(priq.get()) print(priq.get())
要在程序開始的時候,還沒提交任務先建立幾個線程或者進程
放在一個池子裏,這就是池
若是先開好進程/線程,那麼有任務以後就能夠直接使用這個池中的數據了
而且開好的線程或者進程會一直存在在池中,能夠被多個任務反覆利用
這樣極大的減小了開啓\關閉\調度線程/進程的時間開銷
池中的線程/進程個數控制了操做系統須要調度的任務個數,控制池中的單位
有利於提升操做系統的效率,減輕操做系統的負擔
發展過程
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(a,b): print('start') print(a,b) print('end') if __name__ == '__main__': # p = ProcessPoolExecutor(max_workers=5) #限制進程數量,默認爲cpu個數 p = ThreadPoolExecutor(4) #線程默認是CPU個數的五倍 for i in range(4): p.submit(func,1,2) #給進程池放置任務啓動,1,2爲傳參
同步:
任務發出去以後等待,直到這個任務最終結束以後,給我一個返回值,發佈下一個任務
from concurrent.futures import ProcessPoolExecutor import os import time import random def task(): print(f"{os.getpid()}is running") time.sleep(1) return f'{os.getpid()} is finish' if __name__ == '__main__': p = ProcessPoolExecutor(4) for i in range(10): obj = p.submit(task,) print(obj.result()) #同步等待一個進程內容所有執行完成在執行下一個
將任務發給進程,無論任務如何,直接運行下一個
from concurrent.futures import ProcessPoolExecutor import os import time import random def task(): print(f'{os.getpid()} is running') time.sleep(random.randint(0,2)) return f'{os.getpid()} is finish' if __name__ == '__main__': p = ProcessPoolExecutor(4) obj_l1 = [] for i in range(10): obj = p.submit(task,) # 異步發出. obj_l1.append(obj) # time.sleep(3) p.shutdown(wait=True) # 1. 阻止在向進程池投放新任務, # 2. wait = True 十個任務是10,一個任務完成了-1,直至爲零.進行下一行. for i in obj_l1: print(i.result())
import os import time,random from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(a,b): print(os.getpid(),'start',a,b) time.sleep(random.randint(1,4)) print(os.getpid(),'end') return a*b if __name__ == '__main__': tp = ProcessPoolExecutor(4) futrue_l = {} for i in range(20): # 異步非阻塞的 ret = tp.submit(func,i,b=i+1) futrue_l[i] = ret # print(ret.result()) # Future將來對象 for key in futrue_l: # 同步阻塞的 print(key,futrue_l[key].result())
# map 只適合傳遞簡單的參數,而且必須是一個可迭代的類型做爲參數 import os import time,random from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(a): print(os.getpid(),'start',a[0],a[1]) time.sleep(random.randint(1,4)) print(os.getpid(),'end') return a[0]*a[1] if __name__ == '__main__': tp = ProcessPoolExecutor(4) ret = tp.map(func,((i,i+1) for i in range(20))) for key in ret: # 同步阻塞的 print(key)
# 回調函數 : 效率最高的 import time,random from threading import current_thread from concurrent.futures import ThreadPoolExecutor def func(a,b): print(current_thread().ident,'start',a,b) time.sleep(random.randint(1,4)) print(current_thread().ident,'end',a) return (a,a*b) def print_func(ret): # 異步阻塞 print(ret.result()) if __name__ == '__main__': tp = ThreadPoolExecutor(4) futrue_l = {} for i in range(20): # 異步非阻塞的 ret = tp.submit(func,i,b=i+1) ret.add_done_callback(print_func) # ret這個任務會在執行完畢的瞬間當即觸發print_func函數,而且把任務的返回值對象傳遞到print_func作參數 # 異步阻塞 回調函數 給ret對象綁定一個回調函數,等待ret對應的任務有告終果以後當即調用print_func這個函數 # 就能夠對結果當即進行處理,而不用按照順序接收結果處理結果
from concurrent.futures import ThreadPoolExecutor import requests import os def get_page(url): # 訪問網頁,獲取網頁源代碼 線程池中的線程來操做 print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): # 獲取到字典結果以後,計算網頁源碼的長度,把https://www.baidu.com : 1929749729寫到文件裏 線程任務執行完畢以後綁定回調函數 res=res.result() print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # 得到一個線程池對象 = 開啓線程池 tp = ThreadPoolExecutor(4) # 循環urls列表 for url in urls: # 獲得一個futrue對象 = 把每個url提交一個get_page任務 ret = tp.submit(get_page,url) # 給futrue對象綁定一個parse_page回調函數 ret.add_done_callback(parse_page) # 誰先回來誰就先寫結果進文件
協程:本質是一個線程 可以在一個線程內的多個任務之間來回切換 節省io操做的時間也只能是和網絡操做相關的 特色:數據安全,用戶級別,開銷小,不能利用多核,可以識別的io操做少 gevent 第三方模塊 完成併發的socket server 協程對象.spawn(func,參數) 能識別的io操做也是有限的 而且要想讓gevent可以識別一些導入的模塊中的io操做 from gevent import monkey;monkey.patch_all() asyncio 內置模塊 await 寫好的asyncio中的阻塞方法 async 標識一個函數時協程函數,await語法必須用在async函數中
切換 並 規避io 的兩個模塊 gevent = 利用了 greenlet 底層模塊完成的切換 + 自動規避io的功能 asyncio = 利用了 yield 底層語法完成的切換 + 自動規避io的功能 tornado 異步的web框架 yield from - 更好的實現協程 send - 更好的實現協程 asyncio模塊 基於python原生的協程的概念正式的被成立 特殊的在python中提供協程功能的關鍵字 : aysnc await # 用戶級別的協程還有什麼好處: # 減輕了操做系統的負擔 # 一條線程若是開了多個協程,那麼給操做系統的印象是線程很忙,這樣能多爭取一些時間片時間來被CPU執行,程序的效率就提升了
import gevent def func(): # 帶有io操做的內容寫在函數裏,而後提交func給gevent print('start func') gevent.sleep(1) # gevent.sleep是一個特殊的,time.sleep在這裏不行 # 若是想用time就要在用下面的代碼 print('end func') g1 = gevent.spawn(func) g2 = gevent.spawn(func) g3 = gevent.spawn(func) gevent.joinall([g1,g2,g3])
time
import time print(time.sleep) # 這裏的time和from gevent import mockey裏的不一樣 from gevent import monkey monkey.patch_all() import time import gevent def func(): # 帶有io操做的內容寫在函數裏,而後提交func給gevent print('start func') time.sleep(1) print('end func') g1 = gevent.spawn(func) g2 = gevent.spawn(func) g3 = gevent.spawn(func) gevent.joinall([g1,g2,g3]) # 阻塞 直到協程g1任務執行結束 # 要有阻塞才能執行
基於gevent協程實現socket併發
import socket print(socket.socket) # 在patch all以前打印一次 from gevent import monkey # gevent 如何檢測是否能規避某個模塊的io操做呢? monkey.patch_all() import socket import gevent print(socket.socket) # 在patch all以後打印一次,若是兩次的結果不同,那麼就說明可以規避io操做 def func(conn): while True: msg = conn.recv(1024).decode('utf-8') MSG = msg.upper() conn.send(MSG.encode('utf-8')) sk = socket.socket() sk.bind(('127.0.0.1',9001)) sk.listen() while True: conn,_ = sk.accept() gevent.spawn(func,conn)
client
import time import socket from threading import Thread def client(): sk = socket.socket() sk.connect(('127.0.0.1',9001)) while True: sk.send(b'hello') msg = sk.recv(1024) print(msg) time.sleep(0.5) for i in range(500): Thread(target=client).start()
import asyncio async def func(name): print('start',name) # await 可能會發生阻塞的方法 # await 關鍵字必須寫在一個async函數裏 await asyncio.sleep(1) print('end') loop = asyncio.get_event_loop() loop.run_until_complete(func("alex")) # 單個任務 # loop.run_until_complete(asyncio.wait([func('alex'),func('太白')]))接收多個,接受列表