python併發3:使用asyncio編寫服務器

前幾篇文章html

asyncio
上一篇咱們介紹了 asyncio 包,以及如何使用異步編程管理網絡應用中的高併發。在這一篇,咱們主要介紹使用 asyncio 包編程的兩個例子。

async/await語法

咱們先介紹下 async/await 語法,要否則看完這篇可能會困惑,爲何以前使用 asyncio.coroutine 裝飾器 和 yield from,這裏都是 用的 async 和 await?python

python併發2:使用asyncio處理併發web

async/await 是Python3.5 的新語法,語法以下:ajax

async def read_data(db):
    pass複製代碼

async 是明確將函數聲明爲協程的關鍵字,即便沒有await表達式,函數執行也會返回一個協程對象。
在協程函數內部,能夠在某個表達式以前使用 await 關鍵字來暫停協程的執行,以等待某協程完成:數據庫

async def read_data(db):
    data = await db.fetch('SELECT ...')複製代碼

這個代碼若是使用 asyncio.coroutine 裝飾器語法爲:編程

@asyncio.coroutine
def read_data(db):
    data = yield from db.fetch('SELECT ...')複製代碼

這兩段代碼執行的結果是同樣的,也就是說 能夠把 asyncio.coroutine 替換爲 async, yield from 替換爲 await。瀏覽器

使用新的語法有什麼好處呢:bash

  • 使生成器和協程的概念更容易理解,由於語法不一樣
  • 能夠消除因爲重構時不當心移出協程中yield 聲明而致使的不明確錯誤,這回致使協程變成普通的生成器。

使用 asyncio 包編寫服務器

這個例子主要是使用 asyncio 包 和 unicodedata 模塊,實現經過規範名稱查找Unicode 字符。服務器

咱們先來看一下代碼:websocket

# charfinder.py
import sys
import re
import unicodedata
import pickle
import warnings
import itertools
import functools
from collections import namedtuple

RE_WORD = re.compile('\w+')
RE_UNICODE_NAME = re.compile('^[A-Z0-9 -]+$')
RE_CODEPOINT = re.compile('U\+[0-9A-F]{4, 6}')

INDEX_NAME = 'charfinder_index.pickle'
MINIMUM_SAVE_LEN = 10000
CJK_UNI_PREFIX = 'CJK UNIFIED IDEOGRAPH'
CJK_CMP_PREFIX = 'CJK COMPATIBILITY IDEOGRAPH'

sample_chars = [
    '$',  # DOLLAR SIGN
    'A',  # LATIN CAPITAL LETTER A
    'a',  # LATIN SMALL LETTER A
    '\u20a0',  # EURO-CURRENCY SIGN
    '\u20ac',  # EURO SIGN
]

CharDescription = namedtuple('CharDescription', 'code_str char name')

QueryResult = namedtuple('QueryResult', 'count items')


def tokenize(text):
    ''' :param text: :return: return iterable of uppercased words '''
    for match in RE_WORD.finditer(text):
        yield match.group().upper()


def query_type(text):
    text_upper = text.upper()
    if 'U+' in text_upper:
        return 'CODEPOINT'
    elif RE_UNICODE_NAME.match(text_upper):
        return 'NAME'
    else:
        return 'CHARACTERS'


