python進程池詳解

 

python進程池multiprocessing.Pool和線程池multiprocessing.dummy.Pool實例

1,Python的multiprocessing 實現多cpu 多進程html

爲啥用multiprocessing?python

*如今cpu都多核了, 寫大型程序你還在用單線程的傳統寫法?很不fashion並且效率很低, 你out了。 安全

*那用multi-threading不就結了?C語言能夠, 編譯完執行沒問題。 可是python這種解釋型的語言用多線程就不行了, python的multithreading效率並不高。聽說是受制於GIL (global interpreter lock) 的鎖機制, 該鎖只能工做於單個cpu core。這樣別的cpu core乾着急也幫不上忙。多線程

*那怎麼辦呢? 本身fork一堆子進程,而後管理唄? 呵呵,DIY?這不是從新造輪子嗎。app

*親, 用python現成的multiprocessing庫吧。支持多個cpu core,簡直就是爲了多核cpu打造的, python 2.6以後都支持。less

不用說了,直接上乾貨異步

*sample 1, 建立process, target=f 函數名, args傳遞參數async

# encoding: utf8
# @Time    : 2017/10/26 20:54
# @Author  : Ycs
# @Site    : 
# @File    : mymultiprocessing.py
# @Desc    : Python的multiprocessing 實現多cpu 多進程, python進程池multiprocessing.Pool和線程池multiprocessing.dummy.Pool實例

from multiprocessing import Process,Queue,Pipe,Pool
import time


def f(name):
    print('hello:', name)


if __name__ == '__main__':
    #建立process, target=f 函數名, args傳遞參數
    p=Process(target=f,args=('bbb',))
    p.start()
    p.join()

    #支持隊列

def ff(q):
    q.put([42,None,'hello'])

if __name__ == '__main__':
    q=Queue()
    p=Process(target=ff,args=(q,))
    p.start()
    print(q.get()) #這裏prints "[42, None, 'hello']"
    p.join()

    #支持Pipe

