python-----多進程筆記

多進程筆記:

在Python中多進程的建立方式對比:

1.在Python中,能夠經過os.fork()建立子進程,可是這種方式智能在'linux'和'unix'以及'mac'下面使用,不能跨平臺,因此通常不推薦使用這種方式。
2.使用'multiprocessing'模塊也能夠建立多進程,而且這種方式在寫起來更加簡單,而且支持跨平臺,
因此通常推薦使用'multiprocessing'的方式來寫多進程的代碼。python

'multiprocessing'的基本用法:

'multiprocessing'這個模塊下面有一個'Process'的類,使用這個類能夠建立一個多進程,使用方式以下:linux

from multiprocessing import Process
import os

def demo():
    print('我是子進程--->')
    print('子進程進程號是%s'%(os.getpid()))
    print('父進程進程號是%s'%(os.getppid()))

if __name__ == '__main__':
    p = Process(target=demo)
    p.start()
    print('主進程的進程號是%s'%(os.getpid()))

須要注意一點的是,若是在'windows'操做系統下,
全部和進程建立相關的代碼都必須放在'if name == 'main''下面,不然會報錯。windows

獲取進程號:

1.經過'os.getpid()'能夠獲取到當前這個進程的id
2.經過'os.getppid()'能夠獲取到當前這個進程的父進程的idapp

父進程會等待全部子進程執行完畢後再退出:

若是在父進程中執行完全部代碼後,還有子進程在執行,那麼父進程會等待子進程執行完全部代碼後再退出。async

'join'方法:

'join'方法可讓你的主進程阻塞,直到這個子進程執行完畢之後纔會執行主進程後面的代碼。函數

from multiprocessing import Process
import os
import time

def demo():
    for i in range(5):
        print('子進程')
        time.sleep(1)

if __name__ == '__main__':
    p = Process(target=demo)
    p.start()
    print('主進程')

    p.join() # join方法的時候,就至關於主進程會阻塞在這個地方,直到這個子進程執行完畢之後纔會執行父進程後的代碼
    print('執行完畢')

適用於類的方式建立子進程

1.使用'Process'做爲父類,從新自定義一個類。
2.在自定義的類中,重寫父類的'run'方法,這個是必須的,其餘方法,就按照你平時如何寫就能夠了。
3.使用自定義的進程類建立子進程的時候,不須要傳'target'參數。操作系統

from multiprocessing import Process
import os
class MyProcess(Process):
    def run(self):
        print('子進程的id:%s' % os.getpid())
        print('父進程的id:%s' % os.getppid())
        for i in range(5):
            print('子進程:%s'%i)

if __name__ == '__main__':
    p = MyProcess()
    p.start()
    print('父進程的id:%s'%os.getpid())
    print('子進程開始了')
    p.join()
    print('子進程結束了')

進程池:

1.'multiprocessing'中的'Pool'能夠實現一個容器,來管理子進程。
2.使用進程池有什麼好處,進程池能夠控制同一時刻,最多隻能有多少個進程在運行。
3.主進程不會等待進程池中的子進程都執行完畢之後再退出,二十若是父進程代碼執行完畢之後,就會將整個程序都退出,
全部咱們在寫進程池的時候,應該使用'pool.join()'來保證進程池中全部的子進程都可以執行完成。
4.'apply_async'至關因而並聯的方式執行(同一時刻只能執行一個任務,而且只能等待前面的任務執行完後,才能執行後面的任務)unix

from multiprocessing import Pool
import os
import time
def demo(num):
    for x in range(5):
        print('子進程id:%s ,值:%s'% (os.getpid(),num))
        time.sleep(2)
if __name__ == '__main__':
    # 這個池子中同一時刻最多隻能有3個進程
    pool = Pool(2)
    for x in range(10):
        pool.apply_async(demo,args=(x,))
    # 關閉進程池,不能再添加新進程了
    pool.close()
    # 主進程把子進程添加到進程池中後,不會等待進程池中其餘的子進程都執行完畢後再退出,
    # 而是當主進程的代碼執行完畢後悔馬上退出,所以若是這個地方沒有join,那麼子進程將得不到執行。
    pool.join()

進程間數據不共享:

在程序中建立了子進程,子進程會徹底copy一份主進程的環境,包括變量、函數、類等。
因此在子進程中使用變量、函數等的時候,實際上是使用的是子進程中的那一份,跟主進程沒有任何關係。code

from  multiprocessing import Process

AGE = 1

def hello():
    print('hello')

def greet(names):
    global AGE
    AGE += 1
    names.append('ketang')
    print('=====子進程代碼=====')
    print('AGE的值:%d, AGE的id:%s' % (AGE,id(AGE)))
    print('names:%s' % names)
    print(id(hello))
    print('=====子進程代碼=====')