class UnicodeNameIndex:
    # unicode name 索引類

    def __init__(self, chars=None):
        self.load(chars)

    def load(self, chars=None):
        # 加載 unicode name 
        self.index = None
        if chars is None:
            try:
                with open(INDEX_NAME, 'rb') as fp:
                    self.index = pickle.load(fp)
            except OSError:
                pass
        if self.index is None:
            self.build_index(chars)
        if len(self.index) > MINIMUM_SAVE_LEN:
            try:
                self.save()
            except OSError as exc:
                warnings.warn('Could not save {!r}: {}'
                              .format(INDEX_NAME, exc))

    def save(self):
        with open(INDEX_NAME, 'wb') as fp:
            pickle.dump(self.index, fp)

    def build_index(self, chars=None):
        if chars is None:
            chars = (chr(i) for i in range(32, sys.maxunicode))
        index = {}
        for char in chars:
            try:
                name = unicodedata.name(char)
            except ValueError:
                continue
            if name.startswith(CJK_UNI_PREFIX):
                name = CJK_UNI_PREFIX
            elif name.startswith(CJK_CMP_PREFIX):
                name = CJK_CMP_PREFIX

            for word in tokenize(name):
                index.setdefault(word, set()).add(char)

        self.index = index

    def word_rank(self, top=None):
        # (len(self.index[key], key) 是一個生成器,須要用list 轉成列表,要否則下邊排序會報錯
        res = [list((len(self.index[key], key)) for key in self.index)]
        res.sort(key=lambda  item: (-item[0], item[1]))
        if top is not None:
            res = res[:top]
        return res

    def word_report(self, top=None):
        for postings, key in self.word_rank(top):
            print('{:5} {}'.format(postings, key))

    def find_chars(self, query, start=0, stop=None):
        stop = sys.maxsize if stop is None else stop
        result_sets = []
        for word in tokenize(query):
            # tokenize 是query 的生成器 a b 會是 ['a', 'b'] 的生成器
            chars = self.index.get(word)
            if chars is None:
                result_sets = []
                break
            result_sets.append(chars)

        if not result_sets:
            return QueryResult(0, ())

        result = functools.reduce(set.intersection, result_sets)
        result = sorted(result)  # must sort to support start, stop
        result_iter = itertools.islice(result, start, stop)
        return QueryResult(len(result),
                           (char for char in result_iter))

    def describe(self, char):
        code_str = 'U+{:04X}'.format(ord(char))
        name = unicodedata.name(char)
        return CharDescription(code_str, char, name)

    def find_descriptions(self, query, start=0, stop=None):
        for char in self.find_chars(query, start, stop).items:
            yield self.describe(char)

    def get_descriptions(self, chars):
        for char in chars:
            yield self.describe(char)

    def describe_str(self, char):
        return '{:7}\t{}\t{}'.format(*self.describe(char))

    def find_description_strs(self, query, start=0, stop=None):
        for char in self.find_chars(query, start, stop).items:
            yield self.describe_str(char)

 @staticmethod # not an instance method due to concurrency
    def status(query, counter):
        if counter == 0:
            msg = 'No match'
        elif counter == 1:
            msg = '1 match'
        else:
            msg = '{} matches'.format(counter)
        return '{} for {!r}'.format(msg, query)

def main(*args):
    index = UnicodeNameIndex()
    query = ' '.join(args)
    n = 0
    for n, line in enumerate(index.find_description_strs(query), 1):
        print(line)
    print('({})'.format(index.status(query, n)))


if __name__ == '__main__':
    if len(sys.argv) > 1:
        main(*sys.argv[1:])
    else:
        print('Usage: {} word1 [word2]...'.format(sys.argv[0]))複製代碼

這個模塊讀取Python內建的Unicode數據庫,爲每一個字符名稱中的每一個單詞創建索引,而後倒排索引,存入一個字典。
例如,在倒排索引中,'SUN' 鍵對應的條目是一個集合,裏面是名稱中包含'SUN' 這個詞的10個Unicode字符。倒排索引保存在本地一個名爲charfinder_index.pickle 的文件中。若是查詢多個單詞,會計算從索引中所得集合的交集。
運行示例以下:

>>> main('rook')  # doctest: +NORMALIZE_WHITESPACE
    U+2656  ♖  WHITE CHESS ROOK
    U+265C  ♜  BLACK CHESS ROOK
    (2 matches for 'rook')
    >>> main('rook', 'black')  # doctest: +NORMALIZE_WHITESPACE
    U+265C  ♜  BLACK CHESS ROOK
    (1 match for 'rook black')
    >>> main('white bishop')  # doctest: +NORMALIZE_WHITESPACE
    U+2657  ♗   WHITE CHESS BISHOP
    (1 match for 'white bishop')
    >>> main("jabberwocky's vest")
    (No match for "jabberwocky's vest")複製代碼

這個模塊沒有使用併發,主要做用是爲使用 asyncio 包編寫的服務器提供支持。
下面咱們來看下 tcp_charfinder.py 腳本:

# tcp_charfinder.py
import sys
import asyncio

# 用於構建索引,提供查詢方法
from charfinder import UnicodeNameIndex

