Python基礎:進程、線程、協程(2)

進程與線程

什麼是進程(process)?

An executing instance of a program is called a process.

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

  •  程序並不能單獨運行,只有將程序裝載到內存中,系統爲它分配資源才能運行,而這種執行的程序就稱之爲進程
  • 程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬於動態概念。

        在多道編程中,我們允許多個程序同時加載到內存中,在操作系統的調度下,可以實現併發地執行。這是這樣的設計,大大提高了CPU的利用率。進程的出現讓每個用戶感覺到自己獨享CPU,因此,進程就是爲了在CPU上實現多道編程而提出的。

有了進程爲什麼還要線程?

        進程有很多優點,它提供了多道編程,讓我們感覺我們每個人都擁有自己的CPU和其他資源,可以提高計算機的利用率。很多人就不理解了,既然進程這麼優秀,爲什麼還要線程呢?其實,仔細觀察就會發現進程還是有很多缺陷的,主要體現在兩點上:



        例如,我們在使用qq聊天, qq做爲一個獨立進程如果同一時間只能幹一件事,那他如何實現在同一時刻 既能監聽鍵盤輸入、又能監聽其它人給你發的消息、同時還能把別人發的消息顯示在屏幕上呢?你會說,操作系統不是有分時麼?但我的親,分時是指在不同進程間的分時呀, 即操作系統處理一會你的qq任務,又切換到word文檔任務上了,每個cpu時間片分給你的qq程序時,你的qq還是隻能同時幹一件事呀。

        再直白一點, 一個操作系統就像是一個工廠,工廠裏面有很多個生產車間,不同的車間生產不同的產品,每個車間就相當於一個進程,且你的工廠又窮,供電不足,同一時間只能給一個車間供電,爲了能讓所有車間都能同時生產,你的工廠的電工只能給不同的車間分時供電,但是輪到你的qq車間時,發現只有一個幹活的工人,結果生產效率極低,爲了解決這個問題,應該怎麼辦呢?。。。。沒錯,你肯定想到了,就是多加幾個工人,讓幾個人工人並行工作,這每個工人,就是線程!

 

什麼是線程(thread)?

    

        線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以併發多個線程,每條線程並行執行不同的任務

A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.

Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.

If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.

Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.

On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.

Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).

 

進程與線程的區別?

  1. Threads share the address space of the process that created it; processes have their own address space.
  2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  3. Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
  4. New threads are easily created; new processes require duplication of the parent process.
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.

 

Python GIL(Global Interpreter Lock)【全局解釋器鎖】  

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

上面的核心意思就是,無論你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只允許一個線程運行,擦。。。,那這還叫什麼多線程呀?莫如此早的下結結論,聽我現場講。


        首先需要明確的一點是GIL並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標準,但是可以用不同的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因爲CPython是大部分環境下默認的Python執行環境。所以在很多人的概念裏CPython就是Python,也就想當然的把GIL歸結爲Python語言的缺陷。所以這裏要先明確一點:GIL並不是Python的特性,Python完全可以不依賴於GIL

這篇文章透徹的剖析了GIL對python多線程的影響,強烈推薦看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf 

 

Python threading模塊

線程有2種調用方式,如下:

直接調用

1
import threading
2
import time
3
4
5
def sayhi(num):  # 定義每個線程要運行的函數
6
7
    print("running on number:%s" % num)
8
9
    time.sleep(3)
10
11
12
if __name__ == '__main__':
13
    t1 = threading.Thread(target=sayhi, args=(1,))  # 生成一個線程實例
14
    t2 = threading.Thread(target=sayhi, args=(2,))  # 生成另一個線程實例
15
16
    t1.start()  # 啓動線程
17
    t2.start()  # 啓動另一個線程
18
19
    print(t1.getName())  # 獲取線程名
20
    print(t2.getName())
21
22
# D:\Python\python\python-3.6.1\Python36-64\python.exe 
23
# running on number:1
24
# running on number:2
25
# Thread-1
26
# Thread-2

繼承式調用


Join & Daemon

    Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.

    Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.