def f3(conn):
    conn.send([43,None,'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn,child_conn=Pipe()
    p=Process(target=f3,args=(child_conn,))
    p.start()
    print(parent_conn.recv()) # 這裏prints "[42, None, 'hello']"
    p.join()

    #支持Pool, 僱一羣worker來幹活

def f4(x):
    return x*x

if __name__ == '__main__':
    pool=Pool(processes=4) # start 4 worker processes
    result=pool.apply_async(f4,(10,)) # evaluate "f(10)" asynchronously
    try:
        print(result.get(timeout=1))
    except TimeoutError:
        pass
    print(pool.map(f4,range(10)))
    it=pool.imap(f4,range(10))
    print(it.next())
    print(it.next())
    print(it.next(timeout=1)) # raises TimeoutError
    result=pool.apply_async(time.sleep,(10,))
    print(result.get(timeout=1))

 

注意事項:函數

* 在UNIX平臺上,當某個進程終結以後,該進程須要被其父進程調用wait,不然進程成爲殭屍進程(Zombie)。因此,有必要對每一個Process對象調用join()方法 (實際上等同於wait)。對於多線程來講,因爲只有一個進程,因此不存在此必要性。ui

* multiprocessing提供了threading包中沒有的IPC(好比Pipe和Queue),效率上更高。應優先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (由於它們佔據的不是用戶進程的資源)。

* 多進程應該避免共享資源。在多線程中,咱們能夠比較容易地共享資源,好比使用全局變量或者傳遞參數。在多進程狀況下,因爲每一個進程有本身獨立的內存空間,以上方法並不合適。此時咱們能夠經過共享內存和Manager的方法來共享資源。但這樣作提升了程序的複雜度,並由於同步的須要而下降了程序的效率。

Reference:

[1] http://www.ibm.com/developerworks/aix/library/au-multiprocessing

[2] http://docs.python.org/2/library/multiprocessing.html

python中的多進程處理

 


1)Process
  要建立一個Process是很簡單的。

  1. from multiprocessing import Process
  2.  
  3. def f(name):
  4.     print('hello', name)
  5.  
  6. if __name__ == '__main__':
  7.      p = Process(target=f, args=('bob',))
  8.      p.start()
  9.      p.join()

  要得到一個Process的進程ID也是很簡單的。

  1. from multiprocessing import Process
  2. import os
  3.  
  4. def info(title):
  5.     print title
  6.     print 'module name:', __name__
  7.     print 'process id:', os.getpid()
  8.  
  9. def f(name):
  10.     info('function f')
  11.     print 'hello', name
  12.  
  13. if __name__ == '__main__':
  14.     info('main line')
  15.     p = Process(target=f, args=('bob',))
  16.     p.start()
  17.     p.join()

  建立進程:multiprocessing.Process([group[, target[, name[, args[, kargs]]]]])
  參數:
  group:    None,它的存在僅僅是爲了與threading.Thread兼容
  target:   通常是函數
  name:     進程名
  args:     函數的參數
  kargs:    keywords參數

  函數:
  run()                  默認的run()函數調用target的函數,你也能夠在子類中覆蓋該函數
  start()                啓動該進程
  join([timeout])        父進程被中止,直到子進程被執行完畢。
                         當timeout爲None時沒有超時,不然有超時。
                         進程能夠被join不少次,但不能join本身
  is_alive()             
  terminate()            結束進程。
                         在Unix上使用的是SIGTERM
                         在Windows平臺上使用TerminateProcess

  屬性:
  name                   進程名
  daemon                 守護進程
  pid                    進程ID
  exitcode               若是進程尚未結束,該值爲None
  authkey                    
            
2)Queue
  Queue相似於queue.Queue,通常用來進程間交互信息
  例子:

  1. from multiprocessing import Process, Queue
  2.  
  3. def f(q):
  4.     q.put([42, None, 'hello'])
  5.  
  6.  if __name__ == '__main__':
  7.      q = Queue()
  8.      p = Process(target=f, args=(q,))
  9.      p.start()
  10.      print(q.get())    # prints "[42, None, 'hello']"
  11.      p.join()

  注意:Queue是進程和線程安全的。
  Queue實現了queue.Queue的大部分方法,但task_done()和join()沒有實現。    
  建立Queue:multiprocessing.Queue([maxsize])
  函數:
  qsize()                             返回Queue的大小
  empty()                             返回一個boolean值表示Queue是否爲空
  full()                              返回一個boolean值表示Queue是否滿
  put(item[, block[, timeout]])       
  put_nowait(item)
  get([block[, timeout]])
  get_nowait()
  get_no_wait()

  close()                             表示該Queue不在加入新的元素
  join_thread()                       
  cancel_join_thread()

3)JoinableQueue
  建立:multiprocessing.JoinableQueue([maxsize])
  task_done()
  join()

4)Pipe

  1. from multiprocessing import Process, Pipe
  2.  
  3. def f(conn):
  4.     conn.send([42, None, 'hello'])
  5.     conn.close()
  6.  
  7. if __name__ == '__main__':
  8.     parent_conn, child_conn = Pipe()
  9.     p = Process(target=f, args=(child_conn,))
  10.     p.start()
  11.     print(parent_conn.recv())   # prints "[42, None, 'hello']"
  12.     p.join()

  multiprocessing.Pipe([duplex])      返回一個Connection對象

5)異步化synchronization
 

  1. from multiprocessing import Process, Lock
  2.  
  3. def f(l, i):
  4.     l.acquire()
  5.     print('hello world', i)
  6.     l.release()
  7.  
  8. if __name__ == '__main__':
  9.     lock = Lock()
  10.  
  11.     for num in range(10):
  12.         Process(target=f, args=(lock, num)).start()

