Python - - 併發編程 - - 協成

目錄

  • 協程介紹
  • greenlet模塊
  • gevent模塊

1,協程介紹

  • 協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。
  • 併發的本質:切換+保存狀態
  • 在操做系統中
    • 進程:是資源分配的最小單位
    • 線程:是CPU調度的最小單位
    • 協程:是單線程內實現併發切換執行任務的
  • 使用yield也能夠實如今一個主線程中切換執行
# 生成器 yield
def func1():
    print(1)
    yield
    print(3)
    yield

def func2():
    g = func1()   # 生成器函數在被調用時不會當即執行,除非next(g)觸發才能夠
    next(g)       # 開始執行func1()函數,可是遇到yield就會中止
    print(2)
    next(g)
    print(4)

func2()

# 結果呈現
1
2
3
4
def consumer():    # 消費者模型
    while True:
        n = yield     # yield接收g.send()的結果,而後賦值給n
        print("消費了包子 %s" % n)

def producer():   # 生產者模型
    g = consumer()    # 調用生成器函數,並不會理解執行生成器函數內部的代碼(除非next()進行觸發)
    next(g)   # 開始執行生成器函數
    for i in range(10):
        print("生產了包子 %s" % i)
        g.send(i) # send()給生成器函數yield處接收

producer()

# 結果呈現
生產了包子 0
消費了包子 0
生產了包子 1
消費了包子 1
生產了包子 2
消費了包子 2
生產了包子 3
消費了包子 3
生產了包子 4
消費了包子 4
生產了包子 5
消費了包子 5
生產了包子 6
消費了包子 6
生產了包子 7
消費了包子 7
生產了包子 8
消費了包子 8
生產了包子 9
消費了包子 9
  • 在yield切換中,在任務一遇到io狀況下,切到任務二去執行,這樣就能夠利用任務一阻塞的時間完成任務二的計算,效率的提高就在於此。
  • 須要強調的是:
    • 1 python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行)
    • 2 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關)
  • 對比操做系統控制線程的切換,用戶在單線程內控制協程的切換
    • 優勢:
      • 1 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級
      • 2 單線程內就能夠實現併發的效果,最大限度地利用CPU
    • 缺點:
      • 1 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程開啓多個線程,每一個線程內開啓協程
      • 2 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程
  • 總結協程特色
    • 1 必須在只有要給單線程裏實現併發
    • 2 修改共享數據不須要加鎖
    • 3 用戶程序裏本身保持多個控制的上下文棧
    • 4 附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))

2,greenlet模塊

2.1 greenlet 實現同一線程內切換

import time
from greenlet import greenlet   # 在單線程中切換狀態的模塊
def eat1():
    print("吃雞腿")
    g2.switch()     # 切換執行eat2
    time.sleep(5)         # greenlet進行切換時,並不會規避掉IO時間(也就是切換回來時仍是須要等待2秒在執行)
    print("吃雞翅")
    g2.switch()     # 切換執行eat2
def eat2():
    print("吃餃子")
    g1.switch()     # 切換執行eat1
    time.sleep(3)         # greenlet進行切換時,並不會規避掉IO時間(也就是切換回來時仍是須要等待2秒在執行)
    print("白切雞")

g1 = greenlet(eat1)
g2 = greenlet(eat2)
g1.switch()    # 切換執行eat1

# 結果呈現
吃雞腿
吃餃子
吃雞翅
白切雞
  • 若是在同一個程序有IO的狀況下,才切換會讓效率提升不少,可是yield greenlet均不會在切換時規避掉IO時間

2.2 greenlet 實現 效率對比

#順序執行
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i

def f2():
    res=1
    for i in range(100000000):
        res*=i

start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337

#切換
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()

start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524

3,gevent模塊

  • gevent 就是當遇到gevent.sleep() IO 時會自動切換;
# gevent 內部封裝了greenlet模塊
import gevent
def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(2)        # gevent能夠在gevevt.sleep()本身認識的IO操做切換
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)


g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
# g1.join()
# g2.join()
gevent.joinall([g1,g2])    # 至關於上面的g1.join()  g2.join()

# 結果呈現
egon eat 1
egon play 1
egon play 2
egon eat 2
  • gevent()對普通的IO (好比time模塊的sleep,socket 以及urllib request等網絡請求)是沒法切換的:
import gevent,time
def eat(name):
    print('%s eat 1' %name)
    time.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    time.sleep(1)
    print('%s play 2' %name)


g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
gevent.joinall([g1,g2])

# 結果呈現
egon eat 1
egon eat 2
egon play 1
egon play 2
  • 上例gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,而time.sleep(2)或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了
  • from gevent import monkey;monkey.patch_all() 必須放到被打補丁者的前面,如time,socket模塊以前
  • 咱們能夠用 threading.current_thread().getName() 來查看每一個g1和g2,查看的結果爲DummyThread-n,即假線程
