Tornado——異步,websocket

模擬tornado兩個socket請求

同步執行

循序漸進的依次執行,知道上一個步驟執行完才執行下一步。javascript

# coding:utf-8
import time

def long_io():      # 長io操做
    print("開始執行IO操做")
    time.sleep(2)
    print("完成IO操做")

def req_a():    # 模擬請求a
    print('開始處理請求req_a')
    long_io()       # 執行一個長io操做
    print('完成處理請求req_a')

def req_b():    # 模擬請求b
    print('開始處理請求req_b')
    print('完成處理請求req_b')

def main():     # 模擬tornado框架,處理兩個請求
    req_a()
    req_b()

if __name__ == "__main__":
    main()

結果:php

開始處理請求req_a
開始執行IO操做
完成IO操做
完成處理請求req_a
開始處理請求req_b
完成處理請求req_bhtml

異步執行

對於耗時的過程,咱們將其交給別人(如其另一個線程)去執行,而咱們繼續往下處理,當別人執行完耗時操做後再將結果反饋給咱們,這就是咱們所說的異步。前端

1)引入線程和回調函數

# coding:utf-8
import time,threading   # 引入多線程

thread_list = []

def long_io(callback):      # 長io操做
    def fun(cb):
        print("開始執行IO操做")
        time.sleep(5)
        print("完成IO操做")
        cb("IO操做結束")     # 當線程結束時,執行回調函數

    threading.Thread(target=fun,args=(callback,)).start()    # 將長的io操做交個另外一個線程來處理,此時long_io執行完畢(python多線程不用join)

def callback(ret):
    print(ret)

def req_a():    # 模擬請求a
    print('開始處理請求req_a')
    long_io(callback)       # 執行一個長io操做
    print('完成處理請求req_a')

def req_b():    # 模擬請求b
    print('開始處理請求req_b')
    print('完成處理請求req_b')

def main():     # 模擬tornado框架,處理兩個請求
    req_a()
    req_b()

if __name__ == "__main__":
    main()

結果:java

開始處理請求req_a
開始執行IO操做
完成處理請求req_a
開始處理請求req_b
完成處理請求req_b
完成IO操做
IO操做結束python

2)引入協程

yield關鍵字:將函數/循環...變成生成器,使用__next__執行到下一個yield關鍵字的位置,使用send賦值並喚醒。jquery

正常版本:web

# coding:utf-8
import time,threading   # 引入多線程

thread_list = []

def long_io():      # 長io操做
    def fun():
        print("開始執行IO操做")
        time.sleep(5)
        print("完成IO操做")
        try:
            gen.send("============ IO操做結束 ============")     # 使用send返回結果並喚醒程序繼續執行
        except:     # 捕獲生成器完成迭代的異常,防止程序退出
            pass

    threading.Thread(target=fun).start()    # 將長的io操做交個另外一個線程來處理,python多進程不用join

def req_a():    # 模擬請求a
    print('開始處理請求req_a')
    ret = yield long_io()       # 執行一個長io操做,接收結果
    print(ret)
    print('完成處理請求req_a')

def req_b():    # 模擬請求b
    print('開始處理請求req_b')
    print('完成處理請求req_b')

def main():     # 模擬tornado框架,處理兩個請求
    global gen
    gen = req_a()   # 初始化生成器
    gen.__next__()    # 執行到第一個yield位置

    req_b()

if __name__ == "__main__":
    main()

結果:數據庫

開始處理請求req_a
開始執行IO操做
開始處理請求req_b
完成處理請求req_b
完成IO操做
============ IO操做結束 ============
完成處理請求req_a編程

裝飾器版本:

# coding:utf-8
import time,threading   # 引入多線程

thread_list = []