1
1 Python 默認參數創建線程後,不管主線程是否執行完畢,都會等待子線程執行完畢才一起退出,有無join結果一樣
2
3
4
2 如果創建線程,並且設置了daemon爲true,即thread.setDaemon(True), 則主線程執行完畢後自動退出,
5
不會等待子線程的執行結果。而且隨着主線程退出,子線程也消亡。
6
7
8
3 join方法的作用是阻塞,等待子線程結束,join方法有一個參數是timeout,
9
即如果主線程等待timeout,子線程還沒有結束,則主線程強制結束子線程。
10
11
12
4 如果線程daemon屬性爲False, 則join裏的timeout參數無效。主線程會一直等待子線程結束。
13
14
15
5 如果線程daemon屬性爲True, 則join裏的timeout參數是有效的, 主線程會等待timeout時間後,結束子線程。
16
此處有一個坑,即如果同時有N個子線程join(timeout),那麼實際上主線程會等待的超時時間最長爲 N  timeout,
17
 因爲每個子線程的超時開始時刻是上一個子線程超時結束的時刻。


          Note:Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an  Event .

Python 多線程編程時,經常會用到join()和setDaemon()方法,今天特地研究了一下兩者的區別。

     1、 join ()方法 :主線程A中,創建了子線程B,並且在主 線程A中調用了B.join(),那麼,主線程A會在調用的地方等待,直到 子線程B 完成操作後,纔可以接着往下執行,那麼在調用這個線程時可以使用被調用線程的join方法。

原型:join([timeout])
          裏面的參數時可選的,代表線程運行的最大時間,即如果超過這個時間,不管這個此線程有沒有執行完畢都會被回收,然後主線程或函數都會接着執行的。

例子:




          機器上運行時,4和999之間,有明顯的停頓。 解釋 :線程t1 start後,主線程並沒有等線程t1運行結束後再執行,而是先把5次循環打印執行完畢(打印到4),然後sleep(10)後,線程t1把傳進去的999纔打印出來。

          現在,我們把join()方法加進去(其他代碼不變),看看有什麼不一樣,

例子:


1
import threading
2
import time
3
4
5
class MyThread(threading.Thread):
6
    def __init__(self, id):
7
        threading.Thread.__init__(self)
8
        self.id = id
9
10
    def run(self):
11
        x = 0
12
        time.sleep(10)
13
        print(self.id)
14
15
16
if __name__ == "__main__":
17
    t1 = MyThread(999)
18
    t1.start()
19
    t1.join()
20
    for i in range(5):
21
        print(i)
22
23
24
# D:\Python\python\python-3.6.1\Python36-64\python.exe
25
# 999
26
# 0
27
# 1
28
# 2
29
# 3
30
# 4


          機器上運行時,999之前,有明顯的停頓。 解釋 :線程t1 start後,主線程停在了join()方法處,等sleep(10)後,線程t1操作結束被join,接着,主線程繼續循環打印。

2、setDaemon()方法。
          主線程A中,創建了子線程B,並且在主線程A中調用了B.setDaemon(),這個的意思是,把主線程A設置爲守護線程,這時候,要是主線程A執行結束了,就不管子線程B是否完成,一併和主線程A退出.這就是setDaemon方法的含義,這基本和join是相反的。此外,還有個 要特別注意的:必須在start() 方法調用之前設置,如果不設置爲守護線程,程序會被無限掛起。


          例子:就是設置子線程隨主線程的結束而結束:


          可以看出,子線程t1中的內容並未打出。 解釋 :t1.setDaemon(True)的操作,將父線程設置爲了守護線程。根據setDaemon()方法的含義,父線程打印內容後便結束了,不管子線程是否執行完畢了。

            程序運行中,執行一個主線程,如果主線程又創建一個子線程,主線程和子線程就分兵兩路,分別運行,那麼當主線程完成想退出時,會檢驗子線程是否完成。如果子線程未完成,則主線程會等待子線程完成後再退出。但是有時候我們需要的是,只要主線程完成了,不管子線程是否完成,都要和主線程一起退出,這時就可以用setDaemon方法了。

          所以,join和setDaemon的區別如上述的例子,它們基本是相反的。


線程鎖(互斥鎖Mutex)

        一個進程下可以啓動多個線程,多個線程共享父進程的內存空間,也就意味着每個線程可以訪問同一份數據,此時,如果2個線程同時要修改同一份數據,會出現什麼狀況?

1
# -*- coding:UTF-8 -*-
2
import time
3
import threading
4
5
6
def addNum():
7
    global num  # 在每個線程中都獲取這個全局變量
8
    print('--get num:', num)
9
    time.sleep(1)
