Python3 與 C# 併發編程之~ 協程篇

 

3.協程篇

去年微信公衆號就陸陸續續發佈了,我一直覺得博客也彙總同步了,這幾天有朋友說一直沒找到,遂發現,的確是漏了,因此補上一篇html

在線預覽:https://github.lesschina.com/python/base/concurrency/4.併發編程-協程篇.htmlnode

示例代碼:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/ZCoroutinepython

多進程和多線程切換之間也是有資源浪費的,相比而言協程更輕量級jquery

3.1.知識回顧

1.裝飾器

往期文章:http://www.javashuo.com/article/p-wtppcuyx-g.htmllinux

基礎拓展篇已經講的很透徹了,就再也不雷同了,貼一個簡單案例,而後擴展說說可迭代迭代器生成器nginx

In [1]:
% time

from functools import wraps

def log(func):
    @wraps(func)
    def wrapper(*args,**kv):
        print("%s log_info..." % func.__name__)
        return func(*args,**kv)
    return wrapper

@log
def login_out():
    print("已經退出登陸")

def main():
    # @wraps(func) 可使得裝飾先後,方法簽名一致
    print(f"方法簽名:{login_out.__name__}")
    login_out()
    
    # @wraps能讓你經過屬性 __wrapped__ 直接訪問被包裝函數
    login_out.__wrapped__() # 執行原來的函數

if __name__ == '__main__':
    main()
 
Wall time: 0 ns
方法簽名:login_out
login_out log_info...
已經退出登陸
已經退出登陸
 

2.迭代器

往期文章:http://www.javashuo.com/article/p-tmcacaan-d.htmlgit

過於基礎的就不說了,簡單說下,而後舉一個OOPDemo程序員

  1. 判斷是否可迭代:(能不能for遍歷)
    • from collections.abc import Iterable
    • isinstance(xxx, Iterable)
  2. 判斷是不是迭代器:(能不能next(xxx)遍歷)
    • from collections.abc import Iterator
    • isinstance(xxx, Iterable)
    • PS:迭代器是必定能夠迭代的
  3. 可迭代對象轉迭代器:(生成器都是迭代器)
    • list、dict、strIterable變成Iterator可使用iter()函數 eg:iter([])(節省資源)
    • PS:生成器都是Iterator對象,但list、dict、str雖然是Iterable,卻不是Iterator

提醒一下:from collections import Iterable, Iterator # 如今已經不推薦使用了(3.8會棄用)github

查看一下typing.py的源碼就知道了:web

# 模仿collections.abc中的那些(Python3.7目前只是過渡的兼容版,沒有具體實現)
def _alias(origin, params, inst=True):
    return _GenericAlias(origin, params, special=True, inst=inst)

T_co = TypeVar('T_co', covariant=True)  # Any type covariant containers.

Iterable = _alias(collections.abc.Iterable, T_co)
Iterator = _alias(collections.abc.Iterator, T_co)

以前說了個 CSharp 的 OOP Demo,此次來個Python的,咱們來一步步演變:

In [2]:
% time

# 導入相關模塊
from collections.abc import Iterable, Iterator
# from collections import Iterable, Iterator # 如今已經不推薦使用了(3.8會棄用)
 
Wall time: 0 ns
In [3]:
# 定義一個Class
class MyArray(object):
    pass
In [4]:
# 是否可迭代 False
isinstance(MyArray(),Iterable)
Out[4]:
False
In [5]:
# 是不是迭代器 False
isinstance(MyArray(),Iterator)
Out[5]:
False
In [6]:
# 若是Class裏面含有`__iter__`方法就是可迭代的
In [7]:
# 從新定義測試:
class MyArray(object):
    def __iter__(self):
        pass

# 是否可迭代 False
isinstance(MyArray(),Iterable)
Out[7]:
True
In [8]:
# 是不是迭代器 False
isinstance(MyArray(),Iterator)
Out[8]:
False
 

這時候依然不是迭代器

這個能夠類比C#:

  1. 能不能foreach就看你遍歷對象有沒有實現IEnumerable,就說明你是否是一個可枚舉類型(enumerator type)
  2. 是否是個枚舉器(enumerator)就看你實現了IEnumerator接口沒
// 能不能foreach就看你遍歷對象有沒有實現IEnumerable,就說明你是否是一個可枚舉類型
public interface IEnumerable
{
    IEnumerator GetEnumerator();
}

// 是否是個枚舉器(enumerator)就看你實現了IEnumerator接口沒
public interface IEnumerator
{
    object Current { get; }

    bool MoveNext();

    void Reset();
}

先看看Python對於的類吧:

# https://github.com/lotapp/cpython3/blob/master/Lib/_collections_abc.py
class Iterable(metaclass=ABCMeta):

    __slots__ = ()

    @abstractmethod
    def __iter__(self):
        while False:
            yield None

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Iterable:
            return _check_methods(C, "__iter__")
        return NotImplemented

class Iterator(Iterable):

    __slots__ = ()

    @abstractmethod
    def __next__(self):
        'Return the next item from the iterator. When exhausted, raise StopIteration'
        raise StopIteration

    def __iter__(self):
        return self

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Iterator:
            return _check_methods(C, '__iter__', '__next__')
        return NotImplemented

讀源碼的好處來了==>抽象方法:@abstractmethod(子類必須實現),上次漏講了吧~

上面說迭代器確定能夠迭代,說很抽象,代碼太直觀了 (繼承):class Iterator(Iterable)

如今咱們來模仿並實現一個Python版本的迭代器

In [9]:
% time

# 先搭個空架子
class MyIterator(Iterator):
    def __next__(self):
        pass

class MyArray(Iterable):
    def __iter__(self):
        return MyIterator() # 返回一個迭代器

def main():
    # 可迭代 True
    print(isinstance(MyArray(), Iterable))
    # 迭代器也是可迭代的 True
    print(isinstance(MyIterator(), Iterable))
    # 是迭代器 True
    print(isinstance(MyIterator(), Iterator))

if __name__ == '__main__':
    main()
 
Wall time: 0 ns
True
True
True
In [10]:
% time

# 把迭代器簡化合並
class MyIterator(Iterator):
    def __next__(self):
        pass

    def __iter__(self):
        return self # 返回一個迭代器(如今就是它本身了)

def main():
    print(isinstance(MyIterator(), Iterable))
    print(isinstance(MyIterator(), Iterator))

if __name__ == '__main__':
    main()
 
Wall time: 0 ns
True
True
In [11]:
% time

# 立刻進入正題了,先回顧一下Fibona
def fibona(n):
    a, b = 0, 1
    for i in range(n):
        a, b = b, a+b
        print(a)

# 獲取10個斐波拉契數列
fibona(10)
 
Wall time: 0 ns
1
1
2
3
5
8
13
21
34
55
In [12]:
% time

# 改形成迭代器
from collections.abc import Iterable, Iterator

class FibonaIterator(Iterator):
    def __init__(self, n):
        self.__a = 0
        self.__b = 1
        self.__n = n  # 獲取多少個
        self.__index = 0  # 當前索引

    def __next__(self):
        if self.__index < self.__n:
            self.__index += 1
            # 生成下一波
            self.__a, self.__b = self.__b, self.__a + self.__b
            return self.__a
        else:
            raise StopIteration # for循環結束條件

def main():
    print(FibonaIterator(10))
    for i in FibonaIterator(10):
        print(i)

if __name__ == "__main__":
    main()
 
Wall time: 0 ns
<__main__.FibonaIterator object at 0x000001CAFFD2C748>
1
1
2
3
5
8
13
21
34
55
 

3.生成器

往期文章:http://www.javashuo.com/article/p-tmcacaan-d.html

生成器是啥?看源碼就秒懂了:(迭代器的基礎上再封裝)

class Generator(Iterator):
    __slots__ = ()

    def __next__(self):
        """從生成器返回下一個item,結束的時候拋出 StopIteration"""
        return self.send(None)

    @abstractmethod
    def send(self, value):
        """將值發送到生成器。返回下一個產生的值或拋出StopIteration"""
        raise StopIteration

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        """在生成器中引起異常。返回下一個產生的值或拋出StopIteration"""
        if val is None:
            if tb is None:
                raise typ
            val = typ()
        if tb is not None:
            val = val.with_traceback(tb)
        raise val

    # 如今知道以前close後爲啥沒異常了吧~
    def close(self):
        """屏蔽異常"""
        try:
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass
        else:
            raise RuntimeError("generator ignored GeneratorExit")

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Generator:
            return _check_methods(C, '__iter__', '__next__',
                                  'send', 'throw', 'close')
        return NotImplemented

迭代器的基礎上再封裝了兩個抽象方法sendthrow和屏蔽異常的方法close

如今用生成器的方式改寫下斐波拉契數列:(列表推導式改爲小括號是最簡單的一種生成器)

In [13]:
% time

# 代碼瞬間就簡潔了
def fibona(n):
    a = 0
    b = 1
    for _ in range(n):
        a, b = b, a + b
        yield a # 加個yiel就變成生成器了

def main():
    print(fibona(10))
    for i in fibona(10):
        print(i)

if __name__ == "__main__":
    main()
 
Wall time: 0 ns
<generator object fibona at 0x000001CAFFD1AC00>
1
1
2
3
5
8
13
21
34
55
 

注意下這幾點:

  1. generator剛啓動的時候,要麼 next(),要麼 send(None),否則會引起:
    • TypeError: can't send non-None value to a just-started generator
  2. 在一個generator函數中,遇到return或者break則直接拋出StopIteration終止迭代
    • 若是沒有則默認執行至函數完畢
  3. 若是想要拿到返回值,必須捕獲StopIteration錯誤,返回值包含在StopIterationvalue
def test_send(n):
    for i in range(n):
        if i==2:
            return "i==2"
        yield i

g = test_send(5)

while True:
    try:
        tmp = next(g)
        print(tmp)
    except StopIteration as ex:
        print(ex.value)
        break

輸出:

0
1
i==2

其餘的也沒什麼好說的了,讀完源碼再看看以前講的內容別有一番滋味在心頭哦~

 

3.2.概念篇

上集回顧:網絡:靜態服務器+壓測

1.同步與異步

同步是指一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成。

異步是指不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做。而後繼續執行下面代碼邏輯,只要本身完成了整個任務就算完成了(異步通常使用狀態、通知和回調)

PS:項目裏面通常是這樣的:(我的經驗)

  1. 同步架構:通常都是和錢相關的需求,須要實時返回的業務
  2. 異步架構:更可能是對寫要求比較高時的場景(同步變異步)
    • 讀通常都是實時返回,代碼通常都是await xxx()
  3. 想象個情景就清楚了:
    • 異步:如今用戶寫了篇文章,能夠異步操做,就算沒真正寫到數據庫也能夠返回:發表成功(大不了失敗提示一下)
    • 同步:用戶獲取訂單信息,你若是異步就會這樣了:提示下獲取成功,而後一片空白...用戶不卸載就怪了...

2.阻塞與非阻塞

阻塞是指調用結果返回以前,當前線程會被掛起,一直處於等待消息通知,不可以執行其餘業務(大部分代碼都是這樣的)

非阻塞是指在不能馬上獲得結果以前,該函數不會阻塞當前線程,而會馬上返回(繼續執行下面代碼,或者重試機制走起)

PS:項目裏面重試機制爲啥通常都是3次?

  1. 第一次重試,兩臺PC掛了也是有可能的
  2. 第二次重試,負載均衡分配的三臺機器同時掛的可能性不是很大,這時候就有多是網絡有點擁堵了
  3. 最後一次重試,再失敗就沒意義了,日記寫起來,再重試網絡負擔就加大了,得不償失了

3.五種IO模型

對於一次IO訪問,數據會先被拷貝到內核的緩衝區中,而後纔會從內核的緩衝區拷貝到應用程序的地址空間。須要經歷兩個階段:

  1. 準備數據
  2. 將數據從內核緩衝區拷貝到進程地址空間

因爲存在這兩個階段,Linux產生了下面五種IO模型(以socket爲例

  1. 阻塞式IO:
    • 當用戶進程調用了recvfrom等阻塞方法時,內核進入IO的第1個階段:準備數據(內核須要等待足夠的數據再拷貝)這個過程須要等待,用戶進程會被阻塞,等內核將數據準備好,而後拷貝到用戶地址空間,內核返回結果,用戶進程才從阻塞態進入就緒態
    • Linux中默認狀況下全部的socket都是阻塞的
  2. 非阻塞式IO:
    • 當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error
    • 用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做
    • 一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回
    • 非阻塞IO模式下用戶進程須要不斷地詢問內核的數據準備好了沒有
  3. IO多路複用
    • 經過一種機制,一個進程能夠監視多個文件描述符(套接字描述符)一旦某個文件描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做(這樣就不須要每一個用戶進程不斷的詢問內核數據準備好了沒)
    • 經常使用的IO多路複用方式有selectpollepoll
  4. 信號驅動IO:(以前咱們講進程先導篇的時候說過)
    • 內核文件描述符就緒後,經過信號通知用戶進程,用戶進程再經過系統調用讀取數據。
    • 此方式屬於同步IO(實際讀取數據到用戶進程緩存的工做仍然是由用戶進程本身負責的)
  5. 異步IOPOSIXaio_系列函數)
    • 用戶進程發起read操做以後,馬上就能夠開始去作其它的事。內核收到一個異步IO read以後,會馬上返回,不會阻塞用戶進程。
    • 內核會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,內核會給用戶進程發送一個signal告訴它read操做完成了

4.Unix圖示

貼一下Unix編程裏面的圖:

非阻塞IO

2.非阻塞IO.png

IO複用

3.IO複用.png

