multiprocessing模塊

Process

進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。linux

一個進程至少包含一個線程。算法

Process 類用來描述一個進程對象。建立子進程的時候,只須要傳入一個執行函數和函數的參數便可完成 Process 示例的建立。編程

Process類經過建立子進程來是實現更多的佔有cpu,提升效率json

在pycharm裏查看下Process的源碼:bootstrap

 1 class Process(object):
 2     def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
 3         self.name = ''
 4         self.daemon = False
 5         self.authkey = None
 6         self.exitcode = None
 7         self.ident = 0
 8         self.pid = 0
 9         self.sentinel = None
10 
11     def run(self):
12         pass
13 
14     def start(self):
15         pass
16 
17     def terminate(self):
18         pass
19 
20     def join(self, timeout=None):
21         pass
22 
23     def is_alive(self):
24         return False
Process類

 

除了__init__()以外,包含的方法有run(),start(),terminate(),join(),is_alive()五個方法.windows

run()方法:

在研究run()方法以前咱們先看下面一段代碼:安全

 1 from multiprocessing import Process
 2 import os
 3 
 4 
 5 def func():
 6 
 7     print(os.getpid(), os.getppid())
 8 
 9 
10 if __name__ == '__main__':
11 
12     print(os.getpid(), os.getppid())
13 
14     p = Process(target=func)
15 
16     p.start()
開啓子進程

運行這段代碼的結果是:app

6864 6100
5612 6864dom

py文件中的子進程在調用Process類對象後由子進程變成了父進程,也就是說新開了一個子進程去執行func()函數.ide

若是是使用面向對象的思想解決進程,自定義的子類要繼承Process類,而且在子類中重構run()方法,切必須重構run()方法.

重構的run()方法就至關於def func()函數同樣,切套經過start()方法調用run()方法來執行run()方法,看下面一段代碼:

 1 from multiprocessing import Process
 2 import os
 3 
 4 
 5 class MyProcess(Process):
 6 
 7     def run(self):
 8 
 9         print(os.getpid(), os.getppid())
10 
11 
12 if __name__ == '__main__':
13 
14     print(os.getpid(), os.getppid())
15 
16     p = MyProcess()
17 
18     p.start()
面向對象建立

 

這段代碼執行的結果:

6184 6100
5320 6184

說明咱們重構的run()方法是正確的,和麪向過程的實現同樣

不過此處要注意的是:在windows操做系統下,必定要加上if __name__ == '__main__':這段代碼,由於windows的操做機制是在是在開闢新的進程是會從新加載一編前邊的額引入模塊和變量.至關於把程序從新讀了一遍,這樣會陷入開闢子進程的死循環中,且會拋出RuntimeError的異常:

RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

而linux操做系統是直接把內存中的變量直接拷貝,不在去執行一遍

start()方法:

start()方法不是運行一個程序,而是調用操做系統的命令,告訴操做系統要建立子進程

在Process類內部start()方法是調用run()方法,來實現開啓一個子進程,且必須經過start()方法來實現開啓子進程

 join()方法:

join()方法會阻塞主程序一直到目標進程/線程執行完畢才執行編程非阻塞:

 1 from os import getpid
 2 from multiprocessing import Process
 3 
 4 
 5 def func(i):
 6     print('Process %s will be executed ' % i)
 7     print('Process ID is %s' % getpid())
 8     print('Process %s has been executed ' % i)
 9 
10 
11 if __name__ == '__main__':
12     process_list = []
13     for i in range(10):
14         p = Process(target=func, args=(i,))
15         p.start()
16         process_list.append(p)  # 操做系統執行子進程並非按照這裏遍歷i的順序來執行的,而是按照他內部的算法實現的
17 
18     for p in process_list:
19         p.join()    # 遍歷process_list列表,檢查p進程是否執行結束
20 
21     print('The main process has been executed')
join方法

打印結果是:

 1 Process 0 will be executed 
 2 Process ID is 11728
 3 Process 0 has been executed 
 4 Process 4 will be executed 
 5 Process ID is 11632
 6 Process 4 has been executed 
 7 Process 1 will be executed 
 8 Process ID is 7568
 9 Process 1 has been executed 
10 Process 3 will be executed 
11 Process ID is 10548
12 Process 3 has been executed 
13 Process 2 will be executed 
14 Process ID is 10600
15 Process 2 has been executed 
16 Process 9 will be executed 
17 Process ID is 11940
18 Process 9 has been executed 
19 Process 7 will be executed 
20 Process ID is 11980
21 Process 7 has been executed 
22 Process 6 will be executed 
23 Process ID is 11772
24 Process 6 has been executed 
25 Process 5 will be executed 
26 Process ID is 11644
27 Process 5 has been executed 
28 Process 8 will be executed 
29 Process ID is 11600
30 Process 8 has been executed 
31 The main process has been executed
打印結果