10
    num -= 1  # 對此公共變量進行-1操作
11
12
13
num = 100  # 設定一個共享變量
14
thread_list = []
15
for i in range(100):
16
    t = threading.Thread(target=addNum)
17
    t.start()
18
    thread_list.append(t)
19
20
for t in thread_list:  # 等待所有線程執行完畢
21
    t.join()
22
23
print('final num:', num)
24
25
26
# D:\Python\python\python-3.6.1\Python36-64\python.exe
27
# --get num: 100
28
# --get num: 100
29
# --get num: 100
30
# --get num: 100
31
# ...
32
# --get num: 100
33
# --get num: 100
34
# final num: 0
35
36
# D:\Python\python\python-2.7.13\Python27\python2.exe
37
# ('--get num:', 100)
38
# ('--get num:', 100)
39
# ('--get num:', 100)
40
# ('--get num:', 100)
41
# ('--get num:', (100'--get num:'), 
42
# 100)
43
# 
44
# ('--get num:', 100)
45
# ('--get num:', 100)
46
# ('--get num:', 100)
47
# ('final num:', 1)

        正常來講,這個num結果應該是0, 但在python 2.7上多運行幾次,會發現,最後打印出來的num結果不總是0,爲什麼每次運行的結果不一樣呢? 哈,很簡單,假設你有A,B兩個線程,此時都 要對num 進行減1操作, 由於2個線程是併發同時運行的,所以2個線程很有可能同時拿走了num=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時CPU運算的結果再賦值給num變量後,結果就都是99。那怎麼辦呢? 很簡單,每個線程在要修改公共數據時,爲了避免自己在還沒改完的時候別人也來修改此數據,可以給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢並把鎖釋放掉後才能再訪問此數據。 

        *注:不要在3.x上運行,不知爲什麼,3.x上的結果總是正確的,可能是自動加了鎖

  • 加鎖版本


GIL VS Lock 

  • 機智的同學可能會問到這個問題,就是既然你之前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲什麼這裏還需要lock? 
  • 注意啦,這裏的lock是用戶級的lock,跟那個GIL沒關係 ,具體我們通過下圖來看一下,就明白了。



    那你又問了, 既然用戶程序已經自己有鎖了,那爲什麼C python還需要GIL呢?

      加入GIL主要的原因是爲了降低程序的開發的複雜度,比如現在的你寫python不需要關心內存回收的問題,因爲Python解釋器幫你自動定期進行內存回收,你可以理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up做一次全局輪詢看看哪些內存數據是可以被清空的,此時你自己的程序 裏的線程和 py解釋器自己的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,可能一個其它線程正好又重新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決類似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題,  這可以說是Python早期版本的遺留問題。

RLock(遞歸鎖)

  • 說白了就是在一個大鎖中還要再包含子鎖

1
import threading, time
2
3
4
def run1():
5
    print("grab the first part data")
6
    lock.acquire()
7
    global num
8
    num += 1
9
    lock.release()
10
    return num
11
12
13
def run2():
14
    print("grab the second part data")
15
    lock.acquire()
16
    global num2
17
    num2 += 1
18
    lock.release()
19
    return num2
20
21
22
def run3():
23
    lock.acquire()
24
    res = run1()
25
    print('--------between run1 and run2-----')
26
    res2 = run2()
27
    lock.release()
28
    print(res, res2)
29
30
31
if __name__ == '__main__':
32
33
    num, num2 = 0, 0
34
    lock = threading.RLock()
35
    for i in range(10):
36
        t = threading.Thread(target=run3)
37
        t.start()
38
39
while threading.active_count() != 1:
40
    print(threading.active_count())
41
else:
42
    print('----all threads done---')
43
    print(num, num2)
44
    
