Tornado 【框架基礎過程分析1】

通常web框架,同步模式下存在的問題

一、計算等待

  用sleep模擬:A用戶的請求需耗費計算量而佔用線程時,B用戶只能處於等待狀態

import tornado.ioloop
import tornado.web


class MainHandler(tornado.web.RequestHandler):
    """Demo handler whose GET blocks for ten seconds before answering."""

    def get(self):
        # Deliberately synchronous: time.sleep() holds the calling thread
        # for the full ten seconds before the body is written.
        from time import sleep
        sleep(10)
        self.write("Hello, world")

class IndexHandler(tornado.web.RequestHandler):
    """Trivial handler: answers GET with a fixed body and no delay."""

    def get(self):
        body = "Index"
        self.write(body)
# Route table: URL regex -> handler class.
application = tornado.web.Application([
    (r"/main", MainHandler),
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    # IOLoop.instance() is a deprecated alias of IOLoop.current(); use the
    # supported spelling to obtain the loop that serves the listener.
    tornado.ioloop.IOLoop.current().start()
sleep等待

二、IO等待

  寫文件 / 等待網絡返回等請求中,A用戶佔用線程,B用戶只能等待

import tornado.ioloop
import tornado.web


class MainHandler(tornado.web.RequestHandler):
    """Demo handler that performs a synchronous outbound HTTP call."""

    def get(self):
        # requests.get() does not return until the remote call completes,
        # so nothing is written to the client before then.
        from requests import get as http_get
        http_get('http://www.google.com')
        self.write('xxxxx')

class IndexHandler(tornado.web.RequestHandler):
    """Answers GET immediately with the literal text ``Index``."""

    def get(self):
        text = "Index"
        self.write(text)
# Route table: URL regex -> handler class.
application = tornado.web.Application([
    (r"/main", MainHandler),
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    # IOLoop.instance() is a deprecated alias of IOLoop.current(); use the
    # supported spelling to obtain the loop that serves the listener.
    tornado.ioloop.IOLoop.current().start()
IO等待

Tornado的解決方式---異步非阻塞

       Future對象 + gen.coroutine裝飾器 + yield生成器

一、計算等待

import tornado.ioloop
import tornado.web
from tornado import gen
from tornado.concurrent import Future
import time

class MainHandler(tornado.web.RequestHandler):
    """Answer GET after a five second delay scheduled on the IOLoop."""

    @gen.coroutine
    def get(self):
        """Suspend this request on a Future until the timer calls done()."""
        # Keep the Future on the handler so done() can resolve it; a Future
        # that is never resolved would leave this coroutine suspended
        # forever after the response has gone out.
        self._wait_future = Future()
        # Wait five seconds without sleeping in the handler itself.
        tornado.ioloop.IOLoop.current().add_timeout(time.time() + 5, self.done)
        yield self._wait_future

    def done(self, *args, **kwargs):
        """Timer callback: send the response, then release the coroutine."""
        self.write('Main')
        self.finish()
        waiting = getattr(self, '_wait_future', None)
        if waiting is not None and not waiting.done():
            waiting.set_result(None)


class IndexHandler(tornado.web.RequestHandler):
    """Trivial handler: answers GET with a fixed body and no delay."""

    def get(self):
        body = "Index"
        self.write(body)
# Route table: URL regex -> handler class.
application = tornado.web.Application([
    (r"/main", MainHandler),
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    # IOLoop.instance() is a deprecated alias of IOLoop.current(); the
    # handler above already uses current(), so use it here as well.
    tornado.ioloop.IOLoop.current().start()
sleep方式

二、IO中的網絡請求

import tornado.ioloop
import tornado.web
from tornado import gen

class MainHandler(tornado.web.RequestHandler):
    """Answer GET once a non-blocking outbound HTTP fetch has completed."""

    @gen.coroutine
    def get(self):
        """Fetch the remote page asynchronously, then hand over to done()."""
        from tornado import httpclient
        http = httpclient.AsyncHTTPClient()
        # The ``callback`` argument of fetch() was removed in Tornado 6, so
        # wait on the returned Future and invoke done() explicitly.
        # raise_error=False keeps a non-200 upstream status from raising.
        response = yield http.fetch("http://www.google.com", raise_error=False)
        self.done(response)

    def done(self, *args, **kwargs):
        """Write the fixed response body and finish the request."""
        self.write('Main')
        self.finish()

class IndexHandler(tornado.web.RequestHandler):
    """Answers GET immediately with the literal text ``Index``."""

    def get(self):
        text = "Index"
        self.write(text)
# Route table: URL regex -> handler class.
application = tornado.web.Application([
    (r"/main", MainHandler),
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    # IOLoop.instance() is a deprecated alias of IOLoop.current(); use the
    # supported spelling to obtain the loop that serves the listener.
    tornado.ioloop.IOLoop.current().start()
http.fetch

Tornado的請求掛起

一、用yield結合Future對象來掛起請求函數

import tornado.ioloop
import tornado.web
from tornado import gen
from tornado.concurrent import Future

# Module-level slot holding the Future of the most recent /main request.
future = None


class MainHandler(tornado.web.RequestHandler):
    """Suspend GET on a Future that is resolved from outside this handler."""

    @gen.coroutine
    def get(self):
        """Publish a fresh Future in the module global and wait on it."""
        global future
        pending = Future()
        # done() runs when the Future is resolved.
        pending.add_done_callback(self.done)
        future = pending

        yield pending

    def done(self, *args, **kwargs):
        """Future callback: write the response body and finish the request."""
        self.write('Main')
        self.finish()

class IndexHandler(tornado.web.RequestHandler):
    """Release the request suspended on the module-level ``future``."""

    def get(self):
        """Resolve the pending Future, if there is one, then answer."""
        global future
        pending = future
        # ``future`` is still None until /main has been requested, and a
        # Future must not be resolved twice (asyncio-backed Futures raise on
        # a second set_result), so only resolve one that is still waiting.
        if pending is not None and not pending.done():
            pending.set_result(None)
        self.write("Index")

# Route table: URL regex -> handler class.
application = tornado.web.Application([
    (r"/main", MainHandler),
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    # IOLoop.instance() is a deprecated alias of IOLoop.current(); use the
    # supported spelling to obtain the loop that serves the listener.
    tornado.ioloop.IOLoop.current().start()
Future掛起

自己實現一個異步非阻塞框架

一、先用select和socket實現同步阻塞式web框架

import socket
import select

class HttpRequest(object):
    """Parsed view of the raw bytes of one HTTP request.

    The constructor fills in every attribute: ``header_bytes`` and
    ``body_bytes`` are the parts before and after the first blank line,
    ``method``, ``url`` and ``protocol`` come from the request line, and
    ``header_dict`` maps header names to their values.
    """

    def __init__(self, content):
        """
        :param content: raw request bytes sent by the client (the header
            section, optionally followed by a body)
        """
        self.content = content

        self.header_bytes = bytes()
        self.body_bytes = bytes()

        self.header_dict = {}

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        """Split ``content`` into header and body at the first blank line."""
        # partition() always returns three parts, so input without the
        # CRLF-CRLF separator is treated as "headers only". The earlier
        # code added a list to bytes in that case and raised TypeError.
        head, _, body = self.content.partition(b'\r\n\r\n')
        self.header_bytes += head
        self.body_bytes += body

    @property
    def header_str(self):
        """The header section decoded as UTF-8 text."""
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        """Parse the request line and the header fields.

        Nothing is parsed unless the first line consists of exactly three
        space-separated tokens (method, URL, protocol).
        """
        lines = self.header_str.split('\r\n')
        request_line = lines[0].split(' ')
        if len(request_line) != 3:
            return
        self.method, self.url, self.protocol = request_line

        # Skip the request line itself and split each field on the FIRST
        # colon only, so values that contain colons ("Host: 127.0.0.1:9999")
        # are kept instead of being silently dropped.
        for line in lines[1:]:
            name, colon, value = line.partition(':')
            if colon:
                # Whitespace around the value is not part of the value.
                self.header_dict[name] = value.strip()

# class Future(object):
#     def __init__(self):
#         self.result = None

def main(request):
    """View for ``/main/``; *request* is accepted but not inspected."""
    response_text = "main"
    return response_text

def index(request):
    """View for ``/index/``; *request* is accepted but not inspected."""
    response_text = "indexasdfasdfasdf"
    return response_text


# Routing table: (URL pattern, view function) pairs, tried in order.
routers = [('/main/', main), ('/index/', index)]

def run():
    """Serve the ``routers`` table on 127.0.0.1:9999 from one ``select`` loop.

    Every readable client socket is drained, parsed into an ``HttpRequest``
    and dispatched to the first route whose pattern ``re.match``-es the
    request URL. The view's return value (or ``b"404"`` when nothing
    matches) is sent back and the connection is closed. Views run inline in
    the loop. The function does not return normally; all sockets are closed
    when it is left through an exception such as ``KeyboardInterrupt``.
    """
    import re  # local import: the surrounding script only imports socket/select

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 9999,))
    sock.setblocking(False)
    sock.listen(128)

    inputs = [sock]

    def read_available(conn):
        """Drain *conn*; return ``(data, peer_gone)``."""
        chunks = []
        peer_gone = False
        while True:
            try:
                chunk = conn.recv(1024)
            except BlockingIOError:
                break  # nothing more buffered right now
            except OSError:
                peer_gone = True  # reset or otherwise broken connection
                break
            if not chunk:
                peer_gone = True  # orderly shutdown by the client
                break
            chunks.append(chunk)
        return b"".join(chunks), peer_gone

    def close_conn(conn):
        """Stop watching *conn* and close it."""
        inputs.remove(conn)
        conn.close()

    try:
        while True:
            rlist, wlist, elist = select.select(inputs, [], [], 0.05)
            for r in rlist:
                if r is sock:
                    # A new client is connecting.
                    conn, addr = sock.accept()
                    conn.setblocking(False)
                    inputs.append(conn)
                    continue

                # An existing client sent data (or hung up).
                data, peer_gone = read_available(r)
                if not data:
                    # No request bytes: do not try to parse or answer.
                    if peer_gone:
                        close_conn(r)
                    continue

                request = HttpRequest(data)
                func = None
                for pattern, view in routers:
                    if re.match(pattern, request.url):
                        func = view
                        break

                if func is None:
                    payload = b"404"
                else:
                    payload = bytes(func(request), encoding='utf-8')
                try:
                    r.sendall(payload)
                except OSError:
                    # The client vanished before the reply could be sent;
                    # that must not take the whole server down.
                    pass
                close_conn(r)
    finally:
        for open_sock in inputs:
            open_sock.close()

# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    run()
同步阻塞式

二、自定義一個對象來掛起請求

import socket
import select

class HttpRequest(object):
    """Parsed view of the raw bytes of one HTTP request.

    The constructor fills in every attribute: ``header_bytes`` and
    ``body_bytes`` are the parts before and after the first blank line,
    ``method``, ``url`` and ``protocol`` come from the request line, and
    ``header_dict`` maps header names to their values.
    """

    def __init__(self, content):
        """
        :param content: raw request bytes sent by the client (the header
            section, optionally followed by a body)
        """
        self.content = content

        self.header_bytes = bytes()
        self.body_bytes = bytes()

        self.header_dict = {}

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        """Split ``content`` into header and body at the first blank line."""
        # partition() always returns three parts, so input without the
        # CRLF-CRLF separator is treated as "headers only". The earlier
        # code added a list to bytes in that case and raised TypeError.
        head, _, body = self.content.partition(b'\r\n\r\n')
        self.header_bytes += head
        self.body_bytes += body

    @property
    def header_str(self):
        """The header section decoded as UTF-8 text."""
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        """Parse the request line and the header fields.

        Nothing is parsed unless the first line consists of exactly three
        space-separated tokens (method, URL, protocol).
        """
        lines = self.header_str.split('\r\n')
        request_line = lines[0].split(' ')
        if len(request_line) != 3:
            return
        self.method, self.url, self.protocol = request_line

        # Skip the request line itself and split each field on the FIRST
        # colon only, so values that contain colons ("Host: 127.0.0.1:9999")
        # are kept instead of being silently dropped.
        for line in lines[1:]:
            name, colon, value = line.partition(':')
            if colon:
                # Whitespace around the value is not part of the value.
                self.header_dict[name] = value.strip()

class Future(object):
    """Holder for a response body that is not available yet."""

    def __init__(self):
        # Starts out empty; whoever completes the request assigns the body.
        self.result = None
# Module-level slot for the Future created by the latest call to main().
F = None


def main(request):
    """View for ``/main/``: create a Future, publish it in ``F``, return it."""
    global F
    pending = Future()
    F = pending
    return pending

def stop(request):
    """View for ``/stop/``: fill in the pending Future, if any.

    :param request: the parsed request; not inspected
    :return: the literal text ``"stop"``
    """
    global F
    # F is still None until main() has created a Future; without this guard
    # a /stop/ request arriving first raised AttributeError.
    if F is not None:
        F.result = b"xxxxxxxxxxxxx"
    return "stop"


def index(request):
    """View for ``/index/``; *request* is accepted but not inspected."""
    response_text = "indexasdfasdfasdf"
    return response_text


# Routing table: (URL pattern, view function) pairs, tried in order.
routers = [('/main/', main), ('/index/', index), ('/stop/', stop)]

def run():
    """Serve the ``routers`` table on 127.0.0.1:9999 from one ``select`` loop.

    Every readable client socket is drained, parsed into an ``HttpRequest``
    and dispatched to the first route whose pattern ``re.match``-es the
    request URL. A view that returns a ``Future`` suspends its connection:
    the socket stays open and is answered on a later loop pass once
    ``Future.result`` has been set. Any other return value (or ``b"404"``
    when nothing matches) is sent immediately and the connection is closed.
    The function does not return normally; all sockets are closed when it
    is left through an exception such as ``KeyboardInterrupt``.
    """
    import re  # local import: the surrounding script only imports socket/select

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 9999,))
    sock.setblocking(False)
    sock.listen(128)

    inputs = [sock]

    # Suspended requests: client socket -> Future returned by its view.
    async_request_dict = {}

    def read_available(conn):
        """Drain *conn*; return ``(data, peer_gone)``."""
        chunks = []
        peer_gone = False
        while True:
            try:
                chunk = conn.recv(1024)
            except BlockingIOError:
                break  # nothing more buffered right now
            except OSError:
                peer_gone = True  # reset or otherwise broken connection
                break
            if not chunk:
                peer_gone = True  # orderly shutdown by the client
                break
            chunks.append(chunk)
        return b"".join(chunks), peer_gone

    def close_conn(conn):
        """Forget *conn* everywhere (including a pending Future) and close it."""
        async_request_dict.pop(conn, None)
        if conn in inputs:
            inputs.remove(conn)
        conn.close()

    def reply_and_close(conn, payload):
        """Send *payload* on a best-effort basis, then close *conn*."""
        try:
            conn.sendall(payload)
        except OSError:
            # The client vanished before the reply could be sent; that must
            # not take the whole server down.
            pass
        close_conn(conn)

    try:
        while True:
            rlist, wlist, elist = select.select(inputs, [], [], 0.05)
            for r in rlist:
                if r is sock:
                    # A new client is connecting.
                    conn, addr = sock.accept()
                    conn.setblocking(False)
                    inputs.append(conn)
                    continue

                # An existing client sent data (or hung up).
                data, peer_gone = read_available(r)
                if not data:
                    # No request bytes: do not try to parse or answer. A
                    # client that left while suspended is dropped together
                    # with its Future.
                    if peer_gone:
                        close_conn(r)
                    continue

                request = HttpRequest(data)
                func = None
                for pattern, view in routers:
                    if re.match(pattern, request.url):
                        func = view
                        break

                if func is None:
                    reply_and_close(r, b"404")
                    continue

                result = func(request)
                if isinstance(result, Future):
                    # Suspend: keep the socket open until the Future is set.
                    async_request_dict[r] = result
                else:
                    reply_and_close(r, bytes(result, encoding='utf-8'))

            # Answer suspended requests whose Future has been filled in.
            # Iterate over a snapshot because entries are removed on the way.
            for conn, future in list(async_request_dict.items()):
                if future.result is not None:
                    reply_and_close(conn, future.result)
    finally:
        for open_sock in inputs:
            open_sock.close()

# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    run()
先掛起請求

三、觸發定時器回來處理剛纔掛起的請求

import socket
import select
import time

class HttpRequest(object):
    """Parsed view of the raw bytes of one HTTP request.

    The constructor fills in every attribute: ``header_bytes`` and
    ``body_bytes`` are the parts before and after the first blank line,
    ``method``, ``url`` and ``protocol`` come from the request line, and
    ``header_dict`` maps header names to their values.
    """

    def __init__(self, content):
        """
        :param content: raw request bytes sent by the client (the header
            section, optionally followed by a body)
        """
        self.content = content

        self.header_bytes = bytes()
        self.body_bytes = bytes()

        self.header_dict = {}

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        """Split ``content`` into header and body at the first blank line."""
        # partition() always returns three parts, so input without the
        # CRLF-CRLF separator is treated as "headers only". The earlier
        # code added a list to bytes in that case and raised TypeError.
        head, _, body = self.content.partition(b'\r\n\r\n')
        self.header_bytes += head
        self.body_bytes += body

    @property
    def header_str(self):
        """The header section decoded as UTF-8 text."""
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        """Parse the request line and the header fields.

        Nothing is parsed unless the first line consists of exactly three
        space-separated tokens (method, URL, protocol).
        """
        lines = self.header_str.split('\r\n')
        request_line = lines[0].split(' ')
        if len(request_line) != 3:
            return
        self.method, self.url, self.protocol = request_line

        # Skip the request line itself and split each field on the FIRST
        # colon only, so values that contain colons ("Host: 127.0.0.1:9999")
        # are kept instead of being silently dropped.
        for line in lines[1:]:
            name, colon, value = line.partition(':')
            if colon:
                # Whitespace around the value is not part of the value.
                self.header_dict[name] = value.strip()

class Future(object):
    """Holder for a pending response body plus its timeout bookkeeping."""

    def __init__(self,timeout=0):
        # Creation time (time.time()) and the timeout value given by the caller.
        self.start = time.time()
        self.timeout = timeout
        # Starts out empty; assigned once a response body is available.
        self.result = None
def main(request):
    """View for ``/main/``: return a new ``Future`` created with timeout 5."""
    return Future(5)

def index(request):
    """View for ``/index/``; *request* is accepted but not inspected."""
    response_text = "indexasdfasdfasdf"
    return response_text


# Routing table: (URL pattern, view function) pairs, tried in order.
routers = [('/main/', main), ('/index/', index)]

def run():
    """Serve the ``routers`` table on 127.0.0.1:9999 from one ``select`` loop.

    Every readable client socket is drained, parsed into an ``HttpRequest``
    and dispatched to the first route whose pattern ``re.match``-es the
    request URL. A view that returns a ``Future`` suspends its connection:
    the socket stays open and is answered on a later loop pass, either with
    ``Future.result`` once it is set or with ``b"timeout"`` once
    ``Future.start + Future.timeout`` has passed. Any other return value (or
    ``b"404"`` when nothing matches) is sent immediately and the connection
    is closed. The function does not return normally; all sockets are closed
    when it is left through an exception such as ``KeyboardInterrupt``.
    """
    import re  # local import: the surrounding script does not import re

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 9999,))
    sock.setblocking(False)
    sock.listen(128)

    inputs = [sock]

    # Suspended requests: client socket -> Future returned by its view.
    async_request_dict = {}

    def read_available(conn):
        """Drain *conn*; return ``(data, peer_gone)``."""
        chunks = []
        peer_gone = False
        while True:
            try:
                chunk = conn.recv(1024)
            except BlockingIOError:
                break  # nothing more buffered right now
            except OSError:
                peer_gone = True  # reset or otherwise broken connection
                break
            if not chunk:
                peer_gone = True  # orderly shutdown by the client
                break
            chunks.append(chunk)
        return b"".join(chunks), peer_gone

    def close_conn(conn):
        """Forget *conn* everywhere (including a pending Future) and close it."""
        async_request_dict.pop(conn, None)
        if conn in inputs:
            inputs.remove(conn)
        conn.close()

    def reply_and_close(conn, payload):
        """Send *payload* on a best-effort basis, then close *conn*."""
        try:
            conn.sendall(payload)
        except OSError:
            # The client vanished before the reply could be sent; that must
            # not take the whole server down.
            pass
        close_conn(conn)

    try:
        while True:
            rlist, wlist, elist = select.select(inputs, [], [], 0.05)
            for r in rlist:
                if r is sock:
                    # A new client is connecting.
                    conn, addr = sock.accept()
                    conn.setblocking(False)
                    inputs.append(conn)
                    continue

                # An existing client sent data (or hung up).
                data, peer_gone = read_available(r)
                if not data:
                    # No request bytes: do not try to parse or answer. A
                    # client that left while suspended is dropped together
                    # with its Future.
                    if peer_gone:
                        close_conn(r)
                    continue

                request = HttpRequest(data)
                func = None
                for pattern, view in routers:
                    if re.match(pattern, request.url):
                        func = view
                        break

                if func is None:
                    reply_and_close(r, b"404")
                    continue

                result = func(request)
                if isinstance(result, Future):
                    # Suspend: keep the socket open until the Future is due.
                    async_request_dict[r] = result
                else:
                    reply_and_close(r, bytes(result, encoding='utf-8'))

            # Answer suspended requests that have a result or have expired.
            # Iterate over a snapshot because entries are removed on the way.
            ctime = time.time()
            for conn, future in list(async_request_dict.items()):
                if (future.start + future.timeout) <= ctime:
                    future.result = b"timeout"
                if future.result is not None:
                    reply_and_close(conn, future.result)
    finally:
        for open_sock in inputs:
            open_sock.close()

# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    run()
異步非阻塞
相關文章
相關標籤/搜索