Python之路,Day9, 進程、線程、協程篇

 

本節內容html

  1. 操做系統發展史介紹
  2. 進程、與線程區別
  3. python GIL全局解釋器鎖
  4. 線程
    1. 語法
    2. join
    3. 線程鎖之Lock\Rlock\信號量
    4. 將線程變爲守護進程
    5. Event事件 
    6. queue隊列
    7. 生產者消費者模型
    8. Queue隊列
    9. 開發一個線程池
  5. 進程
    1. 語法
    2. 進程間通信
    3. 進程池   

 

操做系統發展史

手工操做(無操做系統)

1946年第一臺計算機誕生--20世紀50年代中期,還未出現操做系統,計算機工做採用手工操做方式。python


手工操做
程序員將對應於程序和數據的已穿孔的紙帶(或卡片)裝入輸入機,而後啓動輸入機把程序和數據輸入計算機內存,接着經過控制檯開關啓動程序針對數據運行;計算完畢,打印機輸出計算結果;用戶取走結果並卸下紙帶(或卡片)後,才讓下一個用戶上機。程序員

 


手工操做方式兩個特色:
(1)用戶獨佔全機。不會出現因資源已被其餘用戶佔用而等待的現象,但資源的利用率低。
(2)CPU 等待手工操做。CPU的利用不充分。web


 20世紀50年代後期,出現人機矛盾:手工操做的慢速度和計算機的高速度之間造成了尖銳矛盾,手工操做方式已嚴重損害了系統資源的利用率(使資源利用率降爲百分之幾,甚至更低),不能容忍。惟一的解決辦法:只有擺脫人的手工操做,實現做業的自動過渡。這樣就出現了成批處理。編程

 

批處理系統

批處理系統:加載在計算機上的一個系統軟件,在它的控制下,計算機可以自動地、成批地處理一個或多個用戶的做業(這做業包括程序、數據和命令)。多線程

聯機批處理系統
首先出現的是聯機批處理系統,即做業的輸入/輸出由CPU來處理。
主機與輸入機之間增長一個存儲設備——磁帶,在運行於主機上的監督程序的自動控制下,計算機可自動完成:成批地把輸入機上的用戶做業讀入磁帶,依次把磁帶上的用戶做業讀入主機內存並執行並把計算結果向輸出機輸出。完成了上一批做業後,監督程序又從輸入機上輸入另外一批做業,保存在磁帶上,並按上述步驟重複處理。併發

 

監督程序不停地處理各個做業,從而實現了做業到做業的自動轉接,減小了做業創建時間和手工操做時間,有效克服了人機矛盾,提升了計算機的利用率。app

可是,在做業輸入和結果輸出時,主機的高速CPU仍處於空閒狀態,等待慢速的輸入/輸出設備完成工做: 主機處於「忙等」狀態。less

 

脫機批處理系統
爲克服與緩解高速主機與慢速外設的矛盾,提升CPU的利用率,又引入了脫機批處理系統,即輸入/輸出脫離主機控制。
這種方式的顯著特徵是:增長一臺不與主機直接相連而專門用於與輸入/輸出設備打交道的衛星機。
其功能是:dom

(1)從輸入機上讀取用戶做業並放到輸入磁帶上。
(2)從輸出磁帶上讀取執行結果並傳給輸出機。

這樣,主機不是直接與慢速的輸入/輸出設備打交道,而是與速度相對較快的磁帶機發生關係,有效緩解了主機與設備的矛盾。主機與衛星機可並行工做,兩者分工明確,能夠充分發揮主機的高速計算能力。

 

脫機批處理系統:20世紀60年代應用十分普遍,它極大緩解了人機矛盾及主機與外設的矛盾。IBM-7090/7094:配備的監督程序就是脫機批處理系統,是現代操做系統的原型。

不足:每次主機內存中僅存放一道做業,每當它運行期間發出輸入/輸出(I/O)請求後,高速的CPU便處於等待低速的I/O完成狀態,導致CPU空閒。