45
# D:\Python\python\python-2.7.13\Python27\python2.exe 
46
# grab the first part data
47
# --------between run1 and run2-----
48
# grab the second part data
49
# (grab the first part data1
50
# , --------between run1 and run2-----1
51
# )grab the second part data
52
# 
53
# (2, 2)
54
# grab the first part data
55
# --------between run1 and run2-----
56
# grab the second part data
57
# (3, 3)
58
# grab the first part data
59
# --------between run1 and run2-----
60
# grab the second part data
61
# (4, 4)
62
# grab the first part data
63
# --------between run1 and run2-----
64
# grab the second part data
65
# (5, 5)
66
# grab the first part data
67
# --------between run1 and run2-----
68
# grab the second part data
69
# (6, 6grab the first part data)
70
# 
71
# --------between run1 and run2-----
72
# grab the second part data
73
# (grab the first part data7
74
# , --------between run1 and run2-----7
75
# )grab the second part data
76
# 
77
# (8, 8)grab the first part data
78
# 
79
# --------between run1 and run2-----
80
# grab the second part data
81
# (9, 9)
82
# grab the first part data
83
# --------between run1 and run2-----
84
# grab the second part data
85
# (10, 10)
86
# 1
87
# ----all threads done---
88
# (10, 10)
89
# 

Semaphore(信號量)

  • 互斥鎖 同時只允許一個線程更改數據而Semaphore是同時允許一定數量的線程更改數據 ,
  • 比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人只能等裏面有人出來了才能再進去。


Timer  

  • This class represents an action that should be run only after a certain amount of time has passed 
  • Timers are started, as with threads, by calling their start() method. The timer can be stopped (before its action has begun) by calling the cancel() method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.

1
from threading import Timer
2
3
4
def hello():
5
    print("hello, world")
6
7
8
t = Timer(3.0, hello)
9
t.start()  
10
11
# after 3 seconds, "hello, world" will be printed


Events

  • An event is a simple synchronization object;
  • the event represents an internal flag, and threads can wait for the flag to be set, or set or clear the flag themselves.


  • # a client thread can wait for the flag to be set
1
event.wait()

  • # a server thread can set or reset it

  • If the flag is set, the wait method doesn’t do anything.
  • If the flag is cleared, wait will block until it becomes set again.
  • Any number of threads may wait for the same event.

  • python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法wait、clear、set
    • 事件處理的機制:全局定義了一個「Flag」,如果「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,如果「Flag」值爲True,那麼event.wait 方法時便不再阻塞。

      • clear:將「Flag」設置爲False
      • set:將「Flag」設置爲True

  • 用 threading.Event 實現線程間通信
  • 使用threading.Event可以使一個線程等待其他線程的通知,我們把這個Event傳遞到線程對象中,
  • Event默認內置了一個標誌,初始值爲False。
  • 一旦該線程通過wait()方法進入等待狀態,直到另一個線程調用該Event的set()方法將內置標誌設置爲True時,該Event會通知所有等待狀態的線程恢復運行。

通過Event來實現兩個或多個線程間的交互:

  • 下面是一個紅綠燈的例子,即啓動一個線程做交通指揮燈,生成幾個線程做車輛,車輛行駛按紅燈停,綠燈行的規則。


1
import random
2
import threading
3
import time
4
5
6
def light():
7
    if not event.isSet():
8
        event.set()  # wait就不阻塞 #綠燈狀態
9
    count = 0
10
    while True:
11
        print("count:",count)
12
        if count < 10:
13
            print('\033[42;1m--green light on---\033[0m')
14
        elif count < 13:
15
            print('\033[43;1m--yellow light on---\033[0m')
16
        elif count < 20:
17
            if event.isSet():
18
                event.clear()
19
            print('\033[41;1m--red light on---\033[0m')
20
        else:
21
            count = 0
22
            event.set()  # 打開綠燈
23
        time.sleep(1)
24
        count += 1
25
26
27
def car(n):
28
    while 1:
29
        time.sleep(random.randrange(10))
30
        if event.isSet():  # 綠燈
31
            print("car [%s] is running.." % n)
32
        else:
33
            print("car [%s] is waiting for the red light.." % n)
34
35
36
if __name__ == '__main__':
37
    event = threading.Event()
38
    Light = threading.Thread(target=light)
39
    Light.start()
40
    for i in range(3):
41
        t = threading.Thread(target=car, args=(i,))
42
        t.start()

  • 這裏還有一個event使用的例子,員工進公司門要刷卡, 我們這裏設置一個線程是「門」, 
  • 再設置幾個線程爲「員工」,員工看到門沒打開,就刷卡,刷完卡,門開了,員工就可以通過。

 

  

queue隊列 

        

Queue

Queue是python標準庫中的線程安全的隊列(FIFO)實現,提供了一個適用於多線程編程的先進先出的數據結構,即隊列,用來在生產者和消費者線程之間的信息傳遞

基本FIFO隊列