CRLF = b'\r\n'
PROMPT = b'?> '

# 實例化UnicodeNameIndex 類,它會使用charfinder_index.pickle 文件
index = UnicodeNameIndex()

async def handle_queries(reader, writer):
    # 這個協程要傳給asyncio.start_server 函數,接收的兩個參數是asyncio.StreamReader 對象和 asyncio.StreamWriter 對象
    while True:  # 這個循環處理會話,直到從客戶端收到控制字符後退出
        writer.write(PROMPT)  # can't await! # 這個方法不是協程,只是普通函數;這一行發送 ?> 提示符
        await writer.drain()  # must await! # 這個方法刷新writer 緩衝;由於它是協程,因此要用 await
        data = await reader.readline()  # 這個方法也是協程,返回一個bytes對象,也要用await
        try:
            query = data.decode().strip()
        except UnicodeDecodeError:
            # Telenet 客戶端發送控制字符時,可能會拋出UnicodeDecodeError異常
            # 咱們這裏默認發送空字符
            query = '\x00'
        client = writer.get_extra_info('peername')  # 返回套接字鏈接的遠程地址
        print('Received from {}: {!r}'.format(client, query))  # 在控制檯打印查詢記錄
        if query:
            if ord(query[:1]) < 32:  # 若是收到控制字符或者空字符,退出循環
                break
            # 返回一個生成器,產出包含Unicode 碼位、真正的字符和字符名稱的字符串
            lines = list(index.find_description_strs(query)) 
            if lines:
                # 使用默認的UTF-8 編碼把lines 轉換成bytes 對象,並在每一行末添加回車符合換行符
                # 參數列表是一個生成器
                writer.writelines(line.encode() + CRLF for line in lines) 
            writer.write(index.status(query, len(lines)).encode() + CRLF) # 輸出狀態

            await writer.drain()  # 刷新輸出緩衝
            print('Sent {} results'.format(len(lines)))  # 在服務器控制檯記錄響應

    print('Close the client socket')  # 在控制檯記錄會話結束
    writer.close()  # 關閉StreamWriter流



def main(address='127.0.0.1', port=2323):  # 添加默認地址和端口,因此調用默承認以不加參數
    port = int(port)
    loop = asyncio.get_event_loop()
    # asyncio.start_server 協程運行結束後,
    # 返回的協程對象返回一個asyncio.Server 實例,即一個TCP套接字服務器
    server_coro = asyncio.start_server(handle_queries, address, port,
                                loop=loop) 
    server = loop.run_until_complete(server_coro) # 驅動server_coro 協程,啓動服務器

    host = server.sockets[0].getsockname()  # 得到這個服務器的第一個套接字的地址和端口
    print('Serving on {}. Hit CTRL-C to stop.'.format(host))  # 在控制檯中顯示地址和端口
    try:
        loop.run_forever()  # 運行事件循環 main 函數在這裏阻塞,直到服務器的控制檯中按CTRL-C 鍵
    except KeyboardInterrupt:  # CTRL+C pressed
        pass

    print('Server shutting down.')
    server.close()
    # server.wait_closed返回一個 future
    # 調用loop.run_until_complete 方法,運行 future
    loop.run_until_complete(server.wait_closed())  
    loop.close()  # 終止事件循環


if __name__ == '__main__':
    main(*sys.argv[1:])複製代碼

運行 tcp_charfinders.py

python tcp_charfinders.py複製代碼

打開終端,使用 telnet 命令請求服務,運行結果以下所示:

在 telnet 會話中訪問tcp版字符串查找服務器所作的查詢
在 telnet 會話中訪問tcp版字符串查找服務器所作的查詢

main 函數幾乎會當即顯示 Serving on... 消息,而後在調用loop.run_forever() 方法時阻塞。這時,控制權流動到事件循環中,並且一直等待,偶爾會回到handle_queries 協程,這個協程須要等待網絡發送或接收數據時,控制權又交給事件循環。

handle_queries 協程能夠處理多個客戶端發來的屢次請求。只要有新客戶端鏈接服務器,就會啓動一個handle_queries 協程實例。

handle_queries 的I/O操做都是使用bytes格式。咱們從網絡獲得的數據要解碼,發出去的數據也要編碼

