python — 協程

1. 協程

1.1 協程基礎

1.協程 :可以在一個線程下的多個任務之間來回切換,那麼每個任務都是一個協程。html

2.協程的優勢:python

  • 1.一個線程中的阻塞都被其餘的各類任務沾滿了
  • 2.讓操做系統以爲這個線程很忙,儘可能的減小這個線程進入阻塞的狀態,提升了單線程對CPU的利用率。
  • 3.多個任務在同一個線程中執行,也達到了一個併發的效果,規避了每個任務的io操做,減小了線程的個數,減輕了操做系統的負擔。

3.協程是用戶級別的,由咱們本身寫的python代碼來控制切換的,是操做系統不可見的。
4.在Cpython解釋器下:web

  • 因爲多線程自己就不能利用多核,因此即使是開啓了多個線程也只能輪流在一個CPU上執行,協程若是把全部任務的IO操做都規避掉,只剩下須要使用CPU的操做,就意味着協程就能夠作到提升CPU利用率的效果。
  • 協程和線程都不能利用多核,都是在一個CPU上輪流執行

5.多線程和協程的區別:編程

  • 線程:切換須要操做系統,開銷大,操做系統不可控,給操做系統的壓力大,操做系統對IO操做的感知更加靈敏。
  • 協程:切換須要python代碼,開銷小,用戶操做可控,徹底不會增長操做系統的壓力,用戶級別可以對IO操做的感知比較低。

1.2 切換

兩種切換方式:安全

  • 原生python完成 — asyncio(基於yield)
  • C語言完成的python模塊 —— gevent 或 greenlet
1.2.1 gevent模塊 / greenlet模塊

1.greenlet模塊網絡

import time
from  greenlet import greenlet

def eat():
    print('wusir is eating')
    time.sleep(0.5)
    g2.switch()
    print('wusir finished eat')

def sleep():
    print('小馬哥 is sleeping')
    time.sleep(0.5)
    print('小馬哥 finished sleep')
    g1.switch()

g1 = greenlet(eat)  # (實例化一個對象)創造一個協程任務
g2 = greenlet(sleep)
g1.switch()   # 作切換

2.gevent模塊多線程

gevent是基於greenlet切換的併發

<1.> 沒有返回值app

import time
import gevent
def eat():
    print('wusir is eating')
    time.sleep(1)
    print('wusir finished eat')
def sleep():
    print('小馬哥 is sleeping')
    time.sleep(1)
    print('小馬哥 finished sleep')

g1 = gevent.spawn(eat)  # 創造一個協程任務
gevent.sleep(1)  # 阻塞,切換出去去執行任務

注意:框架

  • 在gevent中,gevent.sleep(1)與time.sleep(1)執行的效果不同,

    gevent.sleep(1) -- 切換、切出去,time.sleep(1) -- 就是普通的睡,沒有切換的效果。

  • 若是想讓gevent認識time.sleep() -- 切出去,須要導入:

    from gevent import monkey
    monkey.patch_all()

    注意:

    • 此時的time已經不是python中內置函數time了,已經被 patch_all 重寫了。(效果見下面示例)

分辨gevent是否識別了咱們寫的代碼中的io操做的方法:

  • 在patchall以前打印一下涉及到io操做的函數地址
  • 在patchall以後打印一下涉及到io操做的函數地址
  • 若是兩個地址一致,說明gevent沒有識別這個io,若是不一致說明識別了
import time
print('-->',time.sleep)
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
    print('wusir is eating')
    print('in eat: ',time.sleep)
    time.sleep(1)
    print('wusir finished eat')

def sleep():
    print('小馬哥 is sleeping')
    time.sleep(1)
    print('小馬哥 finished sleep')

g1 = gevent.spawn(eat)  # 創造一個協程任務(發佈任務)
g2 = gevent.spawn(sleep)
g1.join()   # 阻塞 直到g1任務完成爲止(沒有join,主程序不會作切換)
g2.join()
import time
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
    print('wusir is eating')
    time.sleep(1)
    print('wusir finished eat')

def sleep():
    print('小馬哥 is sleeping')
    time.sleep(1)
    print('小馬哥 finished sleep')
    
# 方式一:
# g1 = gevent.spawn(eat)
# g2 = gevent.spawn(sleep)
# # g1.join()
# # g2.join()

# gevent.joinall([g1,g2])  # gevent.joinall([g1,g2]) 至關於 g1.join() + g2.join()

# 方式二:
g_l = []
for i in range(10):
    g = gevent.spawn(eat)
    g_l.append(g)
gevent.joinall(g_l)

<2.> 有返回值

對象.value 接收返回值

value 是一個屬性,沒有阻塞功能,須要使用join / joinall 阻塞切換出去

import time
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
    print('wusir is eating')
    time.sleep(1)
    print('wusir finished eat')
    return 'wusir***'

def sleep():
    print('小馬哥 is sleeping')
    time.sleep(1)
    print('小馬哥 finished sleep')
    return '小馬哥666'

g1 = gevent.spawn(eat)
g2 = gevent.spawn(sleep)
gevent.joinall([g1,g2])
print(g1.value)
print(g2.value)

<3.> 用於socket

用於socket的 server端 時,遇到 io 操做就切換。

傳參:gevent.spawn(函數名,參數1,參數2,……)

# server端
import gevent
from gevent import monkey
monkey.patch_all()
import socket

def chat(conn):
    while True:
        msg = conn.recv(1024).decode('utf-8')
        conn.send(msg.upper().encode('utf-8'))

sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()

while True:
    conn,_ = sk.accept()
    gevent.spawn(chat,conn)

