多進程模塊(multiprocessing)

#!/usr/bin/env python
# coding=utf-8

import multiprocessing
import time
import os

'''
Python中的多線程並非真正的多線程,是利用GIL(Global Interpreter Lock)來實現的多線程,本質上仍然是單線程。
若是想利用多個CPU的資源,Python中大部分狀況須要使用多進程,multiprocessing模塊提供了相似於多線程的API,能夠徹底利用多CPU資源。
'''

'''
https://docs.python.org/2/library/multiprocessing.html

進程類Process
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
group:保留爲之後所用,用None便可
target:可調用目標,會在run()中調用
name:進程名,默認爲Process-#
args:傳入target的參數元組
kwargs:傳入target的參數字典

屬性:
name:進程名
daemon:是否爲守護進程,要在start()以前設置纔有效
pid:進程ID,在進程生成以前爲None
exitcode:退出碼,如還沒終止,爲None,由信號N終止,則爲-N
authkey:驗證密鑰

方法:
start():開始進程,進程開始後,會自動運行run()
run():進程執行任務入口
join([timeout]):父進程要等待調用join的進程結束以後才能運行,或等待timeout時間
is_alive():進程是否還活着
terminate():終止進程
'''

# 1.單進程
# 定義一個進程要運行的任務
def target_1(interval):
    n = 3
    while n > 0:
        print u'時間點:{0}'.format(time.ctime())
        time.sleep(interval)  # 睡眠interval秒
        n -= 1

def process_target_1():
    p = multiprocessing.Process(target=target_1, args=(3,))
    p.start()
    print u'進程ID:{0}'.format(p.pid)
    print u'進程名:{0}'.format(p.name)
    print u'進程活着嗎?{0}'.format(p.is_alive())
    p.join()


# 2.多進程
def task_1(interval):
    print u'任務1開始\n'
    time.sleep(interval)
    print u'任務1結束\n'

def task_2(interval):
    print u'任務2開始\n'
    time.sleep(interval)
    print u'任務2結束\n'


def task_3(interval):
    print u'任務3開始\n'
    time.sleep(interval)
    print u'任務3結束\n'

def process_multi_tasks():
    p1 = multiprocessing.Process(target=task_1, args=(3,))
    p2 = multiprocessing.Process(target=task_2, args=(2,))
    p3 = multiprocessing.Process(target=task_3, args=(1,))
    p1.start()
    p2.start()
    p3.start()
    print u'CPU數爲:{0}'.format(multiprocessing.cpu_count())
    for p in multiprocessing.active_children():
        print u'子進程名:{0},ID:{1}'.format(p.name, p.pid)
    p1.join()
    p2.join()
    p3.join()
    print u'多進程任務結束'


# 3.經過繼承自定義進程類
class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):  # 當start()被調用,run()會被自動調用
        n = 3
        while n > 0:
            print u'時間點:{0}'.format(time.ctime())
            time.sleep(self.interval)
            n -= 1

# 使用自定義進程類
def customized_process():
    p = ClockProcess(3)
    p.start()  # 自動調用run()
    p.join()


# 4.使用Lock進行進程間同步,避免資源訪問的衝突
def use_lock_task(lock, num):
    lock.acquire()  # 得到鎖
    print u'數字:{0}'.format(num)
    lock.release()  # 釋放鎖

def process_use_lock_task():
    lock = multiprocessing.Lock()
    for num in range(10):
        p = multiprocessing.Process(target=use_lock_task, args=(lock, num))
        p.start()
        p.join()


# 5.使用Queue實現多進程之間數據傳遞,Queue是多進程安全的隊列
def use_queue_write_task(q):
    try:
        q.put([1, None, 'Good'], block=False)
    except:
        pass

def use_queue_read_task(q):
    try:
        print q.get(block=False)
    except:
        pass

def process_use_queue():
    q = multiprocessing.Queue()
    w = multiprocessing.Process(target=use_queue_write_task, args=(q,))
    r = multiprocessing.Process(target=use_queue_read_task, args=(q,))
    w.start()
    r.start()
    w.join()
    r.join()


# 6.使用Pipe能夠創建一對鏈接對象,倆鏈接對象是雙工工做的
def use_pipe_task(conn):
    conn.send(['circle', {'cx': 5}, {'cy': 3}, {'radius': 5}])
    conn.close()

def process_use_pipe():
    conn1, conn2 = multiprocessing.Pipe()
    p = multiprocessing.Process(target=use_pipe_task, args=(conn2,))
    p.start()
    print conn1.recv()
    p.join()

    conn3, conn4 = multiprocessing.Pipe()
    p2 = multiprocessing.Process(target=use_pipe_task, args=(conn3,))
    p2.start()
    print conn4.recv()
    p2.join()


# 7.使用Value或Array共享資源,共享的對象是進程安全的
def use_value_array_task(v, a):
    v.value = 3.1415926
    for i in range(len(a)):
        a[i] = a[i] * a[i]

