These went out on my WeChat official account bit by bit last year, and I had assumed the blog was synced up too. A few readers recently said they couldn't find this one; sure enough, it was missing, so here's the make-up post.
Online preview: https://github.lesschina.com/python/base/concurrency/4.併發編程-協程篇.html
Sample code: https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/ZCoroutine
Switching between processes and threads wastes resources too; coroutines are more lightweight by comparison.
Previous article: http://www.javashuo.com/article/p-wtppcuyx-g.html
The basics-and-extensions installment already covered this thoroughly, so I won't repeat it; here's one simple example, then an extended discussion of iterables, iterators, and generators.
from functools import wraps

def log(func):
    @wraps(func)
    def wrapper(*args, **kv):
        print("%s log_info..." % func.__name__)
        return func(*args, **kv)
    return wrapper

@log
def login_out():
    print("Logged out")

def main():
    # @wraps keeps the method signature identical before and after decoration
    print(f"Signature: {login_out.__name__}")
    login_out()
    # @wraps also lets you reach the wrapped function via __wrapped__
    login_out.__wrapped__()  # run the original function

if __name__ == '__main__':
    main()
Previous article: http://www.javashuo.com/article/p-tmcacaan-d.html
I'll skip the truly basic parts and just summarize, then walk through an OOP demo:

- Iterable (can be traversed): check with from collections.abc import Iterable and isinstance(xxx, Iterable)
- Iterator (supports next(xxx)): check with from collections.abc import Iterator and isinstance(xxx, Iterator)
- list, dict, str and the like are Iterable, yet not Iterator; to turn an Iterable into an Iterator, use the iter() function, e.g. iter([]) (an Iterator object saves resources by producing items one at a time)

A reminder: from collections import Iterable, Iterator is now discouraged (deprecated, with removal planned around 3.8)
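As a quick runnable sketch of the checks above (nothing here beyond the standard library):

```python
from collections.abc import Iterable, Iterator

# a list is Iterable but not an Iterator; iter() wraps it in one
nums = [1, 2, 3]
is_iterable = isinstance(nums, Iterable)   # True
is_iterator = isinstance(nums, Iterator)   # False

it = iter(nums)                            # now an Iterator
now_iterator = isinstance(it, Iterator)    # True
first = next(it)                           # 1
```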
A look at the typing.py source makes it clear:

# Mimics the ones in collections.abc (in Python 3.7 these are just transitional compatibility aliases with no concrete implementation)
def _alias(origin, params, inst=True):
    return _GenericAlias(origin, params, special=True, inst=inst)

T_co = TypeVar('T_co', covariant=True)  # Any type covariant containers.

Iterable = _alias(collections.abc.Iterable, T_co)
Iterator = _alias(collections.abc.Iterator, T_co)
Last time the OOP demo was in CSharp; this time let's do a Python one and evolve it step by step:
# Import the relevant modules
from collections.abc import Iterable, Iterator
# from collections import Iterable, Iterator  # discouraged now (deprecated, removal planned around 3.8)

# Define a class
class MyArray(object):
    pass

# Iterable? False
isinstance(MyArray(), Iterable)
# An iterator? False
isinstance(MyArray(), Iterator)

# A class becomes iterable once it has an `__iter__` method
# Redefine it and test again:
class MyArray(object):
    def __iter__(self):
        pass

# Iterable? True
isinstance(MyArray(), Iterable)
# An iterator? False
isinstance(MyArray(), Iterator)
At this point it is still not an iterator.
You can compare this with C#:
// Whether you can foreach over something depends on whether it implements IEnumerable, i.e. whether it is an enumerable type
public interface IEnumerable
{
    IEnumerator GetEnumerator();
}

// Whether something is an enumerator depends on whether it implements the IEnumerator interface
public interface IEnumerator
{
    object Current { get; }
    bool MoveNext();
    void Reset();
}
First let's look at the corresponding Python classes:
# https://github.com/lotapp/cpython3/blob/master/Lib/_collections_abc.py
class Iterable(metaclass=ABCMeta):
    __slots__ = ()

    @abstractmethod
    def __iter__(self):
        while False:
            yield None

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Iterable:
            return _check_methods(C, "__iter__")
        return NotImplemented

class Iterator(Iterable):
    __slots__ = ()

    @abstractmethod
    def __next__(self):
        'Return the next item from the iterator. When exhausted, raise StopIteration'
        raise StopIteration

    def __iter__(self):
        return self

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Iterator:
            return _check_methods(C, '__iter__', '__next__')
        return NotImplemented
Here's a benefit of reading the source ==> abstract methods: @abstractmethod (subclasses must implement them). I skipped that last time, didn't I?
Earlier I said an iterator is always iterable, which sounded abstract; the code makes it obvious through inheritance: class Iterator(Iterable).
Now let's imitate this and implement a Python-flavored iterator of our own:
# Start with an empty skeleton
class MyIterator(Iterator):
    def __next__(self):
        pass

class MyArray(Iterable):
    def __iter__(self):
        return MyIterator()  # return an iterator

def main():
    # iterable: True
    print(isinstance(MyArray(), Iterable))
    # an iterator is iterable too: True
    print(isinstance(MyIterator(), Iterable))
    # it is an iterator: True
    print(isinstance(MyIterator(), Iterator))

if __name__ == '__main__':
    main()
# Fold the iterator into a single class
class MyIterator(Iterator):
    def __next__(self):
        pass

    def __iter__(self):
        return self  # return an iterator (now it is itself)

def main():
    print(isinstance(MyIterator(), Iterable))
    print(isinstance(MyIterator(), Iterator))

if __name__ == '__main__':
    main()
# Getting to the main topic now; first a quick Fibonacci refresher
def fibona(n):
    a, b = 0, 1
    for i in range(n):
        a, b = b, a + b
        print(a)

# Print the first 10 Fibonacci numbers
fibona(10)
# Rewrite it as an iterator
from collections.abc import Iterable, Iterator

class FibonaIterator(Iterator):
    def __init__(self, n):
        self.__a = 0
        self.__b = 1
        self.__n = n  # how many to produce
        self.__index = 0  # current index

    def __next__(self):
        if self.__index < self.__n:
            self.__index += 1
            # produce the next pair
            self.__a, self.__b = self.__b, self.__a + self.__b
            return self.__a
        else:
            raise StopIteration  # this is what ends the for loop

def main():
    print(FibonaIterator(10))
    for i in FibonaIterator(10):
        print(i)

if __name__ == "__main__":
    main()
Previous article: http://www.javashuo.com/article/p-tmcacaan-d.html
What is a generator? The source makes it instantly clear (a further wrapper on top of the iterator):
class Generator(Iterator):
    __slots__ = ()

    def __next__(self):
        """Return the next item from the generator; raise StopIteration when exhausted"""
        return self.send(None)

    @abstractmethod
    def send(self, value):
        """Send a value into the generator. Return the next yielded value or raise StopIteration."""
        raise StopIteration

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        """Raise an exception inside the generator. Return the next yielded value or raise StopIteration."""
        if val is None:
            if tb is None:
                raise typ
            val = typ()
        if tb is not None:
            val = val.with_traceback(tb)
        raise val

    # Now you know why close() raised no exception earlier~
    def close(self):
        """Suppress the exception"""
        try:
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass
        else:
            raise RuntimeError("generator ignored GeneratorExit")

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Generator:
            return _check_methods(C, '__iter__', '__next__',
                                  'send', 'throw', 'close')
        return NotImplemented
On top of the iterator, it wraps two more abstract methods, send and throw, plus close, a method that suppresses the exception.
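Before moving on, here is a minimal sketch of those protocol methods in action; the echo generator below is purely illustrative:

```python
# send() resumes the generator and makes `yield` evaluate to the sent value;
# close() raises GeneratorExit inside the generator and swallows it.

def echo():
    """Yield back whatever the caller send()s in."""
    value = None
    while True:
        value = yield value

gen = echo()
first = gen.send(None)   # prime the generator; runs to the first yield
reply = gen.send("hi")   # `yield` evaluates to "hi", which is yielded back
gen.close()              # GeneratorExit is raised inside; no error escapes
```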
Now let's rewrite the Fibonacci sequence generator-style (by the way, swapping a list comprehension's square brackets for parentheses gives you the simplest kind of generator):
# The code instantly gets cleaner
def fibona(n):
    a = 0
    b = 1
    for _ in range(n):
        a, b = b, a + b
        yield a  # one yield turns this into a generator

def main():
    print(fibona(10))
    for i in fibona(10):
        print(i)

if __name__ == "__main__":
    main()
A few things to note:
- You can't send a real value into a generator that hasn't started yet; doing so raises TypeError: can't send non-None value to a just-started generator (prime it with next() or send(None) first)
- When a generator returns, it raises a StopIteration error, and the return value is carried in the StopIteration's value attribute:

def test_send(n):
    for i in range(n):
        if i == 2:
            return "i==2"
        yield i

g = test_send(5)
while True:
    try:
        tmp = next(g)
        print(tmp)
    except StopIteration as ex:
        print(ex.value)
        break

Output:
0 1 i==2
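The priming rule can be verified with a tiny sketch; counter is just an illustrative helper:

```python
# The first resume must be next(g) or g.send(None); sending a real value
# straight away raises TypeError.

def counter(n):
    for i in range(n):
        yield i

g = counter(3)
try:
    g.send(1)            # not primed yet -> TypeError
except TypeError as ex:
    error_message = str(ex)

g2 = counter(3)
first = g2.send(None)    # equivalent to next(g2)
```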
Not much else worth adding; read the source, then revisit the earlier material, and it will hit differently~
Recap of the last installment: networking: a static server + load testing
Synchronous means that when one task depends on another, the dependent task only counts as finished after the task it depends on completes; it has to wait.
Asynchronous means you don't wait for the depended-on task to finish; you just tell it what needs to be done, then carry on with the code below. As long as your own whole task completes, you're done (async generally relies on state, notifications, and callbacks).
PS: in projects it usually looks like this (personal experience):
await xxx()
Blocking means the current thread is suspended until the call's result comes back; it sits waiting for the notification and can't do any other work (most code is like this).
Non-blocking means that when a result can't be had immediately, the function doesn't block the current thread but returns right away (the code below keeps running, or a retry loop kicks in).
PS: ever wondered why the retry count in projects is usually 3?
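There's no deep magic to the number: a hypothetical retry helper like the sketch below simply caps attempts at the conventional three (retry, flaky, and the counts are all made up for illustration):

```python
import time

def retry(func, attempts=3, delay=0.0):
    """Call func until it succeeds, up to `attempts` times."""
    last_error = None
    for _ in range(attempts):
        try:
            return func()
        except Exception as ex:
            last_error = ex
            time.sleep(delay)  # back off before retrying
    raise last_error

calls = []
def flaky():
    # fails twice, then succeeds: a stand-in for an unreliable IO call
    calls.append(1)
    if len(calls) < 3:
        raise ValueError("not ready")
    return "ok"

result = retry(flaky)
```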
For a single IO access, the data is first copied into a kernel buffer, and only then copied from the kernel buffer into the application's address space. So it goes through two stages:

1. Waiting for the data to be ready (in the kernel)
2. Copying the data from the kernel into the process

Because of these two stages, Linux ended up with the following five IO models (using socket as the example):

- Blocking IO: when the user process calls a blocking method like recvfrom, the kernel enters stage 1 of IO: preparing the data (the kernel must wait until there is enough data before copying). This wait blocks the user process; only after the kernel has the data ready and copies it into the user address space does the call return, and the process moves from blocked back to ready.
- Non-blocking IO: if the data in the kernel isn't ready yet, the call does not block the user process but immediately returns an error. On seeing the error, the process knows the data isn't ready and can issue the read again. Once the kernel has the data ready and receives the process's system call again, it copies the data into user memory right away and returns.
- IO multiplexing: select, poll, and epoll.
- Asynchronous IO: the POSIX aio_ family of functions; the IO read returns immediately and never blocks the user process.
- Signal-driven IO: a signal tells the process that the read has completed.

Here's the diagram from the Unix programming book:
Before starting, let's ease in via non-blocking IO (a simple example with socket.setblocking(False)):
import time
import socket

def select(socket_addr_list):
    for client_socket, client_addr in socket_addr_list:
        try:
            data = client_socket.recv(2048)
            if data:
                print(f"[Message from {client_addr}:]\n")
                print(data.decode("utf-8"))
                client_socket.send(
                    b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                )
            else:
                # no message raises an exception; an empty message means the client hung up
                client_socket.close()  # close the client connection
                socket_addr_list.remove((client_socket, client_addr))
                print(f"[Client {client_addr} disconnected, current connections: {len(socket_addr_list)}]")
        except Exception:
            pass

def main():
    # the set of connected clients
    socket_addr_list = list()
    with socket.socket() as tcp_server:
        # allow rebinding the port quickly
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        tcp_server.setblocking(False)  # non-blocking server socket
        while True:
            try:
                client_socket, client_addr = tcp_server.accept()
                client_socket.setblocking(False)  # non-blocking client socket
                socket_addr_list.append((client_socket, client_addr))
            except Exception:
                pass
            else:
                print(f"[Connection from {client_addr}, current connections: {len(socket_addr_list)}]")
            # guard against errors after clients disconnect
            if socket_addr_list:
                # poll the clients for messages
                select(socket_addr_list)  # the list is passed by reference
            time.sleep(0.01)

if __name__ == "__main__":
    main()
Output:
Worth pondering:
select is a bit like the code above, except the polling is handed to the operating system:
The kernel "watches" all the sockets select is responsible for; as soon as the data in any one of them is ready, select returns. The user process then calls read to copy the data from the kernel into the process.
Here's an equivalent example:
import select
import socket

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        socket_info_dict = dict()
        socket_list = [tcp_server]  # the watch list
        while True:
            # drawback: select caps the size of the list
            read_list, write_list, error_list = select.select(
                socket_list, [], [])
            for item in read_list:
                # the server socket: accept the new connection
                if item == tcp_server:
                    client_socket, client_address = item.accept()
                    socket_list.append(client_socket)
                    socket_info_dict[client_socket] = client_address
                    print(f"[{client_address} connected, current connections: {len(socket_list)-1}]")
                # a client sent something
                else:
                    data = item.recv(2048)
                    if data:
                        print(data.decode("utf-8"))
                        item.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        item.close()
                        socket_list.remove(item)
                        info = socket_info_dict[item]
                        print(f"[{info} disconnected, current connections: {len(socket_list)-1}]")

if __name__ == "__main__":
    main()
The output is the same as above.
Further notes:
The file descriptors the select function monitors fall into 3 classes: readfds, writefds, and exceptfds. The call blocks until a descriptor becomes ready (readable, writable, or with an except condition) or until it times out (timeout specifies the wait; pass null to return immediately). One drawback of select is that the number of file descriptors a single process can monitor has a hard cap, generally 1024 on Linux (2048 on 64-bit).
Then Poll appeared; it removed the cap, but nothing essential changed; it still polls.
epoll arrived in kernel 2.6 (Linux-only). It uses one file descriptor to manage many, keeps the events the user cares about in a kernel event table, and works via a listen-and-callback mechanism, so the copy between user space and kernel space happens only once and re-walking the ready-descriptor list is avoided.
Let's look at an example first (output same as above):
import socket
import select

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        # epoll is Linux-only
        epoll = select.epoll()
        # register tcp_server with epoll
        epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
        # fd -> (socket, address) key-value store
        fd_socket_dict = dict()
        # you handle the callbacks yourself
        while True:
            # returns the set of readable/writable socket fds
            poll_list = epoll.poll()
            for fd, event in poll_list:
                # the server's own socket
                if fd == tcp_server.fileno():
                    client_socket, client_addr = tcp_server.accept()
                    fd = client_socket.fileno()
                    fd_socket_dict[fd] = (client_socket, client_addr)
                    # register the client with epoll
                    epoll.register(fd, select.EPOLLIN | select.EPOLLET)
                else:  # a client
                    client_socket, client_addr = fd_socket_dict[fd]
                    data = client_socket.recv(2048)
                    print(
                        f"[Message from {client_addr}, current connections: {len(fd_socket_dict)}]\n")
                    if data:
                        print(data.decode("utf-8"))
                        client_socket.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        del fd_socket_dict[fd]
                        print(
                            f"[{client_addr} went offline, current connections: {len(fd_socket_dict)}]\n"
                        )
                        # unregister from epoll
                        epoll.unregister(fd)
                        client_socket.close()

if __name__ == "__main__":
    main()
Extension: epoll's two working modes
LT (level trigger) mode: when epoll_wait detects a ready descriptor and notifies the application, the application may choose not to handle the event immediately; the next epoll_wait call will report the event again. LT is the default mode and supports both blocking and non-blocking sockets.
ET (edge trigger) mode: when epoll_wait detects a ready descriptor and notifies the application, the application must handle the event immediately; if it doesn't, the next epoll_wait call will not report it again. ET is the high-speed mode and supports only non-blocking sockets (ET cuts down how often an epoll event is re-triggered, so it is more efficient than LT).
Distilling the code:
epoll = select.epoll()
epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
epoll.unregister(fd)
PS: epoll isn't necessarily faster than select; which wins generally depends on the scenario.
IO multiplexing actually has one more member, kqueue, which is similar to epoll; the generic approach below covers it.
(Selector) Generally speaking: use epoll on Linux and select on Windows (for IO multiplexing, knowing this generic API is enough).
First, the Python source:

# selection priority: epoll|kqueue|devpoll > poll > select
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector
A hands-on example (the read and write handling doesn't have to be split up like this):
import socket
import selectors

# epoll on Linux, select on Windows
Selector = selectors.DefaultSelector()

class Task(object):
    def __init__(self):
        # fd -> (socket, address) pairs for clients
        self.fd_socket_dict = dict()

    def run(self):
        self.server = socket.socket()
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind(('', 8080))
        self.server.listen()
        # register the server with epoll
        Selector.register(self.server.fileno(), selectors.EVENT_READ,
                          self.connected)

    def connected(self, key):
        """Handle a new client connection"""
        client_socket, client_address = self.server.accept()
        fd = client_socket.fileno()
        self.fd_socket_dict[fd] = (client_socket, client_address)
        # register a read event for the client (the server reads its message)
        Selector.register(fd, selectors.EVENT_READ, self.call_back_reads)
        print(f"{client_address} connected, current connections: {len(self.fd_socket_dict)}")

    def call_back_reads(self, key):
        """Handle a readable client"""
        # an fd can be registered only once; before watching for writes, unregister the read
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        print(f"[Message from {client_address}:]\n")
        data = client_socket.recv(2048)
        if data:
            print(data.decode("utf-8"))
            # register a write event for the client (the server sends a message)
            Selector.register(key.fd, selectors.EVENT_WRITE,
                              self.call_back_writes)
        else:
            client_socket.close()
            del self.fd_socket_dict[key.fd]
            print(f"{client_address} disconnected, current connections: {len(self.fd_socket_dict)}")

    def call_back_writes(self, key):
        """Handle a writable client"""
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        client_socket.send(b"ok")
        Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads)

def main():
    t = Task()
    t.run()
    while True:
        ready = Selector.select()
        for key, obj in ready:
            # you invoke the callback yourself
            call_back = key.data
            call_back(key)

if __name__ == "__main__":
    main()
Distilling the code:
Selector = selectors.DefaultSelector()
Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
Selector.unregister(key.fd)
Extra reading:
select, iocp, epoll, kqueue and the various I/O multiplexing mechanisms: https://blog.csdn.net/shallwake/article/details/5265287 | A short intro to kqueue usage: http://www.cnblogs.com/luminocean/p/5631336.html
We often have this need: read the row lists from two sharded tables, then merge them and do some processing.
Normally you can borrow itertools.chain for the traversal:
# https://docs.python.org/3/library/itertools.html#itertools.chain
import itertools

def main():
    # simulate query results from two sharded tables
    user1 = ["小張", "小明"]
    user2 = ["小潘", "小周"]
    # iterating a dict only gives its keys (for this case, write your own merge handling)
    user3 = {"name": "test1", "name1": "test2"}
    # goal: merge and iterate
    for item in itertools.chain(user1, user2, user3):
        print(item)

if __name__ == '__main__':
    main()
Output:
小張 小明 小潘 小周 name name1
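Worth knowing: itertools also ships chain.from_iterable, which takes one iterable of iterables instead of separate arguments; a small sketch:

```python
import itertools

user1 = ["小張", "小明"]
user2 = ["小潘", "小周"]

# chain(*iterables) and chain.from_iterable(iterable_of_iterables)
# produce the same sequence; from_iterable is handy when the list of
# lists is itself built lazily.
merged = list(itertools.chain(user1, user2))
merged2 = list(itertools.chain.from_iterable([user1, user2]))
```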
Its internal implementation is essentially this (two levels of iteration, returning values with yield):
def my_chain(*args, **kwargs):
    for items in args:
        for item in items:
            yield item

def main():
    # simulate query results from two sharded tables
    user1 = ["小張", "小明"]
    user2 = ["小潘", "小周"]
    # iterating a dict only gives its keys (for this case, write your own merge handling)
    user3 = {"name": "test1", "name1": "test2"}
    # goal: merge and iterate
    for item in my_chain(user1, user2, user3):
        print(item)

if __name__ == '__main__':
    main()
Then, from Python 3.3 on, the syntax was simplified one step further (yield from an iterable object):
def my_chain(*args, **kwargs):
    for items in args:
        yield from items

def main():
    # simulate query results from two sharded tables
    user1 = ["小張", "小明"]
    user2 = ["小潘", "小周"]
    # goal: merge and iterate
    for item in my_chain(user1, user2):
        print(item)

if __name__ == '__main__':
    main()
Output:
小張 小明 小潘 小周
Knowing the internal implementation, it's easy to write handling for the dict case:
def my_chain(*args, **kwargs):
    for my_iterable in args:
        # if it's a dict, yield its values instead
        if isinstance(my_iterable, dict):
            my_iterable = my_iterable.values()
        for item in my_iterable:
            yield item

def main():
    # simulate query results from two sharded tables
    user1 = ["小張", "小明"]
    user2 = ["小潘", "小周"]
    # iterating a dict only gives its keys (handled by our merge method this time)
    user3 = {"name": "test1", "name1": "test2"}
    # goal: merge and iterate
    for item in my_chain(user1, user2, user3):
        print(item)

if __name__ == '__main__':
    main()
Output:
小張 小明 小潘 小周 test1 test2
PS: you'd rarely actually do this; the data usually looks like [{}, {}], iterated and processed directly:
import itertools

def main():
    # simulate query results from two sharded tables
    user1 = [{"name": "小張"}, {"name": "小明"}]
    user2 = [{"name": "小潘"}, {"name": "小周"}]
    user3 = [{"name": "test1"}, {"name": "test2"}]
    # goal: merge and iterate
    for item in itertools.chain(user1, user2, user3):
        # usually the processing happens right here
        for key, value in item.items():
            print(value)

if __name__ == '__main__':
    main()
The goal of coroutines is actually simple: do asynchronous programming while writing code that looks synchronous.
First, a requirement: generate charting data (max, min, avg).
Say the original data looks like this:
products = [{
    "id": 2344,
    "title": "御泥坊補水面膜",
    "price": [89, 76, 120, 99]
}, {
    "id": 2345,
    "title": "御泥坊火山泥面膜",
    "price": [30, 56, 70, 89]
}]
After processing:
new_products = [{
    "id": 2344,
    "title": "御泥坊補水面膜",
    "price": [89, 76, 120, 99],
    "max": 120,
    "min": 76,
    "avg": 96.0
}, {
    "id": 2345,
    "title": "御泥坊火山泥面膜",
    "price": [30, 56, 70, 89],
    "max": 89,
    "min": 30,
    "avg": 61.25
}]
The processed data is typically used for plotting; the real effect looks something like this:
Without coroutines, we'd normally handle it like this (the database fetch is omitted):
# build the new dict entry
def get_new_item(item):
    prices = item["price"]
    item["avg"] = sum(prices) / len(prices)
    item["max"] = max(prices)
    item["min"] = min(prices)
    return item

def get_new_data(data):
    newdata = []
    for item in data:
        new_item = get_new_item(item)
        # print(new_item)  # the processed dict
        newdata.append(new_item)
    return newdata

def main():
    # requirement: generate charting data (max, min, avg)
    products = [{
        "id": 2344,
        "title": "御泥坊補水面膜",
        "price": [89, 76, 120, 99]
    }, {
        "id": 2345,
        "title": "御泥坊火山泥面膜",
        "price": [30, 56, 70, 89]
    }]
    new_products = get_new_data(products)
    print(new_products)

if __name__ == "__main__":
    main()
Converting to a yield-based coroutine is easy too: the code barely changes, and there's no back-and-forth callback dance like with IO multiplexing:
# build the new dict entry
def get_new_item(item):
    prices = item["price"]
    item["avg"] = sum(prices) / len(prices)
    item["max"] = max(prices)
    item["min"] = min(prices)
    yield item

def get_new_data(data):
    for item in data:
        yield from get_new_item(item)

def main():
    # requirement: generate charting data (max, min, avg)
    products = [{
        "id": 2344,
        "title": "御泥坊補水面膜",
        "price": [89, 76, 120, 99]
    }, {
        "id": 2345,
        "title": "御泥坊火山泥面膜",
        "price": [30, 56, 70, 89]
    }]
    new_products = list()
    # if you need the return value, catch StopIteration
    for item in get_new_data(products):
        new_products.append(item)
    print(new_products)

if __name__ == "__main__":
    main()
A quick breakdown (the point of using yield from here is to lead into the async/await discussion coming up):
The benefit of yield from (the delegating generator get_new_data) is that it establishes a direct two-way channel between the caller (main) and the yield subgenerator (get_new_item).
You can also treat yield from as a middleman (if it's hard to grasp, just imagine yield from as await and it clicks); in essence it is the code below:
# generate the new data
def get_new_data(data):
    for item in data:
        prices = item["price"]
        item["avg"] = sum(prices) / len(prices)
        item["max"] = max(prices)
        item["min"] = min(prices)
        yield item

def main():
    # requirement: generate charting data (max, min, avg)
    products = [{
        "id": 2344,
        "title": "御泥坊補水面膜",
        "price": [89, 76, 120, 99]
    }, {
        "id": 2345,
        "title": "御泥坊火山泥面膜",
        "price": [30, 56, 70, 89]
    }]
    new_products = list()
    for item in get_new_data(products):
        new_products.append(item)
    print(new_products)

if __name__ == "__main__":
    main()
Internally, yield from does a lot of work on top of yield (such as some exception handling); see PEP 380 for the details.
Let's distill a simplified version first:
# The normal call
RESULT = yield from EXPR

# _i: the subgenerator (also an iterator)
# _y: the value the subgenerator produces
# _r: the final result of the yield from expression
# _s: the value the caller sends in
# _e: the exception object

# The internals
_i = iter(EXPR)  # EXPR is an iterable; _i is the subgenerator
try:
    # The first resume can't send a value: only next() or send(None); the produced value goes into _y
    _y = next(_i)
except StopIteration as _e:
    # If the subgenerator returns straight away, this raises; value carries its return value
    _r = _e.value
else:
    # The loop (the caller/subgenerator exchange); the delegating generator blocks at yield from
    while 1:
        # The subgenerator and the caller now share a two-way channel; wait for the caller's send(value), store it in _s
        _s = yield _y  # a series of exception handlers belongs here; removed for now, shown later
        try:
            # send(None) means keep iterating with next()
            if _s is None:
                _y = next(_i)  # store the subgenerator's output in _y
            else:
                _y = _i.send(_s)  # a real value from the caller is forwarded to the subgenerator
        except StopIteration as _e:
            _r = _e.value  # subgenerator exhausted: its return value goes into _r
            break
RESULT = _r  # the final value (of the whole yield from expression)
Now the full version feels much less intimidating:
# The normal call
RESULT = yield from EXPR

# _i: the subgenerator (also an iterator)
# _y: the value the subgenerator produces
# _r: the final result of the yield from expression
# _s: the value the caller sends in
# _e: the exception object

# The internals
_i = iter(EXPR)  # EXPR is an iterable; _i is the subgenerator
try:
    # The first resume can't send a value: only next() or send(None); the produced value goes into _y
    _y = next(_i)
except StopIteration as _e:
    # If the subgenerator returns straight away, this raises; value carries its return value
    _r = _e.value
else:
    # The loop (the caller/subgenerator exchange); the delegating generator blocks at yield from
    while 1:
        try:
            # The subgenerator and the caller now share a two-way channel; wait for the caller's send(value), store it in _s
            _s = yield _y
            # [Now filled in] several situations need handling:
            # 1. the subgenerator may be a plain iterator, unusable as a coroutine generator (no throw/close)
            # 2. the subgenerator supports throw/close, but inside it both methods raise
            # 3. the caller invoked gen.throw() to make the subgenerator raise
            # so both gen.close() and gen.throw() need handling
        # handling for the generator's close()
        except GeneratorExit as _e:
            try:
                _m = _i.close
            except AttributeError:
                pass  # no close method; nothing to call
            else:
                _m()
            raise _e  # re-raise upward
        # handling for the generator's throw()
        except BaseException as _e:
            _x = sys.exc_info()
            try:
                _m = _i.throw
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:
            try:
                # send(None) means keep iterating with next()
                if _s is None:
                    _y = next(_i)  # store the subgenerator's output in _y
                else:
                    _y = _i.send(_s)  # a real value from the caller is forwarded to the subgenerator
            except StopIteration as _e:
                _r = _e.value  # subgenerator exhausted: its return value goes into _r
                break
RESULT = _r  # the final value (of the whole yield from expression)
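The expansion can be sanity-checked with a tiny runnable demo; sub and delegator are illustrative names:

```python
# The subgenerator's `return` value becomes the value of the
# `yield from` expression (_r in the expansion).

def sub():
    yield 1
    yield 2
    return "sub done"          # ends up in StopIteration.value

def delegator():
    result = yield from sub()  # RESULT = _r
    yield result               # expose it so we can observe it

values = list(delegator())     # [1, 2, "sub done"]
```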
Now retrofit the plain code above with async and await (the goal of coroutines is writing async as if it were sync; this is where that truly happens):
import asyncio

# build the new dict entry
async def get_new_item(item):
    prices = item["price"]
    item["avg"] = sum(prices) / len(prices)
    item["max"] = max(prices)
    item["min"] = min(prices)
    return item

async def get_new_data(data):
    newdata = []
    for item in data:
        new_item = await get_new_item(item)
        # print(new_item)  # the processed dict
        newdata.append(new_item)
    return newdata

def main():
    # requirement: generate charting data (max, min, avg)
    products = [{
        "id": 2344,
        "title": "御泥坊補水面膜",
        "price": [89, 76, 120, 99]
    }, {
        "id": 2345,
        "title": "御泥坊火山泥面膜",
        "price": [30, 56, 70, 89]
    }]
    # python 3.7
    new_products = asyncio.run(get_new_data(products))
    print(new_products)

if __name__ == "__main__":
    main()
Output (barely different from the plain version, right?):
[{'id': 2344, 'title': '御泥坊補水面膜', 'price': [89, 76, 120, 99], 'avg': 96.0, 'max': 120, 'min': 76}, {'id': 2345, 'title': '御泥坊火山泥面膜', 'price': [30, 56, 70, 89], 'avg': 61.25, 'max': 89, 'min': 30}]
Up next: asyncio
Official docs: https://docs.python.org/3/library/asyncio.html
Common development mistakes: https://docs.python.org/3/library/asyncio-dev.html
Code samples: https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/ZCoroutine
PS: asyncio is Python's complete, end-to-end solution for asynchronous IO programming.
Last time we walked through how coroutines evolved; let's continue from there.
JS lets you mix generators with async and await; what about Python? (NetCore does not allow mixing)
import types

# Fully separated from generators now, though you can still think of it as yield from
@types.coroutine
def get_value(value):
    yield value

async def get_name(name):
    # some business logic
    return await get_value(name)

if __name__ == '__main__':
    gen = get_name("小明")
    print(gen.send(None))

# Mixing them directly raises: TypeError: object generator can't be used in 'await' expression
Although our async and await are not the same concept as yield from, you can understand them as yield from; the code above can be read as:
import types

def get_value(value):
    yield value

# the async and await replaced with yield from
def get_name(name):
    # some business logic
    yield from get_value(name)

if __name__ == '__main__':
    gen = get_name("小明")
    print(gen.send(None))

PS: by default Python, like NetCore, can't mix the two directly; if you really must, handle it as above (@asyncio.coroutine works too)
Before today, we implemented coroutines as: an event loop (loop) + callbacks (driving generators) + IO multiplexing (epoll).
Now the official asyncio (think of it as a coroutine pool) handles it; there is also the third-party uvloop (based on the C libuv library, the same one nodejs builds on).
PS: using uvloop is very simple; before fetching the event loop, set asyncio's event-loop policy to uvloop's: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
First, a simple coroutine example:
import asyncio

# simulate a time-consuming operation
async def test():
    print("start...")
    # the old blocking sleep won't do anymore
    await asyncio.sleep(2)
    print("end...")
    return "ok"

if __name__ == '__main__':
    import time
    start_time = time.time()
    # # >= python3.4
    # # get asyncio's event loop
    # loop = asyncio.get_event_loop()
    # # run the event loop until the given future finishes, returning its result
    # result = loop.run_until_complete(test())
    # print(result)

    # python3.7
    result = asyncio.run(test())
    print(result)
    print(time.time() - start_time)
Output:
start... end... ok 2.001772403717041
Briefly: asyncio.run is a convenience only introduced in Python 3.7 (compare NetCore's Task.Run); a look at the source says it all:
# https://github.com/lotapp/cpython3/blob/master/Lib/asyncio/runners.py
def run(main, *, debug=False):
    # It used to be "asyncio.get_event_loop()" directly (most developers are used to that)
    # From 3.7 on, "asyncio.get_running_loop()" is recommended for fetching the running loop (raises if there isn't one)
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")
    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()  # create a brand-new event loop
    try:
        events.set_event_loop(loop)  # install it
        loop.set_debug(debug)  # debug mode (off by default)
        return loop.run_until_complete(main)  # run until done
    finally:
        try:
            _cancel_all_tasks(loop)  # cancel remaining tasks
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()
The new version really just spins up a fresh loop and drives it with run_until_complete.
PS: uvloop can be used this way too: get a loop with loop = uvloop.new_event_loop(), then swap out the native one with asyncio.set_event_loop(loop)
import asyncio

# simulate a time-consuming operation
async def test(i):
    print("start...")
    # the old blocking sleep won't do anymore
    await asyncio.sleep(2)
    print("end...")
    return i

if __name__ == '__main__':
    import time
    start_time = time.time()
    # # >= python3.4
    loop = asyncio.get_event_loop()
    # tasks = [asyncio.ensure_future(test(i)) for i in range(10)]
    # note: create_task here is the loop's method, not asyncio's, or you'd get RuntimeError: no running event loop
    tasks = [loop.create_task(test(i)) for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        print(task.result())
    print(time.time() - start_time)
Output (replacing tasks with this works the same: tasks = [asyncio.ensure_future(test(i)) for i in range(10)]):
start... start... start... start... start... start... start... start... start... start... end... end... end... end... end... end... end... end... end... end... 0 1 2 3 4 5 6 7 8 9 2.028331995010376
Now let's see what this asyncio.wait is (recap: http://www.javashuo.com/article/p-rcfjlior-d.html):

# The return_when parameter works as before:
# FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
# FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
# ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
# The loop parameter is slated for deprecation in a future release
# Unlike the wait in concurrent.futures, this one is a coroutine
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
For everyday use, the high-level API asyncio.gather(*tasks) can replace asyncio.wait(tasks).
PS: the official recommendation is to create tasks via create_task.
import asyncio

# simulate a time-consuming operation
async def test(i):
    print("start...")
    # the old blocking sleep won't do anymore
    await asyncio.sleep(2)
    print("end...")
    return i

async def main():
    tasks = [test(i) for i in range(10)]
    # await task yields the return value (a result or an exception)
    # for task in asyncio.as_completed(tasks):
    #     try:
    #         print(await task)
    #     except Exception as ex:
    #         print(ex)
    return [await task for task in asyncio.as_completed(tasks)]

if __name__ == '__main__':
    import time
    start_time = time.time()
    # old-style usage
    loop = asyncio.get_event_loop()
    result_list = loop.run_until_complete(main())
    print(result_list)
    print(time.time() - start_time)
Output (PS: directly replacing asyncio.wait(tasks) with asyncio.gather(*tasks) works too):
start... start... start... start... start... start... start... start... start... start... end... end... end... end... end... end... end... end... end... end... [1, 6, 4, 5, 0, 7, 8, 3, 2, 9] 2.0242035388946533
It's actually easy to understand, and now unified with NetCore and NodeJS: await xxx returns a (result | exception), while not awaiting gives you a task object.
import asyncio

# simulate a time-consuming operation
async def test(i):
    print("start...")
    await asyncio.sleep(2)
    print("end...")
    return i

async def main():
    tasks = [test(i) for i in range(10)]
    # return one aggregated future over the coroutines/futures
    return await asyncio.gather(*tasks)  # remember the * to unpack

if __name__ == '__main__':
    import time
    start_time = time.time()
    # python3.7
    result_list = asyncio.run(main())
    print(result_list)
    # 2.0259485244750977
    print(time.time() - start_time)
Output (the syntax has been simplified so much; really easy to use):
start... start... start... start... start... start... start... start... start... start... end... end... end... end... end... end... end... end... end... end... [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 2.00840163230896
On why the argument needs * to unpack ==> one glance at the function definitions makes it obvious:

# return one future aggregating the results of the given coroutines/futures
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    pass

# wrap a coroutine or awaitable object into a task
def ensure_future(coro_or_future, *, loop=None):
    pass

# take a coroutine object, return a task object
class BaseEventLoop(events.AbstractEventLoop):
    def create_task(self, coro):
        pass
asyncio's high-level API is generally used for these things (enough for everyday development):

- network IO and inter-process communication (IPC)
- subprocesses
- distributing tasks (Tasks) via queues (Queue)
- synchronizing (synchronize) concurrent code

The low-level API is generally used like this (the event loop and callbacks see some use; the rest rarely):

- creating and managing event loops; providing asynchronous (asynchronous) APIs for networking, signals (Signal), and so on
- bridging callback-based libraries and code with the async/await syntax

Callbacks generally hurt maintainability and are mostly avoided now (async code reads almost like sync code these days, so callbacks have lost much of their usefulness).
The return values discussed above can also be obtained through a callback function:
# low-level API example
import asyncio

async def get_html(url):
    print(f"get {url} ing")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

def call_back(task):
    print(type(task))
    print(task.result())

if __name__ == "__main__":
    import time
    start_time = time.time()
    urls = [
        "https://www.baidu.com", "https://www.sogou.com",
        "https://www.python.org", "https://www.asp.net"
    ]
    tasks = set()  # the task set
    loop = asyncio.get_event_loop()
    for url in urls:
        # task = asyncio.ensure_future(get_html(url))
        task = loop.create_task(get_html(url))
        # attach the callback
        task.add_done_callback(call_back)
        # add it to the task set
        tasks.add(task)
    # run them in a batch
    loop.run_until_complete(asyncio.gather(*tasks))
    print(time.time() - start_time)
Output (task.add_done_callback(callback)):
get https://www.baidu.com ing get https://www.sogou.com ing get https://www.python.org ing get https://www.asp.net ing <class '_asyncio.Task'> <h1>This is a test for https://www.baidu.com</h1> <class '_asyncio.Task'> <h1>This is a test for https://www.python.org</h1> <class '_asyncio.Task'> <h1>This is a test for https://www.sogou.com</h1> <class '_asyncio.Task'> <h1>This is a test for https://www.asp.net</h1> 2.0168468952178955
Example:
import asyncio
import functools

async def get_html(url):
    await asyncio.sleep(2)
    return "This is a test for"

# note: arguments passed in via the partial come first
def call_back(url, task):
    # do something
    print(type(task))
    print(task.result(), url)

if __name__ == "__main__":
    import time
    start_time = time.time()
    urls = [
        "https://www.baidu.com", "https://www.sogou.com",
        "https://www.python.org", "https://www.asp.net"
    ]
    tasks = set()  # the task set
    loop = asyncio.get_event_loop()
    for url in urls:
        # task = asyncio.ensure_future(get_html(url))
        task = loop.create_task(get_html(url))
        # attach the callback (it can't take extra arguments, so use a partial to pass them)
        task.add_done_callback(functools.partial(call_back, url))
        # add it to the task set
        tasks.add(task)
    # run them in a batch
    loop.run_until_complete(asyncio.gather(*tasks))
    print(time.time() - start_time)
Output (PS: arguments passed in via the partial come first):
<class '_asyncio.Task'> This is a test for https://www.baidu.com <class '_asyncio.Task'> This is a test for https://www.python.org <class '_asyncio.Task'> This is a test for https://www.sogou.com <class '_asyncio.Task'> This is a test for https://www.asp.net 2.0167236328125
Some people may still be confused by the earlier point that await task may yield a result or an exception ==> just treat it as synchronous code (again, the goal of coroutines is async programming written like sync code): a function call either returns a result or raises.
Here's an exception example:
import asyncio

async def get_html(url):
    print(f"get {url} ing")
    if url == "https://www.asp.net":
        raise Exception("Exception is over")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

async def main():
    urls = [
        "https://www.baidu.com", "https://www.asp.net",
        "https://www.python.org", "https://www.sogou.com"
    ]
    tasks = [get_html(url) for url in urls]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    import time
    start_time = time.time()
    try:
        asyncio.run(main())
    except Exception as ex:
        print(ex)
    print(time.time() - start_time)
Output (no different from sync code; just wrap the part that may raise in an exception handler):
One more glance at the old-style usage (PS: almost identical; from next time on it'll be all new-style):
import asyncio

async def get_html(url):
    print(f"get {url} ing")
    if url == "https://www.asp.net":
        raise Exception("Exception is over")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

async def main():
    urls = [
        "https://www.baidu.com", "https://www.asp.net",
        "https://www.python.org", "https://www.sogou.com"
    ]
    tasks = [get_html(url) for url in urls]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    try:
        # run them in a batch
        loop.run_until_complete(main())
    except Exception as ex:
        print(ex)
    print(time.time() - start_time)
Common exceptions when debugging Python3: http://www.javashuo.com/article/p-qqaluvos-t.html
Official docs: https://docs.python.org/3/library/asyncio-exceptions.html
- asyncio.TimeoutError: the operation exceeded the given deadline
- asyncio.CancelledError: the operation was cancelled
- asyncio.InvalidStateError: raised when a Task/Future is in an invalid internal state
- asyncio.IncompleteReadError: a read operation did not complete fully
- asyncio.LimitOverrunError: the buffer size limit was reached while looking for a separator
- asyncio.SendfileNotAvailableError: the sendfile syscall is not available for the given socket or file type

Some exceptions the official page leaves out; I've added some common ones: https://docs.python.org/3/library/exceptions.html

- BaseException
  - SystemExit: raised by sys.exit() (purpose: make the Python interpreter exit)
  - KeyboardInterrupt: raised when the user interrupts the program with Ctrl+C
  - GeneratorExit: raised when a generator or coroutine is closed (take special note)
  - Exception: base class of all built-in (non-system-exiting) and user-defined exceptions
    - asyncio.Error
      - asyncio.CancelledError
      - asyncio.TimeoutError: distinguish it from Exception.OSError.TimeoutError
      - asyncio.InvalidStateError: invalid internal state of a Task/Future
      - asyncio.LimitOverrunError: the buffer limit was overrun
    - StopIteration: raised by next()/send(): https://www.cnblogs.com/dotnetcrazy/p/9278573.html#6.Python迭代器
    - StopAsyncIteration: raised by __anext__()
    - AssertionError: raised when an assert statement fails
    - AttributeError: raised when attribute reference or assignment fails
    - EOFError
      - asyncio.IncompleteReadError: a read operation did not complete
    - OSError: raised when a system function returns a system-related error
      - TimeoutError: raised when a system function times out
    - ReferenceError: reference error (the object was garbage-collected or deleted)
    - RuntimeError: an error was detected that doesn't fit any other category
      - NotImplementedError: not implemented (e.g. calling a subclass method that doesn't exist)
      - RecursionError: recursion went too deep
      - asyncio.SendfileNotAvailableError: the syscall doesn't apply to the given socket or file type
    - SyntaxError: raised on syntax errors (common when pasting code)
      - IndentationError: indentation problems
      - TabError: raised when indentation mixes tabs and spaces inconsistently
    - TypeError: type error

Comrades on the .Net side, remember to compare with the piece from back then, Python3 vs C# concurrency ~ the .Net edition: https://www.cnblogs.com/dunitian/p/9419325.html
First, the concepts:

- `event_loop` (event loop): the scheduler that drives and switches between coroutines
- `coroutine`: a function defined with the `async` keyword; calling it does not run the function immediately but returns a coroutine object
- `future` object: represents the result of an asynchronous operation that may not have completed yet
- `task`: a `Task` object is a subclass of `Future`; it ties a `coroutine` and a `Future` together by wrapping the `coroutine` into a `Future` object
- `async/await` keywords: `async` defines a coroutine, `await` suspends on a blocking async call (like `yield from`, both establish a direct two-way channel between the caller and the sub-coroutine)

To keep readers from mixing up old and new code, from here on I use the latest syntax only:

- `asyncio.run(main())` (put it under `if __name__ == "__main__"`)
- `asyncio.create_task(func())`
- `asyncio.gather(*tasks)` / `asyncio.wait`
- `asyncio.get_event_loop()` → `asyncio.get_running_loop()` (raises an exception when there is no running loop)

# 若是和舊版本混用,就應該這麼寫了(麻煩)
try:
loop = asyncio.get_running_loop()
except RuntimeError as ex:
print(ex) # no running event loop
loop = asyncio.get_event_loop()
...
loop.run_until_complete(xxx)
新語法:
async def main():
loop = asyncio.get_running_loop()
...
asyncio.run(main())
A Task basically has these states (generators and Futures too):

- `Pending`: the Task was created but hasn't run yet
- `Running`: the event loop is currently executing the task
- `Done`: the Task finished
- `Cancelled`: the Task was cancelled

For Python before 3.7 the official docs include a sequence diagram we can borrow to picture the above: https://docs.python.org/3.6/library/asyncio-task.html
import asyncio
async def compute(x, y):
print(f"計算 {x}+{y}...")
await asyncio.sleep(1.0)
return x + y
async def main(x, y):
result = await compute(x, y)
print(f"{x}+{y}={result}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main(1, 2))
loop.close()
和舊版本比起來其實就是建立一個task
,而後爲task
添加一個回調函數add_done_callback
import asyncio
async def get_html(url):
print(f"get {url} ing")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
def callback_func(task):
print(type(task))
if task.done():
print(f"done") # print(task.result())
async def main():
urls = [
"https://www.baidu.com", "https://www.asp.net",
"https://www.python.org", "https://www.sogou.com"
]
# asyncio.create_task來建立一個Task
tasks = [asyncio.create_task(get_html(url)) for url in urls]
# 給每一個任務都加一個回調函數
for task in tasks:
task.add_done_callback(callback_func)
# 批量執行任務
result = await asyncio.gather(*tasks)
print(result) # 返回 result list
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
get https://www.baidu.com ing
get https://www.asp.net ing
get https://www.python.org ing
get https://www.sogou.com ing
<class '_asyncio.Task'>
done
<class '_asyncio.Task'>
done
<class '_asyncio.Task'>
done
<class '_asyncio.Task'>
done
['<h1>This is a test for https://www.baidu.com</h1>', '<h1>This is a test for https://www.asp.net</h1>', '<h1>This is a test for https://www.python.org</h1>', '<h1>This is a test for https://www.sogou.com</h1>']
2.0189685821533203
Note: `add_signal_handler` is a loop-only method (a Task has nothing like it), e.g. `loop.add_signal_handler(signal.SIGINT, callback_handle, *args)`
關於批量任務的異常處理:
return_exceptions=True
:不影響其餘任務,異常消息也放在結果列表中gather
被取消的時候,無論True or False,這批次任務所有取消import asyncio
async def get_html(url):
print(f"get {url} ing")
if url == "https://www.asp.net":
raise Exception("Exception is over")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
def callback_func(task):
if task.done():
print(f"done") # print(task.result())
async def main():
urls = [
"https://www.baidu.com", "https://www.asp.net",
"https://www.python.org", "https://www.sogou.com"
]
# asyncio.create_task來建立一個Task
tasks = [asyncio.create_task(get_html(url)) for url in urls]
# 給每一個任務都加一個回調函數
for task in tasks:
task.add_done_callback(callback_func)
# 批量執行任務
result = await asyncio.gather(*tasks, return_exceptions=True)
print(result) # 返回 result list
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
get https://www.baidu.com ing
get https://www.asp.net ing
get https://www.python.org ing
get https://www.sogou.com ing
done
done
done
done
['<h1>This is a test for https://www.baidu.com</h1>', Exception('Exception is over'), '<h1>This is a test for https://www.python.org</h1>', '<h1>This is a test for https://www.sogou.com</h1>']
2.013272523880005
看個簡單的任務分組案例:
import asyncio
async def get_html(url):
print(f"get url for{url}")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
async def main():
urls1 = ["https://www.baidu.com", "https://www.asp.net"]
urls2 = ["https://www.python.org", "https://www.sogou.com"]
tasks1 = [asyncio.create_task(get_html(url)) for url in urls1]
tasks2 = [asyncio.create_task(get_html(url)) for url in urls2]
# 等待兩組都完成,而後返回聚合結果
result = await asyncio.gather(*tasks1, *tasks2)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
try:
asyncio.run(main())
except Exception as ex:
print(ex)
print(time.time() - start_time)
輸出:(兩個分組結果被一塊兒放到了list中)
get url forhttps://www.baidu.com
get url forhttps://www.asp.net
get url forhttps://www.python.org
get url forhttps://www.sogou.com
['<h1>This is a test for https://www.baidu.com</h1>', '<h1>This is a test for https://www.asp.net</h1>', '<h1>This is a test for https://www.python.org</h1>', '<h1>This is a test for https://www.sogou.com</h1>']
2.0099380016326904
若是想要對Group1
和Group2
進行更多的自定化,能夠再包裹一層gather
方法:
import asyncio
async def get_html(url):
print(f"get url for{url}")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
async def main():
urls1 = ["https://www.baidu.com", "https://www.asp.net"]
urls2 = ["https://www.python.org", "https://www.sogou.com"]
tasks1 = [asyncio.create_task(get_html(url)) for url in urls1]
tasks2 = [asyncio.create_task(get_html(url)) for url in urls2]
group1 = asyncio.gather(*tasks1)
group2 = asyncio.gather(*tasks2)
# 分組2由於某緣由被取消任務了(模擬)
group2.cancel()
# 等待兩組都完成,而後返回聚合結果
result = await asyncio.gather(group1, group2, return_exceptions=True)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
try:
asyncio.run(main())
except Exception as ex:
print(ex)
print(time.time() - start_time)
輸出:
get url forhttps://www.baidu.com
get url forhttps://www.asp.net
[['<h1>This is a test for https://www.baidu.com</h1>', '<h1>This is a test for https://www.asp.net</h1>'], CancelledError()]
2.0090348720550537
再看個單個任務的案例:
import asyncio
async def test():
print("start...")
await asyncio.sleep(10)
print("end...")
async def main():
task = asyncio.create_task(test())
await asyncio.sleep(1)
# 取消task任務
task.cancel()
try:
await task
except asyncio.CancelledError:
print(f"任務已經被取消:{task.cancelled()}")
print(f"任務是由於異常而完成:{task.done()}")
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
start...
任務已經被取消:True
任務是由於異常而完成:True
1.0133979320526123
A quick note:

- `task.done()`: whether the task has finished
    - `task.done() ==> true` covers finishing normally, finishing with an exception, and being cancelled
- `task.cancelled()`: tells you whether the cancellation actually succeeded

Why do I say that? Look at the source:
# 完成包含了正常+異常
if outer.done():
# 把由於異常完成的任務打個標記
if not fut.cancelled():
fut.exception() # 標記檢索的異常
PS: the official recommendation is `asyncio.all_tasks()` (the set of not-yet-finished Tasks in the loop); the old spelling was `asyncio.Task.all_tasks()` (returns all Tasks of the loop).

Timeout waiting: `asyncio.wait_for(task, timeout)`
import asyncio
async def test(time):
print("start...")
await asyncio.sleep(time)
print("end...")
return time
async def main():
task = asyncio.create_task(test(3))
try:
result = await asyncio.wait_for(task, timeout=2)
print(result)
except asyncio.CancelledError:
print("Cancel")
except asyncio.TimeoutError:
print("超時取消")
except Exception as ex:
print(ex)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
start...
超時取消
2.007002115249634
`wait` is a lower-level API than `gather`; for instance, the following multi-task wait with a timeout is something `gather` cannot do:
import asyncio
async def test(time):
print("start...")
await asyncio.sleep(time)
print("end...")
return time
async def main():
tasks = [asyncio.create_task(test(i)) for i in range(10)]
# 已完成的任務(包含異常),未完成的任務
done, pending = await asyncio.wait(tasks, timeout=2)
# 任務總數(我用了3種表示)PS:`all_tasks()`的時候記得去除main的那個
print(
f"任務總數:{len(tasks)}=={len(done)+len(pending)}=={len(asyncio.Task.all_tasks())-1}"
)
# 全部未完成的task:asyncio.all_tasks(),記得去掉run(main())
print(f"未完成Task:{len(pending)}=={len(asyncio.all_tasks()) - 1}")
print(await asyncio.gather(*done))
# for task in done:
# print(await task)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
start...
start...
start...
start...
start...
start...
start...
start...
start...
start...
end...
end...
end...
任務總數:10==10==10
未完成Task:7==7
[0, 1, 2]
2.0071778297424316
用法其實和Future同樣(https://www.cnblogs.com/dotnetcrazy/p/9528315.html#Future對象),這邊就當再普及下新語法了
項目裏常常有這麼一個場景:同時調用多個同效果的API,有一個返回後取消其餘請求
,看個引入案例
import asyncio
async def test(i):
print(f"start...task{i}")
await asyncio.sleep(i)
print(f"end...task{i}")
return "ok"
# 第一個任務執行完成則結束此批次任務
async def main():
tasks = [asyncio.create_task(test(i)) for i in range(10)]
# 項目裏常常有這麼一個場景:同時調用多個同效果的API,有一個返回後取消其餘請求
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED)
# print(await asyncio.gather(*done))
for task in done:
print(await task)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
start...task0
start...task1
start...task2
start...task3
start...task4
start...task5
start...task6
start...task7
start...task8
start...task9
end...task0
ok
0.017002105712890625
Homework: `asyncio.shield` (protects an awaitable from being cancelled) https://docs.python.org/3/library/asyncio-task.html#shielding-from-cancellation
Next up (estimated): compatibility with old code, synchronization, new socket usage
以前有人問我,這個asyncio.get_running_loop()
究竟是用仍是不用?爲何一會asyncio.get_event_loop()
一會又是asyncio.get_running_loop()
,一會是loop.run_until_complete()
一會又是asyncio.run()
的,有點混亂了。
以前逆天簡單的提了一下,可能說的仍是不太詳細,這邊再舉幾個例子說說:
首先:若是你用的是Python3.7
以前的版本,那麼你用不到loop = asyncio.get_running_loop()
和asyncio.run()
的
若是是老版本你就使用asyncio.get_event_loop()
來獲取loop
,用loop.run_until_complete()
來運行:
import asyncio
async def test():
print("start ...")
await asyncio.sleep(2)
print("end ...")
# 若是你用`get_running_loop`就不要和`loop.run_until_complete`混用
loop = asyncio.get_event_loop()
loop.run_until_complete(test())
輸出:(混用須要捕獲Runtime的異常)
start ... end ...
上節課說使用asyncio.get_running_loop()
麻煩的情景是這個:(這種狀況倒不如直接asyncio.get_event_loop()
獲取loop了)
# 若是和舊版本混用,就應該這麼寫了(麻煩)
try:
loop = asyncio.get_running_loop()
except RuntimeError as ex:
loop = asyncio.get_event_loop()
...
asyncio.run(test())
官方推薦的新語法是這樣的:(>=Python3.7
)
async def main():
loop = asyncio.get_running_loop()
...
asyncio.run(main())
PS:記住一句就行:asyncio.get_running_loop()
和asyncio.run()
成對出現
能夠這麼理解:asyncio.run
裏會建立對應的loop
,因此你才能獲取正在運行的loop
:
# https://github.com/lotapp/cpython3/blob/master/Lib/asyncio/runners.py
def run(main, *, debug=False):
if events._get_running_loop() is not None:
raise RuntimeError("沒法從正在運行的事件循環中調用asyncio.run()")
if not coroutines.iscoroutine(main):
raise ValueError("{!r}應該是一個協程".format(main))
# 建立一個新的事件循環
loop = events.new_event_loop()
try:
events.set_event_loop(loop) # 設置事件循環
loop.set_debug(debug) # 是否調試運行(默認否)
return loop.run_until_complete(main) # 等待運行
finally:
try:
_cancel_all_tasks(loop) # 取消其餘任務
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
events.set_event_loop(None)
loop.close()
就是怕你們混亂,上節課開始就直接使用的最新語法,舊語法文章裏儘可能不使用了,本節也是
部分能夠參考官方文檔:https://docs.python.org/3/library/asyncio-eventloop.html
Once you know coroutines, the GIL is no longer such a big deal — multiprocessing + coroutines covers it, and `asyncio` now also ships a thread-safe submission method, `asyncio.run_coroutine_threadsafe(coro, loop)` (you could call it the official answer to the GIL).
前面咱們說過了併發編程(線程+進程)的通用解決方案:併發編程:concurrent.futures專欄
asyncio
框架雖然幾乎包含了全部經常使用功能,但畢竟是新事物,舊代碼怎麼辦?協程只是單線程工做,理論上不能使用阻塞代碼,那庫或者api只能提供阻塞的調用方式怎麼辦? ~ 不用慌,可使用官方提供的兼容方法,先看個案例:
import asyncio
import concurrent.futures
# 模擬一個耗時操做
def test(n):
return sum(i * i for i in range(10**n))
# old main
def main():
with concurrent.futures.ThreadPoolExecutor() as pool:
# 注意:future和asyncio.future是不同的
future = pool.submit(test, 7)
result = future.result()
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
main() # old
print(time.time() - start_time)
Output: (note: `concurrent.futures.Future` and `asyncio.Future` are not the same thing, merely similar)

333333283333335000000
15.230607032775879
import asyncio
import concurrent.futures
# 模擬一個耗時操做
def test(n):
return sum(i * i for i in range(10**n))
async def main():
# 獲取loop
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
# 新版兼任代碼
result = await loop.run_in_executor(pool, test, 7)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main()) # new
print(time.time() - start_time)
Output: (the timing is basically unchanged — the gain is that the blocking call no longer blocks the event loop)

333333283333335000000
15.283994913101196
咱們來看看run_in_executor
的內部邏輯是啥:
class BaseEventLoop(events.AbstractEventLoop):
def run_in_executor(self, executor, func, *args):
# 檢查loop是否關閉,若是關閉就拋`RuntimeError`異常
self._check_closed()
if self._debug:
self._check_callback(func, 'run_in_executor')
# 若是不傳一個executor,就會使用默認的executor
# 換句話說:你能夠不傳`線程池`
if executor is None:
executor = self._default_executor
if executor is None:
executor = concurrent.futures.ThreadPoolExecutor()
self._default_executor = executor
# 把`concurrent.futures.Future`對象封裝成`asyncio.futures.Future`對象
return futures.wrap_future(executor.submit(func, *args), loop=self)
看完源碼就發現,代碼還能夠進一步簡化:
import asyncio
# 模擬一個耗時操做
def test(n):
return sum(i * i for i in range(10**n))
async def main():
# 獲取loop
loop = asyncio.get_running_loop()
# 新版兼任代碼
result = await loop.run_in_executor(None, test, 7)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
333333283333335000000 15.367998838424683
PS: traditional blocking code should not appear inside coroutines; if a blocking API is all you have, this is the compatibility escape hatch.
These are less common than what we covered earlier — just know they exist so a framework using them won't throw you:

- `add_done_callback(callback)`: `task.add_done_callback()` / `future.add_done_callback()`; to pass extra arguments use `functools.partial(call_back, url)`, which makes the callback signature `call_back(url, task)`
- `call_soon(callback, *args)`: `loop.call_soon()`; thread-safe variant: `loop.call_soon_threadsafe()`; roughly equivalent to `loop.call_later(0, callback, *args)`
- `loop.call_later(delay, callback, *args)`: run the callback after `delay` seconds
- `loop.call_at(absolute_time, callback, *args)`: run the callback at a time based on the loop's internal clock (`loop.time()`)

Note: make sure the loop stays alive until the task actually runs — with `call_later(2, xxx)`, if the loop exits first, the callback can never complete.
這個比較簡單,看個案例:
import asyncio
def test(name):
print(f"start {name}...")
print(f"end {name}...")
async def main():
# 正在執行某個任務
loop = asyncio.get_running_loop()
# 插入一個更要緊的任務
# loop.call_later(0, callback, *args)
task1 = loop.call_soon(test, "task1")
# 多少秒後執行
task2 = loop.call_later(2, test, "task2")
# 內部時鐘時間
task3 = loop.call_at(loop.time() + 3, test, "task3")
print(type(task1))
print(type(task2))
print(type(task3))
# 保證loop在執行完畢後才關閉
await asyncio.sleep(5)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
Output: (callbacks are generally plain functions)

<class 'asyncio.events.Handle'>
<class 'asyncio.events.TimerHandle'>
<class 'asyncio.events.TimerHandle'>
start task1...
end task1...
start task2...
end task2...
start task3...
end task3...
4.9966819286346436
PS:關於返回值的說明能夠看官方文檔:https://docs.python.org/3/library/asyncio-eventloop.html#callback-handles
而後說下call_later
(這個執行過程會按照時間排個前後順序,而後再批次運行)
import asyncio
# 回調函數通常都是普通函數
def test(name):
print(name)
if __name__ == "__main__":
import time
start_time = time.time()
loop = asyncio.get_event_loop()
# 新版本限制了時間不能超過24h(防止有些人當定時任務來亂用)
# 這個執行過程會安裝時間排個前後順序,而後再批次運行
task4 = loop.call_later(4, test, "task2-4")
task2 = loop.call_later(2, test, "task2-2")
task3 = loop.call_later(3, test, "task2-3")
task1 = loop.call_later(1, test, "task2-1")
# 取消測試
task4.cancel()
# close是直接丟棄任務而後關閉loop
loop.call_later(4, loop.stop) # 等任務執行完成結束任務 loop.stop()
# run內部運行的是run_until_complete,而run_until_complete內部運行的是run_forever
loop.run_forever()
print(time.time() - start_time)
輸出:(asyncio.get_running_loop()
不要和舊代碼混用)
task2-1
task2-2
task2-3
4.009201526641846
PS:run
內部運行的是run_until_complete
,而run_until_complete
內部運行的是run_forever
從開始說新語法以後,咱們建立任務都直接用asyncio.create_task
來包裹一層,有人問我這個Task
除了是Future
的子類外,有啥用?爲啥不直接使用Future
呢?貌似也沒語法啊?
看一個案例:
import asyncio
# 不是協程就加個裝飾器
@asyncio.coroutine
def test():
print("this is a test")
async def test_async():
print("this is a async test")
await asyncio.sleep(1)
async def main():
# 傳入一個協程對象,返回一個task
task1 = asyncio.create_task(test())
task2 = asyncio.create_task(test_async())
await asyncio.gather(task1, task2)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
this is a test this is a async test 1.0070011615753174
咱們來看看asyncio.create_task
的源碼:(關鍵在Task
類)
# 傳入一個協程對象,返回一個Task對象
def create_task(self, coro):
self._check_closed()
if self._task_factory is None:
# look:核心點
task = tasks.Task(coro, loop=self)
if task._source_traceback:
del task._source_traceback[-1]
else:
task = self._task_factory(self, coro)
return task
看看核心類Task
:
class Task(futures._PyFuture):
def __init__(self, coro, *, loop=None):
super().__init__(loop=loop)
...
# 安排了一個儘快執行的回調方法:self.__step
self._loop.call_soon(self.__step, context=self._context)
def __step(self, exc=None):
try:
if exc is None:
# 協程初始化(生成器或者協程初始化 next(xxx))
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
if self._must_cancel:
# 在中止以前取消任務
self._must_cancel = False
super().set_exception(futures.CancelledError())
else:
# 拿到了協程/生成器的結果
super().set_result(exc.value)
except futures.CancelledError:
super().cancel() # I.e., Future.cancel(self).
except Exception as exc:
super().set_exception(exc)
except BaseException as exc:
super().set_exception(exc)
raise
...
PS:那麼很明顯了,Task
的做用就相似於future
和協程
的中間人了(屏蔽某些差別)
官方文檔:https://docs.python.org/3/library/asyncio-stream.html
asyncio
實現了TCP、UDP、SSL
等協議,aiohttp
則是基於asyncio
實現的HTTP框架,咱們簡單演示一下(PS:網絡通訊基本上都是使用aiohttp
)
服務端:
import asyncio
async def handler(client_reader, client_writer):
# 沒有數據就阻塞等(主線程作其餘事情去了)
data = await client_reader.read(2048)
print(data.decode("utf-8"))
client_writer.write("驪山語罷清宵半,淚雨霖鈴終不怨\n何如薄倖錦衣郎,比翼連枝當日願".encode("utf-8"))
await client_writer.drain() # 等待緩衝區(緩衝區沒佔滿就直接返回)
client_writer.close() # 關閉鏈接
async def main():
server = await asyncio.start_server(handler, "127.0.0.1", 8080)
print("Server已經啓動,端口:8080")
# 實現了協程方法`__aenter__`和`__aexit__`的可使用`async with`
async with server:
# async def serve_forever(self):pass ==> use await
await server.serve_forever() # 異步方法
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
客戶端:
import asyncio
async def main():
reader, writer = await asyncio.open_connection("127.0.0.1", 8080)
writer.write("人生若只如初見,何事秋風悲畫扇\n等閒變卻故人心,卻道故人心易變".encode("utf-8"))
data = await reader.read(2048)
if data:
print(data.decode("utf-8"))
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出圖示:
再舉個HTTP的案例:
import asyncio
async def get_html(host):
print("get_html %s..." % host)
reader, writer = await asyncio.open_connection(host, 80)
writer.write(f"GET / HTTP/1.1\r\nHost: {host}\r\n\r\n".encode('utf-8'))
await writer.drain() # 等待緩衝區
html_list = []
async for line in reader:
html_list.append(line.decode("utf-8"))
writer.close() # 關閉鏈接
return "\n".join(html_list)
async def main():
tasks = [
asyncio.create_task(get_html(url))
for url in ['dotnetcrazy.cnblogs.com', 'dunitian.cnblogs.com']
]
html_list = await asyncio.gather(*tasks)
print(html_list)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
get_html dotnetcrazy.cnblogs.com... get_html dunitian.cnblogs.com... [html內容省略,html內容省略] 5.092018604278564
GIF過程圖:
PS: (more on this later)

- anything implementing `__anext__` can be used with `async for`
- anything implementing `__aenter__` and `__aexit__` can be used with `async with`
Remember the non-blocking server we wrote by hand back in the IO-multiplexing section? Let's quickly recap the flow, then read the corresponding `asyncio` source together:

1. Set the `Socket` to non-blocking (`socket.setblocking(False)`)
2. Register the `fd` with the selector (`register`)
3. When an event is ready, perform the corresponding operation on the `socket`
4. When done, unregister (`unregister`)

Now look at the source of `await asyncio.open_connection(ip, port)`:
# asyncio.streams.py
async def open_connection(host=None, port=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds):
if loop is None:
loop = events.get_event_loop()
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
# 核心點
transport, _ = await loop.create_connection(lambda: protocol, host, port, **kwds)
writer = StreamWriter(transport, protocol, reader, loop)
return reader, writer
發現,其實內部核心在loop.create_connection
中
# asyncio.base_events.py
# 鏈接TCP服務器
class BaseEventLoop(events.AbstractEventLoop):
async def create_connection(self,
protocol_factory,
host=None,
port=None,
*,
ssl=None,
family=0,
proto=0,
flags=0,
sock=None,
local_addr=None,
server_hostname=None,
ssl_handshake_timeout=None):
...
# 主要邏輯
if host is not None or port is not None:
exceptions = []
# 主要邏輯
for family, type, proto, cname, address in infos:
try:
sock = socket.socket(family=family, type=type, proto=proto)
sock.setblocking(False) # 1.設置非阻塞 <<<< look
if local_addr is not None:
for _, _, _, _, laddr in laddr_infos:
try:
sock.bind(laddr) # 端口綁定
break
except OSError as exc:
msg = (f'error while attempting to bind on '
f'address {laddr!r}: '
f'{exc.strerror.lower()}')
exc = OSError(exc.errno, msg)
exceptions.append(exc)
else:
sock.close()
sock = None
continue
if self._debug:
logger.debug("connect %r to %r", sock, address)
# 在selector_events中
await self.sock_connect(sock, address) # <<< look
except OSError as exc:
if sock is not None:
sock.close()
exceptions.append(exc)
except:
if sock is not None:
sock.close()
raise
else:
break
發現源碼中設置了socket爲非阻塞,調用了sock_connect
async def sock_connect(self, sock, address):
"""鏈接遠程socket地址(協程方法)"""
# 非阻塞檢查
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
...
fut = self.create_future()
self._sock_connect(fut, sock, address)
return await fut
def _sock_connect(self, fut, sock, address):
fd = sock.fileno() # 獲取socket的文件描述符 <<< look
try:
sock.connect(address)
except (BlockingIOError, InterruptedError):
# 設置future的回調函數_sock_connect_done(用來註銷的)<<< look
fut.add_done_callback(functools.partial(self._sock_connect_done, fd))
# 註冊selector.register
self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
except Exception as exc:
fut.set_exception(exc)
else:
fut.set_result(None)
先看下sock_connect
中調用的add_writer
(註冊)
def add_writer(self, fd, callback, *args):
"""添加一個寫的回調"""
self._ensure_fd_no_transport(fd)
return self._add_writer(fd, callback, *args)
def _add_writer(self, fd, callback, *args):
self._check_closed()
handle = events.Handle(callback, args, self, None)
try:
key = self._selector.get_key(fd)
except KeyError:
self._selector.register(fd, selectors.EVENT_WRITE,
(None, handle)) # selector.register
else:
mask, (reader, writer) = key.events, key.data
self._selector.modify(fd, mask | selectors.EVENT_WRITE,
(reader, handle))
if writer is not None:
writer.cancel()
再看下sock_connect
中設置的回調函數_sock_connect_done
(註銷)
def _sock_connect_done(self, fd, fut):
# 取消註冊selector.unregister
self.remove_writer(fd)
def remove_writer(self, fd):
"""移除寫的回調"""
self._ensure_fd_no_transport(fd)
return self._remove_writer(fd)
def _remove_writer(self, fd):
if self.is_closed():
return False
try:
key = self._selector.get_key(fd)
except KeyError:
return False
else:
mask, (reader, writer) = key.events, key.data
mask &= ~selectors.EVENT_WRITE
if not mask:
self._selector.unregister(fd) # 註銷 <<< look
else:
self._selector.modify(fd, mask, (reader, None))
if writer is not None:
writer.cancel()
return True
else:
return False
PS: the nesting is very deep, and the low-level code keeps changing (the small Python 3.6 → 3.7 update already changed a lot).

The earlier concurrency-basics articles covered this thoroughly and analyzed plenty of source, so you can dig further yourself (the source of Python3's `asyncio` module is still being optimized). I won't walk through all of it here (it's messy right now — several layers intertwined — and will probably get cleaner in a few releases). Partially annotated source: https://github.com/lotapp/cpython3/tree/master/Lib/asyncio
課後拓展:
https://docs.python.org/3/library/asyncio-protocol.html#examples https://docs.python.org/3/library/asyncio-eventloop.html#creating-network-servers
下節預估:同步與通訊、aiohttp版爬蟲
官方文檔:
https://docs.python.org/3/library/asyncio-sync.html https://docs.python.org/3/library/asyncio-queue.html
寫在前面:
asyncio
具備如下基本同步原語:Lock、Event、Condition、Semaphore、BoundedSemaphore
先看個原來的引導案例:估計的結果是0,而不借助lock得出的結果每每出乎意料
import concurrent.futures
num = 0
def test(i):
global num
for _ in range(10000000):
num += i
def main():
with concurrent.futures.ThreadPoolExecutor() as executor:
print("start submit...")
future1 = executor.submit(test, 1)
future2 = executor.submit(test, -1)
concurrent.futures.wait([future1, future2]) # wait some time
print("end submit...")
global num
print(num)
if __name__ == "__main__":
import time
start_time = time.time()
main()
print(f"time:{time.time()-start_time}")
輸出:(可是代碼並非線程安全的,因此結果每每不是咱們想要的)
start submit... end submit... 82705 time:5.032064199447632
再看看協程的案例:
import asyncio
num = 0
async def test(i):
global num
for _ in range(10000000):
num += i
async def main():
print("start tasks...")
task1 = asyncio.create_task(test(1))
task2 = asyncio.create_task(test(-1))
await asyncio.gather(task1, task2)
print("end tasks...")
global num
print(num)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(f"time:{time.time()-start_time}")
輸出:(就一個線程,固然安全)
start tasks... end tasks... 0 time:4.860997438430786
PS:你使用協程的兼容代碼,並不能解決線程不安全的問題
import asyncio
import concurrent.futures

num = 0

def test(i):
    global num
    for _ in range(10000000):
        num += i

async def main():
    # 獲取當前loop
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        print("start submit...")
        future1 = loop.run_in_executor(executor, test, 1)
        future2 = loop.run_in_executor(executor, test, -1)
        # await asyncio.wait([future1,future2])
        await asyncio.gather(future1, future2)
        print("end submit...")
    global num
    print(num)

if __name__ == "__main__":
    import time
    start_time = time.time()
    asyncio.run(main())
    print(f"time:{time.time()-start_time}")
輸出:
start submit... end submit... -1411610 time:5.0279998779296875
咋一看,單線程不用管線程安全啥的啊,要啥同步機制?其實在業務場景裏面仍是會出現諸如重複請求的狀況,這個時候就須要一個同步機制了:
import asyncio
# 用來存放頁面緩存
cache_dict = {}
# 模擬一個獲取html的過程
async def fetch(url):
# 每次網絡訪問,時間其實不肯定的
import random
time = random.randint(2, 5)
print(time)
await asyncio.sleep(time)
return f"<h2>{url}</h2>"
async def get_html(url):
# 若是緩存存在,則返回緩存的頁面
    if url in cache_dict:
        return cache_dict[url]
# 不然獲取頁面源碼並緩存
html = await fetch(url)
cache_dict[url] = html
return html
async def parse_js(url):
html = await get_html(url)
# do somthing
return len(html)
async def parse_html(url):
html = await get_html(url)
# do somthing
return html
async def main():
# 提交兩個Task任務
task1 = asyncio.create_task(parse_js("www.baidu.com"))
task2 = asyncio.create_task(parse_html("www.baidu.com"))
# 等待任務結束
result_list = await asyncio.gather(task1, task2)
print(result_list)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:(fetch
方法訪問了兩次 ==> 兩次網絡請求)
2
3
[22, '<h2>www.baidu.com</h2>']
3.0100157260894775
簡單說明:baidu.com
一開始沒緩存,那當解析js和解析html的任務提交時,就會進行兩次網絡請求(網絡IO比較耗時),這樣更容易觸發反爬蟲機制
線程相關的Lock複習:http://www.javashuo.com/article/p-rcfjlior-d.html
協程是線程安全的,那麼這個Lock
確定是和多線程/進程
裏面的Lock
是不同的,咱們先看一下提煉版的源碼:
class Lock(_ContextManagerMixin):
def __init__(self, *, loop=None):
self._waiters = collections.deque()
self._locked = False
if loop is not None:
self._loop = loop
else:
self._loop = events.get_event_loop()
async def acquire(self):
if not self._locked:
self._locked = True # 改變標識
...
return self._locked
def release(self):
if self._locked:
self._locked = False
...
PS:源碼看完秒懂了,asyncio裏面的lock其實就是一個標識而已
修改一下上面的例子:
import asyncio
# 用來存放頁面緩存
cache_dict = {}
lock = None # 你能夠試試在這邊直接寫`asyncio.Lock()`
# 模擬一個獲取html的過程
async def fetch(url):
# 每次網絡訪問,時間其實不肯定的
import random
time = random.randint(2, 5)
print(time)
await asyncio.sleep(time)
return f"<h2>{url}</h2>"
async def get_html(url):
async with lock:
# 若是緩存存在,則返回緩存的頁面
        if url in cache_dict:
            return cache_dict[url]
# 不然獲取頁面源碼並緩存
html = await fetch(url)
cache_dict[url] = html
return html
async def parse_js(url):
html = await get_html(url)
# do somthing
return len(html)
async def parse_html(url):
html = await get_html(url)
# do somthing
return html
async def main():
global lock
lock = asyncio.Lock() # 若是在開頭就定義,那麼lock的loop和方法的loop就會不一致了
# 提交兩個Task任務
task1 = asyncio.create_task(parse_js("www.baidu.com"))
task2 = asyncio.create_task(parse_html("www.baidu.com"))
# 等待任務結束
result_list = await asyncio.gather(task1, task2)
print(result_list)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:(fetch
方法訪問了1次 ==> 1次網絡請求)
3
[22, '<h2>www.baidu.com</h2>']
3.0020127296447754
線程篇Semaphore
:http://www.javashuo.com/article/p-rcfjlior-d.html
這個用的比較多,簡單回顧下以前講的概念案例:
通俗講就是:在互斥鎖的基礎上封裝了下,實現必定程度的並行
舉個例子,之前使用互斥鎖的時候:(廁所就一個坑位,必須等裏面的人出來才能讓另外一我的上廁所)
使用信號量Semaphore
以後:廁所坑位增長到5個(本身指定),這樣能夠5我的一塊兒上廁所了==> 實現了必定程度的併發控制
先看下縮略的源碼:(能夠這麼想:內部維護了一個引用計數,每次來個任務就-1,一個任務結束計數就+1)
class Semaphore(_ContextManagerMixin):
def __init__(self, value=1, *, loop=None):
if value < 0:
raise ValueError("Semaphore initial value must be >= 0")
self._value = value
self._waiters = collections.deque()
if loop is not None:
self._loop = loop
else:
self._loop = events.get_event_loop()
async def acquire(self):
while self._value <= 0:
fut = self._loop.create_future()
self._waiters.append(fut) # 把當前任務放入Queue中
try:
await fut # 等待一個任務的完成再繼續
except:
fut.cancel() # 任務取消
if self._value > 0 and not fut.cancelled():
self._wake_up_next() # 喚醒下一個任務
raise
self._value -= 1 # 用掉一個併發量
return True
def release(self):
self._value += 1 # 恢復一個併發量
self._wake_up_next() # 喚醒下一個任務
如今舉個常見的場景:好比調用某個免費的api,該api限制併發數爲5
import asyncio
sem = None
# 模擬api請求
async def api_test(i):
async with sem:
await asyncio.sleep(1)
print(f"The Task {i} is done")
async def main():
global sem
sem = asyncio.Semaphore(5) # 設置併發數爲5
tasks = [asyncio.create_task(api_test(i)) for i in range(20)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
動態輸出:
PS:BoundedSemaphore
是Semaphore
的一個版本,在調用release()
時檢查計數器的值是否超過了計數器的初始值,若是超過了就拋出一個異常
線程篇Event
:http://www.javashuo.com/article/p-rcfjlior-d.html
以前講的很詳細了,舉個爬蟲批量更新
的例子就一筆帶過:
import asyncio
event = None
html_dict = {}
async def updates():
# event.wait()是協程方法,須要await
await event.wait()
# 入庫操做省略 html_dict >> DB
return "html_dict >> DB done"
async def get_html(url):
# 摸擬網絡請求
await asyncio.sleep(2)
html_dict[url] = f"<h1>{url}</h1>" # 能夠暫時寫入臨時文件中
event.set() # 標記完成,普通方法
return f"{url} done"
async def main():
global event
event = asyncio.Event() # 初始化 event 對象
# 建立批量任務
tasks = [
asyncio.create_task(get_html(f"www.mmd.com/a/{i}"))
for i in range(1, 10)
]
# 批量更新操做
tasks.append(asyncio.create_task(updates()))
result = await asyncio.gather(*tasks)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
輸出:
['www.mmd.com/a/1 done', 'www.mmd.com/a/2 done', 'www.mmd.com/a/3 done', 'www.mmd.com/a/4 done', 'www.mmd.com/a/5 done', 'www.mmd.com/a/6 done', 'www.mmd.com/a/7 done', 'www.mmd.com/a/8 done', 'www.mmd.com/a/9 done', 'html_dict >> DB done']
2.0012683868408203
Almost the same as before, with one difference: `async def wait(self)` — `wait` is now a coroutine method, so it needs `await` when used:

- `coroutine wait()`: if the internal flag is `True`, return `True` immediately; otherwise block until another task calls `set()`
- `set()`: set the internal flag to `True` (wakes every task waiting on the event)
- `clear()`: reset the internal flag to `False`; tasks calling `wait()` will block until `set()` is called again
- `is_set()`: return `True` if the internal flag is `True`
Thread-series `Condition`: http://www.javashuo.com/article/p-rcfjlior-d.html

A quick look at the method list:

- `coroutine acquire()`: acquire the underlying lock
- `notify(n=1)`: wake up at most n tasks waiting on this condition; the caller must have obtained the lock with `acquire()` and release it after calling, otherwise a `RuntimeError` is raised
- `locked()`: whether the underlying lock is held
- `notify_all()`: wake up all tasks waiting on this condition
- `release()`: release the underlying lock
- `coroutine wait()`: release the lock, wait to be notified, then re-acquire the lock
- `coroutine wait_for(predicate)`: wait until `predicate` becomes True; `predicate` must be callable, its result is interpreted as a boolean and returned as the final value

PS: `Condition` combines the functionality of `Event` and `Lock` (several Condition objects can also share one Lock, letting different tasks coordinate exclusive access to a shared resource)
看個生產消費者的案例:
import asyncio
cond = None
p_list = []
# 生產者
async def producer(n):
for i in range(5):
async with cond:
p_list.append(f"{n}-{i}")
print(f"[生產者{n}]生產商品{n}-{i}")
# 通知任意一個消費者
cond.notify() # 通知所有消費者:cond.notify_all()
# 摸擬一個耗時操做
await asyncio.sleep(0.01)
# 消費者
async def consumer(i):
while True:
async with cond:
if p_list:
print(f"列表商品:{p_list}")
name = p_list.pop() # 消費商品
print(f"[消費者{i}]消費商品{name}")
print(f"列表剩餘:{p_list}")
# 摸擬一個耗時操做
await asyncio.sleep(0.01)
else:
await cond.wait()
async def main():
global cond
cond = asyncio.Condition() # 初始化condition
p_tasks = [asyncio.create_task(producer(i)) for i in range(2)] # 兩個生產者
c_tasks = [asyncio.create_task(consumer(i)) for i in range(5)] # 五個消費者
await asyncio.gather(*p_tasks, *c_tasks)
if __name__ == "__main__":
asyncio.run(main())
輸出:
[生產者0]生產商品0-0
[生產者1]生產商品1-0
列表商品:['0-0', '1-0']
[消費者0]消費商品1-0
列表剩餘:['0-0']
列表商品:['0-0']
[消費者1]消費商品0-0
列表剩餘:[]
[生產者0]生產商品0-1
[生產者1]生產商品1-1
列表商品:['0-1', '1-1']
[消費者0]消費商品1-1
列表剩餘:['0-1']
列表商品:['0-1']
[消費者1]消費商品0-1
列表剩餘:[]
[生產者0]生產商品0-2
[生產者1]生產商品1-2
列表商品:['0-2', '1-2']
[消費者0]消費商品1-2
列表剩餘:['0-2']
列表商品:['0-2']
[消費者1]消費商品0-2
列表剩餘:[]
[生產者0]生產商品0-3
[生產者1]生產商品1-3
列表商品:['0-3', '1-3']
[消費者0]消費商品1-3
列表剩餘:['0-3']
列表商品:['0-3']
[消費者1]消費商品0-3
列表剩餘:[]
[生產者0]生產商品0-4
[生產者1]生產商品1-4
列表商品:['0-4', '1-4']
[消費者0]消費商品1-4
列表剩餘:['0-4']
列表商品:['0-4']
[消費者1]消費商品0-4
列表剩餘:[]
PS: a quick note on item 7 of the method list (let's look at the source of the `wait_for` method):

```python
# keeps waiting until the predicate returns true
# (in terms of the return value: it either blocks forever or returns true)
async def wait_for(self, predicate):
    result = predicate()
    # keep waiting while the predicate is not true
    while not result:
        await self.wait()
        result = predicate()
    return result
```
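As a hedged sketch (the helper names are mine, not from the article), the consumer's `if/else` plus `wait()` pattern above can be collapsed into a single `wait_for` call:

```python
import asyncio


async def demo():
    cond = asyncio.Condition()
    items = []

    async def producer():
        await asyncio.sleep(0.01)
        async with cond:
            items.append("x")
            cond.notify_all()  # wake everyone waiting on the condition

    async def consumer():
        async with cond:
            # releases the lock and blocks until the predicate is truthy
            await cond.wait_for(lambda: len(items) > 0)
            return items.pop()

    prod = asyncio.create_task(producer())
    got = await consumer()
    await prod
    return got


got = asyncio.run(demo())
print(got)  # x
```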
Further reading: `async_timeout` (an async-compatible timeout context manager) https://github.com/lotapp/BaseCode/blob/master/python/5.concurrent/ZCoroutine/async_timeout_timeout.py
Official docs: https://docs.python.org/3/library/asyncio-queue.html

`Queue` in the thread series: http://www.javashuo.com/article/p-rcfjlior-d.html

If you don't need to throttle anything, a coroutine queue and a plain list are actually pretty much the same (PS: `asyncio.Queue(num)` lets you cap the size)

The classic producer/consumer example:
```python
import random
import asyncio


async def producer(q, i):
    for i in range(5):  # note: this loop variable shadows the producer id passed in as `i`
        num = random.random()
        await q.put(num)
        print(f"[producer {i}] item {num} left the factory")
        await asyncio.sleep(num)


async def consumer(q, i):
    while True:
        data = await q.get()
        print(f"[consumer {i}] item {data} was snapped up")


async def main():
    queue = asyncio.Queue(10)  # capped here just for demonstration
    p_tasks = [asyncio.create_task(producer(queue, i)) for i in range(2)]  # two producers
    c_tasks = [asyncio.create_task(consumer(queue, i)) for i in range(5)]  # five consumers
    # the consumers never return, so gather() blocks until interrupted
    await asyncio.gather(*p_tasks, *c_tasks)


if __name__ == "__main__":
    import time

    start_time = time.time()
    asyncio.run(main())
    print(time.time() - start_time)
```
Output: (just note that `get` and `put` are both coroutine methods)

```
[producer 0] item 0.20252203397767787 left the factory
[producer 0] item 0.9641503458079388 left the factory
[consumer 0] item 0.20252203397767787 was snapped up
[consumer 0] item 0.9641503458079388 was snapped up
[producer 1] item 0.8049655468032324 left the factory
[consumer 0] item 0.8049655468032324 was snapped up
[producer 1] item 0.6032743557097342 left the factory
[consumer 1] item 0.6032743557097342 was snapped up
[producer 2] item 0.08818326334746773 left the factory
[consumer 2] item 0.08818326334746773 was snapped up
[producer 3] item 0.3747289313977561 left the factory
[consumer 3] item 0.3747289313977561 was snapped up
[producer 4] item 0.3948823110071299 left the factory
[consumer 4] item 0.3948823110071299 was snapped up
[producer 2] item 0.5775767044660681 left the factory
[consumer 0] item 0.5775767044660681 was snapped up
[producer 3] item 0.500537752889471 left the factory
[consumer 1] item 0.500537752889471 was snapped up
[producer 4] item 0.9921528527523727 left the factory
[consumer 2] item 0.9921528527523727 was snapped up
```
PS: coroutines also provide `PriorityQueue` (a priority queue) and `LifoQueue` (a last-in-first-out queue); I won't go on about them here (we drew diagrams and implemented them by hand earlier in the series)
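For completeness, a tiny sketch of those two variants (same coroutine `put`/`get` API, different pop order):

```python
import asyncio


async def demo():
    pq = asyncio.PriorityQueue()
    lq = asyncio.LifoQueue()
    for n in (3, 1, 2):
        await pq.put(n)
        await lq.put(n)
    # PriorityQueue always pops the smallest item; LifoQueue pops the most recently put item
    p_order = [await pq.get() for _ in range(3)]
    l_order = [await lq.get() for _ in range(3)]
    return p_order, l_order


p_order, l_order = asyncio.run(demo())
print(p_order, l_order)  # [1, 2, 3] [2, 1, 3]
```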
Further reading: https://docs.python.org/3/library/asyncio-queue.html#examples
Official docs: https://docs.python.org/3/library/asyncio-subprocess.html

This came up back in the process article and isn't today's focus, so here's an official demo:
```python
import asyncio


async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')


asyncio.run(run('ls /zzz'))
```
Output:

```
['ls /zzz' exited with 1]
[stderr]
ls: /zzz: No such file or directory
```
Coming up next: an `asyncio` + `aiohttp` crawler

Code: https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/ZCoroutine/z_spider
The `asyncio` library only provides `TCP` and `UDP` services and has no `HTTP` support; `aiohttp` can be thought of as an `http` service built on top of `asyncio`.
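To make the "asyncio only speaks TCP/UDP" point concrete, here is a minimal TCP echo sketch using `asyncio.start_server`/`open_connection` (the handler name and the use of port 0 to let the OS pick a free port are my choices, not from the article):

```python
import asyncio


async def handle(reader, writer):
    data = await reader.read(100)  # read the client's request
    writer.write(data.upper())     # echo it back, uppercased
    await writer.drain()
    writer.close()                 # closing signals EOF to the client


async def main():
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"ping")
    await writer.drain()
    reply = await reader.read()    # read until the server closes the connection
    writer.close()

    server.close()
    await server.wait_closed()
    return reply


reply = asyncio.run(main())
print(reply)  # b'PING'
```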
First, a demo that fetches a page's html:
```python
import asyncio
import aiohttp

error_urls = set()


# fetch the page html
async def fetch(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()
        else:
            error_urls.add(url)  # add to the set of urls to handle later


async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "http://www.biquge.cm/12/12097/")
        if html:  # got the html
            print(len(html))


if __name__ == "__main__":
    import time

    start_time = time.time()
    asyncio.run(main())
    print(time.time() - start_time)
```
Output:

```
24287
0.5429983139038086
```
A recommended lightweight page-parsing library: `pyquery` (a jquery-like python library)

A simple extraction built on the code above: (`pq.items("dd a")` works like a jQuery selector)
```python
import asyncio
import aiohttp
from pyquery import PyQuery

error_urls = set()


# fetch the page html
async def fetch(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()
        else:
            error_urls.add(url)  # set of urls to handle later


# blocking method
def saves(results):
    with open("www.biquge.cm.txt", "a+", encoding="utf-8") as fs:
        fs.writelines(results)
    print("ok")


async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "http://www.biquge.cm/12/12097/")
        pq = PyQuery(html)
        results = [
            item.text() + ":" + item.attr("href") + "\n"
            for item in pq.items("dd a")
        ]
        # print(pq("dd a").text())
        # stay compatible with blocking legacy code
        await asyncio.get_running_loop().run_in_executor(None, saves, results)


if __name__ == "__main__":
    import time

    start_time = time.time()
    asyncio.run(main())
    print(time.time() - start_time)
```
Output (contents of `www.biquge.cm.txt`):

```
新書的一些話:/12/12097/7563947.html
第一章論壇裏的鬼故事。:/12/12097/7563949.html
第二章臨時講課:/12/12097/7563950.html
第三章鬼域。:/12/12097/7563951.html
第四章恐怖敲門鬼:/12/12097/7565568.html
第五章迷路:/12/12097/7565569.html
第六章廁所中的手:/12/12097/7565570.html
第七章身後的腳步:/12/12097/7565571.html
第八章奇怪的樹:/12/12097/7565572.html
第九章鬼嬰:/12/12097/7565573.html
第十章惡鬼之力:/12/12097/7565574.html
...
第三百二十七章三口箱子:/12/12097/7950281.html
第三百二十八章鬼櫥裏的照片:/12/12097/7952145.html
第三百二十九章中山市事件:/12/12097/7955244.html
第三百三十章兩條信息:/12/12097/7956401.html
第三百三十一章進入中山市:/12/12097/7959077.html
第三百三十二章出乎意料:/12/12097/7962119.html
第三百三十四章酒店的二樓:/12/12097/7964192.html
第三百三十五章黑色的燭火:/12/12097/7969058.html
第三百三十六章微笑的屍體:/12/12097/7973826.html
```
Now let's fetch a detail page:
```python
import asyncio
import aiohttp
from pyquery import PyQuery

error_urls = set()


# fetch the page html
async def fetch(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()
        else:
            error_urls.add(url)  # set of urls to handle later


# detail-page fetch test
async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session,
                           "http://www.biquge.cm//12/12097/7563949.html")
        pq = PyQuery(html)
        print(pq("#content").text())
        # results = [item.text() for item in pq.items("#content")]
        # print(results)


if __name__ == "__main__":
    import time

    start_time = time.time()
    asyncio.run(main())
    print(time.time() - start_time)
```
Output:

```
老夫掐指一算,你如今正在牀上看小說,並且仍是側身,搞很差手機還在充電。 正在讀高三的楊間此刻正躺在被窩裏無聊的翻看着手機,他隨手點開了一個帖子,下面有很多網友在回帖。 「臥槽,樓主真乃神人也,這都被樓主猜中了。」 「呵,你會告訴大家我如今正在廁所蹲坑麼?不用問了,腳麻了。」
......
0.6684205532073975
```
PS: on Windows, if a Python package fails to install, grab the matching wheel from https://www.lfd.uci.edu/~gohlke/pythonlibs/

Rate limiting, anti-crawler mechanisms, and how to get around them will come later; for now, here is a simple example that saves a novel for offline reading:
```python
import asyncio
import aiohttp
from pyquery import PyQuery

sem = None
error_urls = set()


# fetch html
async def fetch(session, url):
    async with sem:
        async with session.get(url) as response:
            if response.status == 200:
                # aiohttp's way of handling illegal characters
                return await response.text("gbk", "ignore")  # ignore illegal characters
            else:
                error_urls.add(url)  # set of urls to handle later


# fetch the article body
async def get_text(session, url):
    # turn a relative path into domain + path
    if not url.startswith("http://www.biquge.cm"):
        url = "http://www.biquge.cm" + url
    html = await fetch(session, url)
    pq = PyQuery(html)
    return pq("#content").text()


# ordinary blocking method
def save(title, text):
    with open("恐怖復甦.md", "a+", encoding="gbk") as fs:
        fs.write(f"## {title}\n\n{text}\n\n")
    print(f"{title} done...")


async def main():
    global sem
    sem = asyncio.Semaphore(3)  # capping the concurrency actually makes it faster
    loop = asyncio.get_running_loop()
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "http://www.biquge.cm/12/12097/")
        pq = PyQuery(html)
        for item in pq.items("dd a"):
            title = item.text()
            text = await get_text(session, item.attr("href"))
            # stay compatible with blocking legacy code
            await loop.run_in_executor(None, save, title, text)
    print("task over")


if __name__ == "__main__":
    import time

    start_time = time.time()
    asyncio.run(main())
    print(time.time() - start_time)
```
Output: (crawling the whole site should be obvious by now: extract the urls from the `a` tags, dedupe them, then crawl the contents)

```
新書的一些話 done...
第一章論壇裏的鬼故事。 done...
第二章臨時講課 done...
第三章鬼域。 done...
第四章恐怖敲門鬼 done...
第五章迷路 done...
第六章廁所中的手 done...
第七章身後的腳步 done...
第八章奇怪的樹 done...
第九章鬼嬰 done...
第十章惡鬼之力 done...
第十一章逐漸復甦 done...
第十二章宛如智障 done...
第十三章羊皮紙 done...
第十四章詭異的紙 done...
......
第三百二十八章鬼櫥裏的照片 done...
第三百二十九章中山市事件 done...
第三百三十章兩條信息 done...
第三百三十一章進入中山市 done...
第三百三十二章出乎意料 done...
第三百三十四章酒店的二樓 done...
第三百三十五章黑色的燭火 done...
第三百三十六章微笑的屍體 done...
task over
```
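The dedupe step mentioned in passing can be as simple as a set; a hedged sketch (helper name and example urls are mine, not from the article's code):

```python
# normalize relative links against the site root and dedupe them with a set
seen = set()


def collect(urls, base="http://www.biquge.cm"):
    out = []
    for u in urls:
        if not u.startswith("http"):
            u = base + u
        if u not in seen:  # skip urls we've already queued
            seen.add(u)
            out.append(u)
    return out


batch = collect(["/12/a.html", "/12/a.html", "http://www.biquge.cm/12/b.html"])
print(batch)  # ['http://www.biquge.cm/12/a.html', 'http://www.biquge.cm/12/b.html']
```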
動態展現:
[Recommended] A high-performance async framework for Python: https://github.com/LessChina/sanic

逆天's commentary: (just the main storyline, just my own views)
- In the beginning there were `Django` (convenient) and `Flask` (simple), then `Tornado` (IO multiplexing) came along to replace them
- Then `Japronto` showed up, instantly stunning (and crushing) every development language out there, but it only made a splash and has barely been maintained since
- The officials shipped `asyncio` and `aiohttp` (our Node brothers are doing so well, why shouldn't we?)
- Next came the `asyncio` replacement `uvloop` (more of it is implemented in C than the official version, and whoever has more C wins on efficiency; PS: the official API was ugly until 3.7 finally provided enough syntactic sugar)
- Then `sanic` (its syntax looks a lot like `Flask`, and its performance is no worse than `Japronto`)
- The newcomer `vibora` (implemented in C throughout) shows signs of overtaking `sanic` (PS: I'll wait a few more releases before trying it; besides, many developers have switched to `Go + Python` by now)

One last ramble:

- `gevent`'s monkey patching is certainly convenient, but it hides many internal exceptions, and its performance is no longer the best
- `tornado`, to stay compatible with both `py2` and `py3`, still implements async internally with generators, so it's relatively less efficient
- `asyncio` is the mainstream direction going forward, and `sanic` is currently the hottest async framework (`vibora` is still under observation)

PS: `Django` and `Flask` use blocking IO; a web framework generally isn't deployed directly (the server it ships with is only meant for debugging). The usual deployment is `uwsgi` or `gunicorn` + `nginx` (tornado can be deployed directly).

References:

- python異步編程之asyncio: https://www.cnblogs.com/shenh/p/9090586.html
- uWSGI, Gunicorn, 啥玩意兒?: https://www.cnblogs.com/gdkl/p/6807667.html
- asyncio async IO, Chinese translation:
  - http://www.cnblogs.com/mamingqian/p/10008279.html
  - https://www.cnblogs.com/mamingqian/p/10075444.html
  - https://www.cnblogs.com/mamingqian/p/10044730.html
- PyQuery basics:
  - https://www.cnblogs.com/zhaof/p/6935473.html
  - https://www.cnblogs.com/lei0213/p/7676254.html