Tornado Demo 之 chatdemo 不徹底解讀

tornado 源碼自帶了豐富的 demo ,這篇文章主要分析 demo 中的聊天室應用: chatdemo css

首先看 chatdemo 的目錄結構:html

├── chatdemo.py
├── static
│   ├── chat.css
│   └── chat.js
└── templates
    ├── index.html
    ├── message.html
    └── room.html

很是簡單,基本沒有分層,三個模版一個 js 一個 css ,還有一個最重要的 chatdemo.pypython

本文的重點是弄清楚 chatdemo.py 的運行流程,因此對於此項目的其餘文件,包括模版及 chat.js 的實現都不會分析,只要知道 chat.js 的工做流程相信對於理解 chatdemo.py 沒有任何問題git

此 demo 主要基於長輪詢。 獲取新消息的原理:github

  1. 在 chat.js 中有一個定時器會定時執行 update 操做web

  2. 當沒有新消息時 tornado 會一直 hold 住 chat.js 發來的 update 請求ajax

  3. 當有新消息時 tornado 將包含新消息的數據返回給全部 hold 的 update 請求shell

  4. 此時 chat.js 收到 update 回覆後更新返回數據在聊天室中,同時再進行一次 update 請求, 而後又從 1. 開始執行。json

發送新消息的原理:緩存

  1. 輸入消息, 點擊 post 按鈕, chat.js 獲取表單後用 ajax 方式發送請求 new

  2. tornado 收到請求 new ,返回消息自己, 同時通知全部 hold 住的 update 請求 ( 這裏也包括髮送 new 請求的 chat.js 所發送的 update 請求 ) 返回新消息

  3. 全部在線的 chat.js 收到 update 請求回覆,更新返回信息到聊天室,同時再進行一次 update 請求。

清楚了以上流程,咱們直接來看 chatdemo.py :

def main():
    parse_command_line()
    app = tornado.web.Application(
        [
            (r"/", MainHandler),
            (r"/a/message/new", MessageNewHandler),
            (r"/a/message/updates", MessageUpdatesHandler),
            ],
        cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
        xsrf_cookies=True,
        debug=options.debug,
        )
    app.listen(options.port)
    tornado.ioloop.IOLoop.current().start()


if __name__ == "__main__":
    main()

main 函數主要用做初始化應用、監聽端口以及啓動 tornado server 。
咱們看路由:

  1. 主頁對應 MainHandler

  2. new 請求對應 MessageNewHandler

  3. updates 請求對應 MessageUpdatesHandler

下面來看 MainHandler :

# Making this a non-singleton is left as an exercise for the reader.
global_message_buffer = MessageBuffer()

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("index.html", messages=global_message_buffer.cache)

只有一行代碼,就是渲染並返回 index.html,渲染的附加信息就是 global_message_buffer 的全部緩存消息。 global_message_buffer 是 MessageBuffer 的一個實例。 咱們先不關心 MessageBuffer 內部是什麼,如今咱們只要記住它主要是用來儲存聊天消息和鏈接到此聊天室的人的信息的類。 其中 MessageBuffer().cache 就是保存聊天室全部聊天消息的結構。

而後來看 MessageNewHandler :

class MessageNewHandler(tornado.web.RequestHandler):
    def post(self):
        message = {
            "id": str(uuid.uuid4()),
            "body": self.get_argument("body"),
        }
        # to_basestring is necessary for Python 3's json encoder,
        # which doesn't accept byte strings.
        message["html"] = tornado.escape.to_basestring(
            self.render_string("message.html", message=message))
        if self.get_argument("next", None):
            self.redirect(self.get_argument("next"))
        else:
            self.write(message)
        global_message_buffer.new_messages([message])

一樣很簡單,從 post 信息裏獲取發來的新消息 ( body ) ,而後給消息分配一個惟一的 uuid,接着把這段消息渲染成一段 html ,而後 self.write(message) 返回這段 html, 同時給 global_message_buffer ( MessageBuffer 的實例 ) 添加這條新信息。 這裏其實我更傾向於返回 json 之類的數據,這樣更加直觀和規範,可能寫 demo 的人考慮到讀者對 json 之類的協議可能不熟悉故而選擇了返回渲染好的 html 直接讓 chat.js append 到 index.html 裏。

接着來看 MessageUpdatesHandler :

class MessageUpdatesHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def post(self):
        cursor = self.get_argument("cursor", None)
        # Save the future returned by wait_for_messages so we can cancel
        # it in wait_for_messages
        self.future = global_message_buffer.wait_for_messages(cursor=cursor)
        messages = yield self.future
        if self.request.connection.stream.closed():
            return
        self.write(dict(messages=messages))

    def on_connection_close(self):
        global_message_buffer.cancel_wait(self.future)

重點就在這裏,能夠看到其內部的 post 方法被 gen.coroutine 修飾器修飾,也就是說這個 post 方法如今是 協程 ( coroutine ) 方式工做。 對於協程比較陌生的童鞋,你能夠直接把它看成是單線程解決 io ( 網絡請求 ) 密集運算被阻塞而致使低效率的解決方案。 固然這樣理解協程還比較籠統,以後我會詳細寫一篇關於協程的文章,但在這裏這樣理解是沒有問題的。