爲改善CPU的利用率,又引入了多道程序系統。

 

多道程序系統

多道程序設計技術

所謂多道程序設計技術,就是指容許多個程序同時進入內存並運行。即同時把多個程序放入內存,並容許它們交替在CPU中運行,它們共享系統中的各類硬、軟件資源。當一道程序因I/O請求而暫停運行時,CPU便當即轉去運行另外一道程序。

單道程序的運行過程:
在A程序計算時,I/O空閒, A程序I/O操做時,CPU空閒(B程序也是一樣);必須A工做完成後,B才能進入內存中開始工做,二者是串行的,所有完成共需時間=T1+T2。

 

 


多道程序的運行過程:
將A、B兩道程序同時存放在內存中,它們在系統的控制下,可相互穿插、交替地在CPU上運行:當A程序因請求I/O操做而放棄CPU時,B程序就可佔用CPU運行,這樣 CPU再也不空閒,而正進行A I/O操做的I/O設備也不空閒,顯然,CPU和I/O設備都處於「忙」狀態,大大提升了資源的利用率,從而也提升了系統的效率,A、B所有完成所需時間<<T1+T2。

 

 

多道程序設計技術不只使CPU獲得充分利用,同時改善I/O設備和內存的利用率,從而提升了整個系統的資源利用率和系統吞吐量(單位時間內處理做業(程序)的個數),最終提升了整個系統的效率。

單處理機系統中多道程序運行時的特色:
(1)多道:計算機內存中同時存放幾道相互獨立的程序;
(2)宏觀上並行:同時進入系統的幾道程序都處於運行過程當中,即它們前後開始了各自的運行,但都未運行完畢;
(3)微觀上串行:實際上,各道程序輪流地用CPU,並交替運行。

多道程序系統的出現,標誌着操做系統漸趨成熟的階段,前後出現了做業調度管理、處理機管理、存儲器管理、外部設備管理、文件系統管理等功能。


多道批處理系統
20世紀60年代中期,在前述的批處理系統中,引入多道程序設計技術後造成多道批處理系統(簡稱:批處理系統)。
它有兩個特色:
(1)多道:系統內可同時容納多個做業。這些做業放在外存中,組成一個後備隊列,系統按必定的調度原則每次從後備做業隊列中選取一個或多個做業進入內存運行,運行做業結束、退出運行和後備做業進入運行均由系統自動實現,從而在系統中造成一個自動轉接的、連續的做業流。
(2)成批:在系統運行過程當中,不容許用戶與其做業發生交互做用,即:做業一旦進入系統,用戶就不能直接干預其做業的運行。

 

批處理系統的追求目標:提升系統資源利用率和系統吞吐量,以及做業流程的自動化。

批處理系統的一個重要缺點:不提供人機交互能力,給用戶使用計算機帶來不便。
雖然用戶獨佔全機資源,而且直接控制程序的運行,能夠隨時瞭解程序運行狀況。但這種工做方式因獨佔全機形成資源效率極低。

一種新的追求目標:既能保證計算機效率,又能方便用戶使用計算機。 20世紀60年代中期,計算機技術和軟件技術的發展使這種追求成爲可能。

 

分時系統

因爲CPU速度不斷提升和採用分時技術,一臺計算機可同時鏈接多個用戶終端,而每一個用戶可在本身的終端上聯機使用計算機,好象本身獨佔機器同樣。

分時技術:把處理機的運行時間分紅很短的時間片,按時間片輪流把處理機分配給各聯機做業使用。

若某個做業在分配給它的時間片內不能完成其計算,則該做業暫時中斷,把處理機讓給另外一做業使用,等待下一輪時再繼續其運行。因爲計算機速度很快,做業運行輪轉得很快,給每一個用戶的印象是,好象他獨佔了一臺計算機。而每一個用戶能夠經過本身的終端向系統發出各類操做控制命令,在充分的人機交互狀況下,完成做業的運行。

具備上述特徵的計算機系統稱爲分時系統,它容許多個用戶同時聯機使用計算機。

 