# client端    
import time
import socket
def client(i):
    sk = socket.socket()
    sk.connect(('127.0.0.1',9000))

    while True:
        sk.send('hello'.encode('utf-8'))
        print(i*'*',sk.recv(1024))
        time.sleep(0.5)

from threading import Thread
for i in range(500):
    Thread(target=client,args =(i,)).start()
1.2.2 asyncio 模塊

asyncio 模塊是基於yield機制切換的

await asyncio.sleep() 異步 阻塞(作切換)

await 所在的函數前面必須加 async,變成一個async函數。

python原生的底層的協程模塊:

  • 爬蟲、webserver框架
  • 提升網絡編程的效率和併發效果

語法:

  • 1.await — async wait: 阻塞 協程函數這裏要切換出去,還能保證一下子再切回來

    await 必須寫在async函數裏,async函數是協程函數

  • loop 事件循環

    全部的協程的執行 調度 都離不開這個loop

1.起一個任務

import asyncio
async def demo():   # 協程方法(表示是一個async函數)
    print('start')
    await asyncio.sleep(1)  # 阻塞
    print('end')

loop = asyncio.get_event_loop()  # 建立一個事件循環
loop.run_until_complete(demo())  # 把demo任務丟到事件循環中去執行

2.啓動多個任務,而且沒有返回值

import asyncio
async def demo():   # 協程方法
    print('start')
    await asyncio.sleep(1)  # 阻塞
    print('end')

loop = asyncio.get_event_loop()  # 建立一個事件循環
wait_obj = asyncio.wait([demo(),demo(),demo()])
loop.run_until_complete(wait_obj)

3.啓動多個任務而且有返回值

import asyncio
async def demo():   # 協程方法
    print('start')
    await asyncio.sleep(1)  # 阻塞
    print('end')
    return 123

loop = asyncio.get_event_loop()
t1 = loop.create_task(demo())
t2 = loop.create_task(demo())
tasks = [t1,t2]
wait_obj = asyncio.wait([t1,t2])
loop.run_until_complete(wait_obj)
for t in tasks:
    print(t.result())

4.誰先回來先取誰的結果

import asyncio
async def demo(i):   # 協程方法
    print('start')
    await asyncio.sleep(10-i)  # 阻塞
    print('end')
    return i,123

async def main():
    task_l = []
    for i in range(10):
        task = asyncio.ensure_future(demo(i))
        task_l.append(task)
    for ret in asyncio.as_completed(task_l):
        res = await ret
        print(res)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

5.用asyncio作一個底層的爬蟲示例

import asyncio

async def get_url():
    reader,writer = await asyncio.open_connection('www.baidu.com',80)
    writer.write(b'GET / HTTP/1.1\r\nHOST:www.baidu.com\r\nConnection:close\r\n\r\n')
    all_lines = []
    async for line in reader:
        data = line.decode()
        all_lines.append(data)
    html = '\n'.join(all_lines)
    return html

async def main():
    tasks = []
    for url in range(20):
        tasks.append(asyncio.ensure_future(get_url()))
    for res in asyncio.as_completed(tasks):
        result = await res
        print(result)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())  # 處理一個任務

1.3 總結

1.進程、線程、協程各自的特色:

  • 1.進程:開銷大,數據隔離,能利用多核,數據不安全,操做系統控制
  • 2.線程:開銷適中,數據共享,在cpython解釋器下不能利用多核,數據不安全,操做系統控制
  • 3.協程:開銷小,數據共享,不能利用多核,數據安全,用戶控制

2.ansyncio模塊爲咱們提供了哪兩個關鍵字?分別用什麼做用?

  • async 標識一個協程函數
  • await 後面跟着一個asyncio模塊提供的io操做的函數
  • loop 事件循環,負責在多個任務之間進行切換的

3.關於讀代碼相關的問題:

  • 1.開啓線程是幾乎不須要事件的,start是一個異步非阻塞方法
  • 2.對於整數的 += 、-= 來講 異步的多線程數據不安全,若是是同步的數據就安全了
  • 3.對於列表的操做:不管是異步仍是同步的 都是數據安全的

4.有一個文件,這個文件中有20001行數據,開啓一個線程池,爲每100行建立一個任務,打印這100行數據。

# 方法一:
def print_line(lines):
    print(lines)

from concurrent.futures import ThreadPoolExecutor
tp = ThreadPoolExecutor(20)
with open('file',encoding='utf-8') as f:
    lines = []
    for line in f:
        if len(lines) == 100:
            tp.submit(print_line,lines)
            lines.clear()
        lines.append(line)
    if lines:
        tp.submit(print_line, lines)
# 方法二:
def print_line(lines):
    print(lines)

def read_file(filename):
    with open(filename, encoding='utf-8') as f:
        for line in f:
            yield line

def submit_func(tp,line=None,end= False,lines = []):
    if line:
        lines.append(line)
    if len(lines) == 100 or end:
        tp.submit(print_line, lines)
        lines.clear()

from concurrent.futures import ThreadPoolExecutor
tp = ThreadPoolExecutor(20)
for line in read_file('file'):
    submit_func(tp,line)
submit_func(tp,end=True)
  1. print 和 文件的讀、寫都是io操做:

    上面這個題主要就是起線程在一個線程讀文件的時候,利用這個線程讀取文件的時間交給另外一個線程來進行打印操做

  2. 使用線程能夠有效的規避掉io操做的時間,提升程序的的效率

  3. 解耦程序的功能

  4. 默認參數是列表

相關文章
相關標籤/搜索