def long_io():      # 長io操做
    def fun():
        print("開始執行IO操做")
        time.sleep(5)
        print("完成IO操做")
        try:
            gen.send("============ IO操做結束 ============")     # 使用send返回結果並喚醒程序繼續執行
        except:     # 捕獲生成器完成迭代的異常,防止程序退出
            pass

    threading.Thread(target=fun).start()    # 將長的io操做交個另外一個線程來處理,python多進程不用join

def gen_coroutine(f):
    def inner(*args,**kwargs):
        global gen
        gen = f()  # 執行req_a,初始化生成器
        gen.__next__()  # 執行到第一個yield位置
    return inner

@gen_coroutine
def req_a():    # 模擬請求a
    print('開始處理請求req_a')
    ret = yield long_io()       # 執行一個長io操做,接收結果
    print(ret)
    print('完成處理請求req_a')

def req_b():    # 模擬請求b
    print('開始處理請求req_b')
    print('完成處理請求req_b')

def main():     # 模擬tornado框架,處理兩個請求
    req_a()  # 下面不修改
    req_b()

if __name__ == "__main__":
    main()

結果:

開始處理請求req_a
開始執行IO操做
開始處理請求req_b
完成處理請求req_b
完成IO操做
============ IO操做結束 ============
完成處理請求req_a

這個版本就是理解Tornado異步編程原理的最簡易模型,可是,Tornado實現異步的機制不是線程,而是ioloop,即將異步過程交給ioloop執行並進行監視回調。

須要注意的一點是,咱們實現的版本嚴格意義上來講不能算是協程,由於兩個程序的掛起與喚醒是在兩個線程上實現的,而Tornado利用ioloop來實現異步,程序的掛起與喚醒始終在一個線程上,由Tornado本身來調度,屬於真正意義上的協程。雖如此,並不妨礙咱們理解Tornado異步編程的原理。

Tornado的異步

由於epoll主要是用來解決網絡IO的併發問題,因此Tornado的異步編程也主要體如今網絡IO的異步上,即異步Web請求。

將每一個創建起來的socket送入epoll池中監聽,當有請求來了的時候,epoll會捕捉信號並將生成器,斷點...所有打包給IOLoop,讓IOLoop來進行調度。

通過路由映射交給視圖函數處理,當遇到堵塞的時候會將生成器,斷點...所有打包還給IOLoop,等待下一次調度。

tornado.httpclient.AsyncHTTPClient

Tornado提供了一個異步Web請求客戶端tornado.httpclient.AsyncHTTPClient用來進行異步Web請求。

fetch(request, callback=None)

用於執行一個web請求request,並異步返回一個tornado.httpclient.HTTPResponse響應。

request能夠是一個url,也能夠是一個tornado.httpclient.HTTPRequest對象。若是是url,fetch會本身構造一個HTTPRequest對象。

HTTPRequest

HTTP請求類,HTTPRequest的構造函數能夠接收衆多構造參數,最經常使用的以下:

  • url (string) – 要訪問的url,此參數必傳,除此以外均爲可選參數
  • method (string) – HTTP訪問方式,如「GET」或「POST」,默認爲GET方式
  • headers (HTTPHeaders or dict) – 附加的HTTP協議頭
  • body – HTTP請求的請求體

HTTPResponse

HTTP響應類,其經常使用屬性以下:

  • code: HTTP狀態碼,如 200 或 404
  • reason: 狀態碼描述信息
  • body: 響應體字符串
  • error: 異常(無關緊要)

裝飾器(tornado.web.asynchronous)

此裝飾器用於回調形式的異步方法,而且應該僅用於HTTP的方法上(如get、post等)。

此裝飾器不會讓被裝飾的方法變爲異步,而只是告訴框架被裝飾的方法是異步的,當方法返回時響應還沒有完成。只有在request handler調用了finish方法後,纔會結束本次請求處理,發送響應。

不帶此裝飾器的請求在get、post等方法返回時自動完成結束請求處理。

callback異步:

# coding:utf-8
import tornado.web
import tornado.ioloop
import tornado.httpserver
import tornado.options
import json

