python成長之路 多進程、協程

1、python3的多進程

一、multiprocessing模塊

python3是經過multiprocessing模塊來開啓子進程,並執行相應的定製任務python

multiprocessing模塊能夠支持子進程、通訊和數據共享、執行不一樣形式的同步,提供了process、Queue、Pipe、lock等組件。git

在這裏咱們強調的是多線程有共享狀態,進程沒有任何共享狀態,程序員

 

二、Process類的介紹

  • 建立進程的類

咱們經過Process類來建立子進程。下面是關於Process類中的參數的解釋github

Process(group=None, target=None, name=None, args=(), kwargs={})編程

在這些參數中group參數未使用,始終未None安全

target至關於咱們要開啓的子進程的任務的位置也就是調用的對象多線程

args使咱們給這個子進程傳遞參數,這個必須是元組的方式,若是隻有一個參數就寫成(num,)這種形式記得後面有一個「,」 args = (num,)併發

kwargs表示調用對象的字典,kwargs={"name":"Tom","age":18}app

name爲子進程的名稱異步

記住上面只是建立一個子進程這個子進程屬於未開啓狀態

 

  • 方法介紹

如上面我盟建立了一個子進程如

p = multiprocessing.Process()  #這個只是建立了一個子進程可是他並無運行。

使用p.start(): 這個表示啓動這個子進程,並調用了該子進程的p.run()

p.run(): 進程啓動時的方法,正是它去調用target指定的函數,在multiprocessing裏面已經實現,要是咱們自定義類的話一應要實現該方法。

p.is_alive():主要是看p是否在運行,若是還在運行那麼返回True。

p.join([timeout]): 主進程等待子進程p執行完畢纔會執行,因此在p執行完畢以前主進程處於等待狀態。timeout是可選的超時時間,在這裏咱們要記住p.join只能等待start開啓的進程,而對run開啓的進程沒有效果。

  • 一些屬性介紹

p.daemon:默認值爲Flash,若是設爲True,表明p爲後臺雲遜的守護進程,當p的父進程終止時,p也隨之終止,而且設置爲True後,p不能建立本身的子進程,必須在p.start()以前設置。

p.name:進程的名稱。

p.pid: 進程的pid

三、 使用Process類建立進程

#!/usr/bin/env python
# -*-coding:utf-8-*-
import multiprocessing
import time


def run(id):
    time.sleep(2)
    print("hello process{}".format(id))

if __name__ == "__main__":
    l = []
    i = 1
    for i in range(5):
        p = multiprocessing.Process(target=run, args=(i,))#args是一個元組因此必需要有,
        p.start()
        i += 1
        l.append(p)
    for i in l:     
        i.join() 
    print("主進程")
#若是沒有i.join()的話就會出現子進程沒有運行完,下面的代碼就會運行結果。

上面的運行結果是:
hello process1
hello process0
hello process4
hello process2
hello process3
主進程

若是沒有i.join()的話,運行結果爲 主進程 hello process0 hello process3 hello process4 hello process1 hello process2

從上能夠看出當有i.join()時咱們的程序就不會往下走。它會等待join()過的子進程執行完畢纔會繼續往下走

  

下面的演示是關於在子進程中開啓多線程

#!/usr/bin/env python
# -*-coding:utf-8-*-

import multiprocessing,time
import threading


def run():
    time.sleep(2)
    print("hello process")
    t = threading.Thread(target=trun,)
    t.start()


def trun():
    time.sleep(1)
    print("hello threading", threading.get_ident())


def main():
    l = []
    for i in range(10):
        p = multiprocessing.Process(target=run)
        p.start()
        print(p.name)
        l.append(p)
    for i in l:
        i.join()


if __name__ == "__main__":
    main()

#在Windows中的process()執行必須放到if __name__ == '__main__':下如上面的代碼,要否則就會出錯。

  

關於p.daemon=True的用法

#!/usr/bin/env python
# -*-coding:utf-8-*-
import multiprocessing
import time


def run(id):
    time.sleep(2)
    print("hello process{}".format(id))

if __name__ == "__main__":
    l = []
    i = 1
    for i in range(5):
        p = multiprocessing.Process(target=run, args=(i,))
        p.daemon = True   #記住這個必定要在start()方法以前設置