特色:
(1)多路性。若干個用戶同時使用一臺計算機。微觀上看是各用戶輪流使用計算機;宏觀上看是各用戶並行工做。
(2)交互性。用戶可根據系統對請求的響應結果,進一步向系統提出新的請求。這種能使用戶與系統進行人機對話的工做方式,明顯地有別於批處理系統,於是,分時系統又被稱爲交互式系統。
(3)獨立性。用戶之間能夠相互獨立操做,互不干擾。系統保證各用戶程序運行的完整性,不會發生相互混淆或破壞現象。
(4)及時性。系統可對用戶的輸入及時做出響應。分時系統性能的主要指標之一是響應時間,它是指:從終端發出命令到系統予以應答所需的時間。


分時系統的主要目標:對用戶響應的及時性,即不至於用戶等待每個命令的處理時間過長。

分時系統能夠同時接納數十個甚至上百個用戶,因爲內存空間有限,每每採用對換(又稱交換)方式的存儲方法。即將未「輪到」的做業放入磁盤,一旦「輪到」,再將其調入內存;而時間片用完後,又將做業存回磁盤(俗稱「滾進」、「滾出「法),使同一存儲區域輪流爲多個用戶服務。

多用戶分時系統是當今計算機操做系統中最廣泛使用的一類操做系統。

 

實時系統

雖然多道批處理系統和分時系統能得到較使人滿意的資源利用率和系統響應時間,但卻不能知足實時控制與實時信息處理兩個應用領域的需求。因而就產生了實時系統,即系統可以及時響應隨機發生的外部事件,並在嚴格的時間範圍內完成對該事件的處理。
實時系統在一個特定的應用中常做爲一種控制設備來使用。


實時系統可分紅兩類:
(1)實時控制系統。當用于飛機飛行、導彈發射等的自動控制時,要求計算機能儘快處理測量系統測得的數據,及時地對飛機或導彈進行控制,或將有關信息經過顯示終端提供給決策人員。當用於軋鋼、石化等工業生產過程控制時,也要求計算機能及時處理由各種傳感器送來的數據,而後控制相應的執行機構。
(2)實時信息處理系統。當用於預約飛機票、查詢有關航班、航線、票價等事宜時,或當用於銀行系統、情報檢索系統時,都要求計算機能對終端設備發來的服務請求及時予以正確的回答。此類對響應及時性的要求稍弱於第一類。


實時操做系統的主要特色:
(1)及時響應。每個信息接收、分析處理和發送的過程必須在嚴格的時間限制內完成。
(2)高可靠性。需採起冗餘措施,雙機系統先後臺工做,也包括必要的保密措施等。

 

操做系統發展圖譜

 

 

 

 

進程與線程

什麼是進程(process)?

An executing instance of a program is called a process.

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

程序並不能單獨運行,只有將程序裝載到內存中,系統爲它分配資源才能運行,而這種執行的程序就稱之爲進程。程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬於動態概念。

在多道編程中,咱們容許多個程序同時加載到內存中,在操做系統的調度下,能夠實現併發地執行。這是這樣的設計,大大提升了CPU的利用率。進程的出現讓每一個用戶感受到本身獨享CPU,所以,進程就是爲了在CPU上實現多道編程而提出的。

有了進程爲何還要線程?

進程有不少優勢,它提供了多道編程,讓咱們感受咱們每一個人都擁有本身的CPU和其餘資源,能夠提升計算機的利用率。不少人就不理解了,既然進程這麼優秀,爲何還要線程呢?其實,仔細觀察就會發現進程仍是有不少缺陷的,主要體如今兩點上:

  • 進程只能在一個時間幹一件事,若是想同時幹兩件事或多件事,進程就無能爲力了。

  • 進程在執行的過程當中若是阻塞,例如等待輸入,整個進程就會掛起,即便進程中有些工做不依賴於輸入的數據,也將沒法執行。

