python多進程基礎

本文首發於知乎
python中的多進程編程方式和多線程很是類似,幾乎能夠說只是換了一些函數,有了以前講過的多線程基礎,不少地方我就只展現一些代碼,在涉及到差異的地方再着重說明。css

本文分爲以下幾個部分html

  • 事先說明
  • 最簡單的多進程
  • 類的形式
  • 進程池
  • 進程之間內存獨立
  • 隊列
  • pipe
  • value
  • 進程鎖

事先說明

有兩點在寫代碼時須要注意python

  • 使用多進程時,最好在文件中編寫代碼,用cmd來執行,在jupyter常常沒法獲得想要的結果
  • 建立進程的代碼必定要放在if __name__ == '__main__'裏面

最簡單的多進程

import multiprocessing
import time
def myfun(num):
time.sleep(1)
print(num + 1)
if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target = myfun, args = (i, ))
p.start()
複製代碼

另外,join is_alive daemon name current_process等也都是同樣的。編程

類的形式

import multiprocessing
import requests
from bs4 import BeautifulSoup
class MyProcess(multiprocessing.Process):
def __init__(self, i):
multiprocessing.Process.__init__(self)
self.i = i
def run(self):
url = 'https://movie.douban.com/top250?start={}&filter='.format(self.i*25)
r = requests.get(url)
soup = BeautifulSoup(r.content, 'html.parser')
lis = soup.find('ol', class_='grid_view').find_all('li')
for li in lis:
title = li.find('span', class_="title").text
print(title)
if __name__ == '__main__':
for i in range(10):
p = MyProcess(i)
p.start()
複製代碼

進程池

import requests
from bs4 import BeautifulSoup
from multiprocessing import Pool, current_process
def get_title(i):
print('start', current_process().name)
title_list = []
url = 'https://movie.douban.com/top250?start={}&filter='.format(i*25)
r = requests.get(url)
soup = BeautifulSoup(r.content, 'html.parser')
lis = soup.find('ol', class_='grid_view').find_all('li')
for li in lis:
title = li.find('span', class_="title").text
# return title
title_list.append(title)
print(title)
return(title_list)
if __name__ == '__main__':
pool = Pool()
for i in range(10):
pool.apply_async(get_title, (i, ))
pool.close()
pool.join()
print('finish')
複製代碼

這裏要說明一下安全

  • 使用Pool時,不指定進程數量,則默認爲CPU核心數量
  • 核心數量對應電腦的(任務管理器-性能)邏輯處理器數量而不是內核數量(個人電腦2個內核,有4個邏輯處理器,因此這裏默認使用4個進程)
  • 進程數量能夠是成百上千,並非說最大開啓進程數量爲4,只要用Pool(10)就能夠同時開啓10個進程進行抓取
  • 不過要注意一點,不管多線程仍是多進程,數量開啓太多都會形成切換費時,下降效率,因此慎重建立太多線程與進程

進程之間內存獨立

多進程與多線程最大的不一樣在於,多進程的每個進程都有一份變量的拷貝,進程之間的操做互不影響,咱們先來看看下面的例子ruby

import multiprocessing
import time
zero = 0
def change_zero():
global zero
for i in range(3):
zero = zero + 1
print(multiprocessing.current_process().name, zero)
if __name__ == '__main__':
p1 = multiprocessing.Process(target = change_zero)
p2 = multiprocessing.Process(target = change_zero)
p1.start()
p2.start()
p1.join()
p2.join()
print(zero)
複製代碼

運行結果以下多線程

Process-1 1
Process-1 2
Process-1 3
Process-2 1
Process-2 2
Process-2 3
0
複製代碼

上面結果顯示,新建立的兩個進程各自把值增長到了3,兩者不是一塊兒將其加到了6的。同時,主進程的值仍是0。因此說每一個進程都是將數據拷貝過去本身作,並無將結果與其餘進程共享。app

可是對於寫入文件則不一樣dom

import multiprocessing
import time
def write_file():
for i in range(30):
with open('try.txt', 'a') as f:
f.write(str(i) + ' ')
if __name__ == '__main__':
p1 = multiprocessing.Process(target = write_file)
p2 = multiprocessing.Process(target = write_file)
p1.start()
p2.start()
p1.join()
p2.join()
複製代碼

獲得的try.txt文件內容以下async

0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 0 15 2 16 17 3 4 18 19 5 20 6 21 22 8 9 23 10 11 25 26 12 13 27 28 14 29 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 
複製代碼