asyncio包提供了高層的流API,提供了現成的服務器,咱們只須要實現一個處理程序。詳細信息能夠查看文檔:docs.python.org/3/library/a…

雖然,asyncio包提供了服務器,可是功能相對來講仍是比較簡陋的,如今咱們使用一下 基於asyncio包的 web 框架 sanci,用它來實現一個http版的簡易服務器

sanic
的簡單入門在上一篇文章有介紹, python web 框架 Sanci 快速入門

使用 sanic 包編寫web 服務器

Sanic 是一個和類Flask 的基於Python3.5+的web框架,提供了比較高階的API,好比路由、request參數,response等,咱們只須要實現處理邏輯便可。

下邊是使用 sanic 實現的簡易的 字符查詢http web 服務:

from sanic import Sanic
from sanic import response

from charfinder import UnicodeNameIndex

app = Sanic()

index = UnicodeNameIndex()

html_temp = '<p>{char}</p>'

@app.route('/charfinder') # app.route 函數的第一個參數是url path,咱們這裏指定路徑是charfinder
async def charfinder(request):
    # request.args 能夠取到url 的查詢參數
    # ?key1=value1&key2=value2 的結果是 {'key1': ['value1'], 'key2': ['value2']}
    # 咱們這裏支持傳入多個查詢參數,因此這裏使用 request.args.getlist('char')
    # 若是咱們 使用 request.args.get('char') 只能取到第一個參數
    query = request.args.getlist('char')
    query = ' '.join(query)
    lines = list(index.find_description_strs(query))
    # 將獲得的結果生成html
    html = '\n'.join([html_temp.format(char=line) for line in lines])
    return response.html(html)

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)  # 設置服務器運行地址和端口號複製代碼

對比兩段代碼能夠發現,使用 sanic 很是簡單。

運行服務:

python http_charsfinder.py複製代碼

咱們在瀏覽器輸入地址 http://0.0.0.0:8000/charfinder?char=sun 結果示例以下

https://user-gold-cdn.xitu.io/2017/7/1/28f220bae095ad24a1e70b1d8684eea2
https://user-gold-cdn.xitu.io/2017/7/1/28f220bae095ad24a1e70b1d8684eea2

如今對比下兩段代碼

在TCP 的示例中,服務器經過main函數下的這兩行代碼建立並排定運行時間:

server_coro = asyncio.start_server(handle_queries, address, port,
                                loop=loop)
server = loop.run_until_complete(server_coro)複製代碼

而在sanic的HTTP示例中,使用,建立服務器:

app.run(host="0.0.0.0", port=8000)複製代碼

這兩個看起來運行方式徹底不一樣,但若是咱們翻開sanic的源碼會看到 app.run() 內部是調用 的 server_coroutine = loop.create_server()建立服務器,
server_coroutine 是經過 loop.run_until_complete()驅動的。

因此說,爲了啓動服務器,這兩個都是由 loop.run_until_complete 驅動,完成運行的。只不過 sanic 封裝了run 方法,使得使用更加方便。

這裏能夠獲得一個基本事實:只有驅動協程,協程才能作事,而驅動 asyncio.coroutine 裝飾的協程有兩種方式,使用 yield from 或者傳給asyncio 包中某個參數爲協程或future的函數,例如 run_until_complete

如今若是你搜索 cjk,會獲得7萬多條數據3M 的一個html文件,耗時大約2s,這若是是生產服務的一個請求,耗時2s是不能接收的,咱們可使用分頁,這樣咱們能夠每次只取200條數據,當用戶想看更多數據時再使用 ajax 或者 websockets發送下一批數據。

這一篇咱們使用 asyncio 包實現了TCP服務器,使用sanic(基於asyncio sanic 默認使用 uvloop替代asyncio)實現了HTTP服務器,用於按名稱搜索Unicode 字符。可是並無涉及服務器併發部分,這部分能夠之後再討論。

這一篇仍是 《流暢的python》asyncio 一章的讀書筆記,下一篇將是python併發的第三篇,《使用線程處理併發》。

參考連接

最後,感謝女友支持。

>歡迎關注 >請我喝芬達
歡迎關注
歡迎關注
請我喝芬達
請我喝芬達
相關文章
相關標籤/搜索