if __name__ == '__main__':
    names = ['demo']
    p = Process(target=greet,args=(names,))
    p.start()
    p.join()
    print('=====父進程代碼=====')
    print('AGE的值:%d, AGE的id:%s' % (AGE,id(AGE)))
    print('names:%s' % names)
    print(id(hello))
    print('=====父進程代碼=====')

Queue消息隊列:

  1. Queue(n):初始化一個消息隊列,並指定這個隊列中最多可以容納多少條消息。
  2. put(obj,[block,[timeout]]):推入一條消息到這個隊列中。默認是阻塞的,也就是說若是這個消息隊列中已經滿了,
    那麼會一直等待,將這個消息添加到消息隊列中。timeout能夠指定這個阻塞最長時間,若是超過這個時間仍是滿的,就會拋出異常。
  3. put_nowait():非阻塞的推入一條消息,若是這個隊列已經滿了,那麼會立馬拋出異常。
  4. qsize():獲取這個消息隊列消息的數量。
  5. full():判斷這個消息隊列是否滿了。
  6. empty():判斷這個消息隊列是否空了。
  7. get([block,[timeout]]):獲取隊列中的一條消息,而後將其從隊列中移除,block默認爲True。若是設置block爲False,
    那麼若是沒值,會立馬拋出異常。timeout指定若是多久沒有獲取到值後會拋出異常。
from multiprocessing import Queue

# Queue能夠指定maxsize的值
# 之後這個隊列中就只能裝maxsize個值
# 若是不指定,那麼就是爲-1
# -1 意味着能夠裝任意多個消息,直到你的內存滿了
q = Queue(3)

# put方法,能夠傳遞任意數據類型的消息
q.put('m1')
q.put('m2')
q.put('m3')
# qsize: 獲取這個消息隊列中總共的消息數量
print('qsize:%s' % q.qsize())
# full, 若是消息隊列滿了,那麼會返回True,不然返回False
print(q.full())
# empty:若是消息隊列爲空,那麼會返回True,不然返回False
print(q.empty())

# put方法默認是阻塞的方式
# 若是消息隊列已經滿了,那麼會阻塞在這個地方,直到這個消息隊列沒有滿爲止
# block參數:能夠設置爲False,若是爲False,那麼意味着不會阻塞,若是消息隊列滿了,那麼會立馬拋出一個異常
# timeout參數:指定阻塞的最長時間。若是超過了這個時間就再也不阻塞,而是拋出一個異常。
# q.put('m4',block=True,timeout=2)
# put_nowait:其實等價於q.put(obj,block=False)
# q.put_nowait('m4')
# print('finished')

# get方法:獲取到的是第一個添加進去的值。
# get方法:除了獲取這個值外,還會把這個值從消息隊列中刪除掉
# block參數:默認是等於True,即以阻塞的方式獲取值,若是這個隊列中沒有任何消息,那麼會阻塞到這個地方。若是block=False,那麼若是隊列中沒有值,就會當即拋出異常。
# timeout參數:指定阻塞的最長時間,若是超過了這個時間就再也不阻塞,而是拋出一個異常
print(q.get())
print(q.get())
print(q.get())
print(q.get(block=True,timeout=2))

使用Queue作進程間通訊:

  1. Process進程作通訊:直接使用Queue的對象座位進程的參數就能夠了。
  2. Pool進程作通訊,應該使用multiprocessing.Manager().Queue()對象來作通訊,
    這個對象的使用方法跟multiprocessing.Queue()是同樣的。
from multiprocessing import Process,Queue
import os

def write(q):
    for x in ['m1','m2','m3']:
        q.put(x)
        print('子進程%s已經存放了消息:%s' % (os.getpid(),x))

def read(q):
    while True:
        try:
            msg = q.get(block=False)
            print('子進程%s已經讀取了消息:%s' % (os.getpid(),msg))
        except:
            print('全部消息都已經取出來了')
            break

if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write,args=(q,))
    pr = Process(target=read,args=(q,))

    pw.start()
    pr.start()

    pw.join()
from multiprocessing import Process,Queue,Pool,Manager
import os

def write(q):
    for x in ['m1','m2','m3']:
        q.put(x)
        print('子進程%s已經存放了消息:%s' % (os.getpid(),x))

def read(q):
    while True:
        try:
            msg = q.get(block=False)
            print('子進程%s已經讀取了消息:%s' % (os.getpid(),msg))
        except:
            print('全部消息都已經取出來了')
            break

if __name__ == '__main__':
    q = Manager().Queue()
    pool = Pool(2)
    pool.apply(func=write,args=(q,))
    pool.apply(func=read,args=(q,))
相關文章
相關標籤/搜索