例如,咱們在使用qq聊天, qq作爲一個獨立進程若是同一時間只能幹一件事,那他如何實如今同一時刻 即能監聽鍵盤輸入、又能監聽其它人給你發的消息、同時還能把別人發的消息顯示在屏幕上呢?你會說,操做系統不是有分時麼?但個人親,分時是指在不一樣進程間的分時呀, 即操做系統處理一會你的qq任務,又切換到word文檔任務上了,每一個cpu時間片分給你的qq程序時,你的qq仍是隻能同時幹一件事呀。

再直白一點, 一個操做系統就像是一個工廠,工廠裏面有不少個生產車間,不一樣的車間生產不一樣的產品,每一個車間就至關於一個進程,且你的工廠又窮,供電不足,同一時間只能給一個車間供電,爲了能讓全部車間都能同時生產,你的工廠的電工只能給不一樣的車間分時供電,可是輪到你的qq車間時,發現只有一個幹活的工人,結果生產效率極低,爲了解決這個問題,應該怎麼辦呢?。。。。沒錯,你確定想到了,就是多加幾個工人,讓幾我的工人並行工做,這每一個工人,就是線程!

 

什麼是線程(thread)?

線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務

A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.

Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.

If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.

Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.

On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.

Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).

 

進程與線程的區別?

  1. Threads share the address space of the process that created it; processes have their own address space.
  2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  3. Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
  4. New threads are easily created; new processes require duplication of the parent process.
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.

 

Python GIL(Global Interpreter Lock)  

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

上面的核心意思就是,不管你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只容許一個線程運行,擦。。。,那這還叫什麼多線程呀?莫如此早的下結結論,聽我現場講。

 

 

首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL

這篇文章透徹的剖析了GIL對python多線程的影響,強烈推薦看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf 

 

Python threading模塊

線程有2種調用方式,以下:

import threading

import  time
 
def  sayhi(num):  #定義每一個線程要運行的函數
 
     print ( "running on number:%s"  % num)
 
     time.sleep( 3 )
 
if  __name__  = =  '__main__' :
 
     t1  =  threading.Thread(target = sayhi,args = ( 1 ,))  #生成一個線程實例
     t2  =  threading.Thread(target = sayhi,args = ( 2 ,))  #生成另外一個線程實例
 
     t1.start()  #啓動線程
     t2.start()  #啓動另外一個線程
 
     print (t1.getName())  #獲取線程名
     print (t2.getName())
 
 
 

繼承式調用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import  threading
import  time
 
 
class  MyThread(threading.Thread):
     def  __init__( self ,num):
         threading.Thread.__init__( self )
         self .num  =  num
 
     def  run( self ): #定義每一個線程要運行的函數
 
         print ( "running on number:%s"  % self .num)
 
         time.sleep( 3 )
 
if  __name__  = =  '__main__' :
 
     t1  =  MyThread( 1 )
     t2  =  MyThread( 2 )
     t1.start()
     t2.start()

Join & Daemon

Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#_*_coding:utf-8_*_
__author__  =  'Alex Li'
 
import  time
import  threading
 
 
def  run(n):
 
     print ( '[%s]------running----\n'  %  n)
     time.sleep( 2 )
     print ( '--done--' )
 
def  main():
     for  in  range ( 5 ):
         =  threading.Thread(target = run,args = [i,])
         t.start()
         t.join( 1 )
         print ( 'starting thread' , t.getName())
 
 