6)Shared Memory

  1. from multiprocessing import Process, Value, Array
  2.  
  3. def f(n, a):
  4.     n.value = 3.1415927
  5.     for i in range(len(a)):
  6.         a[i] = -a[i]
  7.  
  8. if __name__ == '__main__':
  9.     num = Value('d', 0.0)
  10.     arr = Array('i', range(10))
  11.  
  12.     p = Process(target=f, args=(num, arr))
  13.     p.start()
  14.     p.join()
  15.  
  16.     print(num.value)
  17.     print(arr[:])

1>Value
2>Array

7)Manager

  1. from multiprocessing import Process, Manager
  2.  
  3. def f(d, l):
  4.     d[1] = '1'
  5.     d['2'] = 2
  6.     d[0.25] = None
  7.     l.reverse()
  8.  
  9. if __name__ == '__main__':
  10.     manager = Manager()
  11.  
  12.     d = manager.dict()
  13.     l = manager.list(range(10))
  14.  
  15.     p = Process(target=f, args=(d, l))
  16.     p.start()
  17.     p.join()
  18.  
  19.     print(d)
  20.     print(l)


8)Pool

  1. from multiprocessing import Pool
  2.  
  3. def f(x):
  4.     return x*x
  5.  
  6. if __name__ == '__main__':
  7.     pool = Pool(processes=4)              # start 4 worker processes
  8.     result = pool.apply_async(f, [10])     # evaluate "f(10)" asynchronously
  9.     print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
  10.     print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

multiprocessing.Pool([processes[, initializer[, initargs]]])

函數:
  apply(func[, args[, kwds]])
  apply_async(func[, args[, kwds[, callback]]])
  map(func,iterable[, chunksize])
  map_async(func,iterable[, chunksize[, callback]])
  imap(func, iterable[, chunksize])
  imap_unordered(func, iterable[, chunksize])
  close()
  terminate()
  join()
 

  1. from multiprocessing import Pool
  2.  
  3. def f(x):
  4.     return x*x
  5.  
  6. if __name__ == '__main__':
  7.     pool = Pool(processes=4)              # start 4 worker processes
  8.  
  9.     result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously
  10.     print(result.get(timeout=1))          # prints "100" unless your computer is *very* slow
  11.  
  12.     print(pool.map(f, range(10)))         # prints "[0, 1, 4,..., 81]"
  13.  
  14.     it = pool.imap(f, range(10))
  15.     print(next(it))                       # prints "0"
  16.     print(next(it))                       # prints "1"
  17.     print(it.next(timeout=1))             # prints "4" unless your computer is *very* slow
  18.  
  19.     import time
  20.     result = pool.apply_async(time.sleep, (10,))
  21.     print(result.get(timeout=1))          # raises TimeoutError

    

9)雜項
multiprocessing.active_children()          返回全部活動子進程的列表
multiprocessing.cpu_count()                返回CPU數目
multiprocessing.current_process()          返回當前進程對應的Process對象
multiprocessing.freeze_support()
multiprocessing.set_executable()

10)Connection對象
send(obj)
recv()
fileno()
close()
poll([timeout])
send_bytes(buffer[, offset[, size]])
recv_bytes([maxlength])
recv_bytes_info(buffer[, offset])  

  1. >>> from multiprocessing import Pipe
  2. >>> a, b = Pipe()
  3. >>> a.send([1, 'hello', None])
  4. >>> b.recv()
  5. [1, 'hello', None]
  6. >>> b.send_bytes('thank you')
  7. >>> a.recv_bytes()
  8. 'thank you'
  9. >>> import array
  10. >>> arr1 = array.array('i', range(5))
  11. >>> arr2 = array.array('i', [0] * 10)
  12. >>> a.send_bytes(arr1)
  13. >>> count = b.recv_bytes_into(arr2)
  14. >>> assert count == len(arr1) * arr1.itemsize
  15. >>> arr2
  16. array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
相關文章
相關標籤/搜索