Python3 與 C# 併發編程之~ 進程篇

 

上次說了不少Linux下進程相關知識,這邊再也不復述,下面來講說Python的併發編程,若有錯誤歡迎提出~html

若是遇到聽不懂的能夠看上一次的文章:http://www.javashuo.com/article/p-hjlukftr-d.htmlpython

官方文檔:https://docs.python.org/3/library/concurrency.htmlgit

在線預覽:http://github.lesschina.com/python/base/concurrency/2.併發編程-進程篇.htmlgithub

1.進程篇

官方文檔:https://docs.python.org/3/library/multiprocessing.htmlshell

Code:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/PythonProcess編程

1.1.進程(Process)

Python的進程建立很是方便,看個案例:(這種方法通用,fork只適用於Linux系)segmentfault

import os
# 注意一下,導入的是Process不是process(Class是大寫開頭)
from multiprocessing import Process

def test(name):
    print("[子進程-%s]PID:%d,PPID:%d" % (name, os.getpid(), os.getppid()))

def main():
    print("[父進程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
    p = Process(target=test, args=("萌萌噠", )) # 單個元素的元組表達別忘了(x,)
    p.start()
    p.join()  # 父進程回收子進程資源(內部調用了wait系列方法)

if __name__ == '__main__':
    main()

運行結果:緩存

[父進程]PID:25729,PPID:23434
[子進程-萌萌噠]PID:25730,PPID:25729

建立子進程時,傳入一個執行函數和參數,用start()方法來啓動進程便可安全

join()方法是父進程回收子進程的封裝(主要是回收殭屍子進程(點我)bash

其餘參數能夠參考源碼 or 文檔,貼一下源碼的init方法:

def __init__(self,group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)

擴展:name:爲當前進程實例的別名

  1. p.is_alive() 判斷進程實例p是否還在執行
  2. p.terminate() 終止進程(發SIGTERM信號)

上面的案例若是用OOP來實現就是這樣:(若是不指定方法,默認調Run方法)

import os
from multiprocessing import Process

class My_Process(Process):
    # 重寫了Proce類的Init方法
    def __init__(self, name):
        self.__name = name
        Process.__init__(self)  # 調用父類方法

    # 重寫了Process類的run()方法
    def run(self):
        print("[子進程-%s]PID:%d,PPID:%d" % (self.__name, os.getpid(),
                                          os.getppid()))

def main():
    print("[父進程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
    p = My_Process("萌萌噠") # 若是不指定方法,默認調Run方法
    p.start()
    p.join()  # 父進程回收子進程資源(內部調用了wait系列方法)


if __name__ == '__main__':
    main()

PS:multiprocessing.Process自行處理僵死進程,不用像os.fork那樣本身創建信號處理程序、安裝信號處理程序


1.1.源碼拓展

如今說說裏面的一些門道(只想用的能夠忽略)

新版本的封裝可能多層,這時候能夠看看Python3.3.X系列(這個算是Python3早期版本了,不少代碼都暴露出來,比較明瞭直觀)

multiprocessing.process.py

# 3.4.x開始,Process有了一個BaseProcess
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/process.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/process.py
def join(self, timeout=None):
    '''一直等到子進程over'''
    self._check_closed()
    # 斷言(False就觸發異常,提示就是後面的內容
    # 開發中用的比較多,部署的時候能夠python3 -O xxx 去除因此斷言
    assert self._parent_pid == os.getpid(), "只能 join 一個子進程"
    assert self._popen is not None, "只能加入一個已啓動的進程"
    res = self._popen.wait(timeout) # 本質就是用了咱們以前講的wait系列
    if res is not None:
        _children.discard(self) # 銷燬子進程

multiprocessing.popen_fork.py

# 3.4.x開始,在popen_fork文件中(之前是multiprocessing.forking.py)
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/popen_fork.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/popen_fork.py
def wait(self, timeout=None):
    if self.returncode is None:
        # 設置超時的一系列處理
        if timeout is not None:
            from multiprocessing.connection import wait
            if not wait([self.sentinel], timeout):
                return None
        # 核心操做
        return self.poll(os.WNOHANG if timeout == 0.0 else 0)
    return self.returncode

# 回顧一下上次說的:os.WNOHANG - 若是沒有子進程退出,則不阻塞waitpid()調用
def poll(self, flag=os.WNOHANG):
    if self.returncode is None:
        try:
            # 他的內部調用了waitpid
            pid, sts = os.waitpid(self.pid, flag)
        except OSError as e:
            # 子進程還沒有建立
            # e.errno == errno.ECHILD == 10
            return None
        if pid == self.pid:
            if os.WIFSIGNALED(sts):
                self.returncode = -os.WTERMSIG(sts)
            else:
                assert os.WIFEXITED(sts), "Status is {:n}".format(sts)
                self.returncode = os.WEXITSTATUS(sts)
    return self.returncode

關於斷言的簡單說明:(別氾濫)

若是條件爲真,它什麼都不作,反之它觸發一個帶可選錯誤信息的AssertionError

def test(a, b):
    assert b != 0, "哥哥,分母不能爲0啊"
    return a / b

def main():
    test(1, 0)

if __name__ == '__main__':
    main()

結果:

Traceback (most recent call last):
  File "0.assert.py", line 11, in <module>
    main()
  File "0.assert.py", line 7, in main
    test(1, 0)
  File "0.assert.py", line 2, in test
    assert b != 0, "哥哥,分母不能爲0啊"
AssertionError: 哥哥,分母不能爲0啊

運行的時候能夠指定-O參數來忽略assert,eg:

python3 -O 0.assert.py

Traceback (most recent call last):
  File "0.assert.py", line 11, in <module>
    main()
  File "0.assert.py", line 7, in main
    test(1, 0)
  File "0.assert.py", line 3, in test
    return a / b
ZeroDivisionError: division by zero

擴展:

https://docs.python.org/3/library/unittest.html

http://www.javashuo.com/article/p-yefnycop-r.html


1.2.進程池

多個進程就不須要本身手動去管理了,有Pool來幫你完成,先看個案例:

import os
import time
from multiprocessing import Pool  # 首字母大寫

def test(name):
    print("[子進程-%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    time.sleep(1)

def main():
    print("[父進程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
    p = Pool(5) # 設置最多5個進程(不設置就默認爲CPU核數)
    for i in range(10):
        # 異步執行
        p.apply_async(test, args=(i, )) # 同步用apply(如非必要不建議用)
    p.close() # 關閉池,再也不加入新任務
    p.join() # 等待全部子進程執行完畢回收資源(join能夠指定超時時間,eg:`p.join(1)`)
    print("over")

if __name__ == '__main__':
    main()

圖示:(join能夠指定超時時間,eg:p.join(1)1.進程池

調用join()以前必須先調用close(),調用close()以後就不能繼續添加新的Process(下面會說爲何)


1.3.源碼拓展

驗證一下Pool的默認大小是CPU的核數,看源碼:

multiprocessing.pool.py

# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/pool.py
class Pool(object):
    def __init__(self, processes=指定的進程數,...):
        if processes is None:
            processes = os.cpu_count() or 1 # os.cpu_count() ~ CPU的核數

源碼裏面apply_async方法,是有回調函數(callback)的

def apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):
    if self._state != RUN:
        raise ValueError("Pool not running")
    result = ApplyResult(self._cache, callback, error_callback)
    self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
    return result

來看個例子:(和JQ很像)

import os
import time
from multiprocessing import Pool  # 首字母大寫

def test(name):
    print("[子進程%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    time.sleep(1)
    return name

def error_test(name):
    print("[子進程%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    raise Exception("[子進程%s]啊,我掛了~" % name)

def callback(result):
    """成功以後的回調函數"""
    print("[子進程%s]執行完畢" % result)  # 沒有返回值就爲None

def error_callback(msg):
    """錯誤以後的回調函數"""
    print(msg)

def main():
    print("[父進程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
    p = Pool()  # CPU默認核數
    for i in range(5):
        # 搞2個出錯的看看
        if i > 2:
            p.apply_async(
                error_test,
                args=(i, ),
                callback=callback,
                error_callback=error_callback)  # 異步執行
        else:
            # 異步執行,成功後執行callback函數(有點像jq)
            p.apply_async(test, args=(i, ), callback=callback)
    p.close()  # 關閉池,再也不加入新任務
    p.join()  # 等待全部子進程執行完畢回收資源
    print("over")

if __name__ == '__main__':
    main()

輸出:

[父進程]PID=12348,PPID=10999
[子進程0]PID=12349,PPID=12348
[子進程2]PID=12351,PPID=12348
[子進程1]PID=12350,PPID=12348
[子進程3]PID=12352,PPID=12348
[子進程4]PID=12352,PPID=12348
[子進程3]啊,我掛了~
[子進程4]啊,我掛了~
[子進程0]執行完畢
[子進程2]執行完畢
[子進程1]執行完畢
over
 

接着上面繼續拓展,補充說說獲取函數返回值。上面是經過成功後的回調函數來獲取返回值,此次說說自帶的方法:

import time
from multiprocessing import Pool, TimeoutError

def test(x):
    """開平方"""
    time.sleep(1)
    return x * x

def main():
    pool = Pool()
    task = pool.apply_async(test, (10, ))
    print(task)
    try:
        print(task.get(timeout=1))
    except TimeoutError as ex:
        print("超時了~", ex)

if __name__ == '__main__':
    main()

輸出:(apply_async返回一個ApplyResult類,裏面有個get方法能夠獲取返回值)

<multiprocessing.pool.ApplyResult object at 0x7fbc354f50b8>
超時了~

再舉個例子,順便把Pool裏面的mapimap方法搞個案例(類比jq)

import time
from multiprocessing import Pool

def test(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        task = pool.apply_async(test, (10, ))
        print(task.get(timeout=1))

        obj_list = pool.map(test, range(10))
        print(obj_list)
        # 返回一個可迭代類的實例對象
        obj_iter = pool.imap(test, range(10))
        print(obj_iter)
        next(obj_iter)
        for i in obj_iter:
            print(i, end=" ")

輸出:

100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
<multiprocessing.pool.IMapIterator object at 0x7ff7f9734198>
1 4 9 16 25 36 49 64 81

微微看一眼源碼:(基礎忘了能夠查看==> 點我 )

class IMapIterator(object):
    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self # 返回一個迭代器

    # 實現next方法
    def next(self, timeout=None):
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration from None
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value
......

擴展:優雅殺死子進程的探討 https://segmentfault.com/q/1010000005077517


1.4.拓展之subprocess

官方文檔:https://docs.python.org/3/library/subprocess.html

還記得以前李代桃僵的execlxxx系列嗎?

這不,subprocess就是它的一層封裝,固然了要強大的多,先看個例子:(以os.execlp的例子爲引)

import subprocess

def main():
    # os.execlp("ls", "ls", "-al")  # 執行Path環境變量能夠搜索到的命令
    result = subprocess.run(["ls", "-al"])
    print(result)

if __name__ == '__main__':
    main()

輸出

總用量 44
drwxrwxr-x 2 dnt dnt 4096 8月   7 17:32 .
drwxrwxr-x 4 dnt dnt 4096 8月   6 08:01 ..
-rw-rw-r-- 1 dnt dnt  151 8月   3 10:49 0.assert.py
-rw-rw-r-- 1 dnt dnt  723 8月   5 18:00 1.process2.py
-rw-rw-r-- 1 dnt dnt  501 8月   3 10:20 1.process.py
-rw-rw-r-- 1 dnt dnt 1286 8月   6 08:16 2.pool1.py
-rw-rw-r-- 1 dnt dnt  340 8月   7 16:38 2.pool2.py
-rw-rw-r-- 1 dnt dnt  481 8月   7 16:50 2.pool3.py
-rw-rw-r-- 1 dnt dnt  652 8月   5 17:01 2.pool.py
-rw-rw-r-- 1 dnt dnt  191 8月   7 17:33 3.subprocess.py
CompletedProcess(args=['ls', '-al'], returncode=0)

文檔

如今看下官方的文檔描述來理解一下:

r"""
具備可訪問I / O流的子進程
Subprocesses with accessible I/O streams

此模塊容許您生成進程,鏈接到它們輸入/輸出/錯誤管道,並獲取其返回代碼。
This module allows you to spawn processes, connect to their
input/output/error pipes, and obtain their return codes.

完整文檔能夠查看:https://docs.python.org/3/library/subprocess.html
For a complete description of this module see the Python documentation.

Main API
========
run(...): 運行命令,等待它完成,而後返回`CompletedProcess`實例。
Runs a command, waits for it to complete, 
then returns a CompletedProcess instance.

Popen(...): 用於在新進程中靈活執行命令的類
A class for flexibly executing a command in a new process

Constants(常量)
---------
DEVNULL: 特殊值,表示應該使用`os.devnull`
Special value that indicates that os.devnull should be used

PIPE:    表示應建立`PIPE`管道的特殊值
Special value that indicates a pipe should be created

STDOUT:  特殊值,表示`stderr`應該轉到`stdout`
Special value that indicates that stderr should go to stdout

Older API(儘可能不用,說不定之後就淘汰了)
=========
call(...): 運行命令,等待它完成,而後返回返回碼。
Runs a command, waits for it to complete, then returns the return code.

check_call(...): Same as call() but raises CalledProcessError()
    if return code is not 0(返回值不是0就引起異常)

check_output(...): 與check_call()相同,但返回`stdout`的內容,而不是返回代碼
Same as check_call but returns the contents of stdout instead of a return code

getoutput(...): 在shell中運行命令,等待它完成,而後返回輸出
Runs a command in the shell, waits for it to complete,then returns the output

getstatusoutput(...): 在shell中運行命令,等待它完成,而後返回一個(exitcode,output)元組
Runs a command in the shell, waits for it to complete,
then returns a (exitcode, output) tuple
"""

其實看看源碼頗有意思:(內部其實就是調用的os.popen【進程先導篇講進程守護的時候用過】)

def run(*popenargs, input=None, capture_output=False,
        timeout=None, check=False, **kwargs):

    if input is not None:
        if 'stdin' in kwargs:
            raise ValueError('stdin和輸入參數可能都不會被使用。')
        kwargs['stdin'] = PIPE

    if capture_output:
        if ('stdout' in kwargs) or ('stderr' in kwargs):
            raise ValueError('不能和capture_outpu一塊兒使用stdout 或 stderr')
        kwargs['stdout'] = PIPE
        kwargs['stderr'] = PIPE

    with Popen(*popenargs, **kwargs) as process:
        try:
            stdout, stderr = process.communicate(input, timeout=timeout)
        except TimeoutExpired:
            process.kill()
            stdout, stderr = process.communicate()
            raise TimeoutExpired(
                process.args, timeout, output=stdout, stderr=stderr)
        except:  # 包括KeyboardInterrupt的通訊處理。
            process.kill()
            # 不用使用process.wait(),.__ exit__爲咱們作了這件事。
            raise
        retcode = process.poll()
        if check and retcode:
            raise CalledProcessError(
                retcode, process.args, output=stdout, stderr=stderr)
    return CompletedProcess(process.args, retcode, stdout, stderr)

返回值類型:CompletedProcess

# https://github.com/lotapp/cpython3/blob/master/Lib/subprocess.py
class CompletedProcess(object):
    def __init__(self, args, returncode, stdout=None, stderr=None):
        self.args = args
        self.returncode = returncode
        self.stdout = stdout
        self.stderr = stderr

    def __repr__(self):
    """對象按指定的格式顯示"""
        args = [
            'args={!r}'.format(self.args),
            'returncode={!r}'.format(self.returncode)
        ]
        if self.stdout is not None:
            args.append('stdout={!r}'.format(self.stdout))
        if self.stderr is not None:
            args.append('stderr={!r}'.format(self.stderr))
        return "{}({})".format(type(self).__name__, ', '.join(args))

    def check_returncode(self):
        """若是退出代碼非零,則引起CalledProcessError"""
        if self.returncode:
            raise CalledProcessError(self.returncode, self.args, self.stdout,
                                     self.stderr)

簡單demo

再來個案例體會一下方便之處:

import subprocess

def main():
    result = subprocess.run(["ping", "www.baidu.com"])
    print(result.stdout)

if __name__ == '__main__':
    main()

圖示: 2.subprocess.gif

交互demo

再來個強大的案例(交互的程序均可以,好比 ftpnslookup 等等):popen1.communicate

import subprocess

def main():
    process = subprocess.Popen(
        ["ipython3"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    try:
        # 對pstree進行交互
        out, err = process.communicate(input=b'print("hello")', timeout=3)
        print("Out:%s\nErr:%s" % (out.decode(), err.decode()))
    except TimeoutError:
        # 若是超時到期,則子進程不會被終止,須要本身處理一下
        process.kill()
        out, err = process.communicate()
        print("Out:%s\nErr:%s" % (out.decode(), err.decode()))

if __name__ == '__main__':
    main()

輸出:

IPython 6.4.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: hello

In [2]: Do you really want to exit ([y]/n)?

Err:

注意點:若是超時到期,則子進程不會被終止,須要本身處理一下(官方提醒)

通訊demo

這個等會說進程間通訊還會說,因此簡單舉個例子,老規矩拿ps aux | grep bash說事:

import subprocess


def main():
    # ps aux | grep bash
    # 進程1獲取結果
    p1 = subprocess.Popen(["ps", "-aux"], stdout=subprocess.PIPE)
    # 獲得進程1的結果再進行篩選
    p2 = subprocess.Popen(["grep", "bash"], stdin=p1.stdout, stdout=subprocess.PIPE)
    # 關閉寫段(結果已經獲取到進程2中了,防止干擾顯示)
    p1.stdout.close()
    # 與流程交互:將數據發送到stdin並關閉它。
    msg_tuple = p2.communicate()
    # 輸出結果
    print(msg_tuple[0].decode())

if __name__ == '__main__':
    main()

輸出:(之前案例:進程間通訊~PIPE匿名管道

dnt       2470  0.0  0.1  24612  5236 pts/0    Ss   06:01   0:00 bash
dnt       2512  0.0  0.1  24744  5760 pts/1    Ss   06:02   0:00 bash
dnt      20784  0.0  0.1  24692  5588 pts/2    Ss+  06:21   0:00 /bin/bash
dnt      22377  0.0  0.0  16180  1052 pts/1    S+   06:30   0:00 grep bash

其餘擴展能夠看看這篇文章:subprocess與Popen()

 

1.5.進程間通訊~PIPE管道通訊

這個比較有意思,看個案例:

from multiprocessing import Process, Pipe

def test(w):
    w.send("[子進程]老爸,老媽回來記得喊我一下~")
    msg = w.recv()
    print(msg)

def main():
    r, w = Pipe()
    p1 = Process(target=test, args=(w, ))
    p1.start()
    msg = r.recv()
    print(msg)
    r.send("[父進程]滾犢子,趕忙寫做業,否則我得跪方便麪!")
    p1.join()

if __name__ == '__main__':
    main()

結果:

老爸,老媽回來記得喊我一下~
滾犢子,趕忙寫做業,否則我得跪方便麪!

multiprocessing.Pipe源碼分析

按照道理應該子進程本身寫完本身讀了,和上次講得不同啊?不急,先看看源碼:

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/context.py
def Pipe(self, duplex=True):
    '''返回由管道鏈接的兩個鏈接對象'''
    from .connection import Pipe
    return Pipe(duplex)

看看connection.Pipe方法的定義部分,是否是雙向通訊就看你是否設置duplex=True

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/connection.py
if sys.platform != 'win32':
    def Pipe(duplex=True):
        '''返回管道兩端的一對鏈接對象'''
        if duplex:
            # 雙工內部實際上是socket系列(下次講)
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            # 這部分就是咱們上次講的pipe管道
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)
        return c1, c2
else: 
    def Pipe(duplex=True):
        # win平臺的一系列處理
        ......
        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)
        return c1, c2

經過源碼知道了,原來雙工是經過socket搞的啊~

再看個和原來同樣效果的案例:(不用關來關去的了,方便!)

from multiprocessing import Process, Pipe

def test(w):
    # 只能寫
    w.send("[子進程]老爸,我們完了,老媽一直在門口~")

def main():
    r, w = Pipe(duplex=False)
    p1 = Process(target=test, args=(w, ))
    p1.start() # 你把這個放在join前面就直接死鎖了
    msg = r.recv() # 只能讀
    print(msg)
    p1.join()

if __name__ == '__main__':
    main()

輸出:(能夠思考下爲何start換個位置就死鎖,提示:阻塞讀寫

[子進程]老爸,我們完了,老媽一直在門口~

再舉個Pool的例子,我們就進入今天的重點了:

from multiprocessing import Pipe, Pool

def proc_test1(conn):
    conn.send("[小明]小張,今天哥們要見一女孩,你陪我唄,我24h等你回覆哦~")
    msg = conn.recv()
    print(msg)

def proc_test2(conn):
    msg = conn.recv()
    print(msg)
    conn.send("[小張]不去,萬一被我帥氣的外表迷倒就坑了~")

def main():
    conn1, conn2 = Pipe()
    p = Pool()
    p.apply_async(proc_test1, (conn1, ))
    p.apply_async(proc_test2, (conn2, ))
    p.close()  # 關閉池,再也不接收新任務
    p.join()  # 等待回收,必須先關才能join,否則會異常

if __name__ == '__main__':
    main()

輸出:

[小明]小張,今天哥們要見一女孩,你陪我唄,我24h等你回覆哦
[小張]不去,萬一被我帥氣的外表迷倒就坑了~

pool.join源碼分析

看看源碼就理解了:看看Pool的join是啥狀況?看源碼:

# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/pool.py
def join(self):
    util.debug('joining pool')
    if self._state == RUN:
        # 沒關閉就join,這邊就會拋出一個異常
        raise ValueError("Pool is still running")
    elif self._state not in (CLOSE, TERMINATE):
        raise ValueError("In unknown state")
    self._worker_handler.join()
    self._task_handler.join()
    self._result_handler.join()
    for p in self._pool:
        p.join() # 循環join回收

在pool的__init__的方法中,這幾個屬性:

self._processes = processes # 指定的進程數
self._pool = [] # 列表
self._repopulate_pool() # 給列表append內容的方法

將池進程的數量增長到指定的數量,join的時候會使用這個列表

def _repopulate_pool(self):
    # 指定進程數-當前進程數,差幾個補幾個
    for i in range(self._processes - len(self._pool)):
        w = self.Process(target=worker,
                         args=(self._inqueue, self._outqueue,
                               self._initializer,
                               self._initargs, self._maxtasksperchild,
                               self._wrap_exception)
                        )
        self._pool.append(w) # 重點來了
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True # pool退出後,經過pool建立的進程都會退出
        w.start()
        util.debug('added worker')

注意:池的方法只能由建立它的進程使用


1.5.進程間通訊~Queue管道通訊(經常使用)

一步步的設局,從底層的的pipe()->os.pipe->PIPE,如今終於到Queue了,心酸啊,明知道上面兩個項目

裏面基本上不會用,但爲了大家能看懂源碼,說了這麼久%>_<%其實之後當咱們從Queue說到MQRPC以後,如今

講得這些進程間通訊(IPC)也基本上不會用了,但本質你得清楚,我儘可能多分析點源碼,這樣大家之後看開源項目壓力會很小

歡迎批評指正~

引入案例

from multiprocessing import Process, Queue

def test(q):
    q.put("[子進程]老爸,我出去嗨了")
    print(q.get())

def main():
    q = Queue()
    p = Process(target=test, args=(q, ))
    p.start()
    msg = q.get()
    print(msg)
    q.put("[父進程]去吧比卡丘~")
    p.join()

if __name__ == '__main__':
    main()

輸出:(getput默認是阻塞等待的)

[子進程]老爸,我出去嗨了
[父進程]去吧比卡丘~

源碼拓展

先看看Queue的初始化方法:(不指定大小就是最大隊列數)

# 隊列類型,使用PIPE,緩存,線程
class Queue(object):
    # ctx = multiprocessing.get_context("xxx")
    # 上下文總共3種:spawn、fork、forkserver(擴展部分會提一下)
    def __init__(self, maxsize=0, *, ctx):
        # 默認使用最大容量
        if maxsize <= 0:
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize  # 指定隊列大小
        # 建立了一個PIPE匿名管道(單向)
        self._reader, self._writer = connection.Pipe(duplex=False)
        # `multiprocessing/synchronize.py > Lock`
        self._rlock = ctx.Lock()  # 進程鎖(讀)【非遞歸】
        self._opid = os.getpid()  # 獲取PID
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()  # 進程鎖(寫)【非遞歸】
        # Semaphore信號量一般用於保護容量有限的資源
        # 控制信號量,超了就異常
        self._sem = ctx.BoundedSemaphore(maxsize)
        # 不忽略PIPE管道破裂的錯誤
        self._ignore_epipe = False 
        # 線程相關操做
        self._after_fork()
        # 向`_afterfork_registry`字典中註冊
        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

關於getput是阻塞的問題,看下源碼探探究竟:

q.get():收消息

def get(self, block=True, timeout=None):
    # 默認狀況是阻塞(lock加鎖)
    if block and timeout is None:
        with self._rlock:
            res = self._recv_bytes()
        self._sem.release()  # 信號量+1
    else:
        if block:
            deadline = time.monotonic() + timeout
        # 超時拋異常
        if not self._rlock.acquire(block, timeout):
            raise Empty
        try:
            if block:
                timeout = deadline - time.monotonic()
                # 無論有沒有內容都去讀,超時就拋異常
                if not self._poll(timeout):
                    raise Empty
            elif not self._poll():
                raise Empty
            # 接收字節數據做爲字節對象
            res = self._recv_bytes()
            self._sem.release()  # 信號量+1
        finally:
            # 釋放鎖
            self._rlock.release()
    # 釋放鎖後,從新序列化數據
    return _ForkingPickler.loads(res)

queue.put():發消息

def put(self, obj, block=True, timeout=None):
        # 若是Queue已經關閉就拋異常
        assert not self._closed, "Queue {0!r} has been closed".format(self)
        # 記錄信號量的鎖
        if not self._sem.acquire(block, timeout):
            raise Full  # 超過數量,拋個異常
        # 條件變量容許一個或多個線程等待,直到另外一個線程通知它們
        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

非阻塞get_nowaitput_nowait本質其實也是調用了getput方法:

def get_nowait(self):
    return self.get(False)

def put_nowait(self, obj):
    return self.put(obj, False)

進程間通訊1

說這麼多不如來個例子看看:

from multiprocessing import Queue

def main():
    q = Queue(3)  # 只能 put 3條消息
    q.put([1, 2, 3, 4])  # put一個List類型的消息
    q.put({"a": 1, "b": 2})  # put一個Dict類型的消息
    q.put({1, 2, 3, 4})  # put一個Set類型的消息

    try:
        # 不加timeout,就一直阻塞,等消息隊列有空位才能發出去
        q.put("再加條消息唄", timeout=2)
    # Full(Exception)是空實現,你能夠直接用Exception
    except Exception:
        print("消息隊列已滿,隊列數%s,當前存在%s條消息" % (q._maxsize, q.qsize()))

    try:
        # 非阻塞,不能put就拋異常
        q.put_nowait("再加條消息唄")  # 至關於q.put(obj,False)
    except Exception:
        print("消息隊列已滿,隊列數%s,當前存在%s條消息" % (q._maxsize, q.qsize()))

    while not q.empty():
        print("隊列數:%s,當前存在%s條消息 內容%s" % (q._maxsize, q.qsize(), q.get_nowait()))

    print("隊列數:%s,當前存在:%s條消息" % (q._maxsize, q.qsize()))

if __name__ == '__main__':
    main()

輸出:

消息隊列已滿,隊列數3,當前存在3條消息
消息隊列已滿,隊列數3,當前存在3條消息
隊列數:3,當前存在3條消息 內容[1, 2, 3, 4]
隊列數:3,當前存在2條消息 內容{'a': 1, 'b': 2}
隊列數:3,當前存在1條消息 內容{1, 2, 3, 4}
隊列數:3,當前存在:0條消息

補充說明一下:

  1. q._maxsize 隊列數(儘可能不用_開頭的屬性和方法)
  2. q.qsize()查看當前隊列中存在幾條消息
  3. q.full()查看是否滿了
  4. q.empty()查看是否爲空

再看個簡單點的子進程間通訊:(鋪墊demo)

import os
import time
from multiprocessing import Process, Queue

def pro_test1(q):
    print("[子進程1]PPID=%d,PID=%d,GID=%d"%(os.getppid(), os.getpid(), os.getgid()))
    q.put("[子進程1]小明,今晚擼串不?")

    # 設置一個簡版的重試機制(三次重試)
    for i in range(3):
        if not q.empty():
            print(q.get())
            break
        else:
            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6s

def pro_test2(q):
    print("[子進程2]PPID=%d,PID=%d,GID=%d"%(os.getppid(), os.getpid(), os.getgid()))
    print(q.get())
    time.sleep(4)  # 模擬一下網絡延遲
    q.put("[子進程2]不去,我今天約了妹子")

def main():
    queue = Queue()
    p1 = Process(target=pro_test1, args=(queue, ))
    p2 = Process(target=pro_test2, args=(queue, ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    main()

輸出:(time python3 5.queue2.py

[子進程1]PPID=15220,PID=15221,GID=1000
[子進程2]PPID=15220,PID=15222,GID=1000
[子進程1]小明,今晚擼串不?
[子進程2]不去,我今天約了妹子

real    0m6.087s
user    0m0.053s
sys 0m0.035s

進程間通訊2

多進程基本上都是用pool,可用上面說的Queue方法怎麼報錯了?

import os
import time
from multiprocessing import Pool, Queue

def error_callback(msg):
    print(msg)

def pro_test1(q):
    print("[子進程1]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    q.put("[子進程1]小明,今晚擼串不?")

    # 設置一個簡版的重試機制(三次重試)
    for i in range(3):
        if not q.empty():
            print(q.get())
            break
        else:
            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6s

def pro_test2(q):
    print("[子進程2]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    print(q.get())
    time.sleep(4)  # 模擬一下網絡延遲
    q.put("[子進程2]不去,我今天約了妹子")

def main():
    print("[父進程]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                          os.getgid()))
    queue = Queue()
    p = Pool()
    p.apply_async(pro_test1, args=(queue, ), error_callback=error_callback)
    p.apply_async(pro_test2, args=(queue, ), error_callback=error_callback)
    p.close()
    p.join()

if __name__ == '__main__':
    main()

輸出:(沒法將multiprocessing.Queue對象傳遞給Pool方法)

[父進程]PPID=4223,PID=32170,GID=1000
Queue objects should only be shared between processes through inheritance
Queue objects should only be shared between processes through inheritance

real    0m0.183s
user    0m0.083s
sys 0m0.012s

下面會詳說,先看一下正確方式:(隊列換了一下,其餘都同樣Manager().Queue()

import os
import time
from multiprocessing import Pool, Manager

def error_callback(msg):
    print(msg)

def pro_test1(q):
    print("[子進程1]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    q.put("[子進程1]小明,今晚擼串不?")

    # 設置一個簡版的重試機制(三次重試)
    for i in range(3):
        if not q.empty():
            print(q.get())
            break
        else:
            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6s

def pro_test2(q):
    print("[子進程2]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    print(q.get())
    time.sleep(4)  # 模擬一下網絡延遲
    q.put("[子進程2]不去,我今天約了妹子")

def main():
    print("[父進程]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                          os.getgid()))
    queue = Manager().Queue()
    p = Pool()
    p.apply_async(pro_test1, args=(queue, ), error_callback=error_callback)
    p.apply_async(pro_test2, args=(queue, ), error_callback=error_callback)
    p.close()
    p.join()

if __name__ == '__main__':
    main()

輸出:

[父進程]PPID=4223,PID=31329,GID=1000
[子進程1]PPID=31329,PID=31335,GID=1000
[子進程2]PPID=31329,PID=31336,GID=1000
[子進程1]小明,今晚擼串不?
[子進程2]不去,我今天約了妹子

real    0m6.134s
user    0m0.133s
sys 0m0.035s

再拋個思考題:(Linux)

import os
import time
from multiprocessing import Pool, Queue

def error_callback(msg):
    print(msg)

q = Queue()

def pro_test1():
    global q
    print("[子進程1]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    q.put("[子進程1]小明,今晚擼串不?")
    # 設置一個簡版的重試機制(三次重試)
    for i in range(3):
        if not q.empty():
            print(q.get())
            break
        else:
            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6s

def pro_test2():
    global q
    print("[子進程2]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    print(q.get())
    time.sleep(4)  # 模擬一下網絡延遲
    q.put("[子進程2]不去,我今天約了妹子")

def main():
    print("[父進程]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                          os.getgid()))
    q = Queue()
    p = Pool()
    p.apply_async(pro_test1, error_callback=error_callback)
    p.apply_async(pro_test2, error_callback=error_callback)
    p.close()
    p.join()

if __name__ == '__main__':
    main()

輸出:(爲啥這樣也能夠【提示:fork】)

[父進程]PPID=12855,PID=16879,GID=1000
[子進程1]PPID=16879,PID=16880,GID=1000
[子進程2]PPID=16879,PID=16881,GID=1000
[子進程1]小明,今晚擼串不?
[子進程2]不去,我今天約了妹子

real    0m6.120s
user    0m0.105s
sys     0m0.024s

進程拓展

官方參考:https://docs.python.org/3/library/multiprocessing.html

1.上下文系

  1. spawn:(Win默認,Linux下也能夠用【>=3.4】)
    1. 父進程啓動一個新的python解釋器進程。
    2. 子進程只會繼承運行進程對象run()方法所需的那些資源。
    3. 不會繼承父進程中沒必要要的文件描述符和句柄。
    4. 與使用fork或forkserver相比,使用此方法啓動進程至關慢。
    5. 可在Unix和Windows上使用。Windows上的默認設置。
  2. fork:(Linux下默認)
    1. 父進程用於os.fork()分叉Python解釋器。
    2. 子進程在開始時與父進程相同(這時候內部變量之類的尚未被修改)
    3. 父進程的全部資源都由子進程繼承(用到多線程的時候可能有些問題)
    4. 僅適用於Unix。Unix上的默認值。
  3. forkserver:(經常使用)
    1. 當程序啓動並選擇forkserver start方法時,將啓動服務器進程。
    2. 從那時起,每當須要一個新進程時,父進程就會鏈接到服務器並請求它分叉一個新進程。
    3. fork服務器進程是單線程的,所以它能夠安全使用os.fork()。沒有沒必要要的資源被繼承。
    4. 可在Unix平臺上使用,支持經過Unix管道傳遞文件描述符。

這塊官方文檔很詳細,貼下官方的2個案例:

經過multiprocessing.set_start_method(xxx)來設置啓動的上下文類型

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn') # 不要過多使用
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

輸出:(set_start_method不要過多使用)

hello

real    0m0.407s
user    0m0.134s
sys     0m0.012s

若是你把設置啓動上下文註釋掉:(消耗的總時間少了不少)

real    0m0.072s
user    0m0.057s
sys     0m0.016s

也能夠經過multiprocessing.get_context(xxx)獲取指定類型的上下文

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

輸出:(get_context在Python源碼裏用的比較多,so=>也建議你們這麼用)

hello

real    0m0.169s
user    0m0.146s
sys 0m0.024s

從結果來看,總耗時也少了不少


2.日記系列

說下日記相關的事情:

先看下multiprocessing裏面的日記記錄:

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/context.py
def log_to_stderr(self, level=None):
    '''打開日誌記錄並添加一個打印到stderr的處理程序'''
    from .util import log_to_stderr
    return log_to_stderr(level)

更多Loging模塊內容能夠看官方文檔:https://docs.python.org/3/library/logging.html

這個是內部代碼,看看便可:

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/util.py
def log_to_stderr(level=None):
    '''打開日誌記錄並添加一個打印到stderr的處理程序'''
    # 全局變量默認是False
    global _log_to_stderr
    import logging

    # 日記記錄轉換成文本
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    # 一個處理程序類,它將已適當格式化的日誌記錄寫入流
    handler = logging.StreamHandler()  # 此類不會關閉流,由於用到了sys.stdout|sys.stderr
    # 設置格式:'[%(levelname)s/%(processName)s] %(message)s'
    handler.setFormatter(formatter)

    # 返回`multiprocessing`專用的記錄器
    logger = get_logger()
    # 添加處理程序
    logger.addHandler(handler)

    if level:
        # 設置日記級別
        logger.setLevel(level)
    # 如今log是輸出到stderr的
    _log_to_stderr = True
    return _logger

Logging以前也有提過,能夠看看:http://www.javashuo.com/article/p-wtppcuyx-g.html

來個案例:

import logging
from multiprocessing import Process, log_to_stderr

def test():
    print("test")

def start_log():
    # 把日記輸出定向到sys.stderr中
    logger = log_to_stderr()
    # 設置日記記錄級別
    # 敏感程度:DEBUG、INFO、WARN、ERROR、CRITICAL
    print(logging.WARN == logging.WARNING)  # 這兩個是同樣的
    level = logging.INFO
    logger.setLevel(level)  # 設置日記級別(通常都是WARN)

    # 自定義輸出
    # def log(self, level, msg, *args, **kwargs):
    logger.log(level, "我是通用格式")  # 通用,下面的內部也是調用的這個
    logger.info("info 測試")
    logger.warning("warning 測試")
    logger.error("error 測試")

def main():
    start_log()
    # 作的操做都會被記錄下來
    p = Process(target=test)
    p.start()
    p.join()

if __name__ == '__main__':
    main()

輸出:

True
[INFO/MainProcess] 我是通用格式
[INFO/MainProcess] info 測試
[WARNING/MainProcess] warning 測試
[ERROR/MainProcess] error 測試
[INFO/Process-1] child process calling self.run()
test
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

3.進程5態

以前忘記說了~如今快結尾了,補充一下進程5態:(來個草圖)

3.進程5態.png

 

1.6.進程間狀態共享

應該儘可能避免進程間狀態共享,但需求在那,因此仍是得研究,官方推薦了兩種方式:

1.共享內存(Value or Array

以前說過Queue:在Process之間使用沒問題,用到Pool,就使用Manager().xxxValueArray,就不太同樣了:

看看源碼:(Manager裏面的Array和Process共享的Array不是一個概念,並且也沒有同步機制)

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/managers.py
class Value(object):
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        return '%s(%r, %r)' % (type(self).__name__, self._typecode, self._value)

    value = property(get, set) # 給value設置get和set方法(和value的屬性裝飾器同樣效果)

def Array(typecode, sequence, lock=True):
    return array.array(typecode, sequence)

Process爲例看看怎麼用:

from multiprocessing import Process, Value, Array

def proc_test1(value, array):
    print("子進程1", value.value)
    array[0] = 10
    print("子進程1", array[:])

def proc_test2(value, array):
    print("子進程2", value.value)
    array[1] = 10
    print("子進程2", array[:])

def main():
    try:
        value = Value("d", 3.14)  # d 類型,至關於C裏面的double
        array = Array("i", range(10))  # i 類型,至關於C裏面的int
        print(type(value))
        print(type(array))

        p1 = Process(target=proc_test1, args=(value, array))
        p2 = Process(target=proc_test2, args=(value, array))
        p1.start()
        p2.start()
        p1.join()
        p2.join()

        print("父進程", value.value)  # 獲取值
        print("父進程", array[:])  # 獲取值
    except Exception as ex:
        print(ex)
    else:
        print("No Except")

if __name__ == '__main__':
    main()

輸出:(ValueArray進程|線程安全的)

<class 'multiprocessing.sharedctypes.Synchronized'>
<class 'multiprocessing.sharedctypes.SynchronizedArray'>
子進程1 3.14
子進程1 [10, 1, 2, 3, 4, 5, 6, 7, 8, 9]
子進程2 3.14
子進程2 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]
父進程 3.14
父進程 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]
No Except

類型方面的對應關係:

typecode_to_type = {
    'c': ctypes.c_char,
    'u': ctypes.c_wchar,
    'b': ctypes.c_byte,
    'B': ctypes.c_ubyte,
    'h': ctypes.c_short,
    'H': ctypes.c_ushort,
    'i': ctypes.c_int,
    'I': ctypes.c_uint,
    'l': ctypes.c_long,
    'L': ctypes.c_ulong,
    'q': ctypes.c_longlong,
    'Q': ctypes.c_ulonglong,
    'f': ctypes.c_float,
    'd': ctypes.c_double
}

這兩個類型實際上是ctypes類型,更多的類型能夠去` multiprocessing.sharedctypes`查看,來張圖: 4.ctypes.png 回頭解決GIL的時候會用到C系列或者Go系列的共享庫(講線程的時候會說)


關於進程安全的補充說明:對於原子性操做就不用說,鐵定安全,但注意一下i+=1並非原子性操做:

from multiprocessing import Process, Value

def proc_test1(value):
    for i in range(1000):
        value.value += 1

def main():
    value = Value("i", 0)
    p_list = [Process(target=proc_test1, args=(value, )) for i in range(5)]
    # 批量啓動
    for i in p_list:
        i.start()
    # 批量資源回收
    for i in p_list:
        i.join()
    print(value.value)

if __name__ == '__main__':
    main()

輸出:(理論上應該是:5×1000=5000)

2153

稍微改一下才行:(進程安全:只是提供了安全的方法,並非什麼都不用你操心了

# 通用方法
def proc_test1(value):
    for i in range(1000):
        if value.acquire():
            value.value += 1
        value.release()

# 官方案例:(Lock可使用with託管)
def proc_test1(value):
    for i in range(1000):
        with value.get_lock():
            value.value += 1

# 更多能夠查看:`sharedctypes.SynchronizedBase` 源碼

輸出:(關於鎖這塊,後面講線程的時候會詳說,看看就好【語法的確比C#麻煩點】)

5000

看看源碼:(以前探討如何優雅的殺死子進程,其中就有一種方法使用了Value

def Value(typecode_or_type, *args, lock=True, ctx=None):
    '''返回Value的同步包裝器'''
    obj = RawValue(typecode_or_type, *args)
    if lock is False:
        return obj
    # 默認支持Lock
    if lock in (True, None):
        ctx = ctx or get_context() # 獲取上下文
        lock = ctx.RLock() # 獲取遞歸鎖
    if not hasattr(lock, 'acquire'): 
        raise AttributeError("%r has no method 'acquire'" % lock)
    # 一系列處理
    return synchronized(obj, lock, ctx=ctx)

def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):
    '''返回RawArray的同步包裝器'''
    obj = RawArray(typecode_or_type, size_or_initializer)
    if lock is False:
        return obj
    # 默認是支持Lock的
    if lock in (True, None):
        ctx = ctx or get_context() # 獲取上下文
        lock = ctx.RLock()  # 遞歸鎖屬性
    # 查看是否有acquire屬性
    if not hasattr(lock, 'acquire'):
        raise AttributeError("%r has no method 'acquire'" % lock)
    return synchronized(obj, lock, ctx=ctx)

擴展部分能夠查看這篇文章:http://blog.51cto.com/11026142/1874807


2.服務器進程(Manager

官方文檔:https://docs.python.org/3/library/multiprocessing.html#managers

有一個服務器進程負責維護全部的對象,而其餘進程鏈接到該進程,經過代理對象操做服務器進程當中的對象

經過返回的經理Manager()將支持類型list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue

舉個簡單例子(後面還會再說):(本質其實就是多個進程經過代理,共同操做服務端內容)

from multiprocessing import Pool, Manager

def test1(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

def test2(d, l):
    print(d)
    print(l)

def main():
    with Manager() as manager:
        dict_test = manager.dict()
        list_test = manager.list(range(10))

        pool = Pool()
        pool.apply_async(test1, args=(dict_test, list_test))
        pool.apply_async(test2, args=(dict_test, list_test))
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()

輸出:

{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服務器進程管理器比使用共享內存對象更靈活,由於它們能夠支持任意對象類型。此外,單個管理器能夠經過網絡在不一樣計算機上的進程共享。可是,它們比使用共享內存慢(畢竟有了「中介」

同步問題依然須要注意一下,舉個例子體會一下:

from multiprocessing import Manager, Process, Lock

def test(dict1, lock):
    for i in range(100):
        with lock:  # 你能夠把這句話註釋掉,而後就知道爲何加了
            dict1["year"] += 1

def main():
    with Manager() as m:
        lock = Lock()
        dict1 = m.dict({"year": 2000})
        p_list = [Process(target=test, args=(dict1, lock)) for i in range(5)]
        for i in p_list:
            i.start()
        for i in p_list:
            i.join()
        print(dict1)

if __name__ == '__main__':
    main()

擴展補充:

  1. multiprocessing.Lock是一個進程安全對象,所以您能夠將其直接傳遞給子進程並在全部進程中安全地使用它。
  2. 大多數可變Python對象(如list,dict,大多數類)不能保證進程中安全,因此它們在進程間共享時須要使用Manager
  3. 多進程模式的缺點是建立進程的代價大,在Unix/Linux系統下,用fork調用還行,在Windows下建立進程開銷巨大。

Manager這塊官方文檔很詳細,能夠看看:https://docs.python.org/3/library/multiprocessing.html#managers

WinServer的能夠參考這篇 or 這篇埋坑記(Manager通常都是部署在Linux的,Win的客戶端不影響)

擴展補充

還記得以前的:沒法將multiprocessing.Queue對象傳遞給Pool方法嗎?其實通常都是這兩種方式解決的:

  1. 使用Manager須要生成另外一個進程來託管Manager服務器。 而且全部獲取/釋放鎖的調用都必須經過IPC發送到該服務器。
  2. 使用初始化程序在池建立時傳遞常規multiprocessing.Queue()這將使Queue實例在全部子進程中全局共享

再看一下Pool的__init__方法:

# processes:進程數
# initializer,initargs 初始化進行的操做
# maxtaskperchild:每一個進程執行task的最大數目
# contex:上下文對象
def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):

第一種方法不夠輕量級,在講案例前,稍微說下第二種方法:(也算把上面留下的懸念解了)

import os
import time
from multiprocessing import Pool, Queue

def error_callback(msg):
    print(msg)

def pro_test1():
    print("[子進程1]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))
    q.put("[子進程1]小明,今晚擼串不?")

    # 設置一個簡版的重試機制(三次重試)
    for i in range(3):
        if not q.empty():
            print(q.get())
            break
        else:
            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6s

def pro_test2():
    print("[子進程2]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))
    print(q.get())
    time.sleep(4)  # 模擬一下網絡延遲
    q.put("[子進程2]不去,我今天約了妹子")

def init(queue):
    global q
    q = queue

def main():
    print("[父進程]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))
    queue = Queue()
    p = Pool(initializer=init, initargs=(queue, ))
    p.apply_async(pro_test1, error_callback=error_callback)
    p.apply_async(pro_test2, error_callback=error_callback)
    p.close()
    p.join()

if __name__ == '__main__':
    main()

輸出:(就是在初始化Pool的時候,傳了初始化執行的方法並傳了參數alizer=init, initargs=(queue, ))

[父進程]PPID=13157,PID=24864
[子進程1]PPID=24864,PID=24865
[子進程2]PPID=24864,PID=24866
[子進程1]小明,今晚擼串不?
[子進程2]不去,我今天約了妹子

real    0m6.105s
user    0m0.071s
sys     0m0.042s

Win下亦通用(win下沒有os.getgid5.win.png


1.7.分佈式進程的案例

有了1.6的基礎,我們來個例子練練:

BaseManager的縮略圖:

6.縮略.png

服務器端代碼:

from multiprocessing import Queue
from multiprocessing.managers import BaseManager

def main():
    # 用來身份驗證的
    key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"
    get_zhang_queue = Queue()  # 小張消息隊列
    get_ming_queue = Queue()  # 小明消息隊列

    # 把Queue註冊到網絡上, callable參數關聯了Queue對象
    BaseManager.register("get_zhang_queue", callable=lambda: get_zhang_queue)
    BaseManager.register("get_ming_queue", callable=lambda: get_ming_queue)

    # 實例化一個Manager對象。綁定ip+端口, 設置驗證祕鑰
    manager = BaseManager(address=("192.168.36.235", 5438), authkey=key)
    # 運行serve
    manager.get_server().serve_forever()

if __name__ == '__main__':
    main()

客戶端代碼1:

from multiprocessing.managers import BaseManager

def main():
    """客戶端1"""
    key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"

    # 註冊對應方法的名字(從網絡上獲取Queue)
    BaseManager.register("get_ming_queue")
    BaseManager.register("get_zhang_queue")

    # 實例化一個Manager對象。綁定ip+端口, 設置驗證祕鑰
    m = BaseManager(address=("192.168.36.235", 5438), authkey=key)
    # 鏈接到服務器
    m.connect()

    q1 = m.get_zhang_queue()  # 在本身隊列裏面留言
    q1.put("[小張]小明,老大明天是否是去外地辦事啊?")

    q2 = m.get_ming_queue()  # 獲取小明說的話
    print(q2.get())

if __name__ == '__main__':
    main()

客戶端代碼2:

from multiprocessing.managers import BaseManager

def main():
    """客戶端2"""
    key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"

    # 註冊對應方法的名字(從網絡上獲取Queue)
    BaseManager.register("get_ming_queue")
    BaseManager.register("get_zhang_queue")

    # 實例化一個Manager對象。綁定ip+端口, 設置驗證祕鑰
    m = BaseManager(address=("192.168.36.235", 5438), authkey=key)
    # 鏈接到服務器
    m.connect()

    q1 = m.get_zhang_queue()  # 獲取小張說的話
    print(q1.get())

    q2 = m.get_ming_queue()  # 在本身隊列裏面留言
    q2.put("[小明]這幾天我們終於能夠不加班了(>_<)")

if __name__ == '__main__':
    main()

輸出圖示:

7.manager.gif

服務器運行在Linux的測試:

8.win.png

其實還有一部份內容沒說,明天得出去辦點事,先到這吧,後面找機會繼續帶一下


參考文章:

進程共享的探討:python-sharing-a-lock-between-processes

多進程鎖的探討:trouble-using-a-lock-with-multiprocessing-pool-pickling-error

JoinableQueue擴展:http://www.javashuo.com/article/p-fqbssexn-q.html

Python多進程編程:http://www.javashuo.com/article/p-uurtpuvk-d.html

有深度但須要辯證看的兩篇文章:

跨進程對象共享:http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue

關於Queue:http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process

 

NetCore併發編程

 Python的線程、並行、協程下次說

示例代碼:https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency

先簡單說下概念(其實以前也有說,因此簡說下):

  1. 併發:同時作多件事情
  2. 多線程:併發的一種形式
  3. 並行處理:多線程的一種(線程池產生的一種併發類型,eg:異步編程
  4. 響應式編程:一種編程模式,對事件進行響應(有點相似於JQ的事件)

Net裏面不多用進程,在之前基本上都是線程+池+異步+並行+協程

我這邊簡單引入一下,畢竟主要是寫Python的教程,Net只是幫大家回顧一下,若是你發現還沒聽過這些概念,或者你的項目中還充斥着各類ThreadThreadPool的話,真的得系統的學習一下了,如今官網的文檔已經很完善了,記得早幾年啥都沒有,也只能挖那些外國開源項目:

https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency

1.異步編程(Task)

Task的目的其實就是爲了簡化ThreadThreadPool的代碼,下面一塊兒看看吧:

異步用起來比較簡單,通常IO,DB,Net用的比較多,不少時候都會採用重試機制,舉個簡單的例子:

/// <summary>
/// 模擬一個網絡操做(別忘了重試機制)
/// </summary>
/// <param name="url">url</param>
/// <returns></returns>
private async static Task<string> DownloadStringAsync(string url)
{
    using (var client = new HttpClient())
    {
        // 設置第一次重試時間
        var nextDelay = TimeSpan.FromSeconds(1);
        for (int i = 0; i < 3; i++)
        {
            try
            {
                return await client.GetStringAsync(url);
            }
            catch { }
            await Task.Delay(nextDelay); // 用異步阻塞的方式防止服務器被太多重試給阻塞了
            nextDelay *= 2; // 3次重試機會,第一次1s,第二次2s,第三次4s
        }
        // 最後一次嘗試,錯誤就拋出
        return await client.GetStringAsync(url);
    }
}

而後補充說下Task異常的問題,當你await的時候若是有異常會拋出,在第一個await處捕獲處理便可

若是asyncawait就是理解不了的能夠這樣想:async就是爲了讓await生效(爲了向後兼容)

對了,若是返回的是void,你設置成Task就好了,觸發是相似於事件之類的方法才使用void,否則沒有返回值都是使用Task

項目裏常常有這麼一個場景:等待一組任務完成後再執行某個操做,看個引入案例:

/// <summary>
/// 1.批量任務
/// </summary>
/// <param name="list"></param>
/// <returns></returns>
private async static Task<string[]> DownloadStringAsync(IEnumerable<string> list)
{
    using (var client = new HttpClient())
    {
        var tasks = list.Select(url => client.GetStringAsync(url)).ToArray();
        return await Task.WhenAll(tasks);
    }
}

再舉一個場景:同時調用多個同效果的API,有一個返回就行了,其餘的忽略

/// <summary>
/// 2.返回首先完成的Task
/// </summary>
/// <param name="list"></param>
/// <returns></returns>
private static async Task<string> GetIPAsync(IEnumerable<string> list)
{
    using (var client = new HttpClient())
    {
        var tasks = list.Select(url => client.GetStringAsync(url)).ToArray();
        var task = await Task.WhenAny(tasks); // 返回第一個完成的Task
        return await task;
    }
}

一個async方法被await調用後,當它恢復運行時就會回到原來的上下文中運行。

若是你的Task再也不須要上下文了可使用:task.ConfigureAwait(false),eg:寫個日記還要啥上下文?

逆天的建議是:在覈心代碼裏面一種使用ConfigureAwait,用戶頁面相關代碼,不須要上下文的加上

其實若是有太多await在上下文裏恢復那也是比較卡的,使用ConfigureAwait以後,被暫停後會在線程池裏面繼續運行

再看一個場景:好比一個耗時操做,我須要指定它的超時時間:

/// <summary>
/// 3.超時取消
/// </summary>
/// <returns></returns>
private static async Task<string> CancellMethod()
{
    //實例化取消任務
    var cts = new CancellationTokenSource();
    cts.CancelAfter(TimeSpan.FromSeconds(3)); // 設置失效時間爲3s
    try
    {
        return await DoSomethingAsync(cts.Token);
    }
    // 任務已經取消會引起TaskCanceledException
    catch (TaskCanceledException ex)
    {

        return "false";
    }
}
/// <summary>
/// 模仿一個耗時操做
/// </summary>
/// <returns></returns>
private static async Task<string> DoSomethingAsync(CancellationToken token)
{
    await Task.Delay(TimeSpan.FromSeconds(5), token);
    return "ok";
}

異步這塊簡單回顧就不說了,留兩個擴展,大家自行探討:

  1. 進度方面的可使用IProgress<T>,就當留個做業本身摸索下吧~
  2. 使用了異步以後儘可能避免使用task.Wait or task.Result,這樣能夠避免死鎖

Task其餘新特徵去官網看看吧,引入到此爲止了。


2.並行編程(Parallel)

這個其實出來好久了,如今基本上都是用PLinq比較多點,主要就是:

  1. 數據並行:重點在處理數據(eg:聚合)
  2. 任務並行:重點在執行任務(每一個任務塊儘量獨立,越獨立效率越高)

數據並行

之前都是Parallel.ForEach這麼用,如今和Linq結合以後很是方便.AsParallel()就OK了

說很抽象看個簡單案例:

static void Main(string[] args)
{
    IEnumerable<int> list = new List<int>() { 1, 2, 3, 4, 5, 7, 8, 9 };
    foreach (var item in ParallelMethod(list))
    {
        Console.WriteLine(item);
    }
}
/// <summary>
/// 舉個例子
/// </summary>
private static IEnumerable<int> ParallelMethod(IEnumerable<int> list)
{
    return list.AsParallel().Select(x => x * x);
}

正常執行的結果應該是:

1
4
9
25
64
16
49
81

並行以後就是這樣了(無論順序了):

25
64
1
9
49
81
4
16

固然了,若是你就是對順序有要求可使用:.AsOrdered()

/// <summary>
/// 舉個例子
/// </summary>
private static IEnumerable<int> ParallelMethod(IEnumerable<int> list)
{
    return list.AsParallel().AsOrdered().Select(x => x * x);
}

其實實際項目中,使用並行的時候:任務時間適中,太長不適合,過短也不適合

記得你們在項目裏常常會用到如SumCount等聚合函數,其實這時候使用並行就很合適

var list = new List<long>();
for (long i = 0; i < 1000000; i++)
{
    list.Add(i);
}
Console.WriteLine(GetSumParallel(list));
private static long GetSumParallel(IEnumerable<long> list)
{
    return list.AsParallel().Sum();
}

time dotnet PLINQ.dll

499999500000

real    0m0.096s
user    0m0.081s
sys 0m0.025s

不使用並行:(稍微多了點,CPU越密集差距越大)

499999500000

real    0m0.103s
user    0m0.092s
sys 0m0.021s

其實聚合有一個通用方法,能夠支持複雜的聚合:(以上面sum爲例)

.Aggregate(
            seed:0,
            func:(sum,item)=>sum+item
          );

稍微擴展一下,PLinq也是支持取消的,.WithCancellation(CancellationToken)

Token的用法和上面同樣,就不復述了,若是須要和異步結合,一個Task.Run就能夠把並行任務交給線程池了

也可使用Task的異步方法,設置超時時間,這樣PLinq超時了也就終止了

PLinq這麼方便,其實也是有一些小弊端的,好比它會直接最大程度的佔用系統資源,可能會影響其餘的任務,而傳統的Parallel則會動態調整


任務並行(並行調用)

這個PLinq好像沒有對應的方法,有新語法你能夠說下,來舉個例子:

await Task.Run(() =>
    Parallel.Invoke(
        () => Task.Delay(TimeSpan.FromSeconds(3)),
        () => Task.Delay(TimeSpan.FromSeconds(2))
    ));

取消也支持:

Parallel.Invoke(new ParallelOptions() { CancellationToken = token }, actions);

擴充說明

其實還有一些好比數據流響應編程沒說,這個以前都是用第三方庫,剛纔看官網文檔,好像已經支持了,因此就不賣弄了,感興趣的能夠去看看,其實項目裏面有流數據相關的框架,eg:Spark,都是比較成熟的解決方案了基本上也不太使用這些了。

而後還有一些沒說,好比NetCore裏面不可變類型(列表、字典、集合、隊列、棧、線程安全字典等等)以及限流任務調度等,這些關鍵詞我提一下,也方便你去搜索本身學習拓展

先到這吧,其餘的本身探索一下吧,最後貼一些Nuget庫,你能夠針對性的使用:

  1. 數據流Microsoft.Tpl.Dataflow
  2. 響應編程(Linq的Rx操做):Rx-Main
  3. 不可變類型Microsoft.Bcl.Immutable

不得不感慨一句,微軟媽媽真的花了不少功夫,Net的併發編程比Python省心多了(完)

相關文章
相關標籤/搜索