python 併發編程之多進程

 

1、數據共享python

 

1.進程間的通訊應該儘可能避免共享數據的方式git

2.進程間的數據是獨立的,能夠藉助隊列或管道實現通訊,兩者都是基於消息傳遞的。github

雖然進程間數據獨立,但能夠用過Manager實現數據共享,事實上Manager的功能遠不止於此。windows

 

命令就是一個程序,按回車就會執行(這個只是在windows狀況下)

tasklist 查看進程

tasklist | findstr  pycharm   #(findstr是進行過濾的),|就是管道(tasklist執行的內容就放到管道里面了,

管道後面的findstr  pycharm就接收了)

2、進程池數組

在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。多進程是實現併發的手段之一,須要注意的問題是:併發

  1. 很明顯須要併發執行的任務一般要遠大於核數
  2. 一個操做系統不可能無限開啓進程,一般有幾個核就開幾個進程
  3. 進程開啓過多,效率反而會降低(開啓進程是須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行)

例如當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。app

那麼什麼是進程池呢?進程池就是控制進程數目異步

 ps:對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。 socket

進程池的結構:async

 建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務,不會開啓其餘進程

進程池的結構:

 建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務,不會開啓其餘進程

1.建立進程池

 

Pool([numprocess  [,initializer [, initargs]]]):建立進程池

2.參數介紹

numprocess:要建立的進程數,若是省略,將默認爲cpu_count()的值,可os.cpu_count()查看

initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None

initargs:是要傳給initializer的參數組

3.方法介紹

p.apply(func [, args [, kwargs]]):在一個池工做進程中執行

func(*args,**kwargs),而後返回結果。

須要強調的是:此操做並不會在全部池工做進程中並執行func函數。

若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()

函數或者使用p.apply_async()

 

 

p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。此方法的結果是AsyncResult類的實例,

callback是可調用對象,接收輸入參數。當func的結果變爲可用時,

將理解傳遞給callback。callback禁止執行任何阻塞操做,

不然將接收其餘異步操做中的結果。

    

p.close():關閉進程池,防止進一步操做。禁止往進程池內在添加任務(須要注意的是必定要寫在close()的上方)


應用1:

 1 from multiprocessing import Pool
 2 import os,time
 3 def task(n):
 4     print('[%s] is running'%os.getpid())
 5     time.sleep(2)
 6     print('[%s] is done'%os.getpid())
 7     return n**2
 8 if __name__ == '__main__':
 9     # print(os.cpu_count())  #查看cpu個數
10     p = Pool(4) #最大四個進程
11     for i in range(1,7):#開7個任務
12         res = p.apply(task,args=(i,))  #同步的,等着一個運行完才執行另外一個
13         print('本次任務的結束:%s'%res)
14     p.close()#禁止往進程池內在添加任務
15     p.join() #在等進程池
16     print('')

 

 1 # ----------------
 2 # 那麼咱們爲何要用進程池呢?這是由於進程池使用來控制進程數目的,
 3 # 咱們須要幾個就開幾個進程。若是不用進程池實現併發的話,會開不少的進程
 4 # 若是你開的進程特別多,那麼你的機器就會很卡,因此咱們把進程控制好,用幾個就
 5 # 開幾個,也不會太佔用內存
 6 from multiprocessing import Pool
 7 import os,time
 8 def walk(n):
 9     print('task[%s] running...'%os.getpid())
10     time.sleep(3)
11     return n**2
12 if __name__ == '__main__':
13      p = Pool(4)
14      res_obj_l = []
15      for i in range(10):
16          res = p.apply_async(walk,args=(i,))
17          # print(res)  #打印出來的是對象
18          res_obj_l.append(res)  #那麼如今拿到的是一個列表,怎麼獲得值呢?咱們用個.get方法
19      p.close() #禁止往進程池裏添加任務
20      p.join()
21      # print(res_obj_l)
22      print([obj.get() for obj in res_obj_l])  #這樣就獲得了

 

那麼什麼是同步,什麼是異步呢?

同步就是指一個進程在執行某個請求的時候,若該請求須要一段時間才能返回信息,那麼這個進程將會一直等待下去,直到收到返回信息才繼續執行下去