class Queue.Queue(maxsize=0)

        FIFO即First in First Out,先進先出。Queue提供了一個基本的FIFO容器,使用方法很簡單,maxsize是個整數,指明瞭隊列中能存放的數據個數的上限。一旦達到上限,插入會導致阻塞,直到隊列中的數據被消費掉。如果maxsize小於或者等於0,隊列大小沒有限制

舉個栗子:

1
import queue
2
3
q = queue.Queue()
4
5
for i in range(5):
6
    q.put(i)
7
8
while not q.empty():
9
    print(q.get())
10
11
# D:\Python\python\python-3.6.1\Python36-64\python.exe
12
# 0
13
# 1
14
# 2
15
# 3
16
# 4
17

LIFO隊列

class Queue.LifoQueue(maxsize=0)

        LIFO即Last in First Out,後進先出。與棧的類似,使用也很簡單,maxsize用法同上

再舉個栗子:

可以看到僅僅是將Queue.Quenu類替換爲Queue.LifiQueue類

優先級隊列

class Queue.PriorityQueue(maxsize=0)

        構造一個優先隊列。maxsize用法同上。

1
import Queue
2
import threading
3
4
class Job(object):
5
    def __init__(self, priority, description):
6
        self.priority = priority
7
        self.description = description
8
        print 'Job:',description
9
        return
10
    def __cmp__(self, other):
11
        return cmp(self.priority, other.priority)
12
13
q = Queue.PriorityQueue()
14
15
q.put(Job(3, 'level 3 job'))
16
q.put(Job(10, 'level 10 job'))
17
q.put(Job(1, 'level 1 job'))
18
19
def process_job(q):
20
    while True:
21
        next_job = q.get()
22
        print 'for:', next_job.description
23
        q.task_done()
24
25
workers = [threading.Thread(target=process_job, args=(q,)),
26
        threading.Thread(target=process_job, args=(q,))
27
        ]
28
29
for w in workers:
30
    w.setDaemon(True)
31
    w.start()
32
33
q.join()
34
35
36
# D:\Python\python\python-2.7.13\Python27\python2.exe
37
# Job: level 3 job
38
# Job: level 10 job
39
# Job: level 1 job
40
# for: level 1 job
41
# for: level 3 job
42
# for: level 10 job

一些常用方法

  • task_done()


  • join()

1
阻塞調用線程,直到隊列中的所有任務被處理掉。
2
3
只要有數據被加入隊列,未完成的任務數就會增加。
4
當消費者線程調用task_done()(意味着有消費者取得任務並完成任務),未完成的任務數就會減少。
5
當未完成的任務數降到0,join()解除阻塞。


  • put(item[, block[, timeout]])


  • get([block[, timeout]])

1
從隊列中移除並返回一個數據。block跟timeout參數同put方法
2
3
其非阻塞方法爲`get_nowait()`相當與get(False)


  • empty()



生產者消費者模型

        在併發編程中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

爲什麼要使用生產者和消費者模式

        在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題於是引入了生產者和消費者模式。

什麼是生產者消費者模式

        生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

 

        下面來學習一個最基本的生產者消費者模型的例子

1
import threading
2
import queue
3
4
5
def producer():
6
    for i in range(10):
7
        q.put("骨頭 %s" % i)
8
9
    print("開始等待所有的骨頭被取走...")
10
    q.join()
11
    print("所有的骨頭被取完了...")
12
13
14
def consumer(n):
15
    while q.qsize() > 0:
16
        print("%s 取到" % n, q.get())
17
        q.task_done()  # 告知這個任務執行完了
18
19
20
q = queue.Queue()
21
22
p = threading.Thread(target=producer, )
23
p.start()
24
25
c1 = consumer("李闖")
26
27
28
# D:\Python\python\python-3.6.1\Python36-64\python.exe 
29
# 開始等待所有的骨頭被取走...
30
# 李闖 取到 骨頭 0
31
# 李闖 取到 骨頭 1
32
# 李闖 取到 骨頭 2
33
# 李闖 取到 骨頭 3
34
# 李闖 取到 骨頭 4
35
# 李闖 取到 骨頭 5
36
# 李闖 取到 骨頭 6
37
# 李闖 取到 骨頭 7
38
# 李闖 取到 骨頭 8
39
# 李闖 取到 骨頭 9
40
# 所有的骨頭被取完了...

