#第一種 from multiprocessing import Process import time def func(name):( print(f'{name}子進程開始') time.sleep(1) print(f'{name}子進程結束') if __name__=='__main__': times=time.time() x = Process(target=func,args('宋',)) #裏面必定要放元組 x.start() #啓動進程,並調用該子進程中的x.run() time.sleep(1) #x.run()調用target制定的函數,啓動進程運行的方法 print('主進程啓動') print(f'{time.time()-times}') #第二種 from multiprocessing import Process import time class A(Process): #父類必定是Process def __init__(self,name): self.name=name super().__init__() def run(self): #必定要有run方法!!!!!! print(f'{self.name}子進程開始') time.sleep(1) print(f'{self.name}子進程結束') if __name__=='__main__': times=time.time() x=A('宋',) x.start() #只是想操做系統發憷一個開闢子進程的信號,執行下一行,信號接收到了,會在內存總開闢一個進程空間,而後再講主進程全部數據copy加載到子進程,而後在調用CPU去執行,開闢子進程開銷是很大的 time.sleep(1) print(f'{time.time()-times}主進程開啓') #因此永遠會先執行主進程的代碼 x.terminate() #強制終止進程x 不會進行任何清理操做 x.is_alive() #判斷進程是否是還活着 x.join() #主進程等待執行完子進程之後再執行主進程 x.daemon() #默認值爲Flase 表明x爲後臺運行的守護進程 x.name #查看進程的名稱 x.pid #進程的pid
注意:在Windows中process()必須放到if __name__=='__main__'下前端
import os import time print(f'子進程:{os.getpid()}') print(f'主進程:{os.getppid()}')
進程與進程之間是有物理隔離,不能共享內存的數據(lock,隊列)python
from multiprocessing import Process import time import os name='song' def task(): global name name='111' print(f'{name}') if __name__=='__main__': p.Process(target=task) p.start() time.sleep(2) print(f'主開始{name}') from multiprocessing import Process import time import os lst=[11,22,33] def task(): global name name.append(44) print(f'{lst}') if __name__=='__main__': p.Process(target=task) p.start() time.sleep(2) print(f'主開始{lst}') #可變不可變都會隔離
#join就是主進程先讓子進程執行結束,再執行主進程 from multiprocessing import Process import time def func(name): print(f'{name} is running') time.sleep(1) print(f'{name} is out') if __name__ == '__main__': p = Process(target=task,args=('song',)) # 建立一個進程對象 p.start() p.join() print('==主開始') #多個子進程使用join from multiprocessing import Process import time def func(name,s): print(f'{name}開始') time.sleep(s) print(f'{name}結束') def func1(name,s): print(f'{name}開始') time.sleep(s) print(f'{name}結束') def func2(name,s): print(f'{name}開始') time.sleep(s) print(f'{name}結束') if __name__ == '__main__': times=time.time() li=[] for i in range(1,4): x = Process(target=func,args=('song',i)) x.start() li.append(x) # for i in li: i.join() print('主進程開始') x=Process(target=func,args=('宋',3)) x1=Process(target=func,args=('李',2)) x2=Process(target=func,args=('王',1)) x.start() x1.start() x2.start() # x.join() print(f'{time.time()-times}') x1.join() print(f'{time.time()-times}') x2.join() print(f'主進程開啓{time.time()-times}')
.代碼優化程序員
from multiprocessing import Process import time def func(name,s): print(f'{name}開始') time.sleep(s) print(f'{name}結束') if __name__ == '__main__': times=time.time() li=[] for i in range(1,4): x = Process(target=func,args=('song',i)) x.start() li.append(x) for i in li: i.join() print('主進程開始')
x.terminate() #殺死子進程 x.join() #先執行子進程,後執行主進程 print(x.is_alive()) #判斷進程還在不在 # from multiprocessing import Process # import time # # def task(name): # print(f'{name} is running') # time.sleep(2) # print(f'{name} is gone') # # # # if __name__ == '__main__': # # 在windows環境下, 開啓進程必須在 __name__ == '__main__' 下面 # # p = Process(target=task,args=('常鑫',)) # 建立一個進程對象 # p = Process(target=task,args=('常鑫',),name='alex') # 建立一個進程對象 # p.start() # # time.sleep(1) # # p.terminate() # 殺死子進程 *** # # p.join() # *** # # time.sleep(0.5) # # print(p.is_alive()) # *** # # print(p.name) # p.name = 'sb' # print(p.name) # print('==主開始')
#子進程守護主進程,只要結束主進程,子進程也隨之結束 x.daemon=True from multiprocessing import Process import time def func(name): print(f'{name}開始') time.sleep(1) print(f'{name}結束') if __name__ == '__main__': x.Process(target=func,args('song')) x.daemon=True #在發送信號以前開啓 守護進程,將X子進程設置成守護進程,只要主進程結束,守護進程立刻結束 x.start() x.join() print('666主進程開始')
爲何主進程不在子進程結束會對其立刻回收?面試
Unix針對於上面的問題,提供了一個機制編程
全部的子進程結束以後,立馬回釋放掉文件的操做連接,內存的大部分數據,會保留一些內容,進程號,結束時間,運行狀態,等待主進程監測,回收json
殭屍進程:全部的子進程結束以後,在準備回收以前,都會進入殭屍進程狀態windows
殭屍進程有危害:若是父進程不對殭屍進程進行回收(wait//waitpid),產生大量的殭屍進程,這樣就會佔用內存,佔用進程pid號後端
孤兒進程:父進程因爲某種緣由結束了,可是你的子進程還在運行中,這樣你的這些子進程就成了孤兒進程,.你的父進程若是結束了.你的全部的孤兒進程就會被inIT進程回收,init就變成了你的父進程,對孤兒進程進行回收安全
殭屍進程如何解決::::服務器
父進程產生了大量的子進程,可是不回收,這樣就會造成大量的殭屍進程,解決方式就是直接殺死父進程將全部的殭屍進程變成孤兒進程而後init進行照顧,由init進程進行回收
三個同事,同時用一個打印機打印內容 模擬三個進程模仿三個同事,輸出平臺模擬打印機 from multiprocessing import Process import time,random def func1(name): print(f'{name}開始') time.sleep(random.randint(1, 3)) print(f'{name}結束') def func2(name): print(f'{name}開始') time.sleep(random.randint(1, 3)) print(f'{name}結束') def func3(name): print(f'{name}開始') time.sleep(random.randint(1, 3)) print(f'{name}結束') if __name__ == '__main__': x1=Process(target=func1,args=('宋',)) x2=Process(target=func2,args=('佳',)) x3=Process(target=func3,args=('凡',)) x1.start() x2.start() x3.start() #併發的搶佔打印機 #併發效率優先,可是不是順序優先 #多個進程共槍一個資源,須要保證順序,一個一個來 串行 版本二 加join 變成串行,可是這個順序是固定的, 版本三 from multiprocessing import Process,Lock import time,random def func1(lock,name): lock.acquire() print(f'{name}開始') time.sleep(random.randint(1,3)) print(f'{name}結束') lock.release() def func2(lock,name): lock.acquire() print(f'{name}開始') time.sleep(random.randint(1,3)) print(f'{name}結束') lock.release() def func3(lock,name): lock.acquire() print(f'{name}開始') time.sleep(random.randint(1,3)) print(f'{name}結束') lock.release() if __name__ == '__main__': lock=Lock() x1=Process(target=func1,args=(lock,'宋',)) x2=Process(target=func2,args=(lock,'佳',)) x3=Process(target=func3,args=(lock,'凡',)) x1.start() x2.start() x3.start() #保證了順序,保證了先到先得的原則,串行
共同點:均可以阻塞,均可以把併發變成併發,保證了順序 不一樣點:join是人爲的設定順序,lock讓其爭搶順序,保證了公平性
進程在內存級別是隔離的,可是文件在磁盤上
基於文件通訊
搶票系統 #思路分析:先能夠查票,查詢剩餘票數 併發 均可以查 #購買,向服務器發送請求,服務端接受,在後端將票數修改,返回給前端,串行 一個一個的買 from multiprocessing import Process,Lock import os,time,random,json def search(): time.sleep(random.randint(1,3)) with open('song.json','r',encoding='utf-8')as f: dic=json.load(f) print(f'{os.getpid()}查看了票,還剩{dic["count"]}張票') def buy(): with open('song.json','r',encoding='utf-8')as f1: dic=json.load(f1) if dic['count']>0: dic['count']-=1 time.sleep(random.randint(1,3)) with open('song.json','w',encoding='utf-8')as f2: json.dump(dic,f2) print(f'{os.getpid()}購票成功') def run(lock): search() lock.acquire() buy() lock.release() if __name__ == '__main__': lock=Lock() for i in range(6): x= Process(target=run,args=(lock,)) x.start() #當多個進程共槍一個數據時,保證數據安全,必須加鎖和串行 #互斥鎖 能夠公平的保證順序的問題 以及數據的安全 #基於文件的進程之間的通訊 效率低 本身加鎖麻煩並且很容易出現死鎖
基於隊列通訊
from multiprocessing import Queue q=Queue(3) #裏面能夠限制最大有幾個 def func(): print(1) q.put(1) #往隊列裏面放入1 q.put('adfadf') q.put(func) '啥也能放' q.get() q.get() print(q.get()) '通常放幾個就取幾個' print(q.get(timeout=3)) #阻塞三秒,給他計時,三秒以後還阻塞就主動拋出錯誤 隊列:把隊列理解成一個容器,這個容器能夠承載一些數據 特性:先進先出 永遠保持這個數據,FIFO 羽毛球筒 #若是隊列裏面是限制三個數據 要是放四個數據 或是取四個數據 會阻塞 #解除阻塞 在別的進程裏面取出來 或者添加 print(q.get(block=False)) #只要遇到阻塞就會報錯 print(q.get(timeout=2)) q.put(3,block=False) q.put(3,timeout=2) 都報錯
基於管道通訊
生產者與消費者模型
from multiprocessing import Process,Queue import time,random def producer(q,name): for i in range(1,6): ret=f'第{i}個包子' q.put(ret) print(f'生產者:{name},生產的{ret}') def consumer(q,name): while 1: try: s1=q.get(timeout=2) time.sleep(random.randint(1,3)) print(f'消費者:{name},吃了{s1}') except Exception: return if __name__=='__main__': q=Queue() x=Process(target=producer,args=(q,'宋')) x1=Process(target=consumer,args=(q,'李')) x.start() x1.start()
生產者消費者模型:編程思想,設計理念,理論等等,都是交給你一種編程的方法,之後遇到相似的狀況,套用便可
生產者消費者模型三要素:
生產者:產生數據的
消費者:接受數據作進一步處理
容器:隊列
隊列容易起到的做用:起到緩衝做用,平衡生產力和消費力 ,解耦
1.什麼是線程 是一條流水線的工做流程 進程:在內存中開啓一個進程空間,而後將主進程的全部資源數據複製一份,而後調用CPU去執行這些代碼 開啓一個進程:在內存中開啓一個進程空間,而後將主進程的全部資源數據複製一份,而後調用線程去執行代碼 進程是資源單位,線程是執行單位
開啓一個進程:進程會在內存,中開闢一個進程空間,將主進程的資源數據去複製一份,線程會執行裏面的代碼
開啓進程的開銷很是大,比開啓線程的開銷大不少
開啓線程的速度很是快,要比進程快幾十上百倍
from threading import Thread # from multiprocessing import Process # import os # # def work(): # print('hello') # # if __name__ == '__main__': # #在主進程下開啓線程 # t=Process(target=work) # t.start() # print('主線程/主進程') # 多線程 # from threading import Thread # import time # # def task(name): # print(f'{name} is running') # time.sleep(1) # print(f'{name} is gone') # # # # if __name__ == '__main__': # # t1 = Thread(target=task,args=('海狗',)) # t1.start() # print('===主線程') # 線程是沒有主次之分的.
# from threading import Thread # import os # # x = 3 # def task(): # global x # x = 100 # # if __name__ == '__main__': # # t1 = Thread(target=task) # t1.start() # t1.join() # print(f'===主線程{x}') # 同一進程內的資源數據對於這個進程的多個線程來講是共享的.
#第一種 from threading import Thread import time def func(): print('子線程開') time.sleep(1) print('子線程關') if __name__=='__main__': x=Thread(target=func) x.start() print('主線程開') #通常是子線程先開再開主線程 #第二種 from threading import Thread import time class A(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): #必需要有run這個方法 print('子線程開') time.sleep(1) print('子線程關') x=A('song') #能夠不寫__name__=='__main__' x.start() print('主線程開')
from threading import Thread,enumerate,activeCount,CurrentThread import time,os,random def func(): print(666) if __name__ == '__main__=': x=Thread(target=func) x.start() print(x.name) #查詢name屬性 父類中是 Thread-1 x.steName('song') #添加name屬性或者修改name屬性 print(x.getname()) #查獲name屬性 print(x.isAlive) #返回線程是否活動的 print(currentThread()) #返回當前的線程變量 print(enumernate()) #返回一個包換正在運功的線程列表list 線程啓動後,結束前 print(activecount()) #返回正在運行的線程數量 #主線程等待子線程結束 from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() t.join() print('主線程') print(t.is_alive()) ''' egon say hello 主線程 False '''
#join 阻塞 告知主線程要等待子線程執行完畢後再執行主線程 from threading import Thread import time # def task(name): print(f'{name} is running') time.sleep(1) print(f'{name} is gone') # # if __name__ == '__main__': start_time = time.time() t1 = Thread(target=task,args=('海狗',)) t2 = Thread(target=task,args=('海狗1',)) t3 = Thread(target=task,args=('海狗2',)) # t1.start() t1.join() t2.start() t2.join() t3.start() t3.join() print(f'===主線程{time.time() - start_time}') # 線程是沒有主次之分的. $$$$守護線程 from multiprocessing import Process import os,time,random def func(): print('子進程開') time.sleep(1) print('子進程罐') def foo(): print('1 kai') time.sleep(1) print('1 guan') if __name__=='__main__': x=Process(target=func) x1=Process(target=foo) x.daemon=True x.start() x1.start() time.sleep(1) #注意沒有睡眠一秒 就會直接不執行x子進程 print('主進程開') $$$$守護線程 from threading import Thread import time def func(name): print('zou') time.sleep(2) print('gunle') if __name__ == '__main__': x=Thread(target=func,args=('宋',)) x1=Thread(target=func,args=('反',)) x1.daemon=True x.start() x1.start() print('主線程開') #主線程何時結束: 非守護進程與主線程結束以後,再結束
from threading import Thread import time import random x = 10 def task(): time.sleep(random.randint(1,2)) global x temp = x time.sleep(random.randint(1, 3)) temp = temp - 1 x = temp if __name__ == '__main__': l1 = [] for i in range(10): t = Thread(target=task) l1.append(t) t.start() for i in l1: i.join() print(f'主線程{x}') #多個任務共搶一個數據,保證數據的安全的目的,要讓其串行 from threading import Thread from threading import Lock import time import random x = 100 def task(lock): lock.acquire() # time.sleep(random.randint(1,2)) global x temp = x time.sleep(0.01) temp = temp - 1 x = temp lock.release() if __name__ == '__main__': mutex = Lock() l1 = [] for i in range(100): t = Thread(target=task,args=(mutex,)) l1.append(t) t.start() time.sleep(3) print(f'主線程{x}')
from threading import Thread,Lock import time locka=Lock() lockb=Lock() class A(Thread): def run(self): self.f1() self.f2() def f1(self): locka.acquire() print(f'{self.name}拿到a') lockb.acquire() print(f'{self.name}拿到b') locka.release() lockb.release() def f2(self): lockb.acquire() print(f'{self.name}拿到b') time.sleep(0.1) locka.acquire() print(f'{self.name}拿到a') lockb.release() locka.release() if __name__ == '__main__': for i in range(3): x = A() x.start()
解決死鎖用遞歸鎖就能夠,遞歸鎖有一個計數功能,上鎖加1,解鎖-1
只要遞歸鎖上面的數字不爲零,其餘線程就不能搶鎖
from threading import Thread,RLock import time locka=lockb=RLock() class A(Thread): def run(self): self.f1() self.f2() def f1(self): locka.acquire() print(f'{self.name}拿到a') lockb.acquire() print(f'{self.name}拿到b') locka.release() lockb.release() def f2(self): lockb.acquire() print(f'{self.name}拿到b') time.sleep(0.1) locka.acquire() print(f'{self.name}拿到a') lockb.release() locka.release() if __name__ == '__main__': for i in range(3): x = A() x.start() # RLock 遞歸鎖使用方法 locka=loakb=RLock() $$$遞歸鎖能夠解決死鎖現象,業務須要多個鎖時,要先考慮遞歸鎖
也是一種鎖,控制併發數量
from threading import Thread,Semaphore,currentThread import time ,random s=Semaphore(5) #Semaphore 賦值一個變量 限制分次數,而後用這個帶上鎖 def func(): s.acquire() print(f'{currentThread().name}廁所ing') time.sleep(random.randint(1,3)) s.release() if __name__ == '__main__': for i in range(10): x=Thread(target=func,) x.start()
理論上來講,單個進程的多線程能夠利用多核,可是開發cpython解釋器的程序員,給進入解釋器的線程加了鎖
當時都是單核時代,並且CPU價格很是貴,
若是不加全局解釋器鎖,開發cpython解釋器的程序員就會在源碼內部各類主動加鎖,解鎖,很是麻煩,各類死鎖現象,爲了省事,直接進入解釋器時給線程加了一個鎖
優勢:保證了cpython解釋器的數據資源的安全
缺點:單個進程的多線程不能利用多核
jpython以及pypy沒有GIL鎖
多核時代,將GIL鎖去掉能夠嗎?
由於cpython解釋器的全部的業務邏輯都是圍繞着單個線程實現的,去掉這個GIL鎖,幾乎不可能
單個進程的多線程能夠併發,可是不能利用多核,不能並行
多個進程能夠並行,併發
相同點:都是同種鎖,互斥鎖
不一樣點:GIL全局解釋器鎖,保護解釋器內部的資源數據的安全
GIL鎖,上鎖 釋放 無需手動操做
本身代碼中定義的互斥鎖保護進程中的資源數據的安全
本身定義的互斥鎖必須本身手動上鎖,釋放鎖
io密集型
#io密集型:單個進程的多線程合適 併發執行
計算密集型
計算密集型:適合多進程的並行
from threading import Thread from multiprocessing import Process import time,random # 計算密集型:單個進程的多線程併發vs進程的併發執行 def func(): count=0 for i in range(100000000): count+=1 if __name__ == '__main__': times=time.time() lst=[] for i in range(4): x=Process(target=func) lst.append(x) x.start() for i in lst: i.join() print(f'{time.time()-times}') #17.108332633972168 if __name__ == '__main__': times=time.time() lst=[] for i in range(4): x=Thread(target=func) lst.append(x) x.start() for i in lst: i.join() print(f'{time.time()-times}') #30.25704336166382 總結:計算密集型,仍是用多進程的併發效率高(前提是數據比較大 #io密集型 # io密集型 單個進程的多線程併發vs多個進程的併發並行 from threading import Thread from multiprocessing import Process import time,random def func(): count=0 time.sleep(random.randint(1,3)) count+=1 if __name__ == '__main__': times=time.time() li=[] for i in range(4): x=Process(target=func) li.append(x) x.start() for i in li: i.join() print(f'{time.time()-times}') #多進程3.244175910949707 if __name__ == '__main__': times=time.time() li=[] for i in range(4): x=Thread(target=func) li.append(x) x.start() for i in li: i.join() print(f'{time.time()-times}') #多線程2.002326250076294 計算io密集型的仍是用多線程的併發合適效率高
不管是多線程仍是多進程,若是按照上面的寫法,來一個
客戶端請求,我就開一個線程,來一個請求開一個線程,
應該是這樣: 你的計算機容許範圍內,開啓的線程進程數
量越多越好
#服務端 import socket,time from threading import Thread def foo(conn,addr): while 1: try: s=conn.recv(1024).decode('utf-8') print(f'{addr[1]}{s}') time.sleep(1) s1=input('<<<<').strip().encode('utf-8') conn.send(s1) except Exception: break conn.close() def func(): server = socket.socket() server.bind(('127.0.0.1', 8848)) server.listen(5) while 1: conn, addr = server.accept() x=Thread(target=foo,args=(conn,addr)) x.start() if __name__ == '__main__': func() #客戶端 import socket client=socket.socket() client.connect(('127.0.0.1',8890)) while 1: s=input('<<<<').strip().encode('utf-8') client.send(s) s1=client.recv(1024).decode('utf-8') print(f'{s1}') client.close()
線程池:一個容器,這個容器限制住你開啓線程的數量,好比四個,第一次確定只能併發的處理四個任務,只要有任務完成,線程立刻就會接下一個任務.以時間換空間 #默認進程池,是你的計算機cpu核數 #默認線程池 是你的計算機CPU核數 *5 乘5 #查看cpu核數 import os # print(os.cpu_count())
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time,random print(os.cpu_count()) #查看計算機核數 def func(): print(f'{os.getpid()}接客') time.sleep(random.randint(1,2)) if __name__ == '__main__': #進程池(並行+併發 p=ProcessPoolExecutor() # 默認不寫,進程池裏面的進程數與cpu個數相等 for i in range(5): p.submit(func,) if __name__ == '__main__': #線程池(併發 p=ThreadPoolExecutor(3) # 默認不寫, cpu個數*5 線程數 p.submit(func) p.submit(func) p.submit(func) p.submit(func) p.submit(func) p.submit(func)