Tornado異步非阻塞及自定義異步非阻塞

1、Tornado異步非阻塞

通常的web框架,能夠分爲兩類:
阻塞式:(Django,Flask,Tornado,Bottle)
一個請求到來未處理完成,後續一直等待
解決方案:多線程或多進程

異步非阻塞(存在IO請求):Tornado (單進程+單線程)

- 使用
- @gen.coroutine
- yield Future對象
1.簡單的異步例子
import tornado.ioloop
import tornado.web
from tornado.web import RequestHandler
from tornado import gen
from tornado.concurrent import Future
import time

class IndexHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        print('開始')
        future = Future()
        #從當前時間開始夯住10S,當時間結束執行 self.doing方法
        tornado.ioloop.IOLoop.current().add_timeout(time.time() + 10, self.doing)
        yield future

    def doing(self, *args, **kwargs):
        self.write('async')
        #關閉鏈接
        self.finish()


application = tornado.web.Application([
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

  

在內部中,Select循環檢測Future,當有返回值時,Future內部會做出變化,此時便可釋放向本身發送請求的連接html

 

2.做爲中轉,接收用戶對本身的請求,而本身須要向另外一個API發送請求處理,在發送請求的過程當中,佔用大量時間,這時候就可使用異步非阻塞的形式。python

好比一個購物網站,用戶向本身發來購買訂單,本身再向預約的後臺處理API發送處理請求,在這段時間內能夠接收其它用戶發來的訂單請求,這將大大提高處理的效率。mysql

import tornado.ioloop
import tornado.web
from tornado.web import RequestHandler
from tornado import gen
from tornado.concurrent import Future
import time
from tornado import httpclient

class IndexHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        print('收到訂單')
        #開啓一個非阻塞HTTP客戶端對象
        http = httpclient.AsyncHTTPClient()
        # 執行請求,異步返回HTTPResponse。在向另外一url請求的過程當中,能夠繼續接收其它對本身的請求
        yield http.fetch("http://www.github.com", self.done,)

    def done(self, response):  #異步方法接收HTTPResponse,並能夠繼續執行其它代碼
        print(response)
        self.write('訂單成功')
        #關閉此鏈接
        self.finish()


application = tornado.web.Application([
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

  

fetchrequestcallback = Noneraise_error = True** kwargs [source]git

執行請求,異步返回HTTPResponsegithub

請求能夠是字符串URL或HTTPRequest對象。若是它是一個字符串,咱們構造一個HTTPRequest使用任何額外的kwargs:HTTPRequest(request, **kwargs)web

該方法return一個Future結果是一個 HTTPResponse。默認狀況下,Future將引起一個HTTPError,若是該請求返回一個非200響應代碼(其餘錯誤也能夠獲得提高,若是服務器沒法聯繫)。相反,若是raise_error設置爲False,則不管響應代碼如何,都將始終返回響應。sql

若是callback給出一個,它將被調用HTTPResponse。在回調界面中,HTTPError不會自動提出。相反,您必須檢查響應的error屬性或調用其rethrow方法。數據庫

3.模擬異步非阻塞執行的流程:服務器

 

import tornado.ioloop
import tornado.web
from tornado.web import RequestHandler
from tornado import gen
from tornado.concurrent import Future
import time
from tornado import httpclient

fu = None
class IndexHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        #將fu設置爲全局變量
        global fu
        print('瘋狂的追求')
        fu = Future()
        #添加一個回調,若是完成運行而且其結果可用時,它將做爲其參數被調用
        fu.add_done_callback(self.done)
        yield fu

    def done(self, response):
        self.write('終於等到你')
        self.finish()


class TestHandler(RequestHandler):
    def get(self):
    #手動設置Future結果 fu.set_result(666) self.write('我只能幫你到這裏了') application = tornado.web.Application([ (r"/index", IndexHandler), (r"/test", TestHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start()

 

  

當return一個Future時,能夠添加一個回調函數,而這個回調函數,只有在請求成功接收到結果時纔會執行,不然將一直夯住,而Future內部則是經過set_result()方法使其獲得一個成功返回的信號從而執行回調。正常狀況下set_result()的值是正確的返回結果,這裏經過僞造結果達到異步的效果。done()方法中的response即set_result()的值。cookie

3.1 多線程自動執行:

import tornado.ioloop
import tornado.web
from tornado.web import RequestHandler
from tornado import gen
from tornado.concurrent import Future
import time
from threading import Thread

def waiting(future):
    import time
    time.sleep(10)
    #10S後執行set_result使回調生效。
    future.set_result(666)

class IndexHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        global fu
        print('瘋狂的追求')
        fu = Future()
        fu.add_done_callback(self.done)
        #開啓線程執行waiting函數,
        thread = Thread(target=waiting,args=(fu,))
        thread.start()

        yield fu

    def done(self, response):
        self.write('終於等到你')
        self.finish()


application = tornado.web.Application([
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

  

 4.使用Tornado異步非阻塞模擬用戶登陸

import tornado.web
from tornado import gen
import tornado_mysql

#必需要使用支持的類庫,如tornado_mysql ,sqlalchemy以及pymysql都不支持
@gen.coroutine
def get_user(user):
    #鏈接時會有IO耗時
    conn = yield tornado_mysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='cmdb',
                                       charset='utf8')
    cur = conn.cursor()
    #查詢時也會有IO耗時
    # yield cur.execute("SELECT name,email FROM web_models_userprofile where name=%s", (user,))
    yield cur.execute("select sleep(10)")
    row = cur.fetchone()
    cur.close()
    conn.close()
    #gen.Task特有的return方式,經過捕捉異常獲取數據庫查詢的值
    raise gen.Return(row)

class LoginHandler(tornado.web.RequestHandler):
    def get(self, *args, **kwargs):
        self.render('login.html')

    @gen.coroutine
    def post(self, *args, **kwargs):
        user = self.get_argument('user')
        #yield一個Task任務,封裝數據庫查詢的函數,當查詢遇到IO操做時不會阻塞,能夠接收其它請求。
        data = yield gen.Task(get_user, user)
        if data:
            print(data)
            self.redirect('http://www.baidu.com')
        else:
            self.render('login.html')

  

2、自定義簡易非阻塞框架

1.socket服務端基本(阻塞式):

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("127.0.0.1", 9999,))
sock.listen(5)

while True:
    client, address = sock.accept()
    data = client.recv(8096)
    print(data)
    # /r/n/r/n
    # 請求頭中URL
    # 請求體中數據
    # URL去路由關係中匹配 (請求體中數據)
    # response = func()
    import time
    time.sleep(10)
    client.sendall(b'uuuuuuuuuuuuuu')
    client.close()

  

2.應用select模擬非阻塞web框架的基本流程:

import socket
import select

def f1(request):
    return "內容1"

def f2(request):
    return "內容2"

#路由匹配系統
urls = [
    ('/index.html',f1),
    ('/home.html',f2),
]

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#設置非阻塞
sock.setblocking(False)
sock.bind(("127.0.0.1", 9999,))
sock.listen(5)

input_list = [sock,]

while True:
    """
    客戶端第一次創建鏈接以及後續的請求發來數據都會對rlist有變更,對rlist進行循環,
    若是等於sock便是創建鏈接的請求,不然就是客戶端後續發來數據,所以進行不一樣的操做
    """
    rlist,wlist,elist = select.select(input_list,[],[],0.005)
    for sk in rlist:
        # 有新人來鏈接:sock  進行accept,接收鏈接,等待客戶來鏈接
        if sk == sock:
            client, address = sock.accept()
            client.setblocking(False)
            input_list.append(client)  #將客戶端鏈接添加到循環列表中
        # 老人發來數據: client  創建鏈接並接收數據
        else:
            data = sk.recv(8096)
            # /r/n/r/n  (分割請求頭跟請求體)
            # 請求頭中URL
            # 請求體中數據      (提取URL跟數據)
            # URL去路由關係中匹配 (請求體中數據)
            # response = func()
            #根據數據匹配URL,服務器作相應的視圖處理,處理後的數據返回給客戶端
            response = f1(data)

            sk.sendall(response.encode('utf-8'))
            sk.close()

  

3.簡單的web框架基本實現原理

import re
import socket
import select
import time

class Future(object):
    """
    異步非阻塞模式時封裝回調函數以及是否準備就緒
    """
    def __init__(self, callback):
        self.callback = callback
        self._ready = False
        self.value = None

    def set_result(self, value=None):
        self.value = value
        self._ready = True

    @property
    def ready(self):
        return self._ready

class HttpResponse(object):
    """
    封裝響應信息
    """
    def __init__(self, content=''):
        self.content = content

        self.headers = {}
        self.cookies = {}

    def response(self):
        return bytes(self.content, encoding='utf-8')

class HttpNotFound(HttpResponse):
    """
    404時的錯誤提示
    """
    def __init__(self):
        super(HttpNotFound, self).__init__('404 Not Found')

class HttpRequest(object):
    """
    用戶封裝用戶請求信息
    """
    def __init__(self, conn):
        """

        :param conn: 客戶端的鏈接
        """
        self.conn = conn

        self.header_bytes = bytes()
        self.body_bytes = bytes()

        self.header_dict = {}

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):

        header_flag = False
        while True:
            try:
                received = self.conn.recv(8096)
            except Exception as e:
                received = None
            if not received:
                break
            if header_flag:
                self.body_bytes += received
                continue
            temp = received.split(b'\r\n\r\n', 1)
            if len(temp) == 1:
                self.header_bytes += temp
            else:
                h, b = temp
                self.header_bytes += h
                self.body_bytes += b
                header_flag = True

    @property
    def header_str(self):
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        headers = self.header_str.split('\r\n')
        first_line = headers[0].split(' ')
        if len(first_line) == 3:
            self.method, self.url, self.protocol = headers[0].split(' ')
            for line in headers:
                kv = line.split(':')
                if len(kv) == 2:
                    k, v = kv
                    self.header_dict[k] = v


class Snow(object):
    """
    微型Web框架類,
    """
    def __init__(self, routes):
        """

        :param routes: url路由匹配
        """
        self.routes = routes
        self.inputs = set()
        self.request = None
        self.async_request_handler = {}

    def run(self, host='localhost', port=9999):
        """
        事件循環
        :param host: IP地址
        :param port: 端口
        :return:
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port,))
        sock.setblocking(False)
        sock.listen(128)
        sock.setblocking(0)
        self.inputs.add(sock)
        try:
            while True:
                readable_list, writeable_list, error_list = select.select(self.inputs, [], self.inputs,0.005)
                for conn in readable_list:
                    if sock == conn:
                        # 鏈接
                        client, address = conn.accept()
                        client.setblocking(False)
                        self.inputs.add(client)
                    else:
                        # 發送請求
                        # HttpResponse
                        # 生成器
                        """
                        gen=HttpResponse對象,執行了三件事,
            1.HttpRequest類,將客戶端拿到,分割請求頭與請求體,將請求頭信息處理成header_dict {'method':xxx,'url':xxx}
            2.循環路由,實例化HttpRequest類, .url與其進行匹配,匹配成功將self.fun 賦值稱爲路由系統填寫的func函數。
            3.處理func視圖,返回2種類型,HttpResponse對象或者Future對象。若是是HttpResponse()對象,封裝了將字符串轉換
            成字節的方法,即仍是一種阻塞式,即通常的如Django類web框架的處理流程,
                        """
                        gen = self.process(conn)
                        if isinstance(gen,HttpResponse):
                            conn.sendall(gen.response())
                            self.inputs.remove(conn)
                            conn.close()
                        else:
                            yielded = next(gen)
                            self.async_request_handler[conn] = yielded

                for conn,fur in self.async_request_handler.items():
                    if not fur.ready:
                        continue
                    response = fur.callback(fur.value)
                    conn.sendall(response)
                    conn.close()
                    del self.async_request_handler[conn]
        except Exception as e:
            pass
        finally:
            sock.close()

    def process(self, conn):
        """
        處理路由系統以及執行函數
        :param conn:
        :return:
        """
        self.request = HttpRequest(conn)
        func = None
        for route in self.routes:
            if re.match(route[0], self.request.url):
                func = route[1]
                break
        if not func:
            return HttpNotFound()
        else:
            return func(self.request)


def index(request):
    return HttpResponse('OK')

def asynccccc(request):
    fur = Future(callback=done)
    yield fur

def done(arg):
    pass

routes = [
    (r'/index/', index),
    (r'/async/', asynccccc),
]

if __name__ == '__main__':
    app = Snow(routes)
    app.run(port=8012)

  

gen對象中,若是返回的是一個Future對象,則內部會將封裝的Future對象yield給回調函數,或者說是交給中轉站,由中轉站向基礎平臺的API發送IO請求,這時不會阻塞,會在循環中繼續yield新的請求,並生成一個字典,{conn:yield Future的對象},而在外層的循環中監聽這個對象,一旦對象發送的IO請求處理完成,將執行set_result方法,將ready設置成True,並拿到請求接收的值即賦值給value,再把value傳遞給回調函數中進行相應處理, 得到一個最後的值,將conn從字典中移除,將處理後的值返回給用戶。

 

4.最後整理後的簡單異步非阻塞web框架

 

import re
import socket
import select
import time


class HttpResponse(object):
    """
    封裝響應信息
    """
    def __init__(self, content=''):
        self.content = content

        self.headers = {}
        self.cookies = {}

    def response(self):
        return bytes(self.content, encoding='utf-8')


class HttpNotFound(HttpResponse):
    """
    404時的錯誤提示
    """
    def __init__(self):
        super(HttpNotFound, self).__init__('404 Not Found')


class HttpRequest(object):
    """
    用戶封裝用戶請求信息
    """
    def __init__(self, conn):
        self.conn = conn

        self.header_bytes = bytes()
        self.header_dict = {}
        self.body_bytes = bytes()

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):

        header_flag = False
        while True:
            try:
                received = self.conn.recv(8096)
            except Exception as e:
                received = None
            if not received:
                break
            if header_flag:
                self.body_bytes += received
                continue
            temp = received.split(b'\r\n\r\n', 1)
            if len(temp) == 1:
                self.header_bytes += temp
            else:
                h, b = temp
                self.header_bytes += h
                self.body_bytes += b
                header_flag = True

    @property
    def header_str(self):
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        headers = self.header_str.split('\r\n')
        first_line = headers[0].split(' ')
        if len(first_line) == 3:
            self.method, self.url, self.protocol = headers[0].split(' ')
            for line in headers:
                kv = line.split(':')
                if len(kv) == 2:
                    k, v = kv
                    self.header_dict[k] = v


class Future(object):
    """
    異步非阻塞模式時封裝回調函數以及是否準備就緒
    """
    def __init__(self, callback):
        self.callback = callback
        self._ready = False
        self.value = None

    def set_result(self, value=None):
        self.value = value
        self._ready = True

    @property
    def ready(self):
        return self._ready


class TimeoutFuture(Future):
    """
    異步非阻塞超時
    """
    def __init__(self, timeout):
        super(TimeoutFuture, self).__init__(callback=None)
        self.timeout = timeout
        self.start_time = time.time()

    @property
    def ready(self):
        current_time = time.time()
        if current_time > self.start_time + self.timeout:
            self._ready = True
        return self._ready


class Snow(object):
    """
    微型Web框架類
    """
    def __init__(self, routes):
        self.routes = routes
        self.inputs = set()
        self.request = None
        self.async_request_handler = {}

    def run(self, host='localhost', port=9999):
        """
        事件循環
        :param host:
        :param port:
        :return:
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port,))
        sock.setblocking(False)
        sock.listen(128)
        sock.setblocking(0)
        self.inputs.add(sock)
        try:
            while True:
                readable_list, writeable_list, error_list = select.select(self.inputs, [], self.inputs,0.005)
                for conn in readable_list:
                    if sock == conn:
                        client, address = conn.accept()
                        client.setblocking(False)
                        self.inputs.add(client)
                    else:
                        gen = self.process(conn)
                        if isinstance(gen, HttpResponse):
                            conn.sendall(gen.response())
                            self.inputs.remove(conn)
                            conn.close()
                        else:
                            yielded = next(gen)
                            self.async_request_handler[conn] = yielded
                self.polling_callback()

        except Exception as e:
            pass
        finally:
            sock.close()

    def polling_callback(self):
        """
        遍歷觸發異步非阻塞的回調函數
        :return:
        """
        for conn in list(self.async_request_handler.keys()):
            yielded = self.async_request_handler[conn]
            if not yielded.ready:
                continue
            if yielded.callback:
                ret = yielded.callback(self.request, yielded)
                conn.sendall(ret.response())
            self.inputs.remove(conn)
            del self.async_request_handler[conn]
            conn.close()

    def process(self, conn):
        """
        處理路由系統以及執行函數
        :param conn:
        :return:
        """
        self.request = HttpRequest(conn)
        func = None
        for route in self.routes:
            if re.match(route[0], self.request.url):
                func = route[1]
                break
        if not func:
            return HttpNotFound()
        else:
            return func(self.request)

 

  

使用:

基本使用(沒有使用異步非阻塞):

from s4 import Snow
from s4 import HttpResponse

def index(request):
    return HttpResponse('OK')

routes = [
    (r'/index/', index),
]

app = Snow(routes)
app.run(port=8012) 

 

 

手動的異步非阻塞使用

from s4 import Snow
from s4 import HttpResponse
from s4 import Future

request_list = []

def callback(request, future):
    return HttpResponse(future.value)

def req(request):
    print('請求到來')
    obj = Future(callback=callback)
    request_list.append(obj)
    yield obj

def stop(request):
    obj = request_list[0]
    obj.set_result('done')
    del request_list[0]
    return HttpResponse('stop')

routes = [
    (r'/req/', req),
    (r'/stop/', stop),
]

app = Snow(routes)
app.run(port=8012)

  

因爲咱們在最外層循環了一個{conn:future}的字典,因此須要手動去對stop視圖函數發送請求,由它進行set_result,若是是自動的方法,就是將這個conn添加到select中,由select進行監聽並set_result,

達到程序自動返回的效果。

相關文章
相關標籤/搜索