多進程multiprocessing

    multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping(迴避) the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage( 槓桿作用; 優勢,力量) multiple processors on a given machine. It runs on both Unix and Windows.

To show the individual process IDs involved, here is an expanded example:

1
from multiprocessing import Process
2
import os
3
4
5
def info(title):
6
    print(title)
7
    print('module name:', __name__)
8
    print('parent process:', os.getppid())
9
    print('process id:', os.getpid())
10
    print("\n\n")
11
12
13
def f(name):
14
    info('\033[31;1mfunction f\033[0m')
15
    print('hello', name)
16
17
18
if __name__ == '__main__':
19
    info('\033[32;1mmain process line\033[0m')
20
    p = Process(target=f, args=('bob',))
21
    p.start()
22
    p.join()
23
    
24
# D:\Python\python\python-3.6.1\Python36-64\python.exe 
25
# main process line
26
# module name: __main__
27
# parent process: 7652
28
# process id: 5576
29
# 
30
# 
31
# 
32
# function f
33
# module name: __mp_main__
34
# parent process: 5576
35
# process id: 7888
36
# 
37
# 
38
# 
39
# hello bob
40
# 
41

進程間通訊  

  • 不同進程間內存是不共享的,要想實現兩個進程間的數據交換,可以用以下方法:

Queues

  • 使用方法跟threading裏的queue差不多

Pipes

  • The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way)(有兩部分的). For example:

1
from multiprocessing import Process, Pipe
2
3
4
def f(conn):
5
    conn.send([42, None, 'hello'])
6
    conn.close()
7
8
9
if __name__ == '__main__':
10
    parent_conn, child_conn = Pipe()
11
    p = Process(target=f, args=(child_conn,))
12
    p.start()
13
    print(parent_conn.recv())
14
    p.join()
15
16
# D:\Python\python\python-3.6.1\Python36-64\python.exe 
17
# [42, None, 'hello']

  • The two connection objects returned by Pipe() represent the two ends of the pipe. 
  • Each connection object has send() and recv() methods (among others). 
  • Note that data in a pipe may become corrupted(引起(計算機文件等的)錯誤; 破壞) if two processes (or threads) try to read from or write to the same end of the pipe at the same time.
  •  Of course there is no risk of corruption from processes using different ends of the pipe at the same time.


Managers

進程同步

  • Without using the lock output from the different processes is liable(有…傾向的; 易…的) to get all mixed up.

1
from multiprocessing import Process, Lock
2
3
4
def f(l, i):
5
    l.acquire()
6
    try:
7
        print('hello world', i)
8
    finally:
9
        l.release()
10
11
12
if __name__ == '__main__':
13
    lock = Lock()
14
15
    for num in range(10):
16
        Process(target=f, args=(lock, num)).start()
17
18
# D:\Python\python\python-3.6.1\Python36-64\python.exe 
19
# hello world 9
20
# hello world 7
21
# hello world 8
22
# hello world 6
23
# hello world 3
24
# hello world 1
25
# hello world 2
26
# hello world 5
27
# hello world 4
28
# hello world 0


進程池  

  • 進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。
  • 進程池中有兩個方法:
    • apply
    • apply_async

  • 由於Python設計的限制[GIL](我說的是咱們常用的CPython)。最多隻能用滿1個CPU核心。
  • Python提供了非常好用的多進程包multiprocessing,你只需要定義一個函數,Python會替你完成其他所有事情。藉助這個包,可以輕鬆完成從單進程到併發執行的轉換

1、新建單一進程

如果我們新建少量進程,可以如下:

1
import multiprocessing
2
import time
3
4
5
def func(msg):
6
    for i in range(3):
7
        print(msg)
8
        time.sleep(1)
9
10
11
if __name__ == "__main__":
12
    p = multiprocessing.Process(target=func, args=("hello",))
13
    p.start()
14
    p.join()
15
    print("Sub-process done.")
16
17
# D:\Python\python\python-3.6.1\Python36-64\python.exe 
18
# hello
19
# hello
20
# hello
21
# Sub-process done.

2、使用進程池

是的,你沒有看錯,不是線程池。它可以讓你跑滿多核CPU,而且使用方法非常簡單。

  • 注意要用apply_async,如果使用async,就變成阻塞版本了。
  • processes=4是最多併發進程數量。
相關文章
相關標籤/搜索