如今來看代碼內容,首先獲取 cursor ,一個用來標識咱們已經獲取的消息的指針,這樣 tornado 就不會把你已經獲取的消息重複的發給你。 而後調用 global_message_buffer.wait_for_messages(cursor=cursor) 獲取一個 future 對象。 future 對象是 tornado 實現的一個特殊的類的實例,它的做用就是包含以後 ( 將來 ) 將會返回的數據,咱們如今不用關心 Future() 內部如何實現,只要記住上面它的做用就行。 關於 Future 的解讀我會放到閱讀 Future 源碼時講。

而後看最關鍵的這句: messages = yield self.future 注意這裏的 yield 就是 hold updates 請求的關鍵,它到這裏至關於暫停了整個 post 函數 ( updates 請求被 hold )同時也至關於 updates 此次網絡請求被阻塞,這個時候協程發揮做用,把這個函數暫停的地方的全部信息保存掛起,而後把工做線程釋放,這樣 tornado 能夠繼續接受 new、 updates 等請求而後運行相應的方法處理請求。

當有新的消息返回時,tornado 底層的 ioloop 實例將會調用 gen.send(value) 返回新消息( value )給每一個被暫停的方法的 yield 處, 此時協程依次恢復這些被暫停的方法, 同時用得到的返回消息繼續執行方法, 這時 messages = yield self.future 繼續執行,messages 得到 yield 的返回值 value ( python 中調用 gen.send(value) 將會把 value 值返回到 yield 處並替換原前 yield 後的值 ),而後判斷下用戶是否已經離開,若是還在線則返回新消息。

明白了以上流程,咱們最後來看 MessageBuffer:

class MessageBuffer(object):
    def __init__(self):
        self.waiters = set()
        self.cache = []
        self.cache_size = 200

    def wait_for_messages(self, cursor=None):
        # Construct a Future to return to our caller.  This allows
        # wait_for_messages to be yielded from a coroutine even though
        # it is not a coroutine itself.  We will set the result of the
        # Future when results are available.
        result_future = Future()
        if cursor:
            new_count = 0
            for msg in reversed(self.cache):
                if msg["id"] == cursor:
                    break
                new_count += 1
            if new_count:
                result_future.set_result(self.cache[-new_count:])
                return result_future
        self.waiters.add(result_future)
        return result_future

    def cancel_wait(self, future):
        self.waiters.remove(future)
        # Set an empty result to unblock any coroutines waiting.
        future.set_result([])

    def new_messages(self, messages):
        logging.info("Sending new message to %r listeners", len(self.waiters))
        for future in self.waiters:
            future.set_result(messages)
        self.waiters = set()
        self.cache.extend(messages)
        if len(self.cache) > self.cache_size:
            self.cache = self.cache[-self.cache_size:]

初始化方法中 self.waiters 就是一個等待新消息的 listener 集合 ( 直接理解成全部被 hold 住的 updates 請求隊列可能更清晰 )

self.cache 就是儲存全部聊天消息的列表,self.cache_size = 200 則定義了 cache 的大小 是存 200 條消息。

而後先來看簡單的 new_messages:

遍歷 waiters 列表,而後給全部的等待者返回新消息,同時清空等待者隊列。 而後把消息加到緩存 cache 裏,若是緩存大於限制則取最新的 200 條消息。這裏只要注意到 future.set_result(messages) 就是用來給 future 對象添加返回數據 ( 以前被 yield 暫停的地方此時由於 set_result() 方法將會得到 "將來" 的數據 ) 這一點便可。

而後來看 wait_for_messages :

def wait_for_messages(self, cursor=None):
        # Construct a Future to return to our caller.  This allows
        # wait_for_messages to be yielded from a coroutine even though
        # it is not a coroutine itself.  We will set the result of the
        # Future when results are available.
        result_future = Future()
        if cursor:
            new_count = 0
            for msg in reversed(self.cache):
                if msg["id"] == cursor:
                    break
                new_count += 1
            if new_count:
                result_future.set_result(self.cache[-new_count:])
                return result_future
        self.waiters.add(result_future)
        return result_future

首先初始化一個 Future 對象,而後根據 cursor 判斷哪些消息已經獲取了哪些還沒獲取,若是緩存中有對於這個 waiter 還沒獲取過的消息,則直接調用 set_result() 返回這些緩存中已有的但對於這個 waiter 來講是新的的數據。 若是這個 waiter 已經有緩存中的全部數據,那麼就把它加到等待者隊列裏保持等待,直到有新消息來時調用 new_messages 再返回。

而最後一個 cancel_wait 就很簡單了,當有用戶退出聊天室時,直接從 self.waiters 中移除他所對應的等待者。

當明白了整個代碼的運行流程後,咱們能夠基於這個簡單的 demo 而寫出更加豐富的例子,好比加入 session ,作登錄、作好友關係,作單聊作羣聊等等。

chatdemo with room是我添加的一個簡單功能,輸入聊天室房號後再進行聊天,只有同一房間中的人才能收到彼此的消息。

以上就是鄙人對整個 chatdemo.py 的解讀。 在閱讀此 demo 時,我沒有參考其餘源碼解讀,只是經過閱讀 tornado 底層的源碼而得出的我的的理解,所以確定會有不少理解不成熟甚至錯誤的地方,還望你們多多指教。

原文地址

做者:rapospectre

相關文章
相關標籤/搜索