進程html
進程 : 什麼是進程? 是操做系統的發展過程當中,爲了提升CPU的利用率,在操做系統同時運行多個程序的時候,爲了數據的安全\代碼不混亂而被創造出來的一個概念 每個程序運行起來都至少是一個進程. 進程是計算機中最小的資源分配單位 進程被操做系統調度的,有不少相關的算法 - 早期的操做系統 進程之間是數據隔離的 進程的三狀態 就緒 運行 阻塞 同步異步 同步 : 一個任務的執行依賴另外一個事務的結束 join lock 異步 : 一個任務的執行不依賴另外一個事務的結束 start terminate 阻塞非阻塞 阻塞 : accept recv recvfrom queue.get join 非阻塞 : setblocking = False 併發並行 並行是特殊的併發 並行就是 同一時刻 兩個以上的程序同時在cpu上執行 併發就是 同一時段 兩個以上的程序看起來在同時執行 IO概念 : 文件操做 數據庫操做 網絡傳輸 用戶輸入輸出 Input 獲得bytes/str Output 發送數據/輸出數據 由於進程與進程之間本質上是異步且數據隔離 守護進程 : 子進程等待主進程的代碼結束就結束了 同步控制 join 鎖 - 互斥鎖 : 多個進程同時對一個數據進行操做的時候 操做同一個文件/數據庫/管道/Manager.dict 信號量 事件 數據共享 - 數據不安全 Manager IPC-進程之間通訊 管道 隊列 - 生產者消費者模型(爲了解決數據的生產和處理的效率問題) 第三方工具(消息隊列,消息中間件) 進程池 解決大量任務 開啓多個進程的開銷過大的問題 節省資源,提升併發效率的 通常開進程數 cpu_count * 1 or 2
Python並不支持真正意義上的多線程。Python中提供了多線程模塊,但若是想經過多線程提升代碼的速度,並不推薦使用多線程模塊。Python中有一個全局鎖Global Interpreter Lock(GIL),全局鎖會確保任什麼時候候多個線程中只有一個會被執行。線程的執行速度很是快,會誤覺得線程是並行執行的,但實際上都是輪流執行。通過GIL處理後,會增長線程執行的開銷。
全局鎖 GIL(Global interpreter lock) 並非 Python 的特性,而是在實現 Python 解析器(CPython)時所引入的一個概念。Python有CPython,PyPy,Psyco 等不一樣的 Python 執行環境,其中 JPython 沒有GIL。CPython 是大部分環境下默認的 Python 執行環境,GIL 並非 Python 的特性,Python 徹底能夠不依賴於 GIL。
GIL 限制了同一時刻只能有一個線程運行,沒法發揮多核 CPU 的優點。GIL 本質是互斥鎖,都是將併發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。在一個 Python 的進程內,不只有主線程或者由主線程開啓的其它線程,還有解釋器開啓的垃圾回收等解釋器級別的線程。進程內,全部數據都是共享的,代碼做爲一種數據也會被全部線程共享,多個線程先訪問到解釋器的代碼,即拿到執行權限,而後將 target 的代碼交給解釋器的代碼去執行,解釋器的代碼是全部線程共享的,因此垃圾回收線程也可能訪問到解釋器的代碼而去執行,所以爲了保證數據安全須要加鎖處理,即 GIL。
因爲GIL 的存在,同一時刻同一進程中只有一個線程被執行。多核 CPU能夠並行完成計算,所以多核能夠提高計算性能,但 CPU 一旦遇到 I/O 阻塞,仍然須要等待,因此多核CPU對 I/O 密集型任務提高不明顯。根據執行任務是計算密集型仍是I/O 密集型,不一樣場景使用不一樣的方法,對於計算密集型任務,多進程佔優點,對於 I/O 密集型任務,多線程佔優點
全部的程序 - 任務 全部的任務 - 進程 進程 ? 進行中的程序 PID進程ID 進程是計算機中資源分配的最小單位 進程間數據隔離 要通訊的話運用socket
進程調度:--先來先服務調度算法,短做業優先,時間片輪轉法,多級反饋機制
併發 資源有限的狀況下,AB程序交替使用cpu目的是提升效率 並行 同一時刻都在執行 多核 (是併發裏的一種特殊狀況) 更苛刻的條件 同步: 程序順序執行,多個任務之間串行執行 (洗衣完--作飯完--洗碗) 異步: 多個任務同時運行 (在同一時間內洗衣作飯洗碗) 阻塞: 程序因爲不符合某個條件或者等待某個條件知足 而在某一個地方進入等待狀態 非阻塞: 程序正常執行
同步阻塞
一件事兒一件事兒的作
中間還要被阻塞
同步非阻塞 : 費力不討好
一件事兒一件事兒的作
可是不阻塞
異步阻塞
同時進行的
每一件事兒都會遇到阻塞事件
異步非阻塞
幾個事情同時進行
每一件事都不阻塞
import os,time print(os.getpid()) #獲取當前進程號 print(os.getppid()) #獲取當前父進程id time.sleep(5) print(os.getpid())
import multiprocessing #是個包 from multiprocessing import Process import os def son_process(): # 這個函數的代碼實在子進程執行的 print('執行我啦',os.getpid(),os.getppid()) print() if __name__ == '__main__': print('1-->',os.getpid()) #主進程 p = Process(target=son_process) #實例化 p.start() #進程開始 下面是打印結果 # 1--> 6416 # 執行我啦 7388 6416
import time from multiprocessing import Process # 經過併發實現一個有併發效果的socket server # 1.開啓了一個子進程就已經實現了併發: 父進程(主進程)和子進程併發(同時執行) def son_process(): print('son start') time.sleep(1) print('son end') if __name__ == '__main__': p = Process(target=son_process) #建立子進程 p.start() #通知操做系統開啓一個子進程 os響應需求 分配資源 執行進程中的代碼 print('主進程')
import time from multiprocessing import Process # 經過併發實現一個有併發效果的socket server # 1.開啓了一個子進程就已經實現了併發: 父進程(主進程)和子進程併發(同時執行) def son_process(): print('son start') time.sleep(1) print('son end') if __name__ == '__main__': p = Process(target=son_process) #建立子進程 p.start() #通知操做系統開啓一個子進程 os響應需求 分配資源 執行進程中的代碼 for i in range(5): print('主進程') time.sleep(0.3)
import time from multiprocessing import Process def son_process(): print('son start') time.sleep(1) print('son end') if __name__ == '__main__': for i in range(3): p = Process(target=son_process) p.start()
import time from multiprocessing import Process def son_process(i): print('son start',i) time.sleep(1) print('son end',i) if __name__ == '__main__': for i in range(10): p = Process(target=son_process,args=(i,)) #args必須元組 p.start() # 通知操做系統 start並不意味着子進程已經開始了
import time from multiprocessing import Process def son_process(i): print('son start',i) time.sleep(1) print('son end',i) if __name__ == '__main__': for i in range(10): p = Process(target=son_process,args=(i,)) #子進程不支持返回值 p.start() print('主進程的代碼執行完畢') # 主進程會等待子進程結束以後才結束 # 爲何? # 父進程負責建立子進程,也負責回收子進程的資源
- server.py- import socket,time from multiprocessing import Process def talk(conn): conn, addr = sk.accept() print(conn) while True: msg = conn.recv(1024).decode() time.sleep(10) conn.send(msg.upper().encode()) if __name__ == '__main__': # 這句話下面的全部代碼都只在主進程中執行 sk = socket.socket() sk.bind(('127.0.0.1',9000)) sk.listen() while True: conn,addr = sk.accept() Process(target=talk,args=(sk,)).start() # 卡 大量的while True 而且代碼中並無太多的其餘操做 # 若是咱們使用socketserver,不會這麼卡 # 多進程確實能夠幫助咱們實現併發效果,可是還不夠完美 # 操做系統沒開啓一個進程要消耗大量的資源 # 操做系統要負責調度進程 進程越多 調度起來就越吃力 - clitent.py- import socket sk = socket.socket() sk.connect(('127.0.0.1',9000)) while True: sk.send(b'hello') print(sk.recv(1024))
def son_process(i): while True: print('son start',i) time.sleep(0.5) print('son end',i) if __name__ == '__main__': p = Process(target=son_process, args=(1,)) p.start() # 開啓一個子進程,異步的 print('主進程的代碼執行完畢') print(p.is_alive()) # 子進程還活着 p.terminate() # 結束一個子進程,異步的 print(p.is_alive()) # 子進程還在活着 time.sleep(0.1) print(p.is_alive()) # False
n = [100] def sub_n(): # 減法 global n # 子進程對於主進程中的全局變量的修改是不生效的 n.append(1) print('子進程n : ',n) #子進程n : [100, 1] if __name__ == '__main__': p = Process(target = sub_n) p.start() p.join() # 阻塞 直到子進程p結束 print('主進程n : ',n) #主進程n : [100]
# 主進程裏的print('主進程n : ',n)這句話在十個子進程執行完畢以後才執行 n = [100] import random def sub_n(): global n # 子進程對於主進程中的全局變量的修改是不生效的 time.sleep(random.random()) n.append(1) print('子進程n : ',n) if __name__ == '__main__': p_lst = [] #進程添加進列表 for i in range(10): p = Process(target = sub_n) #建立 p.start() #通知os 開啓 p_lst.append(p) for p in p_lst:p.join() # 阻塞 只有一個條件是可以讓我繼續執行 這個條件就是子進程結束 print('主進程n : ',n)
n = [100] def sub_n(): global n # 子進程對於主進程中的全局變量的修改是不生效的 n.append(1) print('子進程n : ',n) time.sleep(10) print('子進程結束') if __name__ == '__main__': p = Process(target = sub_n) p.start() p.join(timeout = 5) # 若是不設置超時時間 join會阻塞直到子進程p結束 # # timeout超時 # # 若是設置的超時時間,那麼意味着若是不足5s子進程結束了,程序結束阻塞 # # 若是超過5s尚未結束,那麼也結束阻塞 print('主進程n : ',n) p.terminate() # 也能夠強制結束一個子進程
# 設置子進程爲守護進程,守護進程會隨着主進程代碼的結束而結束 # 因爲主進程要負責給全部的子進程收屍,因此主進程必須是最後結束,守護進程只能在主進程的代碼結束以後就認爲主進程結束了 # 守護進程在主進程的代碼結束以後就結束了,不會等待其餘子進程結束 # # 但願守護進程必須等待全部的子進程結束以後才結束 # ???? # import time # from multiprocessing import Process # def alive(): # while True: # print('鏈接監控程序,而且發送報活信息') # time.sleep(0.6) # # def func(): # '主進程中的核心代碼' # while True: # print('選擇的項目') # time.sleep(1) # print('根據用戶的選擇作一些事兒') # # if __name__ == '__main__': # p = Process(target=alive) # p.daemon = True # 設置子進程爲守護進程,守護進程會隨着主進程代碼的結束而結束 # p.start() # p = Process(target=func) # p.start() # p.join() # 在主進程中等待子進程結束,守護進程就能夠幫助守護其餘子進程了 # 守護進程 # 1.守護進程會等待主進程的代碼結束而結束,不會等待其餘子進程的結束 # 2.要想守護進程等待其餘子進程,只須要在主進程中加上join
for i in range(5): pass print(i) # i=4 lst = [] for i in range(5): p = Process() lst.append(p) p.start() for p in lst: p.join() p.terminate()
import os from multiprocessing import Process class MyProcess(Process): #繼承 def __init__(self,參數): super().__init__() #父類 初始化 self.一個屬性 = 參數 def run(self): print('子進程中要執行的代碼') if __name__ == '__main__': conn = '一個連接' mp = MyProcess(conn) mp.start()
當多個進程使用同一份數據資源的時候,就會引起數據安全或順序混亂問題。 接下來,咱們以 模擬搶票 爲例,來看看數據安全的重要性。 import json import time from multiprocessing import Process,Lock def search(name): '''查詢餘票的功能''' with open('ticket') as f: # 'r' dic = json.load(f) # 讀取 dict print(name , dic['count']) def buy(name): # 買票 with open('ticket') as f: dic = json.load(f) time.sleep(0.1) if dic['count'] > 0: print(name,'買到票了') dic['count'] -= 1 time.sleep(0.1) with open('ticket','w') as f: json.dump(dic,f) # 寫進去 def get_ticket(name,lock): #整個操做 search(name) # 先查 lock.acquire() # 只有第一個到達的進程才能獲取鎖,剩下的其餘人都須要在這裏阻塞 上鎖 buy(name) # 再買 lock.release() # 有一我的還鎖,會有一我的再結束阻塞拿到鑰匙 還鎖 if __name__ == '__main__': lock = Lock() # 實例化鎖 for i in range(10): # 10個進程 p = Process(target=get_ticket,args=('name%s'%i,lock)) # 建立 p.start() # 通知os 開啓 # tips : ticket 裏面的數據結構 {"count": 0} # 模擬過程描述: # 第一個來的人 取鑰匙 開門 進門 關門 帶着鑰匙反鎖 # 第一個拿到鑰匙的人 開門 出門 鎖門 掛鑰匙
進程 狀態碼 Z/z 殭屍進程 linux命令 主進程中控制子進程的方法: p = Process(target,args) #建立這一刻 根本沒有通知操做系統 p.start() #通知os 開啓子進程 異步非阻塞 p.terminate() #通知os,關閉子進程,異步非阻塞 p.is_alive() # 查看子進程是否還活着 p.join(timeout=10) # 阻塞 直到子進程結束 超時時間理解 # 守護進程 # 守護進程是一個子進程 # 守護進程會在主進程代碼結束以後才結束 # 爲何會這樣? # 因爲主進程必需要回收全部的子進程的資源 # 因此主進程必須在子進程結束以後才能結束 # 而守護進程就是爲了守護主進程存在的 # 不能守護到主進程結束,就只能退而求其次,守護到代碼結束了 # 守護到主進程的代碼結束,意味着若是有其餘子進程沒有結束,守護進程沒法繼續守護 # 解決方案 : 在主進程中加入對其餘子進程的join操做,來保證守護進程能夠守護全部主進程和子進程的執行 # 如何設置守護進程 # 子進程對象.daemon = True 這句話寫在start以前 # 鎖 # 爲何要用鎖? # 因爲多個進程的併發,致使不少數據的操做都在同時進行 # 因此就有可能產生多個進程同時操做 : 文件\數據庫 中的數據 # 致使數據不安全 # 因此給某一段修改數據的程序加上鎖,就能夠控制這段代碼永遠不會被多個進程同時執行 # 保證了數據的安全 # Lock 鎖(互斥鎖) # 鎖其實是把你的某一段程序變成同步的了,下降了程序運行的速度,爲了保證數據的安全性 # 沒有數據安全的效率都是耍流氓
# 對於鎖 保證一段代碼同一時刻只能有一個進程執行 # 對於信號量 保證一段代碼同一時刻只能有n個進程執行 # 流量控制 import time import random from multiprocessing import Process,Semaphore def ktv(name,sem): sem.acquire() #拿鎖 print("%s走進了ktv"%name) time.sleep(random.randint(5,10)) print("%s走出了ktv" % name) sem.release() #還鎖 if __name__ == '__main__': sem = Semaphore(4) # 同時只能有4個進程執行 for i in range(25): p = Process(target=ktv,args = ('name%s'%i,sem)) p.start() # 信號量原理 : 鎖 + 計數器實現的 # 普通的鎖 acquire 1次 # 信號量 acquire 屢次 # count計數 # count = 4 # acquire count -= 1 # 當count減到0的時候 就阻塞 # release count + = 1 # 只要count不爲0,你就能夠繼續acquire
# from multiprocessing import Event # Event 事件類 # e = Event() # e 事件對象 # 事件自己就帶着標識 : False # wait 阻塞 # 它的阻塞條件是 對象標識爲False # 結束阻塞條件是 對象標識爲True # 對象的標識相關的 : # set 將對象的標識設置爲True # clear 將對象的標識設置爲False # is_set 查看對象的標識是否爲True import time import random from multiprocessing import Event,Process def traffic_light(e): print('\033[1;31m紅燈亮\033[0m') while True: time.sleep(2) if e.is_set(): # 若是當前是綠燈 print('\033[1;31m紅燈亮\033[0m') # 先打印紅燈亮 e.clear() # 再把燈改爲紅色 else : # 當前是紅燈 print('\033[1;32m綠燈亮\033[0m') # 先打印綠燈亮 e.set() # 再把燈變綠色 # def car(e,carname): if not e.is_set(): # False print('%s正在等待經過'%carname) e.wait() #阻塞 print('%s正在經過'%carname) if __name__ == '__main__': e = Event() #建立 p = Process(target=traffic_light,args = (e,)) #建立進程 p.start() #開始 for i in range(100): #100輛車 time.sleep(random.randrange(0,3)) #隨機 p = Process(target=car, args=(e,'car%s'%i)) p.start() # 太複雜了 # 在咱們進行併發操做的時候不多用到這麼複雜的場景 # Event事件 # 放到進程中的代碼必定不止一段 # 這兩個操做之間 存在同步關係 # 一個操做去確認另外一個操做的執行條件是否完成 # 標識 控制wait是否阻塞的關鍵 # 如何修改這個標識 : clear set # 如何查看這個標識 : is_set
# 管道數據不安全 管道加鎖就是隊列 from multiprocessing import Pipe,Process def f(conn): #接收子conn conn.send('hello world') #發消息 conn.close() if __name__ == '__main__': parent_conn,child_conn = Pipe() p = Process(target=f,args=(child_conn,)) #傳子conn p.start() print(parent_conn.recv()) #父conn接收 p.join()
# 進程之間 數據隔離 # 憑什麼判斷 子進程是否執行完畢了???? # lock對象 # a進程 acquire了 b進程在acquire的地方一直阻塞直到a release # 你在b進程 如何知道a進程release了? # 你以前學習的lock semaphore event實際上都用到了進程之間的通訊 # 只不過這些通訊都是很是簡單而固定的信號 # 在你使用這些工具的過程當中並感知不到 # 對於用戶來說 : 就但願可以去進行一些更加複雜的 不固定的內容的交互 # 這種狀況下使用lock semaphore event就不可行了 # 進程間通訊 IPC # IPC Inter-Process Communication # 實現進程之間通訊的兩種機制: # 管道 Pipe 數據不安全 # 隊列 Queue 管道+鎖 # from multiprocessing import Queue,Process # # def consumer(q): # print( # '子進程 :', q.get() # ) # # # if __name__ == '__main__': # q = Queue() # p = Process(target=consumer,args=(q,)) # p.start() # q.put('hello,world') # 生產者消費者模型 import time from multiprocessing import Queue,Process def producer(name,food,num,q): '''生產者''' for i in range(num): time.sleep(0.3) foodi = food + str(i) print('%s生產了%s'%(name,foodi)) q.put(foodi) def consumer(name,q): while True: food = q.get() # 等待接收數據 if food == None:break print('%s吃了%s'%(name,food)) time.sleep(1) if __name__ == '__main__': q = Queue(maxsize=10) p1 = Process(target=producer,args = ('寶元','泔水',20,q)) p2 = Process(target=producer,args = ('戰山','魚刺',10,q)) c1 = Process(target=consumer, args=('alex', q)) c2 = Process(target=consumer, args=('wusir', q)) p1.start() # 開始生產 p2.start() # 開始生產 c1.start() c2.start() p1.join() # 生產者結束生產了 p2.join() # 生產者結束生產了 q.put(None) # put None 操做永遠放在全部的生產者結束生產以後 q.put(None) # 有幾個消費者 就put多少個None # 爲何隊列爲空 爲滿 這件事情不夠準確 # q.qsize() 隊列的大小 # q.full() 是否滿了 滿返回True # q.empty() 是否空了 空返回True
import time from multiprocessing import JoinableQueue,Process def consumer(name,q): while True: food = q.get() time.sleep(1) print('%s消費了%s'%(name,food)) q.task_done() def producer(name,food,num,q): '''生產者''' for i in range(num): time.sleep(0.3) foodi = food + str(i) print('%s生產了%s'%(name,foodi)) q.put(foodi) q.join() # 消費者消費完畢以後會結束阻塞 if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer, args=('寶元', '泔水', 20, q)) c1 = Process(target=consumer, args=('alex', q)) c2 = Process(target=consumer, args=('wusir', q)) c1.daemon = True c2.daemon = True p1.start() c1.start() c2.start() p1.join() # 消費者每消費一個數據會給隊列發送一條信息 # 當每個數據都被消費掉以後 joinablequeue的join阻塞行爲就會結束 # 以上就是爲何咱們要在生產完全部數據的時候發起一個q.join() # 隨着生產者子進程的執行完畢,說明消費者的數據都消費完畢了 # 這個時候主進程中的p1.join結束 # 主進程的代碼結束 # 守護進程也結束了
from multiprocessing import Manager,Process,Lock def work(d,lock): # with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂 # d['count']-=1 ''' 等價於下面的代碼 ''' lock.acquire() d['count'] -= 1 lock.release() if __name__ == '__main__': lock=Lock() m = Manager() dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic) # Manager是一個類 內部有一些數據類型可以實現進程之間的數據共享 # dict list這樣的數據 內部的數字進行自加 自減 是會引發數據不安全的,這種狀況下 須要咱們手動加鎖完成 # 所以 咱們通常狀況下 不適用這種方式來進行進程之間的通訊 # 咱們寧肯使用Queue隊列或者其餘消息中間件 來實現消息的傳遞 保證數據的安全
import time from multiprocessing import Process def func(i): i -= 1 if __name__ == '__main__': start = time.time() #計時開始 l = [] for i in range(100): p = Process(target=func,args=(i,)) p.start() l.append(p) for p in l: p.join() print(time.time() - start) #計時結束
計算時間差 import time from multiprocessing import Pool # 池 def func(i): i -= 1 if __name__ == '__main__': start = time.time() p = Pool(5) #池 你的池中打算放多少個進程,個數cpu的個數 * 1/2 p.map(func,range(100)) # 自動帶join print(time.time()-start)
from multiprocessing import Pool def f(i): i -= 1 return i**2 if __name__ == '__main__': p = Pool(5) #池 你的池中打算放多少個進程,個數cpu的個數 * 1/2 ret = p.map(f,range(100)) # 自動帶join print(ret)
import time from multiprocessing import Pool # 池 def func(i): i -= 1 time.sleep(0.5) return i**2 if __name__ == '__main__': p = Pool(5) #你的池中打算放多少個進程,個數cpu的個數 * 1|2 for i in range(100): ret = p.apply(func,args=(i,)) # 自動帶join 串行/同步 apply就是同步提交任務 print(ret)
import time from multiprocessing import Pool # 池 def func(i): i -= 1 time.sleep(0.3) # print(i) return i**2 if __name__ == '__main__': p = Pool(5) lst = [] for i in range(100): ret = p.apply_async(func,args=(i,)) # 自動帶join 異步的 apply_async異步提交任務 lst.append(ret) # p.close() # 關閉進程池的任務提交 今後以後不能再向p這個池提交新的任務 # p.join() # 阻塞 一直到全部的任務都執行完 # print('結束') for i in lst : print(i.get())
什麼是進程池? 有限的進程的池子 爲何要用進程池? 任務不少 cpu個數*5個任務以上 爲了節省建立和銷燬進程的時間 和 操做系統的資源 通常進程池中進程的個數: cpu的1-2倍 若是是高計算,徹底沒有io,那麼就用cpu的個數 隨着IO操做越多,可能池中的進程個數也能夠相應增長 向進程池中提交任務的三種方式 map 異步提交任務 簡便算法 接收的參數必須是 子進程要執行的func,可迭代的(可迭代中的每一項都會做爲參數被傳遞給子進程) 可以傳遞的參數是有限的,因此比起apply_async限制性比較強 apply 同步提交任務(你刪了吧) apply_async 異步提交任務 可以傳遞比map更豐富的參數,可是比較麻煩 首先 apply_async提交的任務和主進程徹底異步 能夠經過先close進程池,再join進程池的方式,強制主進程等待進程池中任務的完成 也能夠經過get獲取返回值的方式,來等待任務的返回值 咱們不能在apply_async提交任務以後直接get獲取返回值 for i in range(100): ret = p.apply_async(func,args=(i,)) # 自動帶join 異步的 apply_async異步提交任務 l.append(ret) for ret in l: print(ret.get())
回調函數 import os import time import random from multiprocessing import Pool # 池 def func(i): # [2,1,1.5,0,0.2] i -= 1 time.sleep(random.uniform(0,2)) return i**2 def back_func(args): print(args,os.getpid()) if __name__ == '__main__': print(os.getpid()) p = Pool(5) l = [] for i in range(100): ret = p.apply_async(func,args=(i,),callback=back_func) # 5個任務 p.close() p.join() callback回調函數 主動執行func,而後在func執行完畢以後的返回值,直接傳遞給back_func做爲參數,調用back_func 處理池中任務的返回值 回調函數是由誰執行的? 主進程
import re import json from urllib.request import urlopen #請求頁面包 from multiprocessing import Pool def get_page(i): #頁面 ret = urlopen('https://movie.douban.com/top250?start=%s&filter='%i).read() ret = ret.decode('utf-8') return ret def parser_page(s): #頁面數據分析 com = re.compile( '<div class="item">.*?<div class="pic">.*?<em .*?>(?P<id>\d+).*?<span class="title">(?P<title>.*?)</span>' '.*?<span class="rating_num" .*?>(?P<rating_num>.*?)</span>.*?<span>(?P<comment_num>.*?)評價</span>', re.S) ret = com.finditer(s) with open('file','a',encoding='utf-8') as f: for i in ret: dic = { "id": i.group("id"), "title": i.group("title"), "rating_num": i.group("rating_num"), "comment_num": i.group("comment_num"), } f.write(json.dumps(dic,ensure_ascii=False)+'\n') if __name__ == '__main__': p = Pool(5) count = 0 for i in range(10): p.apply_async(get_page,args=(count,),callback=parser_page) count += 25 p.close() p.join()
import json with open('file2','w',encoding='utf-8') as f: json.dump({'你好':'alex'},f,ensure_ascii=False)
線程 - python
線程 輕量級 進程 解決併發 總體效率高於進程 在進程中數據共享 資源共享 是進程的一部分,不能獨立存在的 被CPU調度的最小單位 使用場景 socketserver web的框架 django flask tornado 多線程來接收用戶併發的請求 python裏 同一個進程中的多個線程能不能同時使用多個cpu 在整個程序界: 若是你的程序須要數據隔離 : 多進程 若是你的程序對併發的要求很是高 : 多線程 python 初期 面向單核的 一個cpu 做爲一門腳本語言 解釋型語言 線程鎖這件事兒是由Cpython解釋器完成 對於python來講 同一時刻只能有一個線程被cpu訪問 完全的解決了多核環境下的安全問題 線程鎖 : 全局解釋器鎖 GIL 1.這個鎖是鎖線程的 2.這個鎖是解釋器提供的 多線程仍然有它的優點 你的程序中用到cpu真的多麼 若是100% 90%的時間都消耗在計算上,那麼cpython解釋器下的多線程對你來講確實沒用 可是你寫的大部分程序 的時間實際上都消耗在了IO操做上 遇到高計算型 開進程 4個進程 每一個進程裏開n個線程 換個解釋器
import os import time from threading import Thread def func(): print('start',os.getpid()) time.sleep(1) print('end') if __name__ == '__main__': t = Thread(target=func) t.start() for i in range(5): print('主線程',os.getpid()) time.sleep(0.3)
import time from threading import Thread def func(): n = 1 + 2 + 3 n ** 2 if __name__ == '__main__': start = time.time() lst = [] for i in range(100): t = Thread(target=func) t.start() lst.append(t) for t in lst: t.join() print(time.time() - start) # import time from multiprocessing import Process as Thread def func(): n = 1 + 2 + 3 n**2 if __name__ == '__main__': start = time.time() lst = [] for i in range(100): t = Thread(target=func) t.start() lst.append(t) for t in lst: t.join() print(time.time() - start)
from threading import Thread n = 100 def func(): global n n -= 1 t = Thread(target=func) t.start() t.join() print(n)
from threading import Thread class Mythread(Thread): def __init__(self,arg): super().__init__() self.arg = arg def run(self): print('in son',self.arg) t = Mythread(123) t.start()
import time from threading import Thread,currentThread,activeCount,enumerate class Mythread(Thread): def __init__(self,arg): super().__init__() self.arg = arg def run(self): time.sleep(1) print('in son',self.arg,currentThread()) # t = Mythread(123) # t.start() # print('主',currentThread()) #當前線程 # for i in range(10): t = Mythread(123) t.start() # print(t.ident) #當前線程id print(activeCount()) #幾個活躍的線程 11 print(enumerate()) # 一共幾個線程 []
- server.py - import socket from threading import Thread def talk(conn): while True: msg = conn.recv(1024).decode() #解碼 conn.send(msg.upper().encode()) #編碼 sk = socket.socket() #建立 sk.bind(('127.0.0.1',9000)) #bind sk.listen() #監聽 while True: conn,addr = sk.accept() #接收 Thread(target=talk,args = (conn,)).start() #建立線程並開啓 - client.py - import socket sk = socket.socket() #建立 sk.connect(('127.0.0.1',9000)) #鏈接 while True: sk.send(b'hello') #發 print(sk.recv(1024)) #收
import time from threading import Thread def func(): while True: print('in func') time.sleep(0.5) def func2(): print('start func2') time.sleep(10) print('end func2') Thread(target=func2).start() t = Thread(target=func) t.setDaemon(True) #線程 守護操做 t.start() print('主線程') time.sleep(2) print('主線程結束') # 守護進程 只守護主進程的代碼,主進程代碼結束了就結束守護,守護進程在主進程以前結束 # 守護線程 隨着主線程的結束才結束,守護線程是怎麼結束的 直到主子線程都結束 進程結束 # 進程 terminate 強制結束一個進程的 # 線程 沒有強制結束的方法 # 線程結束 : 線程內部的代碼執行完畢 那麼就自動結束了
import time from threading import Thread,currentThread def func(): print(currentThread()) print('開始') for i in range(10): Thread(target=func).start() # time.sleep(2) print('主線程')
鎖 用來保證數據安全 有了GIL仍是會出現數據不安全的現象,因此仍是要用鎖 import time from threading import Thread,Lock n = 100 def func(lock): #鎖 global n #用 全局的n # n -= 1 with lock: tmp = n-1 # n-=1 # time.sleep(0.1) n = tmp if __name__ == '__main__': l = [] lock = Lock() #實例化 for i in range(100): t = Thread(target=func,args=(lock,)) #建立 t.start() #開啓 l.append(t) for t in l: t.join() print(n)
import dis n = 1 #全局空間的 += -= 操做 都不是數據安全的 def func(): n = 100 #局部空間內 永遠安全 n -= 1 dis.dis(func) # 會出現線程不安全的兩個條件 # 1.是全局變量 # 2.出現 += -=這樣的操做 #下面是解析 LOAD 僅有這個 數據安全 STORE 有load store 就會不安全 # 列表 字典 # 方法 l.append l.pop l.insert dic.update 都是線程安全的 # l[0] += 1 不安全 # d[k] += 1 不安全
# 科學家吃麪問題 import time from threading import Thread,Lock # noodle_lock = Lock() # fork_lock = Lock() # 死鎖不是時刻發生的,有偶然的狀況整個程序都崩了 # 每個線程之中不止一把鎖,而且套着使用 # 若是某一件事情須要兩個資源同時出現,那麼不該該將這兩個資源經過兩把鎖控制 # 而應看作一個資源 # # def eat1(name): # noodle_lock.acquire() # print('%s拿到麪條了'%name) # fork_lock.acquire() # print('%s拿到叉子了'%name) # print('%s開始吃麪'%name) # time.sleep(0.2) # fork_lock.release() # print('%s放下叉子了' % name) # noodle_lock.release() # print('%s放下面了' % name) # # def eat2(name): # fork_lock.acquire() # print('%s拿到叉子了' % name) # noodle_lock.acquire() # print('%s拿到麪條了' % name) # print('%s開始吃麪' % name) # time.sleep(0.2) # noodle_lock.release() # print('%s放下面了' % name) # fork_lock.release() # print('%s放下叉子了' % name) # # Thread(target=eat1,args=('wei',)).start() # Thread(target=eat2,args=('hao',)).start() # Thread(target=eat1,args=('太',)).start() # Thread(target=eat2,args=('寶',)).start() lock = Lock() def eat1(name): lock.acquire() print('%s拿到麪條了'%name) print('%s拿到叉子了'%name) print('%s開始吃麪'%name) time.sleep(0.2) lock.release() print('%s放下叉子了' % name) print('%s放下面了' % name) def eat2(name): lock.acquire() print('%s拿到叉子了' % name) print('%s拿到麪條了' % name) print('%s開始吃麪' % name) time.sleep(0.2) lock.release() print('%s放下面了' % name) print('%s放下叉子了' % name) Thread(target=eat1,args=('alex',)).start() Thread(target=eat2,args=('wusir',)).start() Thread(target=eat1,args=('太白',)).start() Thread(target=eat2,args=('寶元',)).start() # 先臨時解決 fork_lock=noodle_lock = Lock() # 而後再找到死鎖的緣由,再去修改 終極辦法一把鎖
from threading import RLock,Lock,Thread # 互斥鎖 # 不管在相同的線程仍是不一樣的線程,都只能連續acquire一次 # 要想再acquire,必須先release # 遞歸鎖 # 在同一個線程中,能夠無限次的acquire # 可是要想在其餘線程中也acquire, # 必須如今本身的線程中添加和acquire次數相同的release rlock = RLock() #每一次acquire都像進去一道門 rlock.acquire() rlock.acquire() rlock.acquire() rlock.acquire() #直到全都release 才能下我的進門 print('鎖不住') lock = Lock() #普通鎖/互斥鎖 lock.acquire() print('1') #到這裏 hang住了 lock.acquire() print('2')
from threading import RLock rlock = RLock() def func(num): rlock.acquire() #鎖 print('aaaa',num) rlock.acquire() print('bbbb',num) rlock.release() #必須 還鎖 rlock.release() #必須 Thread(target=func,args=(1,)).start() Thread(target=func,args=(2,)).start() # aaaa 1 # bbbb 1 # aaaa 2 # bbbb 2
import time from threading import RLock,Lock,Thread noodle_lock = fork_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s拿到麪條了'%name) fork_lock.acquire() print('%s拿到叉子了'%name) print('%s開始吃麪'%name) time.sleep(0.2) fork_lock.release() print('%s放下叉子了' % name) noodle_lock.release() print('%s放下面了' % name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到麪條了' % name) print('%s開始吃麪' % name) time.sleep(0.2) noodle_lock.release() print('%s放下面了' % name) fork_lock.release() print('%s放下叉子了' % name) Thread(target=eat1,args=('alex',)).start() Thread(target=eat2,args=('wusir',)).start() Thread(target=eat1,args=('太白',)).start() Thread(target=eat2,args=('寶元',)).start()
import time from threading import Semaphore,Thread def func(name,sem): sem.acquire() print(name,'start') time.sleep(1) print(name,'stop') sem.release() sem = Semaphore(5) for i in range(20): Thread(target=func,args=(i,sem)).start()
from threading import Event # 事件 # wait() 阻塞 到事件內部標識爲True就中止阻塞 # 控制標識 # set # clear # is_set # 鏈接數據庫 import time import random from threading import Thread,Event def connect_sql(e): count = 0 while count < 3: e.wait(0.5) if e.is_set(): print('鏈接數據庫成功') break else: print('數據庫未鏈接成功') count += 1 def test(e): time.sleep(random.randint(0,3)) e.set() e = Event() Thread(target=test,args=(e,)).start() #測試 Thread(target=connect_sql,args=(e,)).start() #鏈接
from threading import Timer def func(): print('執行我啦') t = Timer(3,func) # 如今這個時間點我不想讓它執行,而是預估一下大概多久以後它執行比較合適 t.start() print('主線程的邏輯') t.join() print('ok ')
# wait 阻塞 # notify(n) 給信號 # 假如如今有20個線程 # 全部的線程都在wait這裏阻塞 # notify(n) n傳了多少 # 那麼wait這邊就能得到多少個解除阻塞的通知 # notifyall # acquire # release import threading def run(n): con.acquire() con.wait() print("run the thread: %s" % n) con.release() if __name__ == '__main__': con = threading.Condition() #條件 for i in range(10): #10個線程 t = threading.Thread(target=run, args=(i,)) t.start() #開啓 while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release() print('****') # 設置某個條件 # 若是知足這個條件 就能夠釋放線程 # 監控測試個人網速 # 20000個任務 # 測試個人網速 /系統資源 # 發現系統資源有空閒,我就放行一部分任務
import queue # 線程隊列 線程之間數據安全 q = queue.Queue() # # 普通隊列 # q.put(1) # # print(q.get()) # try: # q.put_nowait(2) # except queue.Full: # print('您丟失了一個數據2') # print(q.get_nowait()) # 若是有數據我就取,若是沒數據不阻塞而是報錯 # 非阻塞的狀況下 q.put(10) print(q.get(timeout=2)) # # # 算法裏 棧 # lfq = queue.LifoQueue() # 棧 # lfq.put(1) # lfq.put(2) # lfq.put(3) # print(lfq.get()) # print(lfq.get()) # print(lfq.get()) # # # 優先級隊列,是根據第一個值的大小來排定優先級的 # # ascii碼越小,優先級越高 # q = queue.PriorityQueue() # q.put((2,'a')) # q.put((1,'c')) # q.put((1,'b')) # # print(q.get()) # 線程+隊列 實現生產者消費者模型
concurrent.futures模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor:線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor #線程池 # from concurrent.futures import ProcessPoolExecutor as Pool #進程池 def func(num): print('in %s func'%num,currentThread()) time.sleep(random.random()) return num**2 tp = ThreadPoolExecutor(5) #5個線程 ret_l = [] for i in range(30): ret = tp.submit(func,i) #提交 ret_l.append(ret) for ret in ret_l: #取值 print(ret.result())
import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor as Pool import os def func(num): # print('in %s func'%num,currentThread()) print('in %s func'%num,os.getpid()) time.sleep(random.random()) return num**2 if __name__ == '__main__': # tp = ThreadPoolExecutor(5) tp = Pool(5) ret = tp.map(func,range(30)) # print(list(ret)) for i in ret: print(i)
# 回調函數 add_done_callback import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor as Pool def func1(num): print('in func1 ',num,currentThread()) return num*'*' def func2(ret): print('--->',ret.result(),currentThread()) tp = Pool(5) print('主 : ',currentThread()) for i in range(10): tp.submit(func1,i).add_done_callback(func2) # 回調函數收到的參數是須要使用result()獲取的 # 回調函數是由誰執行的? 主線程
import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor as Pool from urllib.request import urlopen def func(name,url): content = urlopen(url).read() return name,content def parserpage(ret): name,content = ret.result() with open(name,'wb') as f: f.write(content) urls = { # 'baidu.html':'https://www.baidu.com', # 'python.html':'https://www.python.org', # 'openstack.html':'https://www.openstack.org', 'github.html':'https://help.github.com/', 'sina.html':'http://www.sina.com.cn/' } tp = Pool(2) for k in urls: tp.submit(func,k,urls[k]).add_done_callback(parserpage)
線程 鎖 爲何有了GIL以後還須要鎖 多個線程同時操做全局變量的時候 當出現"非原子性操做",例如+= -= *= /= l.append(1) 原子性操做 a += 1 a = a + 1 tmp = a +1 a = tmp 死鎖現象 什麼是死鎖現象? 兩個以上的線程爭搶同一把鎖, 其中一個線程獲取到鎖以後不釋放 另外的其餘線程就都被鎖住了 比較容易出現問題的狀況 : 兩把鎖套在一塊兒用了 死鎖現象的本質 :代碼邏輯問題 遞歸鎖 一把鎖在同一個線程中acquire屢次而不被阻塞 若是另外的線程想要使用,必須release相同的次數, 才能釋放鎖給其餘線程 信號量 控制幾個線程同一時刻只能有n個線程執行某一段代碼 鎖 + 計數器 事件 兩件事情 一件事情要想執行依賴於另外一個任務的結果 條件 n個線程在某處阻塞 由另外一個線程控制這n個線程中有多少個線程能繼續執行 定時器 規定某一個線程在開啓以後的n秒以後執行 隊列\棧\優先級隊列 import queue 線程之間數據安全 多個線程get不可能同時取走一個數據,致使數據的重複獲取 多個線程put也不可能同時存入一個數據,致使數據的丟失 隊列 先進先出 棧 先進後出 優先級 優先級高的先出 線程池 concurrent.futrues ThreadPoolExcuter ProcessPoolExcuter submit 異步提交任務 shutdown 等待池內任務完成 result 獲取進程函數的返回值 map 異步提交任務的簡便用法 add_done_callback 回調函數 進程 主進程執行 線程
協程 - linux
進程 計算機中最小的資源分配單位 線程 計算機中能被CPU調度的最小單位 線程是由操做系統建立的,開啓和銷燬仍然佔用一些時間 調度 1.一條線程陷入阻塞以後,這一整條線程就不能再作其餘事情了 2.開啓和銷燬多條線程以及cpu在多條線程之間切換仍然依賴操做系統 你瞭解協程 ? 瞭解 協程(纖程,輕型線程) 對於操做系統來講協程是不可見的,不須要操做系統調度 協程是程序級別的操做單位 協程效率高不高 和操做系統自己沒有關係,和線程也沒有關係 而是看程序的調度是否合理 協程指的只是在同一條線程上可以互相切換的多個任務 遇到io就切換其實是咱們利用協程提升線程工做效率的一種方式
# 切換 + 狀態保存 yield import time def consumer(res): '''任務1:接收數據,處理數據''' pass def producer(): '''任務2:生產數據''' res=[] for i in range(100000): #1億次 res.append(i) return res # start=time.time() res=producer() consumer(res) # 寫成consumer(producer())會下降執行效率 stop=time.time() print(stop-start) import time def consumer(): while True: res = yield def producer(): g = consumer() next(g) for i in range(100000): g.send(i) start =time.time() producer() print(time.time() - start) # yield這種切換 就已經在一個線程中出現了多個任務,這多個任務以前的切換 本質上就是協程,consumer是一個協程,producer也是一個協程 # 單純的切換還會消耗時間 # 可是若是可以在阻塞的時候切換,而且多個程序的阻塞時間共享,協程可以很是大限度的提升效率
協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的 # greenlet 協程模塊 在多個任務之間來回切換 # gevent 基於greenlet實現的,多個任務交給gevent管理,遇到IO就使用greenlet進行切換 import time from greenlet import greenlet def play(): print('start play') g2.switch() # 開關 time.sleep(1) print('end play') def sleep(): print('start sleep') time.sleep(1) print('end sleep') g1.switch() g1 = greenlet(play) g2 = greenlet(sleep) g1.switch() # 開關
import time import gevent def play(): # 協程1 print(time.time()) print('start play') gevent.sleep(1) print('end play') def sleep(): # 協程2 print('start sleep') print('end sleep') print(time.time()) g1 = gevent.spawn(play) g2 = gevent.spawn(sleep) # g1.join() # g2.join() # 精準的控制協程任務,必定是執行完畢以後join當即結束阻塞 gevent.joinall([g1,g2])
from gevent import monkey;monkey.patch_all() import time import gevent url_lst = ['https://www.python.org/','https://www.yahoo.com/','https://github.com/'] def get_page(url): ret = urlopen(url).read() return ret.decode('utf-8') start = time.time() g_l = [] for url in url_lst: g = gevent.spawn(get_page,url) g_l.append(g) # gevent.joinall(g_l) print(time.time()-start)
- server.py - from gevent import monkey;monkey.patch_all() import socket import gevent def talk(conn): while True: msg = conn.recv(1024).decode() conn.send(msg.upper().encode()) sk = socket.socket() sk.bind(('127.0.0.1',9000)) sk.listen() while True: conn,addr = sk.accept() gevent.spawn(talk,conn) - client.py - import socket import threading def task(): sk = socket.socket() sk.connect(('127.0.0.1',9000)) while True: sk.send(b'hello') print(sk.recv(1024)) for i in range(500): threading.Thread(target=task).start()
協程
一條線程在多個任務之間相互切換
數據安全的
不能利用多核
可以規避一個線程上的IO阻塞
一條線程可以起500個協程
4c的機器
5個進程
每個進程20個線程
每個線程500個協程
5*20*500 = 50000