from gevent import monkey;monkey.patch_all()    # 加上這句話,gevent遇到其餘模塊(time,socket等IO操做)的IO 須要等待時 就會切換協程
from threading import current_thread
import gevent,time
def func1():
    print(current_thread().name)    # 打印當前線程名(其實協程並非線程,多個協程是在同一個線程內完成的)
    print(123)
    time.sleep(1)
    print(456)

def func2():
    print(current_thread().name)
    print(789)
    time.sleep(1)
    print(101112)

g1 = gevent.spawn(func1)    # 碰見它認識的io會自動切換的模塊
g2 = gevent.spawn(func2)
# g1.join()
# g2.join()
gevent.joinall([g1,g2])

# 結果呈現
DummyThread-1
123
DummyThread-2
789
456
101112

3.1 Gevent之同步與異步

  • 測試有IO操做時,使用多個協程與開單線程單步執行多個任務執行效率的對比
from gevent import monkey;monkey.patch_all()
import gevent,time

def task(args):
    time.sleep(1)
    print(args)

def sync_fucn():    # 同步
    for i in range(10):
        task(i)

def async_func():   # 異步
    g_lst = []
    for i in range(10):
        g_lst.append(gevent.spawn(task,i))    # 發起協程任務,傳參數
    gevent.joinall(g_lst)

start = time.time()
sync_fucn()
print(time.time() - start)

start = time.time()
async_func()
print(time.time() - start)

# 結果呈現
0
1
2
3
4
5
6
7
8
9
10.011728048324585
0
1
2
3
4
5
6
7
8
9
1.0025279521942139

3.2 開多個協程去爬取多個網頁與單線程單步執行爬取網頁的效率對比

# 爬去網頁信息的例子
from gevent import monkey;monkey.patch_all()
import gevent,requests,time

# 協程函數發起10個網頁的爬取任務
def get_url(url):
    res = requests.get(url)
    print(url,res.status_code,len(res.text))    # 返回爬取網頁的信息(requests.get(url).text----獲取網頁源代碼; requests.get(url).status_code----獲取網頁狀態碼)

url_lst = [
    "http://www.sohu.com",
    "http://www.baidu.com",
    "http://www.qq.com",
    "http://www.python.org",
    "http://www.cnblogs.com",
    "http://www.mi.com",
    "http://www.apache.org"
]

g_lst = []
start = time.time()
for url in url_lst:
    g = gevent.spawn(get_url,url)
    g_lst.append(g)
gevent.joinall(g_lst)
print(time.time() - start)

start = time.time()
for url in url_lst:
    get_url(url)
print(time.time() - start)

# 結果呈現
http://www.baidu.com 200 2381
http://www.sohu.com 200 178923
http://www.qq.com 200 205793
http://www.mi.com 200 312788
http://www.cnblogs.com 200 41063
http://www.apache.org 200 62019
http://www.python.org 200 49235
1.198430061340332
http://www.sohu.com 200 178923
http://www.baidu.com 200 2381
http://www.qq.com 200 205793
http://www.python.org 200 49235
http://www.cnblogs.com 200 41043
http://www.mi.com 200 312788
http://www.apache.org 200 62019
2.1779263019561768
  • 協程在響應一個網頁時,有網絡延時,它就可能利用這個時間去打開其餘網頁了,也就是時間複用,有可能利用第一個網頁等待時間,把剩下全部網頁的請求都發出去了;
  • 同步單步執行時,每執行一個網頁就會等待網絡延時,串行的;而協程就是在發送一個網頁時,不等,由於它直到有網絡延時,因此直接執行下一個任務;

3.3 使用協程完成server端和client端的通訊

  • 測試連通性
# server
import socket

sk = socket.socket()
sk.bind(("127.0.0.1",8080))
sk.listen()

conn,addr = sk.accept()
ret = conn.recv(1024).decode("utf-8")
print(ret)
conn.send(ret.upper().encode("utf-8"))
conn.close()
sk.close()
# client
import socket
sk = socket.socket()
sk.connect(("127.0.0.1",8080))
sk.send(b"hi")
ret = sk.recv(1024).decode("utf-8")
print(ret)
sk.close()
  • 客戶端併發鏈接服務端
# server
from gevent import monkey;monkey.patch_all()
import socket,gevent

def talk(conn):
    while True:
        ret = conn.recv(1024).decode("utf-8")
        print(ret)
        conn.send(ret.upper().encode("utf-8"))
    conn.close()

sk = socket.socket()
sk.bind(("127.0.0.1",8080))
sk.listen()

while True:
    conn,addr = sk.accept()
    gevent.spawn(talk,conn)
sk.close()
# client
from gevent import monkey;monkey.patch_all()
import socket,gevent,time,threading

def my_client():
    sk = socket.socket()
    sk.connect(("127.0.0.1",8080))
    while True:
        sk.send(b"hi")
        ret = sk.recv(1024).decode("utf-8")
        print(ret)
        time.sleep(1)
    sk.close()
for i in range(500):
    threading.Thread(target=my_client).start()
相關文章
相關標籤/搜索