python協程和IO多路複用

 協程介紹                                                                                                                    python

協程:是單線程下的併發,又稱微線程,是一種用戶態的輕量級線程。自己並不存在,是由程序員創造的。git

須要強調的是:程序員

1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行)
2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(非io操做的切換與效率無關)
優勢:
  1,協程的切換開銷更小,屬於程序級別的切換,操做系統感知不到,於是更加輕量級;
  2,單線程內就能夠實現併發效果,最大限度利用cpu
缺點:
  1,協程的本質是在單線程下,沒法利用多核,能夠一個程序開啓多個進程,一個進程開啓多個線程,每一個線程內開啓協程。
  2,協程指的是單個線程,於是一旦協程出現阻塞,就會阻塞整個線程。

greenlet
import greenlet

def f1():
    print(11)
    gr2.switch()
    print(22)
    gr2.switch()

def f2():
    print(33)
    gr1.switch()
    print(44)

# 協程 gr1
gr1 = greenlet.greenlet(f1)
# 協程 gr2
gr2 = greenlet.greenlet(f2)
gr1.switch()
狀態切換

單純的切換(在沒有io的狀況或者沒有重複開闢內存空間的操做),反而會下降程序的執行速度.github

gevent 
from gevent import monkey;monkey.patch_all()
import gevent
import time
import threading

def eat():
    print(threading.current_thread().getName())
    print(11)
    time.sleep(1)
    print(22)
def play():
    print(threading.current_thread().getName())
    print(33)
    time.sleep(1)
    print(44)
g1=gevent.spawn(eat)
g2=gevent.spawn(play)

gevent.joinall([g1,g2])
遇到IO主動切換
 

注意: from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前.web

from gevent import spawn, joinall, monkey;
monkey.patch_all()

import time
def task(pid):
    time.sleep(0.5)
    print('Task %s done' % pid)
def f1():  # 同步
    for i in range(10):
        task(i,)
def f2():  # 異步
    g = [spawn(task, i) for i in range(10)]
    joinall(g)

if __name__ == '__main__':
    print('f1')
    f1()
    print('f2')
    f2()
    print('DONE')
同步和異步

協程應用:windows

from gevent import monkey
monkey.patch_all() # 之後代碼中遇到IO都會自動執行greenlet的switch進行切換
import requests
import gevent