可見兩個進程都將數據寫入了同一份文件中。

下面咱們要討論第一種狀況,若是真的要在兩個進程之間共享變量須要怎麼辦

隊列

這裏介紹進程之間的第一種交流方式——隊列。multiprocessing模塊中提供了multiprocessing.Queue,它和Queue.Queue的區別在於,它裏面封裝了進程之間的數據交流,不一樣進程能夠操做同一個multiprocessing.Queue

from multiprocessing import Process, Queue
def addone(q):
q.put(1)
def addtwo(q):
q.put(2)
if __name__ == '__main__':
q = Queue()
p1 = Process(target=addone, args = (q, ))
p2 = Process(target=addtwo, args = (q, ))
p1.start()
p2.start()
p1.join()
p2.join()
print(q.get())
print(q.get())
複製代碼

運行結果以下

1
2
複製代碼

這個隊列是線程、進程安全的,即對隊列的每一次修改中間不會被中斷從而形成結果錯誤。

pipe

pipe的功能和Queue相似,能夠理解成簡化版的Queue。咱們先來看下面一個例子

import random
import time
from multiprocessing import Process, Pipe, current_process
def produce(conn):
while True:
new = random.randint(0, 100)
print('{} produce {}'.format(current_process().name, new))
conn.send(new)
time.sleep(random.random())
def consume(conn):
while True:
print('{} consume {}'.format(current_process().name, conn.recv()))
time.sleep(random.random())
if __name__ == '__main__':
pipe = Pipe()
p1 = Process(target=produce, args=(pipe[0],))
p2 = Process(target=consume, args=(pipe[1],))
p1.start()
p2.start()
複製代碼

結果以下

Process-1 produce 24
Process-2 consume 24
Process-1 produce 95
Process-2 consume 95
Process-1 produce 100
Process-2 consume 100
Process-1 produce 28
Process-2 consume 28
Process-1 produce 62
Process-2 consume 62
Process-1 produce 92
Process-2 consume 92
....................
複製代碼

上面使用了pipe來實現生產消費模式。

總結Queuepipe之間的差異以下

  • Queue使用put get來維護隊列,pipe使用send recv來維護隊列
  • pipe只提供兩個端點,而Queue沒有限制。這就表示使用pipe時只能同時開啓兩個進程,能夠像上面同樣,一個生產者一個消費者,它們分別對這兩個端點(Pipe()返回的兩個值)操做,兩個端點共同維護一個隊列。若是多個進程對pipe的同一個端點同時操做,就會發生錯誤(由於沒有上鎖,相似線程不安全)。因此兩個端點就至關於只提供兩個進程安全的操做位置,以此限制了進程數量只能是2
  • Queue的封裝更好,Queue只提供一個結果,它能夠被不少進程同時調用;而Pipe()返回兩個結果,要分別被兩個進程調用
  • Queue的實現基於pipe,因此pipe的運行速度比Queue快不少
  • 當只須要兩個進程時使用pipe更快,當須要多個進程同時操做隊列時,使用Queue

value

當咱們不是想維護一個隊列,而只是多個進程同時操做一個數字,就須要提供一個能夠在多個進程之間共享的方法,即Value

from multiprocessing import Process, Value
def f1(n):
n.value += 1
def f2(n):
n.value -= 2
if __name__ == '__main__':
num = Value('d', 0.0)
p1 = Process(target=f1, args=(num, ))
p2 = Process(target=f2, args=(num, ))
p1.start()
p2.start()
p1.join()
p2.join()
print(num.value)
複製代碼

運行結果爲

-1.0
複製代碼

其中Value('d', 0.0)中的d表示雙精度浮點數,更多類型能夠看這裏

除了Value,模塊還提供了相似的Array,感興趣的讀者能夠去官網查看用法

進程鎖

既然變量在進程之間能夠共享了,那麼同時操做一個變量致使的不安全也隨之出現。同多線程同樣,進程也是經過鎖來解決,並且使用方法都和多線程裏相同。

lock = multiprocessing.Lock()
lock.acquire()
lock.release()
with lock:
複製代碼

這些用法和功能都和多線程是同樣的

另外,multiprocessing.Semaphore Condition Event RLock也和多線程相同

歡迎關注個人知乎專欄

專欄主頁:python編程

專欄目錄:目錄

版本說明:軟件及包版本說明

相關文章
相關標籤/搜索