顧名思義,進程即正在執行的一個過程。進程是對正在運行程序的一個抽象。php
進程的概念起源於操做系統,是操做系統最核心的概念,也是操做系統提供的最古老也是最重要的抽象概念之一。操做系統的其餘全部內容都是圍繞進程的概念展開的。html
因此想要真正瞭解進程,必須事先了解操做系統,python
點擊連接地址 http://www.cnblogs.com/linhaifeng/p/6295875.htmllinux
PS:即便能夠利用的cpu只有一個(早期的計算機確實如此),也能保證支持(僞)併發的能力。將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路複用和空間多路複用+硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在。nginx
#一 操做系統的做用:程序員
1:隱藏醜陋複雜的硬件接口,提供良好的抽象接口web
2:管理、調度進程,而且將多個進程對硬件的競爭變得有序算法
#二 多道技術:shell
1.產生背景:針對單核,實現併發數據庫
ps:
如今的主機通常是多核,那麼每一個核都會利用多道技術
有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再從新調度,會被調度到4個
cpu中的任意一個,具體由操做系統調度算法決定。
2.空間上的複用:如內存中同時有多道程序
3.時間上的複用:複用一個cpu的時間片
強調:遇到io切,佔用cpu時間過長也切,核心在於切以前將進程的狀態保存下來,這樣
才能保證下次切換回來時,能基於上次切走的位置繼續運行
進程:正在進行的一個過程或者說一個任務。而負責執行任務則是cpu。
舉例(單核+多道,實現多個進程的併發執行):
egon在一個時間段內有不少任務要作:python備課的任務,寫書的任務,交女友的任務,王者榮耀上分的任務,
但egon同一時刻只能作一個任務(cpu同一時間只能幹一個活),如何才能玩出多個任務併發執行的效果?
egon備一會課,再去跟李傑的女友聊聊天,再去打一會王者榮耀....這就保證了每一個任務都在進行中.
程序僅僅只是一堆代碼而已,而進程指的是程序的運行過程。
舉例:
想象一位有一手好廚藝的計算機科學家egon正在爲他的女兒元昊烘製生日蛋糕。
他有作生日蛋糕的食譜,
廚房裏有所需的原料:麪粉、雞蛋、韭菜,蒜泥等。
在這個比喻中:
作蛋糕的食譜就是程序(即用適當形式描述的算法)
計算機科學家就是處理器(cpu)
而作蛋糕的各類原料就是輸入數據。
進程就是廚師閱讀食譜、取來各類原料以及烘製蛋糕等一系列動做的總和。
如今假設計算機科學家egon的兒子alex哭着跑了進來,說:XXXXXXXXXXXXXX。
科學家egon想了想,處理兒子alex蟄傷的任務比給女兒元昊作蛋糕的任務更重要,因而
計算機科學家就記錄下他照着食譜作到哪兒了(保存進程的當前狀態),而後拿出一本急救手冊,按照其中的指示處理蟄傷。這裏,咱們看處處理機從一個進程(作蛋糕)切換到另外一個高優先級的進程(實施醫療救治),每一個進程擁有各自的程序(食譜和急救手冊)。當蜜蜂蟄傷處理完以後,這位計算機科學家又回來作蛋糕,從他
離開時的那一步繼續作下去。
須要強調的是:同一個程序執行兩次,那也是兩個進程,好比打開暴風影音,雖然都是同一個軟件,可是一個能夠播放蒼井空,一個能夠播放飯島愛。
不管是並行仍是併發,在用戶看來都是'同時'運行的,無論是進程仍是線程,都只是一個任務而已,真是幹活的是cpu,cpu來作這些任務,而一個cpu同一時刻只能執行一個任務
###################單cpu,多進程,併發舉例一################## 你是一個cpu,你同時談了三個女友,每個均可以是一個戀愛任務,你被這三個任務共享 要玩出併發戀愛的效果, 應該是你先跟女朋友1去看電影,看了一會說:很差,我要拉肚子,而後跑去跟第二個女朋友吃飯,吃了一會說:那啥,我 去趟洗手間,而後跑去跟女朋友3開了個房
####################單cpu,多進程,併發舉例二######################## 某天下午,egon,yuanhao,wupeiqi,alex約好了一塊兒去嫖娼,但娼只有一個,cpu只有一個,可是卻要‘同時’幹 四個任務(嫖出併發的效果),那就必須是幹一會egon,再幹一會yuanhao,再幹一會wupeiqi,再幹一會alex egon:花了200塊錢,由於人美活好 yuanhao:500塊錢 wupeiqi:100塊錢,多是不太行 alex:沒要錢,爲啥???由於你們剛剛嫖的是他女友
單核下,能夠利用多道技術,多個核,每一個核也均可以利用多道技術(多道技術是針對單核而言的)
有四個核,六個任務,這樣同一時間有四個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4,
一旦任務1遇到I/O就被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術
而一旦任務1的I/O結束了,操做系統會從新調用它(需知進程的調度、分配給哪一個cpu運行,由操做系統說了算),可能被分配給四個cpu中的任意一個去執行
全部現代計算機常常會在同一時間作不少件事,一個用戶的PC(不管是單cpu仍是多cpu),均可以同時運行多個任務(一個任務能夠理解爲一個進程)。
啓動一個進程來殺毒(360軟件)
啓動一個進程來看電影(暴風影音)
啓動一個進程來聊天(騰訊QQ)
全部的這些進程都需被管理,因而一個支持多進程的多道程序系統是相當重要的
多道技術概念回顧:內存中同時存入多道(多個)程序,cpu從一個進程快速切換到另一個,使每一個進程各自運行幾十或幾百毫秒,這樣,雖然在某一個瞬間,一個cpu只能執行一個任務,但在1秒內,cpu卻能夠運行多個進程,這就給人產生了並行的錯覺,即僞併發,以此來區分多處理器操做系統的真正硬件並行(多個cpu共享同一個物理內存)
#所謂同步,就是在發出一個功能調用時,在沒有獲得結果以前,該調用就不會返回。按照這個定義,其實絕大多數函數都是同步調用。可是通常而言,咱們在說同步、異步的時候,特指那些須要其餘部件協做或者須要必定時間完成的任務。 #舉例: #1. multiprocessing.Pool下的apply #發起同步調用後,就在原地等着任務結束,根本不考慮任務是在計算仍是在io阻塞,總之就是一股腦地等任務結束 #2. concurrent.futures.ProcessPoolExecutor().submit(func,).result() #3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()
#異步的概念和同步相對。當一個異步功能調用發出後,調用者不能馬上獲得結果。當該異步功能完成後,經過狀態、通知或回調來通知調用者。若是異步功能用狀態來通知,那麼調用者就須要每隔必定時間檢查一次,效率就很低(有些初學多線程編程的人,總喜歡用一個循環去檢查某個變量的值,這實際上是一 種很嚴重的錯誤)。若是是使用通知的方式,效率則很高,由於異步功能幾乎不須要作額外的操做。至於回調函數,其實和通知沒太多區別。 #舉例: #1. multiprocessing.Pool().apply_async() #發起異步調用後,並不會等待任務結束才返回,相反,會當即獲取一個臨時結果(並非最終的結果,多是封裝好的一個對象)。 #2. concurrent.futures.ProcessPoolExecutor(3).submit(func,) #3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)
#阻塞調用是指調用結果返回以前,當前線程會被掛起(如遇到io操做)。函數只有在獲得結果以後纔會將阻塞的線程激活。有人也許會把阻塞調用和同步調用等同起來,實際上他是不一樣的。對於同步調用來講,不少時候當前線程仍是激活的,只是從邏輯上當前函數沒有返回而已。 #舉例: #1. 同步調用:apply一個累計1億次的任務,該調用會一直等待,直到任務返回結果爲止,但並未阻塞住(即使是被搶走cpu的執行權限,那也是處於就緒態); #2. 阻塞調用:當socket工做在阻塞模式的時候,若是沒有數據的狀況下調用recv函數,則當前線程就會被掛起,直到有數據爲止。
#非阻塞和阻塞的概念相對應,指在不能馬上獲得結果以前也會馬上返回,同時該函數不會阻塞當前線程。
#1. 同步與異步針對的是函數/任務的調用方式:同步就是當一個進程發起一個函數(任務)調用的時候,一直等到函數(任務)完成,而進程繼續處於激活狀態。而異步狀況下是當一個進程發起一個函數(任務)調用的時候,不會等函數返回,而是繼續往下執行當,函數返回的時候經過狀態、通知、事件等方式通知進程任務完成。 #2. 阻塞與非阻塞針對的是進程或線程:阻塞是當請求不能知足的時候就將進程掛起,而非阻塞則不會阻塞當前進程
但凡是硬件,都須要有操做系統去管理,只要有操做系統,就有進程的概念,就須要有建立進程的方式,一些操做系統只爲一個應用程序設計,好比微波爐中的控制器,一旦啓動微波爐,全部的進程都已經存在。
而對於通用系統(跑不少應用程序),須要有系統運行過程當中建立或撤銷進程的能力,主要分爲4種形式建立新的進程
1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,後臺運行的進程與用戶無關,運行在後臺而且只在須要時才喚醒的進程,稱爲守護進程,如電子郵件、web頁面、新聞、打印)
2. 一個進程在運行過程當中開啓了子進程(如nginx開啓多進程,os.fork,subprocess.Popen等)
3. 用戶的交互式請求,而建立一個新進程(如用戶雙擊暴風影音)
4. 一個批處理做業的初始化(只在大型機的批處理系統中應用)
不管哪種,新進程的建立都是由一個已經存在的進程執行了一個用於建立進程的系統調用而建立的:
1. 在UNIX中該系統調用是:fork,fork會建立一個與父進程如出一轍的副本,兩者有相同的存儲映像、一樣的環境字符串和一樣的打開文件(在shell解釋器進程中,執行一個命令就會建立一個子進程)
2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的建立,也負責把正確的程序裝入新進程。
關於建立的子進程,UNIX和windows
1.相同的是:進程建立後,父進程和子進程有各自不一樣的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另一個進程。
2.不一樣的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是能夠有隻讀的共享內存區的。可是對於windows系統來講,從一開始父進程與子進程的地址空間就是不一樣的。
1. 正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出錯退出(自願,python a.py中a.py不存在)
3. 嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等,能夠捕捉異常,try...except...)
4. 被其餘進程殺死(非自願,如kill -9)
不管UNIX仍是windows,進程只有一個父進程,不一樣的是:
1. 在UNIX中全部的進程,都是以init進程爲根,組成樹形結構。父子進程共同組成一個進程組,這樣,當從鍵盤發出一個信號時,該信號被送給當前與鍵盤相關的進程組中的全部成員。
2. 在windows中,沒有進程層次的概念,全部的進程都是地位相同的,惟一相似於進程層次的暗示,是在建立進程時,父進程獲得一個特別的令牌(稱爲句柄),該句柄能夠用來控制子進程,可是父進程有權把該句柄傳給其餘子進程,這樣就沒有層次了。
tail -f access.log |grep '404'
執行程序tail,開啓一個子進程,執行程序grep,開啓另一個子進程,兩個進程之間基於管道'|'通信,將tail的結果做爲grep的輸入。
進程grep在等待輸入(即I/O)時的狀態稱爲阻塞,此時grep命令都沒法運行
其實在兩種狀況下會致使一個進程在邏輯上不能運行,
1. 進程掛起是自身緣由,遇到I/O阻塞,便要讓出CPU讓其餘進程去執行,這樣保證CPU一直在工做
2. 與進程無關,是操做系統層面,可能會由於一個進程佔用時間過多,或者優先級等緣由,而調用其餘的進程去使用CPU。
於是一個進程由三種狀態
進程併發的實如今於,硬件中斷一個正在運行的進程,把此時進程運行的全部狀態保存下來,爲此,操做系統維護一張表格,即進程表(process table),每一個進程佔用一個進程表項(這些表項也稱爲進程控制塊)
該表存放了進程狀態的重要信息:程序計數器、堆棧指針、內存分配情況、全部打開文件的狀態、賬號和調度信息,以及其餘在進程由運行態轉爲就緒態或阻塞態時,必須保存的信息,從而保證該進程在再次啓動時,就像從未被中斷過同樣。
python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。
multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。
須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) 強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
group參數未使用,值始終爲None target表示調用對象,即子進程要執行的任務 args表示調用對象的位置參數元組,args=(1,2,'egon',) kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} name爲子進程的名稱
p.start():啓動進程,並調用該子進程中的p.run()
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
注意:在windows中Process()必須放到# if __name__ == '__main__':下
#######################詳細解釋######################### Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() inside if __name__ == "__main__" since statements inside this if-statement will not get called upon import. 因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()內部調用的原,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。
#########################方法一########################### #開進程的方法一: import time import random from multiprocessing import Process def piao(name): print('%s piaoing' %name) time.sleep(random.randrange(1,5)) print('%s piao end' %name) p1=Process(target=piao,args=('egon',)) #必須加,號 p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('wupeqi',)) p4=Process(target=piao,args=('yuanhao',)) p1.start() p2.start() p3.start() p4.start() print('主線程')
############################方法二######################### #開進程的方法二: import time import random from multiprocessing import Process class Piao(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print('%s piaoing' %self.name) time.sleep(random.randrange(1,5)) print('%s piao end' %self.name) p1=Piao('egon') p2=Piao('alex') p3=Piao('wupeiqi') p4=Piao('yuanhao') p1.start() #start會自動調用run p2.start() p3.start() p4.start() print('主線程')
###############方式一:#################### from multiprocessing import Process import time def task(name): print('%s is running' %name) time.sleep(5) print('%s is done' %name) if __name__ == '__main__': p=Process(target=task,args=('alex',)) p.start() print('主')
###############方式二:################## # from multiprocessing import Process # import time # # class MyProcess(Process): # def __init__(self,name): # super(MyProcess,self).__init__() # self.name=name # # def run(self): # print('%s is running' %self.name) # time.sleep(3) # print('%s is done' %self.name) # # if __name__ == '__main__': # p=MyProcess('進程1') # p.start() #p.run() # print('主')
################實例一################ from multiprocessing import Process n=100 #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就能夠了 def work(): global n n=0 print('子進程內: ',n) if __name__ == '__main__': p=Process(target=work) p.start() print('主進程內: ',n)
#################實例二########################## from multiprocessing import Process import time n=100 def task(): global n time.sleep(5) n=0 if __name__ == '__main__': p=Process(target=task) p.start() # time.sleep(5) print(p.is_alive()) p.join() print(p.is_alive()) print('主',n)
##############################server端############################### from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': #windows下start進程必定要寫到這下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start()
###########################多個client端########################## from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
##############這麼實現有沒有問題???####################### 每來一個客戶端,都在服務端開啓一個進程,若是併發來一個萬個客戶端,要開啓一萬個進程嗎,你本身嘗試着在你本身的機器上開啓一萬個,10萬個進程試一試。 解決方法:進程池
########################join:主進程等,等待子進程結束######################### from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.start() p.join(0.0001) #等待p中止,等0.0001秒就再也不等了 print('開始')
#####################有了join,程序不就是串行了嗎???################ from multiprocessing import Process import time import random def piao(name): print('%s is piaoing' %name) time.sleep(random.randint(1,3)) print('%s is piao end' %name) p1=Process(target=piao,args=('egon',)) p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('yuanhao',)) p4=Process(target=piao,args=('wupeiqi',)) p1.start() p2.start() p3.start() p4.start() #有的同窗會有疑問:既然join是等待進程結束,那麼我像下面這樣寫,進程不就又變成串行的了嗎? #固然不是了,必須明確:p.join()是讓誰等? #很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p, #詳細解析以下: #進程只要start就會在開始運行了,因此p1-p4.start()時,系統中已經有四個併發的進程了 #而咱們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵 #join是讓主線程等,而p1-p4仍然是併發執行的,p1.join的時候,其他p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接經過檢測,無需等待 # 因此4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間 p1.join() p2.join() p3.join() p4.join() print('主線程') #上述啓動進程與join進程能夠簡寫爲 # p_l=[p1,p2,p3,p4] # # for p in p_l: # p.start() # # for p in p_l: # p.join()
# from multiprocessing import Process # import time # import os # # def task(n): # print('%s is runing' %os.getpid()) # time.sleep(n) # print('%s is done' %os.getpid()) # # # if __name__ == '__main__': # # start_time=time.time() # p1=Process(target=task,args=(1,)) # p2=Process(target=task,args=(2,)) # p3=Process(target=task,args=(3,)) # # p_l=[p1,p2,p3] # # p1.start() # # p2.start() # # p3.start() # for p in p_l: # p.start() # # # p3.join() #3 # # p1.join() # # # p2.join() # # for p in p_l: # p.join() # stop_time=time.time() # print('主',(stop_time-start_time)) # from multiprocessing import Process # import time # import os # # def task(n): # print('%s is runing' %os.getpid()) # time.sleep(n) # print('%s is done' %os.getpid()) # # # if __name__ == '__main__': # # start_time=time.time() # p1=Process(target=task,args=(1,)) # p2=Process(target=task,args=(2,)) # p3=Process(target=task,args=(3,)) # # p_l=[p1,p2,p3] # # p1.start() # # p2.start() # # p3.start() # for p in p_l: # p.start() # # # p3.join() #3 # # p1.join() # # # p2.join() # # for p in p_l: # p.join() # stop_time=time.time() # print('主',(stop_time-start_time)) from multiprocessing import Process import time import os def task(n): print('%s is runing' %os.getpid()) time.sleep(n) print('%s is done' %os.getpid()) if __name__ == '__main__': start_time=time.time() p1=Process(target=task,args=(1,)) p2=Process(target=task,args=(2,)) p3=Process(target=task,args=(3,)) p1.start() p1.join() p2.start() p2.join() p3.start() p3.join() stop_time=time.time() print('主',(stop_time-start_time))
#######################terminate與is_alive####################### #進程對象的其餘方法一:terminate,is_alive from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,5)) print('%s is piao end' %self.name) p1=Piao('egon1') p1.start() p1.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活 print(p1.is_alive()) #結果爲True print('開始') print(p1.is_alive()) #結果爲False
########################name與pid######################## from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): # self.name=name # super().__init__() #Process的__init__方法會執行self.name=Piao-1, # #因此加到這裏,會覆蓋咱們的self.name=name #爲咱們開啓的進程設置名字的作法 super().__init__() self.name=name def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.start() print('開始') print(p.pid) #查看pid from multiprocessing import Process import time import os def task(n): print('pid:%s ppid:%s' %(os.getpid(),os.getppid())) time.sleep(n) if __name__ == '__main__': p=Process(target=task,args=(15,),name='進程1') p.start() p.terminate() # time.sleep(1) print(p.is_alive()) print('主pid:%s ppid:%s' %(os.getpid(),os.getppid())) # print(p.pid) p.name='xxxx' print(p.name)
參考博客:http://www.cnblogs.com/Anker/p/3271773.html 一:殭屍進程(有害) 殭屍進程:一個進程使用fork建立子進程,若是子進程退出,而父進程並無調用wait或waitpid獲取子進程的狀態信息,那麼子進程的進程描述符仍然保存在系統中。這種進程稱之爲僵死進程。詳解以下 咱們知道在unix/linux中,正常狀況下子進程是經過父進程建立的,子進程在建立新的進程。子進程的結束和父進程的運行是一個異步過程,即父進程永遠沒法預測子進程到底何時結束,若是子進程一結束就馬上回收其所有資源,那麼在父進程內將沒法獲取子進程的狀態信息。 所以,UNⅨ提供了一種機制能夠保證父進程能夠在任意時刻獲取子進程結束時的狀態信息: 1、在每一個進程退出的時候,內核釋放該進程全部的資源,包括打開的文件,佔用的內存等。可是仍然爲其保留必定的信息(包括進程號the process ID,退出狀態the termination status of the process,運行時間the amount of CPU time taken by the process等) 二、直到父進程經過wait / waitpid來取時才釋放. 但這樣就致使了問題,若是進程不調用wait / waitpid的話,那麼保留的那段信息就不會釋放,其進程號就會一直被佔用,可是系統所能使用的進程號是有限的,若是大量的產生僵死進程,將由於沒有可用的進程號而致使系統不能產生新的進程. 此即爲殭屍進程的危害,應當避免。 任何一個子進程(init除外)在exit()以後,並不是立刻就消失掉,而是留下一個稱爲殭屍進程(Zombie)的數據結構,等待父進程處理。這是每一個子進程在結束時都要通過的階段。若是子進程在exit()以後,父進程沒有來得及處理,這時用ps命令就能看到子進程的狀態是「Z」。若是父進程能及時 處理,可能用ps命令就來不及看到子進程的殭屍狀態,但這並不等於子進程不通過殭屍狀態。 若是父進程在子進程結束以前退出,則子進程將由init接管。init將會以父進程的身份對殭屍狀態的子進程進行處理。
二:孤兒進程(無害) 孤兒進程:一個父進程退出,而它的一個或多個子進程還在運行,那麼那些子進程將成爲孤兒進程。孤兒進程將被init進程(進程號爲1)所收養,並由init進程對它們完成狀態收集工做。 孤兒進程是沒有父進程的進程,孤兒進程這個重任就落到了init進程身上,init進程就好像是一個民政局,專門負責處理孤兒進程的善後工做。每當出現一個孤兒進程的時候,內核就把孤 兒進程的父進程設置爲init,而init進程會循環地wait()它的已經退出的子進程。這樣,當一個孤兒進程淒涼地結束了其生命週期的時候,init進程就會表明黨和政府出面處理它的一切善後工做。所以孤兒進程並不會有什麼危害。 咱們來測試一下(建立完子進程後,主進程所在的這個腳本就退出了,當父進程先於子進程結束時,子進程會被init收養,成爲孤兒進程,而非殭屍進程),文件內容 import os import sys import time pid = os.getpid() ppid = os.getppid() print 'im father', 'pid', pid, 'ppid', ppid pid = os.fork() #執行pid=os.fork()則會生成一個子進程 #返回值pid有兩種值: # 若是返回的pid值爲0,表示在子進程當中 # 若是返回的pid值>0,表示在父進程當中 if pid > 0: print 'father died..' sys.exit(0) # 保證主線程退出完畢 time.sleep(1) print 'im child', os.getpid(), os.getppid() 執行文件,輸出結果: im father pid 32515 ppid 32015 father died.. im child 32516 1 看,子進程已經被pid爲1的init進程接收了,因此殭屍進程在這種狀況下是不存在的,存在只有孤兒進程而已,孤兒進程聲明週期結束天然會被init來銷燬。
三:殭屍進程危害場景:
例若有個進程,它按期的產 生一個子進程,這個子進程須要作的事情不多,作完它該作的事情以後就退出了,所以這個子進程的生命週期很短,可是,父進程只管生成新的子進程,至於子進程 退出以後的事情,則一律漠不關心,這樣,系統運行上一段時間以後,系統中就會存在不少的僵死進程,假若用ps命令查看的話,就會看到不少狀態爲Z的進程。 嚴格地來講,僵死進程並非問題的根源,罪魁禍首是產生出大量僵死進程的那個父進程。所以,當咱們尋求如何消滅系統中大量的僵死進程時,答案就是把產生大 量僵死進程的那個元兇槍斃掉(也就是經過kill發送SIGTERM或者SIGKILL信號啦)。槍斃了元兇進程以後,它產生的僵死進程就變成了孤兒進 程,這些孤兒進程會被init進程接管,init進程會wait()這些孤兒進程,釋放它們佔用的系統進程表中的資源,這樣,這些已經僵死的孤兒進程 就能瞑目而去了。
四:測試 #一、產生殭屍進程的程序test.py內容以下 #coding:utf-8 from multiprocessing import Process import time,os def run(): print('子',os.getpid()) if __name__ == '__main__': p=Process(target=run) p.start() print('主',os.getpid()) time.sleep(1000) #二、在unix或linux系統上執行 [root@vm172-31-0-19 ~]# python3 test.py & [1] 18652 [root@vm172-31-0-19 ~]# 主 18652 子 18653 [root@vm172-31-0-19 ~]# ps aux |grep Z USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND root 18653 0.0 0.0 0 0 pts/0 Z 20:02 0:00 [python3] <defunct> #出現殭屍進程 root 18656 0.0 0.0 112648 952 pts/0 S+ 20:02 0:00 grep --color=auto Z [root@vm172-31-0-19 ~]# top #執行top命令發現1zombie top - 20:03:42 up 31 min, 3 users, load average: 0.01, 0.06, 0.12 Tasks: 93 total, 2 running, 90 sleeping, 0 stopped, 1 zombie %Cpu(s): 0.0 us, 0.3 sy, 0.0 ni, 99.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st KiB Mem : 1016884 total, 97184 free, 70848 used, 848852 buff/cache KiB Swap: 0 total, 0 free, 0 used. 782540 avail Mem PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND root 20 0 29788 1256 988 S 0.3 0.1 0:01.50 elfin #三、 等待父進程正常結束後會調用wait/waitpid去回收殭屍進程 但若是父進程是一個死循環,永遠不會結束,那麼該殭屍進程就會一直存在,殭屍進程過多,就是有害的 解決方法一:殺死父進程 解決方法二:對開啓的子進程應該記得使用join,join會回收殭屍進程 參考python2源碼註釋 class Process(object): def join(self, timeout=None): ''' Wait until child process terminates ''' assert self._parent_pid == os.getpid(), 'can only join a child process' assert self._popen is not None, 'can only join a started process' res = self._popen.wait(timeout) if res is not None: _current_process._children.discard(self) join方法中調用了wait,告訴系統釋放殭屍進程。discard爲從本身的children中剔除 解決方法三:http://blog.csdn.net/u010571844/article/details/50419798
主進程建立守護進程
其一:守護進程會在主進程代碼執行結束後就終止
其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行 p.start() print('主')
############迷惑人的例子################# #主進程代碼運行完畢,守護進程就會結束 from multiprocessing import Process from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main-------") #打印該行則主進程代碼結束,則守護進程p1應該被終止,可能會有p1任務執行的打印信息123,由於主進程打印main----時,p1也執行了,可是隨即被終止
#守護進程:當子進程執行的任務在父進程代碼運行完畢後就沒有存在的必要了,那 #該子進程就應該被設置爲守護進程 # from multiprocessing import Process # import time # # def task(name): # p=Process(target=time.sleep,args=(6,)) # p.start() # print('%s is running' %name) # time.sleep(5) # print('%s is done' %name) # # # if __name__ == '__main__': # p=Process(target=task,args=('alex',)) # p.daemon=True # p.start() # time.sleep(1) # print('主') #主進程代碼運行完畢,守護進程就會結束 from multiprocessing import Process from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main-------")
進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,
而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理
################併發運行,效率高,但競爭同一打印終端,帶來了打印錯亂################# from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) if __name__ == '__main__': for i in range(3): p=Process(target=work) p.start()
###########由併發變成了串行,犧牲了運行效率,但避免了競爭############## from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,)) p.start()
文件當數據庫,模擬搶票
#############併發運行,效率高,但競爭寫同一文件,數據寫入錯亂################# #文件db的內容爲:{"count":1} #注意必定要用雙引號,否則json沒法識別 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db.txt')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網絡延遲 json.dump(dic,open('db.txt','w')) print('\033[43m購票成功\033[0m') def task(lock): search() get() if __name__ == '__main__': lock=Lock() for i in range(100): #模擬併發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start()
#########加鎖:購票行爲由併發變成了串行,犧牲了運行效率,但保證了數據安全########### #文件db的內容爲:{"count":1} #注意必定要用雙引號,否則json沒法識別 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db.txt')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網絡延遲 json.dump(dic,open('db.txt','w')) print('\033[43m購票成功\033[0m') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock=Lock() for i in range(100): #模擬併發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start()
############db.txt############ {"count": 1}
from multiprocessing import Process,Lock import json import time import random import os def search(): time.sleep(random.randint(1,3)) dic=json.load(open('db.txt','r',encoding='utf-8')) print('%s 查看到剩餘票數%s' %(os.getpid(),dic['count'])) def get(): dic=json.load(open('db.txt','r',encoding='utf-8')) if dic['count'] > 0: dic['count']-=1 time.sleep(random.randint(1,3)) json.dump(dic,open('db.txt','w',encoding='utf-8')) print('%s 購票成功' %os.getpid()) def task(mutex): search() mutex.acquire() get() mutex.release() if __name__ == '__main__': mutex=Lock() for i in range(10): p=Process(target=task,args=(mutex,)) p.start() # p.join()
#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 雖然能夠用文件共享數據實現進程間通訊,但問題是: 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 2.須要本身加鎖處理 #所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。 隊列和管道都是將數據存放於內存中 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來, 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。
maxsize是隊列中容許最大項數,省略則無大小限制。
主要方法:
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣
q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞
q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲
''' multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列 都是基於消息傳遞實現的,可是隊列接口 ''' from multiprocessing import Process,Queue import time q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) print(q.full()) #滿了 print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了
from multiprocessing import Queue q=Queue(3) q.put('first') q.put(2) q.put({'count':3}) # q.put('fourth',block=False) #q.put_nowait('fourth') # q.put('fourth',block=True,timeout=3) print(q.get()) print(q.get()) print(q.get()) # print(q.get(block=False)) #q.get_nowait() print(q.get(block=True,timeout=3))
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('主')
#生產者消費者模型總結 #程序中有兩類角色 一類負責生產數據(生產者) 一類負責處理數據(消費者) #引入生產者消費者模型爲了解決的問題是: 平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度 #如何實現: 生產者<-->隊列<——>消費者 #生產者消費者模型實現類程序的解耦和
此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環
############生產者在生產完畢後發送結束信號None################ from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.put(None) #發送結束信號 if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('主')
注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號
############主進程在生產者生產完畢後發送結束信號None############## from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() p1.join() q.put(None) #發送結束信號 print('主')
但上述解決方式,在有多個生產者和多個消費者時,咱們則須要用一個很low的方式去解決
##############有幾個消費者就須要發送幾回結束信號:至關low################ from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(name,q): for i in range(2): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) #開始 p1.start() p2.start() p3.start() c1.start() p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號 p2.join() p3.join() q.put(None) #有幾個消費者就應該發送幾回結束信號None q.put(None) #發送結束信號 print('主')
其實咱們的思路無非是發送結束信號而已,有另一種隊列提供了這種機制
#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。 #參數介紹: maxsize是隊列中容許最大項數,省略則無大小限制。 #方法介紹: JoinableQueue的實例p除了與Queue對象相同的方法以外還具備: q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常 q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了 def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.join() if __name__ == '__main__': q=JoinableQueue() #生產者們:即廚師們 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True c2.daemon=True #開始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('主') #主進程等--->p1,p2,p3等---->c1,c2 #p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據 #於是c1,c2也沒有存在的價值了,應該隨着主進程的結束而結束,因此設置成守護進程
# from multiprocessing import Process,Queue # import time # import random # # def producer(name,food,q): # for i in range(3): # res='%s%s' %(food,i) # time.sleep(random.randint(1,3)) # q.put(res) # print('廚師[%s]生產了<%s>' %(name,res)) # # # def consumer(name,q): # while True: # res=q.get() # if res is None:break # time.sleep(random.randint(1,3)) # print('吃貨[%s]吃了<%s>' % (name, res)) # # if __name__ == '__main__': # #隊列 # q=Queue() # #生產者們 # p1=Process(target=producer,args=('egon1','泔水',q)) # p2=Process(target=producer,args=('egon2','骨頭',q)) # #消費者們 # c1=Process(target=consumer,args=('管廷威',q)) # c2=Process(target=consumer,args=('oldboy',q)) # c3=Process(target=consumer,args=('oldgirl',q)) # # # p1.start() # p2.start() # c1.start() # c2.start() # c3.start() # # p1.join() # p2.join() # q.put(None) # q.put(None) # q.put(None) # print('主')
from multiprocessing import Process,JoinableQueue import time import random def producer(name,food,q): for i in range(3): res='%s%s' %(food,i) time.sleep(random.randint(1,3)) q.put(res) print('廚師[%s]生產了<%s>' %(name,res)) def consumer(name,q): while True: res=q.get() if res is None:break time.sleep(random.randint(1,3)) print('吃貨[%s]吃了<%s>' % (name, res)) q.task_done() if __name__ == '__main__': #隊列 q=JoinableQueue() #生產者們 p1=Process(target=producer,args=('egon1','泔水',q)) p2=Process(target=producer,args=('egon2','骨頭',q)) #消費者們 c1=Process(target=consumer,args=('管廷威',q)) c2=Process(target=consumer,args=('oldboy',q)) c3=Process(target=consumer,args=('oldgirl',q)) c1.daemon=True c2.daemon=True c3.daemon=True p1.start() p2.start() c1.start() c2.start() c3.start() p1.join() p2.join() q.join() print('主')
在傳統操做系統中,每一個進程有一個地址空間,並且默認就有一個控制線程
線程顧名思義,就是一條流水線工做的過程,一條流水線必須屬於一個車間,一個車間的工做過程是一個進程
車間負責把資源整合到一塊兒,是一個資源單位,而一個車間內至少有一個流水線
流水線的工做須要電源,電源就至關於cpu
因此,進程只是用來把資源集中到一塊兒(進程只是一個資源單位,或者說資源集合),而線程纔是cpu上的執行單位。
多線程(即多個控制線程)的概念是,在一個進程中存在多個控制線程,多個控制線程共享該進程的地址空間,至關於一個車間內有多條流水線,都共用一個車間的資源。
例如,北京地鐵與上海地鐵是不一樣的進程,而北京地鐵裏的13號線是一個線程,北京地鐵全部的線路共享北京地鐵全部的資源,好比全部的乘客能夠被全部線路拉。
若是咱們的軟件是一個工廠,該工廠有多條流水線,流水線工做須要電源,電源只有一個即cpu(單核cpu)
一個車間就是一個進程,一個車間至少一條流水線(一個進程至少一個線程)
建立一個進程,就是建立一個車間(申請空間,在該空間內建至少一條流水線)
而建線程,就只是在一個車間內造一條流水線,無需申請空間,因此建立開銷小
車間直接是競爭/搶電源的關係,競爭(不一樣的進程直接是競爭關係,是不一樣的程序員寫的程序運行的,迅雷搶佔其餘進程的網速,360把其餘進程當作病毒乾死)
一個車間的不一樣流水線式協同工做的關係(同一個進程的線程之間是合做關係,是同一個程序寫的程序內開啓動,迅雷內的線程是合做關係,不會本身幹本身)
1、Threads share the address space of the process that created it; processes have their own address space. 線程的地址空間共享,每一個進程有本身的地址空間。 2、Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process. 一個進程中的線程直接接入他的進程的數據段,可是每一個進程都有他們本身的從父進程拷貝過來的數據段 3、Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes. 一個進程內部的線程之間可以直接通訊,進程之間必須使用進程間通訊實現通訊 4、New threads are easily created; new processes require duplication of the parent process. 新的線程很容易被建立,新的進程須要從父進程複製 5、Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes. 一個進程中的線程間可以有至關大的控制力度,進程僅僅只能控制他的子進程 六、Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes. 改變主線程(刪除,優先級改變等)可能影響這個進程中的其餘線程;修改父進程不會影響子進程
多線程指的是,在一個進程中開啓多個線程,簡單的講:若是多個任務共用一塊地址空間,那麼必須在一個進程內開啓多個線程。詳細的講分爲4點:
1. 多線程共享一個進程的地址空間
2. 線程比進程更輕量級,線程比進程更容易建立可撤銷,在許多操做系統中,建立一個線程比建立一個進程要快10-100倍,在有大量線程須要動態和快速修改時,這一特性頗有用
3. 若多個線程都是cpu密集型的,那麼並不能得到性能上的加強,可是若是存在大量的計算和大量的I/O處理,擁有多個線程容許這些活動彼此重疊運行,從而會加快程序執行的速度。
4. 在多cpu系統中,爲了最大限度的利用多核,能夠開啓多個線程,比開進程開銷要小的多。(這一條並不適用於python)
開啓一個字處理軟件進程,該進程確定須要辦不止一件事情,好比監聽鍵盤輸入,處理文字,定時自動將文字保存到硬盤,這三個任務操做的都是同一塊數據,於是不能用多進程。只能在一個進程裏併發地開啓三個線程,若是是單線程,那就只能是,鍵盤輸入時,不能處理文字和自動保存,自動保存時又不能輸入和處理文字。
multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹
官網連接:https://docs.python.org/3/library/threading.html?highlight=threading#
#方式一 from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主線程')
#方式二 from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('egon') t.start() print('主線程')
# from threading import Thread # import time # import random # # def piao(name): # print('%s is piaoing' %name) # time.sleep(random.randint(1,3)) # print('%s is piao end' %name) # # # if __name__ == '__main__': # t1=Thread(target=piao,args=('alex',)) # t1.start() # print('主') from threading import Thread import time import random class MyThread(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): print('%s is piaoing' %self.name) time.sleep(random.randint(1,3)) print('%s is piao end' %self.name) if __name__ == '__main__': t1=MyThread('alex') t1.start() print('主')
from threading import Thread from multiprocessing import Process import os def work(): print('hello') if __name__ == '__main__': #在主進程下開啓線程 t=Thread(target=work) t.start() print('主線程/主進程') ''' 打印結果: hello 主線程/主進程 ''' #在主進程下開啓子進程 t=Process(target=work) t.start() print('主線程/主進程') ''' 打印結果: 主線程/主進程 hello '''
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': #part1:在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主線程/主進程pid',os.getpid()) #part2:開多個進程,每一個進程都有不一樣的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主線程/主進程pid',os.getpid())
from threading import Thread from multiprocessing import Process import os def work(): global n n=0 if __name__ == '__main__': # n=100 # p=Process(target=work) # p.start() # p.join() # print('主',n) #毫無疑問子進程p已經將本身的全局的n改爲了0,但改的僅僅是它本身的,查看父進程的n仍然爲100 n=1 t=Thread(target=work) t.start() t.join() print('主',n) #查看結果爲0,由於同一進程內的線程之間共享進程內的數據
# from threading import Thread # import time # import random # import os # # def piao(): # print('%s is piaoing' %os.getpid()) # time.sleep(random.randint(1,3)) # # # if __name__ == '__main__': # t1=Thread(target=piao,) # t2=Thread(target=piao,) # t3=Thread(target=piao,) # t1.start()a # t2.start() # t3.start() # print('主',os.getpid()) # from threading import Thread import time import random import os n=100 def piao(): global n n=0 if __name__ == '__main__': t1=Thread(target=piao,) t1.start() t1.join() print('主',n)
#####################服務端################## from threading import Thread,current_thread from socket import * def comunicate(conn): print('子線程:%s' %current_thread().getName()) while True: try: data=conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port): print('主線程:%s' %current_thread().getName()) server = socket(AF_INET, SOCK_STREAM) server.bind((ip,port)) server.listen(5) while True: conn, addr = server.accept() print(addr) # comunicate(conn) t=Thread(target=comunicate,args=(conn,)) t.start() server.close() if __name__ == '__main__': server('127.0.0.1', 8081)
###################客戶端###################### from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8081)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8')) client.close()
###################多線程併發的socket服務端################## #_*_coding:utf-8_*_ #!/usr/bin/env python import multiprocessing import threading import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) def action(conn): while True: data=conn.recv(1024) print(data) conn.send(data.upper()) if __name__ == '__main__': while True: conn,addr=s.accept() p=threading.Thread(target=action,args=(conn,)) p.start()
########################客戶端###################### #_*_coding:utf-8_*_ #!/usr/bin/env python import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue s.send(msg.encode('utf-8')) data=s.recv(1024) print(data)
練習三:三個任務,一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化後的結果存入文件
from threading import Thread msg_l=[] format_l=[] def talk(): while True: msg=input('>>: ').strip() if not msg:continue msg_l.append(msg) def format_msg(): while True: if msg_l: res=msg_l.pop() format_l.append(res.upper()) def save(): while True: if format_l: with open('db.txt','a',encoding='utf-8') as f: res=format_l.pop() f.write('%s\n' %res) if __name__ == '__main__': t1=Thread(target=talk) t2=Thread(target=format_msg) t3=Thread(target=save) t1.start() t2.start() t3.start()
Thread實例對象的方法 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread import threading from multiprocessing import Process import os def work(): import time time.sleep(3) print(threading.current_thread().getName()) if __name__ == '__main__': #在主進程下開啓線程 t=Thread(target=work) t.start() print(threading.current_thread().getName()) print(threading.current_thread()) #主線程 print(threading.enumerate()) #連同主線程在內有兩個運行的線程 print(threading.active_count()) print('主線程/主進程') ''' 打印結果: MainThread <_MainThread(MainThread, started 140735268892672)> [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 主線程/主進程 Thread-1 ''' 主線程等待子線程結束 from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() t.join() print('主線程') print(t.is_alive()) ''' egon say hello 主線程 False '''
不管是進程仍是線程,都遵循:守護xxx會等待主xxx運行完畢後被銷燬
須要強調的是:運行完畢並不是終止運行
#1.對主進程來講,運行完畢指的是主進程代碼運行完畢 #2.對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢
詳細解釋:
#1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束, #2 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) #必須在t.start()以前設置 t.start() print('主線程') print(t.is_alive()) ''' 主線程 True '''
########################迷惑人的例子####################### from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------")
# from threading import Thread # import time # # def sayhi(name): # print('====>') # time.sleep(2) # print('%s say hello' %name) # # if __name__ == '__main__': # t=Thread(target=sayhi,args=('egon',)) # # t.setDaemon(True) # t.daemon=True # t.start() # # print('主線程') # from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------")
''' 定義: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) ''' 結論:在Cpython解釋器中,同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核優點
首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL。
GIL本質就是一把互斥鎖,既然是互斥鎖,全部互斥鎖的本質都同樣,都是將併發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。
能夠確定的一點是:保護不一樣的數據的安全,就應該加不一樣的鎖。
要想了解GIL,首先肯定一點:每次執行python程序,都會產生一個獨立的進程。例如python test.py,python aaa.py,python bbb.py會產生3個不一樣的python進程
############驗證python test.py只會產生一個進程################ ''' #驗證python test.py只會產生一個進程 #test.py內容 import os,time print(os.getpid()) time.sleep(1000) ''' python3 test.py #在windows下 tasklist |findstr python #在linux下 ps aux |grep python
在一個python的進程內,不只有test.py的主線程或者由該主線程開啓的其餘線程,還有解釋器開啓的垃圾回收等解釋器級別的線程,總之,全部線程都運行在這一個進程內,毫無疑問
#1 全部數據都是共享的,這其中,代碼做爲一種數據也是被全部線程共享的(test.py的全部代碼以及Cpython解釋器的全部代碼) 例如:test.py定義一個函數work(代碼內容以下圖),在進程內全部線程都能訪問到work的代碼,因而咱們能夠開啓三個線程而後target都指向該代碼,能訪問到意味着就是能夠執行。 #2 全部線程的任務,都須要將任務的代碼當作參數傳給解釋器的代碼去執行,即全部的線程要想運行本身的任務,首先須要解決的是可以訪問到解釋器的代碼。
綜上:
若是多個線程的target=work,那麼執行流程是
多個線程先訪問到解釋器的代碼,即拿到執行權限,而後將target的代碼交給解釋器的代碼去執行
解釋器的代碼是全部線程共享的,因此垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就致使了一個問題:對於同一個數據100,可能線程1執行x=100的同時,而垃圾回收執行的是回收100的操做,解決這種問題沒有什麼高明的方法,就是加鎖處理,以下圖的GIL,保證python解釋器同一時間只能執行一個任務的代碼。
GIL保護的是解釋器級的數據,保護用戶本身的數據則須要本身加鎖處理,以下圖
有了GIL的存在,同一時刻同一進程中只有一個線程被執行
聽到這裏,有的同窗立馬質問:進程能夠利用多核,可是開銷大,而python的多線程開銷小,但卻沒法利用多核優點,也就是說python沒用了,php纔是最牛逼的語言?
彆着急啊,老孃還沒講完呢。
要解決這個問題,咱們須要在幾個點上達成一致:
#1. cpu究竟是用來作計算的,仍是用來作I/O的? #2. 多cpu,意味着能夠有多個核並行完成計算,因此多核提高的是計算性能 #3. 每一個cpu一旦遇到I/O阻塞,仍然須要等待,因此多核對I/O操做沒什麼用處
一個工人至關於cpu,此時計算至關於工人在幹活,I/O阻塞至關於爲工人幹活提供所需原材料的過程,工人幹活的過程當中若是沒有原材料了,則工人幹活的過程須要中止,直到等待原材料的到來。
若是你的工廠乾的大多數任務都要有準備原材料的過程(I/O密集型),那麼你有再多的工人,意義也不大,還不如一我的,在等材料的過程當中讓工人去幹別的活,
反過來說,若是你的工廠原材料都齊全,那固然是工人越多,效率越高
結論:
對計算來講,cpu越多越好,可是對於I/O來講,再多的cpu也沒用
固然對運行一個程序來講,隨着cpu的增多執行效率確定會有所提升(無論提升幅度多大,總會有所提升),這是由於一個程序基本上不會是純計算或者純I/O,因此咱們只能相對的去看一個程序究竟是計算密集型仍是I/O密集型,從而進一步分析python的多線程到底有無用武之地
#分析: 咱們有四個任務須要處理,處理方式確定是要玩出併發的效果,解決方案能夠是: 方案一:開啓四個進程 方案二:一個進程下,開啓四個線程 #單核狀況下,分析結果: 若是四個任務是計算密集型,沒有多核來並行計算,方案一徒增了建立進程的開銷,方案二勝 若是四個任務是I/O密集型,方案一建立進程的開銷大,且進程的切換速度遠不如線程,方案二勝 #多核狀況下,分析結果: 若是四個任務是計算密集型,多核意味着並行計算,在python中一個進程中同一時刻只有一個線程執行用不上多核,方案一勝 若是四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝 #結論:如今的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提高,甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高的。
#################計算密集型:多進程效率高##################### from multiprocessing import Process from threading import Thread import os,time def work(): res=0 for i in range(100000000): res*=i if __name__ == '__main__': l=[] print(os.cpu_count()) #本機爲4核 start=time.time() for i in range(4): p=Process(target=work) #耗時5s多 p=Thread(target=work) #耗時18s多 l.append(p) p.start() for p in l: p.join() stop=time.time() print('run time is %s' %(stop-start))
####################I/O密集型:多線程效率高################### from multiprocessing import Process from threading import Thread import threading import os,time def work(): time.sleep(2) print('===>') if __name__ == '__main__': l=[] print(os.cpu_count()) #本機爲4核 start=time.time() for i in range(400): # p=Process(target=work) #耗時12s多,大部分時間耗費在建立進程上 p=Thread(target=work) #耗時2s多 l.append(p) p.start() for p in l: p.join() stop=time.time() print('run time is %s' %(stop-start))
# from threading import Thread,Lock # import time # # n=100 # # def task(): # global n # with mutex: # temp=n # time.sleep(0.1) # n=temp-1 # # if __name__ == '__main__': # start_time=time.time() # mutex=Lock() # t_l=[] # for i in range(100): # t=Thread(target=task) # t_l.append(t) # t.start() # # for t in t_l: # t.join() # stop_time=time.time() # print('主',n) # print('run time is %s' %(stop_time-start_time))
#計算密集型:開多進程 # from multiprocessing import Process # from threading import Thread # import os,time # def work(): # res=0 # for i in range(100000000): # res*=i # # # if __name__ == '__main__': # l=[] # start=time.time() # for i in range(4): # # p=Process(target=work) #5.826333284378052 # p=Thread(target=work) #run time is 19.91913938522339 # l.append(p) # p.start() # for p in l: # p.join() # stop=time.time() # print('run time is %s' %(stop-start))
#I/O密集型:多線程效率高 # from multiprocessing import Process # from threading import Thread # import threading # import os,time # def work(): # time.sleep(2) # # if __name__ == '__main__': # l=[] # start=time.time() # for i in range(400): # # p=Process(target=work) # 12.465712785720825 # p=Thread(target=work) #2.037116765975952 # l.append(p) # p.start() # for p in l: # p.join() # stop=time.time() # print('run time is %s' %(stop-start)) # from threading import Thread,Lock import time n=100 def task(): global n mutex.acquire() temp=n time.sleep(0.1) n=temp-1 mutex.release() if __name__ == '__main__': mutex=Lock() for i in range(3): t=Thread(target=task) t.start()
應用:
多線程用於IO密集型,如socket,爬蟲,web
多進程用於計算密集型,如金融分析
paramiko是一個用於作遠程控制的模塊,使用該模塊能夠對遠程服務器進行命令或文件操做,值得一說的是,fabric和ansible內部的遠程管理就是使用的paramiko來現實。
pip3 install paramiko #在python3中
#####################在python2中################### pycrypto,因爲 paramiko 模塊內部依賴pycrypto,因此先下載安裝pycrypto #在python2中 pip3 install pycrypto pip3 install paramiko 注:若是在安裝pycrypto2.0.1時發生以下錯誤 command 'gcc' failed with exit status 1... 多是缺乏python-dev安裝包致使 若是gcc沒有安裝,請事先安裝gcc
用於鏈接遠程服務器並執行基本命令
###############基於用戶名密碼鏈接:##################### import paramiko # 建立SSH對象 ssh = paramiko.SSHClient() # 容許鏈接不在know_hosts文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 鏈接服務器 ssh.connect(hostname='120.92.84.249', port=22, username='root', password='xxx') # 執行命令 stdin, stdout, stderr = ssh.exec_command('df') # 獲取命令結果 result = stdout.read() print(result.decode('utf-8')) # 關閉鏈接 ssh.close()
##############SSHClient 封裝 Transport################### import paramiko transport = paramiko.Transport(('120.92.84.249', 22)) transport.connect(username='root', password='xxx') ssh = paramiko.SSHClient() ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('df') res=stdout.read() print(res.decode('utf-8')) transport.close()
##########基於公鑰密鑰鏈接:###############
客戶端文件名:id_rsa
服務端必須有文件名:authorized_keys(在用ssh-keygen時,必須製做一個authorized_keys,能夠用ssh-copy-id來製做)
import paramiko private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa') # 建立SSH對象 ssh = paramiko.SSHClient() # 容許鏈接不在know_hosts文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 鏈接服務器 ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key) # 執行命令 stdin, stdout, stderr = ssh.exec_command('df') # 獲取命令結果 result = stdout.read() print(result.decode('utf-8')) # 關閉鏈接 ssh.close()
##################SSHClient 封裝 Transport################### import paramiko private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa') transport = paramiko.Transport(('120.92.84.249', 22)) transport.connect(username='root', pkey=private_key) ssh = paramiko.SSHClient() ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('df') result=stdout.read() print(result.decode('utf-8')) transport.close()
##################基於私鑰字符串進行鏈接#################### import paramiko from io import StringIO key_str = """-----BEGIN RSA PRIVATE KEY----- MIIEoQIBAAKCAQEAsJmFLrSeCumJvga0Gl5O5wVOVwMIy2MpqIyQPi5J87dg89a4 Da9fczJog7qoSbRwHFOQoCHNphSlp5KPhGsF6RJewkIw9H1UKV4dCOyl/4HOAkAD rKrsEDmrJ9JlzF2GTTZSnTgVQWcvBS2RKB4eM2R9aJ11xV6X2Hk4YDLTExIWeabb h2TUKw0iyjI8pRuYLKkF2X16u9TBwfOTroGYgiNFHQvhsQppbEbI49NF2XkCkFMi 8/7tLjf95InE/VUUq56JqfzyHwdpHou+waXbwtvGgXN3sz+KkuEv6R2qDz06upZV FCZRRpDhzoR8Uh/UEzTGZb8z7FB6EJXUiXJikQIBIwKCAQBBmBuGYFf1bK+BGG7H 9ySe81ecqVsJtx4aCFLVRGScWg4RbQKIvXs5an6XU/VdNGQnx0RYvBkvDvuzRRC8 J8Bd4kB0CfTtGJuaVigKoQp02HEWx1HSa17+tlWD0c4KFBvwywi+DYQ83S64x8gz eOalX9bPFenqORPUD8R7gJeKvPVc6ZTPeorpuH7u9xayP0Eop8qKxZza9Xh3foVj Qo4IxoYnDN57CIRX5PFSlDDggpmr8FtRF4nAxmFq8LhSp05ivzX/Ku1SNHdaMWZO 7va8tISXdLI5m0EGzoVoBvohIbwlxI6kfmamrh6Eas2Jnsc4CLzMsR4jBWt0LHLv /SLnAoGBANaEUf/Jptab9G/xD9W2tw/636i3gLpTPY9KPtCcAxqStNeT6RAWZ5HF lKJg+NKpu3pI45ldAwvts0i+aCZk2xakEWIZWqCmXm31JSPDQTaMGe7H0vOmUaxx ncdpBVdvhMbfFUgei15iKfuafgrKaS9oIkntXEgrC+3wBOI0Gbx3AoGBANLAGxAF TK7ydr+Q1+6/ujs6e8WsXt8HZMa/1khCVSbrf1MgACvZPSSSrDpVwaDTSjlRI4AL bb0l0RFU+/0caMiHilscuJdz9Fdd9Ux4pjROZa3TF5CFhvP7PsZAoxOo+yqJg4zr 996GG/aAv4M8lQJ2rDFk/Dgn5y/AaAun1oM3AoGAGIQmoOPYjY4qkHNSRE9lYOl4 pZFQilKn8x5tlC8WTC4GCgJGhX7nQ9wQ/J1eQ/YkDfmznH+ok6YjHkGlgLsRuXHW GdcDCwuzBUCWh76LHC1EytUCKnloa3qy8jfjWnMlHgrd3FtDILrC+C7p1Vj2FAvm qVz0moiTpioPL8twp9MCgYEAin49q3EyZFYwxwdpU7/SJuvq750oZq0WVriUINsi A6IR14oOvbqkhb94fhsY12ZGt/N9uosq22H+anms6CicoQicv4fnBHDFI3hCHE9I pgeh50GTJHUA6Xk34V2s/kp5KpThazv6qCw+QubkQExh660SEdSlvoCfPKMCi1EJ TukCgYAZKY1NZ2bjJyyO/dfNvMQ+etUL/9esi+40GUGyJ7SZcazrN9z+DO0yL39g 7FT9NMIc2dsmNJQMaGBCDl0AjO1O3b/wqlrNvNBGkanxn2Htn5ajfo+LBU7yHAcV 7w4X5HLarXiE1mj0LXFKJhdvFqU53KUQJXBqR6lsMqzsdPwLMJg== -----END RSA PRIVATE KEY-----""" private_key = paramiko.RSAKey(file_obj=StringIO(key_str)) transport = paramiko.Transport(('120.92.84.249', 22)) transport.connect(username='root', pkey=private_key) ssh = paramiko.SSHClient() ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('df') result = stdout.read() print(result.decode('utf-8')) transport.close() print(result)
用於鏈接遠程服務器並執行上傳下載
##############基於用戶名密碼上傳下載################ import paramiko transport = paramiko.Transport(('120.92.84.249',22)) transport.connect(username='root',password='xxx') sftp = paramiko.SFTPClient.from_transport(transport) # 將location.py 上傳至服務器 /tmp/test.py sftp.put('/tmp/id_rsa', '/etc/test.rsa') # 將remove_path 下載到本地 local_path sftp.get('remove_path', 'local_path') transport.close()
##############基於公鑰密鑰上傳下載################## import paramiko private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa') transport = paramiko.Transport(('120.92.84.249', 22)) transport.connect(username='root', pkey=private_key ) sftp = paramiko.SFTPClient.from_transport(transport) # 將location.py 上傳至服務器 /tmp/test.py sftp.put('/tmp/id_rsa', '/tmp/a.txt') # 將remove_path 下載到本地 local_path sftp.get('remove_path', 'local_path') transport.close()
###################Demo################ #!/usr/bin/env python # -*- coding:utf-8 -*- import paramiko import uuid class Haproxy(object): def __init__(self): self.host = '172.16.103.191' self.port = 22 self.username = 'root' self.pwd = '123' self.__k = None def create_file(self): file_name = str(uuid.uuid4()) with open(file_name,'w') as f: f.write('sb') return file_name def run(self): self.connect() self.upload() self.rename() self.close() def connect(self): transport = paramiko.Transport((self.host,self.port)) transport.connect(username=self.username,password=self.pwd) self.__transport = transport def close(self): self.__transport.close() def upload(self): # 鏈接,上傳 file_name = self.create_file() sftp = paramiko.SFTPClient.from_transport(self.__transport) # 將location.py 上傳至服務器 /tmp/test.py sftp.put(file_name, '/home/root/tttttttttttt.py') def rename(self): ssh = paramiko.SSHClient() ssh._transport = self.__transport # 執行命令 stdin, stdout, stderr = ssh.exec_command('mv /home/root/tttttttttttt.py /home/root/ooooooooo.py') # 獲取命令結果 result = stdout.read() ha = Haproxy() ha.run()
題目:簡單主機批量管理工具
需求:
batch_run -h h1,h2,h3 -g web_clusters,db_servers -cmd "df -h"
batch_scp -h h1,h2,h3 -g web_clusters,db_servers -action put -local test.py -remote /tmp/