def process_use_value_array():
    v = multiprocessing.Value('d', 0.)  # d是指double類型
    a = multiprocessing.Array('i', range(5))  # i是指有符號整數
    p = multiprocessing.Process(target=use_value_array_task, args=(v, a))
    p.start()
    p.join()
    print v.value
    print a[:]


# 8.服務進程,Manager能夠控制一個持有Python對象的服務進程,能夠容許其它進程經過代理來操縱這些對象。
# Manager支持的對象類型包括:list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value, Array
# 比直接用Value和Array更加靈活,支持的類型更多,但速度上會有必定的代價
def use_manager_task(d, l):
    d['pi'] = 3.1415926
    for i in range(len(l)):
        l[i] = l[i] * l[i]

def process_use_manager():
    m = multiprocessing.Manager()
    d = m.dict()
    l = m.list(range(5))
    p = multiprocessing.Process(target=use_manager_task, args=(d, l))
    p.start()
    p.join()
    print d
    print l


# 9.使用Pool工做進程池
def pool_task(x):
    return x * x

def process_pool():
    pool = multiprocessing.Pool(processes=3)  # 三個工做進程
    print pool.map(pool_task, range(10))
    for i in pool.imap_unordered(pool_task, range(10)):  # 亂序
        print i,
    print

    res = pool.apply_async(pool_task, (3,))  # 只運行一個進程
    print res.get(timeout=1)

    res = pool.apply_async(os.getpid, ())  # 只運行一個進程,異步運行os.getpid()
    print res.get(timeout=1)  # 進程的ID

    results = [pool.apply_async(os.getpid, ()) for _ in range(3)]
    print [res.get(timeout=1) for res in results]

    res = pool.apply_async(time.sleep, (10,))  # 一個進程睡眠10秒
    try:
        print res.get(timeout=1)
    except multiprocessing.TimeoutError:
        print u'TimeoutError異常!'


if __name__ == '__main__':
    print u'1.單進程:\n'
    process_target_1()
    print '*' * 50
    print u'2.多進程:\n'
    process_multi_tasks()
    print '*' * 50
    print u'3.經過繼承自定義進程類:\n'
    customized_process()
    print '*' * 50
    print u'4.使用Lock進行進程間同步,避免資源訪問的衝突:\n'
    process_use_lock_task()
    print '*' * 50
    print u'5.使用Queue實現多進程之間數據傳遞,Queue是多進程安全的隊列:\n'
    process_use_queue()
    print '*' * 50
    print u'6.使用Pipe能夠創建一對鏈接對象,倆鏈接對象是雙工工做的:\n'
    process_use_pipe()
    print '*' * 50
    print u'7.使用Value或Array共享資源,共享的對象是進程安全的:\n'
    process_use_value_array()
    print '*' * 50
    print u'8.服務進程,Manager:\n'
    process_use_manager()
    print '*' * 50
    print u'9.使用Pool工做進程池:\n'
    process_pool()


'''
輸出結果:

1.單進程:

進程ID:9200
進程名:Process-1
進程活着嗎?True
時間點:Thu Jun 27 13:49:30 2019
時間點:Thu Jun 27 13:49:33 2019
時間點:Thu Jun 27 13:49:36 2019
**************************************************
2.多進程:

CPU數爲:16
子進程名:Process-2,ID:10684
子進程名:Process-3,ID:7680
子進程名:Process-4,ID:5884
任務1開始

任務3開始

任務2開始

任務3結束

任務2結束

任務1結束

多進程任務結束
**************************************************
3.經過繼承自定義進程類:

時間點:Thu Jun 27 13:49:42 2019
時間點:Thu Jun 27 13:49:45 2019
時間點:Thu Jun 27 13:49:48 2019
**************************************************
4.使用Lock進行進程間同步,避免資源訪問的衝突:

數字:0
數字:1
數字:2
數字:3
數字:4
數字:5
數字:6
數字:7
數字:8
數字:9
**************************************************
5.使用Queue實現多進程之間數據傳遞,Queue是多進程安全的隊列:

[1, None, 'Good']
**************************************************
6.使用Pipe能夠創建一對鏈接對象,倆鏈接對象是雙工工做的:

['circle', {'cx': 5}, {'cy': 3}, {'radius': 5}]
['circle', {'cx': 5}, {'cy': 3}, {'radius': 5}]
**************************************************
7.使用Value或Array共享資源,共享的對象是進程安全的:

3.1415926
[0, 1, 4, 9, 16]
**************************************************
8.服務進程,Manager:

{'pi': 3.1415926}
[0, 1, 4, 9, 16]
**************************************************
9.使用Pool工做進程池:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0 1 4 9 16 25 36 49 64 81
9
10916
[8416, 10916, 8416]
TimeoutError異常!
'''

源碼可於github下載:https://github.com/gkimeeq/PythonLearninghtml

相關文章
相關標籤/搜索