from tornado.web import url,RequestHandler
from tornado.options import define,options
from tornado.httpclient import AsyncHTTPClient      # 引入異步Web請求客戶端

tornado.options.define("port",default=8001,type=int,help="給個端口號唄")

class IndexHandler(RequestHandler):
    @tornado.web.asynchronous  # 不關閉鏈接,也不發送響應(由於在get,post方法執行後會執行on_finish方法,會關閉管道)
    def get(self):
        http = AsyncHTTPClient()  # 實例化異步客戶端
        http.fetch("http://int.dpool.sina.com.cn/iplookup/iplookup.php?format=json&ip=14.130.112.24",callback=self.on_response)  # 發送一個http請求

    def on_response(self, response):
        if response.error:
            self.send_error(500)
        else:
            data = json.loads(response.body)  # 獲取響應體
            if 1 == data["ret"]:
                self.write(u"國家:%s 省份: %s 城市: %s" % (data["country"], data["province"], data["city"]))
            else:
                self.write("查詢IP信息錯誤")
        self.finish()       # 發送響應信息,結束請求處理

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(
        [
        (r"/",IndexHandler),
        ],
        debug = True
    )

    http_server = tornado.httpserver.HTTPServer(app)    # 建立httpserver實例
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()

結果:

國家:中國 省份: 北京 城市: 北京

協程異步:

# coding:utf-8
import tornado.web
import tornado.ioloop
import tornado.httpserver
import tornado.options
import json

from tornado.web import url,RequestHandler
from tornado.options import define,options
from tornado.httpclient import AsyncHTTPClient      # 引入異步Web請求客戶端
import tornado.gen

tornado.options.define("port",default=8001,type=int,help="給個端口號唄")

class IndexHandler(RequestHandler):
    class IndexHandler(tornado.web.RequestHandler):
        @tornado.gen.coroutine
        def get(self):
            http = AsyncHTTPClient()
            response = yield http.fetch(
                "http://int.dpool.sina.com.cn/iplookup/iplookup.php?format=json&ip=14.130.112.24")
            if response.error:
                self.send_error(500)
            else:
                data = json.loads(response.body)
                if 1 == data["ret"]:
                    self.write(u"國家:%s 省份: %s 城市: %s" % (data["country"], data["province"], data["city"]))
                else:
                    self.write("查詢IP信息錯誤")

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(
        [
        (r"/",IndexHandler),
        ],
        debug = True
    )

    http_server = tornado.httpserver.HTTPServer(app)    # 建立httpserver實例
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()

也能夠將異步Web請求單獨出來:

class IndexHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        rep = yield self.get_ip_info("14.130.112.24")
        if 1 == rep["ret"]:
            self.write(u"國家:%s 省份: %s 城市: %s" % (rep["country"], rep["province"], rep["city"]))
        else:
            self.write("查詢IP信息錯誤")

    @tornado.gen.coroutine
    def get_ip_info(self, ip):
        http = tornado.httpclient.AsyncHTTPClient()
        response = yield http.fetch("http://int.dpool.sina.com.cn/iplookup/iplookup.php?format=json&ip=" + ip)
        if response.error:
            rep = {"ret:0"}
        else:
            rep = json.loads(response.body)
        return(rep)  # 此處須要注意,python2中使用raise tornado.gen.Return

代碼中咱們須要注意的地方是get_ip_info返回值的方式,在python 2中,使用了yield的生成器可使用不返回任何值的return,但不能return value,所以Tornado爲咱們封裝了用於在生成器中返回值的特殊異常tornado.gen.Return,並用raise來返回此返回值。

並行協程:

Tornado能夠同時執行多個異步,併發的異步可使用列表或字典,以下:

class IndexHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        ips = ["14.130.112.24",
            "15.130.112.24",
            "16.130.112.24",
            "17.130.112.24"]
        rep1, rep2 = yield [self.get_ip_info(ips[0]), self.get_ip_info(ips[1])]
        rep34_dict = yield dict(rep3=self.get_ip_info(ips[2]), rep4=self.get_ip_info(ips[3]))
        self.write_response(ips[0], rep1) 
        self.write_response(ips[1], rep2) 
        self.write_response(ips[2], rep34_dict['rep3']) 
        self.write_response(ips[3], rep34_dict['rep4']) 

    def write_response(self, ip, response):
        self.write(ip) 
        self.write(":<br/>") 
        if 1 == response["ret"]:
            self.write(u"國家:%s 省份: %s 城市: %s<br/>" % (response["country"], response["province"], response["city"]))
        else:
            self.write("查詢IP信息錯誤<br/>")

    @tornado.gen.coroutine
    def get_ip_info(self, ip):
        http = tornado.httpclient.AsyncHTTPClient()
        response = yield http.fetch("http://int.dpool.sina.com.cn/iplookup/iplookup.php?format=json&ip=" + ip)
        if response.error:
            rep = {"ret:1"}
        else:
            rep = json.loads(response.body)
        raise tornado.gen.Return(rep)

關於數據庫的異步說明

網站基本都會有數據庫操做,而Tornado是單線程的,這意味着若是數據庫查詢返回過慢,整個服務器響應會被堵塞。

數據庫查詢,實質上也是遠程的網絡調用;理想狀況下,是將這些操做也封裝成爲異步的;但Tornado對此並無提供任何支持。

這是Tornado的設計,而不是缺陷。

一個系統,要知足高流量;是必須解決數據庫查詢速度問題的!

數據庫若存在查詢性能問題,整個系統不管如何優化,數據庫都會是瓶頸,拖慢整個系統!

異步並不能從本質上提到系統的性能;它僅僅是避免多餘的網絡響應等待,以及切換線程的CPU耗費。

若是數據庫查詢響應太慢,須要解決的是數據庫的性能問題;而不是調用數據庫的前端Web應用。

對於實時返回的數據查詢,理想狀況下須要確保全部數據都在內存中,數據庫硬盤IO應該爲0;這樣的查詢才能足夠快;而若是數據庫查詢足夠快,那麼前端web應用也就無將數據查詢封裝爲異步的必要。

就算是使用協程,異步程序對於同步程序始終仍是會提升複雜性;須要衡量的是處理這些額外複雜性是否值得。

若是後端有查詢實在是太慢,沒法繞過,Tornaod的建議是將這些查詢在後端封裝獨立封裝成爲HTTP接口,而後使用Tornado內置的異步HTTP客戶端進行調用。

websocket

WebSocket是HTML5規範中新提出的客戶端-服務器通信協議,協議自己使用新的ws://URL格式。

WebSocket 是獨立的、建立在 TCP 上的協議,和 HTTP 的惟一關聯是使用 HTTP 協議的101狀態碼進行協議切換,使用的 TCP 端口是80,能夠用於繞過大多數防火牆的限制。

WebSocket 使得客戶端和服務器之間的數據交換變得更加簡單,容許服務端直接向客戶端推送數據而不須要客戶端進行請求,二者之間能夠建立持久性的鏈接,並容許數據進行雙向傳送。

目前常見的瀏覽器如 Chrome、IE、Firefox、Safari、Opera 等都支持 WebSocket,同時須要服務端程序支持 WebSocket。

1. Tornado的WebSocket模塊

Tornado提供支持WebSocket的模塊是tornado.websocket,其中提供了一個WebSocketHandler類用來處理通信。

WebSocketHandler.open()

當一個WebSocket鏈接創建後被調用。

WebSocketHandler.on_message(message)

當客戶端發送消息message過來時被調用,注意此方法必須被重寫。

WebSocketHandler.on_close()

當WebSocket鏈接關閉後被調用。

WebSocketHandler.write_message(message, binary=False)