terminate()方法:

terminate()方法是用來終止一個子進程的.即回收該子進程的資源.

is_alive()方法:

is_alive()方法是用來判斷一個進程是否還在運行:

若是運行:return True

若是沒有運行:return False

下面這段代碼總體來掩飾一遍:

 1 from os import getpid
 2 from time import sleep
 3 from multiprocessing import Process
 4 
 5 
 6 class MyProcess(Process):
 7 
 8     def __init__(self, args):
 9         super(MyProcess, self).__init__()
10         self.args = args
11 
12     def run(self):
13         print('Process ID is %s' % getpid())
14         print('Process %s has been executed ' % self.args)
15 
16     def start(self):
17         print('Process %s will be executed ' % self.args)
18         super().start()
19 
20     def terminate(self):
21         super().terminate()
22 
23     def join(self, timeout=None):
24         super().join()
25 
26     def is_alive(self):
27         try:
28             result = super().is_alive()
29             return result
30         except:
31             print("Something was wrong!")
32 
33 
34 if __name__ == '__main__':
35     process_list = []
36     for i in range(10):
37         p = MyProcess(i)
38         p.start()
39         print(p.is_alive())
40         process_list.append(p)
41     for p in process_list:
42         print(p.is_alive())
43         print('There will terminated process %s' % p)
44         p.terminate()
45         sleep(0.2)      # 這裏不延時0.2而直接打印的話,下邊的is_alive會顯示True
46         print(p.is_alive())
47         p.join()
48         print(p.is_alive())
重構父類方法

Process的一些經常使用方法:

daemon()方法:

daemon()方法是守護進程守護進程會在主進程的代碼執行完畢以後直接結束,不管守護進程是否執行完畢,使用daemon()方法注意:

  • 守護進程的屬性,默認是False,若是設置成True,就表示設置這個子進程爲一個守護進程
  • 設置守護進程的操做應該在開啓子進程以前

守護進程的主要應用:

  • 報活 主進程還活着
 1 from time import sleep
 2 from multiprocessing import Process
 3 
 4 
 5 def func1():
 6     print('begin')
 7     sleep(3)
 8     print('wahaha')
 9 
10 
11 def func2():
12     print('in func2')
13     sleep(4)
14     print('The func2 has been executed')
15 
16 
17 if __name__ == '__main__':
18     Process(target=func1).start()
19     p = Process(target=func2)
20     p.daemon = True
21     p.start()
22     sleep(1)
23     print('In main process')
daemon

打印會發現func2中的最後一句沒有被打印出來

Lock類:

 現階段瞭解的lock主要適用於共享數據時控制數據的安全,這個鎖有且只有一把鑰匙,若是前邊的進程拿到了後邊的就要等着那倒鑰匙才能進去

lock的類中有以下兩個方法:

1 class Lock(object):
2     def acquire(self, blocking=True, timeout=-1):
3         pass
4 
5     def release(self):
6         pass
Lock類

acquire()方法:

至關於拿鑰匙開鎖,

release()方法:

至關於還鑰匙

 1 import json
 2 from multiprocessing import Process, Lock
 3 
 4 
 5 class MyProcess(Process):
 6 
 7     def __init__(self, lock):
 8         self.lock = lock
 9         super(MyProcess, self).__init__()
10 
11     def run(self):
12         ticket_dict = self.read()
13         print("The rest of the ticket>>>%s" % ticket_dict['count'])
14         if ticket_dict['count'] >= 1:
15             print("Out of ticket")
16             self.lock.acquire()
17             self.buy()
18             self.lock.release()
19         else:
20             print('Failure to buy tickets')
21 
22     def read(self):
23         with open('db', 'r') as filehandler:
24             content = json.load(filehandler)
25         return content
26 
27     def buy(self):
28         conrent = self.read()
29         if conrent['count'] >= 1:
30             print('Buy ticket successful')
31             conrent['count'] -= 1
32             with open('db', 'w') as fh:
33                 json.dump(conrent, fh)
34         else:
35             print('More votes short')
36 
37 
38 if __name__ == '__main__':
39     lock = Lock()
40     for i in range(11):
41         p = MyProcess(lock)
42         p.start()
Lock

Semaphore()類:

semaphore()類的實質就是鎖+計數器,一把鑰匙或者多把鑰匙對應一個鎖,用法和Lock()基本相同,惟一的區別是若是是多把鑰匙的話要設置鑰匙數,默認是1。下面是源碼:

1 class Semaphore(object):
2     def __init__(self, value=1):
3         pass
4 
5     def acquire(self, blocking=True, timeout=None):
6         pass
7 
8     def release(self):
9         pass
Semaphore源碼