=  threading.Thread(target = main,args = [])
m.setDaemon( True #將main線程設置爲Daemon線程,它作爲程序主線程的守護線程,當主線程退出時,m線程也會退出,由m啓動的其它子線程會同時退出,無論是否執行完任務
m.start()
m.join(timeout = 2 )
print ( "---main thread done----" )

  

Note:Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an .Event

  

 

 

線程鎖(互斥鎖Mutex)

一個進程下能夠啓動多個線程,多個線程共享父進程的內存空間,也就意味着每一個線程能夠訪問同一份數據,此時,若是2個線程同時要修改同一份數據,會出現什麼情況?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import  time
import  threading
 
def  addNum():
     global  num  #在每一個線程中都獲取這個全局變量
     print ( '--get num:' ,num )
     time.sleep( 1 )
     num   - = 1  #對此公共變量進行-1操做
 
num  =  100   #設定一個共享變量
thread_list  =  []
for  in  range ( 100 ):
     =  threading.Thread(target = addNum)
     t.start()
     thread_list.append(t)
 
for  in  thread_list:  #等待全部線程執行完畢
     t.join()
 
 
print ( 'final num:' , num )

正常來說,這個num結果應該是0, 但在python 2.7上多運行幾回,會發現,最後打印出來的num結果不老是0,爲何每次運行的結果不同呢? 哈,很簡單,假設你有A,B兩個線程,此時都 要對num 進行減1操做, 因爲2個線程是併發同時運行的,因此2個線程頗有可能同時拿走了num=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時CPU運算的結果再賦值給num變量後,結果就都是99。那怎麼辦呢? 很簡單,每一個線程在要修改公共數據時,爲了不本身在還沒改完的時候別人也來修改此數據,能夠給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢並把鎖釋放掉後才能再訪問此數據。 

*注:不要在3.x上運行,不知爲何,3.x上的結果老是正確的,多是自動加了鎖

加鎖版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import  time
import  threading
 
def  addNum():
     global  num  #在每一個線程中都獲取這個全局變量
     print ( '--get num:' ,num )
     time.sleep( 1 )
     lock.acquire()  #修改數據前加鎖
     num   - = 1  #對此公共變量進行-1操做
     lock.release()  #修改後釋放
 
num  =  100   #設定一個共享變量
thread_list  =  []
lock  =  threading.Lock()  #生成全局鎖
for  in  range ( 100 ):
     =  threading.Thread(target = addNum)
     t.start()
     thread_list.append(t)
 
for  in  thread_list:  #等待全部線程執行完畢
     t.join()
 
print ( 'final num:' , num )

 

GIL VS Lock 

機智的同窗可能會問到這個問題,就是既然你以前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock? 注意啦,這裏的lock是用戶級的lock,跟那個GIL不要緊 ,具體咱們經過下圖來看一下+配合我現場講給你們,就明白了。

那你又問了, 既然用戶程序已經本身有鎖了,那爲何C python還須要GIL呢?加入GIL主要的緣由是爲了下降程序的開發的複雜度,好比如今的你寫python不須要關心內存回收的問題,由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題,  這能夠說是Python早期版本的遺留問題。

 

 

  

RLock(遞歸鎖)

說白了就是在一個大鎖中還要再包含子鎖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import  threading,time
 
def  run1():
     print ( "grab the first part data" )
     lock.acquire()
     global  num
     num  + = 1
     lock.release()
     return  num
def  run2():
     print ( "grab the second part data" )
     lock.acquire()
     global   num2
     num2 + = 1
     lock.release()
     return  num2
def  run3():
     lock.acquire()
     res  =  run1()
     print ( '--------between run1 and run2-----' )
     res2  =  run2()
     lock.release()
     print (res,res2)
 
 
if  __name__  = =  '__main__' :
 
     num,num2  =  0 , 0
     lock  =  threading.RLock()
     for  in  range ( 10 ):
         =  threading.Thread(target = run3)
         t.start()
 
while  threading.active_count() ! =  1 :
     print (threading.active_count())
else :
     print ( '----all threads done---' )
     print (num,num2)

  

Semaphore(信號量)

互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import  threading,time
 
def  run(n):
     semaphore.acquire()
     time.sleep( 1 )
     print ( "run the thread: %s\n"  % n)
     semaphore.release()
 
if  __name__  = =  '__main__' :
 
     num =  0
     semaphore   =  threading.BoundedSemaphore( 5 #最多容許5個線程同時運行
     for  in  range ( 20 ):
         =  threading.Thread(target = run,args = (i,))
         t.start()
 
while  threading.active_count() ! =  1 :
     pass  #print threading.active_count()
else :
     print ( '----all threads done---' )
     print (num)

 

Timer  

This class represents an action that should be run only after a certain amount of time has passed 

Timers are started, as with threads, by calling their start() method. The timer can be stopped (before its action has begun) by calling thecancel() method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.

1
2
3
4
5
def  hello():
     print ( "hello, world" )
 
=  Timer( 30.0 , hello)
t.start()   # after 30 seconds, "hello, world" will be printed

  

 

Events

An event is a simple synchronization object;

the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.

event = threading.Event()

# a client thread can wait for the flag to be set
event.wait()

# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.

經過Event來實現兩個或多個線程間的交互,下面是一個紅綠燈的例子,即起動一個線程作交通指揮燈,生成幾個線程作車輛,車輛行駛按紅燈停,綠燈行的規則。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import  threading,time
import  random
def  light():
     if  not  event.isSet():
         event. set ()  #wait就不阻塞 #綠燈狀態
     count  =  0
     while  True :
         if  count <  10 :
             print ( '\033[42;1m--green light on---\033[0m' )
         elif  count < 13 :
             print ( '\033[43;1m--yellow light on---\033[0m' )
         elif  count < 20 :
             if  event.isSet():
                 event.clear()
             print ( '\033[41;1m--red light on---\033[0m' )
         else :
             count  =  0
             event. set ()  #打開綠燈
         time.sleep( 1 )
         count  + = 1
def  car(n):
     while  1 :
         time.sleep(random.randrange( 10 ))
         if   event.isSet():  #綠燈
             print ( "car [%s] is running.."  %  n)
         else :
             print ( "car [%s] is waiting for the red light.."  % n)
if  __name__  = =  '__main__' :
     event  =  threading.Event()
     Light  =  threading.Thread(target = light)
     Light.start()
     for  in  range ( 3 ):
         =  threading.Thread(target = car,args = (i,))
         t.start()

這裏還有一個event使用的例子,員工進公司門要刷卡, 咱們這裏設置一個線程是「門」, 再設置幾個線程爲「員工」,員工看到門沒打開,就刷卡,刷完卡,門開了,員工就能夠經過。

  View Code

 

  

  

queue隊列 

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out 
class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue. qsize ()
Queue. empty () #return True if empty  
Queue. full () # return True if full 
Queue. put (itemblock=Truetimeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue. put_nowait (item)

Equivalent to put(item, False).

Queue. get (block=Truetimeout=None)

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue. get_nowait ()

Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue. task_done ()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue. join () block直到queue被消費完畢

生產者消費者模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

 

下面來學習一個最基本的生產者消費者模型的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import  threading
import  queue
 
def  producer():
     for  in  range ( 10 ):
         q.put( "骨頭 %s"  %  i )
 
     print ( "開始等待全部的骨頭被取走..." )
     q.join()
     print ( "全部的骨頭被取完了..." )
 
 
def  consumer(n):
 
     while  q.qsize() > 0 :
 
         print ( "%s 取到"  % n  , q.get())
         q.task_done()  #告知這個任務執行完了
 
 
=  queue.Queue()
 
 
 
=  threading.Thread(target = producer,)
p.start()
 
c1  =  consumer( "李闖" )

  

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import  time,random
import  queue,threading
=  queue.Queue()
def  Producer(name):
   count  =  0
   while  count < 20 :
     time.sleep(random.randrange( 3 ))
     q.put(count)
     print ( 'Producer %s has produced %s baozi..'  % (name, count))
     count  + = 1
def  Consumer(name):
   count  =  0
   while  count < 20 :
     time.sleep(random.randrange( 4 ))
     if  not  q.empty():
         data  =  q.get()
         print (data)
         print ( '\033[32;1mConsumer %s has eat %s baozi...\033[0m'  % (name, data))
     else :
         print ( "-----no baozi anymore----" )
     count  + = 1
p1  =  threading.Thread(target = Producer, args = ( 'A' ,))
c1  =  threading.Thread(target = Consumer, args = ( 'B' ,))
p1.start()
c1.start()

  

 

多進程multiprocessing

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

1
2
3
4
5
6
7
8
9
10
from  multiprocessing  import  Process
import  time
def  f(name):
     time.sleep( 2 )
     print ( 'hello' , name)
 
if  __name__  = =  '__main__' :
     =  Process(target = f, args = ( 'bob' ,))
     p.start()
     p.join()

To show the individual process IDs involved, here is an expanded example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from  multiprocessing  import  Process
import  os
 
def  info(title):
     print (title)
     print ( 'module name:' , __name__)
     print ( 'parent process:' , os.getppid())
     print ( 'process id:' , os.getpid())
     print ( "\n\n" )
 
def  f(name):
     info( '\033[31;1mfunction f\033[0m' )
     print ( 'hello' , name)
 
if  __name__  = =  '__main__' :
     info( '\033[32;1mmain process line\033[0m' )
     =  Process(target = f, args = ( 'bob' ,))
     p.start()
     p.join()

 

進程間通信  

不一樣進程間內存是不共享的,要想實現兩個進程間的數據交換,能夠用如下方法:

Queues

使用方法跟threading裏的queue差很少

1
2
3
4
5
6
7
8
9
10
11
from  multiprocessing  import  Process, Queue
 
def  f(q):
     q.put([ 42 None 'hello' ])
 
if  __name__  = =  '__main__' :
     =  Queue()
     =  Process(target = f, args = (q,))
     p.start()
     print (q.get())     # prints "[42, None, 'hello']"
     p.join()

Pipes

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

1
2
3
4
5
6
7
8
9
10
11
12
from  multiprocessing  import  Process, Pipe
 
def  f(conn):
     conn.send([ 42 None 'hello' ])
     conn.close()
 
if  __name__  = =  '__main__' :
     parent_conn, child_conn  =  Pipe()
     =  Process(target = f, args = (child_conn,))
     p.start()
     print (parent_conn.recv())    # prints "[42, None, 'hello']"
     p.join()

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

 

Managers

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from  multiprocessing  import  Process, Manager
 
def  f(d, l):
     d[ 1 =  '1'
     d[ '2' =  2
     d[ 0.25 =  None
     l.append( 1 )
     print (l)
 
if  __name__  = =  '__main__' :
     with Manager() as manager:
         =  manager. dict ()
 
         =  manager. list ( range ( 5 ))
         p_list  =  []
         for  in  range ( 10 ):
             =  Process(target = f, args = (d, l))
             p.start()
             p_list.append(p)
         for  res  in  p_list:
             res.join()
 
         print (d)
         print (l)

  

 
 

進程同步

Without using the lock output from the different processes is liable to get all mixed up.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from  multiprocessing  import  Process, Lock
 
def  f(l, i):
     l.acquire()
     try :
         print ( 'hello world' , i)
     finally :
         l.release()
 
if  __name__  = =  '__main__' :
     lock  =  Lock()
 
     for  num  in  range ( 10 ):
         Process(target = f, args = (lock, num)).start()

  

 

進程池  

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

進程池中有兩個方法:

  • apply
  • apply_async
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from   multiprocessing  import  Process,Pool
import  time
 
def  Foo(i):
     time.sleep( 2 )
     return  i + 100
 
def  Bar(arg):
     print ( '-->exec done:' ,arg)
 
pool  =  Pool( 5 )
 
for  in  range ( 10 ):
     pool.apply_async(func = Foo, args = (i,),callback = Bar)
     #pool.apply(func=Foo, args=(i,))
 
print ( 'end' )
pool.close()
pool.join() #進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。

  

做業需求:

題目:簡單主機批量管理工具

需求:

  1. 主機分組
  2. 主機信息配置文件用configparser解析
  3. 可批量執行命令、發送文件,結果實時返回,執行格式以下 
    1. batch_run  -h h1,h2,h3   -g web_clusters,db_servers    -cmd  "df -h" 
    2. batch_scp   -h h1,h2,h3   -g web_clusters,db_servers  -action put  -local test.py  -remote /tmp/ 
  4. 主機用戶名密碼、端口能夠不一樣
  5. 執行遠程命令使用paramiko模塊
  6. 批量命令需使用multiprocessing併發
 
 
 
番外篇之隊列詳解:
 

Python中,隊列是線程間最經常使用的交換數據的形式。Queue模塊是提供隊列操做的模塊,雖然簡單易用,可是不當心的話,仍是會出現一些意外。

建立一個「隊列」對象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。

將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲
1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。

將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。

Python Queue模塊有三種隊列及構造函數:
一、Python Queue模塊的FIFO隊列先進先出。 class Queue.Queue(maxsize)
二、LIFO相似於堆,即先進後出。 class Queue.LifoQueue(maxsize)
三、還有一種是優先級隊列級別越低越先出來。 class Queue.PriorityQueue(maxsize)

此包中的經常使用方法(q = Queue.Queue()):
q.qsize() 返回隊列的大小
q.empty() 若是隊列爲空,返回True,反之False
q.full() 若是隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 至關q.get(False)
非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 至關q.put(item, False)
q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味着等到隊列爲空,再執行別的操做

範例:
實現一個線程不斷生成一個隨機數到一個隊列中(考慮使用Queue這個模塊)
實現一個線程從上面的隊列裏面不斷的取出奇數
實現另一個線程從上面的隊列裏面不斷取出偶數

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python
#coding:utf8
import random,threading,time
from Queue import Queue
#Producer thread
class Producer(threading.Thread):
   def __init__( self , t_name, queue):
     threading.Thread.__init__( self ,name = t_name)
     self .data = queue
   def run( self ):
     for i in range ( 10 ):  #隨機產生10個數字 ,能夠修改成任意大小
       randomnum = random.randint( 1 , 99 )
       print "%s: %s is producing %d to the queue!" % (time.ctime(), self .getName(), randomnum)
       self .data.put(randomnum) #將數據依次存入隊列
       time.sleep( 1 )
     print "%s: %s finished!" % (time.ctime(), self .getName())
  
#Consumer thread
class Consumer_even(threading.Thread):
   def __init__( self ,t_name,queue):
     threading.Thread.__init__( self ,name = t_name)
     self .data = queue
   def run( self ):
     while 1 :
       try :
         val_even = self .data.get( 1 , 5 ) #get(self, block=True, timeout=None) ,1就是阻塞等待,5是超時5秒
         if val_even % 2 = = 0 :
           print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self .getName(),val_even)
           time.sleep( 2 )
         else :
           self .data.put(val_even)
           time.sleep( 2 )
       except :   #等待輸入,超過5秒 就報異常
         print "%s: %s finished!" % (time.ctime(), self .getName())
         break
class Consumer_odd(threading.Thread):
   def __init__( self ,t_name,queue):
     threading.Thread.__init__( self , name = t_name)
     self .data = queue
   def run( self ):
     while 1 :
       try :
         val_odd = self .data.get( 1 , 5 )
         if val_odd % 2 ! = 0 :
           print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self .getName(), val_odd)
           time.sleep( 2 )
         else :
           self .data.put(val_odd)
           time.sleep( 2 )
       except :
         print "%s: %s finished!" % (time.ctime(), self .getName())
         break
#Main thread
def main():
   queue = Queue()
   producer = Producer( 'Pro.' , queue)
   consumer_even = Consumer_even( 'Con_even.' , queue)
   consumer_odd = Consumer_odd( 'Con_odd.' ,queue)
   producer.start()
   consumer_even.start()
   consumer_odd.start()
   producer.join()
   consumer_even.join()
   consumer_odd.join()
   print 'All threads terminate!'
  
if __name__ = = '__main__' :
   main()
 
 
番外篇之進程:
 

python中多進程(multiprocessing)

 
1、 multiprocessing中使用子進程概念

from multiprocessing import Process

能夠經過Process來構造一個子進程

p = Process(target=fun,args=(args))

再經過p.start()來啓動子進程

再經過p.join()方法來使得子進程運行結束後再執行父進程

相關文章
相關標籤/搜索