向客戶端發送消息messagea,message能夠是字符串或字典(字典會被轉爲json字符串)。若binary爲False,則message以utf8編碼發送;二進制模式(binary=True)時,可發送任何字節碼。

WebSocketHandler.close()

關閉WebSocket鏈接。

WebSocketHandler.check_origin(origin)

判斷源origin,對於符合條件(返回判斷結果爲True)的請求源origin容許其鏈接,不然返回403。能夠重寫此方法來解決WebSocket的跨域請求(如始終return True)。

2. 前端JavaScript編寫

在前端JS中使用WebSocket與服務器通信的經常使用方法以下:

var ws = new WebSocket("ws://127.0.0.1:8888/websocket"); // 新建一個ws鏈接
ws.onopen = function() {  // 鏈接創建好後的回調
   ws.send("Hello, world");  // 向創建的鏈接發送消息
};
ws.onmessage = function (evt) {  // 收到服務器發送的消息後執行的回調
   alert(evt.data);  // 接收的消息內容在事件參數evt的data屬性中
};

在線聊天室的小Demo

 1 # coding:utf-8
 2 
 3 import tornado.web
 4 import tornado.ioloop
 5 import tornado.httpserver
 6 import tornado.options
 7 import os
 8 import datetime
 9 
10 from tornado.web import RequestHandler
11 from tornado.options import define, options
12 from tornado.websocket import WebSocketHandler
13 
14 define("port", default=8000, type=int)
15 
16 class IndexHandler(RequestHandler):
17     def get(self):
18         self.render("index.html")
19 
20 class ChatHandler(WebSocketHandler):
21 
22     users = set()  # 用來存放在線用戶的容器
23 
24     def open(self):
25         self.users.add(self)  # 創建鏈接後添加用戶到容器中
26         for u in self.users:  # 向已在線用戶發送消息
27             u.write_message(u"[%s]-[%s]-進入聊天室" % (self.request.remote_ip, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
28 
29     def on_message(self, message):
30         for u in self.users:  # 向在線用戶廣播消息
31             u.write_message(u"[%s]-[%s]-說:%s" % (self.request.remote_ip, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), message))
32 
33     def on_close(self):
34         self.users.remove(self) # 用戶關閉鏈接後從容器中移除用戶
35         for u in self.users:
36             u.write_message(u"[%s]-[%s]-離開聊天室" % (self.request.remote_ip, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
37 
38     def check_origin(self, origin):
39         return True  # 容許WebSocket的跨域請求
40 
41 if __name__ == '__main__':
42     tornado.options.parse_command_line()
43     app = tornado.web.Application([
44             (r"/", IndexHandler),
45             (r"/chat", ChatHandler),
46         ],
47         static_path = os.path.join(os.path.dirname(__file__), "static"),
48         template_path = os.path.join(os.path.dirname(__file__), "template"),
49         debug = True
50         )
51     http_server = tornado.httpserver.HTTPServer(app)
52     http_server.listen(options.port)
53     tornado.ioloop.IOLoop.current().start()
後端
 1 <!DOCTYPE html>
 2 <html>
 3 <head>
 4     <meta charset="utf-8">
 5     <title>聊天室</title>
 6 </head>
 7 <body>
 8     <div id="contents" style="height:500px;overflow:auto;"></div>
 9     <div>
10         <textarea id="msg"></textarea>
11         <a href="javascript:;" onclick="sendMsg()">發送</a>
12     </div>
13     <script src="{{static_url('js/jquery.min.js')}}"></script>
14     <script type="text/javascript">
15         var ws = new WebSocket("ws://192.168.114.177:8000/chat");
16         ws.onmessage = function(e) {
17             $("#contents").append("<p>" + e.data + "</p>");
18         }
19         function sendMsg() {
20             var msg = $("#msg").val();
21             ws.send(msg);
22             $("#msg").val("");
23         }
24     </script>
25 </body>
26 </html>
前端
相關文章
相關標籤/搜索