網絡編程--協程

引子

上一節中咱們知道GIL鎖將致使CPython中多線程沒法並行執行,只能併發的執行。git

而併發實現的原理是切換+保存,那就意味着使用多線程實現併發,就須要爲每個任務建立一個線程,必然增長了線程建立銷燬與切換的帶來的開銷github

明顯的問題就是,高併發狀況下,因爲任務數量太多致使沒法開啓新的線程,使得即沒有實際任務要執行,也沒法建立新線程來處理新任務的狀況編程

如何解決上述問題呢,首先要保證併發效果,而後來想辦法避免建立線程帶來的開銷問題;json

協程既是所以而出現的,其原理是使用單線程來實現多任務併發,那麼如何能實現單線程併發呢?服務器

1、單線程實現併發

是否可行

單線程實現併發這句話乍一聽好像在瞎說多線程

首先須要明確併發的定義併發

併發:指的是多個任務同時發生,看起來好像是同時都在進行異步

並行:指的是多個任務真正的同時進行socket

早期的計算機只有一個CPU,既然CPU能夠切換線程來實現併發,那麼爲什麼不能在線程中切換任務來併發呢?

因此線程實現併發理論上是可行的

如何夠實現

併發 = 切換任務+保存狀態,只要找到一種方案,可以在兩個任務之間切換執行而且保存狀態,那就能夠實現單線程併發

python中的生成器就具有這樣一個特色,每次調用next都會回到生成器函數中執行代碼,這意味着任務之間能夠切換,而且是基於上一次運行的結果,這意味着生成器會自動保存執行狀態!

因而乎咱們能夠利用生成器來實現併發執行:

def task1():
    while True:
        yield
        print("task1 run")

def task2():
    g = task1()
    while True:
        next(g)
        print("task2 run")
task2()

併發雖然實現了,可是這對效率的影響是好是壞呢?來測試一下

# 兩個計算任務一個採用生成器切換併發執行  一個直接串行調用
import  time
def task1():
    a = 0
    for i in range(10000000):
        a += i
        yield

def task2():
    g = task1()
    b = 0
    for i in range(10000000):
        b += 1
        next(g)
s = time.time()
task2()
print("併發執行時間",time.time()-s)

# 單線程下串行執行兩個計算任務 效率反而比並發高 由於併發須要切換和保存
def task1():
    a = 0
    for i in range(10000000):
        a += i
def task2():
    b = 0
    for i in range(10000000):
        b += 1
s = time.time()
task1()
task2()
print("串行執行時間",time.time()-s)

能夠看到對於純計算任務而言,單線程併發反而使執行效率降低了一半左右,因此這樣的方案對於純計算任務而言是沒有必要的

greenlet模塊實現併發

咱們暫且不考慮這樣的併發對程序的好處是什麼,在上述代碼中,使用yield來切換是的代碼結構很是混亂,若是十個任務須要切換呢,不敢想象!所以就有人專門對yield進行了封裝,這便有了greenlet模塊

from greenlet import greenlet

def eat(name):
    print('%s eat 1' %name)
    g2.switch('jack')
    print('%s eat 2' %name)
    g2.switch()
def play(name):
    print('%s play 1' %name)
    g1.switch()
    print('%s play 2' %name)

g1=greenlet(eat)
g2=greenlet(play)

g1.switch('rose')#能夠在第一次switch時傳入參數,之後都不須要再次傳

該模塊簡化了yield複雜的代碼結構,實現了單線程下多任務併發,可是不管直接使用yield仍是greenlet都不能檢測IO操做,遇到IO時一樣進入阻塞狀態,一樣的對於純計算任務而言效率也是沒有任何提高的。

測試:

#切換
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()

start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524

greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題,

任務的代碼一般會既有計算操做又有阻塞操做,咱們徹底能夠在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提升效率,這就用到了Gevent模塊。

2、協程

協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。

須要強調的是:

#1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行)
#2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關)

對比操做系統控制線程的切換,用戶在單線程內控制協程的切換

優勢以下:

#1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級
#2. 單線程內就能夠實現併發的效果,最大限度地利用cpu

缺點以下:

#1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程來儘量提升效率
#2. 協程本質是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程

gevent模塊

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

經常使用方法:

#用法
#建立一個協程對象g1,
g1=gevent.spawn(func,1,,2,3,x=4,y=5)
#spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的
g2=gevent.spawn(func2)

g1.join() #等待g1結束

g2.join() #等待g2結束

#或者上述兩步合做一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

遇到IO阻塞時會自動切換任務

import gevent,sys
from gevent import monkey # 導入monkey補丁
monkey.patch_all() # 打補丁 
import time

print(sys.path)

def task1():
    print("task1 run")
    # gevent.sleep(3)
    time.sleep(3)
    print("task1 over")

def task2():
    print("task2 run")
    # gevent.sleep(1)
    time.sleep(1)
    print("task2 over")

g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)
#gevent.joinall([g1,g2])
g1.join()
g2.join()
# 執行以上代碼會發現不會輸出任何消息
# 這是由於協程任務都是以異步方式提交,因此主線程會繼續往下執行,而一旦執行完最後一行主線程也就結束了,
# 致使了協程任務沒有來的及執行,因此這時候必須join來讓主線程等待協程任務執行完畢   也就是讓主線程保持存活
# 後續在使用協程時也須要保證主線程一直存活,若是主線程不會結束也就意味着不須要調用join

須要注意:

1.若是主線程結束了 協程任務也會當即結束。

2.monkey補丁的原理是把原始的阻塞方法替換爲修改後的非阻塞方法,即偷樑換柱,來實現IO自動切換

必須在打補丁後再使用相應的功能,避免忘記,建議寫在最上方

咱們能夠用threading.current_thread().getName()來查看每一個g1和g2,查看的結果爲DummyThread-n,即假線程

monke補丁原理

#myjson.py
def dump():
    print("一個被替換的 dump函數")

def load():
    print("一個被替換的 load函數")
# test.py
import myjson
import json
# 補丁函數
def monkey_pacth_json():
    json.dump = myjson.dump
    json.load = myjson.load
    
# 打補丁
monkey_pacth_json()

# 測試 
json.dump()
json.load()
# 輸出:
# 一個被替換的 dump函數
# 一個被替換的 load函數

使用Gevent案例一 爬蟲:

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,'https://www.python.org/'),
    gevent.spawn(get_page,'https://www.yahoo.com/'),
    gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))

使用Gevent案例二 TCP:

服務器
#=====================================服務端
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#若是不想用money.patch_all()打補丁,能夠用gevent自帶的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print('client %s:%s msg: %s' %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1',8080)
客戶端
#=====================================多線程模擬多個客戶端併發訪問
from threading import Thread
from socket import *
import threading

def client(server_ip,port):
    c=socket(AF_INET,SOCK_STREAM) #套接字對象必定要加到函數內,即局部名稱空間內,放在函數外則被全部線程共享,則你們公用一個套接字對象,那麼客戶端端口永遠同樣了
    c.connect((server_ip,port))

    count=0
    while True:
        c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
        msg=c.recv(1024)
        print(msg.decode('utf-8'))
        count+=1
if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client,args=('127.0.0.1',8080))
        t.start()
相關文章
相關標籤/搜索