semaphore()類包含了三個方法:

__init__(self, value=1)方法:

value=1:默認是1,這裏的傳的值是設置鑰匙的個數

acquire()方法:

同Lock()類的acquire()

release()方法:

同Lock()類的release()

semaphore()類對應的時間是商場裏的唱吧:假設一個唱吧裏只能同時有四我的,外邊公有20人要準備玩,只能排隊等:

 1 from time import sleep
 2 from random import randint
 3 from multiprocessing import Semaphore, Process
 4 
 5 
 6 def ktv(s, i):
 7     s.acquire()
 8     print('%s 進入了ktv' % i)
 9     sleep(randint(1, 5))
10     print('%s 離開了ktv' % i)
11     s.release()
12 
13 
14 if __name__ == '__main__':
15     s = Semaphore(4)
16     for i in range(20):
17         p = Process(target=ktv, args=(s, i))
18         p.start()
唱吧模型

打印會發現離開一個進一個

Event()類:

 event()類主要是用於進程間的通訊,經過其餘進程間的傳遞的信號去控制其餘進程,下面是event()類的源碼:

 1 class Event(object):
 2     def is_set(self):
 3         return False
 4 
 5     def set(self):
 6         pass
 7 
 8     def clear(self):
 9         pass
10 
11     def wait(self, timeout=None):
12         pass
Event源碼

Enent提供了四個方法:

is_set(self)方法:

is_set()方法是用來查看標誌的

在事件的建立之初 默認是False

set(self)方法:

將標誌設置爲True

clear(self)方法:

將標誌設置爲False

wait(self, timeout=None)方法:

等待:阻塞、若是這個標誌是False 那麼就阻塞。非阻塞,若是這個標誌是True 那麼就非阻塞。

timeout:若是是None,將會永遠阻塞。若是設置了時間:在設置的時間內,若是標誌變成True,則直接pass,若是沒有,則繼續阻塞

Event()典型事件就是交通燈控制車輛是否經過,經過函數和麪向對象模擬練習下這個事件:

 1 from time import sleep
 2 from random import randrange,choice
 3 from multiprocessing import Process, Event
 4 
 5 
 6 class MyObj(object):
 7 
 8     def traffic_light(self, e):
 9         """
10         默認紅燈開始:is_set = False,
11         sleep(2)是亮燈時間,而後切換另外一個燈,而後設置is_set的狀態,
12         紅燈亮:普通車等待,救護車等0.5秒,若是0.5秒後沒有變燈,則闖燈
13         綠燈亮:全部車輛都經過
14         :param e: Event類的實例化對象
15         :return:
16         """
17 
18         print('\033[1;31m The red light is on \033[0m')
19         while 1:
20             sleep(2)
21             if e.is_set():
22                 print('\033[1;31m The red light is on \033[0m')
23                 e.clear()
24             else:
25                 print('\033[1;32m The green light is on \033[0m')
26                 e.set()
27 
28     def car(self, e, id):
29         if not e.is_set():
30             print('%s car wait' % id)
31             e.wait()
32         print('%s car pass' % id)
33 
34     def ambulance(self, e, id):
35         if e.is_set():
36             e.wait(timeout=0.5)
37         print('%s ambulance pass' % id)
38 
39 
40 if __name__ == '__main__':
41     myobj = MyObj()
42     e = Event()
43     p = Process(target=myobj.traffic_light, args=(e,))
44     p.start()
45     car_list = [myobj.ambulance, myobj.car]
46     for i in range(20):
47         p = Process(target=choice(car_list), args=(e, i))
48         p.start()
49         sleep(randrange(0, 3, 2))
面向對象

函數模擬:

 1 from time import sleep
 2 from random import randrange, choice
 3 from multiprocessing import Event, Process
 4 
 5 
 6 def traffic_light(e):
 7     print('\033[1;31m The red light is on\033[0m')
 8     while 1:
 9         sleep(2)
10         if e.is_set():
11             print('\033[1;31m The red light is on\033[0m')
12             e.clear()
13         else:
14             print('\033[1;32m The green light is on\033[0m')
15             e.set()
16 
17 
18 def car(id, e):
19     if not e.is_set():
20         print('%s car wait' % id)
21         e.wait()
22     print('%s car pass' % id)
23 
24 
25 def ambulance(id, e):
26     if not e.is_set():
27         e.wait(timeout=0.5)
28     print('%s ambulance pass' % id)
29 
30 
31 if __name__ == '__main__':
32     e = Event()
33     p = Process(target=traffic_light, args=(e,))
34     p.start()
35     car_list = [car, ambulance]
36     for i in range(20):
37         p = Process(target=choice(car_list), args=(i, e))
38         p.start()
39         sleep(randrange(0, 3, 2))
函數
相關文章
相關標籤/搜索