#同時記住當咱們設置守護進程時p進程就不能建立子進程,而且父進程結束,p也結束無論p有沒有運行完 p.start() i += 1 l.append(p) for i in l: i.join() #去掉這個後就會發現開啓守護進程的區別 print("主進程") #上面的加上守護進程咱們不會發現什麼不一樣和咱們前面沒加上同樣的結果。 #可是當咱們去掉join()方法的時候就會發現子進程尚未運行完畢整個進程就結束了

  

四、進程同步(鎖)

上面咱們也說了進程之間的數據是不共享的,可是共享同一套文件系統,因此訪問同一個文件或者同一個打印終端是沒問題的。當咱們同一個終端打印的時候會發現多行打印到一行的現象,出現這種現象就是多個進程共享並搶佔同一個終端打印致使打印亂了。

在這裏咱們加鎖的就是爲了保證同一時間同一個數據只能被一個進程修改,這樣作速度慢了可是卻也保證了數據的正確性

#!/usr/bin/env pytho# 
-*-coding:utf-8-*

from multiprocessing import Process,Lock def run(l, i): l.acquire() print("process:", i) # 在屏幕上打印的時候打印完整 l.release() if __name__ == "__main__": lock = Lock() p_list = [] for i in range(10): p = Process(target=run, args=(lock, i)) p.start() p_list.append(p) for i in p_list: i.join() 運行結果爲: process: 1 process: 0 process: 2 process: 8 process: 7 process: 5 process: 3 process: 6 process: 4 process: 9
這個程序我運行了不少次都沒有出現過打印錯亂的狀況,
好比第一行沒打印完,第二行就插在中間的這種狀況。
不過我我在Linux上弄了好屢次包括python3和python2
都沒有出現過相應的狀況。有可能我試驗少了,可是看別
人的博客上說python2運行的時候有可能出現錯亂的現象。
無論怎麼樣記住加上進程鎖就是爲了防止在屏幕上打印錯亂。

  

五、進程間的通信

隊列 Queue

上面咱們說了不一樣的進程是不可以共享內存的,可是爲了實現兩個進程之間的數據交換咱們可使用一下方法。

Queue

這裏使用的queue和線程threading的queue的用法差很少。

隊列屬於先進先出的原則的

建立隊列的類:

Queue([maxsize]): 建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。

關於裏面的maxsize表示的是隊列容許的最大項數,省略表示無大小限制。

關於Queue的方法介紹:

q.put 是用來向隊列中插入數據的

q.get 是向隊列中去除一個元素,並刪除掉

q.empty():調用此方法時,q爲空則返回True 

q.full():調用此方法時q已滿則返回True

q.qsize():返回隊列中目前元素的數目

#!/usr/bin/env python
# -*-coding:utf-8-*-
from multiprocessing import Process, Queue


def run(arg):
    arg.put([41, None, 'miss'])

if __name__ == "__main__":
    q = Queue()
    p = Process(target=run, args=(q,))
    p.start()
    print("進程:", q.get())
    p.join()


運行的結果爲
進程: [41, None, 'miss']

  

管道 Pipe

 進程間的通訊方式除了隊列還有管道。

建立管道的類:

Pipe([duplex]):在進程間建立一條管道,並返回元組(conn1, conn2),其中conn1,conn2表示管道兩端的鏈接對象,必須在產生process對象以前,產生管道。 

duplex:默認管道是雙全工,若是duplex設置爲Flash那麼conn1只能用於接受。conn2只能用於發送

關於Pipe其中方法的介紹:

conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接受,recv會一直阻塞,若是鏈接的另外一端關閉了那麼recv方法會拋出EOFError。

conn2.send(obj):經過鏈接發送對象,obj是與序列化兼容的對象

#!/usr/bin/env python
# -*-coding:utf-8-*-
from multiprocessing import Process, Pipe


def run(arg):
    arg.send([1, None, "chengshun"])
    arg.send([2, None, "chengjie"])

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()  在建立進程以前使用Pipe()方法
    p = Process(target=run, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())
    p.join()

  

manager

主要是實如今進程間實現list,dict,Lock等等這種類型的數據的共享

#!/usr/bin/env python
# -*-coding:utf-8-*-
from multiprocessing import Process, Manager
import os