信號IO

4.信號IO.png

異步AIO

5.異步AIO.png

3.3.IO多路複用

開始以前我們經過非阻塞IO引入一下:(來個簡單例子socket.setblocking(False))

import time
import socket

def select(socket_addr_list):
    for client_socket, client_addr in socket_addr_list:
        try:
            data = client_socket.recv(2048)
            if data:
                print(f"[來自{client_addr}的消息:]\n")
                print(data.decode("utf-8"))
                client_socket.send(
                    b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                )
            else:
                # 沒有消息是觸發異常,空消息是斷開鏈接
                client_socket.close()  # 關閉客戶端鏈接
                socket_addr_list.remove((client_socket, client_addr))
                print(f"[客戶端{client_addr}已斷開鏈接,當前鏈接數:{len(socket_addr_list)}]")
        except Exception:
            pass

def main():
    # 存放客戶端集合
    socket_addr_list = list()

    with socket.socket() as tcp_server:
        # 防止端口綁定的設置
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        tcp_server.setblocking(False)  # 服務端非阻塞
        while True:
            try:
                client_socket, client_addr = tcp_server.accept()
                client_socket.setblocking(False)  # 客戶端非阻塞
                socket_addr_list.append((client_socket, client_addr))
            except Exception:
                pass
            else:
                print(f"[來自{client_addr}的鏈接,當前鏈接數:{len(socket_addr_list)}]")
            # 防止客戶端斷開後出錯
            if socket_addr_list:
                # 輪詢查看客戶端有沒有消息
                select(socket_addr_list)  # 引用傳參
                time.sleep(0.01)

if __name__ == "__main__":
    main()

輸出: 6.nowait.gif

能夠思考下:

  1. 爲何Server也要設置爲非阻塞?
    • PS:一個線程裏面只能有一個死循環,如今程序須要兩個死循環,so ==> 放一塊兒咯
  2. 斷開鏈接怎麼判斷?
    • PS:沒有消息是觸發異常,空消息是斷開鏈接
  3. client_socket爲何不用dict存放?
    • PS:dict在循環的過程當中,del會引起異常

1.Select

select和上面的有點相似,就是輪詢的過程交給了操做系統:

kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程

來個和上面等同的案例:

import select
import socket

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        socket_info_dict = dict()
        socket_list = [tcp_server]  # 監測列表
        while True:
            # 劣勢:select列表數量有限制
            read_list, write_list, error_list = select.select(
                socket_list, [], [])
            for item in read_list:
                # 服務端迎接新的鏈接
                if item == tcp_server:
                    client_socket, client_address = item.accept()
                    socket_list.append(client_socket)
                    socket_info_dict[client_socket] = client_address
                    print(f"[{client_address}已鏈接,當前鏈接數:{len(socket_list)-1}]")
                # 客戶端發來
                else:
                    data = item.recv(2048)
                    if data:
                        print(data.decode("utf-8"))
                        item.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        item.close()
                        socket_list.remove(item)
                        info = socket_info_dict[item]
                        print(f"[{info}已斷開,當前鏈接數:{len(socket_list)-1}]")

if __name__ == "__main__":
    main()

輸出和上面同樣

擴展說明:

select 函數監視的文件描述符分3類,分別是writefdsreadfds、和exceptfds。調用後select函數會阻塞,直到有描述符就緒函數返回(有數據可讀、可寫、或者有except)或者超時(timeout指定等待時間,若是當即返回設爲null便可)

select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024(64位=>2048)

而後Poll就出現了,就是把上限給去掉了,本質並沒變,仍是使用的輪詢

2.EPoll

epoll在內核2.6中提出(Linux獨有),使用一個文件描述符管理多個描述符,將用戶關心的文件描述符的事件存放到內核的一個事件表中,採用監聽回調的機制,這樣在用戶空間和內核空間的copy只需一次,避免再次遍歷就緒的文件描述符列表

先來看個案例吧:(輸出和上面同樣)

import socket
import select

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()

        # epoll是linux獨有的
        epoll = select.epoll()
        # tcp_server註冊到epoll中
        epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)

        # key-value
        fd_socket_dict = dict()

        # 回調須要本身處理
        while True:
            # 返回可讀寫的socket fd 集合
            poll_list = epoll.poll()
            for fd, event in poll_list:
                # 服務器的socket
                if fd == tcp_server.fileno():
                    client_socket, client_addr = tcp_server.accept()
                    fd = client_socket.fileno()
                    fd_socket_dict[fd] = (client_socket, client_addr)
                    # 把客戶端註冊進epoll中
                    epoll.register(fd, select.EPOLLIN | select.EPOLLET)
                else:  # 客戶端
                    client_socket, client_addr = fd_socket_dict[fd]
                    data = client_socket.recv(2048)
                    print(
                        f"[來自{client_addr}的消息,當前鏈接數:{len(fd_socket_dict)}]\n")
                    if data:
                        print(data.decode("utf-8"))
                        client_socket.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        del fd_socket_dict[fd]
                        print(
                            f"[{client_addr}已離線,當前鏈接數:{len(fd_socket_dict)}]\n"
                        )
                        # 從epoll中註銷
                        epoll.unregister(fd)
                        client_socket.close()

if __name__ == "__main__":
    main()

擴展:epoll的兩種工做模式