def get_page1(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page2(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page3(url):
    ret = requests.get(url)
    print(url,ret.content)

gevent.joinall([
    gevent.spawn(get_page1, 'https://www.python.org/'), # 協程1
    gevent.spawn(get_page2, 'https://www.yahoo.com/'),  # 協程2
    gevent.spawn(get_page3, 'https://github.com/'),     # 協程3
])
爬蟲

 

 IO多路複用                                                                                                                                  數組

I/O多路複用是指單個進程能夠同時監聽多個網絡的鏈接IO,用於提高效率.網絡

I/O(input/output),經過一種機制,能夠監視多個文件描述,一旦描述符就緒(讀就緒和寫就緒),能通知程序進行相應的讀寫操做。本來爲多進程或多線程來接收多個鏈接的消息變爲單進程或單線程保存多個socket的狀態後輪詢處理。多線程

非阻塞實例:併發

import socket

client = socket.socket()
client.setblocking(False) # 將原來阻塞的位置變成非阻塞(報錯)
# 百度建立鏈接: 阻塞
try:
    client.connect(('www.baidu.com',80)) # 執行了但報錯了
except BlockingIOError as e:
    pass
# 檢測到已經鏈接成功

# 問百度我要什麼?
client.sendall(b'GET /s?wd=fanbingbing HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')

# 我等着接收百度給個人回覆
chunk_list = []
while True:
    chunk = client.recv(8096) # 將原來阻塞的位置變成非阻塞(報錯)
    if not chunk:
        break
    chunk_list.append(chunk)

body = b''.join(chunk_list)
print(body.decode('utf-8'))
setblock

可是非阻塞IO模型毫不被推薦。

    咱們不可否則其優勢:可以在等待任務完成的時間裏幹其餘活了(包括提交其餘任務,也就是 「後臺」 能夠有多個任務在「」同時「」執行)。

    可是也難掩其缺點:  

  1. 循環調用recv()將大幅度推高CPU佔用率,在低配主機下極容易出現卡機狀況   2. 任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。
select 
select是經過系統調用來監視一組由多個文件描述符組成的數組,經過調用select(),就緒的文件描述符會被內核標記出來,而後進程就能夠得到這些文件描述符,進行相應的讀寫操做.
執行過程:
  1,select須要提供要監控的數組,而後由用戶態拷貝到內核態
  2,內核態線性循環監控數組,每次都須要遍歷整個數組
  3,內核發現文件狀態符符合操做結果將其返回

注意:對於要監控的socket都要設置爲非阻塞的

python中使用select

r,w,e=select.selct(rlist,wlist,errlist,[timeout])

rlist,wlist,errlist均是waitable object;都是文件描述符,就是一個整數,或者擁有一個返回文件描述符的函數fileno的對象.

rlist:等待讀就緒的文件描述符數組

wlist:等待寫就緒的文件描述符數組

errlist:等待異常的數組

當rlist數組中的文件描述符發生可讀時,(調用accept或者read函數),則獲取文件描述符並添加到r數組中.

當wlist數組中的文件描述符發生可寫時,則獲取文件描述符添加到w數組中

當errlist數組中的的文件描述符發生錯誤時,將會添加到e隊列中.

select的實例:
import socket
import select

client1 = socket.socket()
client1.setblocking(False) # 百度建立鏈接: 非阻塞
try:
    client1.connect(('www.baidu.com',80))
except BlockingIOError as e:
    pass

client2 = socket.socket()
client2.setblocking(False) # 百度建立鏈接: 非阻塞
try:
    client2.connect(('www.sogou.com',80))
except BlockingIOError as e:
    pass

socket_list = [client1,client2]
conn_list = [client1,client2]

while True:
    rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005)
    # wlist中表示已經鏈接成功的socket對象
    for sk in wlist:
        if sk == client1:
            sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')
        elif sk==client2:
            sk.sendall(b'GET /web?query=fdf HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n')
    for sk in rlist:
        chunk_list = []
        while True:
            try:
                chunk = sk.recv(8096)
                if not chunk:
                    break
                chunk_list.append(chunk)
            except BlockingIOError as e:
                break
        body = b''.join(chunk_list)
        # print(body.decode('utf-8'))
        print(body)
        sk.close()
        socket_list.remove(sk)
    if not socket_list:
        break
單進程的併發
import socket
import select

class Req(object):
    def __init__(self,sk,func):
        self.sock = sk
        self.func = func
    def fileno(self):
        return self.sock.fileno()

class Nb(object):
    def __init__(self):
        self.conn_list = []
        self.socket_list = []
    def add(self,url,func):
        client = socket.socket()
        client.setblocking(False)  # 非阻塞
        try:
            client.connect((url, 80))
        except BlockingIOError as e:
            pass
        obj = Req(client,func)
        self.conn_list.append(obj)
        self.socket_list.append(obj)

    def run(self):
        while True:
            rlist,wlist,elist = select.select(self.socket_list,self.conn_list,[],0.005)
            # wlist中表示已經鏈接成功的req對象
            for sk in wlist:
                # 發生變換的req對象
                sk.sock.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')
                self.conn_list.remove(sk)
            for sk in rlist:
                chunk_list = []
                while True:
                    try:
                        chunk = sk.sock.recv(8096)
                        if not chunk:
                            break
                        chunk_list.append(chunk)
                    except BlockingIOError as e:
                        break
                body = b''.join(chunk_list)
                # print(body.decode('utf-8'))
                sk.func(body)
                sk.sock.close()
                self.socket_list.remove(sk)
            if not self.socket_list:
                break
def baidu_repsonse(body):
    print('百度下載結果:',body)
def sogou_repsonse(body):
    print('搜狗下載結果:', body)
def google_repsonse(body):
    print('谷歌下載結果:', body)

t1 = Nb()
t1.add('www.baidu.com',baidu_repsonse)
t1.add('www.sogou.com',sogou_repsonse)
t1.add('www.google.com',google_repsonse)
t1.run()
高級版

select優勢:能夠跨平臺使用。

    缺點:1,每次調用select,都須要把fd集合由用戶態拷貝到內核態,在fd多的時候開銷會很大。

       2,每次select都是線性遍歷整個整個列表,在fd很大的時候遍歷開銷也很大。

操做系統檢測socket是否發生變化,有三種模式(後二者在windows上不支持):
    select:最多1024個socket;循環去檢測。
    poll:不限制監聽socket個數;循環去檢測(水平觸發)。
    epoll:不限制監聽socket個數;回調方式(邊緣觸發)。

線程、進程、協程的區別:

相關文章
相關標籤/搜索