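In an earlier article, "A Deep Dive into the Source Code of current_app, g, request and session in Flask", I explained how Flask supports multithreading.
It relies mainly on two classes, LocalStack and Local. Local has two attributes, __storage__ and __ident_func__; the latter returns the current thread id, which is how requests handled by different threads are kept apart.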
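To make the idea concrete, here is a minimal sketch of storage keyed by thread id. It is not werkzeug's actual Local class: the names SimpleLocal, _storage, _ident_func and demo are made up for illustration, and only threading.get_ident comes from the standard library.

```python
import threading


class SimpleLocal:
    """Toy stand-in for a thread-local namespace: one dict per thread id."""

    def __init__(self):
        # Bypass our own __setattr__ so these two land on the instance itself.
        object.__setattr__(self, "_storage", {})
        object.__setattr__(self, "_ident_func", threading.get_ident)

    def __setattr__(self, name, value):
        # Every attribute write goes into the namespace of the calling thread.
        ident = self._ident_func()
        self._storage.setdefault(ident, {})[name] = value

    def __getattr__(self, name):
        # Only called when normal lookup fails, i.e. for per-thread attributes.
        try:
            return self._storage[self._ident_func()][name]
        except KeyError:
            raise AttributeError(name)


def demo():
    """Show that the main thread and a worker see separate values."""
    local = SimpleLocal()
    local.user = "main"
    seen = {}

    def worker():
        # The worker starts with an empty namespace of its own.
        seen["before"] = hasattr(local, "user")
        local.user = "worker"
        seen["after"] = local.user

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    seen["main"] = local.user  # untouched by the worker's assignment
    return seen
```

Calling demo() shows that the worker's assignment never leaks into the main thread's namespace, which is the property Flask needs to keep concurrent requests apart.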
This time the topic is how Flask turns multithreading on.
Let's start with the app.run() method:
def run(self, host=None, port=None, debug=None, **options):
    from werkzeug.serving import run_simple
    if host is None:
        host = '127.0.0.1'
    if port is None:
        server_name = self.config['SERVER_NAME']
        if server_name and ':' in server_name:
            port = int(server_name.rsplit(':', 1)[1])
        else:
            port = 5000
    if debug is not None:
        self.debug = bool(debug)
    options.setdefault('use_reloader', self.debug)
    options.setdefault('use_debugger', self.debug)
    try:
        run_simple(host, port, self, **options)  # execution continues into this function
    finally:
        # reset the first request information if the development server
        # reset normally.  This makes it possible to restart the server
        # without reloader and that stuff from an interactive shell.
        self._got_first_request = False
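The defaulting logic in run() can be tried out without Flask. The following is a rough sketch that mirrors the checks above as a plain function; resolve_run_args, server_name and current_debug are hypothetical names standing in for the method, self.config['SERVER_NAME'] and self.debug.

```python
def resolve_run_args(host=None, port=None, debug=None,
                     server_name=None, current_debug=False, **options):
    """Mirror the host/port/debug defaulting done before run_simple() is called."""
    if host is None:
        host = "127.0.0.1"
    if port is None:
        # Take the port from "name:port" if present, otherwise fall back to 5000.
        if server_name and ":" in server_name:
            port = int(server_name.rsplit(":", 1)[1])
        else:
            port = 5000
    if debug is not None:
        current_debug = bool(debug)
    # setdefault leaves caller-supplied values untouched.
    options.setdefault("use_reloader", current_debug)
    options.setdefault("use_debugger", current_debug)
    return host, port, options
```

Note that any extra keyword argument, such as threaded=True, simply stays in options and is forwarded unchanged.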
After these checks and defaults, control passes to run_simple(). Here is its source:
def run_simple(hostname, port, application, use_reloader=False,
               use_debugger=False, use_evalex=True,
               extra_files=None, reloader_interval=1,
               reloader_type='auto', threaded=False,
               processes=1, request_handler=None, static_files=None,
               passthrough_errors=False, ssl_context=None):
    """Start a WSGI application. Optional features include a reloader,
    multithreading and fork support.

    This function has a command-line interface too::

        python -m werkzeug.serving --help

    .. versionadded:: 0.5
       `static_files` was added to simplify serving of static files as well
       as `passthrough_errors`.

    .. versionadded:: 0.6
       support for SSL was added.

    .. versionadded:: 0.8
       Added support for automatically loading a SSL context from certificate
       file and private key.

    .. versionadded:: 0.9
       Added command-line interface.

    .. versionadded:: 0.10
       Improved the reloader and added support for changing the backend
       through the `reloader_type` parameter.  See :ref:`reloader`
       for more information.

    :param hostname: The host for the application.  eg: ``'localhost'``
    :param port: The port for the server.  eg: ``8080``
    :param application: the WSGI application to execute
    :param use_reloader: should the server automatically restart the python
                         process if modules were changed?
    :param use_debugger: should the werkzeug debugging system be used?
    :param use_evalex: should the exception evaluation feature be enabled?
    :param extra_files: a list of files the reloader should watch
                        additionally to the modules.  For example configuration
                        files.
    :param reloader_interval: the interval for the reloader in seconds.
    :param reloader_type: the type of reloader to use.  The default is
                          auto detection.  Valid values are ``'stat'`` and
                          ``'watchdog'``. See :ref:`reloader` for more
                          information.
    :param threaded: should the process handle each request in a separate
                     thread?
    :param processes: if greater than 1 then handle each request in a new process
                      up to this maximum number of concurrent processes.
    :param request_handler: optional parameter that can be used to replace
                            the default one.  You can use this to replace it
                            with a different
                            :class:`~BaseHTTPServer.BaseHTTPRequestHandler`
                            subclass.
    :param static_files: a list or dict of paths for static files.  This works
                         exactly like :class:`SharedDataMiddleware`, it's actually
                         just wrapping the application in that middleware before
                         serving.
    :param passthrough_errors: set this to `True` to disable the error catching.
                               This means that the server will die on errors but
                               it can be useful to hook debuggers in (pdb etc.)
    :param ssl_context: an SSL context for the connection. Either an
                        :class:`ssl.SSLContext`, a tuple in the form
                        ``(cert_file, pkey_file)``, the string ``'adhoc'`` if
                        the server should automatically create one, or ``None``
                        to disable SSL (which is the default).
    """
    if not isinstance(port, int):
        raise TypeError('port must be an integer')
    if use_debugger:
        from werkzeug.debug import DebuggedApplication
        application = DebuggedApplication(application, use_evalex)
    if static_files:
        from werkzeug.wsgi import SharedDataMiddleware
        application = SharedDataMiddleware(application, static_files)

    def log_startup(sock):
        display_hostname = hostname not in ('', '*') and hostname or 'localhost'
        if ':' in display_hostname:
            display_hostname = '[%s]' % display_hostname
        quit_msg = '(Press CTRL+C to quit)'
        port = sock.getsockname()[1]
        _log('info', ' * Running on %s://%s:%d/ %s',
             ssl_context is None and 'http' or 'https',
             display_hostname, port, quit_msg)

    def inner():
        try:
            fd = int(os.environ['WERKZEUG_SERVER_FD'])
        except (LookupError, ValueError):
            fd = None
        srv = make_server(hostname, port, application, threaded,
                          processes, request_handler,
                          passthrough_errors, ssl_context,
                          fd=fd)
        if fd is None:
            log_startup(srv.socket)
        srv.serve_forever()

    if use_reloader:
        # If we're not running already in the subprocess that is the
        # reloader we want to open up a socket early to make sure the
        # port is actually available.
        if os.environ.get('WERKZEUG_RUN_MAIN') != 'true':
            if port == 0 and not can_open_by_fd:
                raise ValueError('Cannot bind to a random port with enabled '
                                 'reloader if the Python interpreter does '
                                 'not support socket opening by fd.')

            # Create and destroy a socket so that any exceptions are
            # raised before we spawn a separate Python interpreter and
            # lose this ability.
            address_family = select_ip_version(hostname, port)
            s = socket.socket(address_family, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(get_sockaddr(hostname, port, address_family))
            if hasattr(s, 'set_inheritable'):
                s.set_inheritable(True)

            # If we can open the socket by file descriptor, then we can just
            # reuse this one and our socket will survive the restarts.
            if can_open_by_fd:
                os.environ['WERKZEUG_SERVER_FD'] = str(s.fileno())
                s.listen(LISTEN_QUEUE)
                log_startup(s)
            else:
                s.close()

        # Do not use relative imports, otherwise "python -m werkzeug.serving"
        # breaks.
        from werkzeug._reloader import run_with_reloader
        run_with_reloader(inner, extra_files, reloader_interval,
                          reloader_type)
    else:
        inner()  # executed by default
Again, after a series of checks, the default path ends up in inner(). This function is defined inside run_simple(), so it is a closure. inner() calls make_server(); here is its source:
def make_server(host=None, port=None, app=None, threaded=False, processes=1,
                request_handler=None, passthrough_errors=False,
                ssl_context=None, fd=None):
    """Create a new server instance that is either threaded, or forks
    or just processes one request after another.
    """
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and "
                         "multi process server.")
    elif threaded:
        return ThreadedWSGIServer(host, port, app, request_handler,
                                  passthrough_errors, ssl_context, fd=fd)
    elif processes > 1:
        return ForkingWSGIServer(host, port, app, processes, request_handler,
                                 passthrough_errors, ssl_context, fd=fd)
    else:
        return BaseWSGIServer(host, port, app, request_handler,
                              passthrough_errors, ssl_context, fd=fd)
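At this point the picture is clear: to get multiple threads or multiple processes, set the threaded or processes parameter. Both are passed down from app.run():
app.run(**options) ---> run_simple(threaded, processes) ---> make_server(threaded, processes)
In the Flask version examined here, the server is single-threaded and single-process by default; to enable threading, just pass the corresponding argument to run: app.run(threaded=True). (Since Flask 1.0 the development server uses threads by default.) The two options cannot be combined: make_server() raises ValueError if threaded is true and processes is greater than 1.
As make_server shows, three servers are available: ThreadedWSGIServer, ForkingWSGIServer and BaseWSGIServer. They are defined in werkzeug.serving, and BaseWSGIServer is the one used when neither option is set.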
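The way the two parameters travel down the call chain can be sketched with stub functions. The stubs below are illustrative only (run_stub, run_simple_stub and make_server_kind are made-up names); they return the name of the server class that the branching in make_server() would select instead of starting a real server.

```python
def make_server_kind(threaded=False, processes=1):
    """Return the name of the server class the make_server() branching would pick."""
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and multi process server.")
    if threaded:
        return "ThreadedWSGIServer"
    if processes > 1:
        return "ForkingWSGIServer"
    return "BaseWSGIServer"


def run_simple_stub(hostname, port, application,
                    threaded=False, processes=1, **_other_options):
    """Stand-in for run_simple(): forwards threaded/processes unchanged."""
    return make_server_kind(threaded, processes)


def run_stub(host="127.0.0.1", port=5000, **options):
    """Stand-in for app.run(): extra keyword arguments flow through **options."""
    return run_simple_stub(host, port, None, **options)
```

With these stubs, run_stub() selects BaseWSGIServer, run_stub(threaded=True) selects ThreadedWSGIServer, and run_stub(processes=4) selects ForkingWSGIServer.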
Taking threads as the example, here is the ThreadedWSGIServer class:
class ThreadedWSGIServer(ThreadingMixIn, BaseWSGIServer):  # inherits from ThreadingMixIn and BaseWSGIServer
    """A WSGI server that does threading."""
    multithread = True
    daemon_threads = True

ThreadingMixIn = socketserver.ThreadingMixIn

class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
process_request starts a new thread for every incoming request and handles the request in that thread.
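The same mix-in pattern can be reproduced with the standard library alone. The sketch below combines socketserver.ThreadingMixIn with wsgiref's WSGIServer, which is an assumption made for illustration and not what werkzeug does internally; ThreadedServer, QuietHandler, app and fetch_handler_thread_id are hypothetical names.

```python
import http.client
import threading
from socketserver import ThreadingMixIn
from wsgiref.simple_server import WSGIRequestHandler, WSGIServer, make_server


class ThreadedServer(ThreadingMixIn, WSGIServer):
    """WSGI server whose process_request() spawns one thread per request."""
    daemon_threads = True


class QuietHandler(WSGIRequestHandler):
    def log_message(self, format, *args):
        pass  # suppress the per-request access log


def app(environ, start_response):
    # Report the id of the thread that is handling this request.
    body = str(threading.get_ident()).encode("ascii")
    start_response("200 OK", [("Content-Type", "text/plain"),
                              ("Content-Length", str(len(body)))])
    return [body]


def fetch_handler_thread_id():
    """Serve one request and return (handler id, serving-loop id, caller id)."""
    server = make_server("127.0.0.1", 0, app,
                         server_class=ThreadedServer,
                         handler_class=QuietHandler)
    port = server.server_address[1]  # port 0 lets the OS pick a free port
    serving = threading.Thread(target=server.serve_forever, daemon=True)
    serving.start()
    try:
        conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
        conn.request("GET", "/")
        handler_id = int(conn.getresponse().read())
        conn.close()
    finally:
        server.shutdown()
        server.server_close()
    return handler_id, serving.ident, threading.get_ident()
```

The handler id differs from both the id of the thread running serve_forever() and the id of the calling thread, because the request was handled in a thread of its own while the other two were still alive.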
Finally, here is a very simple application to verify the claims above:
from flask import Flask
from flask import _request_ctx_stack
app = Flask(__name__)
@app.route('/')
def index():
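    print(_request_ctx_stack._local.__ident_func__())  # id of the thread handling this request
    while True:  # busy loop: keeps the thread alive so its id cannot be reused
        pass
    return '<h1>hello</h1>'  # never reached

app.run()  # to enable multithreading, use app.run(threaded=True)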
_request_ctx_stack._local.__ident_func__() corresponds to the get_ident() function, which returns the id of the current thread. Why the while True afterwards? Look at the documentation of get_ident():
Return a non-zero integer that uniquely identifies the current thread amongst other threads that exist simultaneously. This may be used to identify per-thread resources. Even though on some platforms threads identities may appear to be allocated consecutive numbers starting at 1, this behavior should not be relied upon, and the number should be seen purely as a magic cookie. A thread's identity may be reused for another thread after it exits.
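The key sentence is the last one: a thread's id may be reused after the thread exits. That is why I put an infinite loop in the view function: it blocks each request so that distinct ids can be observed. There are two cases:
1. Without multithreading, the first request blocks the server, and every later request blocks as well.
2. With multithreading, every request prints a different thread id.
First case:
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)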
139623180527360
Second case:
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
140315469436672
140315477829376
140315486222080
140315316901632
140315105163008
140315096770304
140315088377600
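The reasoning behind the busy loop, namely that ids are only guaranteed to be unique among threads that are alive at the same time, can be checked without Flask. This is a small sketch using only the threading module; the two function names are made up for illustration.

```python
import threading


def ids_of_concurrent_threads(count=5):
    """Collect ids from threads that are all kept alive at the same time."""
    ids = []
    lock = threading.Lock()
    barrier = threading.Barrier(count)

    def worker():
        with lock:
            ids.append(threading.get_ident())
        barrier.wait()  # no thread exits until every thread has recorded its id

    threads = [threading.Thread(target=worker) for _ in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return ids


def ids_of_sequential_threads(count=5):
    """Collect ids from threads that run one after another."""
    ids = []
    for _ in range(count):
        t = threading.Thread(target=lambda: ids.append(threading.get_ident()))
        t.start()
        t.join()  # the thread has exited before the next one starts
    return ids
```

The concurrent version always yields distinct ids. The sequential version may or may not repeat an id, depending on the platform, which is exactly why a view function that returns immediately would make the experiment inconclusive.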
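The result speaks for itself. To sum up: Flask supports multithreading, but in the version examined here it is not enabled by default. Also, app.run() is only suitable for development; in production, use a WSGI server such as uWSGI or Gunicorn.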