LT(level trigger,水平觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程序,應用程序能夠不當即處理該事件。下次調用epoll_wait時,會再次響應應用程序並通知此事件。LT模式是默認的工做模式。 LT模式同時支持阻塞和非阻塞socket。

ET(edge trigger,邊緣觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程序,應用程序必須當即處理該事件。若是不處理,下次調用epoll_wait時,不會再次響應應用程序並通知此事件。 ET是高速工做方式,只支持非阻塞socket(ET模式減小了epoll事件被重複觸發的次數,所以效率要比LT模式高)

Code提煉一下

  1. 實例化對象:epoll = select.epoll()
  2. 註冊對象:epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
  3. 註銷對象:epoll.unregister(fd)

PS:epoll不必定比Select性能高,通常都是分場景的:

  1. 高併發下,鏈接活躍度不高時:epoll比Select性能高(eg:web請求,頁面隨時關閉)
  2. 併發不高,鏈接活躍度比較高:Select更合適(eg:小遊戲)
  3. Select是win和linux通用的,而epoll只有linux有

其實IO多路複用還有一個kqueue,和epoll相似,下面的通用寫法中有包含


3.通用寫法(Selector

通常來講:Linux下使用epoll,Win下使用select(IO多路複用會這個通用的便可)

先看看Python源代碼:

# 選擇級別:epoll|kqueue|devpoll > poll > select
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector

實戰案例:(可讀和可寫能夠不分開)

import socket
import selectors

# Linux下使用epoll,Win下使用select
Selector = selectors.DefaultSelector()

class Task(object):
    def __init__(self):
        # 存放客戶端fd和socket鍵值對
        self.fd_socket_dict = dict()

    def run(self):
        self.server = socket.socket()
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind(('', 8080))
        self.server.listen()
        # 把Server註冊到epoll
        Selector.register(self.server.fileno(), selectors.EVENT_READ,
                          self.connected)

    def connected(self, key):
        """客戶端鏈接時處理"""
        client_socket, client_address = self.server.accept()
        fd = client_socket.fileno()
        self.fd_socket_dict[fd] = (client_socket, client_address)
        # 註冊一個客戶端讀的事件(服務端去讀消息)
        Selector.register(fd, selectors.EVENT_READ, self.call_back_reads)
        print(f"{client_address}已鏈接,當前鏈接數:{len(self.fd_socket_dict)}")

    def call_back_reads(self, key):
        """客戶端可讀時處理"""
        # 一個fd只能註冊一次,監測可寫的時候須要把可讀給註銷
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        print(f"[來自{client_address}的消息:]\n")
        data = client_socket.recv(2048)
        if data:
            print(data.decode("utf-8"))
            # 註冊一個客戶端寫的事件(服務端去發消息)
            Selector.register(key.fd, selectors.EVENT_WRITE,
                              self.call_back_writes)
        else:
            client_socket.close()
            del self.fd_socket_dict[key.fd]
            print(f"{client_address}已斷開,當前鏈接數:{len(self.fd_socket_dict)}")

    def call_back_writes(self, key):
        """客戶端可寫時處理"""
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        client_socket.send(b"ok")
        Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads)

def main():
    t = Task()
    t.run()
    while True:
        ready = Selector.select()
        for key, obj in ready:
            # 須要本身回調
            call_back = key.data
            call_back(key)

if __name__ == "__main__":
    main()

Code提煉一下

  1. 實例化對象:Selector = selectors.DefaultSelector()
  2. 註冊對象:
    • Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
    • Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
  3. 註銷對象:Selector.unregister(key.fd)
  4. 注意一下:一個fd只能註冊一次,監測可寫的時候須要把可讀給註銷(反之同樣)

業餘拓展:

select, iocp, epoll,kqueue及各類I/O複用機制
https://blog.csdn.net/shallwake/article/details/5265287

kqueue用法簡介
http://www.cnblogs.com/luminocean/p/5631336.html
 

3.4.協程引入

1.yield from

咱們常常有這樣的需求:讀取兩個分表的數據列表,而後合併以後進行一些處理

平時能夠借用itertools.chain來遍歷:

# https://docs.python.org/3/library/itertools.html#itertools.chain
import itertools

def main():
    # 模擬分表後的兩個查詢結果
    user1 = ["小張", "小明"]
    user2 = ["小潘", "小周"]
    # dict只能遍歷key(這種狀況須要本身封裝合併方法並處理下)
    user3 = {"name": "test1", "name1": "test2"}

    # 需求:合併並遍歷
    for item in itertools.chain(user1, user2, user3):
        print(item)

if __name__ == '__main__':
    main()

輸出:

小張
小明
小潘
小周
name
name1

它的內部實現實際上是這樣的:(至關於兩層遍歷,用yield返回

def my_chain(*args, **kwargs):
    for items in args:
        for item in items:
            yield item

def main():
    # 模擬分表後的兩個查詢結果
    user1 = ["小張", "小明"]
    user2 = ["小潘", "小周"]
    # dict只能遍歷key(這種狀況須要本身封裝合併方法並處理下)
    user3 = {"name": "test1", "name1": "test2"}

    # 需求:合併並遍歷
    for item in my_chain(user1, user2, user3):
        print(item)

if __name__ == '__main__':
    main()

而後Python3.3以後語法再一步簡化(yield from iterable對象

def my_chain(*args, **kwargs):
    for items in args:
        yield from items

def main():
    # 模擬分表後的兩個查詢結果
    user1 = ["小張", "小明"]
    user2 = ["小潘", "小周"]

    # 需求:合併並遍歷
    for item in my_chain(user1, user2):
        print(item)

if __name__ == '__main__':
    main()

輸出:

小張
小明
小潘
小周
test1
test2

擴展(可忽略)

其實知道了內部實現,很容易就寫上一段應對的處理:

def my_chain(*args, **kwargs):
    for my_iterable in args:
        # 若是是字典類型就返回value
        if isinstance(my_iterable, dict):
            my_iterable = my_iterable.values()
        for item in my_iterable:
            yield item

def main():
    # 模擬分表後的兩個查詢結果
    user1 = ["小張", "小明"]
    user2 = ["小潘", "小周"]
    # dict只能遍歷key(這種狀況須要本身封裝合併方法並處理下)
    user3 = {"name": "test1", "name1": "test2"}
    # 需求:合併並遍歷
    for item in my_chain(user1, user2, user3):
        print(item)

if __name__ == '__main__':
    main()

輸出:

小張
小明
小潘
小周
test1
test2

擴展的正確處理

PS:通常不會這麼幹的,通常都是[{},{}]遍歷並處理:

import itertools

def main():
    # 模擬分表後的兩個查詢結果
    user1 = [{"name": "小張"}, {"name": "小明"}]
    user2 = [{"name": "小潘"}, {"name": "小周"}]
    user3 = [{"name": "test1"}, {"name": "test2"}]
    # 需求:合併並遍歷
    for item in itertools.chain(user1, user2, user3):
        # 通常都是直接在這裏進行處理
        for key, value in item.items():
            print(value)

if __name__ == '__main__':
    main()

1.yield版協程

協程的目的其實很簡單:像寫同步代碼那樣實現異步編程

先看個需求:生成繪圖的數據(max,min,avg

好比說原來數據是這樣的:

products = [{
    "id": 2344,
    "title": "御泥坊補水面膜",
    "price": [89, 76, 120, 99]
}, {
    "id": 2345,
    "title": "御泥坊火山泥面膜",
    "price": [30, 56, 70, 89]
}]

處理以後:

new_products = [{
    "id": 2344,
    "title": "御泥坊補水面膜",
    "price": [89, 76, 120, 99],
    "max": 120,
    "min": 76,
    "avg": 96.0
},
{
    "id": 2345,
    "title": "御泥坊火山泥面膜",
    "price": [30, 56, 70, 89],
    "max": 89,
    "min": 30,
    "avg": 61.25
}]

處理過的數據通常用來畫圖,實際效果相似於: 1.需求.png

若是不借助協程,咱們通常這麼處理:(數據庫獲取過程省略)

In [14]:
# 生成新的dict數據
def get_new_item(item):
    prices = item["price"]
    item["avg"] = sum(prices) / len(prices)
    item["max"] = max(prices)
    item["min"] = min(prices)
    return item

def get_new_data(data):
    newdata = []
    for item in data:
        new_item = get_new_item(item)
        # print(new_item) # 處理後的新dict
        newdata.append(new_item)
    return newdata

def main():
    # 需求:生成繪圖的數據(max,min,avg)
    products = [{
        "id": 2344,
        "title": "御泥坊補水面膜",
        "price": [89, 76, 120, 99]
    }, {
        "id": 2345,
        "title": "御泥坊火山泥面膜",
        "price": [30, 56, 70, 89]
    }]

    new_products = get_new_data(products)
    print(new_products)

if __name__ == "__main__":
    main()
 
[{'id': 2344, 'title': '御泥坊補水面膜', 'price': [89, 76, 120, 99], 'avg': 96.0, 'max': 120, 'min': 76}, {'id': 2345, 'title': '御泥坊火山泥面膜', 'price': [30, 56, 70, 89], 'avg': 61.25, 'max': 89, 'min': 30}]
 

改爲yield版的協程也很方便,基本上代碼沒有變,也不用像IO多路複用那樣來回的回調

In [15]:
# 生成新的dict數據
def get_new_item(item):
    prices = item["price"]
    item["avg"] = sum(prices) / len(prices)
    item["max"] = max(prices)
    item["min"] = min(prices)
    yield item

def get_new_data(data):
    for item in data:
        yield from get_new_item(item)

def main():
    # 需求:生成繪圖的數據(max,min,avg)
    products = [{
        "id": 2344,
        "title": "御泥坊補水面膜",
        "price": [89, 76, 120, 99]
    }, {
        "id": 2345,
        "title": "御泥坊火山泥面膜",
        "price": [30, 56, 70, 89]
    }]
    new_products = list()
    # 若是須要返回值就捕獲StopIteration異常
    for item in get_new_data(products):
        new_products.append(item)
    print(new_products)

if __name__ == "__main__":
    main()
 
[{'id': 2344, 'title': '御泥坊補水面膜', 'price': [89, 76, 120, 99], 'avg': 96.0, 'max': 120, 'min': 76}, {'id': 2345, 'title': '御泥坊火山泥面膜', 'price': [30, 56, 70, 89], 'avg': 61.25, 'max': 89, 'min': 30}]
 

簡單解析一下:(用yield from的目的就是爲了引出等會說的async/await

yield from(委託生成器get_new_data)的好處就是讓調用方(main)和yield子生成器(get_new_item)直接創建一個雙向通道

你也能夠把yield from看成一箇中介(若是不理解就把yield from想象成await就容易理解了),本質就是下面代碼:

In [16]:
# 生成新的數據
def get_new_data(data):
    for item in data:
        prices = item["price"]
        item["avg"] = sum(prices) / len(prices)
        item["max"] = max(prices)
        item["min"] = min(prices)
        yield item


def main():
    # 需求:生成繪圖的數據(max,min,avg)
    products = [{
        "id": 2344,
        "title": "御泥坊補水面膜",
        "price": [89, 76, 120, 99]
    }, {
        "id": 2345,
        "title": "御泥坊火山泥面膜",
        "price": [30, 56, 70, 89]
    }]
    new_products = list()
    for item in get_new_data(products):
        new_products.append(item)
    print(new_products)


if __name__ == "__main__":
    main()
 
[{'id': 2344, 'title': '御泥坊補水面膜', 'price': [89, 76, 120, 99], 'avg': 96.0, 'max': 120, 'min': 76}, {'id': 2345, 'title': '御泥坊火山泥面膜', 'price': [30, 56, 70, 89], 'avg': 61.25, 'max': 89, 'min': 30}]
 

PEP 380(含分析)

yield from內部其實在yield基礎上作了不少事情(好比一些異常的處理),具體能夠看看 PEP 380

先提煉一個簡版的:

# 正常調用
RESULT = yield from EXPR

# _i:子生成器(也是個迭代器)
# _y:子生成器生產的值
# _r:yield from 表達式最終結果
# _s:調用方經過send發送的值
# _e:異常對象

# 內部原理
_i = iter(EXPR) # EXPR是一個可迭代對象,_i是子生成器
try:
    # 第一次不能send值,只能next() or send(None),並把產生的值放到_y中
    _y = next(_i)
except StopIteration as _e:
    # 若是子生成器直接就return了,那就會拋出異常,經過value能夠拿到子生成器的返回值
    _r = _e.value
else:
    # 嘗試進行循環(調用方和子生成器交互過程),yield from這個生成器會阻塞(委託生成器)
    while 1:
        # 這時候子生成器已經和調用方創建了雙向通道,在等待調用方send(value),把這個值保存在_s中
        _s = yield _y # 這邊還會進行一系列異常處理,我先刪掉,等會看
        try:
            # 若是send(None),那麼繼續next遍歷
            if _s is None:
                _y = next(_i) # 把子生成器結果放到 _y 中
            else:
                _y = _i.send(_s) # 若是調用方send一個值,就轉發到子生成器
        except StopIteration as _e:
            _r = _e.value # 若是子生成器遍歷完了,就把返回值給_r
            break
RESULT = _r # 最終的返回值(yield from 最終的返回值)

如今再來看完整版壓力就沒有那麼大了:

# 正常調用
RESULT = yield from EXPR

# _i:子生成器(也是個迭代器)
# _y:子生成器生產的值
# _r:yield from 表達式最終結果
# _s:調用方經過send發送的值
# _e:異常對象

# 內部原理
_i = iter(EXPR) # EXPR是一個可迭代對象,_i是子生成器
try:
    # 第一次不能send值,只能next() or send(None),並把產生的值放到_y中
    _y = next(_i)
except StopIteration as _e:
    # 若是子生成器直接就return了,那就會拋出異常,經過value能夠拿到子生成器的返回值
    _r = _e.value
else:
    # 嘗試進行循環(調用方和子生成器交互過程),yield from這個生成器會阻塞(委託生成器)
    while 1:
        try:
            # 這時候子生成器已經和調用方創建了雙向通道,在等待調用方send(value),把這個值保存在_s中
            _s = yield _y

        # 【如今補全】有這麼幾種狀況須要處理
        # 1.子生成器可能只是一個迭代器,並不能做爲協程的生成器(不支持throw和close)
        # 2.子生成器雖然支持了throw和close,但在子生成器內部兩種方法都會拋出異常
        # 3.調用法調用了gen.throw(),想讓子生成器本身拋異常
        # 這時候就要處理 gen.close() 和 gen.throw()的狀況

        # 生成器close()異常的處理
        except GeneratorExit as _e:
            try:
                _m = _i.close
            except AttributeError:
                pass # 屏蔽close的異常
            else:
                _m()
            raise _e # 上拋異常
        # 生成器throw()異常的處理
        except BaseException as _e:
            _x = sys.exc_info()
            try:
                _m = _i.throw
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:
            try:
                # 若是send(None),那麼繼續next遍歷
                if _s is None:
                    _y = next(_i) # 把子生成器結果放到 _y 中
                else:
                    _y = _i.send(_s) # 若是調用方send一個值,就轉發到子生成器
            except StopIteration as _e:
                _r = _e.value # 若是子生成器遍歷完了,就把返回值給_r
                break
RESULT = _r # 最終的返回值(yield from 最終的返回值)

2.async/await

把上面的原生代碼用async和await改裝一下:(協程的目的就是像寫同步代碼同樣寫異步,這個纔算是真作到了)

import asyncio

# 生成新的dict數據
async def get_new_item(item):
    prices = item["price"]
    item["avg"] = sum(prices) / len(prices)
    item["max"] = max(prices)
    item["min"] = min(prices)
    return item

async def get_new_data(data):
    newdata = []
    for item in data:
        new_item = await get_new_item(item)
        # print(new_item) # 處理後的新dict
        newdata.append(new_item)
    return newdata

def main():
    # 需求:生成繪圖的數據(max,min,avg)
    products = [{
        "id": 2344,
        "title": "御泥坊補水面膜",
        "price": [89, 76, 120, 99]
    }, {
        "id": 2345,
        "title": "御泥坊火山泥面膜",
        "price": [30, 56, 70, 89]
    }]

    # python 3.7
    new_products = asyncio.run(get_new_data(products))
    print(new_products)

if __name__ == "__main__":
    main()

輸出:(是否是很原生代碼沒啥區別?)

[{'id': 2344, 'title': '御泥坊補水面膜', 'price': [89, 76, 120, 99], 'avg': 96.0, 'max': 120, 'min': 76}, 
{'id': 2345, 'title': '御泥坊火山泥面膜', 'price': [30, 56, 70, 89], 'avg': 61.25, 'max': 89, 'min': 30}]

下級預估:asyncio

 

3.5.asyncio

官方文檔:https://docs.python.org/3/library/asyncio.html

開發中常見錯誤:https://docs.python.org/3/library/asyncio-dev.html

代碼示例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/ZCoroutine

PS:asyncioPython用於解決異步IO編程的一整套解決方案

3.5.1.上節回顧

上次說了下協程演變過程,此次繼續,先接着上次的說:

JS是能夠生成器和asyncawait混用的,那Python呢?(NetCore不能夠混用)

import types

# 和生成器徹底分開了,不過能夠理解爲yield from
@types.coroutine
def get_value(value):
    yield value

async def get_name(name):
    # 一系列邏輯處理
    return await get_value(name)

if __name__ == '__main__':
    gen = get_name("小明")
    print(gen.send(None))
# 直接混用會報錯:TypeError: object generator can't be used in 'await' expression

咱們的asyncawait雖然和yield from不是一個概念,可是能夠理解爲yield from上面這段代碼你能夠理解爲:

import types

def get_value(value):
    yield value

# 這個async和await替換成yield from
def get_name(name):
    # 一系列邏輯處理
    yield from get_value(name)

if __name__ == '__main__':
    gen = get_name("小明")
    print(gen.send(None))

PS:Python默認和NetCore同樣,不能直接混用,若是你必定要混用,那麼得處理下使用@asyncio.coroutine也行)

3.5.2.asyncio引入

在今天以前,協程咱們是這麼實現的:事件循環(loop)+回調(驅動生成器)+IO多路複用(epoll)

如今能夠經過官方提供的asyncio能夠理解爲協程池)來實現了(第三方還有一個uvloop【基於C寫的libuv庫(nodejs也是基於這個庫)】)

PS:uvloop的使用很是簡單,只要在獲取事件循環前將asyncio的事件循環策略設置爲uvloop的:asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

1.簡單案例

先看個簡單的協程案例:

import types
import asyncio

# 模擬一個耗時操做
async def test():
    print("start...")
    # 不能再使用之前阻塞的暫停了
    await asyncio.sleep(2)
    print("end...")
    return "ok"

if __name__ == '__main__':
    import time
    start_time = time.time()

    # # >=python3.4
    # # 返回asyncio的事件循環
    # loop = asyncio.get_event_loop()
    # # 運行事件循環,直到指定的future運行完畢,返回結果
    # result = loop.run_until_complete(test())
    # print(result)

    # python3.7
    result = asyncio.run(test())
    print(result)

    print(time.time() - start_time)

輸出:

start...
end...
ok
2.001772403717041

簡單說下,asyncio.run是python3.7才簡化出來的語法(類比NetCore的Task.Run)看看源碼就知道了:

# https://github.com/lotapp/cpython3/blob/master/Lib/asyncio/runners.py
def run(main, *, debug=False):
    # 之前是直接使用"asyncio.get_event_loop()"(開發人員通常都習慣這個了)
    # 3.7開始推薦使用"asyncio.get_running_loop()"來獲取正在運行的loop(獲取不到就拋異常)
    if events._get_running_loop() is not None:
        raise RuntimeError("沒法從正在運行的事件循環中調用asyncio.run()")

    if not coroutines.iscoroutine(main):
        raise ValueError("{!r}應該是一個協程".format(main))

    loop = events.new_event_loop() # 建立一個新的事件循環
    try:
        events.set_event_loop(loop)  # 設置事件循環
        loop.set_debug(debug)  # 是否調試運行(默認否)
        return loop.run_until_complete(main)  # 等待運行
    finally:
        try:
            _cancel_all_tasks(loop)  # 取消其餘任務
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()

新版本其實就是使用了一個新的loop去啓動run_until_complete

PS:uvloop也能夠這樣去使用:獲取looploop = uvloop.new_event_loop()再替換原生的loopasyncio.set_event_loop(loop)

3.5.3.批量任務

1.舊版本實現

import asyncio

# 模擬一個耗時操做
async def test(i):
    print("start...")
    # 不能再使用之前阻塞的暫停了
    await asyncio.sleep(2)
    print("end...")
    return i

if __name__ == '__main__':
    import time

    start_time = time.time()

    # # >=python3.4
    loop = asyncio.get_event_loop()
    # tasks = [asyncio.ensure_future(test(i)) for i in range(10)]
    # 注意:是loop的方法,而不是asyncio的,否則就會引起RuntimeError:no running event loop
    tasks = [loop.create_task(test(i)) for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        print(task.result())

    print(time.time() - start_time)

輸出:(tasks替換成這個也同樣:tasks = [asyncio.ensure_future(test(i)) for i in range(10)])

start...
start...
start...
start...
start...
start...
start...
start...
start...
start...
end...
end...
end...
end...
end...
end...
end...
end...
end...
end...
0
1
2
3
4
5
6
7
8
9
2.028331995010376

而後咱們再看看這個asyncio.wait是個啥:(回顧:http://www.javashuo.com/article/p-rcfjlior-d.html

# return_when 這個參數和以前同樣
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED

# 官方準備在將來版本廢棄它的loop參數
# 和concurrent.futures裏面的wait不同,這邊是個協程
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):

平時使用能夠用高級APIasyncio.gather(*tasks)來替換asyncio.wait(tasks)

1.舊版另用

PS:官方推薦使用create_task的方式來建立一個任務

import asyncio

# 模擬一個耗時操做
async def test(i):
    print("start...")
    # 不能再使用之前阻塞的暫停了
    await asyncio.sleep(2)
    print("end...")
    return i

async def main():
    tasks = [test(i) for i in range(10)]
    # await task 能夠獲得返回值(獲得結果或者異常)
    # for task in asyncio.as_completed(tasks):
    #     try:
    #         print(await task)
    #     except Exception as ex:
    #         print(ex)
    return [await task for task in asyncio.as_completed(tasks)]

if __name__ == '__main__':
    import time

    start_time = time.time()

    # old推薦使用
    loop = asyncio.get_event_loop()
    result_list = loop.run_until_complete(main())
    print(result_list)

    print(time.time() - start_time)

輸出:(PS:用asyncio.gather(*tasks)直接替換asyncio.wait(tasks)也行)

start...
start...
start...
start...
start...
start...
start...
start...
start...
start...
end...
end...
end...
end...
end...
end...
end...
end...
end...
end...
[1, 6, 4, 5, 0, 7, 8, 3, 2, 9]
2.0242035388946533

其實理解起來很簡單,並且和NetCore以及NodeJS它們統一了,只要是await xxx就返回一個(結果|異常),不await就是一個task對象

2.新版本實現

import asyncio

# 模擬一個耗時操做
async def test(i):
    print("start...")
    await asyncio.sleep(2)
    print("end...")
    return i

async def main():
    tasks = [test(i) for i in range(10)]
    # 給`協程/futures`返回一個future聚合結果
    return await asyncio.gather(*tasks) # 記得加*來解包

if __name__ == '__main__':
    import time

    start_time = time.time()

    # python3.7
    result_list = asyncio.run(main())
    print(result_list)

    # 2.0259485244750977
    print(time.time() - start_time)

輸出:(語法簡化太多了,用起來特別簡單)

start...
start...
start...
start...
start...
start...
start...
start...
start...
start...
end...
end...
end...
end...
end...
end...
end...
end...
end...
end...
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2.00840163230896

關於參數須要加*解包的說明 ==> 看看函數定義就秒懂了:

# 給 協程/futures 返回一個future聚合結果
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    pass

# 把協程或者awaitable對象包裹成task
def ensure_future(coro_or_future, *, loop=None):
    pass

# 傳入一個協程對象,返回一個task對象
class BaseEventLoop(events.AbstractEventLoop):
    def create_task(self, coro):
        pass

關於高級和低級API的說明

asyncio的高級(high-level)API通常用於這幾個方面:(開發基本夠用了)

  1. 並行運行Python協同程序並徹底控制它們的執行
  2. 網絡通訊(IO)和進程間通訊(IPC
  3. 子進程subprocesses)相關
  4. 經過隊列Queue)分配任務(Tasks
  5. 同步synchronize)併發代碼

低級(low-level)API通常這麼用:(事件循環和回調會用下,其餘基本不用)

  1. 建立和管理事件循環,爲網絡、子進程、信號處理(Signal)等提供異步(asynchronous)API
  2. 爲傳輸使用高效協議
  3. 使用async/await語法橋接基於回調的庫和代碼

3.5.4.回調函數

回調通常不利於代碼維護,如今基本上是儘可能不用了(異步代碼用起來都和同步沒多大差異了,回調也就沒那麼大用處了)

1.回調函數獲取返回值

上面說的獲取返回值,其實也能夠經過回調函數來獲取:

# 低級API示例
import asyncio

async def get_html(url):
    print(f"get {url} ing")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

def call_back(task):
    print(type(task))
    print(task.result())

if __name__ == "__main__":
    import time
    start_time = time.time()

    urls = [
        "https://www.baidu.com", "https://www.sogou.com",
        "https://www.python.org", "https://www.asp.net"
    ]
    tasks = set()  # 任務集合
    loop = asyncio.get_event_loop()
    for url in urls:
        # task = asyncio.ensure_future(get_html(url))
        task = loop.create_task(get_html(url))
        # 設置回調函數
        task.add_done_callback(call_back)
        # 添加到任務集合中
        tasks.add(task)
    # 批量執行
    loop.run_until_complete(asyncio.gather(*tasks))

    print(time.time() - start_time)

輸出:(task.add_done_callback(回調函數)

get https://www.baidu.com ing
get https://www.sogou.com ing
get https://www.python.org ing
get https://www.asp.net ing
<class '_asyncio.Task'>
<h1>This is a test for https://www.baidu.com</h1>
<class '_asyncio.Task'>
<h1>This is a test for https://www.python.org</h1>
<class '_asyncio.Task'>
<h1>This is a test for https://www.sogou.com</h1>
<class '_asyncio.Task'>
<h1>This is a test for https://www.asp.net</h1>
2.0168468952178955

2.回調函數傳參擴展

實例:

import asyncio
import functools

async def get_html(url):
    await asyncio.sleep(2)
    return "This is a test for"

# 注意一個東西:經過偏函數傳過來的參數在最前面
def call_back(url, task):
    # do something
    print(type(task))
    print(task.result(), url)

if __name__ == "__main__":
    import time
    start_time = time.time()

    urls = [
        "https://www.baidu.com", "https://www.sogou.com",
        "https://www.python.org", "https://www.asp.net"
    ]
    tasks = set()  # 任務集合
    loop = asyncio.get_event_loop()
    for url in urls:
        # task = asyncio.ensure_future(get_html(url))
        task = loop.create_task(get_html(url))
        # 設置回調函數 (不支持傳參數,咱們就利用偏函數來傳遞)
        task.add_done_callback(functools.partial(call_back, url))
        # 添加到任務集合中
        tasks.add(task)
    # 批量執行
    loop.run_until_complete(asyncio.gather(*tasks))

    print(time.time() - start_time)

輸出:(PS:經過偏函數傳過來的參數在最前面)

<class '_asyncio.Task'>
This is a test for https://www.baidu.com
<class '_asyncio.Task'>
This is a test for https://www.python.org
<class '_asyncio.Task'>
This is a test for https://www.sogou.com
<class '_asyncio.Task'>
This is a test for https://www.asp.net
2.0167236328125

3.5.5.異常相關

以前說的await task可能獲得結果也可能獲得異常有些人可能還不明白 ==> 其實你把他看出同步代碼(PS:協程的目的就是像寫同步代碼同樣進行異步編程)就好理解了,函數執行要麼獲得結果要麼獲得返回值

看個異常的案例:

import asyncio

async def get_html(url):
    print(f"get {url} ing")
    if url == "https://www.asp.net":
        raise Exception("Exception is over")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

async def main():
    urls = [
        "https://www.baidu.com", "https://www.asp.net",
        "https://www.python.org", "https://www.sogou.com"
    ]
    tasks = [get_html(url) for url in urls]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    import time
    start_time = time.time()

    try:
        asyncio.run(main())
    except Exception as ex:
        print(ex)

    print(time.time() - start_time)

輸出:(和同步代碼沒差異,可能出異常的部分加個異常捕獲便可)

get https://www.baidu.com ing
get https://www.asp.net ing
get https://www.python.org ing
get https://www.sogou.com ing
Exception is over
0.008000373840332031

再一眼舊版怎麼用:(PS:基本差很少,下次所有用新用法了)

import asyncio

async def get_html(url):
    print(f"get {url} ing")
    if url == "https://www.asp.net":
        raise Exception("Exception is over")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

async def main():
    urls = [
        "https://www.baidu.com", "https://www.asp.net",
        "https://www.python.org", "https://www.sogou.com"
    ]
    tasks = set()  # 任務集合
    tasks = [get_html(url) for url in urls]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    import time
    start_time = time.time()

    loop = asyncio.get_event_loop()
    try:
        # 批量執行
        loop.run_until_complete(main())
    except Exception as ex:
        print(ex)

    print(time.time() - start_time)

常見異常

Python3調試過程當中的常見異常http://www.javashuo.com/article/p-qqaluvos-t.html

asyncio中常見異常

官方文檔:https://docs.python.org/3/library/asyncio-exceptions.html

  1. asyncio.TimeoutError(Exception.Error)
    • 任務超時引起的異常
  2. asyncio.CancelledError(Exception.Error)
    • 任務取消引起的異常
  3. asyncio.InvalidStateError(Exception.Error)
    • Task/Future內部狀態無效引起
  4. asyncio.IncompleteReadError(Exception.Error):讀取未完成引起的錯誤:
    • 不完整: 在到達流結束以前讀取字節字符串(讀取了不完整的字符串就轉換了)
    • 不清楚讀多少: 預期讀取的字節總數未知
  5. asyncio.LimitOverrunError(Exception)
    • 超出緩衝區引起的異常
  6. asyncio.SendfileNotAvailableError(Exception.ReferenceError.RuntimeError)
    • 系統調用不適用於給定的套接字或文件類型(系統調用類型不匹配致使的)

Python常見異常

有些異常官方沒有寫進去,我補了一些經常使用的異常:https://docs.python.org/3/library/exceptions.html

BaseException

  • SystemExitsys.exit()引起的異常(目的:讓Python解釋器退出)
  • KeyboardInterrupt:用戶Ctrl+C終止程序引起的異常
  • GeneratorExit:生成器或者協程關閉的時候產生的異常(特別注意
  • Exception:全部內置異常(非系統退出)或者用戶定義異常的基類
    • asyncio.Error
      • asyncio.CancelledError
      • asyncio.TimeoutError:和Exception.OSError.TimeoutError區分開
      • asyncio.InvalidStateErrorTask/Future內部狀態無效引起
    • asyncio.LimitOverrunError:超出緩衝區引起的異常
    • StopIterationnext()、send()引起的異常:
      • https://www.cnblogs.com/dotnetcrazy/p/9278573.html#6.Python迭代器
    • StopAsyncIteration__anext__()引起的異常
    • ArithmeticError
      • FloatingPointError
      • OverflowError
      • ZeroDivisionError
    • AssertionError:當斷言assert語句失敗時引起
    • AttributeError:當屬性引用或賦值失敗時引起
    • BufferError
    • EOFError
      • asyncio.IncompleteReadError:讀取操做未完成引起的錯誤
    • ImportError
      • ModuleNotFoundError
    • LookupError
      • IndexError
      • KeyError
    • MemoryError
    • NameError
      • UnboundLocalError
    • OSError:當系統函數返回與系統相關的錯誤時引起
      • BlockingIOError
      • ChildProcessError
      • ConnectionError
        • BrokenPipeError
        • ConnectionAbortedError
        • ConnectionRefusedError
        • ConnectionResetError
      • FileExistsError
      • FileNotFoundError
      • InterruptedError
      • IsADirectoryError
      • NotADirectoryError
      • PermissionError
      • ProcessLookupError
      • TimeoutError:系統函數執行超時時觸發
    • ReferenceError:引用錯誤(對象被資源回收或者刪除了)
    • RuntimeError:出錯了,可是檢測不到錯誤類別時觸發
      • NotImplementedError:爲實現報錯(好比調用了某個不存在的子類方法)
      • RecursionError:遞歸程度太深引起的異常
      • asyncio.SendfileNotAvailableError:系統調用不適用於給定的套接字或文件類型
    • SyntaxError:語法錯誤時引起(粘貼代碼常常遇到
      • IndentationError:縮進有問題
      • TabError:當縮進包含不一致的製表符和空格使用時引起
    • SystemError
    • TypeError:類型錯誤
    • ValueError
      • UnicodeError
      • UnicodeDecodeError
      • UnicodeEncodeError
      • UnicodeTranslateError
    • Warning
    • DeprecationWarning
    • PendingDeprecationWarning
    • RuntimeWarning
    • SyntaxWarning
    • UserWarning
    • FutureWarning
    • ImportWarning
    • UnicodeWarning
    • BytesWarning
    • ResourceWarning
 

新語法的說明

Net方向的同志記得對比當時寫的 Python3 與 C# 併發編程之~Net篇:https://www.cnblogs.com/dunitian/p/9419325.html

1.概念

先說說概念:

  1. event_loop事件循環:
    • 程序開啓一個無限的循環,程序員會把一些函數(協程)註冊到事件循環上
    • 當知足事件發生的時候,調用相應的協程函數
  2. coroutine協程:
    • 協程對象,指一個使用async關鍵字定義的函數,它的調用不會當即執行函數,而是會返回一個協程對象
    • 協程對象須要註冊到事件循環,由事件循環調用
  3. future對象:
    • 表明未來執行或沒有執行的任務的結果(它和task上沒有本質的區別)
  4. task任務:
    • 一個協程對象就是一個原生能夠掛起的函數,Task則是對協程進一步封裝,其中包含任務的各類狀態
    • Task對象是Future的子類,它將coroutineFuture聯繫在一塊兒,將coroutine封裝成一個Future對象
  5. async/await關鍵字:
    • 定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口
    • 相似於yield from(都是在調用方與子協程之間直接創建一個雙向通道)

2.語法

爲了不讀者混亂於新舊代碼的使用,從下面開始就直接使用最新的語法的

  1. 運行asyncio:asyncio.run(main())
    • 只運行一次(if __name__ == "__main__")
  2. 建立一個任務:asyncio.create_task(func())
    • Python3.8會多一個name的別名參數
  3. 批量執行任務:asyncio.gather(*tasks)
    • return_exceptions=True能夠屏蔽這批任務的異常,並把異常結果返回
    • 若是有相似於(第一個任務完成|第一個異常產生後)進行相應的操做,則推薦asyncio.wait
  4. 獲取loop:asyncio.get_event_loop()
    • 優先考慮:asyncio.get_running_loop()(獲取不到會拋異常)
# 若是和舊版本混用,就應該這麼寫了(麻煩)
try:
    loop = asyncio.get_running_loop()
except RuntimeError as ex:
    print(ex) # no running event loop
    loop = asyncio.get_event_loop()
...
loop.run_until_complete(xxx)

新語法:

async def main():
    loop = asyncio.get_running_loop()
    ...

asyncio.run(main())

3.狀態

Task基本上就是這幾個狀態(生成器、Future也是):

  1. Pending:建立Task,還未執行
  2. Running:事件循環正在調用執行任務
  3. Done:Task執行完畢
  4. Cancelled:Task被取消後的狀態

4.時序圖

Python3.7以前官方貼了張時序圖,咱們拿來理解上面的話:https://docs.python.org/3.6/library/asyncio-task.html

import asyncio

async def compute(x, y):
    print(f"計算 {x}+{y}...")
    await asyncio.sleep(1.0)
    return x + y

async def main(x, y):
    result = await compute(x, y)
    print(f"{x}+{y}={result}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main(1, 2))
loop.close()

8.時序.png

3.5.4.回調函數(新用法)

和舊版本比起來其實就是建立一個task,而後爲task添加一個回調函數add_done_callback

import asyncio

async def get_html(url):
    print(f"get {url} ing")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

def callback_func(task):
    print(type(task))
    if task.done():
        print(f"done")  # print(task.result())

async def main():
    urls = [
        "https://www.baidu.com", "https://www.asp.net",
        "https://www.python.org", "https://www.sogou.com"
    ]
    # asyncio.create_task來建立一個Task
    tasks = [asyncio.create_task(get_html(url)) for url in urls]
    # 給每一個任務都加一個回調函數
    for task in tasks:
        task.add_done_callback(callback_func)
    # 批量執行任務
    result = await asyncio.gather(*tasks)
    print(result)  # 返回 result list

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:

get https://www.baidu.com ing
get https://www.asp.net ing
get https://www.python.org ing
get https://www.sogou.com ing
<class '_asyncio.Task'>
done
<class '_asyncio.Task'>
done
<class '_asyncio.Task'>
done
<class '_asyncio.Task'>
done
['<h1>This is a test for https://www.baidu.com</h1>', '<h1>This is a test for https://www.asp.net</h1>', '<h1>This is a test for https://www.python.org</h1>', '<h1>This is a test for https://www.sogou.com</h1>']
2.0189685821533203

注意:`add_signal_handler`是loop獨有的方法,Task中沒有,eg:loop.add_signal_handler(signal.SIGINT, callback_handle, *args)

3.5.5.異常相關擴展

關於批量任務的異常處理:

  1. 默認:同一批次有一個task產生了異常,這一批次任務就所有結束了
  2. return_exceptions=True:不影響其餘任務,異常消息也放在結果列表中
  3. gather被取消的時候,無論True or False,這批次任務所有取消
import asyncio

async def get_html(url):
    print(f"get {url} ing")
    if url == "https://www.asp.net":
        raise Exception("Exception is over")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

def callback_func(task):
    if task.done():
        print(f"done")  # print(task.result())

async def main():
    urls = [
        "https://www.baidu.com", "https://www.asp.net",
        "https://www.python.org", "https://www.sogou.com"
    ]
    # asyncio.create_task來建立一個Task
    tasks = [asyncio.create_task(get_html(url)) for url in urls]
    # 給每一個任務都加一個回調函數
    for task in tasks:
        task.add_done_callback(callback_func)
    # 批量執行任務
    result = await asyncio.gather(*tasks, return_exceptions=True)
    print(result)  # 返回 result list

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:

get https://www.baidu.com ing
get https://www.asp.net ing
get https://www.python.org ing
get https://www.sogou.com ing
done
done
done
done
['<h1>This is a test for https://www.baidu.com</h1>', Exception('Exception is over'), '<h1>This is a test for https://www.python.org</h1>', '<h1>This is a test for https://www.sogou.com</h1>']
2.013272523880005

3.5.6.任務分組、取消

1.分組

看個簡單的任務分組案例:

import asyncio

async def get_html(url):
    print(f"get url for{url}")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

async def main():
    urls1 = ["https://www.baidu.com", "https://www.asp.net"]
    urls2 = ["https://www.python.org", "https://www.sogou.com"]

    tasks1 = [asyncio.create_task(get_html(url)) for url in urls1]
    tasks2 = [asyncio.create_task(get_html(url)) for url in urls2]

    # 等待兩組都完成,而後返回聚合結果
    result = await asyncio.gather(*tasks1, *tasks2)
    print(result)

if __name__ == "__main__":
    import time
    start_time = time.time()

    try:
        asyncio.run(main())
    except Exception as ex:
        print(ex)

    print(time.time() - start_time)

輸出:(兩個分組結果被一塊兒放到了list中)

get url forhttps://www.baidu.com
get url forhttps://www.asp.net
get url forhttps://www.python.org
get url forhttps://www.sogou.com
['<h1>This is a test for https://www.baidu.com</h1>', '<h1>This is a test for https://www.asp.net</h1>', '<h1>This is a test for https://www.python.org</h1>', '<h1>This is a test for https://www.sogou.com</h1>']
2.0099380016326904

2.取消

若是想要對Group1Group2進行更多的自定化,能夠再包裹一層gather方法:

import asyncio

async def get_html(url):
    print(f"get url for{url}")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

async def main():
    urls1 = ["https://www.baidu.com", "https://www.asp.net"]
    urls2 = ["https://www.python.org", "https://www.sogou.com"]

    tasks1 = [asyncio.create_task(get_html(url)) for url in urls1]
    tasks2 = [asyncio.create_task(get_html(url)) for url in urls2]

    group1 = asyncio.gather(*tasks1)
    group2 = asyncio.gather(*tasks2)

    # 分組2由於某緣由被取消任務了(模擬)
    group2.cancel()

    # 等待兩組都完成,而後返回聚合結果
    result = await asyncio.gather(group1, group2, return_exceptions=True)
    print(result)

if __name__ == "__main__":
    import time
    start_time = time.time()

    try:
        asyncio.run(main())
    except Exception as ex:
        print(ex)

    print(time.time() - start_time)

輸出:

get url forhttps://www.baidu.com
get url forhttps://www.asp.net
[['<h1>This is a test for https://www.baidu.com</h1>', '<h1>This is a test for https://www.asp.net</h1>'], CancelledError()]
2.0090348720550537

再看個單個任務的案例:

import asyncio

async def test():
    print("start...")
    await asyncio.sleep(10)
    print("end...")

async def main():
    task = asyncio.create_task(test())

    await asyncio.sleep(1)

    # 取消task任務
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print(f"任務已經被取消:{task.cancelled()}")
        print(f"任務是由於異常而完成:{task.done()}")

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:

start...
任務已經被取消:True
任務是由於異常而完成:True
1.0133979320526123

簡單說明下:

  1. task.done():任務是否完成
    • 任務完成:task.done() ==> true
      1. 任務正常完成
      2. 觸發異常而被標記爲任務完成
  2. task.cancelled():用來判斷是否成功取消

爲何這麼說?看看源碼:

# 完成包含了正常+異常
if outer.done():
    # 把由於異常完成的任務打個標記
    if not fut.cancelled():
        fut.exception() # 標記檢索的異常

PS:官方推薦asyncio.all_tasks(loop中還沒有完成的Task集合):

  • 原來是經過:asyncio.Task.all_tasks來獲取(返回loop的全部Task集合)

wait_for and wait

1.一個任務限時等待(wait_for)

超時等待:asyncio.wait_for(task, timeout)

import asyncio

async def test(time):
    print("start...")
    await asyncio.sleep(time)
    print("end...")
    return time

async def main():
    task = asyncio.create_task(test(3))
    try:
        result = await asyncio.wait_for(task, timeout=2)
        print(result)
    except asyncio.CancelledError:
        print("Cancel")
    except asyncio.TimeoutError:
        print("超時取消")
    except Exception as ex:
        print(ex)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:

start...
超時取消
2.007002115249634

2.多個任務限時等待(wait)

wait是比gather更底層的api,好比如今這個多任務限時等待gather並不能知足:

import asyncio

async def test(time):
    print("start...")
    await asyncio.sleep(time)
    print("end...")
    return time

async def main():
    tasks = [asyncio.create_task(test(i)) for i in range(10)]

    # 已完成的任務(包含異常),未完成的任務
    done, pending = await asyncio.wait(tasks, timeout=2)
    # 任務總數(我用了3種表示)PS:`all_tasks()`的時候記得去除main的那個
    print(
        f"任務總數:{len(tasks)}=={len(done)+len(pending)}=={len(asyncio.Task.all_tasks())-1}"
    )
    # 全部未完成的task:asyncio.all_tasks(),記得去掉run(main())
    print(f"未完成Task:{len(pending)}=={len(asyncio.all_tasks()) - 1}")

    print(await asyncio.gather(*done))
    # for task in done:
        # print(await task)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:

start...
start...
start...
start...
start...
start...
start...
start...
start...
start...
end...
end...
end...
任務總數:10==10==10
未完成Task:7==7
[0, 1, 2]
2.0071778297424316

wait的擴展

用法其實和Future同樣(https://www.cnblogs.com/dotnetcrazy/p/9528315.html#Future對象),這邊就當再普及下新語法了

第一個任務執行完成則結束此批次任務

項目裏常常有這麼一個場景:同時調用多個同效果的API,有一個返回後取消其餘請求,看個引入案例

import asyncio

async def test(i):
    print(f"start...task{i}")
    await asyncio.sleep(i)
    print(f"end...task{i}")
    return "ok"

# 第一個任務執行完成則結束此批次任務
async def main():
    tasks = [asyncio.create_task(test(i)) for i in range(10)]

    # 項目裏常常有這麼一個場景:同時調用多個同效果的API,有一個返回後取消其餘請求
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)

    # print(await asyncio.gather(*done))
    for task in done:
        print(await task)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:

start...task0
start...task1
start...task2
start...task3
start...task4
start...task5
start...task6
start...task7
start...task8
start...task9
end...task0
ok
0.017002105712890625

課後拓展:(asyncio.shield保護等待對象不被取消) https://docs.python.org/3/library/asyncio-task.html#shielding-from-cancellation

下級預估:舊代碼兼容、同步語、Socket新用

 

代碼答疑

以前有人問我,這個asyncio.get_running_loop()究竟是用仍是不用?爲何一會asyncio.get_event_loop()一會又是asyncio.get_running_loop(),一會是loop.run_until_complete()一會又是asyncio.run()的,有點混亂了。

以前逆天簡單的提了一下,可能說的仍是不太詳細,這邊再舉幾個例子說說:

首先:若是你用的是Python3.7以前的版本,那麼你用不到loop = asyncio.get_running_loop()asyncio.run()

若是是老版本你就使用asyncio.get_event_loop()來獲取loop,用loop.run_until_complete()來運行:

import asyncio

async def test():
    print("start ...")
    await asyncio.sleep(2)
    print("end ...")

# 若是你用`get_running_loop`就不要和`loop.run_until_complete`混用
loop = asyncio.get_event_loop()
loop.run_until_complete(test())

輸出:(混用須要捕獲Runtime的異常)

start ...
end ...

上節課說使用asyncio.get_running_loop()麻煩的情景是這個:(這種狀況倒不如直接asyncio.get_event_loop()獲取loop了)

# 若是和舊版本混用,就應該這麼寫了(麻煩)
try:
    loop = asyncio.get_running_loop()
except RuntimeError as ex:
    loop = asyncio.get_event_loop()
...
asyncio.run(test())

官方推薦的新語法是這樣的:(>=Python3.7)

async def main():
    loop = asyncio.get_running_loop()
    ...

asyncio.run(main())

PS:記住一句就行:asyncio.get_running_loop()asyncio.run()成對出現

能夠這麼理解:asyncio.run裏會建立對應的loop,因此你才能獲取正在運行的loop

# https://github.com/lotapp/cpython3/blob/master/Lib/asyncio/runners.py
def run(main, *, debug=False):
    if events._get_running_loop() is not None:
        raise RuntimeError("沒法從正在運行的事件循環中調用asyncio.run()")

    if not coroutines.iscoroutine(main):
        raise ValueError("{!r}應該是一個協程".format(main))

    # 建立一個新的事件循環
    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop) # 設置事件循環
        loop.set_debug(debug) # 是否調試運行(默認否)
        return loop.run_until_complete(main) # 等待運行
    finally:
        try:
            _cancel_all_tasks(loop) # 取消其餘任務
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()

就是怕你們混亂,上節課開始就直接使用的最新語法,舊語法文章裏儘可能不使用了,本節也是

3.5.7.兼容舊代碼 or 運行阻塞代碼

部分能夠參考官方文檔:https://docs.python.org/3/library/asyncio-eventloop.html

學了協程GIL的問題其實也不是多大的事情了,多進程+協程就能夠了,asyncio如今也提供了線程安全的run方法:asyncio.run_coroutine_threadsafe(coro)(也算是對GIL給出的官方解決方法了)

1.協程 and 線程池

前面咱們說過了併發編程(線程+進程)的通用解決方案:併發編程:concurrent.futures專欄

asyncio框架雖然幾乎包含了全部經常使用功能,但畢竟是新事物,舊代碼怎麼辦?協程只是單線程工做,理論上不能使用阻塞代碼,那庫或者api只能提供阻塞的調用方式怎麼辦? ~ 不用慌,可使用官方提供的兼容方法,先看個案例:

1.回顧下一塊兒的通用方案:
import asyncio
import concurrent.futures

# 模擬一個耗時操做
def test(n):
    return sum(i * i for i in range(10**n))

# old main
def main():
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # 注意:future和asyncio.future是不同的
        future = pool.submit(test, 7)
        result = future.result()
        print(result)

if __name__ == "__main__":
    import time

    start_time = time.time()

    main()  # old

    print(time.time() - start_time)

輸出:(注意:futureasyncio.future不是一個東西,只是相似而已)

333333283333335000000
15.230607032775879
2.兼容版新用法:
import asyncio
import concurrent.futures


# 模擬一個耗時操做
def test(n):
    return sum(i * i for i in range(10**n))

async def main():
    # 獲取loop
    loop = asyncio.get_running_loop()

    with concurrent.futures.ThreadPoolExecutor() as pool:
        # 新版兼任代碼
        result = await loop.run_in_executor(pool, test, 7)
        print(result)


if __name__ == "__main__":
    import time

    start_time = time.time()

    asyncio.run(main())  # new

    print(time.time() - start_time)

輸出:(不談其餘的,至少運行速度快了)

333333283333335000000
15.283994913101196
源碼分析

咱們來看看run_in_executor的內部邏輯是啥:

class BaseEventLoop(events.AbstractEventLoop):
    def run_in_executor(self, executor, func, *args):
        # 檢查loop是否關閉,若是關閉就拋`RuntimeError`異常
        self._check_closed()
        if self._debug:
            self._check_callback(func, 'run_in_executor')
        # 若是不傳一個executor,就會使用默認的executor
        # 換句話說:你能夠不傳`線程池`
        if executor is None:
            executor = self._default_executor
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor()
                self._default_executor = executor
        # 把`concurrent.futures.Future`對象封裝成`asyncio.futures.Future`對象
        return futures.wrap_future(executor.submit(func, *args), loop=self)

看完源碼就發現,代碼還能夠進一步簡化:

import asyncio

# 模擬一個耗時操做
def test(n):
    return sum(i * i for i in range(10**n))

async def main():
    # 獲取loop
    loop = asyncio.get_running_loop()

    # 新版兼任代碼
    result = await loop.run_in_executor(None, test, 7)
    print(result)

if __name__ == "__main__":
    import time

    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:

333333283333335000000
15.367998838424683

PS:協程裏面不該該出現傳統的阻塞代碼,若是隻能用那些代碼,那麼這個就是一個兼任的措施了

2.回調擴展

這個沒有以前講的那些經常使用,就當瞭解下,框架裏面碰到不至於懵逼:

  1. Task執行完後執行:add_done_callback(回調函數)
    • task.add_done_callback() or loop.add_done_callback()
    • 想要傳參數可使用:functools.partial(call_back, url)
    • PS:經過偏函數傳過來的參數在最前面:call_back(url,task)
  2. 儘快執行:call_soon(callback,*args)
    • loop.call_soon()、線程安全:loop.call_soon_threadsafe()
    • 能夠當作是loop.call_later(0,callback,*args)
    • 通常臨時插入一個任務的時候會用到
  3. 指定時間後執行:loop.call_later(delay,callback,*args)
    • 延遲能夠是int或float,以秒爲單位(相對於當前時間)
    • 返回的對象可使用cancel()方法來取消任務
  4. 指定協程時間後執行:loop.call_at(絕對時間,callback,*args)
    • 和call_later差很少,時間使用絕對時間(這個絕對時間是loop的time()方法)

注意點:首先要保證任務執行前loop不斷開,好比你call_later(2,xxx),這時候loop退出了,那麼任務確定完成不了

這個比較簡單,看個案例:

import asyncio

def test(name):
    print(f"start {name}...")
    print(f"end {name}...")

async def main():
    # 正在執行某個任務
    loop = asyncio.get_running_loop()

    # 插入一個更要緊的任務
    # loop.call_later(0, callback, *args)
    task1 = loop.call_soon(test, "task1")

    # 多少秒後執行
    task2 = loop.call_later(2, test, "task2")

    # 內部時鐘時間
    task3 = loop.call_at(loop.time() + 3, test, "task3")

    print(type(task1))
    print(type(task2))
    print(type(task3))

    # 保證loop在執行完畢後才關閉
    await asyncio.sleep(5)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:(回調函數通常都是普通函數)

<class 'asyncio.events.Handle'>
<class 'asyncio.events.TimerHandle'>
<class 'asyncio.events.TimerHandle'>
start task1...
end task1...
start task2...
end task2...
start task3...
end task3...
4.9966819286346436

PS:關於返回值的說明能夠看官方文檔:https://docs.python.org/3/library/asyncio-eventloop.html#callback-handles

而後說下call_later(這個執行過程會按照時間排個前後順序,而後再批次運行)

import asyncio

# 回調函數通常都是普通函數
def test(name):
    print(name)

if __name__ == "__main__":
    import time
    start_time = time.time()

    loop = asyncio.get_event_loop()

    # 新版本限制了時間不能超過24h(防止有些人當定時任務來亂用)
    # 這個執行過程會安裝時間排個前後順序,而後再批次運行
    task4 = loop.call_later(4, test, "task2-4")
    task2 = loop.call_later(2, test, "task2-2")
    task3 = loop.call_later(3, test, "task2-3")
    task1 = loop.call_later(1, test, "task2-1")
    # 取消測試
    task4.cancel()
    # close是直接丟棄任務而後關閉loop
    loop.call_later(4, loop.stop)  # 等任務執行完成結束任務 loop.stop()

    # run內部運行的是run_until_complete,而run_until_complete內部運行的是run_forever
    loop.run_forever()
    print(time.time() - start_time)

輸出:(asyncio.get_running_loop()不要和舊代碼混用)

task2-1
task2-2
task2-3
4.009201526641846

PS:run內部運行的是run_until_complete,而run_until_complete內部運行的是run_forever

 

Task答疑

從開始說新語法以後,咱們建立任務都直接用asyncio.create_task來包裹一層,有人問我這個Task除了是Future的子類外,有啥用?爲啥不直接使用Future呢?貌似也沒語法啊?

看一個案例:

import asyncio

# 不是協程就加個裝飾器
@asyncio.coroutine
def test():
    print("this is a test")

async def test_async():
    print("this is a async test")
    await asyncio.sleep(1)

async def main():
    # 傳入一個協程對象,返回一個task
    task1 = asyncio.create_task(test())
    task2 = asyncio.create_task(test_async())
    await asyncio.gather(task1, task2)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:

this is a test
this is a async test
1.0070011615753174

咱們來看看asyncio.create_task的源碼:(關鍵在Task類)

# 傳入一個協程對象,返回一個Task對象
def create_task(self, coro):
    self._check_closed()
    if self._task_factory is None:
        # look:核心點
        task = tasks.Task(coro, loop=self)
        if task._source_traceback:
            del task._source_traceback[-1]
    else:
        task = self._task_factory(self, coro)
    return task

看看核心類Task

class Task(futures._PyFuture):
    def __init__(self, coro, *, loop=None):
        super().__init__(loop=loop)
        ...
        # 安排了一個儘快執行的回調方法:self.__step
        self._loop.call_soon(self.__step, context=self._context)

    def __step(self, exc=None):
       try:
            if exc is None:
                # 協程初始化(生成器或者協程初始化 next(xxx))
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # 在中止以前取消任務
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
            else:
                # 拿到了協程/生成器的結果
                super().set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            super().set_exception(exc)
        except BaseException as exc:
            super().set_exception(exc)
            raise
    ...

PS:那麼很明顯了,Task的做用就相似於future協程的中間人了(屏蔽某些差別)

3.5.8.Socket新用法

官方文檔:https://docs.python.org/3/library/asyncio-stream.html

asyncio實現了TCP、UDP、SSL等協議,aiohttp則是基於asyncio實現的HTTP框架,咱們簡單演示一下(PS:網絡通訊基本上都是使用aiohttp

1.簡單案例

服務端:

import asyncio

async def handler(client_reader, client_writer):
    # 沒有數據就阻塞等(主線程作其餘事情去了)
    data = await client_reader.read(2048)
    print(data.decode("utf-8"))

    client_writer.write("驪山語罷清宵半,淚雨霖鈴終不怨\n何如薄倖錦衣郎,比翼連枝當日願".encode("utf-8"))
    await client_writer.drain()  # 等待緩衝區(緩衝區沒佔滿就直接返回)
    client_writer.close()  # 關閉鏈接

async def main():
    server = await asyncio.start_server(handler, "127.0.0.1", 8080)
    print("Server已經啓動,端口:8080")
    # 實現了協程方法`__aenter__`和`__aexit__`的可使用`async with`
    async with server:
        # async def serve_forever(self):pass ==> use await
        await server.serve_forever()  # 異步方法

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

客戶端:

import asyncio

async def main():
    reader, writer = await asyncio.open_connection("127.0.0.1", 8080)
    writer.write("人生若只如初見,何事秋風悲畫扇\n等閒變卻故人心,卻道故人心易變".encode("utf-8"))
    data = await reader.read(2048)
    if data:
        print(data.decode("utf-8"))

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出圖示: 9.conn.png

2.HTTP案例

再舉個HTTP的案例:

import asyncio

async def get_html(host):
    print("get_html %s..." % host)
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write(f"GET / HTTP/1.1\r\nHost: {host}\r\n\r\n".encode('utf-8'))
    await writer.drain()  # 等待緩衝區

    html_list = []
    async for line in reader:
        html_list.append(line.decode("utf-8"))

    writer.close()  # 關閉鏈接
    return "\n".join(html_list)

async def main():
    tasks = [
        asyncio.create_task(get_html(url))
        for url in ['dotnetcrazy.cnblogs.com', 'dunitian.cnblogs.com']
    ]
    html_list = await asyncio.gather(*tasks)
    print(html_list)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:

get_html dotnetcrazy.cnblogs.com...
get_html dunitian.cnblogs.com...
[html內容省略,html內容省略]
5.092018604278564

GIF過程圖: 10.過程.gif

PS:(後面會繼續說的)

  1. 實現了協程方法__anext__的可使用async for
  2. 實現了協程方法__aenter____aexit__的可使用async with

3.源碼分析

還記得以前IO多路複用的時候本身寫的非阻塞Server不,簡單梳理下流程,而後我們再一塊兒看看asyncio對應的源碼:

  1. 設置Socket爲非阻塞(socket.setblocking(False)
  2. 利用輪詢用來監視文件描述符fdregister
  3. 對可讀寫的socket進行相應操做
  4. 取消輪詢的監聽(unregister

看看await asyncio.open_connection(ip,port)的源碼:

# asyncio.streams.py
async def open_connection(host=None, port=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds):
    if loop is None:
        loop = events.get_event_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    # 核心點
    transport, _ = await loop.create_connection(lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer

發現,其實內部核心在loop.create_connection

# asyncio.base_events.py
# 鏈接TCP服務器
class BaseEventLoop(events.AbstractEventLoop):
    async def create_connection(self,
                                protocol_factory,
                                host=None,
                                port=None,
                                *,
                                ssl=None,
                                family=0,
                                proto=0,
                                flags=0,
                                sock=None,
                                local_addr=None,
                                server_hostname=None,
                                ssl_handshake_timeout=None):
        ...
        # 主要邏輯
        if host is not None or port is not None:
            exceptions = []
            # 主要邏輯
            for family, type, proto, cname, address in infos:
                try:
                    sock = socket.socket(family=family, type=type, proto=proto)
                    sock.setblocking(False) # 1.設置非阻塞 <<<< look
                    if local_addr is not None:
                        for _, _, _, _, laddr in laddr_infos:
                            try:
                                sock.bind(laddr) # 端口綁定
                                break
                            except OSError as exc:
                                msg = (f'error while attempting to bind on '
                                       f'address {laddr!r}: '
                                       f'{exc.strerror.lower()}')
                                exc = OSError(exc.errno, msg)
                                exceptions.append(exc)
                        else:
                            sock.close()
                            sock = None
                            continue
                    if self._debug:
                        logger.debug("connect %r to %r", sock, address)
                    # 在selector_events中
                    await self.sock_connect(sock, address) # <<< look
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except:
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    break

發現源碼中設置了socket爲非阻塞,調用了sock_connect

async def sock_connect(self, sock, address):
        """鏈接遠程socket地址(協程方法)"""
        # 非阻塞檢查
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        ...
        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        return await fut

def _sock_connect(self, fut, sock, address):
        fd = sock.fileno() # 獲取socket的文件描述符 <<< look
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # 設置future的回調函數_sock_connect_done(用來註銷的)<<< look
            fut.add_done_callback(functools.partial(self._sock_connect_done, fd))
            # 註冊selector.register
            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

先看下sock_connect中調用的add_writer(註冊)

def add_writer(self, fd, callback, *args):
    """添加一個寫的回調"""
    self._ensure_fd_no_transport(fd)
    return self._add_writer(fd, callback, *args)

def _add_writer(self, fd, callback, *args):
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle)) # selector.register
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()

再看下sock_connect中設置的回調函數_sock_connect_done(註銷)

def _sock_connect_done(self, fd, fut):
    # 取消註冊selector.unregister
    self.remove_writer(fd)

def remove_writer(self, fd):
    """移除寫的回調"""
    self._ensure_fd_no_transport(fd)
    return self._remove_writer(fd)

def _remove_writer(self, fd):
    if self.is_closed():
        return False
    try:
        key = self._selector.get_key(fd)
    except KeyError:
        return False
    else:
        mask, (reader, writer) = key.events, key.data
        mask &= ~selectors.EVENT_WRITE
        if not mask:
            self._selector.unregister(fd) # 註銷 <<< look
        else:
            self._selector.modify(fd, mask, (reader, None))

        if writer is not None:
            writer.cancel()
            return True
        else:
            return False

PS:嵌套的很是深,並且底層代碼一致在變(Python3.6到Python3.7這個新小更新就變化很大)

關於源碼的說明

以前併發編程的基礎知識已經講的很清楚了,也分析了不少源碼,你能夠本身去拓展一下(Python3asyncio模塊的源碼一直在優化改進的路上)我這邊就不一一分析了(源碼很亂,估計幾個版本後會清晰,如今是多層混套用),你能夠參考部分源碼解析:https://github.com/lotapp/cpython3/tree/master/Lib/asyncio 7.源碼.png

課後拓展:

https://docs.python.org/3/library/asyncio-protocol.html#examples
https://docs.python.org/3/library/asyncio-eventloop.html#creating-network-servers

下節預估:同步與通訊、aiohttp版爬蟲

 

3.5.9.同步與通訊

官方文檔:

https://docs.python.org/3/library/asyncio-sync.html
https://docs.python.org/3/library/asyncio-queue.html

寫在前面:

  1. 下面的方式不是線程安全的(協程就一個線程)
  2. 這些同步原語的方法不接受超時參數; 使用asyncio.wait_for(協程方法,超時時間)函數執行超時操做
  3. asyncio具備如下基本同步原語:Lock、Event、Condition、Semaphore、BoundedSemaphore

1.引導示例

1.1.old code

先看個原來的引導案例:估計的結果是0,而不借助lock得出的結果每每出乎意料

import concurrent.futures

num = 0

def test(i):
    global num
    for _ in range(10000000):
        num += i

def main():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        print("start submit...")
        future1 = executor.submit(test, 1)
        future2 = executor.submit(test, -1)
        concurrent.futures.wait([future1, future2])  # wait some time
        print("end submit...")
    global num
    print(num)

if __name__ == "__main__":
    import time
    start_time = time.time()
    main()
    print(f"time:{time.time()-start_time}")

輸出:(可是代碼並非線程安全的,因此結果每每不是咱們想要的)

start submit...
end submit...
82705
time:5.032064199447632
1.2.new code

再看看協程的案例:

import asyncio

num = 0

async def test(i):
    global num
    for _ in range(10000000):
        num += i

async def main():
    print("start tasks...")
    task1 = asyncio.create_task(test(1))
    task2 = asyncio.create_task(test(-1))
    await asyncio.gather(task1, task2)
    print("end tasks...")

    global num
    print(num)


if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(f"time:{time.time()-start_time}")

輸出:(就一個線程,固然安全)

start tasks...
end tasks...
0
time:4.860997438430786
1.3.注意點

PS:你使用協程的兼容代碼,並不能解決線程不安全的問題

import asyncio
import concurrent.futures

num = 0

def test(i):
    global num
    for _ in range(10000000):
        num += i

async def main():
    # 獲取當前loop
    loop = asyncio.get_running_loop()

    with concurrent.futures.ThreadPoolExecutor() as executor:
        print("start submit...")
        future1 = loop.run_in_executor(executor, test, 1)
        future2 = loop.run_in_executor(executor, test, -1)
        # await asyncio.wait([future1,future2])
        await asyncio.gather(future1, future2)
        print("end submit...")
    global num
    print(num)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(f"time:{time.time()-start_time}")

輸出:

start submit...
end submit...
-1411610
time:5.0279998779296875

2.爲何須要同步機制?

咋一看,單線程不用管線程安全啥的啊,要啥同步機制?其實在業務場景裏面仍是會出現諸如重複請求的狀況,這個時候就須要一個同步機制了:

import asyncio

# 用來存放頁面緩存
cache_dict = {}

# 模擬一個獲取html的過程
async def fetch(url):
    # 每次網絡訪問,時間其實不肯定的
    import random
    time = random.randint(2, 5)
    print(time)

    await asyncio.sleep(time)
    return f"<h2>{url}</h2>"

async def get_html(url):
    # 若是緩存存在,則返回緩存的頁面
    for url in cache_dict:
        return cache_dict[url]
    # 不然獲取頁面源碼並緩存
    html = await fetch(url)
    cache_dict[url] = html
    return html

async def parse_js(url):
    html = await get_html(url)
    # do somthing
    return len(html)

async def parse_html(url):
    html = await get_html(url)
    # do somthing
    return html

async def main():
    # 提交兩個Task任務
    task1 = asyncio.create_task(parse_js("www.baidu.com"))
    task2 = asyncio.create_task(parse_html("www.baidu.com"))
    # 等待任務結束
    result_list = await asyncio.gather(task1, task2)
    print(result_list)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:(fetch方法訪問了兩次 ==> 兩次網絡請求)

2
3
[22, '<h2>www.baidu.com</h2>']
3.0100157260894775

簡單說明:baidu.com一開始沒緩存,那當解析js和解析html的任務提交時,就會進行兩次網絡請求(網絡IO比較耗時),這樣更容易觸發反爬蟲機制

3.Lock(互斥鎖)

線程相關的Lock複習:http://www.javashuo.com/article/p-rcfjlior-d.html

協程是線程安全的,那麼這個Lock確定是和多線程/進程裏面的Lock是不同的,咱們先看一下提煉版的源碼:

class Lock(_ContextManagerMixin):
    def __init__(self, *, loop=None):
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    async def acquire(self):
        if not self._locked:
            self._locked = True  # 改變標識
        ...
        return self._locked

    def release(self):
        if self._locked:
            self._locked = False
        ...

PS:源碼看完秒懂了,asyncio裏面的lock其實就是一個標識而已

修改一下上面的例子:

import asyncio

# 用來存放頁面緩存
cache_dict = {}
lock = None  # 你能夠試試在這邊直接寫`asyncio.Lock()`

# 模擬一個獲取html的過程
async def fetch(url):
    # 每次網絡訪問,時間其實不肯定的
    import random
    time = random.randint(2, 5)
    print(time)

    await asyncio.sleep(time)
    return f"<h2>{url}</h2>"


async def get_html(url):
    async with lock:
        # 若是緩存存在,則返回緩存的頁面
        for url in cache_dict:
            return cache_dict[url]
        # 不然獲取頁面源碼並緩存
        html = await fetch(url)
        cache_dict[url] = html
        return html

async def parse_js(url):
    html = await get_html(url)
    # do somthing
    return len(html)

async def parse_html(url):
    html = await get_html(url)
    # do somthing
    return html

async def main():
    global lock
    lock = asyncio.Lock()  # 若是在開頭就定義,那麼lock的loop和方法的loop就會不一致了

    # 提交兩個Task任務
    task1 = asyncio.create_task(parse_js("www.baidu.com"))
    task2 = asyncio.create_task(parse_html("www.baidu.com"))
    # 等待任務結束
    result_list = await asyncio.gather(task1, task2)
    print(result_list)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:(fetch方法訪問了1次 ==> 1次網絡請求)

3
[22, '<h2>www.baidu.com</h2>']
3.0020127296447754

4.Semaphore(信號量)

線程篇Semaphorehttp://www.javashuo.com/article/p-rcfjlior-d.html

這個用的比較多,簡單回顧下以前講的概念案例:

通俗講就是:在互斥鎖的基礎上封裝了下,實現必定程度的並行

舉個例子,之前使用互斥鎖的時候:(廁所就一個坑位,必須等裏面的人出來才能讓另外一我的上廁所)

2018-08-23/3.互斥鎖.png

使用信號量Semaphore以後:廁所坑位增長到5個(本身指定),這樣能夠5我的一塊兒上廁所了==> 實現了必定程度的併發控制

先看下縮略的源碼:(能夠這麼想:內部維護了一個引用計數,每次來個任務就-1,一個任務結束計數就+1

class Semaphore(_ContextManagerMixin):
    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        self._waiters = collections.deque()
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    async def acquire(self):
        while self._value <= 0:
            fut = self._loop.create_future()
            self._waiters.append(fut) # 把當前任務放入Queue中
            try:
                await fut # 等待一個任務的完成再繼續
            except:
                fut.cancel() # 任務取消
                if self._value > 0 and not fut.cancelled():
                    self._wake_up_next() # 喚醒下一個任務
                raise
        self._value -= 1 # 用掉一個併發量
        return True

    def release(self):
        self._value += 1 # 恢復一個併發量
        self._wake_up_next() # 喚醒下一個任務

如今舉個常見的場景:好比調用某個免費的api,該api限制併發數爲5

import asyncio

sem = None

# 模擬api請求
async def api_test(i):
    async with sem:
        await asyncio.sleep(1)
        print(f"The Task {i} is done")

async def main():
    global sem
    sem = asyncio.Semaphore(5)  # 設置併發數爲5
    tasks = [asyncio.create_task(api_test(i)) for i in range(20)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

動態輸出: 2018-12-07/11.sem.gif

PS:BoundedSemaphoreSemaphore的一個版本,在調用release()時檢查計數器的值是否超過了計數器的初始值,若是超過了就拋出一個異常

5.Event(事件)

線程篇Eventhttp://www.javashuo.com/article/p-rcfjlior-d.html

以前講的很詳細了,舉個爬蟲批量更新的例子就一筆帶過:

import asyncio

event = None
html_dict = {}

async def updates():
    # event.wait()是協程方法,須要await
    await event.wait()
    # 入庫操做省略 html_dict >> DB
    return "html_dict >> DB done"

async def get_html(url):
    # 摸擬網絡請求
    await asyncio.sleep(2)
    html_dict[url] = f"<h1>{url}</h1>" # 能夠暫時寫入臨時文件中

    event.set()  # 標記完成,普通方法
    return f"{url} done"

async def main():
    global event
    event = asyncio.Event()  # 初始化 event 對象

    # 建立批量任務
    tasks = [
        asyncio.create_task(get_html(f"www.mmd.com/a/{i}"))
        for i in range(1, 10)
    ]
    # 批量更新操做
    tasks.append(asyncio.create_task(updates()))

    result = await asyncio.gather(*tasks)
    print(result)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:

['www.mmd.com/a/1 done', 'www.mmd.com/a/2 done', 'www.mmd.com/a/3 done', 'www.mmd.com/a/4 done', 'www.mmd.com/a/5 done', 'www.mmd.com/a/6 done', 'www.mmd.com/a/7 done', 'www.mmd.com/a/8 done', 'www.mmd.com/a/9 done', 'html_dict >> DB done']
2.0012683868408203

跟以前基本上同樣,就一個地方不太同樣:async def wait(self)wait方法如今是協程方法了,使用的時候須要await

  1. coroutine wait()
    • 等待事件內部標誌被設置爲True
    • 若是事件的內部內部標誌已設置,則當即返回True。不然,一直阻塞,直到另外的任務調用set()
  2. set()
    • 設置事件內部標誌爲True
    • 全部等待事件的任務將會當即被觸發
  3. clear()
    • 清除事件內部標誌(即重置爲False
    • 等待事件的任務將會阻塞,直到set()方法被再次調用
  4. is_set()
    • 若是事件內部標誌被設置爲True,則返回True

6.Condition(條件變量)

線程篇Conditionhttp://www.javashuo.com/article/p-rcfjlior-d.html

先簡單看看方法列表:

  1. coroutine acquire()
    • 獲取底層鎖。該方法一直等待,直到底層鎖處於未鎖定狀態,而後設置其爲鎖定狀態,而且返回True
  2. notify(n=1)
    • 喚醒至多n個等待條件的任務。若是沒有正在等待的任務,則該方法無操做。
    • 在調用該方法以前,必須先調用acquire()獲取鎖,並在調用該方法以後釋放鎖。
    • 若是在鎖爲鎖定的狀況下調用此方法,會引起RuntimeError異常。
  3. locked()
    • 若是底層鎖已獲取,則返回True。
  4. notify_all()
    • 喚醒全部正在等待該條件的任務。該方法與notify()相似,區別只在它會喚醒全部正在等待的任務。
  5. release()
    • 釋放底層鎖。在未鎖定的鎖上調用時,會引起RuntimeError異常。
  6. coroutine wait()
    • 等待通知。若是調用此方法的任務沒有獲取到鎖,則引起RuntimeError異常。
    • 此方法釋放底層鎖,而後保持阻塞,直至被notify()或notify_all()喚醒。被喚醒以後,條件對象從新申請鎖,該方法返回True。
  7. coroutine wait_for(predicate)
    • 等待predicate變爲True。predicate必須可調用,它的執行結果會被解釋爲布爾值,並做爲最終結果返回。

PS:Condition結合了EventLock的功能(也可使多個Condition對象共享一個Lock,容許不一樣任務之間協調對共享資源的獨佔訪問)

看個生產消費者的案例:

import asyncio

cond = None
p_list = []

# 生產者
async def producer(n):
    for i in range(5):
        async with cond:
            p_list.append(f"{n}-{i}")
            print(f"[生產者{n}]生產商品{n}-{i}")
            # 通知任意一個消費者
            cond.notify()  # 通知所有消費者:cond.notify_all()
        # 摸擬一個耗時操做
        await asyncio.sleep(0.01)

# 消費者
async def consumer(i):
    while True:
        async with cond:
            if p_list:
                print(f"列表商品:{p_list}")
                name = p_list.pop()  # 消費商品
                print(f"[消費者{i}]消費商品{name}")
                print(f"列表剩餘:{p_list}")

                # 摸擬一個耗時操做
                await asyncio.sleep(0.01)
            else:
                await cond.wait()

async def main():
    global cond
    cond = asyncio.Condition()  # 初始化condition
    p_tasks = [asyncio.create_task(producer(i)) for i in range(2)]  # 兩個生產者
    c_tasks = [asyncio.create_task(consumer(i)) for i in range(5)]  # 五個消費者
    await asyncio.gather(*p_tasks, *c_tasks)

if __name__ == "__main__":
    asyncio.run(main())

輸出:

[生產者0]生產商品0-0
[生產者1]生產商品1-0
列表商品:['0-0', '1-0']
[消費者0]消費商品1-0
列表剩餘:['0-0']
列表商品:['0-0']
[消費者1]消費商品0-0
列表剩餘:[]
[生產者0]生產商品0-1
[生產者1]生產商品1-1
列表商品:['0-1', '1-1']
[消費者0]消費商品1-1
列表剩餘:['0-1']
列表商品:['0-1']
[消費者1]消費商品0-1
列表剩餘:[]
[生產者0]生產商品0-2
[生產者1]生產商品1-2
列表商品:['0-2', '1-2']
[消費者0]消費商品1-2
列表剩餘:['0-2']
列表商品:['0-2']
[消費者1]消費商品0-2
列表剩餘:[]
[生產者0]生產商品0-3
[生產者1]生產商品1-3
列表商品:['0-3', '1-3']
[消費者0]消費商品1-3
列表剩餘:['0-3']
列表商品:['0-3']
[消費者1]消費商品0-3
列表剩餘:[]
[生產者0]生產商品0-4
[生產者1]生產商品1-4
列表商品:['0-4', '1-4']
[消費者0]消費商品1-4
列表剩餘:['0-4']
列表商品:['0-4']
[消費者1]消費商品0-4
列表剩餘:[]

PS:第七條的簡單說明:(來看看wait_for方法的源碼)

# 一直等到函數返回true(從返回結果來講:要麼一直阻塞,要麼返回true)
async def wait_for(self, predicate):
    result = predicate()
    # 若是不是返回true就繼續等待
    while not result:
        await self.wait()
        result = predicate()
    return result

課後拓展:async_timeout(兼容async的超時的上下文管理器) https://github.com/lotapp/BaseCode/blob/master/python/5.concurrent/ZCoroutine/async_timeout_timeout.py

7.Queue(隊列)

官方文檔:https://docs.python.org/3/library/asyncio-queue.html

線程篇Queuehttp://www.javashuo.com/article/p-rcfjlior-d.html

其實你不考慮限流的狀況下,協程裏面的queue和list基本上差很少(ps:asyncio.Queue(num)能夠指定數量)

舉個經典的生產消費者案例:

import random
import asyncio

async def producer(q, i):
    for i in range(5):
        num = random.random()
        await q.put(num)
        print(f"[生產者{i}]商品{num}出廠了")
        await asyncio.sleep(num)

async def consumer(q, i):
    while True:
        data = await q.get()
        print(f"[消費者{i}]商品{data}搶光了")

async def main():
    queue = asyncio.Queue(10)  # 爲了演示,我這邊限制一下

    p_tasks = [asyncio.create_task(producer(queue, i)) for i in range(2)]  # 兩個生產者
    c_tasks = [asyncio.create_task(consumer(queue, i)) for i in range(5)]  # 五個消費者
    await asyncio.gather(*p_tasks, *c_tasks)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:(注意一下getput方法都是協程方法便可)

[生產者0]商品0.20252203397767787出廠了
[生產者0]商品0.9641503458079388出廠了
[消費者0]商品0.20252203397767787搶光了
[消費者0]商品0.9641503458079388搶光了
[生產者1]商品0.8049655468032324出廠了
[消費者0]商品0.8049655468032324搶光了
[生產者1]商品0.6032743557097342出廠了
[消費者1]商品0.6032743557097342搶光了
[生產者2]商品0.08818326334746773出廠了
[消費者2]商品0.08818326334746773搶光了
[生產者3]商品0.3747289313977561出廠了
[消費者3]商品0.3747289313977561搶光了
[生產者4]商品0.3948823110071299出廠了
[消費者4]商品0.3948823110071299搶光了
[生產者2]商品0.5775767044660681出廠了
[消費者0]商品0.5775767044660681搶光了
[生產者3]商品0.500537752889471出廠了
[消費者1]商品0.500537752889471搶光了
[生產者4]商品0.9921528527523727出廠了
[消費者2]商品0.9921528527523727搶光了

PS:協程也提供了Priority Queue優先級隊列 and LifoQueue後進先出隊列,這邊就再也不囉嗦了(前面咱們畫圖演示並手動實現過)

課後拓展:https://docs.python.org/3/library/asyncio-queue.html#examples

擴展:Subprocesses

官方文檔:https://docs.python.org/3/library/asyncio-subprocess.html

這個以前進程篇的時候說過,不是咱們今天的重點,我貼一個官方demo:

import asyncio

async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

asyncio.run(run('ls /zzz'))

輸出:

['ls /zzz' exited with 1]
[stderr]
ls: /zzz: No such file or directory

下節預告:asyncio+aiohttp版爬蟲

 

4.aiohttp

代碼:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/ZCoroutine/z_spider

asyncio庫只有TCPUDP服務,並不支持HTTPaiohttp就能夠理解爲是基於asynciohttp服務

4.1.入門案例

先來個獲取頁面html的demo:

import asyncio
import aiohttp

error_urls = set()

# 獲取頁面html
async def fetch(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()
        else:
            error_urls.add(url)  # 添加到待處理集合中

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "http://www.biquge.cm/12/12097/")
        if html:  # 獲取到html
            print(len(html))

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:

24287
0.5429983139038086

4.2.html解析

推薦一款輕量級網頁解析庫:pyquery(一個相似jquery的python庫)

4.2.1.列表頁

在上面基礎上簡單提取:(pq.items("dd a") ==> 類比JQ選擇器)

import asyncio
import aiohttp
from pyquery import PyQuery

error_urls = set()

# 獲取頁面html
async def fetch(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()
        else:
            error_urls.add(url)  # 待處理的url集合

# 阻塞方法
def saves(results):
    with open("www.biquge.cm.txt", "a+", encoding="utf-8") as fs:
        fs.writelines(results)
        print("ok")

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "http://www.biquge.cm/12/12097/")
        pq = PyQuery(html)

        results = [
            item.text() + ":" + item.attr("href") + "\n"
            for item in pq.items("dd a")
        ]
        # print(pq("dd a").text())

        # 兼容阻塞舊代碼
        await asyncio.get_running_loop().run_in_executor(None, saves, results)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:www.biquge.cm.txt

新書的一些話:/12/12097/7563947.html
第一章論壇裏的鬼故事。:/12/12097/7563949.html
第二章臨時講課:/12/12097/7563950.html
第三章鬼域。:/12/12097/7563951.html
第四章恐怖敲門鬼:/12/12097/7565568.html
第五章迷路:/12/12097/7565569.html
第六章廁所中的手:/12/12097/7565570.html
第七章身後的腳步:/12/12097/7565571.html
第八章奇怪的樹:/12/12097/7565572.html
第九章鬼嬰:/12/12097/7565573.html
第十章惡鬼之力:/12/12097/7565574.html
...
第三百二十七章三口箱子:/12/12097/7950281.html
第三百二十八章鬼櫥裏的照片:/12/12097/7952145.html
第三百二十九章中山市事件:/12/12097/7955244.html
第三百三十章兩條信息:/12/12097/7956401.html
第三百三十一章進入中山市:/12/12097/7959077.html
第三百三十二章出乎意料:/12/12097/7962119.html
第三百三十四章酒店的二樓:/12/12097/7964192.html
第三百三十五章黑色的燭火:/12/12097/7969058.html
第三百三十六章微笑的屍體:/12/12097/7973826.html

4.2.2.詳情頁

獲取一個詳情頁看看:

import asyncio
import aiohttp
from pyquery import PyQuery

error_urls = set()

# 獲取頁面html
async def fetch(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()
        else:
            error_urls.add(url)  # 待處理的url集合

# 詳情頁獲取測試
async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session,
                           "http://www.biquge.cm//12/12097/7563949.html")
        pq = PyQuery(html)
        print(pq("#content").text())
        # results = [item.text() for item in pq.items("#content")]
        # print(results)

if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:

老夫掐指一算,你如今正在牀上看小說,並且仍是側身,搞很差手機還在充電。

正在讀高三的楊間此刻正躺在被窩裏無聊的翻看着手機,他隨手點開了一個帖子,下面有很多網友在回帖。

「臥槽,樓主真乃神人也,這都被樓主猜中了。」

「呵,你會告訴大家我如今正在廁所蹲坑麼?不用問了,腳麻了。」

......

0.6684205532073975

PS:Win下Py包安裝出錯就去這個網站下對應包 https://www.lfd.uci.edu/~gohlke/pythonlibs/

4.3.爬蟲小案例

4.3.1.小說網站實戰

限流以及反爬蟲和如何應對反爬蟲機制,後面咱們會繼續說,這邊簡單舉個小說離線的例子:

import asyncio
import aiohttp
from pyquery import PyQuery

sem = None
error_urls = set()

# 獲取html
async def fetch(session, url):
    async with sem:
        async with session.get(url) as response:
            if response.status == 200:
                # aiohttp遇到非法字符的處理
                return await response.text("gbk", "ignore")  # 忽略非法字符
            else:
                error_urls.add(url)  # 待處理的url集合

# 獲取文章正文
async def get_text(session, url):
    # 把相對路徑改爲域名+路徑
    if not url.startswith("http://www.biquge.cm"):
        url = "http://www.biquge.cm" + url
    html = await fetch(session, url)
    pq = PyQuery(html)
    return pq("#content").text()

# 普通阻塞方法
def save(title, text):
    with open("恐怖復甦.md", "a+", encoding="gbk") as fs:
        fs.write(f"## {title}\n\n{text}\n\n")
        print(f"{title} done...")

async def main():
    global sem
    sem = asyncio.Semaphore(3) # 控制併發數反而更快
    loop = asyncio.get_running_loop()

    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "http://www.biquge.cm/12/12097/")
        pq = PyQuery(html)
        for item in pq.items("dd a"):
            title = item.text()
            text = await get_text(session, item.attr("href"))
            # 兼容阻塞舊代碼
            await loop.run_in_executor(None, save, title, text)
    print("task over")


if __name__ == "__main__":
    import time
    start_time = time.time()

    asyncio.run(main())

    print(time.time() - start_time)

輸出:(爬取整站就不用我說了吧:提取a標籤中的src,url去重後爬取內容

新書的一些話 done...
第一章論壇裏的鬼故事。 done...
第二章臨時講課 done...
第三章鬼域。 done...
第四章恐怖敲門鬼 done...
第五章迷路 done...
第六章廁所中的手 done...
第七章身後的腳步 done...
第八章奇怪的樹 done...
第九章鬼嬰 done...
第十章惡鬼之力 done...
第十一章逐漸復甦 done...
第十二章宛如智障 done...
第十三章羊皮紙 done...
第十四章詭異的紙 done...
......
第三百二十八章鬼櫥裏的照片 done...
第三百二十九章中山市事件 done...
第三百三十章兩條信息 done...
第三百三十一章進入中山市 done...
第三百三十二章出乎意料 done...
第三百三十四章酒店的二樓 done...
第三百三十五章黑色的燭火 done...
第三百三十六章微笑的屍體 done...
task over

動態展現: 動態展現

閒言碎語

【推薦】Python高性能異步框架https://github.com/LessChina/sanic

逆天點評:(只看主線,只說個人見識)

  1. 原來你們都是使用大一統的Django(方便)
  2. 後來由於性能不佳,FaceBook開發了Tornado(IO多路複用)來代替
  3. 再後來時代主流是敏捷開發,因而就有了Flask(簡單)
  4. 後來Node和Go火了,NetCore也出山了,Python的Flask等同步框架老是被吊打
  5. 因而被逼出了Japronto,瞬間驚豔和吊打的全部開發語言,可是隻是冒了泡就不怎麼維護了
  6. 後來就是AI爆發時期,Python直接打上了AI的標籤了,而Web也逐漸被打上了初創公司的標配
  7. 以後官方看不下去了,本身搞了一套異步框架asyncioandaiohttp(Node兄弟這麼優秀,憑啥咱們不行)
  8. 民間看不下去了來了個asyncio替代品uvloop(C實現的程度比官方多(誰多誰高效),PS:官方用法太醜陋了3.7纔給足了語法糖)
  9. 解決方案雖然各類出,可是web框架不行啊,因而又冒了個主流sanic(語法和Flask很像,性能不亞於Japronto
  10. 如今又剛冒出vibora(都是C實現)有超過sanic的趨勢(PS:等過幾個版本再試水,不過如今不少開發者都是Go + Python了)

最後BB一句:

  1. gevent用猴子補丁的確很方便,但不少內部異常就被屏蔽了,並且性能如今不是最高
  2. tornado爲了兼容py2py3,內部仍是經過生成器來實現異步的,效率相對低點
  3. asyncio是將來的主流方向,sanic是目前最火的異步框架(vibora還在觀察中)

PS:DjangoFlask是阻塞式IO,web框架通常不會直接部署(它自帶的解決方案只是方便調試),通常使用uwsgi or gunicorn + nginx來部署(tornado能夠直接部署)

參考連接:

python異步編程之asyncio
https://www.cnblogs.com/shenh/p/9090586.html

uWSGI, Gunicorn, 啥玩意兒?
https://www.cnblogs.com/gdkl/p/6807667.html

asyncio異步IO中文翻譯:
http://www.cnblogs.com/mamingqian/p/10008279.html
https://www.cnblogs.com/mamingqian/p/10075444.html
https://www.cnblogs.com/mamingqian/p/10044730.html

PyQuery基礎:
https://www.cnblogs.com/zhaof/p/6935473.html
https://www.cnblogs.com/lei0213/p/7676254.html
相關文章
相關標籤/搜索