Python3學習之路~10.2 協程、Greenlet、Gevent

一 協程html

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程python

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:git

協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。github

協程的好處:編程

  • 無需線程上下文切換的開銷
  • 無需原子操做鎖定及同步的開銷
    •   "原子操做(atomic operation)是不須要synchronized",所謂原子操做是指不會被線程調度機制打斷的操做;這種操做一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另外一個線程)。原子操做能夠是一個步驟,也能夠是多個操做步驟,可是其順序是不能夠被打亂,或者切割掉只執行部分。視做總體是原子性的核心。
  • 方便切換控制流,簡化編程模型
  • 高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。

缺點:數組

  • 沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  • 進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序

使用yield實現協程操做例子 併發

def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)

def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)

if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()

咱們先給協程一個標準定義,即符合什麼條件就能稱之爲協程:異步

  1. 必須在只有一個單線程裏實現併發
  2. 修改共享數據不需加鎖
  3. 用戶程序裏本身保存多個控制流的上下文棧
  4. 一個協程遇到IO操做自動切換到其它協程

基於上面這4點定義,咱們剛纔用yield實現的程並不能算是合格的線程,由於它有一點功能沒實現,哪一點呢?是第4點。socket

二 Greenletasync

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

安裝gevent模塊後,就可使用greenlet了。

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1) #啓動一個協程
gr2 = greenlet(test2)
gr1.switch() #手動切換協程,相似於yield的__next__()

感受確實用着比generator還簡單了呢,但好像尚未解決一個問題,就是遇到IO操做,自動切換,對不對?

三 Gevent

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

gevent能夠自動識別IO操做,以下程序中,fun1函數執行過程當中,進行了2秒的IO操做,fun2函數執行過程當中,進行了1秒的IO操做,使用gevent協程操做,遇到IO操做就自動切換,從而實現了併發,原本串行執行完這兩個函數須要3秒,如今只須要2秒。

import gevent

def func1():
    print('running in func1')
    gevent.sleep(2) #模擬IO操做
    print('wait 1 s switch to func1')

def func2():
    print('switch to func2')
    gevent.sleep(1)
    print('wait 1 s switch to func2')

gevent.joinall([
    gevent.spawn(func1),
    gevent.spawn(func2),
])
running in func1
switch to func2
wait 1 s switch to func2
wait 1 s switch to func1
輸出

同步與異步的性能區別

import gevent

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)

def synchronous():
    for i in range(1, 10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()
View Code

上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。

前面咱們知道了,gevent 遇到IO阻塞時會自動切換任務。接下來咱們再看一個例子

先學習如何使用urllib爬一個網頁
from urllib import request
import gevent

def f(url):
    print('GET:%s' %url)
    resp = request.urlopen(url)
    data = resp.read()
    # f = open('url.html','wb')
    # f.write(data)
    # f.close()
    print('%d bytes received from %s' %(len(data),url))

gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    # gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/')
])
而後咱們使用協程一次爬多個網頁

比較一下 使用協程爬多個網頁 和 使用同步串行爬多個網頁 所花費的時間:

from urllib import request
import gevent
import time

def f(url):
    print('GET:%s' %url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s' %(len(data),url))

async_start_time = time.time()

gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://github.com/')
])

print('並行cost:',time.time()-async_start_time)
並行cost: 6.569547176361084
from urllib import request
import time

def f(url):
    print('GET:%s' %url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s' %(len(data),url))

url = [
    'https://www.python.org/',
     'https://github.com/',
]

sync_start_time = time.time()

for url in url:
    f(url)

print('串行cost:',time.time()-sync_start_time)
串行cost: 6.123349905014038

咱們發現,兩者所用時間差很少,並行並無比串行快。這是由於,gevent默認檢測不到urllib(還有前面所學的socket)的IO操做,因此它遇到IO阻塞後沒有自動切換任務,也就是說gevent對urllib來講很差使。

那麼怎麼才能讓gevent知道urllib正在進行IO操做呢,給它打個補丁。monkey.patch_all()就是把當前程序的全部IO操做全都單獨打上標記,這樣,gevent遇到它本身不能識別的IO操做時,由於有了標記,gevent也可以自動切換任務。

經過gevent實現單線程下的urllib爬網頁併發

from urllib import request
import gevent,time
from gevent import monkey

monkey.patch_all() #把當前程序的全部IO操做給我單獨打上標記

def f(url):
    print('GET:%s' %url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s' %(len(data),url))

async_start_time = time.time()

gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://github.com/')
])

print('並行cost:',time.time()-async_start_time)

#並行cost: 3.2701869010925293

 

經過gevent實現單線程下的多socket併發

server side 

import gevent
from gevent import socket, monkey

monkey.patch_all()

def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept() #過來一個連接
        gevent.spawn(handle_request, cli) #將連接交個gevent去起一個協程,把新生成的客戶端的鏈接實例交給handle_request方法

def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()

if __name__ == '__main__':
    server(8001)

client side  

import socket

HOST = 'localhost'  # The remote host
PORT = 8001  # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    # print(data)

    print('Received', repr(data))
s.close()
import socket
import threading

def sock_conn():

    client = socket.socket()

    client.connect(("localhost",8001))
    count = 0
    while True:
        #msg = input(">>:").strip()
        #if len(msg) == 0:continue
        client.send( ("hello %s" %count).encode("utf-8"))

        data = client.recv(1024)

        print("[%s]recv from server:" % threading.get_ident(),data.decode()) #結果
        count +=1
    client.close()


for i in range(100):
    t = threading.Thread(target=sock_conn)
    t.start()
併發100個sock鏈接
相關文章
相關標籤/搜索