目錄html
程序:java
進程:python
進程是分配資源的基本單位,爲線程提供資源,一個程序能夠開啓多個進程linux
進程被誰運行:編程
CPU最終運行你的程序json
操做系統調度做用,將你磁盤上的程序加載到內存,而後交給CPU處理,一個CPU在運行的一個程序,就是開啓了一個進程windows
操做系統是存在於硬件與軟件之間,管理,協調,控制軟件與硬件的交互設計模式
若是沒有操做系統,去寫一個程序,你要完成兩層功能:瀏覽器
操做系統兩個做用:安全
1,將一些複雜的硬件操做封裝成簡單的接口,便於使用
2,操做系統能夠合理的調度分配多個進程與CPU的關係,讓其有序化
第一代電子計算機:操做插線與你的程序結合
第二代計算機:磁帶存儲,批處理系
第三代計算機:集成電路,多道程序系統
多道技術是在不一樣任務間切換執行,因爲計算機切換速度很是快,用戶是無感狀態
解決軟件之間的隔離,互不影響
程序就是一堆代碼
進程是分配資源的基本單位,爲線程提供資源,一個程序能夠開啓多個進程
tail -f access.log |grep '404' 執行程序tail,開啓一個子進程,執行程序grep,開啓另一個子進程,兩個進程之間基於管道'|'通信,將tail的結果做爲grep的輸入。 進程grep在等待輸入(即I/O)時的狀態稱爲阻塞,此時grep命令都沒法運行
什麼是開啓多個進程:socket:server,client兩個進程
python中,若是一次想開啓多個進程,必須是一個主進程,開啓多個子進程
linux,windows:有主進程開啓子進程
相同點:主進程開啓子進程,兩個進程都有相互隔離的獨立空間,互不影響
不一樣點:
linux:子進程空間的初始數據徹底是從主(父)進程copy一份
windows:子進程空間初始數據徹底是從主(父)進程copy一份,可是有所不一樣
#這樣的實例雖然建立了子進程,可是在生產環境中子進程結束的時間不定 from multiprocessing import Process import time #當前py文件就是主進程,先運行主進程 def task(name): print(f"{name}is running") time.sleep(3) #阻塞 print(f"{name}is done") if __name__ == '__main__': #windows開啓必須寫在mian下面 p = Process(target=task,args=("海洋",)) #target要封裝的內容,對象args必定是個元祖形式 p.start() #子進程 通知操做系統在內存中開闢一個空間,將p這個進程放進去,讓cpu執行 print("___主進程")
from multiprocessing import Process import time class MyProcess(Process): def __init__(self,name): super().__init__() #放在最上面,必需要繼承父類init self.name = name def run(self): print(f"{self.name}is running") time.sleep(3) #阻塞 print(f"{self.name}is done") if __name__ == '__main__': p = MyProcess("海洋") p.start() print("====主進程")
tasklist | findstr pycharm win查看某個進程
import os print(os.getpid()) 查看當前的pid
import os print(os.getppid()) 查看父進程
import time from multiprocessing import Process X = 1000 def task(): global x x = 2 if __name__ == '__main__': p1 = Process(target = task,) p1.start() time.sleep(1) print(f"主進程{X}") print(f"主進程{X}") import time from multiprocessing import Process X = 256 #知足小數據池 def task(): print(f"子進程{id(X)}") if __name__ == '__main__': print(f"主進程{id(X)}") p1 = Process(target = task,) p1.start() time.sleep(1) print()
join 主進程等待子進程結束以後,在執行
join開啓一個進程:
from multiprocessing import Process import time def task(name): time.sleep(1) print(f"{name}is running") if __name__ == '__main__': p = Process(target=task,args=("海洋",)) p.start() p.join() #告知主進程,p進程結束以後,主進程在結束,join有些阻塞的意思 print("___主進程") # p1.start() # p2.start() #p1,p2,p3三個子進程前後運行順序不定,start只是通知一下操做系統 # p3.start() #操做系統調用cpu先運行誰,誰先執行
join串行:
from multiprocessing import Process import time def task(name,sec): time.sleep(sec) print(f"{name}is running") if __name__ == '__main__': p1 = Process(target=task, args=("海洋",1)) p2 = Process(target=task, args=("俊麗",2)) p3 = Process(target=task ,args=("寶寶",3)) start_time = time.time() p1.start() p1.join() p2.start() p2.join() p3.start() p3.join() print(f"主進程{time.time() - start_time}")
join併發:
from multiprocessing import Process import time def task(sec): time.sleep(sec) print(f"is running") if __name__ == '__main__': start_time = time.time() list = [] for i in range(1,4): p = Process(target=task, args=(i,)) p.start() list.append(p) for i in list: i.join() print(f"主進程{time.time() - start_time}")
屬性:
from multiprocessing import Process import time def task(name): print(f"{name}is running") time.sleep(3) print(f"{name}is done") if __name__ == '__main__': p = Process(target=task,args=("海洋",),name="俊麗") #name給進程對象設置name屬性 p.start() # print(p.pid) #獲取到進程號 time.sleep(1) #睡一秒,子進程已經執行完成 p.terminate() #強制結束子進程,強制執行也會有執行時間 #terminate跟start同樣工做原理,都要通知操做系統開啓子進程 #內存終止或者開啓都要須要時間的 time.sleep(1) #睡一秒,讓terminate殺死 print(p.is_alive()) #判斷子進程是否存活,只是查看內存中p子進程是否還運行 print("主進程")
init是全部進程的父進程: 殭屍進程,殭屍是什麼,死而沒有消失 主進程建立多個短暫週期的子進程,當子進程退出,是須要等待父進程處理,而父進程沒有及時對子進程回收,那麼子進程的進程符仍然保存在系統中,這種進程就是僵死進程 什麼進程描述符:每個進程都有描述符,io請求,數據指針 from multiprocessing import Process import time import os def task(name): print(f"{name}is running") print(f"子進程開始了:{os.getpid()}") time.sleep(50) if __name__ == '__main__': for i in range(100): p = Process(target=task, args=("海洋",)) p.start() print(f"___主進程:{os.getpid()}")
孤兒進程:孤兒進程是由於主進程的退出,他下面的全部子進程都變成孤兒進程了,init會對孤兒進行回收,釋 放掉佔用系統的資源,這種回收也是爲了節省內存。 孤兒進程無害,若是殭屍進程掛了,init會對孤兒進程回收,init是全部進程的祖進程,linux中爲1,0系統
將一個子進程設置成守護進程,當父進程結束,子進程必定會結束,避免孤兒進程產生,應爲回收機制
父進程不能建立子進程
#守護進程會在主進程代碼執行結束後終止,守護進程內沒法在開啓子進程 from multiprocessing import Process import time import os def task(name): print(f"{name}is running") print(f"子進程開始了:{os.getpid()}") time.sleep(50) if __name__ == '__main__': p = Process(target=task,args=("海洋",)) p.daemon = True #將p子進程設置成守護進程,守護子進程,只要主進程結束 #子進程不管執行與否都立刻結束,daemon,開啓在start上面 p.start() print(f"___主進程:{os.getpid()}")
第一種:基於文件+鎖的形式:效率低,麻煩
第二種:基於隊列,推薦的使用形式
第三種:基於管道,管道本身加鎖,底層可能會出現數據丟失損壞,隊列和管道都是將數據存放於內存中
互斥鎖保證了每次只有一個線程進行寫入操做,只有當這個線程解鎖,在運行其餘資源,上鎖和解鎖都須要本身添加
三臺電腦同時調用打印機去打印,開啓三個進程使用互斥鎖,實現公平搶佔資源
#上鎖: #必定要是同一把鎖:只能按照這個規律,上鎖一次,解鎖一次 #互斥鎖與join區別: #共同點:都是完成了進程之間的串行 #區別:join認爲控制進程的串行,互斥鎖是解決搶佔的資源,保證公平性 from multiprocessing import Process from multiprocessing import Lock import time import os import random def task1(lock): print("test1") #驗證CPU遇到IO切換 lock.acquire() print("task1 開始打印") time.sleep(random.randint(1,3)) print("task1 打印完成") lock.release() def task2(lock): print("test2") lock.acquire() #上鎖 print("task2 開始打印") time.sleep(random.randint(1,3))#阻塞,cpu切換任務,別的任務都在鎖,回來繼續執行這個程序 print("task2 打印完成") lock.release() #解鎖 def task3(lock): print("test2") lock.acquire() # lock.acquire() #死鎖錯誤示例 print("task3 開始打印") time.sleep(random.randint(1,3)) print("task3 打印完成") lock.release() if __name__ == '__main__': lock = Lock() #一把鎖 p1 = Process(target=task1,args=(lock,)) #三個進程哪一個先到先執行 p2 = Process(target=task2,args=(lock,)) p3 = Process(target=task3,args=(lock,)) p1.start() p2.start() p3.start()
互斥鎖買票示例:
#買票系統: #買票以前先要查票,在你查票的同時,100我的也在查看此票 #買票時,你要從服務端獲取到票數,票數>0 ,買票,而後服務端票數減一,中間有網絡延遲 #多進程原則上是不能互相通訊的,他們在內存級別是有數據隔離,不表明磁盤上的數據隔離,他們能夠共同操做一個文件 #多個進程搶佔同一個資源,要想公平按照順序,只能串行 from multiprocessing import Process from multiprocessing import Lock import random import json import time import os def search(): time.sleep(random.random()) #一秒以內 with open("db.json", encoding="utf-8") as f1: dic = json.load(f1) print(f"剩餘票數{dic['count']}") def get(): with open("db.json",encoding="utf-8") as f1: dic = json.load(f1) time.sleep(random.randint(1,3)) #時間延遲 if dic['count'] > 0: dic['count'] -= 1 with open("db.json",encoding="utf-8",mode="w") as f1: json.dump(dic,f1) print(f'{os.getpid()}用戶購買成功') else: print("沒票了") def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(5): p = Process(target=task,args=(lock,)) p.start() 缺點: 1.操做文件效率低 2.本身加鎖很麻煩,很容易出現死鎖,遞歸鎖
進程之間的通訊最好的方式是基於隊列
隊列是實現進程之間通訊的工具,存在內存中的一個容器,最大的特色是符合先進先出的原則
多個進程搶佔一個資源:串行,有序以及數據安全,買票
多個進程實現併發的效果:生產者消費模型
from multiprocessing import Queue q = Queue(3) #能夠設置元素個數,當數據已經達到上限,在插入夯住 def func(): print("in func") q.put("海洋") #插入數據 q.put({"count":1}) q.put(func) q.put("333",block=False) #默認爲True 當你插入的數據超過最大限度,默認阻塞 # q.put(333,timeout=8) #超過八秒在put不進數據,就會報錯 print(q.get()) print(q.get()) ret = q.get() ret() # q.get() #當你將數據取完,夯住,等待隊列put值,起另外一個進程往隊列中插入數據 #q.put() #1,maxsize() #數據量不易過大,精簡的重要數據 #2,put bolck #默認爲True阻塞 當你插入的數據超過最大限度,能夠設置報錯 #3,put timeout #延時報錯,超過三秒在put不進數據,就會報錯 #get #2,get bolck #取值爲空報錯 #3,get timeout #取值超過三秒報錯
#小米:搶手機,預期發售10個,100人去搶 from multiprocessing import Queue from multiprocessing import Process import os def task(q): try: q.put(f'{os.getpid()}') except Exception: return if __name__ == '__main__': q = Queue(10) #建立隊列,能夠存放十我的 for i in range(100): p = Process(target=task,args=(q,)) p.start() for i in range(1,11): #數量超過隊列會取 print(f'排名第{i}的用戶:{q.get()}') #獲取隊列中的信息,先進來的先取出來 #利用隊列進行進程之間的通訊:簡單,方便,不用本身手動加鎖,隊列自帶阻塞,可持續化取數據
利用隊列進行通訊,生產者生產數據,消費者獲取數據使用,平衡了生產力和消費力,生產者和消費者是一種解耦合性(經過容器解決),可持續化取數據
模型,設計模式,歸一化設計,理論等等,教給你一個編程的思路,之後遇到相似的狀況,之後直接調用就便可
生產者:生產數據的進程
消費者:生產出來的數據進行處理
#吃包子:廚師生產包子,不可能直接給你喂到嘴裏,放在一個盆裏,消費者從盆中取出包子食用 #三個主體:生產者(廚師),容器隊列(盤 緩衝區),消費者(人) #若是沒有容器,生產者與消費者強解耦性,不合理,因此咱們要有一個容器,緩衝區平衡了生產力與消費力 # 生產者消費者多應用於併發: from multiprocessing import Queue from multiprocessing import Process import time import random def producer(name,q): for i in range(1,6): time.sleep(random.randint(1,3)) res = f'{i}號包子' q.put(res) print(f'生產者{name}:生產了{res}') def consumer(name,q): while 1: try: time.sleep(random.randint(1, 3)) ret = q.get(timeout = 5) #五秒還吃不到退出 print(f'消費者{name}:吃了{ret}') except Exception: return if __name__ == '__main__': q = Queue() #盆 p1 = Process(target=producer,args=("俊麗",q,)) #生產 p2 = Process(target=consumer,args=("海洋",q,)) #消費 p1.start() p2.start()
進程:進程是分配資源的基本單位,內存中開闢空間,爲線程提供資源,一個程序能夠開啓多個進程
線程:CPU調度的最小單位,執行單位,線程也被稱做爲輕量級的進程,動態的
開啓QQ:開啓一個進程,在內存中開闢空間加載數據,啓動一個線程執行代碼
線程依賴進程的一個進程能夠包含多個線程,可是必定有一個主線程,線程纔是CPU執行的最小單元
進程線程對比:
1,開啓多進程開銷很是大,10-100倍,而開啓線程開銷很是小
2.開啓多進程速度慢,開啓多線程速度快
3.進程之間數據不共享,線程共享數據
多線程應用場景:
併發:一個CPU能夠來回切換(線程之間切換),多進程併發,多線程的併發
多進程併發:開啓多個進程,併發的執行
多線程併發:開啓線程,併發的執行
若是遇到併發:多線程居多
線程絕對要比進程開啓速度快
#先打印海洋,線程要比進程速度快,若是是進程先打印主線程 from threading import Thread def task(name): print(f'{name} is running') if __name__ == '__main__': t = Thread(target=task,args=("海洋",)) t.start() print("主線程") #子進程睡眠3秒,先運行主進程 from threading import Thread import time x = 1000 def task(): time.sleep(3) print('子線程....') def main(): print('111') print('222') print('333') if __name__ == '__main__': t = Thread(target=task) t.start() main()
from threading import Thread class MyThread(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): print(f'{self.name} is running') if __name__ == '__main__': t = MyThread("海洋") t.start() print("主線程")
#主線程和子線程pid同樣 from threading import Thread import os def task(): print(f'子線程:{os.getpid()}') if __name__ == '__main__': t = Thread(target=task,) t.start() print(f"主線程:{os.getpid()}")
from threading import Thread x = 1000 def task(): global x x = 0 if __name__ == '__main__': t = Thread(target=task, ) t.start() t.join() # 告知主線程,等待子線程運行完畢在執行 print(f'主線程:{x}')
from threading import Thread import threading import time def task(name): time.sleep(1) print(f'{name} is running') if __name__ == '__main__': for i in range(5): t = Thread(target=task,args=("海洋",)) t.start() #線程對象的方法 # print(t.is_alive()) #判斷線程是否存活 #threading模塊的方法 print(threading.current_thread().name) #返回線程對象.name print(threading.enumerate()) #返回列表,返回的是全部線程對象 print(threading.active_count()) #獲取活躍的線程數量(包括主線程) print("主線程")
守護線程必須等待主線程結束才結束,主線程必須等待全部的非守護線程結束才能結束,由於主線程的結束意味着進程的結束,這就是一個守護機制
多線程是同一個空間,同一個進程,進程表明,空間,資源,靜態的:
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) #必須在t.start()以前設置 t.start() print('主線程') print(t.is_alive()) #判斷進程是否存在也是主線程 from threading import Thread import time def foo(): print(123) time.sleep(3) print("end123") def bar(): print(456) time.sleep(1) print("end456") if __name__ == '__main__': t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon = True t1.start() t2.start() #t2非守護線程,主線程等待子線程結束 print("main-------")
from threading import Thread import time x = 100 def task(name): global x temp = x time.sleep(3) temp -= 1 x = temp if __name__ == '__main__': t = Thread(target=task,args=("海洋",)) t.start() t.join() print(f"主線程{x}") #多個線程搶佔一個資源 from threading import Thread import time x = 100 def task(name): global x temp = x time.sleep(3) temp -= 1 x = temp if __name__ == '__main__': tl = [] for i in range(100): t = Thread(target=task,args=("海洋",)) tl.append(t) t.start() for i in tl: i.join() print(f"主進程{x}") #多個線程搶佔一個資源
全部線程串行執行,多個 線程共同搶佔一個數據,保證了數據安全:
from threading import Thread from threading import Lock import time x = 100 def task(lock): lock.acquire() global x temp = x time.sleep(0.1) temp -= 1 x = temp lock.release() if __name__ == '__main__': lock = Lock() tl = [] for i in range(100): t = Thread(target=task,args=(lock,)) tl.append(t) t.start() for i in tl: i.join() print(f"主線程{x}") #多個線程搶佔一個資源,join讓主線程等待子線程執行完成在執行,結果0
多個線程或者進程競爭資源,若是開啓的互斥鎖過多,遇到互相搶鎖形成互相等待狀況,程序夯住,
還有一種是給同時給一個線程或者進程連續加鎖屢次,利用遞歸鎖解決Rlock
from threading import Thread from threading import Lock import time lock_A = Lock() lock_B = Lock() class Mtthread(Thread): def run(self): self.f1() self.f2() def f1(self): lock_A.acquire() print(f"{self.name}誰拿到A鎖") lock_B.acquire() print(f"{self.name}誰拿到B鎖") lock_B.release() lock_A.release() def f2(self): lock_B.acquire() print(f"{self.name}誰拿到B鎖") time.sleep(1) lock_A.acquire() print(f"{self.name}誰拿到A鎖") lock_A.release() lock_B.release() if __name__ == '__main__': t1 = Mtthread() t1.start() t2 = Mtthread() t2.start() t3 = Mtthread() t3.start() print(f"主進程")
遞歸鎖上有引用次數,每次引用計數+1,解鎖計數-1,只有計數爲0.在運行下個進程
#遞歸鎖: #遞歸鎖是一把鎖,鎖上有記錄,只要acquire一次,鎖上就計數一次,acquire2次就計數兩次 #release 1次減一,只要遞歸鎖計數不爲0,其餘線程不能搶 from threading import Thread from threading import RLock import time lock_A = lock_B = RLock() class Mtthread(Thread): def run(self): # lock_A.acquire() # lock_B.acquire() # print(111) # lock_A.release() # lock_B.release() self.f1() self.f2() def f1(self): lock_A.acquire() print(f"{self.name}誰拿到A鎖") lock_B.acquire() print(f"{self.name}誰拿到B鎖") lock_B.release() lock_A.release() def f2(self): lock_B.acquire() print(f"{self.name}誰拿到B鎖") time.sleep(1) lock_A.acquire() print(f"{self.name}誰拿到A鎖") lock_A.release() lock_B.release() if __name__ == '__main__': t1 = Mtthread() t1.start() t2 = Mtthread() t2.start() t3 = Mtthread() t3.start() print(f"主進程")
信號量准許多個線程或者進程同時進入
from threading import Thread from threading import current_thread from threading import Semaphore import time import random sm = Semaphore(4) def chi(): sm.acquire() print(f"{current_thread().name}正在吃飯") time.sleep(random.randint(1,3)) sm.release() if __name__ == '__main__': for i in range(20): t = Thread(target=chi) t.start()
全局解釋器鎖,就是一把互斥鎖,將併發變成串行,同一時刻只能有一個線程進入解釋器,自動加鎖和釋放鎖,犧牲效率保護python解釋器內部數據安全
強行加鎖,保證解釋器裏面的數據安全
多進程能夠利用多核,多進程的每一個進程裏面都有python解釋器程序
單進程的多線程不能利用多核,python解釋器內部程序,不支持多線程同時解釋
python-單核處理IO阻塞的多線程,java多核處理IO阻塞問題,效率差很少
單核處理三個IO線程,多核處理三個IO線程,多核快些
將你的py文件當作實參傳送給解釋器傳換成c語言字節碼,在交給虛擬機轉換成010101機器碼,這些代碼都是線程執行,進程進行調度資源
lpython:交互式解釋器,能夠補全代碼
Jpython:java語言字節碼,剩下的同樣
pypy:動態編譯,JAT技術,執行效率要比Cpython塊,可是技術還有缺陷bug
一核,都是單進程多線程併發快,由於單核開啓多進程也是串行。
計算密集型:
多進程的並行比多線程的併發執行效率高不少(由於不一樣進程運行在不一樣核心上,並行執行)
IO密集型:
多線程要比多進程處理速度快,由於進程開銷大,而線程處理其實也是串行,只不過處理速度比進程更快些,線程一次只能處理一個事情(空間複用)
開啓150個進程(開銷大,速度慢),執行IO任務耗時長
開啓150個線程(開銷小,速度快),執行IO任務耗時短
若是你的任務是io密集型而且任務數量大,用單進程下的多線程處理阻塞效率高
from multiprocessing import Process from threading import Thread import time import os # print(os.cpu_count()) def task1(): res = 1 for i in range(1, 100000000): res += i def task2(): res = 1 for i in range(1, 100000000): res += i def task3(): res = 1 for i in range(1, 100000000): res += i def task4(): res = 1 for i in range(1, 100000000): res += i if __name__ == '__main__': # 四個進程 四個cpu 並行 效率 start_time = time.time() p1 = Process(target=task1) p2 = Process(target=task2) p3 = Process(target=task3) p4 = Process(target=task4) p1.start() p2.start() p3.start() p4.start() p1.join() p2.join() p3.join() p4.join() print(f'主: {time.time() - start_time}') # 10.125909328460693 # 一個進程 四個線程 # start_time = time.time() # p1 = Thread(target=task1) # p2 = Thread(target=task2) # p3 = Thread(target=task3) # p4 = Thread(target=task4) # # p1.start() # p2.start() # p3.start() # p4.start() # # p1.join() # p2.join() # p3.join() # p4.join() # print(f'主: {time.time() - start_time}') # 22.927688121795654
計算IO密集型:
from multiprocessing import Process from threading import Thread import time import os # print(os.cpu_count()) def task1(): res = 1 time.sleep(3) if __name__ == '__main__': # 開啓150個進程(開銷大,速度慢),執行IO任務, 耗時 8.382229089736938 # start_time = time.time() # l1 = [] # for i in range(150): # p = Process(target=task1) # l1.append(p) # p.start() # for i in l1: # i.join() # print(f'主: {time.time() - start_time}') # 開啓150個線程(開銷小,速度快),執行IO任務, 耗時 3.0261728763580322 # start_time = time.time() # l1 = [] # for i in range(150): # p = Thread(target=task1) # l1.append(p) # p.start() # for i in l1: # i.join() # print(f'主: {time.time() - start_time}')
當程序執行,開啓100個線程時,第一個線程先要拿到GIL鎖,而後拿到lock鎖,遇到阻塞,CPU切走,GIL釋放,第一個線程掛起
第二個線程執行,搶到GIL鎖,進入要搶lock,可是lock鎖還沒釋放,阻塞掛起
本身加互斥鎖,必定要加在處理共享數據的地方,加的範圍不要擴大,範圍過大,影響併發
GIL鎖單進程的多線程不能利用多核,不能並行,可是能夠併發
GIL自動上鎖解鎖,文件中的互斥鎖Lock,手動上鎖解鎖
GIL鎖,保護解釋器的數據安全,互斥鎖是保護的文件的數據安全
線程池在系統啓動時建立了大量的空閒線程,線程執行直接調用線程池中已經開啓好的空閒線程,當線程執行結束,該線程不會死亡,而是將線程變成空閒狀態,放回進程池。
線程池提升效率,資源複用
進程池:放置進程的一個容器
線程池:放置線程的一個容器
完成一個簡單的socket通訊,服務端必須與一個客戶端交流完畢,而且這個客戶端斷開鏈接以後,服務端才能接待下一個客戶:
#開啓進程池或者線程池: #線程池好仍是進程池好:io阻塞或者計算密集型 from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ThreadPoolExecutor import time import os import random def task(name): # print(name) print(f"{os.getpid()}準備接客") time.sleep(random.randint(1,3)) if __name__ == '__main__': # p = ProcessPoolExecutor(max_workers=5) #限制進程數量,默認爲cpu個數 p = ThreadPoolExecutor() #線程默認是CPU個數的五倍 for i in range(23): p.submit(task,1) #給進程池放置任務啓動,1爲傳參
程序運行中的狀態,阻塞,運行,就緒
阻塞:當你程序遇到IO阻塞掛起,CPU切換,等到IO結束以後再執行
非阻塞:程序沒有IO,或者遇到IO經過某種手段讓cpu去執行其餘任務,儘量的佔用CPU
任務發出去以後等待,直到這個任務最終結束以後,給我一個返回值,發佈下一個任務
from concurrent.futures import ProcessPoolExecutor import os import time import random def task(): print(f"{os.getpid()}is running") time.sleep(1) return f'{os.getpid()} is finish' if __name__ == '__main__': p = ProcessPoolExecutor(4) for i in range(10): obj = p.submit(task,) print(obj.result()) #同步等待一個進程內容所有執行完成在執行下一個
將任務發給進程,無論任務如何,直接運行下一個
from concurrent.futures import ProcessPoolExecutor import os import time import random def task(): print(f'{os.getpid()} is running') time.sleep(random.randint(0,2)) return f'{os.getpid()} is finish' if __name__ == '__main__': p = ProcessPoolExecutor(4) obj_l1 = [] for i in range(10): obj = p.submit(task,) # 異步發出. obj_l1.append(obj) # time.sleep(3) p.shutdown(wait=True) # 1. 阻止在向進程池投放新任務, # 2. wait = True 十個任務是10,一個任務完成了-1,直至爲零.進行下一行. for i in obj_l1: print(i.result())
瀏覽器 封裝頭部,發送一個請求--->www.taobao.com ----> 服務器獲取到請求信息,分析正確--->給你返回一個文件,--->遊覽器將這個文件的代碼渲染,就成了你看的樣子
爬蟲:利用reauests模塊功能模擬遊覽器封裝頭,給服務器發送一個請求,騙過服務器以後,服務器也會給你返回一個文件,爬蟲拿到文件,進行數據清洗獲取到你想要的信息
第一步:爬取服務器端的文件(IO阻塞)
第二部:拿到文件,進行數據分析(非IO,IO極少)
import requests from concurrent.futures import ProcessPoolExecutor from multiprocessing import Process import time import random import os def get(url): response = requests.get(url) print(f'{os.getpid()} 正在爬取:{url}') time.sleep(random.randint(1,3)) if response.status_code == 200: return response.text def parse(text): print(f'{os.getpid()} 分析結果:{len(text)}') if __name__ == '__main__': url_list = [ 'http://www.taobao.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.baidu.com', 'https://www.cnblogs.com/jin-xin/articles/11232151.html', 'https://www.cnblogs.com/jin-xin/articles/10078845.html', 'http://www.sina.com.cn', 'https://www.sohu.com', 'https://www.youku.com', ] pool = ProcessPoolExecutor(4) obj_list = [] for url in url_list: obj = pool.submit(get, url) obj_list.append(obj) pool.shutdown(wait=True) for obj in obj_list: #抓取網頁是串行,輸出的結果 parse(obj.result()) #爬取一個網頁須要2s,併發爬取10個網頁:2.多s. #分析任務: 1s. 10s. 總共12.多秒. # 如今這個版本的過程: # 異步發出10個爬取網頁的任務,而後4個進程併發(並行)的先去完成4個爬取網頁的任務,而後誰先結束,誰進行下一個 # 爬取任務,直至10個任務所有爬取成功. # 將10個爬取結果放在一個列表中,串行的分析. import requests from concurrent.futures import ProcessPoolExecutor from multiprocessing import Process import time import random import os def get(url): response = requests.get(url) print(f'{os.getpid()} 正在爬取:{url}') time.sleep(random.randint(1,3)) if response.status_code == 200: parse(response.text) def parse(text): print(f'{os.getpid()} 分析結果:{len(text)}') if __name__ == '__main__': url_list = [ 'http://www.taobao.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.baidu.com', 'https://www.cnblogs.com/jin-xin/articles/11232151.html', 'https://www.cnblogs.com/jin-xin/articles/10078845.html', 'http://www.sina.com.cn', 'https://www.sohu.com', 'https://www.youku.com', ] pool = ProcessPoolExecutor(4) for url in url_list: obj = pool.submit(get, url) # pool.shutdown(wait=True) print('主') #異步發出10個 爬取網頁+分析 的任務,而後4個進程併發(並行)的先去完成4個爬取網頁+分析 的任務, #而後誰先結束,誰進行下一個 爬取+分析 任務,直至10個爬取+分析 任務所有完成成功.
import requests from concurrent.futures import ProcessPoolExecutor from multiprocessing import Process import time import random import os def get(url): response = requests.get(url) print(f'{os.getpid()} 正在爬取:{url}') if response.status_code == 200: return response.text def parse(obj): time.sleep(1) print(f'{os.getpid()} 分析結果:{len(obj.result())}') if __name__ == '__main__': url_list = [ 'http://www.taobao.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.baidu.com', 'https://www.cnblogs.com/jin-xin/articles/11232151.html', 'https://www.cnblogs.com/jin-xin/articles/10078845.html', 'http://www.sina.com.cn', 'https://www.sohu.com', 'https://www.youku.com', ] start_time = time.time() pool = ProcessPoolExecutor(4) for url in url_list: obj = pool.submit(get, url) obj.add_done_callback(parse) # 增長一個回調函數 # 如今的進程完成的仍是網絡爬取的任務,拿到了返回值以後,結果丟給回調函數add_done_callback, # 回調函數幫助你分析結果 # 進程繼續完成下一個任務. pool.shutdown(wait=True) #阻止發佈新的任務,代替join print(f'主: {time.time() - start_time}') # 回調函數是主進程幫助你實現的, 回調函數幫你進行分析任務. 明確了進程的任務: 只有一個網絡爬取. # 分析任務: 回調函數執行了.對函數之間解耦. # 極值狀況: 若是回調函數是IO任務,那麼因爲你的回調函數是主進程作的,因此有可能影響效率. # 回調不是萬能的,若是回調的任務是IO, # 那麼異步 + 回調機制 很差.此時若是你要效率只能犧牲開銷,再開一個線程進程池.
import queue q = queue.Queue(3) q.put(1) q.put(2) q.put('海洋') print(q.get()) print(q.get()) print(q.get())
import queue q = queue.LifoQueue() q.put(1) q.put(3) q.put('海洋') print(q.get()) print(q.get()) print(q.get())
# 須要元組的形式,(int,數據) int 表明優先級,數字越低,優先級越高. import queue q = queue.PriorityQueue(3) q.put((10, '垃圾消息')) q.put((-9, '緊急消息')) q.put((3, '通常消息')) print(q.get()) print(q.get()) print(q.get())
併發的執行某個任務,多進程多線程,幾乎同時執行,一個線程執行到中間時,通知另外一個線程開始執行
import time from threading import Thread from threading import current_thread from threading import Event event = Event() # 默認是False def task(): print(f'{current_thread().name} 檢測服務器是否正常開啓....') time.sleep(3) # 先運行task阻塞三秒,在將event修改成True event.set() # 改爲了True def task1(): print(f'{current_thread().name} 正在嘗試鏈接服務器') # event.wait() # 輪詢檢測event是否爲True,當其爲True,繼續下一行代碼. 阻塞 event.wait(1) # 設置超時時間,若是1s中之內,event改爲True,代碼繼續執行. # 設置超時時間,若是超過1s中,event沒作改變,代碼繼續執行. print(f'{current_thread().name} 鏈接成功') if __name__ == '__main__': t1 = Thread(target=task1,) t2 = Thread(target=task1,) t3 = Thread(target=task1,) t = Thread(target=task) t.start() t1.start() t2.start() t3.start()
協程的本質也是一個線程,而使用協程目的是爲了減小系統開銷,協程是咱們經過程序來控制任務切換,協程速度比系統更快,最大限度的利用CPU,更加輕量級
線程協程的區別:
1,協程切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,更加輕量級
2.單線程內就能夠實現併發的效果,最大限度的利用CPU
3.修改共享的數據不須要加鎖
協程就像線程同樣也是在多任務間來回切換
在其餘語言中,協程的意義不大,多線程便可以解決I/O問題,在python中有GIL鎖,在同一時間只有一個線程在工做,因此一個線程裏面IO操做特別多,協程比較適用
串行:多個任務執行時,一個任務從開始執行,遇到IO等待,等待IO阻塞結束以後再執行下一個
並行:多核多個線程或者進程同時執行,四個CPU同時執行四個任務
併發:多個任務看起來是同時執行,CPU在多個任務之間來回切換,遇到IO阻塞,計算密集型執行時間過長
併發本質:遇到IO阻塞,計算密集型執行時間過長,保持原來的狀態
一個線程實現開發:
多進程:操做系統控制,多個進程的多個任務切換 + 保持狀態
多線程程:操做系統控制,多個線程的多個任務切換 + 保持狀態
協程:程序控制一個線程的多個任務的切換以及保持狀態
微併發,處理任務不宜過多
協程他會調度CPU,若是協程管控的任務中,遇到阻塞,他會快速的(比操做系統快),切換到另外一個任務,而且能將上一個任務掛起(保持狀態),讓操做系統覺得CPU一直在工做
import time def task1(): res = 1 for i in range(1,100000): res += i def task2(): res = 1 for i in range(1,100000): res -= i start_time = time.time() task1() task2() print(f'串行消耗時間:{time.time()-start_time}') # 串行消耗時間:0.012489557266235352 def task1(): res = 1 for i in range(1, 100000): res += i yield res def task2(): g = task1() res = 1 for i in range(1, 100000): res -= i next(g) start_time = time.time() task2() print(f'協程消耗時間:{time.time() - start_time}') # 協程消耗時間:0.02991938591003418
import gevent import time def eat(name): print('%s eat 1' %name) # 1 gevent.sleep(2) #協程識別gevent,能夠進行IO切換 # time.sleep(300) #協程不識別切換不了,不可切換 print('%s eat 2' %name) # 4 def play(name): print('%s play 1' %name) # 2 gevent.sleep(1) # time.sleep(3) print('%s play 2' %name) # 3 g1 = gevent.spawn(eat, '海洋') g2 = gevent.spawn(play, name='俊麗') #協程異步發佈任務 # g1.join() # g2.join() #或者gevent.joinall([g1,g2]) gevent.joinall([g1,g2]) #主線程等待協程執行完畢 print('主') #5
import threading from gevent import monkey monkey.patch_all() # 將你代碼中的全部的IO都標識. import gevent # 直接導入便可 import time def eat(): print(f'線程1:{threading.current_thread().getName()}') # 1 print('eat food 1') # 2 time.sleep(3) # 加上mokey就可以識別到time模塊的sleep了 print('eat food 2') # 6 def play(): print(f'線程2:{threading.current_thread().getName()}') # 3 print('play 1') # 4 time.sleep(1) # 來回切換,直到一個I/O的時間結束,這裏都是咱們個gevent作得,再也不是控制不了的操做系統了。 print('play 2') # 5 g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2]) print(f'主:{threading.current_thread().getName()}') # 7