前幾篇文章html
上一篇咱們介紹了 asyncio 包,以及如何使用異步編程管理網絡應用中的高併發。在這一篇,咱們主要介紹使用 asyncio 包編程的兩個例子。
咱們先介紹下 async/await 語法,要否則看完這篇可能會困惑,爲何以前使用 asyncio.coroutine 裝飾器 和 yield from,這裏都是 用的 async 和 await?python
async/await 是Python3.5 的新語法,語法以下:ajax
async def read_data(db):
pass複製代碼
async 是明確將函數聲明爲協程的關鍵字,即便沒有await表達式,函數執行也會返回一個協程對象。
在協程函數內部,能夠在某個表達式以前使用 await 關鍵字來暫停協程的執行,以等待某協程完成:數據庫
async def read_data(db):
data = await db.fetch('SELECT ...')複製代碼
這個代碼若是使用 asyncio.coroutine 裝飾器語法爲:編程
@asyncio.coroutine
def read_data(db):
data = yield from db.fetch('SELECT ...')複製代碼
這兩段代碼執行的結果是同樣的,也就是說 能夠把 asyncio.coroutine 替換爲 async, yield from 替換爲 await。瀏覽器
使用新的語法有什麼好處呢:bash
這個例子主要是使用 asyncio 包 和 unicodedata 模塊,實現經過規範名稱查找Unicode 字符。服務器
咱們先來看一下代碼:websocket
# charfinder.py
import sys
import re
import unicodedata
import pickle
import warnings
import itertools
import functools
from collections import namedtuple
RE_WORD = re.compile('\w+')
RE_UNICODE_NAME = re.compile('^[A-Z0-9 -]+$')
RE_CODEPOINT = re.compile('U\+[0-9A-F]{4, 6}')
INDEX_NAME = 'charfinder_index.pickle'
MINIMUM_SAVE_LEN = 10000
CJK_UNI_PREFIX = 'CJK UNIFIED IDEOGRAPH'
CJK_CMP_PREFIX = 'CJK COMPATIBILITY IDEOGRAPH'
sample_chars = [
'$', # DOLLAR SIGN
'A', # LATIN CAPITAL LETTER A
'a', # LATIN SMALL LETTER A
'\u20a0', # EURO-CURRENCY SIGN
'\u20ac', # EURO SIGN
]
CharDescription = namedtuple('CharDescription', 'code_str char name')
QueryResult = namedtuple('QueryResult', 'count items')
def tokenize(text):
''' :param text: :return: return iterable of uppercased words '''
for match in RE_WORD.finditer(text):
yield match.group().upper()
def query_type(text):
text_upper = text.upper()
if 'U+' in text_upper:
return 'CODEPOINT'
elif RE_UNICODE_NAME.match(text_upper):
return 'NAME'
else:
return 'CHARACTERS'
class UnicodeNameIndex:
# unicode name 索引類
def __init__(self, chars=None):
self.load(chars)
def load(self, chars=None):
# 加載 unicode name
self.index = None
if chars is None:
try:
with open(INDEX_NAME, 'rb') as fp:
self.index = pickle.load(fp)
except OSError:
pass
if self.index is None:
self.build_index(chars)
if len(self.index) > MINIMUM_SAVE_LEN:
try:
self.save()
except OSError as exc:
warnings.warn('Could not save {!r}: {}'
.format(INDEX_NAME, exc))
def save(self):
with open(INDEX_NAME, 'wb') as fp:
pickle.dump(self.index, fp)
def build_index(self, chars=None):
if chars is None:
chars = (chr(i) for i in range(32, sys.maxunicode))
index = {}
for char in chars:
try:
name = unicodedata.name(char)
except ValueError:
continue
if name.startswith(CJK_UNI_PREFIX):
name = CJK_UNI_PREFIX
elif name.startswith(CJK_CMP_PREFIX):
name = CJK_CMP_PREFIX
for word in tokenize(name):
index.setdefault(word, set()).add(char)
self.index = index
def word_rank(self, top=None):
# (len(self.index[key], key) 是一個生成器,須要用list 轉成列表,要否則下邊排序會報錯
res = [list((len(self.index[key], key)) for key in self.index)]
res.sort(key=lambda item: (-item[0], item[1]))
if top is not None:
res = res[:top]
return res
def word_report(self, top=None):
for postings, key in self.word_rank(top):
print('{:5} {}'.format(postings, key))
def find_chars(self, query, start=0, stop=None):
stop = sys.maxsize if stop is None else stop
result_sets = []
for word in tokenize(query):
# tokenize 是query 的生成器 a b 會是 ['a', 'b'] 的生成器
chars = self.index.get(word)
if chars is None:
result_sets = []
break
result_sets.append(chars)
if not result_sets:
return QueryResult(0, ())
result = functools.reduce(set.intersection, result_sets)
result = sorted(result) # must sort to support start, stop
result_iter = itertools.islice(result, start, stop)
return QueryResult(len(result),
(char for char in result_iter))
def describe(self, char):
code_str = 'U+{:04X}'.format(ord(char))
name = unicodedata.name(char)
return CharDescription(code_str, char, name)
def find_descriptions(self, query, start=0, stop=None):
for char in self.find_chars(query, start, stop).items:
yield self.describe(char)
def get_descriptions(self, chars):
for char in chars:
yield self.describe(char)
def describe_str(self, char):
return '{:7}\t{}\t{}'.format(*self.describe(char))
def find_description_strs(self, query, start=0, stop=None):
for char in self.find_chars(query, start, stop).items:
yield self.describe_str(char)
@staticmethod # not an instance method due to concurrency
def status(query, counter):
if counter == 0:
msg = 'No match'
elif counter == 1:
msg = '1 match'
else:
msg = '{} matches'.format(counter)
return '{} for {!r}'.format(msg, query)
def main(*args):
index = UnicodeNameIndex()
query = ' '.join(args)
n = 0
for n, line in enumerate(index.find_description_strs(query), 1):
print(line)
print('({})'.format(index.status(query, n)))
if __name__ == '__main__':
if len(sys.argv) > 1:
main(*sys.argv[1:])
else:
print('Usage: {} word1 [word2]...'.format(sys.argv[0]))複製代碼
這個模塊讀取Python內建的Unicode數據庫,爲每一個字符名稱中的每一個單詞創建索引,而後倒排索引,存入一個字典。
例如,在倒排索引中,'SUN' 鍵對應的條目是一個集合,裏面是名稱中包含'SUN' 這個詞的10個Unicode字符。倒排索引保存在本地一個名爲charfinder_index.pickle 的文件中。若是查詢多個單詞,會計算從索引中所得集合的交集。
運行示例以下:
>>> main('rook') # doctest: +NORMALIZE_WHITESPACE
U+2656 ♖ WHITE CHESS ROOK
U+265C ♜ BLACK CHESS ROOK
(2 matches for 'rook')
>>> main('rook', 'black') # doctest: +NORMALIZE_WHITESPACE
U+265C ♜ BLACK CHESS ROOK
(1 match for 'rook black')
>>> main('white bishop') # doctest: +NORMALIZE_WHITESPACE
U+2657 ♗ WHITE CHESS BISHOP
(1 match for 'white bishop')
>>> main("jabberwocky's vest")
(No match for "jabberwocky's vest")複製代碼
這個模塊沒有使用併發,主要做用是爲使用 asyncio 包編寫的服務器提供支持。
下面咱們來看下 tcp_charfinder.py 腳本:
# tcp_charfinder.py
import sys
import asyncio
# 用於構建索引,提供查詢方法
from charfinder import UnicodeNameIndex
CRLF = b'\r\n'
PROMPT = b'?> '
# 實例化UnicodeNameIndex 類,它會使用charfinder_index.pickle 文件
index = UnicodeNameIndex()
async def handle_queries(reader, writer):
# 這個協程要傳給asyncio.start_server 函數,接收的兩個參數是asyncio.StreamReader 對象和 asyncio.StreamWriter 對象
while True: # 這個循環處理會話,直到從客戶端收到控制字符後退出
writer.write(PROMPT) # can't await! # 這個方法不是協程,只是普通函數;這一行發送 ?> 提示符
await writer.drain() # must await! # 這個方法刷新writer 緩衝;由於它是協程,因此要用 await
data = await reader.readline() # 這個方法也是協程,返回一個bytes對象,也要用await
try:
query = data.decode().strip()
except UnicodeDecodeError:
# Telenet 客戶端發送控制字符時,可能會拋出UnicodeDecodeError異常
# 咱們這裏默認發送空字符
query = '\x00'
client = writer.get_extra_info('peername') # 返回套接字鏈接的遠程地址
print('Received from {}: {!r}'.format(client, query)) # 在控制檯打印查詢記錄
if query:
if ord(query[:1]) < 32: # 若是收到控制字符或者空字符,退出循環
break
# 返回一個生成器,產出包含Unicode 碼位、真正的字符和字符名稱的字符串
lines = list(index.find_description_strs(query))
if lines:
# 使用默認的UTF-8 編碼把lines 轉換成bytes 對象,並在每一行末添加回車符合換行符
# 參數列表是一個生成器
writer.writelines(line.encode() + CRLF for line in lines)
writer.write(index.status(query, len(lines)).encode() + CRLF) # 輸出狀態
await writer.drain() # 刷新輸出緩衝
print('Sent {} results'.format(len(lines))) # 在服務器控制檯記錄響應
print('Close the client socket') # 在控制檯記錄會話結束
writer.close() # 關閉StreamWriter流
def main(address='127.0.0.1', port=2323): # 添加默認地址和端口,因此調用默承認以不加參數
port = int(port)
loop = asyncio.get_event_loop()
# asyncio.start_server 協程運行結束後,
# 返回的協程對象返回一個asyncio.Server 實例,即一個TCP套接字服務器
server_coro = asyncio.start_server(handle_queries, address, port,
loop=loop)
server = loop.run_until_complete(server_coro) # 驅動server_coro 協程,啓動服務器
host = server.sockets[0].getsockname() # 得到這個服務器的第一個套接字的地址和端口
print('Serving on {}. Hit CTRL-C to stop.'.format(host)) # 在控制檯中顯示地址和端口
try:
loop.run_forever() # 運行事件循環 main 函數在這裏阻塞,直到服務器的控制檯中按CTRL-C 鍵
except KeyboardInterrupt: # CTRL+C pressed
pass
print('Server shutting down.')
server.close()
# server.wait_closed返回一個 future
# 調用loop.run_until_complete 方法,運行 future
loop.run_until_complete(server.wait_closed())
loop.close() # 終止事件循環
if __name__ == '__main__':
main(*sys.argv[1:])複製代碼
運行 tcp_charfinders.py
python tcp_charfinders.py複製代碼
打開終端,使用 telnet 命令請求服務,運行結果以下所示:
main 函數幾乎會當即顯示 Serving on... 消息,而後在調用loop.run_forever() 方法時阻塞。這時,控制權流動到事件循環中,並且一直等待,偶爾會回到handle_queries 協程,這個協程須要等待網絡發送或接收數據時,控制權又交給事件循環。
handle_queries 協程能夠處理多個客戶端發來的屢次請求。只要有新客戶端鏈接服務器,就會啓動一個handle_queries 協程實例。
handle_queries 的I/O操做都是使用bytes格式。咱們從網絡獲得的數據要解碼,發出去的數據也要編碼
asyncio包提供了高層的流API,提供了現成的服務器,咱們只須要實現一個處理程序。詳細信息能夠查看文檔:docs.python.org/3/library/a…
雖然,asyncio包提供了服務器,可是功能相對來講仍是比較簡陋的,如今咱們使用一下 基於asyncio包的 web 框架 sanci,用它來實現一個http版的簡易服務器
python web 框架 Sanci 快速入門的簡單入門在上一篇文章有介紹,
Sanic 是一個和類Flask 的基於Python3.5+的web框架,提供了比較高階的API,好比路由、request參數,response等,咱們只須要實現處理邏輯便可。
下邊是使用 sanic 實現的簡易的 字符查詢http web 服務:
from sanic import Sanic
from sanic import response
from charfinder import UnicodeNameIndex
app = Sanic()
index = UnicodeNameIndex()
html_temp = '<p>{char}</p>'
@app.route('/charfinder') # app.route 函數的第一個參數是url path,咱們這裏指定路徑是charfinder
async def charfinder(request):
# request.args 能夠取到url 的查詢參數
# ?key1=value1&key2=value2 的結果是 {'key1': ['value1'], 'key2': ['value2']}
# 咱們這裏支持傳入多個查詢參數,因此這裏使用 request.args.getlist('char')
# 若是咱們 使用 request.args.get('char') 只能取到第一個參數
query = request.args.getlist('char')
query = ' '.join(query)
lines = list(index.find_description_strs(query))
# 將獲得的結果生成html
html = '\n'.join([html_temp.format(char=line) for line in lines])
return response.html(html)
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000) # 設置服務器運行地址和端口號複製代碼
對比兩段代碼能夠發現,使用 sanic 很是簡單。
運行服務:
python http_charsfinder.py複製代碼
咱們在瀏覽器輸入地址 http://0.0.0.0:8000/charfinder?char=sun 結果示例以下
在TCP 的示例中,服務器經過main函數下的這兩行代碼建立並排定運行時間:
server_coro = asyncio.start_server(handle_queries, address, port,
loop=loop)
server = loop.run_until_complete(server_coro)複製代碼
而在sanic的HTTP示例中,使用,建立服務器:
app.run(host="0.0.0.0", port=8000)複製代碼
這兩個看起來運行方式徹底不一樣,但若是咱們翻開sanic的源碼會看到 app.run() 內部是調用 的 server_coroutine = loop.create_server()
建立服務器,
server_coroutine 是經過 loop.run_until_complete()
驅動的。
因此說,爲了啓動服務器,這兩個都是由 loop.run_until_complete 驅動,完成運行的。只不過 sanic 封裝了run 方法,使得使用更加方便。
這裏能夠獲得一個基本事實:只有驅動協程,協程才能作事,而驅動 asyncio.coroutine 裝飾的協程有兩種方式,使用 yield from 或者傳給asyncio 包中某個參數爲協程或future的函數,例如 run_until_complete
如今若是你搜索 cjk,會獲得7萬多條數據3M 的一個html文件,耗時大約2s,這若是是生產服務的一個請求,耗時2s是不能接收的,咱們可使用分頁,這樣咱們能夠每次只取200條數據,當用戶想看更多數據時再使用 ajax 或者 websockets發送下一批數據。
這一篇咱們使用 asyncio 包實現了TCP服務器,使用sanic(基於asyncio sanic 默認使用 uvloop替代asyncio
)實現了HTTP服務器,用於按名稱搜索Unicode 字符。可是並無涉及服務器併發部分,這部分能夠之後再討論。
這一篇仍是 《流暢的python》asyncio 一章的讀書筆記,下一篇將是python併發的第三篇,《使用線程處理併發》。
最後,感謝女友支持。
>歡迎關注 | >請我喝芬達 |
---|---|