異步是指進程不須要一直等下去,而是繼續執行下面的操做,無論其餘進程的狀態。當有消息返回時系統會通知進程進行處理,這樣能夠提升執行的效率。

什麼是串行,什麼是並行呢?

舉例:能並排開幾輛車的就能夠說是「並行」,只能一輛一輛開的就屬於「串行」了。很明顯,並行的速度要比串行的快得多。(並行互不影響,串行的等着一個完了才能接着另外一個)

 

應用2:
使用進程池維護固定數目的進程(之前客戶端和服務端的改進)

1 from socket import *
 2 from multiprocessing import Pool
 3 s = socket(AF_INET,SOCK_STREAM)
 4 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #端口重用
 5 s.bind(('127.0.0.1',8081))
 6 s.listen(5)
 7 print('start running...')
 8 def talk(coon,addr):
 9     while True:  # 通訊循環
10         try:
11             cmd = coon.recv(1024)
12             print(cmd.decode('utf-8'))
13             if not cmd: break
14             coon.send(cmd.upper())
15             print('發送的是%s'%cmd.upper().decode('utf-8'))
16         except Exception:
17             break
18     coon.close()
19 if __name__ == '__main__':
20     p = Pool(4)
21     while True:#連接循環
22         coon,addr = s.accept()
23         print(coon,addr)
24         p.apply_async(talk,args=(coon,addr))
25     s.close()
26 #由於是循環,因此就不用p.join了
服務端
1 from socket import *
 2 c = socket(AF_INET,SOCK_STREAM)
 3 c.connect(('127.0.0.1',8081))
 4 while True:
 5     cmd = input('>>:').strip()
 6     if not cmd:continue
 7     c.send(cmd.encode('utf-8'))
 8     data = c.recv(1024)
 9     print('接受的是%s'%data.decode('utf-8'))
10 c.close()
客戶端


三.回調函數

回調函數何時用?(回調函數在爬蟲中最經常使用)

造數據的很是耗時

處理數據的時候不耗時

 

 

你下載的地址若是完成了,就自動提醒讓主進程解析

誰要是好了就通知解析函數去解析(回調函數的強大之處)

 

須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

 

1 from  multiprocessing import Pool
 2 import requests
 3 import os
 4 import time
 5 def get_page(url):
 6     print('<%s> is getting [%s]' %(os.getpid(),url))
 7     response = requests.get(url)  #獲得地址
 8     time.sleep(2)
 9     print('<%s> is  done [%s]'%(os.getpid(),url))
10     return {'url':url,'text':response.text}
11 def parse_page(res):
12     '''解析函數'''
13     print('<%s> parse [%s]'%(os.getpid(),res['url']))
14     with open('db.txt','a') as f:
15         parse_res = 'url:%s size:%s\n' %(res['url'],len(res['text']))
16         f.write(parse_res)
17 if __name__ == '__main__':
18     p = Pool(4)
19     urls = [
20         'https://www.baidu.com',
21         'http://www.openstack.org',
22         'https://www.python.org',
23         'https://help.github.com/',
24         'http://www.sina.com.cn/'
25     ]
26     for url in urls:
27         obj = p.apply_async(get_page,args=(url,),callback=parse_page)
28     p.close()
29     p.join()
30     print('',os.getpid())  #都不用.get()方法了
View Code

若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數

1 from  multiprocessing import Pool
 2 import requests
 3 import os
 4 def get_page(url):
 5     print('<%os> get [%s]' %(os.getpid(),url))
 6     response = requests.get(url)  #獲得地址  response響應
 7     return {'url':url,'text':response.text}
 8 if __name__ == '__main__':
 9     p = Pool(4)
10     urls = [
11         'https://www.baidu.com',
12         'http://www.openstack.org',
13         'https://www.python.org',
14         'https://help.github.com/',
15         'http://www.sina.com.cn/'
16     ]
17     obj_l= []
18     for url in urls:
19         obj = p.apply_async(get_page,args=(url,))
20         obj_l.append(obj)
21     p.close()
22     p.join()
23     print([obj.get() for obj in obj_l])
View Code
相關文章
相關標籤/搜索