def run(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    #print(l)

if __name__ == "__main__":
    with Manager() as manager:  # 把Manager命名爲manager
        d = manager.dict()  # 建立一個共享的字典d
        l = manager.list(range(10))  # 建立一個共享的列表l
        process = []
        for i in range(10):
            p = Process(target=run, args=(d, l))  # 建立進程
            p.start()
            process.append(p)
        for i in process:
            i.join()

        print(l)
        print(d)

  

六、進程池Pool

咱們開多進程就是爲了併發執行任務,當咱們有多少核就開多少個進程,可是在日程中咱們經常須要的併發執行任務遠大於核數,這個是後咱們就能夠經過維護一個進程池來控制進程的數目。

進程池內部維護一個進程序列,當時用的,則取進程池中獲取一個進程,若是進程池中沒有可用序列的進程,那麼程序就會等待知道進程池中有可用的進程序列爲止。

Pool對象調用join方法會等待全部的子進程執行完畢,

在調用join方法以前,必須調用close

調用close以後就不可以繼續添加新的process了

Pool的中2中方法

apply_async:該方法用來同步執行進程,也就是說容許同時多個進程進入到進程池中。

apply:該方法只能容許一個進程進入池子,

#!/usr/bin/env python
# -*-coding:utf-8-*-
from multiprocessing import Pool,process
import os, time


def Foo(i):
    time.sleep(1)
    print("子進程號:", os.getpid())
    return i + 100


def back(arg):  # 這個回調函數是主進程調用的
    print("---exit---", arg)  # 這裏的arg就是func中的return返回值,若是沒有,那麼arg爲None


if __name__ == "__main__":
    pool = Pool(processes=5)  # 進程池中同時能夠容許多少個進程
    for i in range(40):
        pool.apply_async(func=Foo, args=(i,), callback=back)  # callback爲回調函數
        # pool.apply(func=Foo, args=(i,))  
        # 也就是說在沒次執行完func這個函數的時候就會調用callback函數。

    pool.close()
    pool.join()  # join()以前必需要close(),同時close()後就不可以在添加新的process
# 若是直接註釋掉pool.join()那麼程序就不會等待子進程結束就自動結束了

  

上面代碼中的pool.apply_async()這個運行的結果咱們會看見會出現運行的時候去掉---exit---會出現5個5個一組打印在屏幕上。若是是pool.apply()這個運行的時候咱們會發現會一行一行的打印。也就是串行的結果。

 

2、協程

協程,又稱爲微線程,纖程。英文名字Coroutine。咱們能夠說協程是一種用戶態的輕量級線程。

線程是系統級別的它由操做系統調度,而協程則是應用程序級別的由程序根據需求本身調度。一個線程中會有不少函數,咱們把這些函數稱之爲子程序,在子程序執行過程當中能夠去執行別的子程序,而別的子程序也能夠中斷回來繼續執行以前的子程序,這個過程我麼稱之爲協程。也就是說在同一線程內一段代碼在執行過程當中會中斷而後跳轉執行別的代碼,接着在以前中斷的地方繼續執行,相似於yield操做。

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置

協程的優勢:

        (1)無需線程上下文切換的開銷,協程避免了無心義的調度,由此能夠提升性能(但也所以,程序員必須本身承擔調度的責任,同時,協程也失去了標準線程使用多CPU的能力)

  (2)無需原子操做鎖定及同步的開銷

  (3)方便切換控制流,簡化編程模型

  (4)高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。

協程的缺點:

  (1)沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。

  (2)進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序

 

一、關於yield實現協程效果

import time
import queue


def consumer(name):
    print("--->開始吃包子..." )
    while True:
        print("%s 須要包子" % name)
        new_baozi = yield
        print("[%s] 吃了包子%s" % (name, new_baozi))


def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m 是包子 %s" % n)


if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()

  

二、greenlet模塊實現模塊之間的切換

#!/usr/bin/env python
# -*-coding:utf-8-*-
from greenlet import greenlet


def fun1():
    print("1")   # 程序先執行這一行
    gre2.switch()  # 這表示咱們切換到gre2,也就是fun2.同時保留如今這個狀態,下次切回繼續執行
    print(4)  # 有下面的gre1.switch()切換到這執行該行
    gre2.switch()  # 在切換到gre2


def fun2():
    print("2")  # 由gre2.switch()切換到這執行
    gre1.switch()  # 這個表示切換到上面gre1,若是上面有保留狀態則從保留狀態開始
    print(3)


if __name__ == "__main__":
    gre1 = greenlet(fun1)  # 啓動一個協程
    gre2 = greenlet(fun2)  # 啓動一個協程
    gre1.switch()  # 一開始運行gre1也就是fun1

  上面的模塊咱們能夠知道這個須要咱們手動切換,不可以實現IO自動切換。

 

三、gevent

gevent是一個第三方庫,能夠輕鬆經過gevent實現協程編程,在gevent用到的主要模塊時greenlet,它是以C擴展模塊形式接入到Python的輕量級協程。 greenlet所有運行在主程序操做系統的內部,但它們協做式的調度。下面咱們看看

#!/usr/bin/env python
# -*-coding:utf-8-*-
import gevent


def f1():
    print("1")  
    gevent.sleep(2)  # 到這裏程序會自動的切換到f2函數
    print("6")


def f2():
    print("2")
    gevent.sleep(1) # 自動切換到f3哪怕這個sleep了0秒,它也會切換到下一個,至關於觸發了自動切換。
    print("5")


def f3():
    print("3")
    gevent.sleep(0) # 這個會切換到f1看看有沒有運行完,若是沒有運行完就按着順序繼續運行下去。
    print("4")

if __name__ == "__main__":
    gevent.joinall([
        gevent.spawn(f1),  # 至關於啓動一個協程,第一位就是一開始運行
        gevent.spawn(f2),
        gevent.spawn(f3)
    ])

 上面的程序運行的時候咱們能夠看到運行的結果是

1

2

3

4

5

6

它基本的實現了IO的自動切換

下面一個例子時候關於協程,爲了體現差別咱們同時也執行了同步時間

#!/usr/bin/env python
# -*-coding:utf-8-*-
import gevent, time
from urllib import request  # 包下面還有一個包,因此必須這樣不可以使用urllib.request
from gevent import monkey


def run(url, name):
    res = request.urlopen(url)
    data = res.read()
    f = open('{}.txt'.format(name), 'wb')
    f.write(data)
    f.close()
    print("{} bytes received from {}".format(len(data), url))


if __name__ == "__main__":
    # 同步時間
    get = [
        ("https://www.python.org", 'python'),
        ("https://www.yahoo.com", 'yahoo'),
        ("https://www.github.com", 'github')  ]
    start_time = time.time()
    for i, j in get:
        run(i, j)
    done_time = time.time()
    print("同步時間", done_time - start_time)

    # 異步時間
    monkey.patch_all()  # urllib默認狀況下是串行的使用這個是把當前程序的全部的IO操做的打上標記這個樣就能夠異步了
    asy_start_time = time.time()
    gevent.joinall([
        gevent.spawn(run, "https://www.python.org", 'python'),
        gevent.spawn(run, "https://www.yahoo.com", 'yahoo'),
        gevent.spawn(run, "https://www.github.com", 'github')

    ])  # 從第一位開始運行
    asy_done_time = time.time()
    print("異步時間", asy_done_time - asy_start_time)

運行的結果是
48872 bytes received from https://www.python.org
532554 bytes received from https://www.yahoo.com
52840 bytes received from https://www.github.com
同步時間 6.285885572433472
48872 bytes received from https://www.python.org
525992 bytes received from https://www.yahoo.com
52840 bytes received from https://www.github.com
異步時間 3.48797869682312

這個有時候時間是不對的,由於和網速有關,這幾個網址有時候會出現打開速度慢。

  

四、協程實現socket多併發

gevent_socket_server

#!/usr/bin/env python
# -*-coding:utf-8-*-
import socket, gevent
from gevent import monkey
monkey.patch_all()  # 必需要有這個,要否則就實現多併發


def Server():
    server = socket.socket()
    server.bind(("localhost", 9999))
    server.listen(500)
    while True:
        conn, add = server.accept()
        gevent.spawn(handle_request, conn)


def handle_request(conns):
    while True:
        received_data = conns.recv(1024).decode("utf-8")
        print("received:", received_data)
        if not received_data:
            conns.shutdown(socket.SHUT_WR)
        conns.send(received_data.encode("utf-8"))
    conns.close()

if __name__ == "__main__":
  Server()

 

socket_client

#!/usr/bin/env python
# -*-coding:utf-8-*-
import socket

client = socket.socket()
client.connect(("localhost", 9999))
while True:
    acquire = input(">>>").strip().encode("utf-8")
    print(acquire)
    if acquire.decode("utf-8") == "exit":
        break
    if len(acquire) == 0:
        continue
    client.send(acquire)
    print(2)
    data = client.recv(1024).decode("utf-8")
    print(3)
    print(data)
client.close()

 這個就是記錄本身學習中的知識,同時也知道寫的也不是很全。這個權當本身對知識的鞏固。

相關文章
相關標籤/搜索